Skip to content

feat: support Spark structured streaming write#399

Closed
wombatu-kun wants to merge 2 commits into
lance-format:mainfrom
wombatu-kun:structured-streaming-write
Closed

feat: support Spark structured streaming write#399
wombatu-kun wants to merge 2 commits into
lance-format:mainfrom
wombatu-kun:structured-streaming-write

Conversation

@wombatu-kun
Copy link
Copy Markdown
Contributor

Closes #246

What's done

Implements StreamingWrite (Spark DataSource V2) for Lance tables, allowing Spark Structured Streaming queries to write to Lance using all three output modes: Append, Complete, and Update.

The per-task write path is reused verbatim from the existing batch code (LanceDataWriter + ArrowBatchWriteBuffer) — streaming-specific logic is confined entirely to the driver-side commit step.


Key implementation details

Two-transaction exactly-once protocol (StreamingCommitProtocol)

Lance's Transaction.Builder accepts only one operation, making it impossible to atomically combine an Append with an epoch-watermark update. The sink works around this with two sequential transactions:

  • Txn1Append or Overwrite fragments, stamped with {streaming.queryId, streaming.epochId} in transaction properties.
  • Txn2UpdateConfig that persists spark.lance.streaming.lastEpoch.<queryId> = epochId in the dataset config map.

On restart after a crash, the sink reads the watermark (Txn2) and the transaction properties history (Txn1) to decide what to do:

  • epochId ≤ lastEpochSKIP_REPLAY (fast path, no I/O)
  • Txn1 found in history, Txn2 missing → RECOVERY_TXN2_ONLY (self-heal, run Txn2 only)
  • Fresh epoch → FULL_COMMIT (Txn1 then Txn2)

Output modes

  • Append — Lance Append operation (default).
  • CompleteSparkWriteBuilder.truncate() flips overwrite=true; each epoch runs a Lance Overwrite, replacing the entire table.
  • UpdateSparkWriteBuilder implements the internal Spark marker SupportsStreamingUpdateAsAppend. Spark routes update-mode rows through the same append path; the sink does not perform row-level upserts.

Required option: streamingQueryId — a globally unique string per logical streaming query. It is the idempotency key for both the epoch watermark and the recovery scan. Without it the sink fails fast with an actionable error.

Empty micro-batches are detected on the driver (no fragments → early return) and treated as no-ops: no Lance transaction is issued and the epoch watermark is intentionally not bumped.


Known limitations / design trade-offs

Bounded at-least-once fallback. If more than maxRecoveryLookback (default: 100) unrelated commits land between a Txn1→Txn2 crash and the subsequent retry, the recovery scan misses the prior Txn1 and re-appends the data — producing a bounded duplicate. This is documented behavior; users with very high-churn tables can raise maxRecoveryLookback up to 10 000.

No auto-create. The target Lance table must exist before the streaming query starts. The sink validates this at construction time and fails with a clear error if the table is absent.

Update mode is append-on-update, not upsert. For aggregation queries in Update mode, each micro-batch emits delta rows that are appended rather than merged into existing rows. This is the documented semantic; native MERGE-based upsert would require a separate implementation.

Staged commits (CTAS / REPLACE TABLE) are incompatible with streaming and are rejected immediately with an actionable error. The staged-commit flow commits exactly once via Table.commitStagedChanges(), which is structurally incompatible with the per-epoch commit cadence.

useCommitCoordinator() is overridden to return false (Lance commits are already atomic via CommitBuilder), but the @Override annotation is deliberately omitted because the method was added to the StreamingWrite interface in Spark 3.5 and its presence would break compilation on Spark 3.4.

@github-actions github-actions Bot added the enhancement New feature or request label Apr 9, 2026
@wombatu-kun wombatu-kun force-pushed the structured-streaming-write branch 3 times, most recently from 98c08b9 to 123b49f Compare April 9, 2026 04:36
@wombatu-kun wombatu-kun force-pushed the structured-streaming-write branch from 123b49f to ffece2a Compare April 12, 2026 03:47
@hamersaw
Copy link
Copy Markdown
Collaborator

hamersaw commented Apr 13, 2026

I think a design doc would be extremely useful in parsing out how this works. As I understand it, each micro-batch uses an Append transaction to write fragments and then another transaction to increment the watermark in the dataset manifest. These should be done atomically, but Lance's transaction API makes this difficult, so there is a restart mechanism that reads N transactions to get the latest watermark.

From a high-level it sounds like this will create a massive number of tiny fragments (ex. one for each microbatch) and require an excessive number of dataset manifest updates. In practice, the dataset manifest size increases linearly with the number of fragments, and the speed of commits degrades with increasing dataset manifest size. I posit that even with relatively infrequent micro-batch syncs, given two manifest updates per micro-batch the time to process on will exceed the interval at which they're scheduled. I think some benchmarks showing scalability of this solution would be really useful in pushing the PR forward.

Adds a streaming-write scalability benchmark (E1-E4) under benchmark/ and
back-fills docs/src/operations/streaming/streaming-writes.md with the measured
numbers, addressing the reviewer concern on PR lance-format#399 about manifest growth and
sustained commit latency.

