diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 4b62ad88..fc30a2d1 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -399,6 +399,8 @@ export interface components { source_input?: { [key: string]: unknown; }; + /** @description Fraction of `time_limit` to use as default `soft_time_limit` (e.g. 0.5). */ + soft_limit_fraction?: number; }; }; ConnectionStatusMessage: { @@ -1011,6 +1013,8 @@ export interface operations { query?: { /** @description Stop streaming after N seconds. */ time_limit?: number; + /** @description Soft wall-clock deadline in seconds. Stops reading from the source between messages; the destination continues to drain and flush until time_limit fires. */ + soft_time_limit?: number; /** @description Optional sync run identifier used to track bounded sync progress. */ run_id?: string; }; @@ -1094,6 +1098,8 @@ export interface operations { query?: { /** @description Stop streaming after N seconds. */ time_limit?: number; + /** @description Soft wall-clock deadline in seconds. Stops reading from the source between messages; the destination continues to drain and flush until time_limit fires. */ + soft_time_limit?: number; /** @description Optional sync run identifier used to track bounded sync progress. */ run_id?: string; }; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index eee77530..69585d0e 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -331,12 +331,23 @@ "name": "time_limit", "schema": { "description": "Stop streaming after N seconds.", - "example": "10", + "example": "300", "type": "number", "exclusiveMinimum": 0 }, "description": "Stop streaming after N seconds." }, + { + "in": "query", + "name": "soft_time_limit", + "schema": { + "description": "Soft wall-clock deadline in seconds. Stops reading from the source between messages; the destination continues to drain and flush until time_limit fires.", + "example": "150", + "type": "number", + "exclusiveMinimum": 0 + }, + "description": "Soft wall-clock deadline in seconds. Stops reading from the source between messages; the destination continues to drain and flush until time_limit fires." + }, { "in": "query", "name": "run_id", @@ -513,12 +524,23 @@ "name": "time_limit", "schema": { "description": "Stop streaming after N seconds.", - "example": "10", + "example": "300", "type": "number", "exclusiveMinimum": 0 }, "description": "Stop streaming after N seconds." }, + { + "in": "query", + "name": "soft_time_limit", + "schema": { + "description": "Soft wall-clock deadline in seconds. Stops reading from the source between messages; the destination continues to drain and flush until time_limit fires.", + "example": "150", + "type": "number", + "exclusiveMinimum": 0 + }, + "description": "Soft wall-clock deadline in seconds. Stops reading from the source between messages; the destination continues to drain and flush until time_limit fires." + }, { "in": "query", "name": "run_id", @@ -1198,6 +1220,12 @@ "type": "string" }, "additionalProperties": {} + }, + "soft_limit_fraction": { + "description": "Fraction of `time_limit` to use as default `soft_time_limit` (e.g. 0.5).", + "type": "number", + "exclusiveMinimum": 0, + "maximum": 1 } }, "required": [ diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index 6bf257b0..cb0af6b6 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -179,8 +179,19 @@ export async function createApp(resolver: ConnectorResolver) { const syncQueryParams = z.object({ time_limit: z.coerce.number().positive().optional().meta({ description: 'Stop streaming after N seconds.', - example: '10', + example: '300', }), + soft_time_limit: z.coerce + .number() + .positive() + .optional() + .meta({ + description: + 'Soft wall-clock deadline in seconds. Stops reading from the source ' + + 'between messages; the destination continues to drain and flush until ' + + 'time_limit fires.', + example: '150', + }), run_id: z.string().optional().meta({ description: 'Optional sync run identifier used to track bounded sync progress.', example: 'run_demo', @@ -529,7 +540,7 @@ export async function createApp(resolver: ConnectorResolver) { }, }) app.openapi(pipelineSyncRoute, async (c) => { - const { time_limit, run_id } = c.req.valid('query') + const { time_limit, soft_time_limit, run_id } = c.req.valid('query') const pipeline = requireHeaderValue( c.req.valid('header')['x-pipeline'], @@ -564,7 +575,11 @@ export async function createApp(resolver: ConnectorResolver) { ) const ac = createConnectionAbort(c, onDisconnect) - const output = engine.pipeline_sync(pipeline, { state, time_limit, run_id }, input) + const output = engine.pipeline_sync( + pipeline, + { state, time_limit, soft_time_limit, run_id }, + input + ) return ndjsonResponse(logApiStream('Engine API /pipeline_sync', output, context, startedAt), { signal: ac.signal, }) diff --git a/apps/engine/src/lib/engine.test.ts b/apps/engine/src/lib/engine.test.ts index 630a0cdf..fa489d94 100644 --- a/apps/engine/src/lib/engine.test.ts +++ b/apps/engine/src/lib/engine.test.ts @@ -1687,6 +1687,163 @@ describe('engine.pipeline_sync() pipeline', () => { }) }) +// --------------------------------------------------------------------------- +// engine.pipeline_sync() graceful close (soft_time_limit) +// --------------------------------------------------------------------------- + +describe('engine.pipeline_sync() graceful close', () => { + /** + * Mirrors destination-google-sheets: records and source_state are buffered + * during the loop; flushAll and state yields run after $stdin ends + * (no finally — iterator.return() drops the batch by design). + */ + function makeBufferingDestination(flushLog: string[]): Destination { + return { + async *spec() { + yield { type: 'spec', spec: { config: {} } } as SpecOutput + }, + async *check() { + yield { type: 'connection_status', connection_status: { status: 'succeeded' } } + }, + async *write(_params, $stdin) { + const bufferedRecords: string[] = [] + const bufferedStates: SourceStateMessage[] = [] + for await (const msg of $stdin) { + if (msg.type === 'record') { + bufferedRecords.push((msg.record.data as { id: string }).id) + } else if (msg.type === 'source_state') { + bufferedStates.push(msg as SourceStateMessage) + } + } + flushLog.push(`flushed:${bufferedRecords.join(',')}`) + for (const state of bufferedStates) { + yield state + } + }, + } + } + + function customersSource(readBody: () => AsyncIterable): Source { + return { + async *spec() { + yield { type: 'spec', spec: { config: {} } } as SpecOutput + }, + async *check() { + yield { type: 'connection_status', connection_status: { status: 'succeeded' } } + }, + async *discover() { + yield { + type: 'catalog', + catalog: { streams: [{ name: 'customers', primary_key: [['id']] }] }, + } as CatalogMessage + }, + async *read() { + yield* readBody() + }, + } + } + + it('eof.has_more=false on natural source completion', async () => { + const flushLog: string[] = [] + async function* body(): AsyncIterable { + yield { + type: 'record', + record: { + stream: 'customers', + data: { id: 'cus_1' }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + } satisfies RecordMessage + yield { + type: 'source_state', + source_state: { state_type: 'stream', stream: 'customers', data: { cursor: 'cus_1' } }, + } satisfies SourceStateMessage + } + + const engine = await createEngine( + makeResolver(customersSource(body), makeBufferingDestination(flushLog)) + ) + const results = await drain(engine.pipeline_sync(defaultPipeline)) + + const eof = results.find((m) => m.type === 'eof')! + expect(eof.eof.has_more).toBe(false) + expect(flushLog).toEqual(['flushed:cus_1']) + const states = results.filter((m) => m.type === 'source_state') + expect(states).toHaveLength(1) + }) + + it("state emitted after destination's flush is reflected in eof.ending_state", async () => { + const flushLog: string[] = [] + async function* body(): AsyncIterable { + yield { + type: 'record', + record: { + stream: 'customers', + data: { id: 'cus_1' }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + } satisfies RecordMessage + yield { + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'customers', + data: { cursor: 'cus_1' }, + }, + } satisfies SourceStateMessage + } + + const engine = await createEngine( + makeResolver(customersSource(body), makeBufferingDestination(flushLog)) + ) + const results = await drain(engine.pipeline_sync(defaultPipeline)) + const eof = results.find((m) => m.type === 'eof')! + expect(eof.eof.ending_state?.source.streams.customers).toEqual({ cursor: 'cus_1' }) + }) + + it('soft_time_limit drains destination; eof.has_more=true with post-flush state', async () => { + const flushLog: string[] = [] + async function* body(): AsyncIterable { + let i = 0 + while (true) { + yield { + type: 'record', + record: { + stream: 'customers', + data: { id: `cus_${++i}` }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + } satisfies RecordMessage + yield { + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'customers', + data: { cursor: `cus_${i}` }, + }, + } satisfies SourceStateMessage + await new Promise((r) => setTimeout(r, 20)) + } + } + + const engine = await createEngine( + makeResolver(customersSource(body), makeBufferingDestination(flushLog)) + ) + const results = await drain( + engine.pipeline_sync(defaultPipeline, { soft_time_limit: 0.3, time_limit: 5 }) + ) + + const eof = results.find((m) => m.type === 'eof')! + expect(eof.eof.has_more).toBe(true) + // Destination ran its finally (flush happened) + expect(flushLog.length).toBe(1) + // Engine received post-flush state and advanced ending_state + const states = results.filter((m) => m.type === 'source_state') + expect(states.length).toBeGreaterThan(0) + expect(eof.eof.ending_state?.source.streams.customers).toHaveProperty('cursor') + }, 10_000) +}) + function waitForAbortOrRelease( signal: AbortSignal, onAbort: () => void, diff --git a/apps/engine/src/lib/engine.ts b/apps/engine/src/lib/engine.ts index f5e5de0e..f5ae53be 100644 --- a/apps/engine/src/lib/engine.ts +++ b/apps/engine/src/lib/engine.ts @@ -24,7 +24,7 @@ import { const engineMsg = createEngineMessageFactory() import { log } from '../logger.js' -import { enforceCatalog, filterType, tapLog, pipe, takeLimits } from './pipeline.js' +import { enforceCatalog, filterType, tapLog, pipe, takeLimits, limitSource } from './pipeline.js' import { createInitialProgress, progressReducer } from './progress/index.js' import { stateReducer, isProgressTrigger } from './state-reducer.js' import { applySelection, excludeTerminalStreams } from './destination-filter.js' @@ -37,6 +37,8 @@ export const SourceReadOptions = z.object({ state: z.unknown().optional(), /** Wall-clock time limit in seconds; the stream stops after this duration. */ time_limit: z.number().positive().optional(), + /** Wall-clock time limit in seconds; the source read stops after this duration. */ + soft_time_limit: z.number().positive().optional(), /** Identifies the current sync run. If it differs from state.sync_run.run_id, run progress is reset. */ run_id: z.string().optional(), }) @@ -46,6 +48,7 @@ export interface SourceReadOptions { | { streams: Record; global: Record } | Record time_limit?: number + soft_time_limit?: number run_id?: string } @@ -215,13 +218,17 @@ function configPayload(envelope: { async function getSpec( connector: { spec(): AsyncIterable }, rawConfig: Record -): Promise<{ config: Record; streamStateSchema?: z.ZodType }> { +): Promise<{ + config: Record + streamStateSchema?: z.ZodType + softLimitFraction?: number +}> { const specMsg = await collectFirst(connector.spec(), 'spec') const config = z.fromJSONSchema(specMsg.spec.config).parse(rawConfig) as Record const streamStateSchema = specMsg.spec.source_state_stream ? z.fromJSONSchema(specMsg.spec.source_state_stream) : undefined - return { config, streamStateSchema } + return { config, streamStateSchema, softLimitFraction: specMsg.spec.soft_limit_fraction } } /** Discover and build catalog for a pipeline. */ @@ -254,7 +261,11 @@ async function resolvePipeline( const normalizedState = parseSyncState(state, srcSpec.streamStateSchema) return { source: { connector: srcConnector, config: srcSpec.config }, - destination: { connector: destConnector, config: destSpec.config }, + destination: { + connector: destConnector, + config: destSpec.config, + softLimitFraction: destSpec.softLimitFraction, + }, catalog, filteredCatalog, state: normalizedState, @@ -334,6 +345,21 @@ function emit(msg: Record): SyncOutput { return { ...msg, _emitted_by: 'engine', _ts: new Date().toISOString() } as unknown as SyncOutput } +/** + * Derive `soft_time_limit` from `time_limit` when the caller didn't set one. + * Destinations can request a fraction via `spec.soft_limit_fraction` + * (e.g. 0.5 for Sheets); otherwise soft = time_limit - 1 (mirrors the old + * two-phase takeLimits behaviour). Returns undefined for `time_limit < 2`. + */ +function defaultSoftTimeLimit( + timeLimit: number | undefined, + fraction: number | undefined +): number | undefined { + if (timeLimit == null || timeLimit < 2) return undefined + const derived = fraction != null ? timeLimit * fraction : timeLimit - 1 + return derived > 0 ? derived : undefined +} + /** Accumulate source state from messages. Pure. */ // MARK: - Factory @@ -504,6 +530,7 @@ export async function createEngine(resolver: ConnectorResolver): Promise const parsed = map(raw, (msg) => Message.parse(msg)) yield* takeLimits({ time_limit: opts?.time_limit, + soft_time_limit: opts?.time_limit ? opts?.time_limit - 1 : undefined, signal, })(parsed) as AsyncIterable })() @@ -562,7 +589,18 @@ export async function createEngine(resolver: ConnectorResolver): Promise input ) - const destInput = pipe(sourceOutput, enforceCatalog(activeFilteredCatalog), tapLog) + // Graceful close: soft_time_limit stops reading from the source + // between messages while the destination drains/flushes until + // time_limit fires on destOutput. Default: `time_limit - 1`, or + // `time_limit * spec.soft_limit_fraction` for destinations with + // long flush tails (e.g. Sheets: 0.5). + const softTimeLimit = + opts?.soft_time_limit ?? + defaultSoftTimeLimit(opts?.time_limit, p.destination.softLimitFraction) + // signal is enforced on destOutput's takeLimits below — don't duplicate here. + const sourceGate = limitSource(sourceOutput, { soft_time_limit: softTimeLimit }) + + const destInput = pipe(sourceGate.iterable, enforceCatalog(activeFilteredCatalog), tapLog) const destOutput = p.destination.connector.write( { config: p.destination.config, catalog: activeFilteredCatalog }, destInput @@ -581,7 +619,7 @@ export async function createEngine(resolver: ConnectorResolver): Promise yield emit( engineMsg.eof({ status: runProgress.derived.status, - has_more: hasMore, + has_more: hasMore || sourceGate.stopped, ending_state: syncState, run_progress: runProgress, request_progress: requestProgress, diff --git a/apps/engine/src/lib/pipeline.test.ts b/apps/engine/src/lib/pipeline.test.ts index bfc5f153..247666d5 100644 --- a/apps/engine/src/lib/pipeline.test.ts +++ b/apps/engine/src/lib/pipeline.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it, vi, beforeEach } from 'vitest' import type { ConfiguredCatalog, DestinationOutput, Message } from '@stripe/sync-protocol' -import { enforceCatalog, filterType, tapLog, pipe, takeLimits } from './pipeline.js' +import { enforceCatalog, filterType, tapLog, pipe, takeLimits, limitSource } from './pipeline.js' vi.mock('../logger.js', () => ({ log: { @@ -415,7 +415,7 @@ describe('takeLimits()', () => { expect(result.length).toBeLessThanOrEqual(3) }) - it('soft cutoff: emits eof with time_limit reason between messages when deadline-1s crossed', async () => { + it('soft_time_limit: stops between messages cooperatively', async () => { async function* fastMessages(): AsyncIterable { let i = 0 while (true) { @@ -432,16 +432,15 @@ describe('takeLimits()', () => { } const start = Date.now() - const result = await drain(takeLimits({ time_limit: 3 })(fastMessages())) + const result = await drain(takeLimits({ soft_time_limit: 0.5 })(fastMessages())) const elapsed = Date.now() - start const eof = result.at(-1) as any expect(eof).toMatchObject({ type: 'eof', eof: { has_more: true } }) - // Soft deadline fires at ~2s (deadline - 1s buffer) - expect(elapsed).toBeGreaterThan(1500) - expect(elapsed).toBeLessThan(4000) + expect(elapsed).toBeGreaterThanOrEqual(500) + expect(elapsed).toBeLessThan(1500) }) - it('hard cutoff: forces return when source blocks past deadline+1s', async () => { + it('time_limit: hard cutoff forces return even if upstream is blocked', async () => { async function* blockingSource(): AsyncIterable { yield { type: 'record', @@ -451,7 +450,6 @@ describe('takeLimits()', () => { emitted_at: '2024-01-01T00:00:00.000Z', }, } - // Block for 10 seconds — way past the hard deadline await new Promise((r) => setTimeout(r, 10_000)) yield { type: 'record', @@ -464,14 +462,40 @@ describe('takeLimits()', () => { } const start = Date.now() - const result = await drain(takeLimits({ time_limit: 2 })(blockingSource())) + const result = await drain(takeLimits({ time_limit: 0.5 })(blockingSource())) const elapsed = Date.now() - start const eof = result.at(-1) as any expect(eof).toMatchObject({ type: 'eof', eof: { has_more: true } }) - // Hard deadline fires at ~3s (deadline + 1s), NOT at 10s - expect(elapsed).toBeGreaterThan(2000) - expect(elapsed).toBeLessThan(5000) - }, 10_000) + expect(elapsed).toBeGreaterThanOrEqual(400) + expect(elapsed).toBeLessThan(2000) + }, 5_000) + + it('soft and hard together: soft wins when reached first', async () => { + async function* fastMessages(): AsyncIterable { + let i = 0 + while (true) { + yield { + type: 'record', + record: { + stream: 'customers', + data: { id: `cus_${++i}` }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + } + await new Promise((r) => setTimeout(r, 20)) + } + } + + const start = Date.now() + const result = await drain( + takeLimits({ soft_time_limit: 0.3, time_limit: 2 })(fastMessages()) + ) + const elapsed = Date.now() - start + const eof = result.at(-1) as any + expect(eof).toMatchObject({ type: 'eof', eof: { has_more: true } }) + expect(elapsed).toBeGreaterThanOrEqual(300) + expect(elapsed).toBeLessThan(1500) + }) it('abort signal: terminates immediately when signal is aborted', async () => { async function* infiniteSource(): AsyncIterable { @@ -560,6 +584,64 @@ describe('takeLimits()', () => { }) }) +// --------------------------------------------------------------------------- +// limitSource() +// --------------------------------------------------------------------------- + +describe('limitSource()', () => { + it('passes messages through and reports stopped=false on natural completion', async () => { + const msgs: Message[] = [ + { + type: 'record', + record: { + stream: 'customers', + data: { id: 'cus_1' }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + }, + { + type: 'source_state', + source_state: { state_type: 'stream', stream: 'customers', data: { cursor: '1' } }, + }, + ] + const gate = limitSource(toAsync(msgs)) + const result = await drain(gate.iterable) + expect(result).toHaveLength(2) + expect(gate.stopped).toBe(false) + }) + + it('reports stopped=true when a limit fires, and never yields the synthetic eof', async () => { + const ac = new AbortController() + ac.abort() + const gate = limitSource(toAsync([]), { signal: ac.signal }) + const result = await drain(gate.iterable) + expect(result).toHaveLength(0) + expect(gate.stopped).toBe(true) + }) + + it('reports stopped=true when soft_time_limit fires between messages', async () => { + async function* fastMessages(): AsyncIterable { + let i = 0 + while (true) { + yield { + type: 'record', + record: { + stream: 'customers', + data: { id: `cus_${++i}` }, + emitted_at: '2024-01-01T00:00:00.000Z', + }, + } + await new Promise((r) => setTimeout(r, 20)) + } + } + const gate = limitSource(fastMessages(), { soft_time_limit: 0.2 }) + const result = await drain(gate.iterable) + expect(result.length).toBeGreaterThan(0) + expect(result.every((m) => m.type !== 'eof')).toBe(true) + expect(gate.stopped).toBe(true) + }) +}) + // --------------------------------------------------------------------------- // pipe() // --------------------------------------------------------------------------- diff --git a/apps/engine/src/lib/pipeline.ts b/apps/engine/src/lib/pipeline.ts index 9191fa92..36a89baa 100644 --- a/apps/engine/src/lib/pipeline.ts +++ b/apps/engine/src/lib/pipeline.ts @@ -104,23 +104,22 @@ export function filterType( export interface TakeLimitsOptions { time_limit?: number + soft_time_limit?: number signal?: AbortSignal } -const DEADLINE_BUFFER_MS = 1000 - /** * Applies stream limits and emits an `eof` terminal message as the final item. * - * - `time_limit`: two-phase wall-clock deadline: - * - **soft** (deadline − 1 s): checked between messages, graceful return - * - **hard** (deadline + 1 s): `Promise.race` forces return even if upstream blocks - * For short time limits (< 2 s) soft = hard = deadline. - * - `signal`: external `AbortSignal` (e.g. client disconnect). When aborted the - * stream terminates immediately with `reason: 'aborted'`. + * - `soft_time_limit`: between-message cooperative deadline. Closes the + * iterator via `return()`, letting upstream `finally` blocks run. + * - `time_limit`: hard deadline enforced via `Promise.race`. Forces eof even + * if upstream is blocked. + * - `signal`: external `AbortSignal` (e.g. client disconnect). Terminates + * immediately. * * When multiple limits are set, whichever fires first wins. - * The last yielded item is always `{ type: 'eof', eof: { reason, ... } }`. + * The last yielded item is always `{ type: 'eof', eof: { has_more } }`. */ export function takeLimits( opts: TakeLimitsOptions = {} @@ -128,29 +127,24 @@ export function takeLimits( return async function* (messages) { const startedAt = Date.now() - const hasTimeLimit = opts.time_limit != null && opts.time_limit > 0 - const nominalDeadline = hasTimeLimit ? startedAt + opts.time_limit! * 1000 : undefined const softDeadline = - nominalDeadline != null - ? opts.time_limit! >= 2 - ? nominalDeadline - DEADLINE_BUFFER_MS - : nominalDeadline + opts.soft_time_limit != null && opts.soft_time_limit > 0 + ? startedAt + opts.soft_time_limit * 1000 : undefined const hardDeadline = - nominalDeadline != null - ? opts.time_limit! >= 2 - ? nominalDeadline + DEADLINE_BUFFER_MS - : nominalDeadline + opts.time_limit != null && opts.time_limit > 0 + ? startedAt + opts.time_limit * 1000 : undefined const needsRace = hardDeadline != null || opts.signal != null + const needsSlowPath = needsRace || softDeadline != null function makeEof(hasMore: boolean): EofMessage { return { type: 'eof' as const, eof: { has_more: hasMore } } as EofMessage } - // Fast path: no time limit and no signal — simple cooperative loop - if (!needsRace) { + // Fast path: no limits and no signal — simple cooperative loop + if (!needsSlowPath) { for await (const msg of messages) { yield msg } @@ -235,7 +229,6 @@ export function takeLimits( 'SYNC_TIME_LIMIT_HARD' ) yield makeEof(true) - // Fire-and-forget: don't await return() since the iterator may be blocked closeIteratorInBackground() return } @@ -257,12 +250,11 @@ export function takeLimits( const msg = result.value yield msg - // Check soft deadline between messages if (softDeadline != null && Date.now() >= softDeadline) { log.warn( { elapsed_ms: Date.now() - startedAt, - time_limit: opts.time_limit, + soft_time_limit: opts.soft_time_limit, event: 'SYNC_TIME_LIMIT_SOFT', }, 'SYNC_TIME_LIMIT_SOFT' @@ -279,6 +271,54 @@ export function takeLimits( } } +// MARK: - limitSource + +/** + * Handle returned by {@link limitSource}. Read `stopped` *after* draining the + * iterable to decide whether the upstream stopped because of a limit (used to + * set `eof.has_more`). + */ +export interface LimitSourceHandle { + iterable: AsyncIterable + /** True once a limit fired (soft_time_limit, time_limit, or signal). */ + readonly stopped: boolean +} + +/** + * Source-side graceful stop. Wraps {@link takeLimits} and hides its synthetic + * terminal `eof` marker — when a limit fires, `stopped` flips true and the + * iterable returns done, letting downstream stages (e.g. destination + * `write()`) run their `finally` blocks and yield post-teardown messages + * naturally (such as flushed state updates). + * + * Typical usage: + * const gate = limitSource(sourceOutput, { soft_time_limit, signal }) + * const destOutput = destination.write(cfg, pipe(gate.iterable, ...)) + * for await (const msg of destOutput) { ... } + * // Use gate.stopped to populate eof.has_more + */ +export function limitSource( + source: AsyncIterable, + opts: TakeLimitsOptions = {} +): LimitSourceHandle { + const state = { stopped: false } + async function* gate(): AsyncIterable { + for await (const msg of takeLimits(opts)(source)) { + if (msg.type === 'eof') { + state.stopped = (msg as EofMessage).eof.has_more + return + } + yield msg as T + } + } + return { + iterable: gate(), + get stopped() { + return state.stopped + }, + } +} + // MARK: - collect /** diff --git a/apps/engine/src/lib/remote-engine.ts b/apps/engine/src/lib/remote-engine.ts index cdfcecd1..1e84b00e 100644 --- a/apps/engine/src/lib/remote-engine.ts +++ b/apps/engine/src/lib/remote-engine.ts @@ -67,6 +67,7 @@ export function createRemoteEngine(engineUrl: string): Engine { function queryParams(opts?: SourceReadOptions & { only?: string }): Record { const q: Record = {} if (opts?.time_limit != null) q.time_limit = String(opts.time_limit) + if (opts?.soft_time_limit != null) q.soft_time_limit = String(opts.soft_time_limit) if (opts?.run_id != null) q.run_id = opts.run_id if (opts?.only != null) q.only = opts.only return q diff --git a/apps/service/src/api/app.ts b/apps/service/src/api/app.ts index 6df0a7db..532f611d 100644 --- a/apps/service/src/api/app.ts +++ b/apps/service/src/api/app.ts @@ -836,7 +836,7 @@ export function createApp(options: AppOptions) { const result = await runBackfillToCompletion({ pipelineSync: activities.pipelineSync }, id, { syncState: pipeline.sync_state ?? emptySyncState(), syncRunId, - timeLimit: time_limit ?? 30, + timeLimit: time_limit ?? 300, }) return c.json({ eof: result.eof, sync_state: result.syncState }, 200) diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts index 196e4448..f07a0724 100644 --- a/apps/service/src/temporal/activities/_shared.ts +++ b/apps/service/src/temporal/activities/_shared.ts @@ -41,6 +41,7 @@ export async function drainMessages( let destConfig: Record | undefined let eof: EofPayload | undefined let count = 0 + let lastHb = 0 for await (const message of stream) { count++ @@ -53,9 +54,13 @@ export async function drainMessages( destConfig = message.control.destination_config! } } - if (count % 50 === 0) heartbeat({ messages: count }) + const now = Date.now() + if (now - lastHb >= 15_000) { + heartbeat({ messages: count }) + lastHb = now + } } - if (count % 50 !== 0) heartbeat({ messages: count }) + heartbeat({ messages: count }) return { sourceConfig, destConfig, eof } } diff --git a/apps/service/src/temporal/activities/pipeline-sync.ts b/apps/service/src/temporal/activities/pipeline-sync.ts index ca0dd310..d85e3c81 100644 --- a/apps/service/src/temporal/activities/pipeline-sync.ts +++ b/apps/service/src/temporal/activities/pipeline-sync.ts @@ -10,9 +10,13 @@ export function createPipelineSyncActivity(context: ActivitiesContext) { opts?: SourceReadOptions & { input?: SourceInputMessage[] } ): Promise<{ eof: EofPayload }> { const pipeline = await context.pipelineStore.get(pipelineId) + const { id: _, ...config } = pipeline const { input: inputArr, ...readOpts } = opts ?? {} const input = inputArr?.length ? asIterable(inputArr) : undefined + + // Destination-specific soft_time_limit defaults now live in the engine + // (driven by spec.soft_limit_fraction). The activity just forwards readOpts. const initialState = parseSyncState(readOpts.state) const { sourceConfig, destConfig, eof } = await drainMessages( context.engine.pipeline_sync(config, readOpts, input), diff --git a/apps/service/src/temporal/lib/backfill-loop.ts b/apps/service/src/temporal/lib/backfill-loop.ts index 1b8ef214..41c5501e 100644 --- a/apps/service/src/temporal/lib/backfill-loop.ts +++ b/apps/service/src/temporal/lib/backfill-loop.ts @@ -5,6 +5,7 @@ export interface BackfillLoopOpts { syncState: SyncState syncRunId: string timeLimit?: number + softLimit?: number } /** @@ -18,7 +19,7 @@ export async function backfillStep( ): Promise<{ eof: EofPayload; syncState: SyncState }> { const { eof } = await activities.pipelineSync(pipelineId, { state: opts.syncState, - time_limit: opts.timeLimit ?? 30, + time_limit: opts.timeLimit ?? 300, run_id: opts.syncRunId, }) const syncState = eof.ending_state ?? opts.syncState diff --git a/apps/service/src/temporal/workflows/pipeline-backfill.ts b/apps/service/src/temporal/workflows/pipeline-backfill.ts index 486d80f3..3ce8a62f 100644 --- a/apps/service/src/temporal/workflows/pipeline-backfill.ts +++ b/apps/service/src/temporal/workflows/pipeline-backfill.ts @@ -29,12 +29,11 @@ export async function pipelineBackfill( const syncRunId = workflowInfo().runId let syncState = opts.syncState let operationCount = 0 - while (true) { const result = await backfillStep({ pipelineSync }, pipelineId, { syncState, syncRunId, - timeLimit: 30, + timeLimit: 300, }) syncState = result.syncState operationCount++ diff --git a/packages/destination-google-sheets/src/index.test.ts b/packages/destination-google-sheets/src/index.test.ts index 0d0cd434..3e15b606 100644 --- a/packages/destination-google-sheets/src/index.test.ts +++ b/packages/destination-google-sheets/src/index.test.ts @@ -8,7 +8,7 @@ import { ROW_NUMBER_FIELD, type Config, } from './index.js' -import { readSheet } from './writer.js' +import { applyBatch, MAX_CELLS_PER_SPREADSHEET, readSheet, type StreamBatchOps } from './writer.js' import { createMemorySheets } from '../__tests__/memory-sheets.js' /** Collect all output from the destination's write() generator. */ @@ -88,7 +88,9 @@ describe('destination-google-sheets', () => { expect(rows[5]).toEqual(['5']) }) - it('state passthrough — flushes buffer then re-emits state', async () => { + it('state is re-emitted after flush, not mid-stream', async () => { + // State messages are buffered and yielded only after flushAll succeeds, + // so the engine only advances its checkpoint once the data is durable. const { sheets, getData, getSpreadsheetIds } = createMemorySheets() const dest = createDestination(sheets) @@ -103,7 +105,6 @@ describe('destination-google-sheets', () => { dest.write({ config: cfg({ batch_size: 100 }), catalog }, toAsyncIter(messages)) ) - // State should be re-emitted (envelope format) const states = output.filter((m) => m.type === 'source_state') expect(states).toHaveLength(1) expect(states[0]).toMatchObject({ @@ -111,12 +112,87 @@ describe('destination-google-sheets', () => { source_state: { stream: 'orders', data: { cursor: 'o2' } }, }) - // All 3 records should be written (2 flushed by state, 1 flushed at end) + // Ordering: every record passthrough precedes every state in the output. + const lastRecordIdx = output.findLastIndex((m) => m.type === 'record') + const firstStateIdx = output.findIndex((m) => m.type === 'source_state') + expect(lastRecordIdx).toBeGreaterThanOrEqual(0) + expect(firstStateIdx).toBeGreaterThan(lastRecordIdx) + + // All 3 records should be written (flushed at end before state was yielded) const id = getSpreadsheetIds()[0] const rows = getData(id, 'orders')! expect(rows).toHaveLength(4) // header + 3 rows }) + it('emits heartbeat log messages while flushAll is in flight', async () => { + // Slow batchUpdate + low flushHeartbeatMs to observe the heartbeat loop (keeps HTTP responses non-idle). + const { sheets } = createMemorySheets() + const originalBatchUpdate = sheets.spreadsheets.batchUpdate.bind(sheets.spreadsheets) + sheets.spreadsheets.batchUpdate = (async (params: unknown) => { + await new Promise((r) => setTimeout(r, 120)) + return originalBatchUpdate(params as Parameters[0]) + }) as unknown as typeof sheets.spreadsheets.batchUpdate + + const dest = createDestination(sheets, { flushHeartbeatMs: 20 }) + const messages: DestinationInput[] = [ + record('beat', { id: 'b1' }), + state('beat', { cursor: 'b1' }), + ] + + const output = await collect(dest.write({ config: cfg(), catalog }, toAsyncIter(messages))) + + const heartbeats = output.filter( + (m) => m.type === 'log' && m.log.message.startsWith('flushing to Sheets') + ) + expect(heartbeats.length).toBeGreaterThanOrEqual(1) + + // State still emits after the flush completes + const states = output.filter((m) => m.type === 'source_state') + expect(states).toHaveLength(1) + // And every heartbeat precedes the state + const lastHeartbeatIdx = output.findLastIndex( + (m) => m.type === 'log' && m.log.message.startsWith('flushing to Sheets') + ) + const stateIdx = output.findIndex((m) => m.type === 'source_state') + expect(lastHeartbeatIdx).toBeLessThan(stateIdx) + }) + + it('state messages are suppressed when flushAll fails', async () => { + // If the flush throws, we must NOT yield buffered state — otherwise the + // engine would checkpoint cursors the sheet never received. + const { sheets } = createMemorySheets() + // Force batchUpdate to fail so applyBatch throws inside flushAll. + const originalBatchUpdate = sheets.spreadsheets.batchUpdate.bind(sheets.spreadsheets) + let firstBatch = true + sheets.spreadsheets.batchUpdate = (async (params: unknown) => { + if (firstBatch) { + // allow initial sheet creation to succeed + firstBatch = false + return originalBatchUpdate(params as Parameters[0]) + } + // 400 is non-retriable, so withRetry doesn't back-off 30+ seconds + throw Object.assign(new Error('boom'), { code: 400 }) + }) as unknown as typeof sheets.spreadsheets.batchUpdate + + const dest = createDestination(sheets) + const messages: DestinationInput[] = [ + record('orders', { id: 'o1' }), + state('orders', { cursor: 'o1' }), + ] + + const output = await collect( + dest.write({ config: cfg({ batch_size: 100 }), catalog }, toAsyncIter(messages)) + ) + + // No state should escape since flush failed. + expect(output.filter((m) => m.type === 'source_state')).toHaveLength(0) + // A failed connection_status should surface instead. + const connFail = output.find( + (m) => m.type === 'connection_status' && m.connection_status.status === 'failed' + ) + expect(connFail).toBeDefined() + }) + it('multi-stream — two streams get independent tabs and headers', async () => { const { sheets, getData, getSpreadsheetIds } = createMemorySheets() const dest = createDestination(sheets) @@ -788,3 +864,134 @@ describe('makeSheetsClient env var fallback', () => { }).rejects.toThrow('client_secret required (provide in config or set GOOGLE_CLIENT_SECRET)') }) }) + +describe('applyBatch cell-count limit', () => { + // Enforces the 10M-cell per-spreadsheet cap locally so the failure is loud rather than an opaque API reject. + + async function setupSpreadsheet() { + const { sheets, getSpreadsheetIds } = createMemorySheets() + const created = await sheets.spreadsheets.create({ + requestBody: { properties: { title: 'Limit Test' } }, + }) + const spreadsheetId = created.data.spreadsheetId! + const meta = await sheets.spreadsheets.get({ spreadsheetId }) + const sheetId = meta.data.sheets![0]!.properties!.sheetId! + return { sheets, spreadsheetId, sheetId, getSpreadsheetIds } + } + + /** Inflate reported grid dimensions so applyBatch sees a near-cap spreadsheet without writing millions of rows. */ + function overrideGridProperties( + sheets: Parameters[0], + rowCount: number, + columnCount: number + ) { + type InflatedResponse = { + data: { + sheets?: Array<{ + properties?: { gridProperties?: { rowCount?: number; columnCount?: number } } + }> + } + } + const originalGet = sheets.spreadsheets.get.bind(sheets.spreadsheets) as unknown as ( + params: unknown + ) => Promise + sheets.spreadsheets.get = (async (params: unknown) => { + const response = await originalGet(params) + for (const s of response.data.sheets ?? []) { + if (s.properties?.gridProperties) { + s.properties.gridProperties.rowCount = rowCount + s.properties.gridProperties.columnCount = columnCount + } + } + return response + }) as unknown as typeof sheets.spreadsheets.get + } + + it('throws when a single flush tries to write more than 10 million cells', async () => { + const { sheets, spreadsheetId, sheetId } = await setupSpreadsheet() + + // 10,001 rows × 1,001 cells ≈ 10.01M (shared row array — applyBatch only reads row.length). + const wideRow: string[] = new Array(1001).fill('x') + const appends: string[][] = new Array(10_001).fill(wideRow) + + const opsByStream = new Map([ + ['Sheet1', { sheetId, updates: [], appends, existingRowCount: 0 }], + ]) + + await expect(applyBatch(sheets, spreadsheetId, opsByStream)).rejects.toThrow( + /refusing to flush .* cells in a single batch/ + ) + }) + + it('throws when current grid + appended cells would cross 10 million', async () => { + const { sheets, spreadsheetId, sheetId } = await setupSpreadsheet() + + // Pretend the sheet already has 999,900 × 10 = 9,999,000 cells allocated. + overrideGridProperties(sheets, 999_900, 10) + + // Append 200 × 10 = 2,000 cells → 10,001,000 total, over the cap. + const row: string[] = new Array(10).fill('x') + const appends: string[][] = new Array(200).fill(row) + + const opsByStream = new Map([ + ['Sheet1', { sheetId, updates: [], appends, existingRowCount: 0 }], + ]) + + await expect(applyBatch(sheets, spreadsheetId, opsByStream)).rejects.toThrow( + /would exceed the .*-cell-per-spreadsheet limit/ + ) + }) + + it('allows a flush that stays at or below 10 million cells', async () => { + const { sheets, spreadsheetId, sheetId } = await setupSpreadsheet() + + // Grid currently holds 500,000 × 10 = 5,000,000 cells. + overrideGridProperties(sheets, 500_000, 10) + + // Append 100,000 × 10 = 1,000,000 cells → 6M total, well under the cap. + const row: string[] = new Array(10).fill('y') + const appends: string[][] = new Array(100_000).fill(row) + + const opsByStream = new Map([ + ['Sheet1', { sheetId, updates: [], appends, existingRowCount: 0 }], + ]) + + await expect(applyBatch(sheets, spreadsheetId, opsByStream)).resolves.toBeDefined() + }) + + it('ignores the update-only path (updates overwrite allocated cells, no growth)', async () => { + const { sheets, spreadsheetId, sheetId } = await setupSpreadsheet() + + // Even with the grid at the cap, updates overwrite existing cells and shouldn't trip the append check. + overrideGridProperties(sheets, 1_000_000, 10) + + const updates = [{ rowNumber: 2, values: ['a', 'b', 'c'] }] + const opsByStream = new Map([ + ['Sheet1', { sheetId, updates, appends: [], existingRowCount: 0 }], + ]) + + await expect(applyBatch(sheets, spreadsheetId, opsByStream)).resolves.toBeDefined() + }) + + it('propagates the limit error through dest.write() as connection_status failed', async () => { + const { sheets } = createMemorySheets() + overrideGridProperties(sheets, 1_000_000, 20) // 20M cells, well over cap + + const dest = createDestination(sheets) + const messages: DestinationInput[] = [record('big', { id: 'r1', name: 'A' })] + + const output = await collect(dest.write({ config: cfg(), catalog }, toAsyncIter(messages))) + + const failure = output.find( + (m) => m.type === 'connection_status' && m.connection_status.status === 'failed' + ) + expect(failure).toBeDefined() + expect((failure as { connection_status: { message: string } }).connection_status.message).toMatch( + /cell-per-spreadsheet limit/ + ) + }) + + it('exports MAX_CELLS_PER_SPREADSHEET as 10 million', () => { + expect(MAX_CELLS_PER_SPREADSHEET).toBe(10_000_000) + }) +}) diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index e1635613..7694eab0 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -1,4 +1,4 @@ -import type { Destination } from '@stripe/sync-protocol' +import type { Destination, SourceStateMessage } from '@stripe/sync-protocol' import { createSourceMessageFactory } from '@stripe/sync-protocol' const msg = createSourceMessageFactory() @@ -119,13 +119,50 @@ function extendHeaders( // MARK: - Destination +/** Runs flushAll, yielding heartbeat logs while it runs; returns any flush error via `yield*`. */ +async function* uploadToSheet( + flushAll: () => Promise, + heartbeatMs: number +): AsyncGenerator<{ type: 'log'; log: { level: 'debug'; message: string } }, unknown, unknown> { + const flushState = { done: false, error: undefined as unknown } + const flushP = flushAll().then( + () => { + flushState.done = true + }, + (err) => { + flushState.error = err + flushState.done = true + } + ) + const flushStartedAt = Date.now() + while (!flushState.done) { + await Promise.race([flushP, new Promise((r) => setTimeout(r, heartbeatMs))]) + if (flushState.done) break + const elapsedSec = Math.round((Date.now() - flushStartedAt) / 1000) + log.info(`flushing to Sheets (in progress, ${elapsedSec}s)`) + yield { + type: 'log' as const, + log: { + level: 'debug' as const, + message: `flushing to Sheets (in progress, ${elapsedSec}s)`, + }, + } + } + return flushState.error +} + /** * Create a Google Sheets destination. * * Pass a `sheetsClient` to inject a fake for testing; omit it for production * (each method creates a real client from config credentials). + * `options.flushHeartbeatMs` overrides the in-progress heartbeat cadence (default 20s). */ -export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination { +export function createDestination( + sheetsClient?: sheets_v4.Sheets, + options?: { flushHeartbeatMs?: number } +): Destination { + const flushHeartbeatMs = options?.flushHeartbeatMs ?? 20_000 const destination = { async *spec() { yield { type: 'spec' as const, spec: defaultSpec } @@ -138,9 +175,6 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< ? await createSpreadsheet(sheets, config.spreadsheet_title) : config.spreadsheet_id! - // Fetch metadata once, reuse for all setup steps - const meta = await getSpreadsheetMeta(sheets, spreadsheetId) - // Ensure every catalog stream has a tab and headers (single batchUpdate + single values.batchUpdate). // Data tabs must exist before the Overview is written: its rows contain // `=COUNTUNIQUE(''!A2:A)` formulas that Sheets parses with @@ -150,19 +184,16 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< const properties = stream.json_schema?.['properties'] as Record | undefined return { streamName: stream.name, headers: properties ? Object.keys(properties) : [] } }) - const sheetIdMap = await ensureSheets(sheets, spreadsheetId, meta, streamHeaders) + // Refetch meta before each step that reads titles; reusing one snapshot renamed Sheet1 twice. + const metaBeforeEnsure = await getSpreadsheetMeta(sheets, spreadsheetId) + const sheetIdMap = await ensureSheets(sheets, spreadsheetId, metaBeforeEnsure, streamHeaders) const sheetIds = catalog.streams.map((s) => sheetIdMap.get(s.stream.name)!) - // Re-fetch metadata after ensureSheets: it may have renamed Sheet1 to the first - // stream tab, making the original `meta` stale. ensureIntroSheet uses meta to - // check whether Sheet1 exists (to rename vs. insert) — if it sees the stale - // Sheet1 entry it will rename the first stream's tab to "Overview". - const freshMeta = await getSpreadsheetMeta(sheets, spreadsheetId) - const streamNames = catalog.streams.map((s) => s.stream.name) - await ensureIntroSheet(sheets, spreadsheetId, freshMeta, streamNames) + const metaAfterEnsure = await getSpreadsheetMeta(sheets, spreadsheetId) + await ensureIntroSheet(sheets, spreadsheetId, metaAfterEnsure, streamNames) - await protectSheets(sheets, spreadsheetId, freshMeta, sheetIds) + await protectSheets(sheets, spreadsheetId, metaAfterEnsure, sheetIds) if (isNew) { yield msg.control({ @@ -455,18 +486,19 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< updateBuffers.set(streamName, []) } - log.debug({ durationMs: Date.now() - flushStart }, 'flushAll done') + log.info({ durationMs: Date.now() - flushStart }, 'flushAll done') } const writeStart = Date.now() let recordCount = 0 let stateCount = 0 - let writeError: unknown = undefined - let cancelled = true + // Buffer source_state until after flushAll so checkpoints only advance once records are durable. + const bufferedStates: SourceStateMessage[] = [] + let flushSucceeded = false - // try/finally ensures flushAll runs even when the consumer closes us - // early via iterator.return() (e.g. takeLimits eof). Otherwise the - // buffered batch would be silently dropped. + // Flush runs only after $stdin completes normally. Early iterator.return() + // (hard time_limit / abort) drops the batch — state-after-flush must not + // advance a checkpoint past data we never wrote. try { for await (const msg of $stdin) { if (msg.type === 'record') { @@ -505,48 +537,43 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< appendBuffers.get(stream)!.push({ row }) } yield msg + } else if (msg.type === 'source_state') { + stateCount++ + bufferedStates.push(msg) } else { - if (msg.type === 'source_state') stateCount++ - // Pass through non-record messages immediately; data is flushed at end. yield msg } } - - cancelled = false log.debug( { durationMs: Date.now() - writeStart, recordCount, stateCount }, - '$stdin drained' + 'Source drained in google sheet write, starting upload step...' ) + const flushError = yield* uploadToSheet(flushAll, flushHeartbeatMs) + if (flushError) { + log.error({ err: flushError }, 'flushAll failed during teardown') + const errMsg = flushError instanceof Error ? flushError.message : String(flushError) + yield { + type: 'connection_status' as const, + connection_status: { status: 'failed' as const, message: errMsg }, + } + } else { + flushSucceeded = true + for (const state of bufferedStates) { + yield state + } + } } catch (err: unknown) { - cancelled = false - writeError = err log.error( { err, durationMs: Date.now() - writeStart, recordCount, stateCount }, 'write() error' ) - } finally { - if (cancelled) { - log.warn( - { durationMs: Date.now() - writeStart, recordCount, stateCount }, - 'write() cancelled by consumer; flushing buffered data anyway' - ) - } - try { - await flushAll() - } catch (flushErr) { - log.error({ err: flushErr }, 'flushAll failed during teardown') - if (!writeError) writeError = flushErr - } - } - - if (writeError) { - const errMsg = writeError instanceof Error ? writeError.message : String(writeError) - log.error(errMsg) yield { type: 'connection_status' as const, - connection_status: { status: 'failed' as const, message: errMsg }, + connection_status: { + status: 'failed' as const, + message: err instanceof Error ? err.message : String(err), + }, } - return } if (Object.keys(rowAssignments).length > 0) { @@ -558,13 +585,14 @@ export function createDestination(sheetsClient?: sheets_v4.Sheets): Destination< yield { type: 'log' as const, log: { level: 'debug' as const, message: metaMsg } } } - log.info(`Sheets destination: wrote to spreadsheet ${spreadsheetId}`) - yield { - type: 'log' as const, - log: { - level: 'info' as const, - message: `Sheets destination: wrote to spreadsheet ${spreadsheetId}`, - }, + if (flushSucceeded) { + yield { + type: 'log' as const, + log: { + level: 'info' as const, + message: `Sheets destination: wrote to spreadsheet ${spreadsheetId}`, + }, + } } }, } satisfies Destination diff --git a/packages/destination-google-sheets/src/spec.ts b/packages/destination-google-sheets/src/spec.ts index 535b0d39..210d8eae 100644 --- a/packages/destination-google-sheets/src/spec.ts +++ b/packages/destination-google-sheets/src/spec.ts @@ -29,4 +29,6 @@ export type Config = z.infer export default { config: z.toJSONSchema(configSchema), + // sheet flushAll can take tens of seconds on wide catalogs; give it half the budget. + soft_limit_fraction: 0.5, } satisfies ConnectorSpecification diff --git a/packages/destination-google-sheets/src/writer.ts b/packages/destination-google-sheets/src/writer.ts index c68d9517..a942c039 100644 --- a/packages/destination-google-sheets/src/writer.ts +++ b/packages/destination-google-sheets/src/writer.ts @@ -13,6 +13,9 @@ const BACKOFF_BASE_MS = 1000 const BACKOFF_MAX_MS = 32000 const MAX_RETRIES = 5 +// Per-spreadsheet hard cap (https://support.google.com/drive/answer/37603); enforce locally for a clear error. +export const MAX_CELLS_PER_SPREADSHEET = 10_000_000 + async function withRetry(fn: () => Promise, label?: string): Promise { let delay = BACKOFF_BASE_MS const overallStart = Date.now() @@ -49,7 +52,6 @@ async function withRetry(fn: () => Promise, label?: string): Promise { if (label) { log.warn( { - err, label, attempt: attempt + 1, maxRetries: MAX_RETRIES, @@ -714,7 +716,7 @@ export async function applyBatch( } const phase1Start = Date.now() await Promise.all(probes) - log.warn( + log.debug( { parallelCalls: probes.length, durationMs: Date.now() - phase1Start }, 'phase1 (reads) done' ) @@ -730,6 +732,7 @@ export async function applyBatch( // 2a) appendDimension — only for grids that don't already fit. const phase2aStart = Date.now() + const projectedGridBySheet = new Map() for (const [, ops] of opsByStream) { const maxUpdateRow = ops.updates.reduce((m, u) => Math.max(m, u.rowNumber), 0) const maxAppendRow = ops.appends.length > 0 ? ops.existingRowCount + ops.appends.length : 0 @@ -760,6 +763,13 @@ export async function applyBatch( }, }) } + // Track projected post-expansion grid so the cap check below sees column growth. + if (neededRows > current.rowCount || neededCols > current.columnCount) { + projectedGridBySheet.set(ops.sheetId, { + rowCount: neededRows > current.rowCount ? neededRows + EXPAND_ROW_BUFFER : current.rowCount, + columnCount: neededCols > current.columnCount ? neededCols : current.columnCount, + }) + } } const expansionCount = expansionRequests.length log.debug( @@ -838,7 +848,7 @@ export async function applyBatch( appendStartRows.set(streamName, { appendStartRow: startRow }) appendRowCount += ops.appends.length } - log.warn( + log.debug( { streams: appendStartRows.size, rows: appendRowCount, @@ -854,42 +864,41 @@ export async function applyBatch( const totalCells = updateCellCount + appendCellCount const totalBytesEstimate = updateBytesEstimate + appendBytesEstimate - // ── Phase 3a (grid expansion — runs first, only if needed) ───── - if (expansionRequests.length > 0) { - const expandStart = Date.now() - try { - const res = await withRetry( - () => - sheets.spreadsheets.batchUpdate({ - spreadsheetId, - requestBody: { requests: expansionRequests }, - }), - 'gridExpansion' - ) - log.warn( - { - status: res.status, - requests: expansionRequests.length, - durationMs: Date.now() - expandStart, - }, - 'gridExpansion OK' - ) - } catch (err) { - log.error( - { err, requests: expansionRequests.length, durationMs: Date.now() - expandStart }, - 'gridExpansion FAILED' + // Reject a batch that alone exceeds the per-spreadsheet cap — no grid state can save it. + if (totalCells > MAX_CELLS_PER_SPREADSHEET) { + throw new Error( + `Google Sheets destination: refusing to flush ${totalCells.toLocaleString()} cells in a single batch (exceeds the ${MAX_CELLS_PER_SPREADSHEET.toLocaleString()}-cell-per-spreadsheet limit)` + ) + } + + // Skip when gridInfo is empty (probe failed) and let the API respond. + if (gridInfo.size > 0) { + let currentGridCells = 0 + let projectedGridCells = 0 + for (const [sheetId, info] of gridInfo) { + currentGridCells += info.rowCount * info.columnCount + const p = projectedGridBySheet.get(sheetId) ?? info + projectedGridCells += p.rowCount * p.columnCount + } + // max() catches both near-cap append and column expansion growing all rows. + const worstCaseCells = Math.max(currentGridCells + appendCellCount, projectedGridCells) + if (worstCaseCells > MAX_CELLS_PER_SPREADSHEET) { + throw new Error( + `Google Sheets destination: ${worstCaseCells.toLocaleString()} cells would exceed the ${MAX_CELLS_PER_SPREADSHEET.toLocaleString()}-cell-per-spreadsheet limit (current grid: ${currentGridCells.toLocaleString()}, projected grid: ${projectedGridCells.toLocaleString()}, append payload: ${appendCellCount.toLocaleString()})` ) - throw err } } - // ── Phase 3b (single batchUpdate with all data writes) ────────── - if (dataRequests.length === 0) return appendStartRows + // ── Phase 3 (single batchUpdate: expansions first, then data writes) ── + // Requests within a batchUpdate are applied in order, so appendDimension + // runs before pasteData and the grid is guaranteed to fit. + const allRequests = [...expansionRequests, ...dataRequests] + if (allRequests.length === 0) return appendStartRows - log.warn( + log.debug( { streams: opsByStream.size, - totalRequests: dataRequests.length, + totalRequests: allRequests.length, expansions: expansionCount, updateRows: updateRowCount, appendRows: appendRowCount, @@ -905,14 +914,14 @@ export async function applyBatch( () => sheets.spreadsheets.batchUpdate({ spreadsheetId, - requestBody: { requests: dataRequests }, + requestBody: { requests: allRequests }, }), 'batchUpdate' ) log.debug( { status: res.status, - requests: dataRequests.length, + requests: allRequests.length, cells: totalCells, replies: res.data.replies?.length ?? 0, wallClockMs: Date.now() - httpStart, @@ -924,7 +933,7 @@ export async function applyBatch( log.error( { err, - totalRequests: dataRequests.length, + totalRequests: allRequests.length, expansions: expansionCount, updateRows: updateRowCount, appendRows: appendRowCount, diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 1e6568a3..465184fe 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -171,6 +171,12 @@ export const ConnectorSpecification = z .record(z.string(), z.unknown()) .optional() .describe('JSON Schema for the read() input parameter (e.g. a webhook event).'), + soft_limit_fraction: z + .number() + .positive() + .max(1) + .optional() + .describe('Fraction of `time_limit` to use as default `soft_time_limit` (e.g. 0.5).'), }) .describe('JSON Schema describing the configuration a connector requires.') export type ConnectorSpecification = z.infer diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 33f5aefc..c5ec1c56 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -261,7 +261,7 @@ export function createStripeSource( // Derive concurrency params from API key mode (overridable via config) const liveMode = config.api_key.startsWith('sk_live_') || config.api_key.startsWith('rk_live_') - const maxRequestsPerSecond = config.rate_limit ?? (liveMode ? 20 : 10) + const maxRequestsPerSecond = config.rate_limit ?? (liveMode ? 50 : 10) // 50% of rate limits by default const maxConcurrentStreams = Math.min(maxRequestsPerSecond, catalog.streams.length) const rateLimiter = externalRateLimiter ?? createInMemoryRateLimiter(maxRequestsPerSecond)