From fadcfb2ce7c6fe99fe0976cc8a784d8f122f967d Mon Sep 17 00:00:00 2001 From: Jakub Coufal Date: Tue, 16 Jan 2024 12:53:00 +0100 Subject: [PATCH] add: hide event handler from storage public API --- regattaserver/tables.go | 6 +-- storage/engine.go | 16 ++++---- storage/engine_events.go | 86 ++++++++++++++++++++-------------------- storage/engine_test.go | 3 +- 4 files changed, 57 insertions(+), 54 deletions(-) diff --git a/regattaserver/tables.go b/regattaserver/tables.go index 430dd9d2..128406e5 100644 --- a/regattaserver/tables.go +++ b/regattaserver/tables.go @@ -22,7 +22,7 @@ type TablesServer struct { } func (t *TablesServer) Create(ctx context.Context, req *regattapb.CreateTableRequest) (*regattapb.CreateTableResponse, error) { - ctx, err := t.AuthFunc(ctx) + _, err := t.AuthFunc(ctx) if err != nil { return nil, err } @@ -43,7 +43,7 @@ func (t *TablesServer) Create(ctx context.Context, req *regattapb.CreateTableReq } func (t *TablesServer) Delete(ctx context.Context, req *regattapb.DeleteTableRequest) (*regattapb.DeleteTableResponse, error) { - ctx, err := t.AuthFunc(ctx) + _, err := t.AuthFunc(ctx) if err != nil { return nil, err } @@ -60,7 +60,7 @@ func (t *TablesServer) Delete(ctx context.Context, req *regattapb.DeleteTableReq } func (t *TablesServer) List(ctx context.Context, _ *regattapb.ListTablesRequest) (*regattapb.ListTablesResponse, error) { - ctx, err := t.AuthFunc(ctx) + _, err := t.AuthFunc(ctx) if err != nil { return nil, err } diff --git a/storage/engine.go b/storage/engine.go index 3a0279de..0de06efc 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -24,11 +24,11 @@ const defaultQueryTimeout = 5 * time.Second func New(cfg Config) (*Engine, error) { e := &Engine{ - cfg: cfg, - log: cfg.Log, - eventsCh: make(chan any, 1), - stop: make(chan struct{}), + cfg: cfg, + log: cfg.Log, + stop: make(chan struct{}), } + e.events = &events{eventsCh: make(chan any, 1), engine: e} nh, err := createNodeHost(e) if err != nil { @@ -70,7 +70,7 @@ type Engine struct { *table.Manager cfg Config log *zap.SugaredLogger - eventsCh chan any + events *events stop chan struct{} LogReader logreader.Interface Cluster *cluster.Cluster @@ -82,7 +82,7 @@ func (e *Engine) Start() error { if err := e.Manager.Start(); err != nil { return err } - go e.dispatchEvents() + go e.events.dispatchEvents() return nil } @@ -254,8 +254,8 @@ func createNodeHost(e *Engine) (*dragonboat.NodeHost, error) { EnableMetrics: true, MaxReceiveQueueSize: e.cfg.MaxReceiveQueueSize, MaxSendQueueSize: e.cfg.MaxSendQueueSize, - SystemEventListener: e, - RaftEventListener: e, + SystemEventListener: e.events, + RaftEventListener: e.events, } if e.cfg.LogDBImplementation == Tan { diff --git a/storage/engine_events.go b/storage/engine_events.go index 9004449a..2367f959 100644 --- a/storage/engine_events.go +++ b/storage/engine_events.go @@ -6,32 +6,34 @@ import ( "github.com/lni/dragonboat/v4/raftio" ) -func (e *Engine) dispatchEvents() { - for { - select { - case evt := <-e.eventsCh: - e.log.Infof("raft: %T %+v", evt, evt) - switch ev := evt.(type) { - case nodeHostShuttingDown: - return - case leaderUpdated, nodeUnloaded, membershipChanged: - e.Cluster.Notify() - case nodeReady: - if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil { - e.LogCache.NodeReady(ev.ShardID) - } - e.Cluster.Notify() - case nodeDeleted: - if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil { - e.LogCache.NodeDeleted(ev.ShardID) - } - e.Cluster.Notify() - case logCompacted: - if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil { - e.LogCache.LogCompacted(ev.ShardID) - } - e.Cluster.Notify() +type events struct { + eventsCh chan any + engine *Engine +} + +func (e *events) dispatchEvents() { + for evt := range e.eventsCh { + e.engine.log.Infof("raft: %T %+v", evt, evt) + switch ev := evt.(type) { + case nodeHostShuttingDown: + return + case leaderUpdated, nodeUnloaded, membershipChanged: + e.engine.Cluster.Notify() + case nodeReady: + if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil { + e.engine.LogCache.NodeReady(ev.ShardID) } + e.engine.Cluster.Notify() + case nodeDeleted: + if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil { + e.engine.LogCache.NodeDeleted(ev.ShardID) + } + e.engine.Cluster.Notify() + case logCompacted: + if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil { + e.engine.LogCache.LogCompacted(ev.ShardID) + } + e.engine.Cluster.Notify() } } } @@ -43,7 +45,7 @@ type leaderUpdated struct { LeaderID uint64 } -func (e *Engine) LeaderUpdated(info raftio.LeaderInfo) { +func (e *events) LeaderUpdated(info raftio.LeaderInfo) { e.eventsCh <- leaderUpdated{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -54,7 +56,7 @@ func (e *Engine) LeaderUpdated(info raftio.LeaderInfo) { type nodeHostShuttingDown struct{} -func (e *Engine) NodeHostShuttingDown() { +func (e *events) NodeHostShuttingDown() { e.eventsCh <- nodeHostShuttingDown{} } @@ -63,7 +65,7 @@ type nodeUnloaded struct { ReplicaID uint64 } -func (e *Engine) NodeUnloaded(info raftio.NodeInfo) { +func (e *events) NodeUnloaded(info raftio.NodeInfo) { e.eventsCh <- nodeUnloaded{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -75,7 +77,7 @@ type nodeDeleted struct { ReplicaID uint64 } -func (e *Engine) NodeDeleted(info raftio.NodeInfo) { +func (e *events) NodeDeleted(info raftio.NodeInfo) { e.eventsCh <- nodeDeleted{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -87,7 +89,7 @@ type nodeReady struct { ReplicaID uint64 } -func (e *Engine) NodeReady(info raftio.NodeInfo) { +func (e *events) NodeReady(info raftio.NodeInfo) { e.eventsCh <- nodeReady{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -99,7 +101,7 @@ type membershipChanged struct { ReplicaID uint64 } -func (e *Engine) MembershipChanged(info raftio.NodeInfo) { +func (e *events) MembershipChanged(info raftio.NodeInfo) { e.eventsCh <- membershipChanged{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -111,7 +113,7 @@ type connectionEstablished struct { SnapshotConnection bool } -func (e *Engine) ConnectionEstablished(info raftio.ConnectionInfo) { +func (e *events) ConnectionEstablished(info raftio.ConnectionInfo) { e.eventsCh <- connectionEstablished{ Address: info.Address, SnapshotConnection: info.SnapshotConnection, @@ -123,7 +125,7 @@ type connectionFailed struct { SnapshotConnection bool } -func (e *Engine) ConnectionFailed(info raftio.ConnectionInfo) { +func (e *events) ConnectionFailed(info raftio.ConnectionInfo) { e.eventsCh <- connectionFailed{ Address: info.Address, SnapshotConnection: info.SnapshotConnection, @@ -137,7 +139,7 @@ type sendSnapshotStarted struct { Index uint64 } -func (e *Engine) SendSnapshotStarted(info raftio.SnapshotInfo) { +func (e *events) SendSnapshotStarted(info raftio.SnapshotInfo) { e.eventsCh <- sendSnapshotStarted{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -153,7 +155,7 @@ type sendSnapshotCompleted struct { Index uint64 } -func (e *Engine) SendSnapshotCompleted(info raftio.SnapshotInfo) { +func (e *events) SendSnapshotCompleted(info raftio.SnapshotInfo) { e.eventsCh <- sendSnapshotCompleted{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -169,7 +171,7 @@ type sendSnapshotAborted struct { Index uint64 } -func (e *Engine) SendSnapshotAborted(info raftio.SnapshotInfo) { +func (e *events) SendSnapshotAborted(info raftio.SnapshotInfo) { e.eventsCh <- sendSnapshotAborted{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -185,7 +187,7 @@ type snapshotReceived struct { Index uint64 } -func (e *Engine) SnapshotReceived(info raftio.SnapshotInfo) { +func (e *events) SnapshotReceived(info raftio.SnapshotInfo) { e.eventsCh <- snapshotReceived{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -201,7 +203,7 @@ type snapshotRecovered struct { Index uint64 } -func (e *Engine) SnapshotRecovered(info raftio.SnapshotInfo) { +func (e *events) SnapshotRecovered(info raftio.SnapshotInfo) { e.eventsCh <- snapshotRecovered{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -217,7 +219,7 @@ type snapshotCreated struct { Index uint64 } -func (e *Engine) SnapshotCreated(info raftio.SnapshotInfo) { +func (e *events) SnapshotCreated(info raftio.SnapshotInfo) { e.eventsCh <- snapshotCreated{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -233,7 +235,7 @@ type snapshotCompacted struct { Index uint64 } -func (e *Engine) SnapshotCompacted(info raftio.SnapshotInfo) { +func (e *events) SnapshotCompacted(info raftio.SnapshotInfo) { e.eventsCh <- snapshotCompacted{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -247,7 +249,7 @@ type logCompacted struct { ReplicaID uint64 } -func (e *Engine) LogCompacted(info raftio.EntryInfo) { +func (e *events) LogCompacted(info raftio.EntryInfo) { e.eventsCh <- logCompacted{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, @@ -259,7 +261,7 @@ type logDBCompacted struct { ReplicaID uint64 } -func (e *Engine) LogDBCompacted(info raftio.EntryInfo) { +func (e *events) LogDBCompacted(info raftio.EntryInfo) { e.eventsCh <- logDBCompacted{ ShardID: info.ShardID, ReplicaID: info.ReplicaID, diff --git a/storage/engine_test.go b/storage/engine_test.go index b16c94b1..2edd736f 100644 --- a/storage/engine_test.go +++ b/storage/engine_test.go @@ -788,7 +788,8 @@ func newTestConfig() Config { } func newTestEngine(t *testing.T, cfg Config) *Engine { - e := &Engine{cfg: cfg, eventsCh: make(chan any, 1), stop: make(chan struct{}), log: zaptest.NewLogger(t).Sugar()} + e := &Engine{cfg: cfg, stop: make(chan struct{}), log: zaptest.NewLogger(t).Sugar()} + e.events = &events{eventsCh: make(chan any, 1), engine: e} nh, err := createNodeHost(e) require.NoError(t, err) e.NodeHost = nh