
Conversation


@nischitpra nischitpra commented Sep 11, 2025

Summary by CodeRabbit

  • New Features

    • Added a configurable S3 flush timeout (default: 5 minutes); timeout behavior and messages now reflect this setting.
  • Bug Fixes

    • Migration range now uses the RPC-reported latest block for end-boundary decisions, preventing overshooting and correctly handling an unset end block.
    • Improved reliability with a fatal log if the latest block cannot be retrieved via RPC.


zeet-co bot commented Sep 11, 2025

insight-indexer - insight-8453-base

We're building your pull request on Zeet. Links are provided below for the Zeet apps linked to this GitHub repo.

| Application Name | Dashboard Link | Preview Link |
| --- | --- | --- |
| insight-8453-base-backfill-migration-5 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-12 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-9 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-14 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-1 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-6 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-4 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-10 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-15 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-13 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-17 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-8 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-11 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-7 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-3 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-16 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-2 | Go to App in Zeet Console | Preview App |
| insight-8453-base-backfill-migration-18 | Go to App in Zeet Console | Preview App |

@coderabbitai
Copy link

coderabbitai bot commented Sep 11, 2025

Walkthrough

Introduces RPC-based latest-block retrieval for migration boundary calculations; adjusts end-block logic accordingly. Adds S3 storage configuration field for flush timeout. Updates S3 storage flush wait logic to use configurable timeout with a default applied when unset.

Changes

| Cohort / File(s) | Summary of edits |
| --- | --- |
| Migration boundary via RPC (`cmd/migrate_valid.go`) | Adds a latestBlockRPC fetch; updates DetermineMigrationBoundaries to base endBlock on the RPC latest when targetEndBlock is 0 or within RPC range; logs fatal on RPC failure; retains the existing log line using the stored latest block. |
| S3 config: flush timeout (`configs/config.go`) | Adds public field `FlushTimeout int` (tag `mapstructure:"flushTimeoutSeconds"`) to S3StorageConfig; a comment notes the default behavior. |
| S3 flush behavior (`internal/storage/s3.go`) | Applies a default FlushTimeout of 300s when unset; replaces hard-coded 60s waits with the configurable timeout in the flush wait paths; updates timeout messages to reflect the configured value. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  participant CLI as CLI
  participant Migrator as Migrator
  participant RPC as RPC Node
  participant Store as DB/Store

  CLI->>Migrator: DetermineMigrationBoundaries(targetEndBlock)
  Migrator->>RPC: GetLatestBlock()
  alt RPC ok
    RPC-->>Migrator: latestBlockRPC
    Migrator->>Store: Read latestBlockStored
    alt targetEndBlock == 0
      Migrator-->>CLI: endBlock = latestBlockRPC
    else targetEndBlock > 0
      alt targetEndBlock <= latestBlockRPC
        Migrator-->>CLI: endBlock = targetEndBlock
      else targetEndBlock > latestBlockRPC
        Migrator-->>CLI: endBlock = latestBlockStored (prior logic)
      end
    end
  else RPC error
    Migrator-->>CLI: Fatal log and exit
  end
```
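
A minimal Go sketch of the end-block decision in the diagram above; the function and variable names are illustrative, not the exact identifiers in cmd/migrate_valid.go.

```go
package main

import (
	"fmt"
	"math/big"
)

// determineEndBlock mirrors the branching in the diagram: an unset target falls
// back to the RPC latest, an in-range target is used as-is, and a target beyond
// the RPC latest keeps the stored latest block (the prior logic).
func determineEndBlock(targetEndBlock, latestBlockRPC, latestBlockStored *big.Int) *big.Int {
	if targetEndBlock.Sign() == 0 {
		return latestBlockRPC // unset target: migrate up to the RPC-reported latest block
	}
	if targetEndBlock.Cmp(latestBlockRPC) <= 0 {
		return targetEndBlock // explicit target within what the RPC can serve
	}
	return latestBlockStored // target beyond RPC latest: keep stored latest (prior logic)
}

func main() {
	end := determineEndBlock(big.NewInt(0), big.NewInt(1_000_000), big.NewInt(999_000))
	fmt.Println(end) // 1000000: unset target falls back to the RPC latest
}
```
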
```mermaid
sequenceDiagram
  autonumber
  participant App as App
  participant S3 as S3Storage
  note over S3: On init: if cfg.FlushTimeout==0 → set 300s

  App->>S3: TriggerFlush()
  alt flush channel accepts
    S3->>S3: wait on s.flushDoneCh within FlushTimeout
    alt completed
      S3-->>App: ok
    else timeout
      S3-->>App: error (timeout with configured seconds)
    end
  else channel full
    S3->>S3: wait for completion within FlushTimeout
    alt completed
      S3-->>App: ok
    else timeout
      S3-->>App: error (timeout with configured seconds)
    end
  end
```
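
A minimal Go sketch of the wait shown above, assuming a done channel signalled by the flush worker; the helper name and signature are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// waitForFlush blocks until done is signalled or the configured timeout elapses.
// A zero or negative timeout falls back to the 300s default noted in the diagram.
func waitForFlush(done <-chan struct{}, flushTimeoutSeconds int) error {
	if flushTimeoutSeconds <= 0 {
		flushTimeoutSeconds = 300 // 5-minute default when unset
	}
	select {
	case <-done:
		return nil // flush completed within the window
	case <-time.After(time.Duration(flushTimeoutSeconds) * time.Second):
		return fmt.Errorf("flush timeout after %d seconds", flushTimeoutSeconds)
	}
}

func main() {
	done := make(chan struct{})
	close(done)
	fmt.Println(waitForFlush(done, 0)) // <nil>: an already-signalled flush returns immediately
}
```
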

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Pre-merge checks (2 passed, 1 warning)

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00%, below the required threshold of 80.00%. | You can run `@coderabbitai generate docstrings` to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The PR title "migration boundry fix" references the migration-boundary change in cmd/migrate_valid.go, but it misspells "boundary" and omits the added S3 FlushTimeout config and implementation changes, so it is only partially descriptive. |

Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (2)
configs/config.go (1)

96-96: Align comment with runtime default (doc-runtime mismatch).

The comment says default 60s, but S3 connector sets default to 300s. Align the docs to avoid confusion.

```diff
-    FlushTimeout     int   `mapstructure:"flushTimeoutSeconds"`  // Timeout in seconds for flush operations (default: 60)
+    FlushTimeout     int   `mapstructure:"flushTimeoutSeconds"`  // Timeout in seconds for flush operations (default: 300)
```

Optionally consider using time.Duration for timeout fields to make units explicit.
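For illustration, a minimal sketch of that time.Duration variant; the tag name `flushTimeout` and the decode wiring are assumptions, not the repo's actual config plumbing (Viper-style decoding converts strings like "5m" to time.Duration via its default StringToTimeDurationHookFunc).

```go
package configs

import "time"

// Hypothetical variant of the config field: the unit travels with the value,
// so a config file can say "300s" or "5m" instead of a bare integer.
type S3StorageConfig struct {
	FlushTimeout time.Duration `mapstructure:"flushTimeout"` // e.g. "5m"
}
```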

internal/storage/s3.go (1)

106-108: Default looks good; add basic bounds check.

If a negative value is configured, the timeout becomes negative. Guard it to fall back to default.

```diff
-if cfg.FlushTimeout == 0 {
+if cfg.FlushTimeout <= 0 {
 	cfg.FlushTimeout = 300 // 5 mins default
 }
```
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between bd3eca1 and cccd5ca.

📒 Files selected for processing (3)
  • cmd/migrate_valid.go (1 hunks)
  • configs/config.go (1 hunks)
  • internal/storage/s3.go (2 hunks)

Comment on lines +351 to 356
```go
latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background())
if err != nil {
	log.Fatal().Err(err).Msg("Failed to get latest block from RPC")
}
log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
```


⚠️ Potential issue

Fix log formatting for big.Int.

Using %d with *big.Int produces malformed output. Use %s or call .String().

```diff
-log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
+log.Info().Msgf("Latest block in main storage: %s", latestBlockStored.String())
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
```diff
 latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background())
 if err != nil {
 	log.Fatal().Err(err).Msg("Failed to get latest block from RPC")
 }
-log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
+log.Info().Msgf("Latest block in main storage: %s", latestBlockStored.String())
```
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 351 to 356, the log uses fmt %d with a
*big.Int which yields malformed output; change the log to use %s with
latestBlockStored.String() (or call .String() and pass that) so the big.Int is
formatted correctly in the log message.

Comment on lines +358 to 364
```go
if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
	endBlock = targetEndBlock
}
if targetEndBlock.Uint64() == 0 {
	endBlock = latestBlockRPC
}
```


⚠️ Potential issue

End-boundary can exceed RPC latest when targetEndBlock > RPC; also missing start<=end guard.

Ensure we never migrate beyond RPC capability and validate the range.

```diff
-endBlock := latestBlockStored
-if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
-	endBlock = targetEndBlock
-}
-if targetEndBlock.Uint64() == 0 {
-	endBlock = latestBlockRPC
-}
+// Cap by RPC latest, optionally by user-specified target
+endBlock := new(big.Int).Set(latestBlockRPC)
+if targetEndBlock.Sign() > 0 {
+	if targetEndBlock.Cmp(latestBlockRPC) <= 0 {
+		endBlock = targetEndBlock
+	} else {
+		// Requested end is beyond RPC; cap at RPC latest
+		endBlock = latestBlockRPC
+	}
+}
+
+// Validate final range
+if endBlock.Cmp(targetStartBlock) < 0 {
+	log.Fatal().Msgf("Invalid migration range: end block %s is less than start block %s", endBlock.String(), targetStartBlock.String())
+}
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
```diff
-if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
-	endBlock = targetEndBlock
-}
-if targetEndBlock.Uint64() == 0 {
-	endBlock = latestBlockRPC
-}
+// Cap by RPC latest, optionally by user-specified target
+endBlock := new(big.Int).Set(latestBlockRPC)
+if targetEndBlock.Sign() > 0 {
+	if targetEndBlock.Cmp(latestBlockRPC) <= 0 {
+		endBlock = targetEndBlock
+	} else {
+		// Requested end is beyond RPC; cap at RPC latest
+		endBlock = latestBlockRPC
+	}
+}
+
+// Validate final range
+if endBlock.Cmp(targetStartBlock) < 0 {
+	log.Fatal().Msgf("Invalid migration range: end block %s is less than start block %s", endBlock.String(), targetStartBlock.String())
+}
```
🤖 Prompt for AI Agents
In cmd/migrate_valid.go around lines 358 to 364, the current logic can set
endBlock past the RPC latest and lacks a start<=end guard; change it to: if
targetEndBlock is zero use latestBlockRPC, else if targetEndBlock >
latestBlockRPC set endBlock = latestBlockRPC (cap to RPC), otherwise set
endBlock = targetEndBlock; after computing endBlock validate that
startBlock.Cmp(endBlock) <= 0 and return an error (or exit) if startBlock >
endBlock to prevent an invalid range.
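
One caveat worth noting about the original check, separate from the capping issue: for *big.Int, Sign() == 0 is the reliable zero test, while Uint64() is documented as undefined when the value does not fit in 64 bits. A small self-contained illustration:

```go
package main

import (
	"fmt"
	"math/big"
)

func main() {
	zero := big.NewInt(0)
	huge := new(big.Int).Lsh(big.NewInt(1), 128) // 2^128, far outside uint64 range

	fmt.Println(zero.Sign() == 0)   // true: canonical zero/unset check
	fmt.Println(huge.Uint64() == 0) // also true here: Uint64 is undefined on overflow
}
```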

Comment on lines 312 to 327
```diff
 		select {
 		case <-s.flushDoneCh:
 			return nil
-		case <-time.After(60 * time.Second):
-			return fmt.Errorf("flush timeout after 60 seconds")
+		case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
+			return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
 		}
 	default:
 		// Flush channel is full, likely a flush is already in progress
 		// Wait for it to complete
 		select {
 		case <-s.flushDoneCh:
 			return nil
-		case <-time.After(60 * time.Second):
-			return fmt.Errorf("flush timeout after 60 seconds")
+		case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
+			return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
 		}
 	}
```

🛠️ Refactor suggestion

Flush completion uses a shared unbuffered channel; concurrent Flush callers can time out or hang.

Only one waiter will receive from flushDoneCh per flush; the others block until some later flush signals or they hit the timeout. Guard Flush with a mutex to serialize calls (minimal change), or switch to a broadcast/WaitGroup pattern.

Minimal mutex approach:

```diff
 type S3Connector struct {
@@
 	flushDoneCh chan struct{} // Signals when flush is complete
 	flushTimer  *time.Timer
 	timerMu     sync.Mutex
+	flushMu     sync.Mutex
 	lastAddTime time.Time
```

And at the beginning of Flush():

```diff
 func (s *S3Connector) Flush() error {
+	s.flushMu.Lock()
+	defer s.flushMu.Unlock()
```

Longer-term: replace flushDoneCh with a per-flush ack (e.g., send a chan struct{} via flushCh) or use a sync.Cond to broadcast completion to all waiters.
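
As a hedged sketch of that per-flush ack idea (the type, fields, and wiring below are illustrative, not the actual S3Connector internals): each caller passes its own channel through the flush queue and waits only on it, so no completion signal can be stolen by another waiter.

```go
package main

import (
	"fmt"
	"time"
)

// flusher sketches the per-flush ack pattern: every Flush caller sends its own
// done channel through flushCh, so each waiter gets its own completion signal.
type flusher struct {
	flushCh        chan chan struct{} // each request carries its own ack channel
	timeoutSeconds int
}

func (f *flusher) Flush() error {
	done := make(chan struct{})
	select {
	case f.flushCh <- done: // hand our ack channel to the worker
	case <-time.After(time.Duration(f.timeoutSeconds) * time.Second):
		return fmt.Errorf("flush queue full: timeout after %d seconds", f.timeoutSeconds)
	}
	select {
	case <-done: // worker closed our channel: this flush is finished
		return nil
	case <-time.After(time.Duration(f.timeoutSeconds) * time.Second):
		return fmt.Errorf("flush timeout after %d seconds", f.timeoutSeconds)
	}
}

func (f *flusher) run() {
	for done := range f.flushCh {
		// ... perform the actual flush work here ...
		close(done) // signal exactly the caller that requested this flush
	}
}

func main() {
	f := &flusher{flushCh: make(chan chan struct{}, 1), timeoutSeconds: 5}
	go f.run()
	fmt.Println(f.Flush()) // <nil> once the worker closes our ack channel
}
```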

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
```diff
 type S3Connector struct {
 	// ...
 	flushDoneCh chan struct{} // Signals when flush is complete
 	flushTimer  *time.Timer
 	timerMu     sync.Mutex
+	flushMu     sync.Mutex
 	lastAddTime time.Time
 }

 func (s *S3Connector) Flush() error {
+	s.flushMu.Lock()
+	defer s.flushMu.Unlock()
 	// ...
 		select {
 		case <-s.flushDoneCh:
 			return nil
 		case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
 			return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
 		}
 	default:
 		// Flush channel is full, likely a flush is already in progress
 		// Wait for it to complete
 		select {
 		case <-s.flushDoneCh:
 			return nil
 		case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
 			return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
 		}
 	}
```

@nischitpra nischitpra merged commit efbdc81 into main Sep 11, 2025
5 of 6 checks passed
@nischitpra nischitpra deleted the np/migration_boundry_fix branch September 11, 2025 20:55
