
Conversation

@jakeloo (Member) commented Aug 9, 2025

Summary

  • replace old publisher modes with default and new parallel mode
  • run commit and publish loops concurrently when in parallel mode and track last published blocks
  • only publish during commits when in default mode
  • initialize parallel publisher cursor from main storage to prevent re-publishing
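For illustration, a minimal sketch of the cursor seeding described in the last bullet — the method name, fields, and storage calls are assumptions drawn from the walkthrough and review comments below:

	// Sketch only: seed the publish cursor so a restart never re-emits blocks.
	func (c *Committer) initializeParallelPublisher() {
		chainID := c.rpc.GetChainID()
		lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
		if err == nil && lastPublished != nil && lastPublished.Sign() > 0 {
			c.lastPublishedBlock.Store(lastPublished.Uint64())
			return
		}
		// No cursor yet: fall back to main storage's max committed block.
		if maxCommitted, err := c.storage.MainStorage.GetMaxBlockNumber(chainID); err == nil && maxCommitted != nil {
			c.lastPublishedBlock.Store(maxCommitted.Uint64())
		}
	}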

Testing

  • go test ./... (command produced no output)

https://chatgpt.com/codex/tasks/task_b_6896dc37689c8332b734b4405875abf1

Summary by CodeRabbit

  • New Features
    • Introduced a configurable "publisher mode" with options for "default" (sequential) or "parallel" (concurrent commit and publish) operation, selectable via command-line flag or configuration file.
    • Added new configuration options and examples for publisher mode.
    • Added concurrent commit and publish processing to improve performance in parallel mode.
  • Bug Fixes
    • Improved handling and tracking of published block numbers in storage backends.
  • Tests
    • Added comprehensive tests for the new parallel publisher mode and related edge cases.
  • Chores
    • Extended storage interfaces and mocks to support new publisher tracking functionality.

@coderabbitai bot commented Aug 9, 2025

Walkthrough

A new "parallel" publisher mode is introduced, allowing commit and publish operations to run concurrently. This involves new configuration options, updates to storage interfaces and connectors to track the last published block, and significant changes to the committer's control flow. Tests and mocks are extended to cover the new mode and its edge cases, with supporting configuration and CLI flag changes.

Changes

Cohort / File(s) Change Summary
Committer: Parallel Publisher Mode Implementation
internal/orchestrator/committer.go
Introduces "parallel" publisher mode, refactors commit and publish logic into separate loops, adds atomic counters for last committed and published blocks, synchronizes state on startup, and adjusts commit and publish methods accordingly.
Committer Tests: Parallel Mode Coverage
internal/orchestrator/committer_test.go
Adds extensive tests for parallel publisher mode including commit, publish, cleanup behaviors, initialization edge cases, gap handling, and refactors existing tests to align with new logic.
Staging Storage Interface & Mocks
internal/storage/connector.go, test/mocks/MockIStagingStorage.go
Adds GetLastPublishedBlockNumber and SetLastPublishedBlockNumber methods to the staging storage interface and mock, with supporting expectation and helper methods for testing.
ClickHouse Storage Connector
internal/storage/clickhouse.go
Implements GetLastPublishedBlockNumber and SetLastPublishedBlockNumber methods for ClickHouse, querying and updating the publish cursor in the database; enhances GetStagingData to support block range queries.
Postgres Storage Connector
internal/storage/postgres.go
Implements GetLastPublishedBlockNumber and SetLastPublishedBlockNumber methods for Postgres, supporting retrieval and upsert of the publish cursor with conflict handling; enhances GetStagingData to support block range queries.
Configuration: Publisher Mode
cmd/root.go, configs/config.go, configs/config.example.yml, configs/test_config.yml
Adds "publisher-mode" CLI persistent flag and configuration option, updates the PublisherConfig struct, and documents the new option in example and test configuration files.
Postgres Connector Tests
internal/storage/postgres_connector_test.go
Adds test case for GetStagingData method filtering by block range in Postgres connector.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI/Config
    participant Committer
    participant MainStorage
    participant StagingStorage

    User->>CLI/Config: Set publisher mode ("default" or "parallel")
    CLI/Config->>Committer: Initialize with publisher mode

    alt Parallel Mode
        Committer->>StagingStorage: initializeParallelPublisher()
        Committer->>Committer: Start runCommitLoop() in goroutine
        Committer->>Committer: Start runPublishLoop() in goroutine
        loop Commit Loop
            Committer->>MainStorage: Commit blocks
        end
        loop Publish Loop
            Committer->>StagingStorage: getSequentialBlockDataToPublish()
            Committer->>StagingStorage: publish()
            Committer->>StagingStorage: SetLastPublishedBlockNumber()
        end
    else Default Mode
        Committer->>Committer: Start runCommitLoop() only
        loop Commit Loop
            Committer->>MainStorage: Commit blocks
            Committer->>StagingStorage: publish (after commit)
            Committer->>StagingStorage: SetLastPublishedBlockNumber()
        end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e6da3c4 and e2681cc.

📒 Files selected for processing (2)
  • internal/storage/clickhouse.go (2 hunks)
  • internal/storage/postgres.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/storage/clickhouse.go
  • internal/storage/postgres.go


@coderabbitai bot left a comment

Actionable comments posted: 6

🔭 Outside diff range comments (1)
internal/orchestrator/committer.go (1)

139-187: Potential data-race on Committer.workMode across commit & publish goroutines

runCommitLoop writes c.workMode, while runPublishLoop (and the main select logic) read it concurrently with no synchronisation. Under -race, this surfaces as a race; in production it can mis-order reads leading to skipped publish/commit iterations.

Guard the field with a sync.RWMutex, an atomic.Value, or fold the mode into the existing workModeChan fan-out so that only one goroutine owns the state.
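For illustration, a minimal sketch of the atomic.Value option — WorkMode's definition and the surrounding struct layout are assumptions based on the snippets in this review:

	// Sketch: store workMode in an atomic.Value so the commit and publish
	// goroutines can read it without racing the writer.
	type Committer struct {
		workMode atomic.Value // holds WorkMode (a string-based type per this PR)
		// ... other fields unchanged ...
	}

	func (c *Committer) setWorkMode(m WorkMode) { c.workMode.Store(m) }

	func (c *Committer) getWorkMode() WorkMode {
		if v := c.workMode.Load(); v != nil {
			return v.(WorkMode)
		}
		return ""
	}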

🧹 Nitpick comments (2)
internal/storage/postgres.go (1)

347-364: Align ErrNoRows semantics with existing cursor helpers

GetLastPublishedBlockNumber silently maps sql.ErrNoRows to 0, whereas GetLastReorgCheckedBlockNumber (defined above) surfaces the error. Divergent behaviour for two cursor helpers that hit the same table is surprising and invites subtle bugs in callers that treat them interchangeably. Either bubble the error consistently or document the different contract explicitly.
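A hedged sketch of the consistent variant — surface the error and let callers pick the fallback (the db handle and cursors table layout here are assumptions):

	// Sketch: bubble sql.ErrNoRows like GetLastReorgCheckedBlockNumber does,
	// instead of silently returning 0.
	func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
		var value string
		err := p.db.QueryRow(
			`SELECT cursor_value FROM cursors WHERE chain_id = $1 AND cursor_type = 'publish'`,
			chainId.String(),
		).Scan(&value)
		if err != nil {
			return nil, err // includes sql.ErrNoRows; callers decide the default
		}
		n, ok := new(big.Int).SetString(value, 10)
		if !ok {
			return nil, fmt.Errorf("failed to parse cursor value: %s", value)
		}
		return n, nil
	}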

internal/orchestrator/committer.go (1)

366-436: getSequentialBlockDataToPublish builds negative/oversize slices when blocksPerCommit ≤ 0

The loop derives blockCount := endBlock-startBlock + 1 directly from blocksPerCommit-1. If mis-configured (blocksPerCommit ≤ 0) blockCount becomes ≤ 0 and the subsequent make([]*big.Int, blockCount) panics.

Add a sanity check once during NewCommitter (if blocksPerCommit <= 0 { … }) or early-return error here.
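A sketch of the early-return variant, placed at the top of getSequentialBlockDataToPublish (field names follow the snippets later in this thread):

	// Sketch: guard against a mis-configured blocksPerCommit before sizing the slice.
	if c.blocksPerCommit <= 0 {
		return nil, fmt.Errorf("invalid blocksPerCommit %d: must be > 0", c.blocksPerCommit)
	}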

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7bfa6de and 22592de.

📒 Files selected for processing (10)
  • cmd/root.go (2 hunks)
  • configs/config.example.yml (1 hunks)
  • configs/config.go (1 hunks)
  • configs/test_config.yml (1 hunks)
  • internal/orchestrator/committer.go (6 hunks)
  • internal/orchestrator/committer_test.go (2 hunks)
  • internal/storage/clickhouse.go (1 hunks)
  • internal/storage/connector.go (1 hunks)
  • internal/storage/postgres.go (1 hunks)
  • test/mocks/MockIStagingStorage.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
internal/orchestrator/committer.go (4)
configs/config.go (1)
  • Cfg (208-208)
internal/publisher/publisher.go (1)
  • Publisher (21-24)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/storage/connector.go (1)
  • QueryFilter (11-27)
internal/orchestrator/committer_test.go (5)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
configs/config.go (2)
  • Cfg (208-208)
  • Config (194-206)
internal/storage/connector.go (1)
  • IStorage (67-71)
internal/orchestrator/committer.go (1)
  • NewCommitter (50-77)
internal/orchestrator/work_mode_monitor.go (1)
  • WorkModeLive (21-21)
🔇 Additional comments (9)
configs/test_config.yml (1)

67-67: Publisher mode default in test config — LGTM

Setting publisher.mode: default aligns with the new CLI flag and struct. No issues.

configs/config.example.yml (1)

193-197: Publisher mode docs and default — LGTM

Clear explanation of "default" vs "parallel" and sensible default set to "default".

cmd/root.go (1)

254-254: Bind of publisher.mode — LGTM

Binding publisher.mode to the new flag is correct and consistent with the struct tag.

internal/orchestrator/committer_test.go (4)

327-331: Setting ChainId in test block data is correct

Including ChainId on committed blocks improves realism and prevents multi-chain ambiguity in downstream calls. LGTM.


428-452: Initialize from main storage when last published is zero — LGTM

Correct behavior: seed last-published with main’s max to avoid re-publishing old blocks.


453-477: Seek ahead to main’s max when behind — LGTM

Brings publisher cursor up to date. Looks good.


478-503: Do not regress when publisher cursor is ahead — LGTM

Correctly avoids overwriting with a lower value.

test/mocks/MockIStagingStorage.go (2)

75-103: New mock: GetLastPublishedBlockNumber — LGTM

Method and typed helpers are consistent with mockery style and interface usage. Note: this will panic if called without an expectation; tests must stub it explicitly.

Ensure all tests that might trigger this call set expectations to avoid the panic from “no return value specified”.
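One hedged option is marking the expectation optional with testify's Maybe(), which mockery's generated call wrappers expose via the embedded *mock.Call:

	// Sketch: Maybe() keeps tests that never reach the publish path from
	// panicking with "no return value specified".
	mockStagingStorage.EXPECT().
		GetLastPublishedBlockNumber(mock.Anything).
		Return(big.NewInt(0), nil).
		Maybe()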


133-178: New mock: SetLastPublishedBlockNumber — LGTM

Signature and helpers look correct; matches test usage with RunAndReturn.

rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel")

🛠️ Refactor suggestion

Validate --publisher-mode against allowed values.

Add a fast-fail validation for default|parallel to avoid silent misconfigurations.

Example (place in init() after binds or in initConfig()):

mode := viper.GetString("publisher.mode")
if mode == "" {
  viper.Set("publisher.mode", "default")
} else if mode != "default" && mode != "parallel" {
  // Use your logger if preferred
  panic(fmt.Errorf("invalid --publisher-mode %q (allowed: default, parallel)", mode))
}
🤖 Prompt for AI Agents
In cmd/root.go at line 138, the --publisher-mode flag is defined but lacks
validation for allowed values. Add a validation step after flag binding or in
the initConfig() function to check if the value is either "default" or
"parallel". If the value is empty, set it to "default". If it is any other
value, immediately fail by panicking with a clear error message indicating the
invalid value and allowed options.


type PublisherConfig struct {
	Enabled bool   `mapstructure:"enabled"`
	Mode    string `mapstructure:"mode"`

🛠️ Refactor suggestion

Harden publisher.mode with validation (and consider a typed enum).

Right now Mode accepts any string. Add early validation to prevent misconfigurations; optionally use a typed enum for clarity.

Apply this minimal diff to use a typed enum in the struct:

 type PublisherConfig struct {
   Enabled      bool                       `mapstructure:"enabled"`
-  Mode         string                     `mapstructure:"mode"`
+  Mode         PublisherMode              `mapstructure:"mode"`
   Brokers      string                     `mapstructure:"brokers"`
   Username     string                     `mapstructure:"username"`
   Password     string                     `mapstructure:"password"`
   Blocks       BlockPublisherConfig       `mapstructure:"blocks"`
   Transactions TransactionPublisherConfig `mapstructure:"transactions"`
   Traces       TracePublisherConfig       `mapstructure:"traces"`
   Events       EventPublisherConfig       `mapstructure:"events"`
 }

Add these supporting declarations (outside the selected range) near the top-level types:

// PublisherMode defines allowed publisher modes.
type PublisherMode string

const (
  PublisherModeDefault  PublisherMode = "default"
  PublisherModeParallel PublisherMode = "parallel"
)

func (m PublisherMode) IsValid() bool {
  switch m {
  case PublisherModeDefault, PublisherModeParallel:
    return true
  default:
    return false
  }
}

And validate in LoadConfig (after viper.Unmarshal, before using Cfg):

// Default and validate publisher mode
if Cfg.Publisher.Mode == "" {
  Cfg.Publisher.Mode = PublisherModeDefault
}
if !Cfg.Publisher.Mode.IsValid() {
  return fmt.Errorf("invalid publisher.mode: %q (allowed: %q, %q)", Cfg.Publisher.Mode, PublisherModeDefault, PublisherModeParallel)
}
🤖 Prompt for AI Agents
In configs/config.go at line 175, replace the Mode field's type from string to a
new typed enum PublisherMode. Define the PublisherMode type and its allowed
constants (PublisherModeDefault and PublisherModeParallel) near the top-level
types, along with an IsValid() method to check validity. Then, in the LoadConfig
function, after unmarshaling the config, add validation to set a default mode if
empty and return an error if the mode is invalid, ensuring early detection of
misconfigurations.

Comment on lines 385 to 427
func TestPublishParallelMode(t *testing.T) {
	defer func() { config.Cfg = config.Config{} }()
	config.Cfg.Publisher.Mode = "parallel"

	mockRPC := mocks.NewMockIRPCClient(t)
	mockMainStorage := mocks.NewMockIMainStorage(t)
	mockStagingStorage := mocks.NewMockIStagingStorage(t)
	mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
	mockStorage := storage.IStorage{
		MainStorage:         mockMainStorage,
		StagingStorage:      mockStagingStorage,
		OrchestratorStorage: mockOrchestratorStorage,
	}
	committer := NewCommitter(mockRPC, mockStorage)
	committer.workMode = WorkModeLive

	chainID := big.NewInt(1)
	blockData := []common.BlockData{
		{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
		{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
	}

	publishDone := make(chan struct{})

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
	mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
		close(publishDone)
		return nil
	})

	err := committer.publish(context.Background())
	assert.NoError(t, err)

	select {
	case <-publishDone:
	case <-time.After(2 * time.Second):
		t.Fatal("SetLastPublishedBlockNumber was not called")
	}
}


🛠️ Refactor suggestion

Tighten publish verification: validate QueryFilter and assert no commit-side mutations

Strengthen the test by (a) checking the QueryFilter passed to GetStagingData, and (b) asserting that no InsertBlockData/DeleteStagingData happens during publish.

Apply this diff:

-  mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil)
+  mockStagingStorage.EXPECT().GetStagingData(mock.Anything).RunAndReturn(
+    func(qf storage.QueryFilter) ([]common.BlockData, error) {
+      assert.Equal(t, chainID, qf.ChainId)
+      // Optional: verify range starts from last+1 and ends at returned max
+      if assert.GreaterOrEqual(t, len(qf.BlockNumbers), 1) {
+        assert.Equal(t, big.NewInt(101), qf.BlockNumbers[0])
+        assert.Equal(t, big.NewInt(102), qf.BlockNumbers[len(qf.BlockNumbers)-1])
+      }
+      return blockData, nil
+    },
+  )

   mockRPC.EXPECT().GetChainID().Return(chainID)
   mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
     close(publishDone)
     return nil
   })

And add explicit not-called assertions after the publish completes:

   err := committer.publish(context.Background())
   assert.NoError(t, err)
@@
   case <-time.After(2 * time.Second):
     t.Fatal("SetLastPublishedBlockNumber was not called")
   }
+
+  // Publish must not write to main storage or delete staging data.
+  mockMainStorage.AssertNotCalled(t, "InsertBlockData", mock.Anything)
+  mockStagingStorage.AssertNotCalled(t, "DeleteStagingData", mock.Anything)
🤖 Prompt for AI Agents
In internal/orchestrator/committer_test.go between lines 385 and 427, enhance
the TestPublishParallelMode by verifying the QueryFilter argument passed to
mockStagingStorage.GetStagingData to ensure it matches expected criteria, and
add assertions after publish completes to confirm that InsertBlockData and
DeleteStagingData methods on mockStagingStorage are not called, thereby
tightening the test to validate correct behavior without unintended mutations.

Comment on lines +1078 to +1101
func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
	query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database)
	if chainId.Sign() > 0 {
		query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
	}
	var blockNumberString string
	err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
	if err != nil {
		if err == sql.ErrNoRows {
			return big.NewInt(0), nil
		}
		return nil, err
	}
	blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
	if !ok {
		return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
	}
	return blockNumber, nil
}

func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
	query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
	return c.conn.Exec(context.Background(), query)
}

⚠️ Potential issue

Duplicate-row risk & non-deterministic reads

  1. SetLastPublishedBlockNumber performs a plain INSERT – no ON CONFLICT/ALTER equivalent.
    Repeated calls will accumulate multiple rows for the same (chain_id,'publish'), breaking uniqueness assumptions.

  2. GetLastPublishedBlockNumber selects cursor_value without ORDER BY / LIMIT 1. With duplicates, the driver will return whichever row ClickHouse happens to stream first, so the committer may read a stale or forward-skipped cursor and either re-publish or skip blocks.

  3. Because the query and insert embed chainId/blockNumber directly into the SQL string, the usual parameterisation benefits (server-side caching, quoting safety) are also lost.

Consider switching to an UPSERT-by-ReplacingMergeTree pattern (or ALTER TABLE … UPDATE) or store the cursor in a ReplacingMergeTree(primary key (chain_id,cursor_type), version updated_at) and ORDER BY updated_at DESC LIMIT 1 on read; alternatively perform a DELETE WHERE followed by single insert in one transaction.
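A sketch of the ReplacingMergeTree approach with parameterized statements — the DDL, the updated_at version column, and the parameter binding are assumptions layered on the existing cursors schema:

	// Assumed DDL — version rows by updated_at so the newest insert wins on merge:
	//   CREATE TABLE cursors (
	//       chain_id     UInt256,
	//       cursor_type  LowCardinality(String),
	//       cursor_value String,
	//       updated_at   DateTime64(3) DEFAULT now64(3)
	//   ) ENGINE = ReplacingMergeTree(updated_at)
	//   ORDER BY (chain_id, cursor_type);

	func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
		// Parameterized insert; ReplacingMergeTree collapses duplicate keys on merge.
		return c.conn.Exec(context.Background(),
			fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (?, 'publish', ?)", c.cfg.Database),
			chainId, blockNumber.String())
	}

	func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
		// FINAL plus ORDER BY/LIMIT keeps the read deterministic even before merges run.
		var value string
		err := c.conn.QueryRow(context.Background(),
			fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish' AND chain_id = ? ORDER BY updated_at DESC LIMIT 1", c.cfg.Database),
			chainId).Scan(&value)
		if err != nil {
			return nil, err
		}
		n, ok := new(big.Int).SetString(value, 10)
		if !ok {
			return nil, fmt.Errorf("failed to parse cursor value: %s", value)
		}
		return n, nil
	}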

🤖 Prompt for AI Agents
In internal/storage/clickhouse.go around lines 1078 to 1101, the
SetLastPublishedBlockNumber method uses a plain INSERT without handling
duplicates, which can cause multiple rows for the same (chain_id, 'publish')
key, and GetLastPublishedBlockNumber reads without ORDER BY or LIMIT, leading to
non-deterministic results. To fix this, refactor the storage to use a
ReplacingMergeTree table with primary key (chain_id, cursor_type) and a version
column like updated_at, then modify SetLastPublishedBlockNumber to perform an
UPSERT or an INSERT that replaces existing rows, and update
GetLastPublishedBlockNumber to include ORDER BY updated_at DESC LIMIT 1 to
ensure deterministic reads. Also, change both methods to use parameterized
queries instead of embedding chainId and blockNumber directly into the SQL
string to improve safety and performance.

Comment on lines +86 to +87
GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

💡 Verification agent

🧩 Analysis chain

Document semantics and ensure atomic, monotonic updates for publisher cursor.

Please add doc comments clarifying:

  • What is returned when no cursor exists (nil vs 0)?
  • Units (block number) and whether it’s inclusive.
  • Concurrency expectations: updates must be atomic and never decrease (monotonic).

This helps avoid re-publishing or regression under the new parallel mode.

Suggested inline comments:

 type IStagingStorage interface {
   InsertStagingData(data []common.BlockData) error
   GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
   DeleteStagingData(data []common.BlockData) error
   GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
-  GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
-  SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
+  // GetLastPublishedBlockNumber returns the highest published block for the chain.
+  // If no cursor exists, return (nil, nil) or (big.NewInt(0), nil) consistently across implementations (documented behavior).
+  GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
+  // SetLastPublishedBlockNumber atomically persists the last published block for the chain.
+  // Implementations must guarantee monotonic non-decreasing updates (no regression) under concurrent writers.
+  SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
   DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
 }

Run this script to verify all implementations and mocks are updated:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Searching for implementations of Get/SetLastPublishedBlockNumber..."
rg -n --glob '!**/vendor/**' 'func\s+\(.*\)\s+GetLastPublishedBlockNumber\(' -A 2 -B 2
rg -n --glob '!**/vendor/**' 'func\s+\(.*\)\s+SetLastPublishedBlockNumber\(' -A 2 -B 2

echo
echo "Checking ClickHouse and Postgres connectors contain implementations..."
rg -n --glob '!**/vendor/**' 'GetLastPublishedBlockNumber\(' | rg -n 'clickhouse|postgres' || true
rg -n --glob '!**/vendor/**' 'SetLastPublishedBlockNumber\(' | rg -n 'clickhouse|postgres' || true

echo
echo "Checking mocks implement new methods..."
rg -n --glob '!**/vendor/**' 'MockIStagingStorage.*GetLastPublishedBlockNumber|MockIStagingStorage.*SetLastPublishedBlockNumber' -A 3 -B 3

Length of output: 11031


Add documentation and enforce atomic, monotonic publisher‐cursor updates

The new interface methods and their implementations exist (including mocks), but we still need to:

• Document exact semantics in internal/storage/connector.go:
– What is returned when no cursor exists (nil vs zero).
– That the returned block number is inclusive.
– Concurrency expectations: readers safe; writers must never regress.

• Update the interface with doc comments, for example:

 type IStagingStorage interface {
   …  
-  GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
-  SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
+  // GetLastPublishedBlockNumber returns the highest‐published block number for a chain.
+  // If no cursor exists, returns (nil, nil). Block numbers are inclusive.
+  GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
+
+  // SetLastPublishedBlockNumber atomically persists a non‐decreasing cursor value.
+  // Under concurrent writers, implementations must guarantee monotonic updates (never regress).
+  SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
   DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
 }

• Enforce monotonic upserts in each connector:
– PostgresConnector: switch to
INSERT INTO cursors (…) VALUES (…) ON CONFLICT(chain_id, cursor_type) DO UPDATE SET cursor_value = GREATEST(cursors.cursor_value, EXCLUDED.cursor_value)
– ClickHouseConnector: use an atomic upsert or materialized‐view approach that never writes a lower value.

Mocks already include the new methods. Once comments and upsert logic are in place, the publisher cursor will be well‐documented and safe under parallel execution.
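A hedged sketch of the Postgres side, assuming cursor_value is stored as a numeric type (with a text column, GREATEST would compare lexicographically and need a cast):

	// Sketch: monotonic upsert — concurrent writers can never move the cursor backwards.
	func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
		_, err := p.db.Exec(`
			INSERT INTO cursors (chain_id, cursor_type, cursor_value)
			VALUES ($1, 'publish', $2)
			ON CONFLICT (chain_id, cursor_type)
			DO UPDATE SET cursor_value = GREATEST(cursors.cursor_value, EXCLUDED.cursor_value)`,
			chainId.String(), blockNumber.String())
		return err
	}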

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
type IStagingStorage interface {
	// GetLastPublishedBlockNumber returns the highest-published block number for a chain.
	// If no cursor exists, returns (nil, nil). Block numbers are inclusive.
	GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
	// SetLastPublishedBlockNumber atomically persists a non-decreasing cursor value.
	// Under concurrent writers, implementations must guarantee monotonic updates (never regress).
	SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
	DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
}

@coderabbitai bot left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 22592de and 643b8bd.

📒 Files selected for processing (2)
  • internal/orchestrator/committer.go (8 hunks)
  • internal/orchestrator/committer_test.go (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
internal/orchestrator/committer_test.go (6)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
configs/config.go (2)
  • Cfg (208-208)
  • Config (194-206)
internal/orchestrator/work_mode_monitor.go (2)
  • WorkModeLive (21-21)
  • WorkModeBackfill (22-22)
internal/storage/connector.go (1)
  • IStorage (67-71)
internal/orchestrator/committer.go (1)
  • NewCommitter (52-81)
internal/rpc/rpc.go (2)
  • BlocksPerRequestConfig (35-40)
  • GetFullBlockResult (18-22)
🪛 GitHub Actions: Go Code Format Check
internal/orchestrator/committer_test.go

[error] 1-1: gofmt formatting check failed. Run 'gofmt -w' to fix code style issues.

internal/orchestrator/committer.go

[error] 1-1: gofmt formatting check failed. Run 'gofmt -w' to fix code style issues.

🔇 Additional comments (3)
internal/orchestrator/committer.go (2)

194-216: Concurrent cleanup can double-fire – ensure idempotency

cleanupProcessedStagingBlocks may be invoked concurrently by both commit and publish goroutines.
If the implementation of DeleteOlderThan is not internally idempotent this could lead to overlapping deletes or lock contention.

Verify the storage driver guards against this, or add a lightweight mutex around the cleanup invocation.
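A minimal sketch of the mutex option — cleanupMu is a hypothetical sync.Mutex field added to Committer:

	// Sketch: serialize cleanup so concurrent commit/publish goroutines
	// can't issue overlapping DeleteOlderThan calls.
	func (c *Committer) cleanupProcessedStagingBlocks() {
		c.cleanupMu.Lock() // hypothetical field: cleanupMu sync.Mutex
		defer c.cleanupMu.Unlock()
		// ... existing DeleteOlderThan logic ...
	}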


1-1: Run gofmt – CI is red

The Go formatting check failed; run gofmt -w ./... before merging.

internal/orchestrator/committer_test.go (1)

1-1: Tests need gofmt as well

CI formatting job flagged this file. Execute gofmt -w internal/orchestrator/committer_test.go.

Comment on lines 104 to 112
	lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
	if err != nil {
		log.Error().Err(err).Msg("failed to get last published block number")
	} else if lastPublished != nil && lastPublished.Sign() > 0 {
		c.lastPublishedBlock.Store(lastPublished.Uint64())
	} else {
		c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
	}


⚠️ Potential issue

lastPublishedBlock fallback is not persisted → publish loop can stall

When GetLastPublishedBlockNumber returns nil, we fall back to lastCommittedBlock but never write that value back to staging storage.
getSequentialBlockDataToPublish() will call GetLastPublishedBlockNumber again on every iteration, keep getting nil, and continuously query the [fromBlock …] range, effectively starving the publisher if the chain is already ahead of fromBlock.

if lastPublished == nil || lastPublished.Sign() == 0 {
-    c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
+    fallback := new(big.Int).SetUint64(c.lastCommittedBlock.Load())
+    // persist once so subsequent publish iterations make progress
+    _ = c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, fallback)
+    c.lastPublishedBlock.Store(fallback.Uint64())
}

Without this, the “parallel” mode may never publish after a fresh deployment.

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 104 to 112, when
GetLastPublishedBlockNumber returns nil and the code falls back to
lastCommittedBlock, the fallback value is not persisted back to staging storage.
To fix this, after storing lastCommittedBlock in lastPublishedBlock, also write
this value back to staging storage to ensure subsequent calls to
GetLastPublishedBlockNumber return the updated block number and prevent the
publish loop from stalling.

Comment on lines 374 to 423
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
	chainID := c.rpc.GetChainID()
	lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
	if err != nil {
		return nil, fmt.Errorf("failed to get last published block number: %v", err)
	}

	startBlock := new(big.Int).Set(c.commitFromBlock)
	if lastPublished != nil && lastPublished.Sign() > 0 {
		startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
	}

	endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
	blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
	blockNumbers := make([]*big.Int, blockCount)
	for i := int64(0); i < blockCount; i++ {
		blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
	}

	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
	if err != nil {
		return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
	}
	if len(blocksData) == 0 {
		return nil, nil
	}

	sort.Slice(blocksData, func(i, j int) bool {
		return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
	})
	if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
		log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
		return nil, nil
	}

	sequential := []common.BlockData{blocksData[0]}
	expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
	for i := 1; i < len(blocksData); i++ {
		if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
			continue
		}
		if blocksData[i].Block.Number.Cmp(expected) != 0 {
			break
		}
		sequential = append(sequential, blocksData[i])
		expected.Add(expected, big.NewInt(1))
	}

	return sequential, nil
}

🛠️ Refactor suggestion

Avoid DB round-trip on every publish iteration

getSequentialBlockDataToPublish re-queries GetLastPublishedBlockNumber for each loop, although the value is already maintained in c.lastPublishedBlock (atomic).
This adds an unnecessary hot-path DB hit and causes contention once the publisher is caught up.

Consider relying on the in-memory counter and persisting only after a successful publish:

-lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
-...
-if lastPublished != nil && lastPublished.Sign() > 0 {
-    startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
+lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
+if lastPublished.Sign() > 0 {
+    startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
 }

Still keep the initial sync in Start() to seed the counter from storage.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
	chainID := c.rpc.GetChainID()
	lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
	if err != nil {
		return nil, fmt.Errorf("failed to get last published block number: %v", err)
	}
	startBlock := new(big.Int).Set(c.commitFromBlock)
	if lastPublished != nil && lastPublished.Sign() > 0 {
		startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
	}
	endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
	blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
	blockNumbers := make([]*big.Int, blockCount)
	for i := int64(0); i < blockCount; i++ {
		blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
	}
	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
	if err != nil {
		return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
	}
	if len(blocksData) == 0 {
		return nil, nil
	}
	sort.Slice(blocksData, func(i, j int) bool {
		return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
	})
	if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
		log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String())
		return nil, nil
	}
	sequential := []common.BlockData{blocksData[0]}
	expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
	for i := 1; i < len(blocksData); i++ {
		if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
			continue
		}
		if blocksData[i].Block.Number.Cmp(expected) != 0 {
			break
		}
		sequential = append(sequential, blocksData[i])
		expected.Add(expected, big.NewInt(1))
	}
	return sequential, nil
}

func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) {
	chainID := c.rpc.GetChainID()
	// Use in-memory counter instead of hitting the DB every iteration
	lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
	startBlock := new(big.Int).Set(c.commitFromBlock)
	if lastPublished.Sign() > 0 {
		startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
	}
	endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
	blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
	blockNumbers := make([]*big.Int, blockCount)
	for i := int64(0); i < blockCount; i++ {
		blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
	}
	blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
	if err != nil {
		return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
	}
	if len(blocksData) == 0 {
		return nil, nil
	}
	sort.Slice(blocksData, func(i, j int) bool {
		return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
	})
	if blocksData[0].Block.Number.Cmp(startBlock) != 0 {
		log.Debug().Msgf(
			"First block to publish %s does not match expected %s",
			blocksData[0].Block.Number.String(),
			startBlock.String(),
		)
		return nil, nil
	}
	sequential := []common.BlockData{blocksData[0]}
	expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
	for i := 1; i < len(blocksData); i++ {
		if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
			continue
		}
		if blocksData[i].Block.Number.Cmp(expected) != 0 {
			break
		}
		sequential = append(sequential, blocksData[i])
		expected.Add(expected, big.NewInt(1))
	}
	return sequential, nil
}
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go lines 374 to 423, the method
getSequentialBlockDataToPublish currently fetches the last published block
number from the database on every call, causing unnecessary DB hits and
contention. To fix this, modify the method to use the in-memory atomic counter
c.lastPublishedBlock instead of querying
storage.StagingStorage.GetLastPublishedBlockNumber each time. Ensure that
c.lastPublishedBlock is properly initialized once during the Start() method from
storage, and update it only after a successful publish, avoiding repeated DB
round-trips in the hot path.

@coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
internal/orchestrator/committer.go (2)

98-107: Persist the fallback lastPublishedBlock to storage

When GetLastPublishedBlockNumber returns nil, the fallback value is stored only in memory.
Without persisting it to StagingStorage.SetLastPublishedBlockNumber, every publish loop re-queries the DB, always gets nil, and repeatedly republishes the same range, potentially stalling progress.
Persist once after the fallback assignment to unblock the loop.


370-385: Hot-path DB hit in getSequentialBlockDataToPublish

The function re-fetches lastPublishedBlock from storage on every call even though the value is already tracked atomically in c.lastPublishedBlock.
This adds an avoidable round-trip and becomes a bottleneck once the publisher is near the chain tip.
Use the in-memory counter and persist only after a successful publish.

🧹 Nitpick comments (1)
internal/orchestrator/committer_test.go (1)

347-351: Replace fixed time.Sleep with assert.Eventually to reduce flakiness

Using a hard-coded 2 s sleep can make the test brittle on slow CI runners.
Leverage assert.Eventually (or a wait-group) so the test polls until the expectation is met within a timeout, eliminating unnecessary delays and race conditions.
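A sketch of the polling variant, reusing the publishDone channel from the test above:

	// Sketch: poll until the expectation fires instead of sleeping a fixed 2 s.
	assert.Eventually(t, func() bool {
		select {
		case <-publishDone:
			return true
		default:
			return false
		}
	}, 2*time.Second, 10*time.Millisecond, "SetLastPublishedBlockNumber was not called")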

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 643b8bd and bb0a3fe.

📒 Files selected for processing (2)
  • internal/orchestrator/committer.go (8 hunks)
  • internal/orchestrator/committer_test.go (4 hunks)

Comment on lines +462 to 471
if config.Cfg.Publisher.Mode == "default" {
highest := highestBlock.Number.Uint64()
go func() {
stagingDeleteStart := time.Now()
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to delete staging data")
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
return
}
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
c.lastPublishedBlock.Store(highest)
c.cleanupProcessedStagingBlocks()
}()

⚠️ Potential issue

default publisher mode never persists the publish cursor

In “default” mode, the commit path publishes inside a goroutine and updates the in-memory lastPublishedBlock, but does not call SetLastPublishedBlockNumber.
After a restart the cursor falls back to 0/FromBlock, so already-published blocks are re-emitted.

Persist the highest block number after a successful publish when running in default mode:

+if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil {
+    log.Error().Err(err).Msg("failed to persist last published block number")
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if config.Cfg.Publisher.Mode == "default" {
highest := highestBlock.Number.Uint64()
go func() {
stagingDeleteStart := time.Now()
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to delete staging data")
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
return
}
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
c.lastPublishedBlock.Store(highest)
c.cleanupProcessedStagingBlocks()
}()
if config.Cfg.Publisher.Mode == "default" {
highest := highestBlock.Number.Uint64()
go func() {
if err := c.publisher.PublishBlockData(blockData); err != nil {
log.Error().Err(err).Msg("Failed to publish block data to kafka")
return
}
c.lastPublishedBlock.Store(highest)
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil {
log.Error().Err(err).Msg("failed to persist last published block number")
}
c.cleanupProcessedStagingBlocks()
}()
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 462 to 471, the code in
"default" publisher mode updates the in-memory lastPublishedBlock but does not
persist the publish cursor by calling SetLastPublishedBlockNumber. To fix this,
after a successful publish inside the goroutine, add a call to
SetLastPublishedBlockNumber with the highest block number to persist the cursor
and prevent re-emission of already-published blocks after a restart.

return
}

c.runCommitLoop(ctx, interval)
Collaborator

this should be in an else (run only when not parallel), otherwise it'll run twice
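A sketch of the structure this comment asks for (loop signatures are assumptions):

	// Sketch: run the blocking commit loop only in the non-parallel branch,
	// since parallel mode already started it as a goroutine.
	if config.Cfg.Publisher.Mode == "parallel" {
		go c.runCommitLoop(ctx, interval)
		c.runPublishLoop(ctx, interval) // blocks
	} else {
		c.runCommitLoop(ctx, interval)
	}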

blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
}

blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
Collaborator

Getting by block range would be better than passing all the block numbers, since the list is a contiguous range anyway

@coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (2)
internal/orchestrator/committer.go (2)

146-149: Data race on c.workMode across goroutines

c.workMode is written in runCommitLoop and read in both loops without synchronization → data race.

Minimal fix with RWMutex:

 type Committer struct {
   ...
-  workMode           WorkMode
+  workMode           WorkMode
+  workModeMu         sync.RWMutex
   workModeChan       chan WorkMode
   ...
 }

Write side:

- if workMode != c.workMode && workMode != "" {
-   log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode)
-   c.workMode = workMode
- }
+ c.workModeMu.Lock()
+ if workMode != "" && workMode != c.workMode {
+   log.Info().Msgf("Committer work mode changing from %s to %s", c.workMode, workMode)
+   c.workMode = workMode
+ }
+ c.workModeMu.Unlock()

Read side:

- if c.workMode == "" {
+ c.workModeMu.RLock()
+ wm := c.workMode
+ c.workModeMu.RUnlock()
+ if wm == "" {
   ...
 }

And in runPublishLoop:

- if c.workMode == "" {
+ c.workModeMu.RLock()
+ wm := c.workMode
+ c.workModeMu.RUnlock()
+ if wm == "" {
   ...
 }

Also applies to: 152-156, 179-181


221-224: Move error check before logging to prevent nil pointer deref

The call to GetMaxBlockNumber can return (nil, err) (on scan failures), so calling .String() before checking err (or guarding against a nil *big.Int) may panic. Update the logic to handle errors and nil results first, then log.

• File: internal/orchestrator/committer.go (lines ~221–224)
• Move the if err != nil immediately after the storage call
• Default latestCommittedBlockNumber to zero if it’s nil
• Perform log.Debug() only after those checks

--- a/internal/orchestrator/committer.go
+++ b/internal/orchestrator/committer.go
@@ -221,9 +221,14 @@ func (c *Committer) commit(...) (...){
-   latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
-   log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
-   if err != nil {
+   latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+   if err != nil {
+       return nil, err
+   }
+   if latestCommittedBlockNumber == nil {
+       latestCommittedBlockNumber = big.NewInt(0)
+   }
+   log.Debug().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
♻️ Duplicate comments (4)
internal/orchestrator/committer.go (4)

278-283: Use range-based staging query in backfill to reduce payload and DB overhead

For contiguous blocksToCommit, querying by StartBlock/EndBlock is more efficient than enumerating all BlockNumbers.

- blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blockNumbers, ChainId: c.rpc.GetChainID()})
+ blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
+   ChainId:    c.rpc.GetChainID(),
+   StartBlock: blockNumbers[0],
+   EndBlock:   blockNumbers[len(blockNumbers)-1],
+ })

98-107: Persist fallback when lastPublished is missing to avoid publish stall

When GetLastPublishedBlockNumber returns nil/zero, you set lastPublishedBlock from lastCommittedBlock but don’t persist it. This can stall the publish loop on fresh deployments.

Apply:

- } else {
-   c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
- }
+ } else {
+   fallback := new(big.Int).SetUint64(c.lastCommittedBlock.Load())
+   if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, fallback); err != nil {
+     log.Error().Err(err).Msg("failed to persist fallback last published block number")
+   }
+   c.lastPublishedBlock.Store(fallback.Uint64())
+ }

370-418: Avoid DB round-trip on every publish iteration; use in-memory cursor

getSequentialBlockDataToPublish hits storage for the last published block each time despite tracking it atomically. This is a hot path and adds avoidable latency/DB load.

Seed c.lastPublishedBlock once in Start() (already done), then:

- chainID := c.rpc.GetChainID()
- lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
- if err != nil {
-   return nil, fmt.Errorf("failed to get last published block number: %v", err)
- }
-
- startBlock := new(big.Int).Set(c.commitFromBlock)
- if lastPublished != nil && lastPublished.Sign() > 0 {
-   startBlock = new(big.Int).Add(lastPublished, big.NewInt(1))
- }
+ chainID := c.rpc.GetChainID()
+ startBlock := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
+ if startBlock.Sign() == 0 {
+   startBlock = new(big.Int).Set(c.commitFromBlock)
+ } else {
+   startBlock = new(big.Int).Add(startBlock, big.NewInt(1))
+ }

461-471: Default mode: publish cursor is not persisted (re-publish on restart)

In default mode you update the in-memory lastPublishedBlock, but never call SetLastPublishedBlockNumber. After restart, the cursor resets and blocks are re-emitted.

Persist after successful publish:

- if config.Cfg.Publisher.Mode == "default" {
-   highest := highestBlock.Number.Uint64()
-   go func() {
+ if config.Cfg.Publisher.Mode == "default" {
+   highestU64 := highestBlock.Number.Uint64()
+   highestBN := new(big.Int).SetUint64(highestU64)
+   chainID := c.rpc.GetChainID()
+   go func() {
       if err := c.publisher.PublishBlockData(blockData); err != nil {
         log.Error().Err(err).Msg("Failed to publish block data to kafka")
         return
       }
-      c.lastPublishedBlock.Store(highest)
-      c.cleanupProcessedStagingBlocks()
+      if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBN); err != nil {
+        log.Error().Err(err).Msg("failed to persist last published block number")
+      }
+      c.lastPublishedBlock.Store(highestU64)
+      c.cleanupProcessedStagingBlocks()
     }()
   }
🧹 Nitpick comments (1)
internal/storage/postgres_connector_test.go (1)

170-179: Strengthen range-query assertions and make failure fatal

Good addition covering StartBlock/EndBlock. Tighten the test to catch ordering/inclusivity bugs and fail fast:

  • Use require.NoError to stop on error.
  • Assert the exact block numbers and order.
  • Consider adding a single-block range case for boundary inclusivity.

Example tweak:

- retrievedDataRange, err := conn.GetStagingData(rangeQf)
- assert.NoError(t, err)
- assert.Len(t, retrievedDataRange, 2)
+ retrievedDataRange, err := conn.GetStagingData(rangeQf)
+ require.NoError(t, err)
+ require.Len(t, retrievedDataRange, 2)
+ assert.Equal(t, big.NewInt(100), retrievedDataRange[0].Block.Number)
+ assert.Equal(t, big.NewInt(101), retrievedDataRange[1].Block.Number)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bb0a3fe and e6da3c4.

📒 Files selected for processing (5)
  • internal/orchestrator/committer.go (8 hunks)
  • internal/storage/clickhouse.go (2 hunks)
  • internal/storage/connector.go (2 hunks)
  • internal/storage/postgres.go (2 hunks)
  • internal/storage/postgres_connector_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/storage/connector.go
  • internal/storage/postgres.go
  • internal/storage/clickhouse.go

Comment on lines +473 to 475
c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
go c.cleanupProcessedStagingBlocks()


🛠️ Refactor suggestion

Avoid duplicate cleanup calls in default mode

You run cleanup from the publish goroutine and again unconditionally after commit. In default mode this triggers twice; the outer one may run before lastPublishedBlock advances, doing extra work.

- c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
- go c.cleanupProcessedStagingBlocks()
+ c.lastCommittedBlock.Store(highestBlock.Number.Uint64())
+ if config.Cfg.Publisher.Mode == "parallel" {
+   go c.cleanupProcessedStagingBlocks()
+ }

Also applies to: 469-469

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go around lines 469 and 473 to 475, the
cleanupProcessedStagingBlocks function is called twice in default mode: once
from the publish goroutine and again unconditionally after commit, causing
redundant work. To fix this, remove the unconditional call to
cleanupProcessedStagingBlocks after commit and ensure it is only called once in
the appropriate place, such as within the publish goroutine, to avoid duplicate
cleanup executions.

@nischitpra merged commit 13db8af into main Aug 10, 2025
5 checks passed
@nischitpra deleted the cz0ok3-codex/implement-publish-before-storage-mechanism branch August 10, 2025 14:30