Benchmark harness (benchmark/src/.../streaming/):
  - StreamingScalabilityBenchmark — driver with --experiment {E1|E2|E3|E4}
  - MicroBatchTimingListener — captures per-batch wall time, fragment count,
    Lance version, and manifest bytes (handles Lance's MAX_U64 - version
    filename encoding for local FS)
  - run-streaming-benchmark.sh / run-streaming-benchmark-docker.sh —
    host-Spark and docker wrappers; the docker variant pushes the locally
    built bundle into /opt/spark/jars to override the stale Maven Central jar

Reference results committed under benchmark/streaming-results/:
  - E1 (no OPTIMIZE, 2k batches): manifest grows ~83 B/fragment; p50 commit
    grows linearly to ~2.3 s at 21k fragments — exceeds a 1 s trigger after
    ~10k fragments.
  - E2 (every-50 / every-200): OPTIMIZE keeps fragments bounded; p90 commit
    stays at ~191 ms / ~250 ms in steady state.
  - E3 (recovery scan): ~0.5 ms × min(maxRecoveryLookback, current_version);
    default 100 keeps restart cost under ~75 ms.
  - E4 (sustained throughput): all of 1 s / 5 s / 15 s triggers achieve the
    input rate at 1k rows/s — pick by latency target, not throughput.

Doc updates:
  - streaming-writes.md gains a "Scalability and Operational Tuning" section
    with the measured tables, an explicit "OPTIMIZE is required, not
    optional" callout, and concrete trigger / lookback guidance.
  - LanceSparkWriteOptions: comment near DEFAULT_MAX_RECOVERY_LOOKBACK now
    cites the measured ~0.5 ms/version cost.
  - benchmark/README.md gains a full streaming-benchmark section (CSV
    schema, flag table, env vars, reproduction commands).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@wombatu-kun
Copy link
Copy Markdown
Contributor Author

Thanks for the review — you were right to push for benchmarks before merging. The two-transaction protocol's scalability characteristics turned out to be exactly the shape you predicted, and the numbers make the operational story much sharper than prose would. Just pushed 82cefb2 which:

  • Adds a streaming-scalability benchmark harness (E1–E4) under benchmark/ — driver + listener + host & docker run scripts.
  • Back-fills docs/src/operations/streaming/streaming-writes.md with the measured numbers and a clear "OPTIMIZE is required, not optional" callout — see the new Scalability and Operational Tuning section.
  • Commits the four reference CSVs under benchmark/streaming-results/ so the data is auditable without re-running.

Quick answers to each of your concerns, with data:

1. "Each micro-batch uses Append + UpdateConfig (two manifest updates)." Confirmed and measured: the dataset version increments by exactly 2 per micro-batch in every E1–E4 run. Now explicitly documented.

2. "Tiny fragments + manifest size grows linearly." Measured ~83 bytes per fragment in the manifest; with Spark default parallelism that's ~1 KB added per micro-batch on local[12] at 5 000 rows/s.

3. "Commit speed degrades with manifest size; will exceed the trigger interval." Confirmed — and this is the strongest argument for what we did about it. Without compaction (E1, ~2 000 batches):

Fragment count p50 commit p90 commit
0 – 5 000 243 ms 387 ms
10 000 – 15 000 1.19 s 1.40 s
20 000 – 25 000 2.29 s 2.51 s

The relationship is essentially linear (p50 ≈ 0.1 × fragment_count ms on the reference setup), so a processingTime("1 second") trigger crosses its budget at ~10 000 fragments — i.e. minutes-to-hours of sustained streaming, depending on rate.

OPTIMIZE keeps it bounded (E2, same workload):

OPTIMIZE cadence Steady-state fragments p90 commit
Never (E1) grows without bound drifts up
Every 50 batches ~600 191 ms
Every 200 batches cycles 600 ↔ 2 400 250 ms

So the headline operational rule (now in the docs): OPTIMIZE every 50–200 micro-batches.

4. "Restart reads N transactions to get the watermark." Measured at ~0.5 ms per visited version (E3). Cost = 0.5 ms × min(maxRecoveryLookback, current_version), paid once per restart. With the default maxRecoveryLookback=100, restart cost stays under ~75 ms regardless of dataset history depth — even the worst case (10 000 lookback, 10 000 versions) is ~5 s, paid once.

Bonus: E4 — sustained throughput vs. trigger interval. At 1 000 rows/s, all of 1 s / 5 s / 15 s triggers achieved the input rate without back-pressure. Trigger choice is a data-freshness vs. OPTIMIZE-cadence trade-off, not a throughput trade-off — now explicitly framed that way in the docs.

The benchmark + docs together should give operators concrete numbers to plan around. Skipped the formal design doc you mentioned since the two-transaction protocol is already documented in streaming-writes.md — the gap was the operational tuning, which is now back-filled with measurements. Happy to add a separate design doc if you'd still prefer one.

Object-storage measurements (S3 / GCS) are deliberately out of scope here — the docs flag this as "not yet measured" rather than guessing at numbers. Worth a follow-up PR with a parameterized run if useful.

@hamersaw
Copy link
Copy Markdown
Collaborator

Given these duration estimates I'm quite hesitant to move forward with this implementation. The reason being, if we are targeting batch-ish write speeds we would want 5s-30s write interval at the slowest. It doesn't seem like we can reliably make two manifest updates + write the data within that time frame, especially if we are doing frequent compactions in the background.

My biggest concern in merging this without adequate performance is that improving that will likely be a breaking change, because we can not adhere to the requirements of this PR (ex. epoch times in dataset manifest config).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Spark structured streaming

2 participants