Release v0.2.0

@thanhtham010891 released this 30 May 19:02

Focus

Batch-native architecture. 0.2.0 introduces a new execution lane that allows
sources to emit native batches, middlewares to process batches, and sinks to
consume batches — eliminating the per-record overhead that dominated 0.1.x
throughput.

Performance

The Arrow fast path activates when ParquetSource(use_arrow_batches=True) is
paired with ParquetSink and no per-record middleware transforms the batch.
In this configuration, pa.RecordBatch objects flow from source to sink with
zero Python object allocation per row.

New: Batch-native execution lane

Protocols (src/agora/core/batch.py)

Three new opt-in protocols:

BatchableSource[T] — a source that can emit native batches via
stream_batches(). The runtime calls this instead of stream() when
supports_batch_emit = True. Each yielded value is a list[T] or a
pa.RecordBatch (for Arrow-native sources).
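
A minimal sketch of what a batch-emitting source could look like, using list batches only. RangeSource and looks_batch_capable are hypothetical names for illustration; the real is_batch_capable_source() check in agora may use different rules.

```python
from typing import Any, Iterator


class RangeSource:
    """Toy source that emits integers in fixed-size list batches."""

    # Flag named in the release notes; the runtime is described as
    # calling stream_batches() instead of stream() when it is True.
    supports_batch_emit = True

    def __init__(self, total: int, batch_size: int) -> None:
        self.total = total
        self.batch_size = batch_size

    def stream(self) -> Iterator[int]:
        # Per-record path.
        yield from range(self.total)

    def stream_batches(self) -> Iterator[list[int]]:
        # Batch path: each yielded value is one list of records.
        for start in range(0, self.total, self.batch_size):
            stop = min(start + self.batch_size, self.total)
            yield list(range(start, stop))


def looks_batch_capable(source: Any) -> bool:
    """Hypothetical duck-typed check (an assumption, not agora's code)."""
    flag = bool(getattr(source, "supports_batch_emit", False))
    return flag and callable(getattr(source, "stream_batches", None))


batches = list(RangeSource(total=5, batch_size=2).stream_batches())
```

The last batch is shorter than batch_size, so consumers should not assume uniform batch lengths.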

BatchMiddleware[T, U] — a middleware that processes a whole batch at
once via process_batch(records, ctx) -> list[U | None]. Return None at
position i to drop records[i]. Raise to fail the entire batch.
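
The positional contract (one result slot per input record, None meaning "drop") can be illustrated roughly as follows. DropNegatives and compact are hypothetical names, and ctx is passed as None because its type is not described here.

```python
from typing import Any, Optional


class DropNegatives:
    """Toy batch middleware: doubles non-negative ints, drops negatives."""

    def process_batch(
        self, records: list[int], ctx: Any
    ) -> list[Optional[int]]:
        # The result has the same length as the input; None at index i
        # means records[i] is dropped.
        return [r * 2 if r >= 0 else None for r in records]


def compact(results: list[Optional[int]]) -> list[int]:
    """Hypothetical helper: keep only the records that survived."""
    return [r for r in results if r is not None]


results = DropNegatives().process_batch([1, -2, 3], ctx=None)
survivors = compact(results)
```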

ArrowNativeSink — a sink that can consume pa.RecordBatch directly via
write_arrow_batch(batch). When both source and sink are Arrow-native and no
middleware transforms the batch, the runtime uses the shortest path with zero
row materialization.
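
The path selection described above might be pictured as a small decision function. This is only a sketch of the stated conditions: Lane, choose_lane, and the boolean parameters are hypothetical and do not correspond to a documented agora API.

```python
from enum import Enum


class Lane(Enum):
    ARROW_FAST_PATH = "arrow_fast_path"
    BATCH = "batch"
    PER_RECORD = "per_record"


def choose_lane(
    source_emits_batches: bool,
    source_is_arrow: bool,
    sink_is_arrow: bool,
    middleware_transforms_batch: bool,
) -> Lane:
    """Pick an execution lane from the conditions in the release notes."""
    if not source_emits_batches:
        # Sources that do not opt in stay on the existing path.
        return Lane.PER_RECORD
    if source_is_arrow and sink_is_arrow and not middleware_transforms_batch:
        # Arrow batches can flow from source to sink untouched.
        return Lane.ARROW_FAST_PATH
    return Lane.BATCH


lane = choose_lane(
    source_emits_batches=True,
    source_is_arrow=True,
    sink_is_arrow=True,
    middleware_transforms_batch=False,
)
```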

Failure policy for BatchMiddleware (Option A): if process_batch() raises,
the entire batch is routed to the DLQ (if configured) or the run aborts. There
is no per-record fallback.
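
As a rough illustration of this all-or-nothing policy, the sketch below routes the whole batch to a DLQ when the stage raises, or aborts when no DLQ exists. apply_batch_stage, BatchAborted, and the list-based DLQ are assumptions made for the example, not agora internals.

```python
from typing import Callable, Optional


class BatchAborted(RuntimeError):
    """Hypothetical error used here to signal that the run must stop."""


def apply_batch_stage(
    process_batch: Callable[[list], list],
    records: list,
    dlq: Optional[list],
) -> list:
    """Run one batch stage with whole-batch failure handling."""
    try:
        return process_batch(records)
    except Exception as exc:
        if dlq is None:
            # No DLQ configured: the run aborts.
            raise BatchAborted("batch failed and no DLQ is configured") from exc
        # Every record in the batch goes to the DLQ, not only the
        # record that triggered the error.
        dlq.extend(records)
        return []


def reject_negatives(records: list) -> list:
    if any(r < 0 for r in records):
        raise ValueError("negative value in batch")
    return records


dlq: list = []
delivered = apply_batch_stage(reject_negatives, [1, -2, 3], dlq)
```

Note that the valid records 1 and 3 also end up in the DLQ, which is the cost of having no per-record fallback.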

Runtime batch lane (_buffered.py)

New ExecutionCoordinator.run_batch_pipeline() method, activated
automatically when is_batch_capable_source(source) returns True. The
existing per-record and buffered paths are unchanged.

Guarantees preserved in the batch lane:

  • Batches are committed in source order.
  • Checkpoint advances once per batch, after the batch is durably written.
  • Sink fail-closed and DLQ policies are honored per batch.
  • CancelledError aborts without committing the in-flight batch.
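
The ordering and checkpoint guarantees can be sketched with a simple loop in which the checkpoint moves only after the write returns. run_batches and the list-based checkpoint log are hypothetical; the real coordinator is not shown in these notes.

```python
from typing import Callable, Iterable


def run_batches(
    batches: Iterable[list],
    write_batch: Callable[[list], None],
    checkpoints: list[int],
) -> None:
    """Write batches in source order, recording one checkpoint per batch."""
    rows_committed = checkpoints[-1] if checkpoints else 0
    for batch in batches:
        # If the write raises, control leaves the loop here and the
        # checkpoint for this batch is never recorded.
        write_batch(batch)
        rows_committed += len(batch)
        checkpoints.append(rows_committed)


written: list[list] = []
checkpoints: list[int] = []
run_batches([[1, 2], [3, 4, 5]], written.append, checkpoints)
```

Because the checkpoint is appended after write_batch() returns, an interrupted batch is replayed on resume rather than skipped.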

MiddlewareChain additions

  • has_batch_stages() — returns True if any middleware is a BatchMiddleware.
  • process_batch(records, ctx) — runs the batch through the chain. For
    BatchMiddleware stages: calls process_batch(). For regular Middleware
    stages: applies per-record within the batch loop.
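
A simplified sketch of how a chain might mix the two stage kinds. ChainSketch, Upper, and DropEmpty are hypothetical, the per-record method name process() is an assumption, and dropped records are compacted between stages here, which may differ from the real MiddlewareChain.

```python
from typing import Any, Optional


class Upper:
    """Per-record stage (method name process() is assumed)."""

    def process(self, record: str, ctx: Any) -> Optional[str]:
        return record.upper()


class DropEmpty:
    """Batch stage: returns None for empty strings to drop them."""

    def process_batch(self, records: list[str], ctx: Any) -> list[Optional[str]]:
        return [r if r else None for r in records]


class ChainSketch:
    def __init__(self, stages: list) -> None:
        self.stages = list(stages)

    def has_batch_stages(self) -> bool:
        return any(hasattr(s, "process_batch") for s in self.stages)

    def process_batch(self, records: list, ctx: Any) -> list:
        current = list(records)
        for stage in self.stages:
            if hasattr(stage, "process_batch"):
                # Batch stage: one call for the whole batch.
                results = stage.process_batch(current, ctx)
            else:
                # Regular stage: applied per record inside the batch loop.
                results = [stage.process(r, ctx) for r in current]
            # Remove dropped records before the next stage runs.
            current = [r for r in results if r is not None]
        return current


chain = ChainSketch([DropEmpty(), Upper()])
output = chain.process_batch(["a", "", "b"], ctx=None)
```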

Delivery layer additions

New DeliveryEngine.write_batch_result() — delivers a processed
batch to the sink and advances the checkpoint once. Handles batch failure
(Option A), sink errors, DLQ routing, and LOG_AND_CONTINUE policy.
(RecordDeliveryCoordinator remains as a deprecated alias for DeliveryEngine.)

ParquetSource — Arrow batch mode

ParquetSource gains a new constructor parameter, use_arrow_batches
(default False). When True:

  • supports_batch_emit is set to True.
  • stream_batches() yields pa.RecordBatch objects directly — no
    to_pylist() call, no Python dict allocation per row.
  • row_mapper is bypassed in batch mode.
  • Checkpoint still tracks row_number across batches for resume support.
  • Resume uses Arrow batch slicing instead of row-by-row skipping.

The default is False — existing pipelines using row_mapper are unaffected.
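
The resume-by-slicing idea can be sketched with plain lists standing in for Arrow batches: whole batches before the checkpoint are skipped and the batch that straddles it is sliced once. resume_batches is a hypothetical helper; with pyarrow the slice would presumably be batch.slice(offset) rather than batch[offset:].

```python
from typing import Iterable, Iterator


def resume_batches(batches: Iterable[list], rows_done: int) -> Iterator[list]:
    """Yield only the rows after the first rows_done rows."""
    remaining = rows_done
    for batch in batches:
        if remaining >= len(batch):
            # Entire batch was already committed: skip it in one step.
            remaining -= len(batch)
            continue
        # Slice off the committed prefix of the straddling batch;
        # later batches pass through whole because remaining is 0.
        yield batch[remaining:]
        remaining = 0


resumed = list(resume_batches([[0, 1, 2], [3, 4, 5], [6]], rows_done=4))
```

The cost of skipping is proportional to the number of batches rather than the number of rows already processed.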

ParquetSink — Arrow batch mode

ParquetSink gains write_arrow_batch(batch: pa.RecordBatch) — writes an
Arrow batch directly via pa.Table.from_batches([batch]) without calling
row_mapper or _rows_to_columns(). The existing write() and
write_batch() paths are unchanged.

Breaking changes

AGORA_API_VERSION removed

The AGORA_API_VERSION alias (deprecated in 0.1.9) has been removed from
agora.core.registry. Use AGORA_PLUGIN_MANIFEST_VERSION instead.

# Before (0.1.9 — deprecated)
from agora.core.registry import AGORA_API_VERSION

# After (0.2.0)
from agora.core.registry import AGORA_PLUGIN_MANIFEST_VERSION

AGORA_PLUGIN_MANIFEST_VERSION bumped to "0.4"

The manifest schema version bumps from "0.3" to "0.4". Plugin packages
that declare MANIFEST.agora_api_version = "0.3" will be excluded from the
active registry and shown as incompatible in agora plugins list.

Update your plugin's MANIFEST:

MANIFEST = _Manifest(
    package="my-plugin",
    version="...",
    agora_api_version="0.4",  # was "0.3"
)
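
To illustrate the effect of the bump, the sketch below splits plugin manifests into active and incompatible sets. ManifestSketch and partition_plugins are hypothetical, and the exact-match comparison is an assumption; the real registry may apply different compatibility rules.

```python
from dataclasses import dataclass

SUPPORTED_MANIFEST_VERSION = "0.4"  # value stated in these release notes


@dataclass(frozen=True)
class ManifestSketch:
    """Stand-in for a plugin manifest with the three fields shown above."""

    package: str
    version: str
    agora_api_version: str


def partition_plugins(
    manifests: list[ManifestSketch],
) -> tuple[list[ManifestSketch], list[ManifestSketch]]:
    """Split manifests into (active, incompatible) by manifest version."""
    active: list[ManifestSketch] = []
    incompatible: list[ManifestSketch] = []
    for manifest in manifests:
        if manifest.agora_api_version == SUPPORTED_MANIFEST_VERSION:
            active.append(manifest)
        else:
            incompatible.append(manifest)
    return active, incompatible


old = ManifestSketch("old-plugin", "1.0.0", "0.3")
new = ManifestSketch("new-plugin", "1.1.0", "0.4")
active, incompatible = partition_plugins([old, new])
```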

New public exports

Now importable from the top-level package (from agora import ...):

  • BatchableSource
  • BatchMiddleware
  • BatchProcessResult
  • BatchFailure
  • ArrowNativeSink
  • is_batch_capable_source
  • is_arrow_native_sink

Tests

New: batch pipeline unit tests

Added tests/core/test_batch_pipeline.py (13 tests):

  • is_batch_capable_source() detection
  • is_arrow_native_sink() detection
  • BatchMiddleware.process_batch() dispatch
  • Dropped records (None results)
  • Source order across batches
  • Checkpoint advances once per batch
  • Batch failure (Option A) → entire batch to DLQ
  • Batch failure without DLQ aborts run
  • No-middleware batch pipeline
  • max_records respected in batch mode

New: batch-path preservation tests

Added 4 batch-path variants to tests/preservation/test_runtime_guarantees.py:

  • Source order in batch mode
  • Checkpoint advances after each batch is durably written
  • Sink fail-closed in batch mode
  • DLQ routing on batch failure (Option A)

All 649 tests pass.

Packaging

  • Bumped agora-etl to 0.2.0.
  • Removed AGORA_API_VERSION alias.
  • Bumped AGORA_PLUGIN_MANIFEST_VERSION to "0.4".