Skip to content

refactor(workflow-engine): optimize loop history pruning#4370

Merged
NathanFlurry merged 3 commits intomainfrom
queue-next-timeout-fix
Mar 8, 2026
Merged

refactor(workflow-engine): optimize loop history pruning#4370
NathanFlurry merged 3 commits intomainfrom
queue-next-timeout-fix

Conversation

@NathanFlurry
Copy link
Member

@NathanFlurry NathanFlurry commented Mar 6, 2026

Description

This change consolidates loop history management by merging historyEvery/historyKeep config options into historyPruneInterval and historySize. It adds incremental compaction tracking via lastPrunedUpTo to avoid redundant deletion scans, uses range-based deletions (deleteRange) instead of per-iteration prefix deletes, and defers checkpoint flushes to run in parallel with user code for performance.

Known issue: queue message replay bug (NOT fixed in this PR)

A failing test (should not replay the previous loop message after queue.next timeout) documents a bug where queue.nextBatch with a timeout inside a loop replays previously consumed messages. This test is intentionally failing to demonstrate the issue for a future fix.

Type of change

  • Refactor (consolidate loop config API, optimize pruning)
  • New feature (range-based deletions, incremental compaction, deferred flushes)
  • This change requires a documentation update

How Has This Been Tested?

Added new test cases covering:

  • Incremental compaction efficiency (no re-deletion)
  • Compaction on break without reaching checkpoint interval
  • Crash recovery with deferred flushes
  • Loops breaking before first checkpoint
  • Extreme case with historyPruneInterval=1
  • deleteRange half-open range semantics
  • Queue message timeout replay bug in loops (currently failing, documents known bug)

Checklist:

  • Code follows project style guidelines
  • Self-review of code completed
  • Documentation updated to reflect API changes
  • Added comprehensive test coverage
  • All tests pass (2 tests intentionally failing to document queue replay bug)

@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-4370 March 6, 2026 08:29 Destroyed
@railway-app
Copy link

railway-app bot commented Mar 6, 2026

🚅 Deployed to the rivet-pr-4370 environment in rivet-frontend

Service Status Web Updated (UTC)
frontend-inspector 🕒 Building (View Logs) Web Mar 8, 2026 at 4:02 am
frontend-cloud 🕒 Building (View Logs) Web Mar 8, 2026 at 4:02 am
website 😴 Sleeping (View Logs) Web Mar 6, 2026 at 8:41 am
mcp-hub ✅ Success (View Logs) Web Mar 6, 2026 at 8:31 am
ladle ❌ Build Failed (View Logs) Web Mar 6, 2026 at 8:30 am

@pkg-pr-new
Copy link

pkg-pr-new bot commented Mar 6, 2026

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@4370

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@4370

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@4370

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@4370

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@4370

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@4370

@rivetkit/sqlite-vfs

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sqlite-vfs@4370

@rivetkit/traces

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/traces@4370

@rivetkit/workflow-engine

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/workflow-engine@4370

@rivetkit/virtual-websocket

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/virtual-websocket@4370

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@4370

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@4370

commit: af2876a

@claude
Copy link

claude bot commented Mar 6, 2026

Code Review

Overall this is a solid refactor. Collapsing three config knobs (commitInterval/historyEvery/historyKeep) into two (historyPruneInterval/historySize) is a cleaner API, the incremental compaction tracking avoids O(N) rescans on every checkpoint, and range-based deletions reduce driver round-trips. A few things to address before merge.


Breaking change not labeled

commitInterval, historyEvery, and historyKeep are removed with no deprecation shim. TypeScript will catch call-sites at compile time, but the PR does not check the Breaking change box and the description does not mention semver implications. Please mark it and call it out in the changelog.


Deferred flush does not run in parallel with user code

The PR description says defers checkpoint flushes to run in parallel with user code for performance, and the inline comment repeats this. In practice the flush is awaited at the very top of the next iteration, before config.run is called, so there is no overlap with user code. The actual benefit is that the IO begins slightly earlier (right after the prune decision rather than at the next-iteration boundary), which can reduce the perceived latency if the IO completes before the await is reached. That is a legitimate micro-optimization, but the comments should reflect what actually happens so future readers are not confused.


