Skip to content

Commit

Permalink
add: hide event handler from storage public API
Browse files Browse the repository at this point in the history
  • Loading branch information
coufalja committed Jan 16, 2024
1 parent 25841df commit fadcfb2
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 54 deletions.
6 changes: 3 additions & 3 deletions regattaserver/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type TablesServer struct {
}

func (t *TablesServer) Create(ctx context.Context, req *regattapb.CreateTableRequest) (*regattapb.CreateTableResponse, error) {
ctx, err := t.AuthFunc(ctx)
_, err := t.AuthFunc(ctx)
if err != nil {
return nil, err
}
Expand All @@ -43,7 +43,7 @@ func (t *TablesServer) Create(ctx context.Context, req *regattapb.CreateTableReq
}

func (t *TablesServer) Delete(ctx context.Context, req *regattapb.DeleteTableRequest) (*regattapb.DeleteTableResponse, error) {
ctx, err := t.AuthFunc(ctx)
_, err := t.AuthFunc(ctx)
if err != nil {
return nil, err
}
Expand All @@ -60,7 +60,7 @@ func (t *TablesServer) Delete(ctx context.Context, req *regattapb.DeleteTableReq
}

func (t *TablesServer) List(ctx context.Context, _ *regattapb.ListTablesRequest) (*regattapb.ListTablesResponse, error) {
ctx, err := t.AuthFunc(ctx)
_, err := t.AuthFunc(ctx)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ const defaultQueryTimeout = 5 * time.Second

func New(cfg Config) (*Engine, error) {
e := &Engine{
cfg: cfg,
log: cfg.Log,
eventsCh: make(chan any, 1),
stop: make(chan struct{}),
cfg: cfg,
log: cfg.Log,
stop: make(chan struct{}),
}
e.events = &events{eventsCh: make(chan any, 1), engine: e}

nh, err := createNodeHost(e)
if err != nil {
Expand Down Expand Up @@ -70,7 +70,7 @@ type Engine struct {
*table.Manager
cfg Config
log *zap.SugaredLogger
eventsCh chan any
events *events
stop chan struct{}
LogReader logreader.Interface
Cluster *cluster.Cluster
Expand All @@ -82,7 +82,7 @@ func (e *Engine) Start() error {
if err := e.Manager.Start(); err != nil {
return err
}
go e.dispatchEvents()
go e.events.dispatchEvents()
return nil
}

Expand Down Expand Up @@ -254,8 +254,8 @@ func createNodeHost(e *Engine) (*dragonboat.NodeHost, error) {
EnableMetrics: true,
MaxReceiveQueueSize: e.cfg.MaxReceiveQueueSize,
MaxSendQueueSize: e.cfg.MaxSendQueueSize,
SystemEventListener: e,
RaftEventListener: e,
SystemEventListener: e.events,
RaftEventListener: e.events,
}

if e.cfg.LogDBImplementation == Tan {
Expand Down
86 changes: 44 additions & 42 deletions storage/engine_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,34 @@ import (
"github.com/lni/dragonboat/v4/raftio"
)

func (e *Engine) dispatchEvents() {
for {
select {
case evt := <-e.eventsCh:
e.log.Infof("raft: %T %+v", evt, evt)
switch ev := evt.(type) {
case nodeHostShuttingDown:
return
case leaderUpdated, nodeUnloaded, membershipChanged:
e.Cluster.Notify()
case nodeReady:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
e.LogCache.NodeReady(ev.ShardID)
}
e.Cluster.Notify()
case nodeDeleted:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
e.LogCache.NodeDeleted(ev.ShardID)
}
e.Cluster.Notify()
case logCompacted:
if ev.ReplicaID == e.cfg.NodeID && e.LogCache != nil {
e.LogCache.LogCompacted(ev.ShardID)
}
e.Cluster.Notify()
type events struct {
eventsCh chan any
engine *Engine
}

func (e *events) dispatchEvents() {
for evt := range e.eventsCh {
e.engine.log.Infof("raft: %T %+v", evt, evt)
switch ev := evt.(type) {
case nodeHostShuttingDown:
return
case leaderUpdated, nodeUnloaded, membershipChanged:
e.engine.Cluster.Notify()
case nodeReady:
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.NodeReady(ev.ShardID)
}
e.engine.Cluster.Notify()
case nodeDeleted:
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.NodeDeleted(ev.ShardID)
}
e.engine.Cluster.Notify()
case logCompacted:
if ev.ReplicaID == e.engine.cfg.NodeID && e.engine.LogCache != nil {
e.engine.LogCache.LogCompacted(ev.ShardID)
}
e.engine.Cluster.Notify()
}
}
}
Expand All @@ -43,7 +45,7 @@ type leaderUpdated struct {
LeaderID uint64
}

func (e *Engine) LeaderUpdated(info raftio.LeaderInfo) {
func (e *events) LeaderUpdated(info raftio.LeaderInfo) {
e.eventsCh <- leaderUpdated{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -54,7 +56,7 @@ func (e *Engine) LeaderUpdated(info raftio.LeaderInfo) {

type nodeHostShuttingDown struct{}

func (e *Engine) NodeHostShuttingDown() {
func (e *events) NodeHostShuttingDown() {
e.eventsCh <- nodeHostShuttingDown{}
}

Expand All @@ -63,7 +65,7 @@ type nodeUnloaded struct {
ReplicaID uint64
}

func (e *Engine) NodeUnloaded(info raftio.NodeInfo) {
func (e *events) NodeUnloaded(info raftio.NodeInfo) {
e.eventsCh <- nodeUnloaded{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -75,7 +77,7 @@ type nodeDeleted struct {
ReplicaID uint64
}

func (e *Engine) NodeDeleted(info raftio.NodeInfo) {
func (e *events) NodeDeleted(info raftio.NodeInfo) {
e.eventsCh <- nodeDeleted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -87,7 +89,7 @@ type nodeReady struct {
ReplicaID uint64
}

func (e *Engine) NodeReady(info raftio.NodeInfo) {
func (e *events) NodeReady(info raftio.NodeInfo) {
e.eventsCh <- nodeReady{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -99,7 +101,7 @@ type membershipChanged struct {
ReplicaID uint64
}

func (e *Engine) MembershipChanged(info raftio.NodeInfo) {
func (e *events) MembershipChanged(info raftio.NodeInfo) {
e.eventsCh <- membershipChanged{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -111,7 +113,7 @@ type connectionEstablished struct {
SnapshotConnection bool
}

func (e *Engine) ConnectionEstablished(info raftio.ConnectionInfo) {
func (e *events) ConnectionEstablished(info raftio.ConnectionInfo) {
e.eventsCh <- connectionEstablished{
Address: info.Address,
SnapshotConnection: info.SnapshotConnection,
Expand All @@ -123,7 +125,7 @@ type connectionFailed struct {
SnapshotConnection bool
}

func (e *Engine) ConnectionFailed(info raftio.ConnectionInfo) {
func (e *events) ConnectionFailed(info raftio.ConnectionInfo) {
e.eventsCh <- connectionFailed{
Address: info.Address,
SnapshotConnection: info.SnapshotConnection,
Expand All @@ -137,7 +139,7 @@ type sendSnapshotStarted struct {
Index uint64
}

func (e *Engine) SendSnapshotStarted(info raftio.SnapshotInfo) {
func (e *events) SendSnapshotStarted(info raftio.SnapshotInfo) {
e.eventsCh <- sendSnapshotStarted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -153,7 +155,7 @@ type sendSnapshotCompleted struct {
Index uint64
}

func (e *Engine) SendSnapshotCompleted(info raftio.SnapshotInfo) {
func (e *events) SendSnapshotCompleted(info raftio.SnapshotInfo) {
e.eventsCh <- sendSnapshotCompleted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -169,7 +171,7 @@ type sendSnapshotAborted struct {
Index uint64
}

func (e *Engine) SendSnapshotAborted(info raftio.SnapshotInfo) {
func (e *events) SendSnapshotAborted(info raftio.SnapshotInfo) {
e.eventsCh <- sendSnapshotAborted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -185,7 +187,7 @@ type snapshotReceived struct {
Index uint64
}

func (e *Engine) SnapshotReceived(info raftio.SnapshotInfo) {
func (e *events) SnapshotReceived(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotReceived{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -201,7 +203,7 @@ type snapshotRecovered struct {
Index uint64
}

func (e *Engine) SnapshotRecovered(info raftio.SnapshotInfo) {
func (e *events) SnapshotRecovered(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotRecovered{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -217,7 +219,7 @@ type snapshotCreated struct {
Index uint64
}

func (e *Engine) SnapshotCreated(info raftio.SnapshotInfo) {
func (e *events) SnapshotCreated(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotCreated{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -233,7 +235,7 @@ type snapshotCompacted struct {
Index uint64
}

func (e *Engine) SnapshotCompacted(info raftio.SnapshotInfo) {
func (e *events) SnapshotCompacted(info raftio.SnapshotInfo) {
e.eventsCh <- snapshotCompacted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -247,7 +249,7 @@ type logCompacted struct {
ReplicaID uint64
}

func (e *Engine) LogCompacted(info raftio.EntryInfo) {
func (e *events) LogCompacted(info raftio.EntryInfo) {
e.eventsCh <- logCompacted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand All @@ -259,7 +261,7 @@ type logDBCompacted struct {
ReplicaID uint64
}

func (e *Engine) LogDBCompacted(info raftio.EntryInfo) {
func (e *events) LogDBCompacted(info raftio.EntryInfo) {
e.eventsCh <- logDBCompacted{
ShardID: info.ShardID,
ReplicaID: info.ReplicaID,
Expand Down
3 changes: 2 additions & 1 deletion storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,8 @@ func newTestConfig() Config {
}

func newTestEngine(t *testing.T, cfg Config) *Engine {
e := &Engine{cfg: cfg, eventsCh: make(chan any, 1), stop: make(chan struct{}), log: zaptest.NewLogger(t).Sugar()}
e := &Engine{cfg: cfg, stop: make(chan struct{}), log: zaptest.NewLogger(t).Sugar()}
e.events = &events{eventsCh: make(chan any, 1), engine: e}
nh, err := createNodeHost(e)
require.NoError(t, err)
e.NodeHost = nh
Expand Down

0 comments on commit fadcfb2

Please sign in to comment.