feat: add pipeline_sync_batch, takeThroughStates, sync_batch CLI improvements, and observability#349
Merged
Conversation
Adds structured info-level logs for every Stripe event received across all ingress paths (HTTP webhook, WebSocket, service endpoint) for observability. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
0277e7a to
be37ad0
Compare
state_count was never incremented — the old code only created a new stream entry when missing, but createInitialProgress pre-populates all streams so the branch never ran. Bug existed since the reducer was introduced in d6ee08e. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com>
- Add pipeline_sync_batch to Engine interface, implementation, remote
engine, engine API (/pipeline_sync_batch), and service API
(/pipelines/{id}/sync_batch) with CLI subcommand. Returns EofPayload
as JSON instead of streaming NDJSON.
- Add --raw flag to pipelines sync CLI to dump raw NDJSON to stdout.
- Add total_record_count and total_state_count to ProgressPayload.derived.
- Fix states_per_second to include per-stream state counts.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com>
Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com>
Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com>
| @@ -1,4 +1,6 @@ | |||
| import { z } from 'zod' | |||
| import { AsyncIterableX } from 'ix/asynciterable' | |||
| import { map as ixMap, tap as ixTap } from 'ix/asynciterable/operators' | |||
- --loop: repeat sync_batch until has_more is false - --raw: output JSONL to stdout instead of progress to stderr - --state-limit: stop after N source_state messages per request - --time-limit: stop after N seconds per request - Progress rendered via formatProgress after each iteration - JSONL output (one line per eof) for piping into jq Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
Extracts the stripe-rate-limited-reason header (global-rate, global-concurrency, endpoint-rate, etc.) from 429 responses and includes it in retry log messages. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
…tests and update OpenAPI - Add missing required `total_record_count` and `total_state_count` fields to `derived` objects in test fixtures (engine, service, e2e, state-reducer tests) - Regenerate engine and service OpenAPI specs (adds pipeline_sync_batch to table) - Format-only refactors (no logic changes) in protocol.ts, helpers.ts, reducer.ts, app.ts, src-websocket.ts Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude
There was a problem hiding this comment.
Pull request overview
This PR adds a new non-streaming “batch sync” mode to the engine/service, plus supporting protocol helpers, CLI enhancements, and extra webhook/Stripe retry observability to make syncs easier to operate and resume in request/response environments.
Changes:
- Add
pipeline_sync_batch(engine) and/pipelines/{id}/sync_batch(service) endpoints that return a single JSONEofPayloadwithhas_morefor resumable batching. - Introduce
takeThroughStates(stateLimit)IxJS operator and tests to bound each batch bysource_statecheckpoints. - Extend progress tracking (
total_record_count,total_state_count) and add additional logging for Stripe/webhook ingestion and 429 retry context.
Reviewed changes
Copilot reviewed 32 out of 37 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| scripts/source-discover.sh | Helper script to call source_discover and pretty-print NDJSON output. |
| pnpm-lock.yaml | Adds ix dependency entries. |
| packages/source-stripe/src/src-websocket.ts | Logs webhook event IDs/types received over Stripe CLI websocket. |
| packages/source-stripe/src/src-webhook.ts | Logs verified webhook signature event IDs/types. |
| packages/source-stripe/src/src-list-api.test.ts | String literal update in test data. |
| packages/source-stripe/src/retry.ts | Adds logging of Stripe rate limit reason header on retries. |
| packages/source-stripe/src/process-event.ts | Logs webhook event IDs/types when processing events. |
| packages/protocol/src/protocol.ts | Adds derived progress totals (total_record_count, total_state_count). |
| packages/protocol/src/iter-utils/takeThroughStates.ts | New operator to stop after N source_state messages (inclusive). |
| packages/protocol/src/iter-utils/takeThroughStates.test.ts | Tests for takeThroughStates (currently coupled to engine test helpers). |
| packages/protocol/src/index.ts | Exports takeThroughStates. |
| packages/protocol/src/helpers.ts | Updates emptySyncState() derived defaults with new totals. |
| packages/protocol/package.json | Adds ix dependency. |
| packages/logger/src/format/progress.test.tsx | Updates progress formatting tests for new derived fields. |
| e2e/test-server-sync.test.ts | Updates E2E test fixture progress shape. |
| docs/todos.md | Adds new TODOs (includes a typo). |
| apps/service/src/cli.ts | Adds sync_batch CLI command and --raw for sync; batch looping/progress output. |
| apps/service/src/cli.test.ts | Updates CLI tests for new derived fields. |
| apps/service/src/api/app.ts | Adds /pipelines/{id}/sync_batch endpoint and webhook ingestion logging. |
| apps/service/src/api/app.test.ts | Updates API tests for new derived fields. |
| apps/service/src/tests/workflow.test.ts | Updates workflow tests for new derived fields. |
| apps/service/src/generated/openapi.json | Regenerated service OpenAPI (adds sync_batch + EofPayload). |
| apps/service/src/generated/openapi.d.ts | Regenerated service OpenAPI types. |
| apps/engine/src/lib/state-reducer.ts | Adjusts initialize handling for continuation run_id behavior. |
| apps/engine/src/lib/state-reducer.test.ts | Updates tests + adds continuation preservation test. |
| apps/engine/src/lib/remote-engine.ts | Adds pipeline_sync_batch() client call. |
| apps/engine/src/lib/progress/reducer.ts | Computes total record/state counts and adjusts state counting behavior. |
| apps/engine/src/lib/progress/reducer.test.ts | Updates tests for new state counting semantics and derived fields. |
| apps/engine/src/lib/progress/format.test.tsx | Updates formatting tests for new derived fields. |
| apps/engine/src/lib/index.ts | Exports BatchSyncOptions. |
| apps/engine/src/lib/engine.ts | Implements pipeline_sync_batch() and adds IxJS dependency usage. |
| apps/engine/src/lib/engine.test.ts | Adds tests for pipeline_sync_batch() plus derived field fixture updates. |
| apps/engine/src/api/app.ts | Adds POST /pipeline_sync_batch engine endpoint (JSON response). |
| apps/engine/src/tests/openapi.test.ts | Asserts batch sync uses narrowed request schema. |
| apps/engine/src/generated/openapi.json | Regenerated engine OpenAPI (adds pipeline_sync_batch + new derived fields). |
| apps/engine/src/generated/openapi.d.ts | Regenerated engine OpenAPI types. |
| apps/engine/package.json | Adds ix dependency. |
Files not reviewed (3)
- apps/engine/src/generated/openapi.json: Language not supported
- apps/service/src/generated/openapi.json: Language not supported
- pnpm-lock.yaml: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -1,4 +1,6 @@ | |||
| import { z } from 'zod' | |||
| import { AsyncIterableX } from 'ix/asynciterable' | |||
| import { map as ixMap, tap as ixTap } from 'ix/asynciterable/operators' | |||
Comment on lines
+30
to
+32
| }" | while IFS= read -r line; do | ||
| echo "${line}" | jq . | ||
| done |
|
|
||
| - Fix: webhook pipeline_sync should not pre-populate all streams in progress, maybe separate pathway for webhooks instead of a single method | ||
| - Explicit success / failure for various methods like setup / teardown. Switch to http 200/400/500 standard for error code | ||
| - More robust serialization & deserliazation support in general. lots of bugs caused by those basic pieces missing still. |
Comment on lines
109
to
+111
| ): AsyncGenerator<Message> { | ||
| log.info({ eventId: event.id, eventType: event.type }, 'webhook event received') | ||
|
|
Comment on lines
+732
to
+735
| 'time-limit': { | ||
| type: 'string', | ||
| description: 'Stop after N seconds per request', | ||
| }, |
Comment on lines
111
to
116
| elapsed_ms: elapsedMs, | ||
| global_state_count: | ||
| msg.source_state.state_type === 'global' | ||
| ? progress.global_state_count + 1 | ||
| : progress.global_state_count, | ||
| } |
Comment on lines
+8
to
+15
| export function takeThroughStates( | ||
| stateLimit: number, | ||
| options?: TakeThroughStatesOptions | ||
| ): (messages: AsyncIterable<Message>) => AsyncIterable<Message> { | ||
| let stateCount = 0 | ||
|
|
||
| return (messages: AsyncIterable<Message>) => | ||
| takeWhile((message: Message) => { |
Comment on lines
+4
to
+7
| import { destinationTest } from '../../../../apps/engine/src/lib/destination-test.js' | ||
| import { sourceTest } from '../../../../apps/engine/src/lib/source-test.js' | ||
| import { takeThroughStates } from './takeThroughStates.js' | ||
|
|
| } | ||
|
|
||
| const raw = args.raw === true | ||
| const maxIterations = parseInt(args.loop ?? '1') |
Comment on lines
+176
to
+179
| state: SyncState.optional().meta({ | ||
| description: | ||
| 'SyncState ({ source, destination, sync_run }). Falls back to empty state if invalid.', | ||
| }), |
Yostra
approved these changes
Apr 28, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
The existing
pipeline_syncendpoint streams NDJSON, which works well in theory but is difficult to use in practice:pipeline_sync_batchtakes a simpler approach: run up to N state checkpoints, then stop and return the fullEofPayloadas a single JSON response. The caller gets a clean result, persists it, and calls again. Easier to observe, easier to retry, and fits naturally into request/response infrastructure.Summary
Core:
pipeline_sync_batchendpoint (engine + service)POST /pipeline_sync_batchengine endpoint: runs the full read → write pipeline and returnsEofPayloadas a single JSON response (no streaming)POST /pipelines/{id}/sync_batchservice endpoint: stateful wrapper that loads pipeline from store, runs batch sync, and persists the endingsync_statestate,run_id, andstate_limit; defaults to a batch limit of 1000 source_state messageshas_more: truein the response indicates the sync was cut short by the state limittakeThroughStateshelper (packages/protocol)takeThroughStates(stateLimit, opts)IxJS operator: yields messages through the Nthsource_state, then stops (inclusive)onLimitReachedcallback fires when the limit is hit so callers can sethas_morepipeline_sync_batchto bound each requestProgress:
total_record_count/total_state_countin derivedtotal_record_countandtotal_state_counttoProgressPayload.derivedprogressReducernow computes these from per-stream counts on every tickCLI: new
sync_batchcommand (apps/service)sync_batch <id>command that callsPOST /pipelines/{id}/sync_batch--loop <N>: repeat up to N times (0 = loop untilhas_moreis false)--raw: output raw JSON to stdout instead of rendered progress--state-limit: passstate_limitper request--time-limit: passtime_limitper requestObservability
source-stripe: logStripe-Rate-Limited-Reasonheader on 429 retries (rate_limit_reasonfield + message suffix)source-stripe: log event IDs at all webhook ingress points (webhook HTTP, WebSocket, process-event)service: logeventId/eventTypeon webhook ingestionOther
scripts/source-discover.shhelper scriptHow to test
Runs 2 iterations of up to 10 state checkpoints each against the pipeline, resetting state on the first iteration. Check progress output between iterations and confirm
has_more/statusin the final JSON.