Merged
2 changes: 2 additions & 0 deletions cmd/root.go
@@ -135,6 +135,7 @@ func init() {
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")
🛠️ Refactor suggestion

Validate --publisher-mode against allowed values.

Add a fast-fail validation for default|parallel to avoid silent misconfigurations.

Example (place in init() after binds or in initConfig()):

mode := viper.GetString("publisher.mode")
if mode == "" {
  viper.Set("publisher.mode", "default")
} else if mode != "default" && mode != "parallel" {
  // Use your logger if preferred
  panic(fmt.Errorf("invalid --publisher-mode %q (allowed: default, parallel)", mode))
}
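
An alternative, if surfacing the failure through cobra's normal error handling is preferred over a panic, is to hook the same check into PersistentPreRunE. This is a sketch only; the handler placement is an assumption, not part of this PR:

rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
	// Normalize an empty value to the default; reject anything else unknown.
	mode := viper.GetString("publisher.mode")
	switch mode {
	case "":
		viper.Set("publisher.mode", "default")
		return nil
	case "default", "parallel":
		return nil
	default:
		return fmt.Errorf("invalid --publisher-mode %q (allowed: default, parallel)", mode)
	}
}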
🤖 Prompt for AI Agents
In cmd/root.go at line 138, the --publisher-mode flag is defined but lacks
validation for allowed values. Add a validation step after flag binding or in
the initConfig() function to check if the value is either "default" or
"parallel". If the value is empty, set it to "default". If it is any other
value, immediately fail by panicking with a clear error message indicating the
invalid value and allowed options.

rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
@@ -250,6 +251,7 @@ func init() {
viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
4 changes: 3 additions & 1 deletion configs/config.example.yml
@@ -190,9 +190,11 @@ api:
publisher:
  # Whether the publisher is enabled
  enabled: true
  # Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing
  mode: default
  # Kafka broker addresses (comma-separated)
  brokers: localhost:9092

  # Block publishing configuration
  blocks:
    # Whether to publish block data
1 change: 1 addition & 0 deletions configs/config.go
@@ -172,6 +172,7 @@ type EventPublisherConfig struct {

type PublisherConfig struct {
Enabled bool `mapstructure:"enabled"`
Mode string `mapstructure:"mode"`
🛠️ Refactor suggestion

Harden publisher.mode with validation (and consider a typed enum).

Right now Mode accepts any string. Add early validation to prevent misconfigurations; optionally use a typed enum for clarity.

Apply this minimal diff to use a typed enum in the struct:

 type PublisherConfig struct {
   Enabled      bool                       `mapstructure:"enabled"`
-  Mode         string                     `mapstructure:"mode"`
+  Mode         PublisherMode              `mapstructure:"mode"`
   Brokers      string                     `mapstructure:"brokers"`
   Username     string                     `mapstructure:"username"`
   Password     string                     `mapstructure:"password"`
   Blocks       BlockPublisherConfig       `mapstructure:"blocks"`
   Transactions TransactionPublisherConfig `mapstructure:"transactions"`
   Traces       TracePublisherConfig       `mapstructure:"traces"`
   Events       EventPublisherConfig       `mapstructure:"events"`
 }

Add these supporting declarations (outside the selected range) near the top-level types:

// PublisherMode defines allowed publisher modes.
type PublisherMode string

const (
  PublisherModeDefault  PublisherMode = "default"
  PublisherModeParallel PublisherMode = "parallel"
)

func (m PublisherMode) IsValid() bool {
  switch m {
  case PublisherModeDefault, PublisherModeParallel:
    return true
  default:
    return false
  }
}

And validate in LoadConfig (after viper.Unmarshal, before using Cfg):

// Default and validate publisher mode
if Cfg.Publisher.Mode == "" {
  Cfg.Publisher.Mode = PublisherModeDefault
}
if !Cfg.Publisher.Mode.IsValid() {
  return fmt.Errorf("invalid publisher.mode: %q (allowed: %q, %q)", Cfg.Publisher.Mode, PublisherModeDefault, PublisherModeParallel)
}
🤖 Prompt for AI Agents
In configs/config.go at line 175, replace the Mode field's type from string to a
new typed enum PublisherMode. Define the PublisherMode type and its allowed
constants (PublisherModeDefault and PublisherModeParallel) near the top-level
types, along with an IsValid() method to check validity. Then, in the LoadConfig
function, after unmarshaling the config, add validation to set a default mode if
empty and return an error if the mode is invalid, ensuring early detection of
misconfigurations.

Brokers string `mapstructure:"brokers"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
1 change: 1 addition & 0 deletions configs/test_config.yml
@@ -64,6 +64,7 @@ api:

publisher:
  enabled: false
  mode: default

validation:
  mode: minimal
241 changes: 190 additions & 51 deletions internal/orchestrator/committer.go
@@ -5,6 +5,8 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog/log"
@@ -25,7 +27,8 @@ type Committer struct {
storage storage.IStorage
commitFromBlock *big.Int
rpc rpc.IRPCClient
lastCommittedBlock *big.Int
lastCommittedBlock atomic.Uint64
lastPublishedBlock atomic.Uint64
publisher *publisher.Publisher
workMode WorkMode
workModeChan chan WorkMode
@@ -58,15 +61,17 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe

commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock))
committer := &Committer{
triggerIntervalMs: triggerInterval,
blocksPerCommit: blocksPerCommit,
storage: storage,
commitFromBlock: commitFromBlock,
rpc: rpc,
lastCommittedBlock: commitFromBlock,
publisher: publisher.GetInstance(),
workMode: "",
}
triggerIntervalMs: triggerInterval,
blocksPerCommit: blocksPerCommit,
storage: storage,
commitFromBlock: commitFromBlock,
rpc: rpc,
publisher: publisher.GetInstance(),
workMode: "",
}
cfb := commitFromBlock.Uint64()
committer.lastCommittedBlock.Store(cfb)
committer.lastPublishedBlock.Store(cfb)

for _, opt := range opts {
opt(committer)
@@ -79,15 +84,63 @@ func (c *Committer) Start(ctx context.Context) {
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond

log.Debug().Msgf("Committer running")
chainID := c.rpc.GetChainID()

latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID)
if err != nil {
// It's okay to fail silently here; this value is only used for staging cleanup and
// the worker loop will eventually correct the state and delete as needed.
log.Error().Msgf("Error getting latest committed block number: %v", err)
} else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
}

lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
// It's okay to fail silently here; it's only used for staging cleanup and will be
// corrected by the worker loop.
log.Error().Err(err).Msg("failed to get last published block number")
} else if lastPublished != nil && lastPublished.Sign() > 0 {
c.lastPublishedBlock.Store(lastPublished.Uint64())
} else {
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
}

c.cleanupProcessedStagingBlocks()

if config.Cfg.Publisher.Mode == "parallel" {
var wg sync.WaitGroup
publishInterval := interval / 2
if publishInterval <= 0 {
publishInterval = interval
}
wg.Add(2)
go func() {
defer wg.Done()
c.runPublishLoop(ctx, publishInterval)
}()
// allow the publisher to start before the committer
time.Sleep(publishInterval)
go func() {
defer wg.Done()
c.runCommitLoop(ctx, interval)
}()
<-ctx.Done()
wg.Wait()
log.Info().Msg("Committer shutting down")
c.publisher.Close()
return
}

// Clean up staging data before starting the committer
c.cleanupStagingData()
c.runCommitLoop(ctx, interval)
Collaborator
this should be in an else (only when not parallel), otherwise it'll run twice
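
A minimal sketch of the restructuring this comment asks for (hypothetical; the exact shape depends on how the parallel branch is reworked):

if config.Cfg.Publisher.Mode == "parallel" {
	// run the publish and commit loops concurrently, as above
} else {
	c.runCommitLoop(ctx, interval)
}
log.Info().Msg("Committer shutting down")
c.publisher.Close()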

log.Info().Msg("Committer shutting down")
c.publisher.Close()
}

func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) {
for {
select {
case <-ctx.Done():
log.Info().Msg("Committer shutting down")
c.publisher.Close()
return
case workMode := <-c.workModeChan:
if workMode != c.workMode && workMode != "" {
@@ -116,26 +169,46 @@ func (c *Committer) Start(ctx context.Context) {
}
}

func (c *Committer) cleanupStagingData() {
// Get the last committed block number from main storage
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
if err != nil {
log.Error().Msgf("Error getting latest committed block number: %v", err)
return
func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) {
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(interval)
if c.workMode == "" {
log.Debug().Msg("Committer work mode not set, skipping publish")
continue
}
if err := c.publish(ctx); err != nil {
log.Error().Err(err).Msg("Error publishing blocks")
}
}
}
}

if latestCommittedBlockNumber.Sign() == 0 {
log.Debug().Msg("No blocks committed yet, skipping staging data cleanup")
func (c *Committer) cleanupProcessedStagingBlocks() {
committed := c.lastCommittedBlock.Load()
published := c.lastPublishedBlock.Load()
if published == 0 || committed == 0 {
return
}

// Delete all staging data older than the latest committed block number
if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil {
log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err)
limit := committed
if published < limit {
limit = published
}
if limit == 0 {
return
}

log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber)
chainID := c.rpc.GetChainID()
blockNumber := new(big.Int).SetUint64(limit)
stagingDeleteStart := time.Now()
if err := c.storage.StagingStorage.DeleteOlderThan(chainID, blockNumber); err != nil {
log.Error().Err(err).Msg("Failed to delete staging data")
return
}
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds())
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
}

func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
@@ -155,8 +228,9 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
// If no blocks have been committed yet, start from the fromBlock specified in the config
latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
} else {
if latestCommittedBlockNumber.Cmp(c.lastCommittedBlock) < 0 {
log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), c.lastCommittedBlock.String())
lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load())
if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 {
log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String())
return []*big.Int{}, nil
}
}
@@ -293,13 +367,89 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
return sequentialBlockData, nil
}

func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
chainID := c.rpc.GetChainID()
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
if err != nil {
return nil, fmt.Errorf("failed to get last published block number: %v", err)
}

startBlock := new(big.Int).Set(c.commitFromBlock)
if lastPublished != nil && lastPublished.Sign() > 0 {
startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
}

endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))

blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
ChainId: chainID,
StartBlock: startBlock,
EndBlock: endBlock,
})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
}
if len(blocksData) == 0 {
return nil, nil
}

sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
})
if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
return nil, nil
}

sequential := []common.BlockData{blocksData[0]}
expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
continue
}
if blocksData[i].Block.Number.Cmp(expected) != 0 {
break
}
sequential = append(sequential, blocksData[i])
expected.Add(expected, big.NewInt(1))
}

return sequential, nil
}

func (c *Committer) publish(ctx context.Context) error {
blockData, err := c.getSequentialBlockDataToPublish(ctx)
if err != nil {
return err
}
if len(blockData) == 0 {
return nil
}

if err := c.publisher.PublishBlockData(blockData); err != nil {
return err
}

chainID := c.rpc.GetChainID()
highest := blockData[len(blockData)-1].Block.Number
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil {
return err
}
c.lastPublishedBlock.Store(highest.Uint64())
go c.cleanupProcessedStagingBlocks()
return nil
}

func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
highestBlock := blockData[0].Block
for i, block := range blockData {
blockNumbers[i] = block.Block.Number
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

mainStorageStart := time.Now()
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
@@ -308,31 +458,20 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())

go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
}
}()

if c.workMode == WorkModeBackfill {
if config.Cfg.Publisher.Mode == "default" {
highest := highestBlock.Number.Uint64()
go func() {
stagingDeleteStart := time.Now()
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to delete staging data")
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
return
}
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
c.lastPublishedBlock.Store(highest)
c.cleanupProcessedStagingBlocks()
}()
Comment on lines +461 to 470

⚠️ Potential issue

default publisher mode never persists the publish cursor

In “default” mode, the commit path publishes inside a goroutine and updates the in-memory lastPublishedBlock, but does not call SetLastPublishedBlockNumber.
After a restart the cursor falls back to 0/FromBlock, so already-published blocks are re-emitted.

Persist the highest block number after a successful publish when running in default mode:

+if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil {
+    log.Error().Err(err).Msg("failed to persist last published block number")
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if config.Cfg.Publisher.Mode == "default" {
highest := highestBlock.Number.Uint64()
go func() {
stagingDeleteStart := time.Now()
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to delete staging data")
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
return
}
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
c.lastPublishedBlock.Store(highest)
c.cleanupProcessedStagingBlocks()
}()
if config.Cfg.Publisher.Mode == "default" {
highest := highestBlock.Number.Uint64()
go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
return
}
c.lastPublishedBlock.Store(highest)
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil {
log.Error().Err(err).Msg("failed to persist last published block number")
}
c.cleanupProcessedStagingBlocks()
}()
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 462 to 471, the code in
"default" publisher mode updates the in-memory lastPublishedBlock but does not
persist the publish cursor by calling SetLastPublishedBlockNumber. To fix this,
after a successful publish inside the goroutine, add a call to
SetLastPublishedBlockNumber with the highest block number to persist the cursor
and prevent re-emission of already-published blocks after a restart.

}

// Find highest block number from committed blocks
highestBlock := blockData[0].Block
for _, block := range blockData {
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
highestBlock = block.Block
}
}
c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)
c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
go c.cleanupProcessedStagingBlocks()

Comment on lines +473 to 475

🛠️ Refactor suggestion

Avoid duplicate cleanup calls in default mode

You run cleanup from the publish goroutine and again unconditionally after commit. In default mode this triggers twice; the outer one may run before lastPublishedBlock advances, doing extra work.

- c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
- go c.cleanupProcessedStagingBlocks()
+ c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
+ if config.Cfg.Publisher.Mode == "parallel" {
+   go c.cleanupProcessedStagingBlocks()
+ }

Also applies to: 469-469

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 469 and 473 to 475, the
cleanupProcessedStagingBlocks function is called twice in default mode: once
from the publish goroutine and again unconditionally after commit, causing
redundant work. To fix this, remove the unconditional call to
cleanupProcessedStagingBlocks after commit and ensure it is only called once in
the appropriate place, such as within the publish goroutine, to avoid duplicate
cleanup executions.

// Update metrics for successful commits
metrics.SuccessfulCommits.Add(float64(len(blockData)))