Skip to content

Commit

Permalink
add: hide event handler from storage public API
Browse files Browse the repository at this point in the history
  • Loading branch information
coufalja committed Jan 16, 2024
1 parent 25841df commit eda6907
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 38 deletions.
16 changes: 8 additions & 8 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ const defaultQueryTimeout = 5 * time.Second

func New(cfg Config) (*Engine, error) {
e := &Engine{
cfg: cfg,
log: cfg.Log,
eventsCh: make(chan any, 1),
stop: make(chan struct{}),
cfg: cfg,
log: cfg.Log,
stop: make(chan struct{}),
}
e.events = &events{eventsCh: make(chan any, 1), engine: e}

nh, err := createNodeHost(e)
if err != nil {
Expand Down Expand Up @@ -70,7 +70,7 @@ type Engine struct {
*table.Manager
cfg Config
log *zap.SugaredLogger
eventsCh chan any
events *events
stop chan struct{}
LogReader logreader.Interface
Cluster *cluster.Cluster
Expand All @@ -82,7 +82,7 @@ func (e *Engine) Start() error {
if err := e.Manager.Start(); err != nil {
return err
}
go e.dispatchEvents()
go e.events.dispatchEvents()
return nil
}

Expand Down Expand Up @@ -254,8 +254,8 @@ func createNodeHost(e *Engine) (*dragonboat.NodeHost, error) {
EnableMetrics: true,
MaxReceiveQueueSize: e.cfg.MaxReceiveQueueSize,
MaxSendQueueSize: e.cfg.MaxSendQueueSize,
SystemEventListener: e,
RaftEventListener: e,
SystemEventListener: e.events,
RaftEventListener: e.events,
}

if e.cfg.LogDBImplementation == Tan {
Expand Down
63 changes: 34 additions & 29 deletions storage/engine_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,36 @@ import (
"github.com/lni/dragonboat/v4/raftio"
)

func (e *Engine) dispatchEvents() {
type events struct {
eventsCh chan any
engine *Engine
}

func (e *events) dispatchEvents() {
for {

Check failure on line 15 in storage/engine_events.go

View workflow job for this annotation

GitHub Actions / golangci

S1000: should use for range instead of for { select {} } (gosimple)
select {
case evt := <-e.eventsCh:
e.log.Infof("raft: %T %+v", evt, evt)
e.engine.log.Infof("raft: %T %+v", evt, evt)
switch ev := evt.(type) {
case nodeHostShuttingDown:
return
case leaderUpdated, nodeUnloaded, membershipChanged:
e.Cluster.Notify()
e.engine.Cluster.Notify()
case nodeReady:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
e.LogCache.NodeReady(ev.ShardID)
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.NodeReady(ev.ShardID)
}
e.Cluster.Notify()
e.engine.Cluster.Notify()
case nodeDeleted:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
e.LogCache.NodeDeleted(ev.ShardID)
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.NodeDeleted(ev.ShardID)
}
e.Cluster.Notify()
e.engine.Cluster.Notify()
case logCompacted:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
e.LogCache.LogCompacted(ev.ShardID)
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.LogCompacted(ev.ShardID)
}
e.Cluster.Notify()
e.engine.Cluster.Notify()
}
}
}
Expand All @@ -43,7 +48,7 @@ type leaderUpdated struct {
LeaderID uint64
}

func (e *Engine) LeaderUpdated(info raftio.LeaderInfo) {
func (e *events) LeaderUpdated(info raftio.LeaderInfo) {
e.eventsCh <- leaderUpdated{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -54,7 +59,7 @@ func (e *Engine) LeaderUpdated(info raftio.LeaderInfo) {

type nodeHostShuttingDown struct{}

func (e *Engine) NodeHostShuttingDown() {
func (e *events) NodeHostShuttingDown() {
e.eventsCh <- nodeHostShuttingDown{}
}

Expand All @@ -63,7 +68,7 @@ type nodeUnloaded struct {
ReplicaID uint64
}

func (e *Engine) NodeUnloaded(info raftio.NodeInfo) {
func (e *events) NodeUnloaded(info raftio.NodeInfo) {
e.eventsCh <- nodeUnloaded{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -75,7 +80,7 @@ type nodeDeleted struct {
ReplicaID uint64
}

func (e *Engine) NodeDeleted(info raftio.NodeInfo) {
func (e *events) NodeDeleted(info raftio.NodeInfo) {
e.eventsCh <- nodeDeleted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -87,7 +92,7 @@ type nodeReady struct {
ReplicaID uint64
}

func (e *Engine) NodeReady(info raftio.NodeInfo) {
func (e *events) NodeReady(info raftio.NodeInfo) {
e.eventsCh <- nodeReady{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -99,7 +104,7 @@ type membershipChanged struct {
ReplicaID uint64
}

func (e *Engine) MembershipChanged(info raftio.NodeInfo) {
func (e *events) MembershipChanged(info raftio.NodeInfo) {
e.eventsCh <- membershipChanged{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -111,7 +116,7 @@ type connectionEstablished struct {
SnapshotConnection bool
}

func (e *Engine) ConnectionEstablished(info raftio.ConnectionInfo) {
func (e *events) ConnectionEstablished(info raftio.ConnectionInfo) {
e.eventsCh <- connectionEstablished{
Address: info.Address,
SnapshotConnection: info.SnapshotConnection,
Expand All @@ -123,7 +128,7 @@ type connectionFailed struct {
SnapshotConnection bool
}

func (e *Engine) ConnectionFailed(info raftio.ConnectionInfo) {
func (e *events) ConnectionFailed(info raftio.ConnectionInfo) {
e.eventsCh <- connectionFailed{
Address: info.Address,
SnapshotConnection: info.SnapshotConnection,
Expand All @@ -137,7 +142,7 @@ type sendSnapshotStarted struct {
Index uint64
}

func (e *Engine) SendSnapshotStarted(info raftio.SnapshotInfo) {
func (e *events) SendSnapshotStarted(info raftio.SnapshotInfo) {
e.eventsCh <- sendSnapshotStarted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -153,7 +158,7 @@ type sendSnapshotCompleted struct {
Index uint64
}

func (e *Engine) SendSnapshotCompleted(info raftio.SnapshotInfo) {
func (e *events) SendSnapshotCompleted(info raftio.SnapshotInfo) {
e.eventsCh <- sendSnapshotCompleted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -169,7 +174,7 @@ type sendSnapshotAborted struct {
Index uint64
}

func (e *Engine) SendSnapshotAborted(info raftio.SnapshotInfo) {
func (e *events) SendSnapshotAborted(info raftio.SnapshotInfo) {
e.eventsCh <- sendSnapshotAborted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -185,7 +190,7 @@ type snapshotReceived struct {
Index uint64
}

func (e *Engine) SnapshotReceived(info raftio.SnapshotInfo) {
func (e *events) SnapshotReceived(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotReceived{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -201,7 +206,7 @@ type snapshotRecovered struct {
Index uint64
}

func (e *Engine) SnapshotRecovered(info raftio.SnapshotInfo) {
func (e *events) SnapshotRecovered(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotRecovered{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -217,7 +222,7 @@ type snapshotCreated struct {
Index uint64
}

func (e *Engine) SnapshotCreated(info raftio.SnapshotInfo) {
func (e *events) SnapshotCreated(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotCreated{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -233,7 +238,7 @@ type snapshotCompacted struct {
Index uint64
}

func (e *Engine) SnapshotCompacted(info raftio.SnapshotInfo) {
func (e *events) SnapshotCompacted(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotCompacted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -247,7 +252,7 @@ type logCompacted struct {
ReplicaID uint64
}

func (e *Engine) LogCompacted(info raftio.EntryInfo) {
func (e *events) LogCompacted(info raftio.EntryInfo) {
e.eventsCh <- logCompacted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -259,7 +264,7 @@ type logDBCompacted struct {
ReplicaID uint64
}

func (e *Engine) LogDBCompacted(info raftio.EntryInfo) {
func (e *events) LogDBCompacted(info raftio.EntryInfo) {
e.eventsCh <- logDBCompacted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand Down
3 changes: 2 additions & 1 deletion storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,8 @@ func newTestConfig() Config {
}

func newTestEngine(t *testing.T, cfg Config) *Engine {
e := &Engine{cfg: cfg, eventsCh: make(chan any, 1), stop: make(chan struct{}), log: zaptest.NewLogger(t).Sugar()}
e := &Engine{cfg: cfg, stop: make(chan struct{}), log: zaptest.NewLogger(t).Sugar()}
e.events = &events{eventsCh: make(chan any, 1), engine: e}
nh, err := createNodeHost(e)
require.NoError(t, err)
e.NodeHost = nh
Expand Down

0 comments on commit eda6907

Please sign in to comment.