Releases: thanhtham010891/agora-etl
Release v0.4.0
agora-etl-rs becomes Agora's optional acceleration substrate for measured core
hot paths, while Python continues to own runtime semantics.
Added
- agora.core.acceleration as the single optional acceleration boundary.
- DeliveryConfig(acceleration_mode=...) with auto, off, and required.
- Declarative [performance] config with acceleration and profile.
- Acceleration status in Pipeline.explain(), run summaries, tracing attributes,
  agora run --plan, agora doctor, and agora doctor --json.
- Performance profiles that resolve to explicit runtime settings while keeping
  manual overrides authoritative.
- Benchmark matrix runner for 0.3.3 baseline, pure Python 0.4.0, and
  Rust-enabled 0.4.0 runs.
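To make the three acceleration_mode values concrete, here is a minimal sketch of how a resolver might map them to a backend. The semantics are assumptions inferred from the mode names (auto falls back to Python, off never uses Rust, required fails when the extension is missing), and resolve_backend and rust_extension_available are hypothetical helpers, not the agora.core.acceleration API. Only the agora_rs module name comes from these notes.

```python
import importlib.util
from enum import Enum


class AccelerationMode(str, Enum):
    AUTO = "auto"
    OFF = "off"
    REQUIRED = "required"


def rust_extension_available(module_name: str = "agora_rs") -> bool:
    """Return True if the compiled extension module can be found on sys.path."""
    return importlib.util.find_spec(module_name) is not None


def resolve_backend(mode: str, rust_available: bool) -> str:
    """Map a requested acceleration mode to the backend that would run (hypothetical)."""
    requested = AccelerationMode(mode)  # raises ValueError for unknown modes
    if requested is AccelerationMode.OFF:
        return "python"
    if rust_available:
        return "rust"
    if requested is AccelerationMode.REQUIRED:
        raise RuntimeError("acceleration_mode='required' but the Rust extension is not installed")
    return "python"  # AUTO degrades silently to the pure-Python path
```

For example, resolve_backend("auto", rust_extension_available()) returns "python" on an installation without the extension, while "required" turns the same situation into a startup error.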
Changed
- Runtime modules no longer import agora_rs directly.
- Existing Rust primitives are selected through the acceleration facade.
- Direct flush, Rust prefetch, checkpoint state, metrics accumulator, and linear
  batch buffer decisions are visible in runtime metrics.
- The throughput profile is the opt-in gate for Rust source prefetch outside
  buffered lanes until release benchmarks justify broader activation.
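The profile rule (profiles expand to explicit settings, manual overrides stay authoritative) can be sketched as a plain dictionary merge. The PROFILES table, its rust_source_prefetch key, and resolve_profile are hypothetical; only the throughput profile's role as the Rust prefetch gate comes from these notes.

```python
from typing import Any, Dict, Optional

# Hypothetical profile table: setting names are placeholders, not Agora's keys.
PROFILES: Dict[str, Dict[str, Any]] = {
    "default": {"rust_source_prefetch": False},
    "throughput": {"rust_source_prefetch": True},
}


def resolve_profile(profile: str, overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Expand a profile into explicit settings; manual overrides always win."""
    if profile not in PROFILES:
        raise ValueError(f"unknown performance profile: {profile!r}")
    settings = dict(PROFILES[profile])  # copy so the shared table is never mutated
    settings.update(overrides or {})  # applied last, so overrides are authoritative
    return settings
```

Resolving to a flat dict of explicit settings also makes the outcome easy to print in a plan or doctor report.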
Removed
- agora.core.runtime.RecordDeliveryCoordinator
- agora.core.sink._WRITE_OK
- agora.core.context compatibility exports: _NOOP_SPAN_SCOPE, _BoundLogger,
  _NoopSpanScope, _PipelineSpanScope, and _normalize_trace_value
- Legacy source/sink data-plane bool flag inference
- BaseSource.runtime_counters() compatibility shim
Release Rule
Performance claims must be backed by benchmark JSON. Candidate Rust expansions,
including JSONL batch encoding and Arrow IPC transport, remain benchmark-gated.
Release v0.3.3
Focus
0.3.3 is a short additive release that makes the 0.3.x AI and
observability story easier to trust in real worker deployments.
This release is about tightening existing extension paths rather than opening a
new platform phase:
- the AI provider contract now matches real provider capabilities
- Anthropic is now part of the official plugin bundle
- config-driven workers can enable OpenTelemetry through the existing tracing path
- short-term release docs and packaging now match the shipped support story
Highlights
AI provider capabilities are now explicit
Agora now treats completion and embeddings as separate capabilities instead of
implying every provider must implement both.
AIProvider remains as the compatibility-facing completion contract for
existing middleware, while the codebase now exposes clearer capability helpers:
- CompletionProvider
- EmbeddingProvider
- require_completion_provider(...)
- require_embedding_provider(...)
This makes completion-only providers such as Anthropic first-class without
forcing fake embedding implementations, and it gives embedding-dependent paths a
clear failure mode when the wrong provider is supplied.
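A minimal sketch of capability-based provider checks using typing.Protocol. The method names complete() and embed(), their synchronous signatures, and EchoCompletionProvider are assumptions for illustration; Agora's real CompletionProvider and EmbeddingProvider contracts may be shaped differently.

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class CompletionProvider(Protocol):
    def complete(self, prompt: str) -> str: ...


@runtime_checkable
class EmbeddingProvider(Protocol):
    def embed(self, texts: List[str]) -> List[List[float]]: ...


def require_embedding_provider(provider: object) -> EmbeddingProvider:
    # isinstance() on a runtime_checkable Protocol only checks that the method exists.
    if not isinstance(provider, EmbeddingProvider):
        raise TypeError(f"{type(provider).__name__} does not implement embeddings")
    return provider


class EchoCompletionProvider:
    """A completion-only provider: no fake embed() method is required."""

    def complete(self, prompt: str) -> str:
        return f"echo: {prompt}"
```

Passing EchoCompletionProvider() to an embedding-dependent path fails immediately with a TypeError naming the provider, instead of failing later on a missing method.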
Anthropic is now part of the official first-party bundle
Anthropic support has been promoted into agora-etl-plugins through the new
anthropic extra.
The official import and install story is now:
```shell
pip install "agora-etl-plugins[anthropic]"
```

```python
from agora_plugins.anthropic import AnthropicProvider
```
The supported scope stays intentionally narrow and honest:
- completion
- structured JSON output
- completion-driven AI middlewares
It still does not promise:
- embeddings
- embedding-based classification
- embedding stores
This keeps the public bundle story truthful while still making Anthropic an
official first-party integration.
Config-driven workers can now turn on OTel without custom bootstrap code
Worker startup can now be driven directly from agora/v1 config, including
top-level worker settings and per-pipeline schedules.
Tracing config also gained an auto_configure path for OpenTelemetry. When
enabled, Agora now:
- reuses an existing global tracer provider if one is already configured
- otherwise auto-configures an OTLP exporter when the optional OTel SDK and
exporter packages are installed
This keeps observability on the existing tracing path instead of introducing a
second tracing system.
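The auto_configure decision order can be sketched as a small planning function. plan_tracing, module_installed, and the returned labels are hypothetical; how Agora detects an existing global tracer provider is not described here, so it is passed in as a flag. The module paths checked are an assumption about which optional OpenTelemetry packages matter (the SDK and the OTLP exporter).

```python
import importlib.util


def module_installed(name: str) -> bool:
    """True if a (possibly dotted) module can be found; parent packages get imported."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:  # a parent package of `name` is missing
        return False


def plan_tracing(auto_configure: bool, global_provider_configured: bool) -> str:
    """Decide how tracing would be wired, following the order in the release notes."""
    if not auto_configure:
        return "manual"  # leave tracing to whatever the config already sets up
    if global_provider_configured:
        return "reuse-global-provider"  # never replace a provider someone else installed
    if module_installed("opentelemetry.sdk") and module_installed("opentelemetry.exporter.otlp"):
        return "configure-otlp-exporter"
    return "no-exporter-available"  # optional OTel packages are not installed
```

Checking for an existing provider first is what keeps the auto path from fighting an application that already configured OpenTelemetry itself.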
Docs and packaging now tell a cleaner release story
The 0.3.3 docs now align better with the real public support boundary:
- plugin docs position Anthropic as part of the official bundle
- worker and observability docs show the config-only OTel path
- changelog and release metadata now match the promoted plugin surface
Upgrade notes
For AI users
- completion-only providers are now valid without embedding methods
- embedding-based features still require an embedding-capable provider
- Anthropic should now be installed from
agora-etl-plugins[anthropic]
For worker operators
- agora worker --config ... can now build a WorkerPool directly from TOML
- each config-driven worker pipeline must define [pipelines.<name>.schedule]
- OTel config can now stay declarative when the optional OTel packages are
  installed
For plugin users
- agora-etl-plugins now targets agora-etl>=0.3.3
- Anthropic is now part of the official bundle instead of a separate release
  story
Validation
Release validation for 0.3.3 includes:
- full package-local coverage for the Anthropic official bundle path
- worker/config/tracing regression coverage for OTel auto-wiring
- strict docs builds for the main Agora documentation site
- full monorepo CI for agora-etl and agora-etl-plugins
Release v0.3.2
Focus
0.3.2 turns the refactored 0.3.x core into a tighter release contract.
This release is about hardening what already exists:
- explicit public boundaries
- preservation-backed runtime behavior
- clearer plugin contracts
- stronger diagnostics, security posture, and performance guardrails
Highlights
Public API boundaries are now frozen more deliberately
Agora now treats the package facades as explicit release contracts:
- agora
- agora.core
- agora.core.<domain>
- agora.core.runtime as an advanced public facade
Architecture tests now fail when public surfaces drift, when compatibility
exports are undocumented, or when plugin-facing code leaks into underscore
support modules by accident.
Runtime guarantees are preservation-backed end to end
The 0.3.x runtime guarantee matrix is now fully preservation-backed. This
covers lane selection, ordering, checkpoint cadence, DLQ routing, retry
boundaries, shutdown/cancellation behavior, and process/buffered invariants.
This release is therefore much safer for continued internal refactor: the code
can keep moving without reopening ambiguity about preserved runtime semantics.
Observability and health diagnostics are safer
Built-in health payloads now redact and truncate last_error before exposing it
through metrics/health surfaces.
Plugin diagnostics are also clearer:
- agora plugins list now shows broken entry-points explicitly
- entry-point key collisions with built-ins/public keys are surfaced as
  diagnostics instead of silently winning by import order
- agora doctor is more explicit about its trust boundary during config import
  and pipeline-build checks
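A minimal sketch of collision-aware registration: built-in keys win, and every rejected entry point becomes a diagnostic instead of depending on import order. In real code the discovered pairs might come from importlib.metadata.entry_points(group=...); the group name, merge_entry_points, and the diagnostic wording are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def merge_entry_points(
    builtins: Dict[str, str], discovered: Iterable[Tuple[str, str]]
) -> Tuple[Dict[str, str], List[str]]:
    """Register plugin keys without letting later entries silently replace earlier ones."""
    registry = dict(builtins)
    diagnostics: List[str] = []
    for key, target in discovered:
        if key in registry:
            # Report the collision and keep the existing registration.
            diagnostics.append(
                f"entry point {key!r} -> {target} collides with {registry[key]}; ignored"
            )
            continue
        registry[key] = target
    return registry, diagnostics
```

Returning diagnostics alongside the registry lets a listing command print collisions next to the working plugins rather than hiding them.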
Plugin contract cleanup is now reflected in code, tests, and docs
The official agora-etl-plugins bundle and the incubating plugin packages now
follow the updated contract more consistently:
- MANIFEST compatibility binds to AGORA_PLUGIN_MANIFEST_VERSION instead of
  hard-coded stale values
- package metadata points at the real agora-etl core artifact
- plugin contract tests now fail when plugin packages depend on internal Agora
  support modules or on legacy module paths that now have public replacements
For 0.3.x, two advanced public plugin modules remain intentionally supported:
- agora.core.registry for MANIFEST compatibility binding
- agora.core.retry for shared retry helpers
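One way a plugin contract test could detect imports of internal modules is to parse plugin sources with the standard ast module. This is a hedged sketch: the rule (any non-dunder path segment starting with an underscore under agora counts as internal) is an assumption, and private_agora_imports is a hypothetical helper.

```python
import ast
from typing import List


def _is_private(segment: str) -> bool:
    # Dunders such as __version__ are public by convention; other leading underscores are not.
    return segment.startswith("_") and not (segment.startswith("__") and segment.endswith("__"))


def private_agora_imports(source: str) -> List[str]:
    """Return agora.* import targets in `source` that reach into underscore modules or names."""
    offenders: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            targets = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            targets = [f"{node.module}.{alias.name}" for alias in node.names]
        else:
            continue  # relative imports and non-import nodes are out of scope
        for target in targets:
            parts = target.split(".")
            if parts[0] == "agora" and any(_is_private(p) for p in parts[1:]):
                offenders.append(target)
    return offenders
```

A contract test would run this over every file in a plugin package and assert the result is empty.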
Benchmark coverage now has a concrete maintenance budget
0.3.2 records the first post-refactor benchmark baseline and a simple perf
budget for future maintenance work.
The benchmark split now covers:
- file-backed lane throughput
- runtime orchestration-heavy shapes such as buffered execution, sink fan-out,
  and observability overhead
- Arrow process transport microbenchmarks
This makes it possible to catch hot-path regressions intentionally instead of
relying on anecdotal before/after impressions.
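A perf budget check can be as simple as comparing per-case throughput against a saved baseline. The flat {case: records_per_second} JSON shape, the 10% budget, and budget_violations are assumptions for illustration, not the format or threshold used by Agora's benchmark runner.

```python
import json
from typing import Dict, List

MAX_SLOWDOWN = 0.10  # hypothetical budget: flag cases more than 10% slower


def budget_violations(baseline_json: str, current_json: str, max_slowdown: float = MAX_SLOWDOWN) -> List[str]:
    """Compare records/sec per case and report cases that fell outside the budget."""
    baseline: Dict[str, float] = json.loads(baseline_json)
    current: Dict[str, float] = json.loads(current_json)
    violations: List[str] = []
    for case, base_rps in baseline.items():
        now = current.get(case)
        if now is None:
            violations.append(f"{case}: missing from current run")
        elif now < base_rps * (1 - max_slowdown):
            drop = 1 - now / base_rps
            violations.append(f"{case}: {drop:.0%} slower ({base_rps:.0f} -> {now:.0f} rec/s)")
    return violations
```

Treating a missing case as a violation keeps a renamed or deleted benchmark from silently dropping out of the budget.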
Upgrade notes
For builders
- Prefer agora and documented agora.core.<domain> facades over file-level
  modules.
- Avoid underscore-prefixed imports even if a compatibility export still exists
  in 0.3.x.
For plugin authors
- Bind MANIFEST compatibility from agora.core.registry.
- Prefer agora.state, agora.runner, and the documented plugin-contract modules
  in docs/plugins/contract.md.
- Treat agora.core.registry and agora.core.retry as advanced public plugin
  contracts in 0.3.x.
Deprecated aliases retained through 0.3.x
- agora.core.runtime.RecordDeliveryCoordinator
- agora.core.sink._WRITE_OK
- agora.core.context._NOOP_SPAN_SCOPE
- agora.core.context._BoundLogger
- agora.core.context._NoopSpanScope
- agora.core.context._PipelineSpanScope
- agora.core.context._normalize_trace_value
These remain compatibility bridges in 0.3.x and are still targeted for
removal in 0.4.0.
Validation
Release validation for 0.3.2 includes:
- public surface / architecture tests
- preservation coverage for runtime guarantees and plugin contract behavior
- package-local CI for agora-etl-plugins
- benchmark baselines for the canonical Phase 5 scenarios
Release v0.3.1
Focus
0.3.1 hardens the public runtime contract shipped in 0.3.0.
This release closes semantic gaps between runtime docs and real execution,
tightens lane/data-plane planning, improves recovery explainability at the CLI,
and adds preservation coverage so these guarantees do not drift silently.
Highlights
Runtime planning and delivery semantics are more stable
Agora now keeps source, planner, writer, and sink decisions aligned more
consistently across the public runtime entrypoints and internal lane
orchestration.
Key hardening areas include:
- lane selection and data-plane validation across pipeline.py, executor.py,
  source.py, sink.py, and _plan.py
- buffered and ordered delivery behavior in _buffered.py, _delivery.py, and
  _lanes.py
- startup and shutdown rollback so partially opened sink graphs do not leak
  state when later components fail to open
- checkpoint and source-delivery gating so acknowledgement remains tied to
  successful downstream handling
This release is intentionally about making 0.3.x safer to trust under stress,
not about adding a brand-new execution model.
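The startup rollback idea maps naturally onto contextlib.ExitStack: each sink's close() is registered only after it opens successfully, so a later failure unwinds exactly the sinks that are open, in reverse order. DemoSink and open_sinks are hypothetical stand-ins; Agora's own sink lifecycle is likely async and richer.

```python
from contextlib import ExitStack
from typing import List, Sequence


class DemoSink:
    """Hypothetical stand-in for a sink with open/close lifecycle hooks."""

    def __init__(self, name: str, log: List[str], fail_on_open: bool = False) -> None:
        self.name, self.log, self.fail_on_open = name, log, fail_on_open

    def open(self) -> None:
        if self.fail_on_open:
            raise RuntimeError(f"{self.name} failed to open")
        self.log.append(f"open {self.name}")

    def close(self) -> None:
        self.log.append(f"close {self.name}")


def open_sinks(sinks: Sequence[DemoSink]) -> ExitStack:
    """Open every sink or none: on failure, close the opened ones in reverse order."""
    with ExitStack() as stack:
        for sink in sinks:
            sink.open()
            stack.callback(sink.close)  # registered only after a successful open
        return stack.pop_all()  # success: hand the close callbacks to the caller
```

On success, pop_all() transfers the registered callbacks to a new ExitStack, so the caller closes all sinks at shutdown with a single close() call.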
Public data-plane contract is now easier for plugin authors to follow
Agora now exposes and documents the preferred contract helpers:
- source_data_plane_spec()
- sink_data_plane_spec()
- SourceDataPlaneSpec
- SinkDataPlaneSpec
The legacy capability booleans still work in 0.3.x, but docs now steer
custom source and sink authors toward explicit data-plane specs instead of
guessing from older flags.
Related docs were tightened in:
- docs/source/custom.md
- docs/sink/custom.md
- docs/plugins/contract.md
Recovery UX is more explainable from the CLI
Two recovery-oriented CLI surfaces are now much more actionable:
- agora checkpoint inspect
- agora diagnose
They now expose runtime recovery posture more directly, including details such
as recovery support, resume keys, checkpoint granularity, resume behavior, and
warnings when resuming large file-backed sources may be costly.
This makes it easier to answer practical operator questions like:
- can this pipeline actually resume?
- what unit of progress is persisted?
- what does "resume" mean for this source?
- is recovery likely to be cheap or expensive?
agora doctor is now a stronger preflight tool
agora doctor now goes beyond basic configuration shape checks.
It can validate:
- selected pipeline resolution from config
- import references used by config-backed components
- pipeline buildability before execution
- recovery posture
- DLQ replay support
- required environment variables for the selected pipeline
This helps catch miswired configs earlier, before a real run discovers them the
hard way.
Spawn safety and test isolation are tighter
The CLI path bootstrapping helper now ignores mocked or otherwise non-pathlike
ctx.cwd values when adding project paths to sys.path.
That closes a subtle failure mode where test doubles could pollute process-spawn
state and later break process-batch execution with pickling failures.
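A sketch of the kind of guard described above. add_project_path is hypothetical, and accepting only str and pathlib.PurePath is a deliberately strict choice: a duck-typed os.PathLike check can be satisfied by test doubles, since unittest.mock.MagicMock supports __fspath__ in recent Python versions.

```python
import os
import sys
from pathlib import PurePath
from typing import List, Optional


def add_project_path(cwd: object, path_list: Optional[List[str]] = None) -> bool:
    """Prepend cwd to sys.path (or path_list) only when it is a real path value."""
    target = sys.path if path_list is None else path_list
    # Strict on purpose: str and PurePath only, so mocks and other doubles are ignored.
    if not isinstance(cwd, (str, PurePath)):
        return False
    entry = os.fspath(cwd)
    if entry not in target:
        target.insert(0, entry)
    return True
```

Rejecting unexpected values up front keeps junk entries out of sys.path, which matters because spawned worker processes inherit the parent's path setup.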
Tests
Release validation for 0.3.1 includes:
- full package CI from the checked-out source tree
- preservation coverage for runtime guarantees, recovery edge cases, plugin
  contract, and baseline behaviors
- targeted CLI coverage for doctor, checkpoint inspect, and diagnose
- public API coverage for the data-plane helper surface
New or expanded test areas include:
- tests/core/test_doctor_cli.py
- tests/core/test_recovery_cli.py
- tests/core/test_data_plane_public_api.py
- tests/preservation/test_runtime_guarantees.py
- tests/preservation/test_recovery_edge_cases.py
Release v0.3.0
Focus
Process-isolated batch transforms for CPU-heavy or memory-isolated workloads,
with recovery semantics strong enough for long-running batch pipelines.
Highlights
Source-bound max_records limits
run(max_records=N) now applies a source wrapper before execution starts
instead of stopping later inside the runtime lanes.
This makes bounded runs behave the same across:
- .build()
- .fan_out()
- .route()
- .run()
- .run_sync()
For batch and Arrow sources, the final emitted batch is now trimmed at the
source boundary instead of overshooting and stopping after a full batch has
already been processed.
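Source-bound limiting can be expressed as a wrapper around the batch iterator that trims the last batch and stops before pulling another one. limit_batches is a hypothetical helper that works on list batches; an Arrow batch would be cut with RecordBatch.slice() instead of list slicing.

```python
from typing import Iterable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def limit_batches(batches: Iterable[Sequence[T]], max_records: int) -> Iterator[Sequence[T]]:
    """Yield at most max_records records, trimming the final batch at the source."""
    remaining = max_records
    if remaining <= 0:
        return
    for batch in batches:
        if len(batch) >= remaining:
            yield batch[:remaining]  # trim instead of overshooting with a full batch
            return  # stop before pulling another batch from the source
        yield batch
        remaining -= len(batch)
```

Because the wrapper sits in front of the runtime, every entrypoint that consumes the source sees the same bounded stream.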
Middleware data-plane validation is now explicit
Agora now validates source and middleware data planes during runtime planning.
Supported shapes:
- Arrow-emitting source plus all-Arrow middleware chain
- Arrow-emitting source plus all Python-row middleware chain
- Python-row source plus all Python-row middleware chain
Rejected shape:
- any chain that mixes Arrow middleware with Python-row/list-batch middleware
This now fails fast with a PipelineError instead of silently switching data
shape mid-chain.
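The planning rule can be sketched as a check over the source plane and the set of middleware planes. The plane labels, plan_middleware_plane, and the local PipelineError are hypothetical; treating a Python-row source followed by Arrow middleware as unsupported is a reading of the supported-shapes list, not a rule these notes state explicitly.

```python
from typing import Sequence

ARROW, ROWS = "arrow", "python_row"  # hypothetical plane labels for this sketch
SUPPORTED = {(ARROW, ARROW), (ARROW, ROWS), (ROWS, ROWS)}


class PipelineError(Exception):
    """Local stand-in for agora's PipelineError."""


def plan_middleware_plane(source_plane: str, middleware_planes: Sequence[str]) -> str:
    """Return the plane the middleware chain runs on, or raise for unsupported shapes."""
    planes = set(middleware_planes)
    if len(planes) > 1:
        raise PipelineError(f"mixed middleware data planes: {sorted(planes)}")
    if not planes:
        return source_plane  # no middleware: the source plane flows straight through
    chain_plane = planes.pop()
    if (source_plane, chain_plane) not in SUPPORTED:
        raise PipelineError(f"{chain_plane} middleware cannot follow a {source_plane} source")
    return chain_plane
```

Running this check at planning time is what turns a silent mid-chain shape change into an error before any record is read.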
Arrow fan-out now chooses the best path per sink
When the pipeline stays on the Arrow chain and the writer is a fan-out:
- sinks with write_arrow_batch() receive the original RecordBatch
- sinks without Arrow support receive a list[dict] fallback only at the sink
  boundary
This keeps Arrow-native sinks fast without forcing every sink in the fan-out to
implement the same write contract.
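A minimal sketch of per-sink dispatch in a fan-out: Arrow-capable sinks get the original batch, and the row conversion happens at most once, only if some sink needs rows. The write_rows method and fan_out helper are hypothetical; to_pylist() exists on pyarrow.RecordBatch, and the test uses a stand-in batch so the sketch runs without pyarrow.

```python
from typing import Any, Dict, List, Optional, Sequence


def fan_out(batch: Any, sinks: Sequence[Any]) -> None:
    """Give Arrow-capable sinks the batch as-is; convert once for row-only sinks."""
    rows: Optional[List[Dict[str, Any]]] = None
    for sink in sinks:
        if hasattr(sink, "write_arrow_batch"):
            sink.write_arrow_batch(batch)  # original columnar batch, no conversion
        else:
            if rows is None:
                rows = batch.to_pylist()  # downgrade only at the sink boundary, once
            sink.write_rows(rows)
```

Sharing one converted list across row sinks assumes those sinks do not mutate it; a defensive implementation would copy per sink.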
Execution semantics now expose first-class data planes
Agora now resolves one shared vocabulary across source, middleware, writer, and
sink boundaries:
- python_rows
- python_batches
- arrow_batches
The runtime plan now records:
- the plane emitted by the source
- the plane that actually entered the writer
- which sinks had to downgrade at the writer/sink boundary
That contract is also introspectable from user code through
BaseSource.data_plane_spec(), BaseSource.emitted_data_plane, and
BaseSink.data_plane_spec().
The shared vocabulary is now part of the root public API too:
```python
from agora import DataPlane
from agora import SourceDataPlaneSpec, SinkDataPlaneSpec
```
Legacy batch/Arrow bool flags are now deprecated
The older data-plane booleans still work in 0.3.x, but Agora now treats them
as compatibility shims instead of the primary extension contract.
Deprecated source flags:
- supports_batch_emit
- emits_arrow_batches
Deprecated sink flags:
- batch_writable_native
- arrow_passthrough_native
Deprecated capability fields:
- SinkCapabilities.batch_writable_native
- SinkCapabilities.arrow_passthrough_native
Preferred replacement:
- sources override data_plane_spec() and return SourceDataPlaneSpec
- sinks advertise accepted_data_planes/native_data_planes, or return explicit
  planes from sink_capabilities()
When Agora has to infer a non-row plane from the legacy booleans, it now emits
DeprecationWarning once per source/sink class. The compatibility bridge is
planned for removal in 0.4.0.
Built pipelines can now explain their planned execution shape
BoundPipeline.explain() now returns a PipelineExplain snapshot without
starting the source. This surfaces:
- planned lane selection
- source and writer data planes
- middleware compatibility matrix
- per-sink selected plane and downgrade markers
CsvSink now reports Arrow-native writes vs downgrade-to-rows
CsvSink.write_arrow_batch() still prefers pyarrow.csv.write_csv(), but it
now downgrades safely to Python-row CSV writing when Arrow CSV rejects the
batch, such as with nested types or invalid UTF-8 payloads.
The run summary and runtime metrics now expose:
- csv_arrow_native_batch_count
- csv_arrow_native_row_count
- csv_arrow_downgrade_batch_count
- csv_arrow_downgrade_row_count
This makes it much easier to distinguish:
- Arrow lane selected by the planner
- Arrow path active in the runtime
- CSV sink actually staying native for the full run
ProcessBatchMiddleware for process-isolated batch execution
Agora now supports ProcessBatchMiddleware, a batch-lane middleware that runs a
sync transform in a managed ProcessPoolExecutor while keeping checkpointing,
DLQ routing, and sink delivery in the main runtime process.
This gives batch pipelines a clean path for:
- CPU-heavy transforms
- blocking native-library calls
- memory isolation between orchestration and compute
ArrowProcessBatchMiddleware for columnar process execution
Agora now also supports ArrowProcessBatchMiddleware, an Arrow-native sibling
that keeps batches columnar across the process boundary.
This path is intended for:
- Arrow-native sources and sinks
- vectorized transforms using pyarrow.compute
- CPU-heavy columnar work where Python-object batches would add too much
  serialization overhead
The first cut transports Arrow batches as Arrow IPC bytes and currently requires
the worker transform to preserve input row count.
Timeout recovery now recycles the worker pool
If a process batch exceeds timeout_s, Agora treats that batch as failed and
recycles the worker pool before accepting the next batch.
This avoids a poisoned-pool failure mode where later batches would otherwise
queue behind a stuck worker and fail in a cascade.
In ordered pipelined mode, unresolved sibling batches from that recycled
worker-pool generation are failed too, instead of being committed from stale
worker state.
Ordered pipelined process execution is now active
When max_workers > 1 and max_in_flight_batches > 1, both
ProcessBatchMiddleware and ArrowProcessBatchMiddleware now keep multiple
batches in flight while still committing them in source order.
This turns max_in_flight_batches into a real throughput control for ordered
process-batch pipelines.
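Committing in source order while several batches are in flight needs a reorder buffer: results that finish early wait until every earlier batch has been committed. commit_in_order is a hypothetical sketch that assumes each batch is tagged with a sequence number at submission; in a real pipeline, max_in_flight_batches would also bound how many results can wait in the buffer.

```python
from typing import Dict, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def commit_in_order(completions: Iterable[Tuple[int, T]]) -> Iterator[Tuple[int, T]]:
    """Yield (seq, result) in source order even if completions arrive out of order."""
    pending: Dict[int, T] = {}
    next_seq = 0
    for seq, result in completions:
        pending[seq] = result
        # Release every batch whose predecessors have all been committed.
        while next_seq in pending:
            yield next_seq, pending.pop(next_seq)
            next_seq += 1
```

Workers still finish in any order; only the commit step waits, which is what lets parallelism raise throughput without reordering checkpoints.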
Batch-only contract is explicit
ProcessBatchMiddleware now rejects per-record execution directly. It must be
paired with a source that emits batches (supports_batch_emit=True).
This keeps the public contract honest and avoids silent per-record process hops.
Stability Status
ProcessBatchMiddleware is intended as a stable 0.3.0 feature for
Python-object batch pipelines when all of the following are true:
- the source is batch-capable
- the batch payload is pickleable
- the transform function is pickleable
- the pipeline keeps source-order commit semantics
- if pipelined process execution is enabled, it uses
ordered=True
ArrowProcessBatchMiddleware is intended as a stable 0.3.0 feature for
Arrow-native process-batch pipelines when all of the following are true:
- the source emits pa.RecordBatch
- the worker transform is pickleable
- the worker transform returns pa.RecordBatch
- the transform preserves row count
- the pipeline keeps source-order commit semantics
- if pipelined process execution is enabled, it uses
ordered=True
The following mode is not available in 0.3.0:
- ordered=False for pipelined process execution
Ordered pipelined execution is active when max_workers > 1 and
max_in_flight_batches > 1. Unordered pipelined commit is not available yet
and is rejected explicitly when both ordered=False and
max_in_flight_batches > 1.
Release Validation
Release validation for this feature should include:
- targeted process-batch unit coverage
- end-to-end batch-lane coverage for success, DLQ, timeout recovery, and
  checkpoint gating
- ordered pipelined coverage for both Python-object and Arrow-native process
  batches
- adjacent batch/runtime preservation coverage
- package CI from source checkout with
PYTHONPATH=src
Recommended commands:
```shell
PYTHONPATH=src ./.venv/bin/pytest -q \
  tests/core/test_process_batch_middleware.py \
  tests/integration/test_process_batch_integration.py

PYTHONPATH=src ./.venv/bin/pytest -q \
  tests/core/test_batch_pipeline.py \
  tests/preservation/test_runtime_guarantees.py -k "batch or checkpoint or dlq"
```

Release Criteria
- timed-out process work does not poison later batches in the same run
- checkpoint advancement remains gated on downstream handling after process
  execution
- failed process batches route to the DLQ or stop the run according to the
  configured failure policy
- package docs clearly state batch-only usage and timeout semantics
- the targeted process-batch and adjacent batch/runtime suites pass from the
  source tree
Release v0.2.2
Focus
Operational visibility for long-lived workers, safer partial-batch delivery,
and a more production-shaped Kafka worker example.
Highlights
Live worker observability for long-lived pipelines
WorkerPool now registers pipelines with the metrics collector before the first
run completes, so /health and /metrics can show long-lived consumers while
they are still running.
Live health payloads now expose:
- registered pipeline metadata and schedule
- running lifecycle state and active run id
- live throughput and elapsed runtime
- live records_consumed, records_written, records_dropped, and records_errored
- records_pending for in-flight records that have not reached a terminal
  outcome yet
This makes the health surface useful for continuous workers that may not finish
a run for hours or days.
Timed batch flushes for long-lived sources
DeliveryConfig now supports batch_flush_interval_ms for batched delivery.
This allows long-lived sources to flush partial sink batches after a bounded
wait, instead of holding records indefinitely until the batch fills or the run
ends.
The implementation uses a single-owner flush task for pending_writes, which
avoids duplicate delivery when timeout-based flushing and new arrivals happen
close together.
Live metrics no longer under-report buffered source counters
Live /health snapshots now include hot-path source counters that have not yet
been flushed into the main PipelineMetrics object. This fixes confusing
observability states where records_written could temporarily appear higher
than records_consumed during a live run.
Kafka worker example updated for long-lived operation
The etl-order-reliability example now models long-lived Kafka consumers more
closely:
- consumer pipelines stay attached to their Kafka group instead of exiting
  after short chunked runs
- the worker no longer auto-runs the sample producer
- Kafka UI is included in the local Docker stack for topic, group, and lag inspection
- the example uses timed batch flushing to avoid stuck partial batches at low traffic
Tests
Release validation includes:
- worker observability and health response coverage for running pipelines
- regression coverage for timed partial-batch flushes
- example worker tests for the Kafka reliability example
- full package CI from the source tree
v0.2.1
Focus
Correctness hardening, performance improvements, and Arrow-native sink expansion.
0.2.1 fixes several bugs introduced in 0.2.0's batch architecture, improves
throughput by ~2x on per-record pipelines via delivery batching, and extends the
Arrow fast path to CsvSink and JsonLinesSink.
Performance
Delivery batching — ~2x throughput on per-record pipelines
DeliveryConfig(batch_size=N) now delivers measurable throughput gains on the
linear lane. Profiling shows delivery overhead (write dispatch, checkpoint
decision, metrics flush) accounts for ~40% of per-record pipeline time. Batching
reduces the number of delivery function calls from O(records) to O(records/N).
Recommended default for per-record pipelines: batch_size=100.
```python
# Before (0.2.0 default: batch_size=1)
Pipeline(source).build(sink).run()  # ~124K rec/s JSONL

# After (0.2.1 recommended)
Pipeline(source).build(sink, config=DeliveryConfig(batch_size=100)).run()  # ~279K rec/s JSONL
```

Measured on 100K rows, median of 3 runs, NullSink:
| Lane | batch_size=1 | batch_size=100 | Gain |
|---|---|---|---|
| JSONL direct | 124K rec/s | 279K rec/s | 2.2x |
| CSV direct | 89K rec/s | 164K rec/s | 1.8x |
Arrow-native CsvSink and JsonLinesSink
Both sinks now implement write_arrow_batch(batch: pa.RecordBatch) and satisfy
the ArrowNativeSink protocol. Arrow pipelines that previously fell back to
to_pylist() at the sink boundary now stay columnar end-to-end.
```python
# Now stays on the Arrow fast path: no to_pylist() at the sink
Pipeline(ArrowCsvSource("input.csv")).build(CsvSink("output.csv", row_mapper=lambda r: r)).run()
Pipeline(ArrowJsonLinesSource("input.jsonl")).build(JsonLinesSink("output.jsonl")).run()
```

CsvSink.write_arrow_batch() uses pyarrow.csv.write_csv() (C extension), which
is faster than Python's csv.DictWriter for large batches.
JsonLinesSink.write_arrow_batch() uses to_pylist() + batch JSON serialization
in a single write syscall, bypassing the per-record serializer call.
Bug fixes
BatchMiddleware missing process() and on_error() fallbacks
BatchMiddleware subclasses raised AttributeError when used with a non-batch
source (linear lane). The linear lane calls process() per record — but
BatchMiddleware only defined process_batch().
Fixed: BatchMiddleware now provides a process() fallback that wraps the
single record in a list, calls process_batch([record], ctx), and returns the
first result. An on_error() fallback is also added for consistency.
This means BatchMapMiddleware and BatchFilterMiddleware now work correctly
with any source, not just batch-emit sources.
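The fallback can be sketched on a simplified base class. This BatchMiddleware is a stand-in with an untyped ctx and no on_error() hook; UpperNames is a hypothetical subclass used only to show that process() delegates to process_batch().

```python
from typing import Generic, List, Optional, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class BatchMiddleware(Generic[T, U]):
    """Simplified stand-in: ctx is untyped and error handling is omitted."""

    def process_batch(self, records: List[T], ctx: object) -> List[Optional[U]]:
        raise NotImplementedError

    def process(self, record: T, ctx: object) -> Optional[U]:
        # Linear-lane fallback: run a one-element batch and unwrap the first result.
        return self.process_batch([record], ctx)[0]


class UpperNames(BatchMiddleware[dict, dict]):
    def process_batch(self, records, ctx):
        # None at position i drops records[i], matching the batch contract.
        return [{**r, "name": r["name"].upper()} if r.get("name") else None for r in records]
```

Subclasses only implement process_batch(); the per-record path comes for free, at the cost of one list allocation per record.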
MiddlewareChain.process_batch() — OCP violation fixed via double-dispatch
process_batch() previously used a chain of isinstance checks to dispatch
to BatchMiddleware, ArrowBatchMiddleware, MapMiddleware, and
FilterMiddleware. Adding a new middleware type required editing this method.
Fixed: each middleware type now implements apply_in_batch() — a double-dispatch
hook that process_batch() calls directly. The method is now a clean loop with
no isinstance dispatch.
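The double-dispatch shape can be sketched in a few lines: the chain calls one hook on every stage, and each stage type decides what a batch means for it. These classes are simplified stand-ins; in particular, each hook here returns only the surviving records, which may not match how Agora's apply_in_batch() represents dropped records.

```python
from typing import Any, Callable, List


class MapMiddleware:
    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def apply_in_batch(self, records: List[Any], ctx: Any) -> List[Any]:
        return [self.fn(r) for r in records]


class FilterMiddleware:
    def __init__(self, predicate: Callable[[Any], bool]) -> None:
        self.predicate = predicate

    def apply_in_batch(self, records: List[Any], ctx: Any) -> List[Any]:
        return [r for r in records if self.predicate(r)]


class MiddlewareChain:
    def __init__(self, stages: List[Any]) -> None:
        self.stages = stages

    def process_batch(self, records: List[Any], ctx: Any = None) -> List[Any]:
        # Each stage decides how to handle a batch; no isinstance() dispatch here.
        for stage in self.stages:
            records = stage.apply_in_batch(records, ctx)
        return records
```

Adding a new middleware type now means implementing apply_in_batch() on that type, with no edit to MiddlewareChain.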
DeliveryEngine — duplicate flush logic eliminated
flush_pending_writes() and flush_batch_direct() shared ~200 lines of
duplicated batch-failure handling logic. Both methods now delegate to shared
_flush_batch_outcomes() and _commit_outcomes() helpers.
write_batch_result() (155 lines, 3 code paths) split into three focused
private methods: _write_batch_middleware_failure(), _write_batch_passthrough(),
_write_batch_filtered().
assert isinstance(outcome, CheckpointedOutcome) replaced
Production code in _delivery.py used assert for type narrowing — stripped
by python -O. Replaced with raise TypeError guards.
ScheduledPipelineRunner — private attribute access removed
ScheduledPipelineRunner accessed self._pipeline._state, self._pipeline._factory,
self._pipeline._max_errors, etc. directly. All private attributes are now
exposed as public properties on ScheduledPipeline:
New properties: state, max_records, max_consecutive_errors, backoff_policy,
pre_run_hook, on_run_complete, and observers, plus a build() method.
Schema.from_dict — hash mismatch policy
Schema.from_dict() previously logged a warning on hash mismatch and continued
loading — a tampered or corrupted schema under FREEZE contract went undetected.
Fixed: Schema.from_dict(data, strict_hash=True) now raises ValueError on
hash mismatch. BackendSchemaStore automatically enables strict_hash=True
when constructed with contract=SchemaContract.FREEZE.
SchemaEvolution.merge() — hash short-circuit
merge() previously rebuilt the merged columns dict from scratch on every call,
even when the schema had not changed. Fixed: early return when
old.hash == new.hash.
SchemaProcessor — SchemaInferrer reuse
_infer_record_schema() previously allocated a new SchemaInferrer per record.
Fixed: a single SchemaInferrer is created in __init__ and reused via
reset() before each record.
EmbeddingStore.mark_if_new() — atomic check-and-add
The default DedupStore.mark_if_new() is documented as non-atomic. Under
concurrent execution (buffered lane), two coroutines could both see
exists() == False and both call add().
Fixed: EmbeddingStore now overrides mark_if_new() with an asyncio.Lock
that embeds the key once and performs check + append atomically.
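A sketch of the check-then-add race fix using asyncio.Lock. InMemoryEmbeddingStore, its toy _embed(), and exact-match lookup are hypothetical; a real embedding store would compare vectors by similarity. Without the lock, the await inside _exists() would let several coroutines see the key as new.

```python
import asyncio
from typing import List


class InMemoryEmbeddingStore:
    """Hypothetical store: keys are 'embedded' by a stand-in async function."""

    def __init__(self) -> None:
        self._vectors: List[tuple] = []
        self._lock = asyncio.Lock()

    async def _embed(self, key: str) -> tuple:
        await asyncio.sleep(0)  # yield to the loop, as a real embedding call would
        return (len(key), sum(map(ord, key)))

    async def _exists(self, vector: tuple) -> bool:
        await asyncio.sleep(0)
        return vector in self._vectors

    async def mark_if_new(self, key: str) -> bool:
        """Return True exactly once per key, even under concurrent callers."""
        vector = await self._embed(key)  # embed once per call, outside the lock
        async with self._lock:  # check + append cannot interleave with another caller
            if await self._exists(vector):
                return False
            self._vectors.append(vector)
            return True


async def main() -> List[bool]:
    store = InMemoryEmbeddingStore()
    return await asyncio.gather(*(store.mark_if_new("order-42") for _ in range(5)))
```

asyncio.run(main()) returns a list with a single True: only the first coroutine to take the lock records the key.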
Import path validation in declarative configs
ImportRefConfig now validates that {"import": "..."} values in TOML configs
are valid dotted Python identifiers with an optional :attribute suffix.
Malformed paths (absolute paths, shell metacharacters, traversal) are rejected
at config parse time with a clear error message.
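A minimal sketch of the format check as a regular expression over dotted identifiers with an optional :attribute suffix. check_import_path is a hypothetical stand-in, not agora.config.pipeline.validate_import_path, and it only accepts ASCII identifiers even though Python allows others.

```python
import re

# Dotted identifiers, optionally followed by a single ":attribute" suffix.
_IDENT = r"[A-Za-z_][A-Za-z0-9_]*"
_IMPORT_PATH = re.compile(rf"{_IDENT}(\.{_IDENT})*(:{_IDENT})?")


def check_import_path(value: str) -> str:
    """Return value unchanged if it looks like 'package.module[:attribute]', else raise."""
    if not _IMPORT_PATH.fullmatch(value):
        raise ValueError(f"invalid import path {value!r}: expected 'package.module[:attribute]'")
    return value
```

Using fullmatch() rather than match() is what rejects trailing junk such as "pkg; rm -rf /".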
New: Rust CheckpointState (agora-rs)
CheckpointState is now implemented in Rust (agora-rs) and used automatically
when the extension is installed. The Python implementation remains as a fallback.
The Rust implementation eliminates per-record Python attribute access for
increment(), should_save(), and mark_saved() — the three hot-path
checkpoint operations called on every delivered record.
make_checkpoint_state() is the new factory function — returns the Rust
implementation when available, Python otherwise.
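The factory pattern can be sketched as an import with a pure-Python fallback. build_checkpoint_state, the interval-based save rule, and the agora_rs.CheckpointState import name and constructor are assumptions; only the increment(), should_save(), and mark_saved() method names come from these notes.

```python
from typing import Any


class PyCheckpointState:
    """Pure-Python fallback: count records and ask to save every `interval` records."""

    def __init__(self, interval: int) -> None:
        self.interval = interval
        self.since_save = 0

    def increment(self, n: int = 1) -> None:
        self.since_save += n

    def should_save(self) -> bool:
        return self.since_save >= self.interval

    def mark_saved(self) -> None:
        self.since_save = 0


def build_checkpoint_state(interval: int) -> Any:
    try:
        from agora_rs import CheckpointState  # hypothetical export name and signature
    except ImportError:
        return PyCheckpointState(interval)
    return CheckpointState(interval)
```

Callers depend only on the three methods, so the runtime does not need to know which implementation it received.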
New: Benchmark system (benchmarks/)
A new subprocess-isolated benchmark suite replaces the previous ad-hoc scripts.
```shell
# Run all lanes (csv, jsonl, parquet) with median of 3 runs
python benchmarks/run.py --rows 100000

# Run specific lanes
python benchmarks/run.py --rows 100000 --only csv jsonl

# Increase statistical confidence
python benchmarks/run.py --rows 100000 --median 5

# Save results to JSON
python benchmarks/run.py --rows 100000 --save
```

Each case runs in an isolated subprocess, so no GC noise or event-loop state
leaks between measurements. Fixture data is regenerated before each run.
Results show median elapsed and throughput across --median runs.
Cases per lane:
| Case | CSV | JSONL | Parquet |
|---|---|---|---|
| direct | ✓ | ✓ | ✓ |
| map | ✓ | ✓ | ✓ |
| filter | ✓ | ✓ | ✓ |
| map_filter | ✓ | ✓ | ✓ |
| batch_map | ✓ | ✓ | — |
| batch_filter | ✓ | ✓ | — |
| arrow | ✓ | ✓ | ✓ |
| arrow_map | ✓ | ✓ | ✓ |
| arrow_filter | ✓ | ✓ | ✓ |
| arrow_to_csv | ✓ | — | — |
| arrow_to_jsonl | — | ✓ | — |
New public exports
Added to from agora.core.runtime import ...:
- make_checkpoint_state: factory for Rust-backed or Python CheckpointState
Added to from agora.config.pipeline import ...:
- validate_import_path: validates declarative import path format
Tests
690 tests pass (up from 649 in 0.2.0).
New test coverage:
- tests/core/test_pipeline.py: batch_size delivery batching, direct flush,
  delivery success hooks, BatchMiddleware on linear lane
- tests/core/test_tracing.py: tracing span lifecycle, InMemoryTracer
- tests/core/test_observability_metrics.py: metrics collector, Prometheus exporter
- tests/core/test_batch_pipeline.py: extended batch pipeline coverage
Release v0.2.0
Focus
Batch-native architecture. 0.2.0 introduces a new execution lane that allows
sources to emit native batches, middlewares to process batches, and sinks to
consume batches — eliminating the per-record overhead that dominated 0.1.x
throughput.
Performance
The Arrow fast path activates when ParquetSource(use_arrow_batches=True) is
paired with ParquetSink and no per-record middleware transforms the batch.
In this configuration, pa.RecordBatch objects flow from source to sink with
zero Python object allocation per row.
New: Batch-native execution lane
Protocols (src/agora/core/batch.py)
Three new opt-in protocols:
BatchableSource[T] — a source that can emit native batches via
stream_batches(). The runtime calls this instead of stream() when
supports_batch_emit = True. Each yielded value is a list[T] or a
pa.RecordBatch (for Arrow-native sources).
BatchMiddleware[T, U] — a middleware that processes a whole batch at
once via process_batch(records, ctx) -> list[U | None]. Return None at
position i to drop records[i]. Raise to fail the entire batch.
ArrowNativeSink — a sink that can consume pa.RecordBatch directly via
write_arrow_batch(batch). When both source and sink are Arrow-native and no
middleware transforms the batch, the runtime uses the shortest path with zero
row materialization.
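The detection rules above can be approximated with typing.Protocol. This is a standalone sketch, not the agora.core.batch definitions. choose_lane and its lane names are hypothetical. The empty-middleware-list check also simplifies the real condition, which is that no middleware transforms the batch.

```python
from typing import Any, Iterator, Protocol, runtime_checkable


@runtime_checkable
class BatchableSource(Protocol):
    supports_batch_emit: bool

    def stream_batches(self) -> Iterator[Any]: ...


@runtime_checkable
class ArrowNativeSink(Protocol):
    def write_arrow_batch(self, batch: Any) -> None: ...


def is_batch_capable_source(source: object) -> bool:
    # Having stream_batches() is not enough: the source must also opt in.
    return isinstance(source, BatchableSource) and bool(source.supports_batch_emit)


def is_arrow_native_sink(sink: object) -> bool:
    return isinstance(sink, ArrowNativeSink)


def choose_lane(source: object, sink: object, middlewares: list) -> str:
    """Hypothetical lane selection following the rules described above."""
    if not is_batch_capable_source(source):
        return "per-record"
    if is_arrow_native_sink(sink) and not middlewares:
        # Shortest path: batches reach the sink without row materialization.
        return "arrow"
    return "batch"


# Tiny stand-ins for demonstration.
class BatchingSource:
    supports_batch_emit = True

    def stream_batches(self) -> Iterator[list]:
        yield [{"id": 1}, {"id": 2}]


class OptedOutSource(BatchingSource):
    supports_batch_emit = False


class ArrowSink:
    def write_arrow_batch(self, batch: Any) -> None:
        pass


class RowSink:
    def write(self, record: Any) -> None:
        pass
```

For example, choose_lane(BatchingSource(), ArrowSink(), []) selects the Arrow path. Adding any middleware to the list drops it back to the regular batch lane.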
Failure policy for BatchMiddleware (Option A): if process_batch() raises,
the entire batch is routed to the DLQ (if configured) or the run aborts. There
is no per-record fallback.
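Here is a minimal sketch of Option A. It assumes a batch stage is a plain function that maps a list of records to a same-length list of results; the real process_batch() also receives ctx. BatchAborted, run_batch_stage, and the list used as a DLQ are hypothetical stand-ins.

```python
from typing import Any, Callable, List, Optional


class BatchAborted(Exception):
    """Hypothetical error: a batch failed and no DLQ was configured."""


def run_batch_stage(
    process_batch: Callable[[List[Any]], List[Optional[Any]]],
    records: List[Any],
    dlq: Optional[List[Any]] = None,
) -> List[Any]:
    """Apply one batch stage with all-or-nothing (Option A) failure handling."""
    try:
        results = process_batch(records)
    except Exception as exc:
        if dlq is None:
            # No DLQ configured: the whole run aborts.
            raise BatchAborted(f"batch of {len(records)} records failed") from exc
        # The entire batch is dead-lettered; there is no per-record retry.
        dlq.extend(records)
        return []
    # A None at position i drops records[i]; everything else moves on.
    return [result for result in results if result is not None]


def normalize(batch: List[str]) -> List[Optional[str]]:
    # Drop blank names, upper-case the rest.
    return [name.upper() if name.strip() else None for name in batch]


def broken(batch: List[str]) -> List[Optional[str]]:
    raise ValueError("schema mismatch")
```

Under this policy, one bad record sends its whole batch to the DLQ. Smaller batches therefore limit how many good records are dead-lettered alongside it.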
Runtime batch lane (_buffered.py)
New ExecutionCoordinator.run_batch_pipeline() method. Activated
automatically when is_batch_capable_source(source) returns True. The
existing per-record and buffered paths are unchanged.
Guarantees preserved in the batch lane:
- Batches are committed in source order.
- Checkpoint advances once per batch, after the batch is durably written.
- Sink fail-closed and DLQ policies are honored per batch.
- CancelledError aborts without committing the in-flight batch.
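These guarantees can be sketched with asyncio. The sketch assumes batches arrive as (position, batch) pairs and that the checkpoint is just a list of committed positions. run_batch_lane, from_lists, and demo are hypothetical names; this is not ExecutionCoordinator.run_batch_pipeline() itself.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Optional, Tuple


async def run_batch_lane(
    batches: AsyncIterator[Tuple[int, list]],
    write: Callable[[list], Awaitable[None]],
    checkpoints: List[int],
) -> None:
    """Hypothetical batch lane: commit in order, checkpoint after each write."""
    async for position, batch in batches:
        # Awaiting the write before pulling the next batch preserves source order.
        await write(batch)
        # Reached only after the write returned, so the batch is durable.
        checkpoints.append(position)
    # A CancelledError raised during `write` leaves the loop before the append,
    # so the in-flight batch is never checkpointed.


async def from_lists(batches: List[list]) -> AsyncIterator[Tuple[int, list]]:
    # Position = cumulative row count, similar to a file source's row_number.
    rows = 0
    for batch in batches:
        rows += len(batch)
        yield rows, batch


async def demo(cancel_on: Optional[list] = None) -> Tuple[list, List[int], bool]:
    written: list = []
    checkpoints: List[int] = []

    async def write(batch: list) -> None:
        if batch == cancel_on:
            raise asyncio.CancelledError  # simulate cancellation mid-write
        written.append(batch)

    try:
        await run_batch_lane(from_lists([[1, 2], [3], [4, 5]]), write, checkpoints)
    except asyncio.CancelledError:
        return written, checkpoints, True
    return written, checkpoints, False
```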
MiddlewareChain additions
- has_batch_stages(): returns True if any middleware is a BatchMiddleware.
- process_batch(records, ctx): runs the batch through the chain. For BatchMiddleware stages it calls process_batch(); for regular Middleware stages it applies the middleware per record within the batch loop.
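Here is a simplified model of a chain that mixes both stage types in the batch lane. RecordStage and BatchStage are hypothetical stand-ins for Middleware and BatchMiddleware, and ctx is omitted. Dropping None results between stages is an assumption, based on the drop semantics described for BatchMiddleware.

```python
from typing import Any, Callable, List, Optional, Sequence, Union


class RecordStage:
    """Hypothetical stand-in for a regular Middleware: one record in, one (or None) out."""

    def __init__(self, fn: Callable[[Any], Optional[Any]]) -> None:
        self.fn = fn


class BatchStage:
    """Hypothetical stand-in for a BatchMiddleware: one result (or None) per record."""

    def __init__(self, fn: Callable[[List[Any]], List[Optional[Any]]]) -> None:
        self.fn = fn


Stage = Union[RecordStage, BatchStage]


def has_batch_stages(stages: Sequence[Stage]) -> bool:
    return any(isinstance(stage, BatchStage) for stage in stages)


def process_chain(stages: Sequence[Stage], records: List[Any]) -> List[Any]:
    current = list(records)
    for stage in stages:
        if isinstance(stage, BatchStage):
            results = stage.fn(current)
        else:
            # Regular middleware inside the batch lane runs record by record.
            results = [stage.fn(record) for record in current]
        # Drop None results before handing the batch to the next stage.
        current = [result for result in results if result is not None]
    return current


chain = [
    RecordStage(lambda n: n * 2),
    BatchStage(lambda batch: [n if n > 2 else None for n in batch]),
]
```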
Delivery layer additions
New DeliveryEngine.write_batch_result() — delivers a processed
batch to the sink and advances the checkpoint once. Handles batch failure
(Option A), sink errors, DLQ routing, and LOG_AND_CONTINUE policy.
(RecordDeliveryCoordinator remains as a deprecated alias for DeliveryEngine.)
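The sink-side policies can be sketched as a single function that writes the batch, applies the policy, and advances the checkpoint once. SinkPolicy, deliver_batch, and the sink functions are hypothetical. The precedence is also an assumption: a configured DLQ takes the failed batch before the policy is consulted. It is drawn from the 0.1.8 preservation tests, in which a sink failure with a DLQ advances the checkpoint and fail-closed propagates only without one.

```python
import logging
from enum import Enum
from typing import Any, Callable, List, Optional

logger = logging.getLogger("delivery_sketch")


class SinkPolicy(Enum):
    """Hypothetical stand-in for SinkFailurePolicy (member values are assumed)."""

    FAIL_CLOSED = "fail_closed"
    LOG_AND_CONTINUE = "log_and_continue"


def deliver_batch(
    batch: List[Any],
    position: int,
    write: Callable[[List[Any]], None],
    checkpoints: List[int],
    policy: SinkPolicy = SinkPolicy.FAIL_CLOSED,
    dlq: Optional[List[Any]] = None,
) -> str:
    """Write one batch, apply the failure policy, then checkpoint once."""
    try:
        write(batch)
        outcome = "written"
    except Exception:
        if dlq is not None:
            # Assumption: a configured DLQ takes the failed batch.
            dlq.extend(batch)
            outcome = "dlq"
        elif policy is SinkPolicy.LOG_AND_CONTINUE:
            logger.warning("sink write failed; skipping batch ending at %d", position)
            outcome = "logged"
        else:
            # Fail-closed: propagate and leave the checkpoint where it was.
            raise
    checkpoints.append(position)
    return outcome


def healthy_sink(batch: List[Any]) -> None:
    pass


def full_disk_sink(batch: List[Any]) -> None:
    raise OSError("disk full")
```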
ParquetSource — Arrow batch mode
ParquetSource gains a new use_arrow_batches=False constructor parameter.
When True:
- supports_batch_emit is set to True.
- stream_batches() yields pa.RecordBatch objects directly: no to_pylist() call and no Python dict allocation per row.
- row_mapper is bypassed in batch mode.
- The checkpoint still tracks row_number across batches for resume support.
- Resume uses Arrow batch slicing instead of row-by-row skipping.
The default is False — existing pipelines using row_mapper are unaffected.
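Resuming by slicing can be illustrated with plain lists standing in for pa.RecordBatch objects. resume_batches is a hypothetical helper, and treating row_number as the count of rows already delivered is an assumption.

```python
from typing import Any, Iterable, Iterator, List


def resume_batches(batches: Iterable[List[Any]], row_number: int) -> Iterator[List[Any]]:
    """Skip `row_number` already-delivered rows by dropping or slicing batches."""
    remaining = row_number
    for batch in batches:
        if remaining >= len(batch):
            # The whole batch was delivered before: skip it without touching rows.
            remaining -= len(batch)
            continue
        if remaining:
            # Partially delivered batch: keep only the tail.
            # With pyarrow this would be batch.slice(remaining).
            batch = batch[remaining:]
            remaining = 0
        yield batch
```

Fully delivered batches are skipped without looking at individual rows. Only the first partially delivered batch needs slicing, and pyarrow's RecordBatch.slice() does this without copying data.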
ParquetSink — Arrow batch mode
ParquetSink gains write_arrow_batch(batch: pa.RecordBatch) — writes an
Arrow batch directly via pa.Table.from_batches([batch]) without calling
row_mapper or _rows_to_columns(). The existing write() and
write_batch() paths are unchanged.
Breaking changes
AGORA_API_VERSION removed
The AGORA_API_VERSION alias (deprecated in 0.1.9) has been removed from
agora.core.registry. Use AGORA_PLUGIN_MANIFEST_VERSION instead.
# Before (0.1.9 — deprecated)
from agora.core.registry import AGORA_API_VERSION
# After (0.2.0)
from agora.core.registry import AGORA_PLUGIN_MANIFEST_VERSION
AGORA_PLUGIN_MANIFEST_VERSION bumped to "0.4"
The manifest schema version bumps from "0.3" to "0.4". Plugin packages
that declare MANIFEST.agora_api_version = "0.3" will be excluded from the
active registry and shown as incompatible in agora plugins list.
Update your plugin's MANIFEST:
MANIFEST = _Manifest(
package="my-plugin",
version="...",
agora_api_version="0.4", # was "0.3"
)
New public exports
Added to from agora import ...:
- BatchableSource
- BatchMiddleware
- BatchProcessResult
- BatchFailure
- ArrowNativeSink
- is_batch_capable_source
- is_arrow_native_sink
Tests
New: batch pipeline unit tests
Added tests/core/test_batch_pipeline.py (13 tests):
- is_batch_capable_source() detection
- is_arrow_native_sink() detection
- BatchMiddleware.process_batch() dispatch
- Dropped records (None results)
- Source order across batches
- Checkpoint advances once per batch
- Batch failure (Option A) → entire batch to DLQ
- Batch failure without DLQ aborts run
- No-middleware batch pipeline
- max_records respected in batch mode
New: batch-path preservation tests
Added 4 batch-path variants to tests/preservation/test_runtime_guarantees.py:
- Source order in batch mode
- Checkpoint advances after each batch is durably written
- Sink fail-closed in batch mode
- DLQ routing on batch failure (Option A)
All 649 tests pass.
Packaging
- Bumped agora-etl to 0.2.0.
- Removed the AGORA_API_VERSION alias.
- Bumped AGORA_PLUGIN_MANIFEST_VERSION to "0.4".
Release v0.1.9
Documentation
New: Plugin Contract
Added plugins/contract.md — single source of
truth for plugin authors. Covers:
- stability labels (stable / provisional / internal) for every entry-point group
- what each label promises through the 0.1.x → 0.2.x transition
- plugin author obligations (base class requirements, internal path restrictions)
- what the runtime does with incompatible plugins (excluded from the active registry, still shown in diagnostics)
- stable base class shapes for sources, sinks, middlewares, dedup stores, and dedup strategies
New: Manifest Contract
Added plugins/manifest.md — explains the
two-version model that plugin authors frequently confuse:
- AGORA_PLUGIN_MANIFEST_VERSION = "0.3" tracks the MANIFEST metadata schema, not the agora-etl package version
- the full compatibility matrix (matching / mismatching / no MANIFEST)
- how to read the registry_entrypoint_incompatible warning
- the deprecation notice for the AGORA_API_VERSION alias (removed in 0.2.0)
- when AGORA_PLUGIN_MANIFEST_VERSION bumps vs. when the package bumps
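The compatibility matrix can be summarized as a small classification function. PluginStatus and classify are hypothetical, and the matching rule is assumed to be an exact string comparison.

```python
from dataclasses import dataclass
from typing import Optional

AGORA_PLUGIN_MANIFEST_VERSION = "0.3"  # value documented for 0.1.9


@dataclass(frozen=True)
class PluginStatus:
    """Hypothetical diagnostics record for one entry point."""

    name: str
    loaded: bool
    compatible: Optional[bool]


def classify(name: str, manifest_version: Optional[str]) -> PluginStatus:
    if manifest_version is None:
        # No MANIFEST: loaded, but compatibility is unknown.
        return PluginStatus(name, loaded=True, compatible=None)
    if manifest_version == AGORA_PLUGIN_MANIFEST_VERSION:
        return PluginStatus(name, loaded=True, compatible=True)
    # Mismatch: excluded from the active registry, still listed in diagnostics.
    return PluginStatus(name, loaded=False, compatible=False)


diagnostics = [
    classify("csv-extra", "0.3"),
    classify("legacy-sink", "0.2"),
    classify("bare-plugin", None),
]
active = [status.name for status in diagnostics if status.loaded]
```

Every plugin stays in diagnostics. Only the loaded flag decides whether it joins the active registry.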
Updated: plugins/index.md and plugins/developing.md
- plugins/index.md now links to contract.md and manifest.md from the "Start here" section.
- The "Manifest compatibility" section of plugins/developing.md now points to manifest.md instead of carrying its own short paragraph.
MkDocs nav
Added Plugin Contract and Manifest Contract to the Plugins section in
mkdocs.yml.
Code
Deprecation: AGORA_API_VERSION
Added a deprecation docstring to AGORA_API_VERSION in
agora/core/registry.py. The alias still works and resolves to the same
value as AGORA_PLUGIN_MANIFEST_VERSION. It will be removed in 0.2.0.
Migration is a one-line change:
# Before
from agora.core.registry import AGORA_API_VERSION
# After
from agora.core.registry import AGORA_PLUGIN_MANIFEST_VERSION
Clearer incompatibility warning
The registry_entrypoint_incompatible log message now includes a hint
field that tells operators exactly what to update and where to find the
compatibility model. Behavior is unchanged — only the message is more
actionable.
Tests
Preservation: plugin contract
Added tests/preservation/test_plugin_contract.py (19 tests):
- the AGORA_API_VERSION alias equals AGORA_PLUGIN_MANIFEST_VERSION
- each stable entry-point group has a Registry at the declared module path and attribute, with load_entrypoints() callable (6 groups)
- each provisional entry-point group has a Registry at the declared path (4 groups)
- manifest matrix: matching version → loaded, compatible=True
- manifest matrix: mismatching version → excluded, compatible=False, still in diagnostics
- manifest matrix: no MANIFEST → loaded, compatible=None
- an incompatible plugin does not abort discovery of other plugins in the same group
- built-in sources registered under their declared names
- built-in sinks registered under their declared names
- built-in runners registered under their declared names
- describe_items() returns RegistryItemInfo objects
All 72 preservation tests pass (53 from 0.1.8 + 19 new).
Release v0.1.8
Documentation
New: Runtime Guarantees
Added guides/runtime-guarantees.md — the
single source of truth for what the runtime promises. Covers:
- source-order delivery in linear and buffered modes
- sink fail-closed semantics and when each policy applies
- the full checkpoint advancement table (success, drop, DLQ, fail-closed, log-and-continue)
- DLQ routing and DLQ failure policy
- DLQ replay acknowledgement
- cancellation and lifecycle ordering
- intentional non-guarantees: at-least-once delivery, no two-phase commit, untrusted-config caveat, public-edge hardening, cross-source ordering
The page is now the authoritative contract. architecture.md and
failure-handling.md cross-link into it instead of restating guarantees in
prose.
New: Recovery Support Matrix
Added guides/recovery-matrix.md — the
per-source resume contract. Covers:
- which built-in sources are checkpointable and what their resume key looks like
- the three conditions a source must satisfy to be treated as checkpointable
- per-source notes for JsonLinesSource, CsvSource, ParquetSource, HTTPSource, and IterableSource
- what resume does and does not promise
- checkpoint failure handling for both the load and save paths
- a minimal recipe for adding resume support to a custom source
sources.md and guides/checkpointing.md now cross-link into the matrix
instead of carrying their own short tables.
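Here is a minimal sketch of a resumable custom source, assuming the resume key is a line index. supports_checkpoint and prepare_resume appear elsewhere in these notes. LineSource, checkpoint_state(), and the argument shape of prepare_resume() are hypothetical, so check guides/recovery-matrix.md for the actual contract.

```python
from typing import Any, Dict, Iterator, List, Optional


class LineSource:
    """Hypothetical resumable source over an in-memory list of lines."""

    # Explicit opt-in: per the runtime guarantees, is_checkpoint_capable
    # requires this flag rather than inferring capability from methods.
    supports_checkpoint = True

    def __init__(self, lines: List[str]) -> None:
        self._lines = lines
        self._start = 0
        self._next = 0

    def prepare_resume(self, state: Optional[Dict[str, Any]]) -> None:
        # Assumed (hypothetical) resume key shape: {"line": <next unread index>}.
        self._start = int(state["line"]) if state else 0

    def checkpoint_state(self) -> Dict[str, Any]:
        return {"line": self._next}

    def stream(self) -> Iterator[str]:
        for index in range(self._start, len(self._lines)):
            # Point past this line before yielding it, so a checkpoint taken
            # after the record is committed resumes at the following line.
            self._next = index + 1
            yield self._lines[index]
```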
Tests
Preservation: runtime guarantees
Added tests/preservation/test_runtime_guarantees.py (12 tests). Each test
maps to one declared guarantee in runtime-guarantees.md:
- linear-mode source-order commit
- buffered-mode source-order commit when the buffered stage resolves futures in reverse order
- sink fail-closed propagates without DLQ
- checkpoint advances on successful write
- checkpoint advances when middleware DLQs the record
- DLQ routing carries stage="middleware" and the middleware name
- middleware failure without DLQ continues the pipeline and counts the error
- sink failure with DLQ advances the checkpoint
- is_checkpoint_capable requires the explicit supports_checkpoint flag
- DLQFailurePolicy.RAISE propagates DLQ write errors
- DLQFailurePolicy.LOG_ONLY (default) swallows DLQ write errors
- SinkFailurePolicy.LOG_AND_CONTINUE advances the checkpoint over a failed write
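The two DLQ failure policies tested above differ only in what happens when the DLQ write itself fails. The sketch uses DLQPolicy, a hypothetical stand-in for DLQFailurePolicy, and a hypothetical route_to_dlq helper.

```python
import logging
from enum import Enum
from typing import Any, Callable

logger = logging.getLogger("dlq_sketch")


class DLQPolicy(Enum):
    """Hypothetical stand-in for DLQFailurePolicy (member values are assumed)."""

    LOG_ONLY = "log_only"  # documented default
    RAISE = "raise"


def route_to_dlq(
    record: Any,
    dlq_write: Callable[[Any], None],
    policy: DLQPolicy = DLQPolicy.LOG_ONLY,
) -> bool:
    """Try to dead-letter one record; return True if the DLQ accepted it."""
    try:
        dlq_write(record)
    except Exception:
        if policy is DLQPolicy.RAISE:
            raise  # surface the DLQ outage to the caller
        # LOG_ONLY: record the failure and let the pipeline keep going.
        logger.warning("DLQ write failed for %r", record)
        return False
    return True


def unavailable_dlq(record: Any) -> None:
    raise ConnectionError("DLQ backend unreachable")
```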
Preservation: recovery edge cases
Added tests/preservation/test_recovery_edge_cases.py (10 tests):
- a corrupted checkpoint payload (missing fields) raises a typed TypeError, protecting the 0.1.7 fix
- a non-dict checkpoint payload raises a typed TypeError
- prepare_resume failure aborts the run under FAIL_CLOSED
- prepare_resume failure under LOG_AND_CONTINUE starts from scratch
- checkpoint load() failure aborts the run under FAIL_CLOSED
- checkpoint load() failure under LOG_AND_CONTINUE starts from scratch
- checkpoint save() failure under LOG_AND_CONTINUE does not retry-storm, protecting the 0.1.7 mark_saved fix
- a non-checkpointable source paired with a checkpoint store runs without checkpointing instead of raising
- a two-run resume restores progress from the saved position
- SQLiteCheckpointStore survives close/reopen, the basis for cross-process resume
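The close/reopen and corrupted-payload cases can be sketched with the standard sqlite3 module. SqliteCheckpointSketch and its (source, position) payload schema are hypothetical. This is not SQLiteCheckpointStore, whose schema and validation may differ.

```python
import json
import os
import sqlite3
import tempfile
from typing import Any, Dict, Optional

REQUIRED_FIELDS = ("source", "position")  # hypothetical payload schema


class SqliteCheckpointSketch:
    """Hypothetical file-backed checkpoint store; not SQLiteCheckpointStore."""

    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS checkpoints "
                "(pipeline TEXT PRIMARY KEY, payload TEXT NOT NULL)"
            )

    def save(self, pipeline: str, payload: Any) -> None:
        with self._conn:  # commit on success, roll back on error
            self._conn.execute(
                "INSERT OR REPLACE INTO checkpoints (pipeline, payload) VALUES (?, ?)",
                (pipeline, json.dumps(payload)),
            )

    def load(self, pipeline: str) -> Optional[Dict[str, Any]]:
        row = self._conn.execute(
            "SELECT payload FROM checkpoints WHERE pipeline = ?", (pipeline,)
        ).fetchone()
        if row is None:
            return None  # nothing saved yet: start from scratch
        payload = json.loads(row[0])
        # Validate eagerly so a corrupted payload fails with a typed error.
        if not isinstance(payload, dict):
            raise TypeError(f"checkpoint payload must be a dict, got {type(payload).__name__}")
        missing = [name for name in REQUIRED_FIELDS if name not in payload]
        if missing:
            raise TypeError(f"checkpoint payload is missing fields: {missing}")
        return payload

    def close(self) -> None:
        self._conn.close()


def reopen_roundtrip() -> Optional[Dict[str, Any]]:
    """Save, close, reopen (as a new process would), then load."""
    with tempfile.TemporaryDirectory() as directory:
        path = os.path.join(directory, "checkpoints.db")
        first = SqliteCheckpointSketch(path)
        first.save("orders", {"source": "orders.jsonl", "position": 42})
        first.close()
        second = SqliteCheckpointSketch(path)
        try:
            return second.load("orders")
        finally:
            second.close()
```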