Skip to content

Commit

Permalink
Merge pull request #13 from ethpandaops/feat/data-stream
Browse files Browse the repository at this point in the history
feat: Add Callback DataStream
  • Loading branch information
cortze committed Jun 3, 2024
2 parents 90aecd7 + 1164f9c commit a050845
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 114 deletions.
80 changes: 36 additions & 44 deletions eth/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Node struct {
connBeacon metric.Int64ObservableGauge
connAge metric.Float64Histogram
connMedianAge metric.Float64ObservableGauge

// eventCallbacks contains a list of callbacks that are executed when an event is received
eventCallbacks []func(ctx context.Context, event *host.TraceEvent)
}

// NewNode initializes a new [Node] using the provided configuration.
Expand All @@ -78,7 +81,6 @@ func NewNode(cfg *NodeConfig) (*Node, error) {

var ds host.DataStream
if cfg.AWSConfig != nil {

droppedTraces, err := cfg.Meter.Int64Counter("dropped_traces")
if err != nil {
return nil, fmt.Errorf("new dropped_traces counter: %w", err)
Expand Down Expand Up @@ -106,9 +108,10 @@ func NewNode(cfg *NodeConfig) (*Node, error) {
if err != nil {
return nil, fmt.Errorf("new kinesis producer: %w", err)
}
ds = p

ds = host.NewKinesisDataStream(p)
} else {
ds = host.NoopDataStream{}
ds = host.NewCallbackDataStream()
}

hostCfg := &host.Config{
Expand Down Expand Up @@ -199,15 +202,27 @@ func NewNode(cfg *NodeConfig) (*Node, error) {

// finally, initialize hermes node
n := &Node{
cfg: cfg,
host: h,
ds: ds,
sup: suture.NewSimple("eth"),
reqResp: reqResp,
pubSub: pubSub,
pryClient: pryClient,
peerer: NewPeerer(h, pryClient),
disc: disc,
cfg: cfg,
host: h,
ds: ds,
sup: suture.NewSimple("eth"),
reqResp: reqResp,
pubSub: pubSub,
pryClient: pryClient,
peerer: NewPeerer(h, pryClient),
disc: disc,
eventCallbacks: []func(ctx context.Context, event *host.TraceEvent){},
}

if ds.Type() == host.DataStreamTypeCallback {
cbDs := ds.(*host.CallbackDataStream)

cbDs.OnEvent(func(ctx context.Context, event *host.TraceEvent) {
for _, cb := range n.eventCallbacks {
cb(ctx, event)
}
})

}

// initialize custom prometheus metrics
Expand Down Expand Up @@ -301,6 +316,11 @@ func (n *Node) initMetrics(cfg *NodeConfig) (err error) {
return nil
}

// OnEvent registers a callback that is executed when an event is received.
func (n *Node) OnEvent(cb func(ctx context.Context, event *host.TraceEvent)) {
n.eventCallbacks = append(n.eventCallbacks, cb)
}

// Start starts the listening process.
func (n *Node) Start(ctx context.Context) error {
defer logDeferErr(n.host.Close, "Failed closing libp2p host")
Expand Down Expand Up @@ -473,45 +493,17 @@ func terminateSupervisorTreeOnErr(err error) error {

// startDataStream starts the data stream and implements a graceful shutdown
func (n *Node) startDataStream(ctx context.Context) (func(), error) {
dsCtx, dsCancel := context.WithCancel(context.Background())
backgroundCtx := context.Background()

go func() {
if err := n.ds.Start(dsCtx); err != nil {
if err := n.ds.Start(backgroundCtx); err != nil {
slog.Warn("Failed to start data stream", tele.LogAttrError(err))
}
}()

cleanupFn := func() {
producer, ok := n.ds.(*gk.Producer)
if !ok {
dsCancel()
return
}

slog.Info("Waiting for Kinesis producer to become idle", "timeout", "15s")
// wait until the producer is idle
timeoutCtx, timeoutCncl := context.WithTimeout(dsCtx, 15*time.Second)
if err := producer.WaitIdle(timeoutCtx); err != nil {
slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err))
}
timeoutCncl()

// stop the producer
dsCancel()

slog.Info("Stopped Kinesis producer, waiting for shutdown", "timeout", "5s")
// wait until the producer has stopped
timeoutCtx, timeoutCncl = context.WithTimeout(dsCtx, 5*time.Second)
if err := producer.WaitStopped(timeoutCtx); err != nil {
slog.Info("Error waiting for producer to stop", tele.LogAttrError(err))
}
timeoutCncl()
}

producer, ok := n.ds.(*gk.Producer)
if !ok {
return cleanupFn, nil
n.ds.Stop(ctx)
}

return cleanupFn, producer.WaitIdle(ctx)
return cleanupFn, nil
}
Loading

0 comments on commit a050845

Please sign in to comment.