Missing input validation for historyPruneInterval

The removed code had a TODO comment noting that commitInterval must be greater than 0. That TODO was dropped without adding validation. With historyPruneInterval = 0, iteration modulo 0 evaluates to NaN in JavaScript, so the periodic prune branch never fires and history grows without bound. Add a guard at loop entry: if (historyPruneInterval <= 0) throw new Error(historyPruneInterval must be greater than 0)


deleteRange is a breaking addition to EngineDriver

Adding a required method to a public interface is a breaking change for external implementors. Only InMemoryDriver (testing) and ActorWorkflowDriver (rivetkit) are updated in this PR. If any other concrete implementations exist they will fail at compile time. Consider whether deleteRange should have a default implementation (e.g. a fallback loop of delete calls) so existing implementors do not break immediately.


Non-null assertion in refactored deleteEntriesWithPrefix

storage.ts uses deletions.prefixes[0]! after the refactor. collectDeletionsForPrefix always pushes exactly one element, so this is safe today, but the assertion is fragile. Either destructure directly or add a runtime check to make the invariant explicit.


Minor: weak assertion in historyPruneInterval=1 test

tests/loops.test.ts (should handle historyPruneInterval of 1) uses expect(minIteration).toBeGreaterThanOrEqual(2). With historyPruneInterval=1 and historySize=1, after the final break-path prune iteration 2 is deleted and only iteration 3 remains, so the lower bound should be >= 3. Using >= 2 makes the test pass even if one extra iteration leaks through.


Minor: test comment calls break iteration 9 but variable is 8

In the should-not-re-delete-already-pruned-iterations test, the comment says At break (iteration 9). The iteration variable when the break fires is 8; the value 9 comes from passing iteration + 1 into collectLoopPruning. The comment mixes the two meanings. Clarifying (e.g. at break, currentIteration passed as iteration+1=9) would help readers trace through the logic.


What looks good

  • Incremental compaction via lastPrunedUpTo correctly avoids re-scanning already-deleted ranges; the mock-spy test in loops.test.ts validates the call count precisely.
  • collectLoopPruning early-exit conditions (currentIteration <= historySize and fromIteration >= keepFrom) are correct and avoid unnecessary work.
  • deleteRange semantics (half-open [start, end)) are documented in driver.ts, implemented correctly in InMemoryDriver, and validated by the new KV test.
  • Crash-recovery test (should resume from saved state after crash in later iteration) gives good confidence that the deferred-flush model does not regress durability.
  • Queue replay bug documented with a clearly failing test and a note in the PR description is the right approach while the root cause is investigated.

Consolidate historyEvery and historyKeep config options into commitInterval to simplify the API. Add incremental compaction tracking via lastForgottenUpTo to avoid re-scanning already-forgotten iterations. Defer checkpoint flushes to run in parallel with the next iteration's user code. Introduce PendingDeletions interface for collecting deletions to execute alongside checkpoint writes. Expand test coverage for compaction behavior, crash recovery, and queue timeout edge cases. Update all documentation to reflect the simplified API.
…nterval and add historySize

Rename checkpointInterval to historyPruneInterval to clarify that it controls how often old iterations are pruned. Add separate historySize option to control how many iterations are retained, defaulting to historyPruneInterval. Rename internal method collectLoopCompaction to collectLoopPruning and update all terminology from compact/checkpoint to prune throughout comments, tests, and docs.
@NathanFlurry NathanFlurry changed the title fix: simplify loop history management and defer checkpoint flushes refactor(workflow-engine): optimize loop history pruning Mar 8, 2026
Replace per-iteration prefix deletes with a single deleteRange call
and track lastPrunedUpTo to avoid redundant re-scans. Add deleteRange
to EngineDriver interface with implementations in both test and
production drivers.
@NathanFlurry NathanFlurry force-pushed the queue-next-timeout-fix branch from 22226bf to a44899b Compare March 8, 2026 04:02
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-4370 March 8, 2026 04:02 Destroyed
@NathanFlurry NathanFlurry merged commit 7aa24c4 into main Mar 8, 2026
10 of 22 checks passed
@NathanFlurry NathanFlurry deleted the queue-next-timeout-fix branch March 8, 2026 04:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant