Skip to content

Commit

Permalink
Merge pull request #93 from gravity-technologies/enable-offer-to-cluster
Browse files Browse the repository at this point in the history
Enable Offer To Cluster
  • Loading branch information
ethanf committed Apr 15, 2023
2 parents 9076de0 + 0b99fb6 commit 9203254
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 7 deletions.
63 changes: 61 additions & 2 deletions cluster/cluster.go
@@ -1,15 +1,74 @@
package cluster

import "github.com/lirm/aeron-go/aeron/idlestrategy"
import (
"github.com/lirm/aeron-go/aeron/atomic"
"github.com/lirm/aeron-go/aeron/idlestrategy"
"github.com/lirm/aeron-go/cluster/codecs"
)

type Cluster interface {

	// LogPosition returns the position the log has reached in bytes as of the current message
	LogPosition() int64

	// MemberId returns the unique id for the hosting member of the cluster. Useful only for debugging purposes
	MemberId() int32

	// Role returns the role the cluster node is playing
	Role() Role

	// Time returns the cluster time as time units since 1 Jan 1970 UTC
	Time() int64

	// TimeUnit returns the unit of time applied when timestamping and time operations
	TimeUnit() codecs.ClusterTimeUnitEnum

	// IdleStrategy returns the IdleStrategy which should be used by the service when it experiences back-pressure on egress,
	// closing sessions, making timer requests, or any long-running actions
	IdleStrategy() idlestrategy.Idler

	// ScheduleTimer schedules a timer for a given deadline and provides a correlation id to identify the timer when it expires or
	// for cancellation. This action is asynchronous and will race with the timer expiring.
	//
	// If the correlationId is for an existing scheduled timer then it will be rescheduled to the new deadline. However,
	// it is best to generate correlationIds in a monotonic fashion and be aware of potential clashes with other
	// services in the same cluster. Service isolation can be achieved by using the upper bits for service id.
	//
	// Timers should only be scheduled or cancelled in the context of processing
	//   - onSessionMessage
	//   - onTimerEvent
	//   - onSessionOpen
	//   - onSessionClose
	// If applied to other events then they are not guaranteed to be reliable.
	//
	// Callers of this method should loop until the method succeeds.
	//
	// The cluster's idle strategy must be used in the body of the loop to allow for the clustered service to be
	// shutdown if required.
	//
	// ScheduleTimer returns true if the event to schedule a timer request has been sent or false if back-pressure is applied
	ScheduleTimer(correlationId int64, deadline int64) bool

	// CancelTimer cancels a previously scheduled timer. This action is asynchronous and will race with the timer expiring.
	//
	// Timers should only be scheduled or cancelled in the context of processing
	//   - onSessionMessage
	//   - onTimerEvent
	//   - onSessionOpen
	//   - onSessionClose
	// If applied to other events then they are not guaranteed to be reliable.
	//
	// Callers of this method should loop until the method succeeds.
	//
	// CancelTimer returns true if the event to cancel request has been sent or false if back-pressure is applied.
	CancelTimer(correlationId int64) bool

	// Offer a message as ingress to the cluster for sequencing. This will happen efficiently over IPC to the
	// consensus module and set the cluster session as the negative value of the cluster's ServiceID
	//
	// Callers of this method should loop until the method succeeds.
	//
	// The cluster's idle strategy must be used in the body of the loop to allow for the clustered service to be
	// shutdown if required
	Offer(*atomic.Buffer, int32, int32) int64
}
16 changes: 16 additions & 0 deletions cluster/clustered_service.go
Expand Up @@ -8,16 +8,23 @@ import (
)

