
v0.2.1


@thanhtham010891 released this 01 Jun 10:59

Focus

Correctness hardening, performance improvements, and Arrow-native sink expansion.
0.2.1 fixes several bugs introduced by 0.2.0's batch architecture, roughly
doubles throughput on per-record pipelines through delivery batching (1.8x to
2.2x in the measurements below), and extends the Arrow fast path to CsvSink and
JsonLinesSink.


Performance

Delivery batching — ~2x throughput on per-record pipelines

DeliveryConfig(batch_size=N) now delivers measurable throughput gains on the
linear lane. Profiling shows delivery overhead (write dispatch, checkpoint
decision, metrics flush) accounts for ~40% of per-record pipeline time. Batching
reduces the number of delivery function calls from O(records) to O(records/N).
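To make the O(records) to O(records/N) claim concrete, here is a minimal sketch of a buffering deliverer. `BatchingDeliverer` is a hypothetical class, not agora's DeliveryEngine; it only counts how often the sink is invoked when records are grouped before delivery.

```python
from typing import Any, Callable, List


class BatchingDeliverer:
    """Hypothetical sketch: buffer records and hand them to the sink in groups."""

    def __init__(self, write_batch: Callable[[List[Any]], None], batch_size: int = 100) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        self._write_batch = write_batch
        self._batch_size = batch_size
        self._buffer: List[Any] = []
        self.delivery_calls = 0  # number of times the sink was invoked

    def deliver(self, record: Any) -> None:
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # One sink call per buffered group instead of one per record; a real
        # engine would also make one checkpoint decision and metrics update here.
        if self._buffer:
            self._write_batch(self._buffer)
            self.delivery_calls += 1
            self._buffer = []


written: List[Any] = []
deliverer = BatchingDeliverer(written.extend, batch_size=100)
for i in range(1_000):
    deliverer.deliver({"id": i})
deliverer.flush()  # deliver any partial final batch
print(deliverer.delivery_calls)  # 10
```

With batch_size=1 the same loop would make 1,000 sink calls; the per-call overhead is what batching amortizes.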

Recommended default for per-record pipelines: batch_size=100.

# Before (0.2.0 default — batch_size=1)
Pipeline(source).build(sink).run()                    # ~124K rec/s JSONL

# After (0.2.1 recommended)
Pipeline(source).build(sink, config=DeliveryConfig(batch_size=100)).run()  # ~279K rec/s JSONL

Measured on 100K rows, median of 3 runs, NullSink:

| Lane         | batch_size=1 | batch_size=100 | Gain |
|--------------|--------------|----------------|------|
| JSONL direct | 124K rec/s   | 279K rec/s     | 2.2x |
| CSV direct   | 89K rec/s    | 164K rec/s     | 1.8x |

Arrow-native CsvSink and JsonLinesSink

Both sinks now implement write_arrow_batch(batch: pa.RecordBatch) and satisfy
the ArrowNativeSink protocol. Arrow pipelines that previously fell back to
to_pylist() at the sink boundary now stay columnar end-to-end.
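The protocol check can be pictured with a `typing.Protocol` marked `@runtime_checkable`. This is a sketch under assumptions: `ArrowNativeSinkLike` and `write_to_sink` are hypothetical names, not agora's actual ArrowNativeSink or dispatch code, and `batch` is anything with `to_pylist()` (such as a `pyarrow.RecordBatch`).

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ArrowNativeSinkLike(Protocol):
    """Hypothetical stand-in for agora's ArrowNativeSink protocol."""

    def write_arrow_batch(self, batch: Any) -> None: ...


def write_to_sink(sink: Any, batch: Any) -> str:
    """Route a columnar batch to the cheapest path the sink supports."""
    # isinstance() on a runtime_checkable Protocol only checks that the
    # method exists; it does not verify its signature.
    if isinstance(sink, ArrowNativeSinkLike):
        sink.write_arrow_batch(batch)  # stays columnar end-to-end
        return "arrow"
    for row in batch.to_pylist():      # fallback: materialize Python dicts
        sink.write(row)
    return "rows"
```

Structural typing keeps sinks decoupled: a sink opts into the fast path simply by defining `write_arrow_batch()`, without inheriting from a base class.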

# Now stays on the Arrow fast path — no to_pylist() at sink
Pipeline(ArrowCsvSource("input.csv")).build(CsvSink("output.csv", row_mapper=lambda r: r)).run()
Pipeline(ArrowJsonLinesSource("input.jsonl")).build(JsonLinesSink("output.jsonl")).run()

CsvSink.write_arrow_batch() uses pyarrow.csv.write_csv(), which runs in Arrow's
native C++ code and is faster than Python's csv.DictWriter for large batches.
JsonLinesSink.write_arrow_batch() converts the batch with to_pylist(), serializes
all rows together, and emits them in a single write() call, bypassing the
per-record serializer call.
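The JSONL path described above can be sketched as "serialize the whole batch, then write once". `write_jsonl_batch` is a hypothetical helper, not JsonLinesSink's code; the default `json.dumps` formatting is an assumption about the output style.

```python
import json
from typing import Any, TextIO


def write_jsonl_batch(batch: Any, out: TextIO) -> int:
    """Hypothetical sketch: serialize every row, then issue one write().

    `batch` is anything with to_pylist(), e.g. a pyarrow.RecordBatch.
    Returns the number of rows written.
    """
    rows = batch.to_pylist()
    if not rows:
        return 0
    payload = "".join(json.dumps(row) + "\n" for row in rows)
    out.write(payload)  # one write call per batch, not one per record
    return len(rows)
```

Building one string costs a little memory per batch but replaces N small writes with one.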


Bug fixes

BatchMiddleware missing process() and on_error() fallbacks

BatchMiddleware subclasses raised AttributeError when used with a non-batch
source (the linear lane): the linear lane calls process() per record, but
BatchMiddleware defined only process_batch().

Fixed: BatchMiddleware now provides a process() fallback that wraps the
single record in a list, calls process_batch([record], ctx), and returns the
first result. An on_error() fallback is also added for consistency.

This means BatchMapMiddleware and BatchFilterMiddleware now work correctly
with any source, not just batch-emit sources.
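The process() fallback can be sketched as follows. `BatchMiddlewareSketch` and `DoubleEvens` are hypothetical classes, and treating an empty result as "record filtered out" (returning None) is an assumption; the on_error() fallback is not shown.

```python
from typing import Any, List, Optional


class BatchMiddlewareSketch:
    """Hypothetical base class mirroring the process() fallback."""

    def process_batch(self, records: List[Any], ctx: Any) -> List[Any]:
        raise NotImplementedError

    def process(self, record: Any, ctx: Any) -> Optional[Any]:
        # Linear-lane fallback: wrap the record in a one-element batch.
        results = self.process_batch([record], ctx)
        # Assumption: an empty result means the record was filtered out.
        return results[0] if results else None


class DoubleEvens(BatchMiddlewareSketch):
    """Example subclass that filters and maps in a single batch pass."""

    def process_batch(self, records: List[Any], ctx: Any) -> List[Any]:
        return [r * 2 for r in records if r % 2 == 0]
```

Subclasses only implement process_batch(); the per-record entry point comes for free.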

MiddlewareChain.process_batch() — OCP violation fixed via double-dispatch

process_batch() previously dispatched to BatchMiddleware, ArrowBatchMiddleware,
MapMiddleware, and FilterMiddleware through a chain of isinstance checks, so
adding a new middleware type required editing this method (a violation of the
open/closed principle).

Fixed: each middleware type now implements apply_in_batch() — a double-dispatch
hook that process_batch() calls directly. The method is now a clean loop with
no isinstance dispatch.
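A minimal sketch of the resulting shape: each step type supplies its own `apply_in_batch()`, and the chain is a plain loop. `MapStep`, `FilterStep`, and `ChainSketch` are hypothetical names, not agora's classes, and the early exit on an empty batch is an assumption.

```python
from typing import Any, Callable, List


class MapStep:
    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def apply_in_batch(self, records: List[Any], ctx: Any) -> List[Any]:
        return [self.fn(r) for r in records]


class FilterStep:
    def __init__(self, pred: Callable[[Any], bool]) -> None:
        self.pred = pred

    def apply_in_batch(self, records: List[Any], ctx: Any) -> List[Any]:
        return [r for r in records if self.pred(r)]


class ChainSketch:
    def __init__(self, *steps: Any) -> None:
        self.steps = steps

    def process_batch(self, records: List[Any], ctx: Any = None) -> List[Any]:
        # No isinstance checks: each step decides how to apply itself.
        for step in self.steps:
            records = step.apply_in_batch(records, ctx)
            if not records:
                break  # nothing left for later steps
        return records
```

Adding a new middleware type now means writing one class with an `apply_in_batch()` method; the chain itself stays untouched.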

DeliveryEngine — duplicate flush logic eliminated

flush_pending_writes() and flush_batch_direct() shared ~200 lines of
duplicated batch-failure handling logic. Both methods now delegate to shared
_flush_batch_outcomes() and _commit_outcomes() helpers.

write_batch_result() (155 lines, 3 code paths) split into three focused
private methods: _write_batch_middleware_failure(), _write_batch_passthrough(),
_write_batch_filtered().

assert isinstance(outcome, CheckpointedOutcome) replaced

Production code in _delivery.py used assert for type narrowing, but assert
statements are stripped when Python runs with -O, so the check silently
disappeared. Replaced with explicit raise TypeError guards.
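The guard pattern looks roughly like this. `CheckpointedOutcome` here is a hypothetical stand-in dataclass and `require_checkpointed` a hypothetical helper; the point is that an `isinstance` check followed by `raise` still narrows the type for static checkers but, unlike `assert`, survives `python -O`.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class CheckpointedOutcome:
    """Hypothetical stand-in for agora's outcome type."""
    offset: int


def require_checkpointed(outcome: Any) -> CheckpointedOutcome:
    # Unlike `assert isinstance(...)`, this check is not removed by `python -O`.
    if not isinstance(outcome, CheckpointedOutcome):
        raise TypeError(
            f"expected CheckpointedOutcome, got {type(outcome).__name__}"
        )
    return outcome  # type checkers narrow `outcome` after the guard
```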

ScheduledPipelineRunner — private attribute access removed

ScheduledPipelineRunner accessed self._pipeline._state, self._pipeline._factory,
self._pipeline._max_errors, etc. directly. All private attributes are now
exposed as public properties on ScheduledPipeline:

New properties: state, max_records, max_consecutive_errors, backoff_policy,
pre_run_hook, on_run_complete, and observers, plus a public build() method.
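A sketch of the encapsulation change, using a hypothetical `ScheduledPipelineSketch` with only two of the listed properties: callers read state through properties and ask the pipeline to build itself instead of reaching into `_factory`.

```python
from typing import Any, Callable, Optional


class ScheduledPipelineSketch:
    """Hypothetical sketch: private state exposed through public properties."""

    def __init__(self, factory: Callable[[], Any], max_records: Optional[int] = None) -> None:
        self._factory = factory
        self._max_records = max_records
        self._state = "idle"

    @property
    def state(self) -> str:
        return self._state

    @property
    def max_records(self) -> Optional[int]:
        return self._max_records

    def build(self) -> Any:
        # A runner calls build() instead of invoking self._factory directly.
        return self._factory()
```

Properties without setters are read-only, so a runner can observe the pipeline's configuration without being able to mutate it by accident.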

Schema.from_dict — hash mismatch policy

Schema.from_dict() previously logged a warning on hash mismatch and continued
loading — a tampered or corrupted schema under FREEZE contract went undetected.

Fixed: Schema.from_dict(data, strict_hash=True) now raises ValueError on
hash mismatch. BackendSchemaStore automatically enables strict_hash=True
when constructed with contract=SchemaContract.FREEZE.
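The strict/lenient behavior can be sketched with a content hash over the column definitions. The hashing scheme (SHA-256 over sorted-key JSON) and the `schema_hash`/`load_schema` names are assumptions for illustration, not agora's Schema implementation.

```python
import hashlib
import json
import warnings
from typing import Any, Dict


def schema_hash(columns: Dict[str, str]) -> str:
    # Assumption: a stable hash over the canonical JSON form of the columns.
    canonical = json.dumps(columns, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def load_schema(data: Dict[str, Any], strict_hash: bool = False) -> Dict[str, str]:
    columns = data["columns"]
    expected = data["hash"]
    actual = schema_hash(columns)
    if actual != expected:
        message = f"schema hash mismatch: stored {expected[:12]}, computed {actual[:12]}"
        if strict_hash:
            raise ValueError(message)  # new strict behavior
        warnings.warn(message)  # old behavior: warn and keep loading
    return columns
```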

SchemaEvolution.merge() — hash short-circuit

merge() previously rebuilt the merged columns dict from scratch on every call,
even when the schema had not changed. Fixed: early return when
old.hash == new.hash.
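A sketch of the short-circuit, assuming a hypothetical `SchemaSketch` that carries a content hash. The conflict rule (new column types win) is an assumption; agora's actual merge rules depend on the schema contract.

```python
import hashlib
import json
from typing import Dict


class SchemaSketch:
    """Hypothetical schema: columns plus a content hash."""

    def __init__(self, columns: Dict[str, str]) -> None:
        self.columns = dict(columns)
        canonical = json.dumps(self.columns, sort_keys=True)
        self.hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def merge(old: SchemaSketch, new: SchemaSketch) -> SchemaSketch:
    # Identical content hashes mean nothing changed: skip the rebuild.
    if old.hash == new.hash:
        return old
    merged = dict(old.columns)
    merged.update(new.columns)  # assumption: new column types win on conflict
    return SchemaSketch(merged)
```

In a steady-state stream most records share the same schema, so the common case becomes a single string comparison.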

SchemaProcessor: SchemaInferrer reuse

_infer_record_schema() previously allocated a new SchemaInferrer per record.
Fixed: a single SchemaInferrer is created in __init__ and reused via
reset() before each record.
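The allocate-once, reset-per-record pattern can be sketched like this; `SchemaInferrerSketch` and `SchemaProcessorSketch` are hypothetical classes with a deliberately simplified inference rule (Python type names).

```python
from typing import Any, Dict


class SchemaInferrerSketch:
    """Hypothetical inferrer that accumulates per-record state."""

    def __init__(self) -> None:
        self.types: Dict[str, str] = {}

    def reset(self) -> None:
        self.types.clear()  # drop the previous record's state, keep the object

    def observe(self, record: Dict[str, Any]) -> Dict[str, str]:
        for key, value in record.items():
            self.types[key] = type(value).__name__
        return dict(self.types)


class SchemaProcessorSketch:
    def __init__(self) -> None:
        self._inferrer = SchemaInferrerSketch()  # allocated once

    def infer_record_schema(self, record: Dict[str, Any]) -> Dict[str, str]:
        self._inferrer.reset()  # required: otherwise state leaks across records
        return self._inferrer.observe(record)
```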

EmbeddingStore.mark_if_new() — atomic check-and-add

The default DedupStore.mark_if_new() is documented as non-atomic. Under
concurrent execution (buffered lane), two coroutines could both see
exists() == False and both call add().

Fixed: EmbeddingStore now overrides mark_if_new() with an asyncio.Lock-guarded
implementation that embeds the key once and performs the existence check and
the append atomically.
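The race and its fix can be reproduced with a small asyncio sketch. `LockedDedupStore` is hypothetical, the embedding step is omitted, and `asyncio.sleep(0)` stands in for real async I/O so that each await yields to the event loop.

```python
import asyncio
from typing import List


class LockedDedupStore:
    """Hypothetical store showing an atomic mark_if_new() under asyncio."""

    def __init__(self) -> None:
        self._keys: List[str] = []
        self._lock = asyncio.Lock()

    async def exists(self, key: str) -> bool:
        await asyncio.sleep(0)  # stands in for async I/O or embedding work
        return key in self._keys

    async def add(self, key: str) -> None:
        await asyncio.sleep(0)
        self._keys.append(key)

    async def mark_if_new(self, key: str) -> bool:
        # Without the lock, two coroutines can both pass exists() before
        # either reaches add(), because each await yields to the event loop.
        async with self._lock:
            if await self.exists(key):
                return False
            await self.add(key)
            return True


async def main() -> List[bool]:
    store = LockedDedupStore()
    return await asyncio.gather(*(store.mark_if_new("doc-1") for _ in range(5)))


results = asyncio.run(main())
print(results.count(True))  # 1
```

Removing the `async with self._lock:` line lets several coroutines report the key as new, which is exactly the buffered-lane bug described above.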

Import path validation in declarative configs

ImportRefConfig now validates that {"import": "..."} values in TOML configs
are dotted paths of valid Python identifiers, optionally followed by a
:attribute suffix. Malformed paths (absolute paths, shell metacharacters, path
traversal) are rejected at config parse time with a clear error message.
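A rule of this shape can be sketched with `str.isidentifier()` and the `keyword` module. `is_valid_import_path` is a hypothetical function, not agora's validate_import_path, and rejecting Python keywords is an assumption about the exact rule.

```python
import keyword


def _is_name(segment: str) -> bool:
    # A plain Python identifier that is not a reserved keyword.
    return segment.isidentifier() and not keyword.iskeyword(segment)


def is_valid_import_path(value: str) -> bool:
    """Accept 'pkg.module' or 'pkg.module:attr'; reject everything else."""
    module, sep, attr = value.partition(":")
    if not all(_is_name(part) for part in module.split(".")):
        return False
    if sep and not _is_name(attr):
        return False
    return True
```

Because every segment must be an identifier, slashes, leading dots, spaces, and shell metacharacters all fail the check without a separate blacklist.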


New: Rust CheckpointState (agora-rs)

CheckpointState is now implemented in Rust (agora-rs) and used automatically
when the extension is installed. The Python implementation remains as a fallback.

The Rust implementation eliminates per-record Python attribute access for
increment(), should_save(), and mark_saved() — the three hot-path
checkpoint operations called on every delivered record.

make_checkpoint_state() is the new factory function — returns the Rust
implementation when available, Python otherwise.
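The "Rust when available, Python otherwise" factory follows the usual optional-extension import pattern. In this sketch the module name `agora_rs`, the constructor argument, and the interval-based save rule are all assumptions; only the three method names come from the notes above.

```python
from typing import Any

try:
    # Hypothetical import name for the compiled extension.
    from agora_rs import CheckpointState as _RustCheckpointState
except ImportError:
    _RustCheckpointState = None


class PyCheckpointState:
    """Pure-Python fallback: save a checkpoint every `interval` records."""

    def __init__(self, interval: int) -> None:
        self.interval = interval
        self.since_save = 0

    def increment(self, n: int = 1) -> None:
        self.since_save += n

    def should_save(self) -> bool:
        return self.since_save >= self.interval

    def mark_saved(self) -> None:
        self.since_save = 0


def make_checkpoint_state_sketch(interval: int) -> Any:
    # Prefer the compiled implementation; fall back to pure Python.
    if _RustCheckpointState is not None:
        return _RustCheckpointState(interval)
    return PyCheckpointState(interval)
```

Both implementations expose the same three methods, so callers never need to know which one they received.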


New: Benchmark system (benchmarks/)

A new subprocess-isolated benchmark suite replaces the previous ad-hoc scripts.

# Run all lanes (csv, jsonl, parquet) with median of 3 runs
python benchmarks/run.py --rows 100000

# Run specific lanes
python benchmarks/run.py --rows 100000 --only csv jsonl

# Increase statistical confidence
python benchmarks/run.py --rows 100000 --median 5

# Save results to JSON
python benchmarks/run.py --rows 100000 --save

Each case runs in an isolated subprocess, so garbage-collector and event-loop
state cannot leak between measurements. Fixture data is regenerated before each
run. Results report the median elapsed time and throughput across the --median
runs.
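The isolation-plus-median approach can be sketched with the standard library alone. `run_case_isolated` and its toy workload are hypothetical and do not reflect benchmarks/run.py's internals; the child process times its own work so interpreter start-up is excluded.

```python
import statistics
import subprocess
import sys

# Toy workload; the child prints its own elapsed time in seconds.
CHILD = """
import time
start = time.perf_counter()
total = sum(range({rows}))
print(time.perf_counter() - start)
"""


def run_case_isolated(rows: int, runs: int = 3) -> float:
    """Run the case in fresh interpreters and return the median seconds."""
    timings = []
    for _ in range(runs):
        proc = subprocess.run(
            [sys.executable, "-c", CHILD.format(rows=rows)],
            capture_output=True, text=True, check=True,
        )
        timings.append(float(proc.stdout.strip()))
    return statistics.median(timings)


median_s = run_case_isolated(rows=100_000, runs=3)
print(f"median {median_s:.4f}s, {100_000 / median_s:,.0f} rows/s")
```

Using the median rather than the mean keeps a single slow outlier run (for example, one disturbed by other system activity) from skewing the reported throughput.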

Benchmark cases (lanes: CSV, JSONL, Parquet):

  • direct
  • map
  • filter
  • map_filter
  • batch_map
  • batch_filter
  • arrow
  • arrow_map
  • arrow_filter
  • arrow_to_csv
  • arrow_to_jsonl

New public exports

New export from agora.core.runtime:

  • make_checkpoint_state — factory for Rust-backed or Python CheckpointState

New export from agora.config.pipeline:

  • validate_import_path — validates declarative import path format

Tests

690 tests pass (up from 649 in 0.2.0).

New test coverage:

  • tests/core/test_pipeline.py — batch_size delivery batching, direct flush,
    delivery success hooks, BatchMiddleware on linear lane
  • tests/core/test_tracing.py — tracing span lifecycle, InMemoryTracer
  • tests/core/test_observability_metrics.py — metrics collector, Prometheus exporter
  • tests/core/test_batch_pipeline.py — extended batch pipeline coverage