Release v0.3.0

thanhtham010891 released this 05 Jun 16:02
· 4 commits to main since this release

Focus

Process-isolated batch transforms for CPU-heavy or memory-isolated workloads,
with recovery semantics strong enough for long-running batch pipelines.


Highlights

Source-bound max_records limits

run(max_records=N) now applies a source wrapper before execution starts
instead of stopping later inside the runtime lanes.

This makes bounded runs behave the same across:

  • .build()
  • .fan_out()
  • .route()
  • .run()
  • .run_sync()

For batch and Arrow sources, the final emitted batch is now trimmed at the
source boundary instead of overshooting and stopping after a full batch has
already been processed.
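
The trimming behavior can be pictured as a generator wrapped around the source.
This is a minimal sketch, not Agora's implementation: limit_batches is a
hypothetical name, and batches are modeled as plain Python lists.

```python
from typing import Iterable, Iterator, List


def limit_batches(
    batches: Iterable[List[dict]], max_records: int
) -> Iterator[List[dict]]:
    """Yield batches until max_records rows were emitted, trimming the last one.

    Hypothetical helper for illustration; not part of the Agora API.
    """
    remaining = max_records
    if remaining <= 0:
        return
    for batch in batches:
        if len(batch) >= remaining:
            # Trim at the source boundary so downstream stages never see
            # rows beyond the limit, then stop pulling from the source.
            yield batch[:remaining]
            return
        yield batch
        remaining -= len(batch)
```

Because the wrapper sits in front of the source, everything downstream sees
the same bounded stream and needs no limit logic of its own.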

Middleware data-plane validation is now explicit

Agora now validates source and middleware data planes during runtime planning.

Supported shapes:

  • Arrow-emitting source plus all-Arrow middleware chain
  • Arrow-emitting source plus all-Python-row middleware chain
  • Python-row source plus all-Python-row middleware chain

Rejected shape:

  • any chain that mixes Arrow middleware with Python-row/list-batch middleware

This now fails fast with a PipelineError instead of silently switching data
shape mid-chain.
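
The planning-time check can be sketched roughly as follows. Everything here is
an illustrative assumption: planes are modeled as the strings "arrow" and
"python_rows", plan_middleware_plane is a hypothetical function, and the
PipelineError class is a local stand-in so the sketch runs without Agora.

```python
from typing import List


class PipelineError(Exception):
    """Local stand-in for Agora's PipelineError (assumption for the sketch)."""


def plan_middleware_plane(source_plane: str, middleware_planes: List[str]) -> str:
    """Return the plane the middleware chain runs on, or raise on a bad chain."""
    kinds = set(middleware_planes)
    if len(kinds) > 1:
        # Rejected shape from the notes: Arrow mixed with Python-row middleware.
        raise PipelineError(f"mixed middleware data planes: {sorted(kinds)}")
    if not kinds:
        return source_plane
    chain_plane = kinds.pop()
    if chain_plane == "arrow" and source_plane != "arrow":
        # Assumption: this combination is not in the supported list above,
        # so the sketch treats it as an error too.
        raise PipelineError("Arrow middleware requires an Arrow-emitting source")
    return chain_plane
```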

Arrow fan-out now chooses the best path per sink

When the pipeline stays on the Arrow chain and the writer is a fan-out:

  • sinks with write_arrow_batch() receive the original RecordBatch
  • sinks without Arrow support receive a list[dict] fallback, converted only
    at the sink boundary

This keeps Arrow-native sinks fast without forcing every sink in the fan-out to
implement the same write contract.
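
A minimal sketch of that per-sink dispatch, assuming duck typing on
write_arrow_batch(). The names fan_out_write and write_batch() are
hypothetical, and FakeRecordBatch is a stand-in that keeps the example free of
a pyarrow dependency; the only method it relies on, to_pylist(), is one that
pyarrow.RecordBatch also provides.

```python
class FakeRecordBatch:
    """Stand-in for pyarrow.RecordBatch; only to_pylist() is used here."""

    def __init__(self, rows):
        self._rows = rows

    def to_pylist(self):
        return list(self._rows)


def fan_out_write(batch, sinks):
    """Deliver one batch to every sink on the best path that sink supports."""
    rows = None  # list[dict] fallback, built lazily and at most once
    for sink in sinks:
        write_arrow = getattr(sink, "write_arrow_batch", None)
        if callable(write_arrow):
            # Arrow-capable sink: pass the original batch through untouched.
            write_arrow(batch)
        else:
            # Row-only sink: downgrade at the sink boundary only.
            if rows is None:
                rows = batch.to_pylist()
            sink.write_batch(rows)  # hypothetical row-write method
```

Converting lazily is a design choice of this sketch: a fan-out made up only of
Arrow-capable sinks never pays for the list[dict] conversion.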

Execution semantics now expose first-class data planes

Agora now resolves one shared vocabulary across source, middleware, writer, and
sink boundaries:

  • python_rows
  • python_batches
  • arrow_batches

The runtime plan now records:

  • the plane emitted by the source
  • the plane that actually entered the writer
  • which sinks had to downgrade at the writer/sink boundary

That contract is also introspectable from user code through
BaseSource.data_plane_spec(), BaseSource.emitted_data_plane, and
BaseSink.data_plane_spec().

The shared vocabulary is now part of the root public API too:

  • from agora import DataPlane
  • from agora import SourceDataPlaneSpec, SinkDataPlaneSpec

Legacy batch/Arrow bool flags are now deprecated

The older data-plane booleans still work in 0.3.x, but Agora now treats them
as compatibility shims instead of the primary extension contract.

Deprecated source flags:

  • supports_batch_emit
  • emits_arrow_batches

Deprecated sink flags:

  • batch_writable_native
  • arrow_passthrough_native

Deprecated capability fields:

  • SinkCapabilities.batch_writable_native
  • SinkCapabilities.arrow_passthrough_native

Preferred replacement:

  • sources override data_plane_spec() and return SourceDataPlaneSpec
  • sinks advertise accepted_data_planes / native_data_planes, or return
    explicit planes from sink_capabilities()

When Agora has to infer a non-row plane from the legacy booleans, it now emits
DeprecationWarning once per source/sink class. The compatibility bridge is
planned for removal in 0.4.0.
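
The once-per-class warning can be sketched with the standard warnings module.
This illustrates the pattern rather than Agora's actual code;
warn_legacy_flags_once and the module-level set are hypothetical.

```python
import warnings

# Classes that have already triggered the warning (hypothetical registry).
_warned_classes = set()


def warn_legacy_flags_once(obj) -> None:
    """Emit a DeprecationWarning the first time a given class is seen."""
    cls = type(obj)
    if cls in _warned_classes:
        return
    _warned_classes.add(cls)
    warnings.warn(
        f"{cls.__name__} relies on legacy data-plane flags; "
        "declare explicit data planes instead",
        DeprecationWarning,
        stacklevel=2,
    )
```

Keying the registry on the class rather than the instance keeps logs quiet
when a pipeline constructs many instances of the same legacy source or sink.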

Built pipelines can now explain their planned execution shape

BoundPipeline.explain() now returns a PipelineExplain snapshot without
starting the source. This surfaces:

  • planned lane selection
  • source and writer data planes
  • middleware compatibility matrix
  • per-sink selected plane and downgrade markers

CsvSink now reports Arrow-native writes vs downgrade-to-rows

CsvSink.write_arrow_batch() still prefers pyarrow.csv.write_csv(), but it
now downgrades safely to Python-row CSV writing when Arrow CSV rejects the
batch, such as with nested types or invalid UTF-8 payloads.

The run summary and runtime metrics now expose:

  • csv_arrow_native_batch_count
  • csv_arrow_native_row_count
  • csv_arrow_downgrade_batch_count
  • csv_arrow_downgrade_row_count

This makes it much easier to distinguish:

  • Arrow lane selected by the planner
  • Arrow path active in the runtime
  • CSV sink actually staying native for the full run
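
The native-first write with a row fallback, together with the four counters,
could look roughly like the sketch below. The counter names come from the list
above; everything else is an assumption: write_batch_with_fallback is a
hypothetical function, and encode_native is an injected callable standing in
for the Arrow CSV encoder, returning CSV text or raising when it rejects the
batch. Header handling is omitted.

```python
import csv
from dataclasses import dataclass


@dataclass
class CsvArrowStats:
    csv_arrow_native_batch_count: int = 0
    csv_arrow_native_row_count: int = 0
    csv_arrow_downgrade_batch_count: int = 0
    csv_arrow_downgrade_row_count: int = 0


def write_batch_with_fallback(batch, out, stats, encode_native):
    """Try the native encoder first; fall back to Python-row CSV on failure."""
    try:
        # Encode fully before writing so a rejected batch leaves no partial output.
        text = encode_native(batch)
    except Exception:  # broad catch for brevity; real code would narrow this
        rows = batch.to_pylist()
        if rows:
            writer = csv.DictWriter(out, fieldnames=list(rows[0].keys()))
            writer.writerows(rows)
        stats.csv_arrow_downgrade_batch_count += 1
        stats.csv_arrow_downgrade_row_count += len(rows)
        return
    out.write(text)
    stats.csv_arrow_native_batch_count += 1
    stats.csv_arrow_native_row_count += batch.num_rows
```

A run that stayed fully native would finish with both downgrade counters at
zero.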

ProcessBatchMiddleware for process-isolated batch execution

Agora now supports ProcessBatchMiddleware, a batch-lane middleware that runs a
sync transform in a managed ProcessPoolExecutor while keeping checkpointing,
DLQ routing, and sink delivery in the main runtime process.

This gives batch pipelines a clean path for:

  • CPU-heavy transforms
  • blocking native-library calls
  • memory isolation between orchestration and compute

ArrowProcessBatchMiddleware for columnar process execution

Agora now also supports ArrowProcessBatchMiddleware, an Arrow-native sibling
that keeps batches columnar across the process boundary.

This path is intended for:

  • Arrow-native sources and sinks
  • vectorized transforms using pyarrow.compute
  • CPU-heavy columnar work where Python-object batches would add too much
    serialization overhead

The first cut transports Arrow batches as Arrow IPC bytes and currently requires
the worker transform to preserve input row count.

Timeout recovery now recycles the worker pool

If a process batch exceeds timeout_s, Agora treats that batch as failed and
recycles the worker pool before accepting the next batch.

This avoids a poisoned-pool failure mode where later batches would otherwise
queue behind a stuck worker and fail in a cascade.

In ordered pipelined mode, unresolved sibling batches from that recycled
worker-pool generation are failed too, instead of being committed from stale
worker state.
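
The recycle-on-timeout idea can be sketched on top of concurrent.futures. This
is an assumption-laden illustration, not Agora's implementation:
RecyclingRunner and make_pool are hypothetical names, and the sketch covers
only one batch at a time, so it does not model failing sibling batches in
ordered pipelined mode. In practice make_pool would return a
ProcessPoolExecutor, which requires the transform and the batch to be
pickleable.

```python
from concurrent.futures import TimeoutError as FutureTimeout


class RecyclingRunner:
    """Run batches on a pool and replace the pool after any timeout."""

    def __init__(self, make_pool, timeout_s):
        self._make_pool = make_pool  # factory returning a fresh executor
        self._timeout_s = timeout_s
        self._pool = make_pool()
        self.generation = 0  # bumped every time the pool is recycled

    def run_batch(self, fn, batch):
        """Return (ok, result); a timeout fails the batch and recycles the pool."""
        future = self._pool.submit(fn, batch)
        try:
            return True, future.result(timeout=self._timeout_s)
        except FutureTimeout:
            self._recycle()
            return False, None

    def _recycle(self):
        # Drop queued work and stop using the old pool. shutdown() does not
        # interrupt a call that is already running; a real implementation
        # would also have to terminate the stuck worker process.
        self._pool.shutdown(wait=False, cancel_futures=True)
        self._pool = self._make_pool()
        self.generation += 1

    def close(self):
        self._pool.shutdown(wait=True)
```

Because the next batch is submitted to a fresh pool, it never queues behind
the stuck worker from the previous generation.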

Ordered pipelined process execution is now active

When max_workers > 1 and max_in_flight_batches > 1, both
ProcessBatchMiddleware and ArrowProcessBatchMiddleware now keep multiple
batches in flight while still committing them in source order.

This turns max_in_flight_batches into a real throughput control for ordered
process-batch pipelines.
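
Ordered pipelining comes down to a bounded queue of futures drained from the
front. The sketch below is a generic illustration with a hypothetical
ordered_pipeline function; it accepts any concurrent.futures executor, and a
ProcessPoolExecutor with max_workers > 1 would be the process-isolated case.

```python
from collections import deque


def ordered_pipeline(executor, fn, batches, max_in_flight_batches):
    """Yield fn(batch) results in source order with bounded batches in flight."""
    if max_in_flight_batches < 1:
        raise ValueError("max_in_flight_batches must be >= 1")
    pending = deque()  # futures in submission (source) order
    for batch in batches:
        pending.append(executor.submit(fn, batch))
        if len(pending) >= max_in_flight_batches:
            # Always wait on the oldest future, so results leave in source
            # order even when later batches finish first.
            yield pending.popleft().result()
    while pending:
        yield pending.popleft().result()
```

With max_in_flight_batches=1 the loop degenerates to serial execution, which
is why the setting acts as a throughput control only once it exceeds 1.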

Batch-only contract is explicit

ProcessBatchMiddleware now rejects per-record execution directly. It must be
paired with a source that emits batches (supports_batch_emit=True, or the
data_plane_spec() declaration that replaces that now-deprecated flag).

This keeps the public contract honest and avoids silent per-record process hops.


Stability Status

ProcessBatchMiddleware is intended as a stable 0.3.0 feature for
Python-object batch pipelines when all of the following are true:

  • the source is batch-capable
  • the batch payload is pickleable
  • the transform function is pickleable
  • the pipeline keeps source-order commit semantics
  • if pipelined process execution is enabled, it uses ordered=True

ArrowProcessBatchMiddleware is intended as a stable 0.3.0 feature for
Arrow-native process-batch pipelines when all of the following are true:

  • the source emits pa.RecordBatch
  • the worker transform is pickleable
  • the worker transform returns pa.RecordBatch
  • the transform preserves row count
  • the pipeline keeps source-order commit semantics
  • if pipelined process execution is enabled, it uses ordered=True

The following mode is not available in 0.3.0:

  • ordered=False for pipelined process execution

Ordered pipelined execution is active when max_workers > 1 and
max_in_flight_batches > 1. Unordered pipelined commit is not available yet:
setting ordered=False together with max_in_flight_batches > 1 is rejected
explicitly.


Release Validation

Release validation for this feature should include:

  • targeted process-batch unit coverage
  • end-to-end batch-lane coverage for success, DLQ, timeout recovery, and
    checkpoint gating
  • ordered pipelined coverage for both Python-object and Arrow-native process
    batches
  • adjacent batch/runtime preservation coverage
  • package CI from source checkout with PYTHONPATH=src

Recommended commands:

PYTHONPATH=src ./.venv/bin/pytest -q \
  tests/core/test_process_batch_middleware.py \
  tests/integration/test_process_batch_integration.py

PYTHONPATH=src ./.venv/bin/pytest -q \
  tests/core/test_batch_pipeline.py \
  tests/preservation/test_runtime_guarantees.py -k "batch or checkpoint or dlq"

Release Criteria

  • timed-out process work does not poison later batches in the same run
  • checkpoint advancement remains gated on downstream handling after process
    execution
  • failed process batches route to the DLQ or stop the run according to the
    configured failure policy
  • package docs clearly state batch-only usage and timeout semantics
  • the targeted process-batch and adjacent batch/runtime suites pass from the
    source tree