type ClusteredService interface {
// OnStart is called to initialize the service and load snapshot state, where the snapshot image can be nil if no previous snapshot exists.
//
// Note: As this can potentially be a long-running operation, the implementation should use Cluster.IdleStrategy() and
// occasionally call IdleStrategy.Idle(), especially when polling the Image returns 0
OnStart(cluster Cluster, image aeron.Image)

// OnSessionOpen notifies the clustered service that a session has been opened for a client to the cluster
OnSessionOpen(session ClientSession, timestamp int64)

// OnSessionClose notifies the clustered service that a session has been closed for a client to the cluster
OnSessionClose(
session ClientSession,
timestamp int64,
closeReason codecs.CloseReasonEnum,
)

// OnSessionMessage notifies the clustered service that a message has been received to be processed by a clustered service
OnSessionMessage(
session ClientSession,
timestamp int64,
Expand All @@ -27,14 +34,23 @@ type ClusteredService interface {
header *logbuffer.Header,
)

// OnTimerEvent notifies the clustered service that a scheduled timer has expired
OnTimerEvent(correlationId, timestamp int64)

// OnTakeSnapshot instructs the clustered service to take a snapshot and store its state to the provided aeron archive Publication.
//
// Note: As this is a potentially long-running operation the implementation should use
// Cluster.idleStrategy() and then occasionally call IdleStrategy.idle()
// especially when the snapshot ExclusivePublication returns Publication.BACK_PRESSURED
OnTakeSnapshot(publication *aeron.Publication)

// OnRoleChange notifies the clustered service that the cluster node has changed role
OnRoleChange(role Role)

// OnTerminate notifies the clustered service that the container is going to terminate
OnTerminate(cluster Cluster)

// OnNewLeadershipTermEvent notifies the clustered service that an election has been successful and a leader has entered a new term
OnNewLeadershipTermEvent(
leadershipTermId int64,
logPosition int64,
Expand Down
15 changes: 15 additions & 0 deletions cluster/clustered_service_agent.go
Expand Up @@ -726,6 +726,10 @@ func (agent *ClusteredServiceAgent) Time() int64 {
return agent.clusterTime
}

// TimeUnit returns the unit of time applied when timestamping and time operations.
// Implements Cluster.TimeUnit.
func (agent *ClusteredServiceAgent) TimeUnit() codecs.ClusterTimeUnitEnum {
	return agent.timeUnit
}

// IdleStrategy returns the idle strategy the service should use when it experiences
// back-pressure. The agent itself satisfies idlestrategy.Idler, so it returns itself.
// Implements Cluster.IdleStrategy.
func (agent *ClusteredServiceAgent) IdleStrategy() idlestrategy.Idler {
	return agent
}
Expand All @@ -738,4 +742,15 @@ func (agent *ClusteredServiceAgent) CancelTimer(correlationId int64) bool {
return agent.proxy.cancelTimer(correlationId)
}

// Offer sends a message as ingress to the cluster for sequencing via the consensus
// module, stamping the session message header with the negated service id as the
// cluster session and the current cluster time. Implements Cluster.Offer.
//
// When this node is not the leader no ingress is sent and the mocked-offer
// sentinel is returned instead.
func (agent *ClusteredServiceAgent) Offer(buffer *atomic.Buffer, offset, length int32) int64 {
	if agent.role != Leader {
		return ClientSessionMockedOffer
	}

	// Stamp the shared session-message header before offering it alongside the payload.
	hdr := agent.sessionMsgHdrBuffer
	hdr.PutInt64(SBEHeaderLength+8, -int64(agent.opts.ServiceId))
	hdr.PutInt64(SBEHeaderLength+16, agent.clusterTime)

	return agent.proxy.Offer2(hdr, 0, hdr.Capacity(), buffer, offset, length)
}

// END CLUSTER IMPLEMENTATION
26 changes: 21 additions & 5 deletions cluster/consensus_module_proxy.go
Expand Up @@ -81,13 +81,13 @@ func (proxy *consensusModuleProxy) scheduleTimer(correlationId int64, deadline i
buf := proxy.initBuffer(scheduleTimerTemplateId, scheduleTimerBlockLength)
buf.PutInt64(SBEHeaderLength, correlationId)
buf.PutInt64(SBEHeaderLength+8, deadline)
return proxy.offer(buf, SBEHeaderLength+scheduleTimerBlockLength) >= 0
return proxy.offer(buf, 0, SBEHeaderLength+scheduleTimerBlockLength) >= 0
}

// cancelTimer offers a cancel-timer request for the given correlation id to the
// consensus module. Returns true if the request was sent, false if back-pressure
// was applied.
func (proxy *consensusModuleProxy) cancelTimer(correlationId int64) bool {
	request := proxy.initBuffer(cancelTimerTemplateId, cancelTimerBlockLength)
	request.PutInt64(SBEHeaderLength, correlationId)
	result := proxy.offer(request, 0, SBEHeaderLength+cancelTimerBlockLength)
	return result >= 0
}

func (proxy *consensusModuleProxy) initBuffer(templateId uint16, blockLength uint16) *atomic.Buffer {
Expand All @@ -102,15 +102,31 @@ func (proxy *consensusModuleProxy) initBuffer(templateId uint16, blockLength uin
// send to our request publication
func (proxy *consensusModuleProxy) send(payload []byte) {
buffer := atomic.MakeBuffer(payload)
for proxy.offer(buffer, buffer.Capacity()) < 0 {
for proxy.offer(buffer, 0, buffer.Capacity()) < 0 {
proxy.idleStrategy.Idle(0)
}
}

func (proxy *consensusModuleProxy) offer(buffer *atomic.Buffer, length int32) int64 {
// offer attempts to publish the buffer slice, retrying up to three times on a
// non-fatal negative result (e.g. back-pressure). It panics on unrecoverable
// publication states and otherwise returns the last result from Offer, which is
// negative if all attempts failed.
func (proxy *consensusModuleProxy) offer(buffer *atomic.Buffer, offset, length int32) int64 {
	const maxAttempts = 3
	var result int64
	for attempt := 0; attempt < maxAttempts; attempt++ {
		result = proxy.publication.Offer(buffer, offset, length, nil)
		if result >= 0 {
			return result
		}
		switch result {
		case aeron.NotConnected, aeron.PublicationClosed, aeron.MaxPositionExceeded:
			// These states cannot recover by retrying.
			panic(fmt.Sprintf("offer failed, result=%d", result))
		}
	}
	return result
}

func (proxy *consensusModuleProxy) Offer2(
bufferOne *atomic.Buffer, offsetOne int32, lengthOne int32,
bufferTwo *atomic.Buffer, offsetTwo int32, lengthTwo int32,
) int64 {
var result int64
for i := 0; i < 3; i++ {
result = proxy.publication.Offer2(bufferOne, offsetOne, lengthOne, bufferTwo, offsetTwo, lengthTwo, nil)
if result >= 0 {
break
} else if result == aeron.NotConnected || result == aeron.PublicationClosed || result == aeron.MaxPositionExceeded {
Expand Down

0 comments on commit 9203254

Please sign in to comment.