
Destination drain #321

Merged
tonyxiao merged 6 commits into v2 from destination_drain on Apr 23, 2026
Conversation

@Yostra Yostra commented Apr 22, 2026

Summary

Turns soft_time_limit into a real tool for triggering destination flushes, rather than a fixed ±1s buffer around time_limit, and rebuilds the Sheets destination around it so we never checkpoint state past data we haven't written.

The old model

takeLimits took a single time_limit and silently split it into a soft cutoff at deadline − 1s and a hard cutoff at deadline + 1s. It was wrapped around the destination's output, so when the soft deadline fired the destination got return()'d in the middle of its flush. The Sheets destination defended against that with a finally { await flushAll() } — which mostly worked but had two sharp edges:

  1. Buffered source_state messages got yielded mid-stream, before the corresponding rows were actually in the sheet. A time-limit cut could leave the engine with a cursor ahead of the data.
  2. A long flushAll (wide catalogs take tens of seconds) could be killed by the same time_limit that was supposed to let it finish.

The new model

soft_time_limit is a cooperative source-side stop. It's a separate, explicit option on SourceReadOptions and a separate /pipeline_sync query param. When it fires, we close the source iterator with return(), the for await loop in the destination sees done, and the destination's own post-loop code runs — flush, emit state, yield final logs — bounded only by the hard time_limit. takeLimits stops inventing ±1s buffers; soft and hard are just two independent deadlines and whichever fires first wins.
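The "two independent deadlines, whichever fires first wins" model can be sketched roughly like this — the names (takeLimitsSketch, the out-param) are illustrative, not the engine's real API:

```typescript
// Minimal sketch of soft and hard as two independent deadlines: no derived
// ±1s buffer. Whichever fires first stops the read loop, and closing the
// loop runs the source iterator's return(), so source cleanup still happens.
async function* takeLimitsSketch<T>(
  source: AsyncIterable<T>,
  opts: {softTimeLimitMs?: number; hardTimeLimitMs?: number},
  out: {stoppedBy?: 'soft' | 'hard'} = {},
): AsyncGenerator<T> {
  const start = Date.now()
  const iter = source[Symbol.asyncIterator]()
  try {
    for (;;) {
      const elapsed = Date.now() - start
      if (opts.hardTimeLimitMs !== undefined && elapsed >= opts.hardTimeLimitMs) {
        out.stoppedBy = 'hard'
        return
      }
      if (opts.softTimeLimitMs !== undefined && elapsed >= opts.softTimeLimitMs) {
        out.stoppedBy = 'soft'
        return
      }
      const {value, done} = await iter.next()
      if (done) return // natural EOF: neither limit fired
      yield value
    }
  } finally {
    await iter.return?.() // cooperative close: the source sees a normal stop
  }
}
```

In the real pipeline the soft cut only closes the source side; the destination's post-loop drain is bounded separately by the hard limit.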
soft_time_limit is dynamic per destination. ConnectorSpecification now accepts soft_limit_fraction (0–1). When a caller doesn't pass soft_time_limit explicitly, the engine derives it from the destination's spec: time_limit × fraction, falling back to time_limit − 1 for destinations that don't declare one (preserves the old behaviour for Postgres and friends). Google Sheets declares 0.5 because its flush tail can easily eat half the budget; a fast destination can stay at the default. This means the source-read window automatically shrinks for destinations that need more flush time — no caller tuning required.
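The derivation is simple enough to show inline — helper name is hypothetical, but the arithmetic matches the description above:

```typescript
// Derives the soft deadline from the destination's declared
// soft_limit_fraction (0-1, from its ConnectorSpecification), falling back
// to time_limit - 1 (the old behaviour) when the destination declares none.
function deriveSoftTimeLimit(
  timeLimitSec: number,
  softLimitFraction?: number,
): number {
  return softLimitFraction !== undefined
    ? timeLimitSec * softLimitFraction
    : timeLimitSec - 1
}
```

With time_limit=60, Google Sheets (fraction 0.5) gets a 30s source-read window, while a destination with no declared fraction keeps 59s.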
The soft limit is the destination's flush cue. The Sheets destination no longer flushes in a finally. It now:

  • buffers source_state for the entire write() call alongside the record buffers,
  • treats $stdin closing (either natural EOF or limitSource.return() from a soft hit) as the one and only "time to drain" signal,
  • runs a single flushAll after the drain,
  • only yields the buffered state messages if that flush succeeded,
  • surfaces flush failures as connection_status: failed instead.
If the hard time_limit interrupts the flush itself, the iterator's return() skips the state yield — which is exactly what we want. We'd rather re-sync the remainder next run than advance a cursor past rows that never landed.

New limitSource helper on the engine side. It wraps takeLimits around the source stream but hides the synthetic terminal eof and exposes a stopped flag instead. That's what lets the destination's write() keep running after a soft cut — the destination sees a normal done, not a fake eof. The engine reads gate.stopped when it builds its own outgoing eof, so has_more=true still reflects "we stopped because of a limit, come back for more."

Flush heartbeats. While flushAll is in flight, uploadToSheet yields a flushing to Sheets (in progress, Ns) debug log every 20s (override via flushHeartbeatMs). Two reasons: Temporal stops treating the activity as idle, and humans tailing logs see that we're actually waiting on Sheets, not hung.
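The heartbeat shape is roughly this — a sketch with a hypothetical wrapper name, not uploadToSheet's actual structure:

```typescript
// Yields a periodic progress line while a flush promise is in flight.
// If the flush rejects, the final await rethrows so the caller can surface
// connection_status: failed instead of yielding state.
async function* withFlushHeartbeat(
  flush: Promise<void>,
  flushHeartbeatMs = 20_000,
): AsyncGenerator<string> {
  const startedAt = Date.now()
  let done = false
  const settled = flush.finally(() => { done = true })
  while (!done) {
    await Promise.race([
      settled,
      new Promise<void>(r => setTimeout(r, flushHeartbeatMs)),
    ])
    if (!done) {
      const elapsedSec = Math.round((Date.now() - startedAt) / 1000)
      yield `flushing to Sheets (in progress, ${elapsedSec}s)`
    }
  }
  await settled // rethrow if the flush failed
}
```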

Supporting changes

  • 10M-cell-per-spreadsheet guard in applyBatch. We now add up current grid + append payload (and the projected post-expansion grid, to catch column growth) and throw locally with a readable message instead of waiting for Sheets to return an opaque API error. MAX_CELLS_PER_SPREADSHEET is exported so tests can assert against it.
  • Temporal drainMessages heartbeat switched to 15s wall-clock (was every 50 messages). Decouples heartbeat cadence from source throughput — a slow source no longer goes silent, and a fast source doesn't heartbeat-storm.
  • Default timeLimit bumped 30 → 300 in the service backfill loop, the backfill workflow, and the /pipelines/:id/backfill route. With the soft/hard split doing real work now, 30s didn't give destinations a fair flush window.
  • Merge grid expansion requests into the data batchUpdate in applyBatch so each flush makes a single Sheets API call instead of two, halving write-quota usage on expanding flushes.
  • Reworked metadata reuse on the Sheets sync path. Previously one getSpreadsheetMeta snapshot was reused across ensureSheets + ensureIntroSheet + protectSheets; now we refetch after ensureSheets so the "Sheet1 renamed" state is visible downstream. (Unrelated to the drain work, but it shook out of testing wide-catalog setup.)
  • A handful of log.warn calls in writer.ts that were just informational are now log.debug.
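The 10M-cell guard described above boils down to arithmetic like the following — a sketch with illustrative parameter names, since the real applyBatch derives these numbers from the Sheets grid metadata:

```typescript
// Google Sheets caps a spreadsheet at 10 million cells. Check the projected
// grid (current + append, including column growth) locally and fail with a
// readable message instead of waiting for an opaque Sheets API error.
const MAX_CELLS_PER_SPREADSHEET = 10_000_000

function assertCellCap(
  currentRows: number,
  currentCols: number,
  appendRows: number,
  appendCols: number, // widest row in the append payload
): void {
  const projectedCols = Math.max(currentCols, appendCols)
  const projectedCells = (currentRows + appendRows) * projectedCols
  if (projectedCells > MAX_CELLS_PER_SPREADSHEET) {
    throw new Error(
      `batch would grow the spreadsheet to ${projectedCells} cells, ` +
      `over the ${MAX_CELLS_PER_SPREADSHEET}-cell Sheets limit`,
    )
  }
}
```

Note the Math.max on columns: an append payload wider than the current grid expands every existing row, which is the column-growth case the projection is meant to catch.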

Behavioural shape, in one line

Soft limit = "source, wrap it up." Hard limit = "everyone, stop." Destinations declare how much slack they need between the two. State checkpoints never get ahead of durable writes.

How to test

  • Engine soft/hard split: new pipeline_sync() graceful close suite in engine.test.ts and takeLimits / limitSource suites in pipeline.test.ts cover natural completion, soft-cut drains, hard-cut forces, and the combined case.
  • Sheets ordering guarantees: index.test.ts covers that state is re-emitted after flush rather than mid-stream, that state messages are suppressed when flushAll fails, and that heartbeat log messages are emitted while flushAll is in flight.
  • Cell cap: applyBatch cell-count limit suite covers single-batch overflow, projected-grid overflow, the update-only path, and end-to-end failure surfacing through dest.write().
Quickest smoke test: run a Sheets backfill against a wide catalog with time_limit=60, confirm the response is eof.has_more=true with an ending_state that matches what's actually in the sheet (not ahead of it), and watch for the flushing to Sheets (in progress, Ns) log during the final drain.

@Yostra Yostra force-pushed the destination_drain branch 4 times, most recently from 2c7d0a5 to 7445efc on April 23, 2026 01:33
@Yostra Yostra marked this pull request as ready for review April 23, 2026 02:35
@Yostra Yostra force-pushed the destination_drain branch from 081d663 to 88e8089 on April 23, 2026 05:32
},
{
"in": "query",
"name": "soft_time_limit",
why is this a query param i thought it is calculated automatically?

},
"additionalProperties": {}
},
"soft_limit_fraction": {
should make a note this only applies to destinations?

async function* gate(): AsyncIterable<T> {
for await (const msg of takeLimits<T>(opts)(source)) {
if (msg.type === 'eof') {
state.stopped = (msg as EofMessage).eof.has_more
i think you should just call this hasMore for ease of use

const elapsedSec = Math.round((Date.now() - flushStartedAt) / 1000)
log.info(`flushing to Sheets (in progress, ${elapsedSec}s)`)
yield {
type: 'log' as const,
don't yield log. use logger.debug

@tonyxiao tonyxiao merged commit 0306967 into v2 Apr 23, 2026
16 checks passed
@tonyxiao tonyxiao deleted the destination_drain branch April 23, 2026 06:55
