
feat(core): Implement batch aware partitioning#607

Merged
iambriccardo merged 18 commits into main from attempt-size-batching
Feb 20, 2026

Conversation

@iambriccardo
Contributor

@iambriccardo iambriccardo commented Feb 19, 2026

This PR implements a new algorithm that sizes batches based on the estimated size of their rows instead of a fixed row count. For each stream, the algorithm computes a batch size in bytes that accounts for how many streams are active in the system at any point in time, ensuring that each stream gets its fair share of the bytes it can allocate. Since streams start and stop during execution, the system recomputes the batch size every few milliseconds to avoid over- or under-provisioning.

The reason we have this algorithm alongside backpressure is that the two are designed to work in tandem. With backpressure alone, the system would OOM immediately on certain tables because it accepts rows so quickly that the memory monitor has no time to react. With this mechanism, batching becomes size aware: we try to keep all incoming streams within a certain memory ratio (20% is a good value from my empirical tests), and if memory grows in other areas of the system, backpressure still kicks in. The whole system needs some tweaking and production data to validate how it works, but it performs well in my internal tests. My biggest gripe is that we calculate the stream bytes, but the actual memory usage caused by the stream data can be higher, since we allocate additional structures while processing a batch. Because of this, we should try to optimize memory usage in the vector allocations for batches, which can temporarily overload memory.

The PR also removes some old dangling configs as part of the BatchConfig change and improves some other internals related to watch channels.

@coderabbitai

coderabbitai bot commented Feb 19, 2026

No actionable comments were generated in the recent review. 🎉



Summary by CodeRabbit

  • New Features

    • Memory-based batch sizing with a cached per-pipeline budget and configurable memory refresh interval.
    • CDC rows now include operation type and sequence number metadata.
    • New size-hinting for events and rows to enable byte-aware batching.
  • Configuration Changes

    • Replaced fixed max batch size with a memory budget ratio for dynamic batching.
    • Added memory_refresh_interval_ms setting and updated example/defaults.
  • Tests

    • Updated tests and helpers to use new row accessors and comparison helpers.

Walkthrough

This pull request introduces a memory-budget-based batching system to replace fixed-size batch limits. Key changes include: removing ApiBatchConfig and consolidating batch configuration, relocating memory_refresh_interval_ms to PipelineConfig, introducing a new BatchBudgetController for dynamic batch sizing based on memory budgets and active streams, adding a SizeHint trait for memory estimation, encapsulating TableRow.values with accessor methods, and making MemoryMonitor mandatory throughout the pipeline. The system now tracks per-stream byte budgets, cascades batch budget controllers through worker hierarchies, and estimates row sizes to inform batching decisions dynamically.

Sequence Diagram(s)

sequenceDiagram
    participant Pipeline as Pipeline
    participant MemoryMonitor as MemoryMonitor
    participant BatchBudgetController as BatchBudgetController
    participant CachedBatchBudget as CachedBatchBudget
    participant BatchBackpressureStream as BatchBackpressureStream
    participant Worker as Worker/ApplyLoop

    Pipeline->>MemoryMonitor: new(pipeline_id, shutdown_rx,<br/>memory_backpressure_config,<br/>memory_refresh_interval_ms)
    MemoryMonitor->>MemoryMonitor: initialize backpressure state (if configured)
    MemoryMonitor->>MemoryMonitor: spawn periodic refresh loop<br/>(memory_refresh_interval_ms)
    Pipeline->>BatchBudgetController: new(pipeline_id, memory_monitor,<br/>memory_budget_ratio)
    BatchBudgetController->>BatchBudgetController: track active streams count
    Pipeline->>Worker: start(batch_budget)
    Worker->>CachedBatchBudget: cached()
    BatchBudgetController-->>CachedBatchBudget: return cached view
    Worker->>BatchBackpressureStream: wrap(stream, batch_config,<br/>memory_subscription,<br/>cached_batch_budget)
    loop For each event
        BatchBackpressureStream->>BatchBackpressureStream: accumulate bytes via<br/>event.size_hint()
        alt bytes >= current_batch_size_bytes()
            BatchBackpressureStream->>BatchBackpressureStream: flush batch
            BatchBackpressureStream->>Worker: send batch
            BatchBackpressureStream->>BatchBackpressureStream: reset byte counter
        end
    end
    Worker->>BatchBudgetController: register_stream_load(1)
    BatchBudgetController-->>Worker: return ActiveStreamsGuard
    BatchBudgetController->>MemoryMonitor: poll total_memory_bytes()
    MemoryMonitor->>MemoryMonitor: periodic refresh updates<br/>total memory tracking
    note over BatchBudgetController: ideal_batch_size = total_memory * ratio / active_streams
sequenceDiagram
    participant TableRow as TableRow
    participant Cell as Cell
    participant SizeHint as SizeHint Trait

    TableRow->>TableRow: new(values: Vec<Cell>)
    TableRow->>TableRow: estimate_table_row_allocated_bytes()
    loop for each Cell
        TableRow->>Cell: estimate per-cell size
        Cell-->>TableRow: cell size estimate
    end
    TableRow->>TableRow: store size_hint_bytes
    TableRow-->>TableRow: return TableRow
    
    note over TableRow: Public API
    TableRow->>TableRow: values() -> &[Cell]
    TableRow->>TableRow: values_mut() -> &mut Vec<Cell>
    TableRow->>TableRow: into_values(self) -> Vec<Cell>
    
    note over TableRow: Size Accounting
    TableRow->>SizeHint: impl SizeHint for TableRow
    SizeHint->>TableRow: size_hint() -> stored<br/>size_hint_bytes

Comment @coderabbitai help to get the list of available commands and usage tips.

@iambriccardo iambriccardo changed the title from "attempt size batching" to "feat(core): Implement batch aware partitioning" Feb 19, 2026
@coveralls

coveralls commented Feb 19, 2026

Coverage Status

coverage: 68.234% (-0.6%) from 68.806%
when pulling a8440c6 on attempt-size-batching
into 9586374 on main.

Contributor

@bnjjj bnjjj left a comment


Do we have benchmarks somewhere to double-check that the size computation doesn't impact performance?

@iambriccardo
Contributor Author

Do we have benchmarks somewhere to double-check that the size computation doesn't impact performance?

This is a great point that I wanted to raise. We do have some very old benchmarks, and it's on my todo list to implement a CI script to detect performance regressions, but we don't have that as of now.

@iambriccardo iambriccardo marked this pull request as ready for review February 20, 2026 12:08
@iambriccardo iambriccardo requested a review from a team as a code owner February 20, 2026 12:08
@iambriccardo
Contributor Author

iambriccardo commented Feb 20, 2026

We discussed the performance problems with the new bytes estimation offline. It would be problematic for JSON values (due to recursion), but we want to observe how it performs in prod before deciding to optimize it further.

@iambriccardo iambriccardo requested a review from bnjjj February 20, 2026 12:11

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@scripts/deploy-local-replicator-orbstack.sh`:
- Around line 16-19: The MEMORY_LIMIT default contains an invalid Kubernetes
unit ("300mi"); update the MEMORY_LIMIT assignment (variable MEMORY_LIMIT and
fallback MAC_MEMORY_LIMIT) to use the correct case-sensitive BinarySI unit
"300Mi" so the line reads with "300Mi" instead of "300mi" to prevent kubectl
apply failures.

@iambriccardo iambriccardo merged commit f8123eb into main Feb 20, 2026
13 checks passed
@iambriccardo iambriccardo deleted the attempt-size-batching branch February 20, 2026 12:42
