feat(service): Google Sheets workflow — 3 parallel loops, generic read activity #254

Closed

tonyxiao wants to merge 481 commits into main from sheet-workflow

Conversation

tonyxiao (Collaborator) commented Apr 6, 2026

Summary

  • Refactor googleSheetPipelineWorkflow to use 3 parallel loops (liveEventLoop, reconcileLoop, writeLoop) matching the pipelineWorkflow pattern
  • Single GoogleSheetWorkflowState object replaces scattered booleans (setupDone, readComplete, pendingWrites)
  • derivePipelineStatus() + setState() pattern for consistent status reporting
  • EOF-based completion detection replaces deepEqual
  • Rename readGoogleSheetsIntoQueue → readIntoQueue (now generic — reads from source into Kafka)
  • Move row key/number augmentation from read to write activity (only write is Sheets-specific)
  • Extract immutable mergeStateMessage() utility for consistent state merge across activities
  • Rename RowIndex → RowIndexMap, rowAssignments → rowIndexMap for clarity
  • Rename connector type google-sheets → google_sheets (snake_case on the wire)
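The state-object and status-derivation bullets above can be sketched as follows. Every name and field shape here is an assumption inferred from this summary — the real GoogleSheetWorkflowState, derivePipelineStatus(), and mergeStateMessage() in the service may differ:

```typescript
// Hypothetical shapes — inferred from the PR summary, not the actual source.
interface GoogleSheetWorkflowState {
  paused: boolean
  backfillComplete: boolean // EOF-based marker, replacing deepEqual on state
  pendingWrites: number
  error?: string
}

// Single derivation point for status reporting, instead of scattered booleans.
function derivePipelineStatus(s: GoogleSheetWorkflowState): string {
  if (s.error) return 'error'
  if (s.paused) return 'paused'
  if (!s.backfillComplete) return 'backfilling'
  if (s.pendingWrites > 0) return 'streaming'
  return 'ready'
}

// Immutable state merge shared by activities: later keys win, inputs untouched.
function mergeStateMessage<T extends Record<string, unknown>>(
  prev: T,
  update: Partial<T>,
): T {
  return { ...prev, ...update }
}
```

Deriving status from one pure function means all three loops report status the same way, and the immutable merge keeps activity inputs from being mutated in place.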

Test plan

  • pnpm build passes
  • pnpm test — all 23 service tests pass
  • Workflow tests cover: backfill → ready, live events, pause/resume, delete/teardown, catalog discovery

🤖 Generated with Claude Code

tonyxiao and others added 30 commits March 23, 2026 22:05
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
GIT_COMMIT and BUILD_DATE are available as env vars at runtime.
OCI labels from docker/metadata-action are also included.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
hasBody() fix: revert body !== null fallback (unsafe — Node.js 24 HTTP
server gives non-null body even for bodyless POSTs). Instead, require
tests to pass explicit Content-Length headers when sending bodies. Added
bodyHeaders() helper to app.test.ts that sets Content-Type +
Content-Length for all three body-sending tests (/read, /write, /run).
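A minimal sketch of the bodyHeaders() helper described above (the exact signature in app.test.ts is assumed) — explicit Content-Length instead of relying on body !== null, which Node.js 24's HTTP server reports even for bodyless POSTs:

```typescript
// Assumed helper shape; the real bodyHeaders() in app.test.ts may differ.
const bodyHeaders = (body: string): Record<string, string> => ({
  'Content-Type': 'application/json',
  // Byte length, not character count — multi-byte UTF-8 would otherwise truncate.
  'Content-Length': String(Buffer.byteLength(body, 'utf8')),
})
```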

docker.test.ts: beforeAll() inherited the default 10s hookTimeout but
docker build takes up to 2.5 min. Add explicit 180_000ms timeout to
match the describe() block timeout.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
The Dockerfile was failing with 'pnpm install --prod --frozen-lockfile'
because pnpm can't install workspace:* packages in isolation. Fix:

- tsup.config.ts for apps/sync-engine: add noExternal: [/^@stripe\//]
  so tsup bundles all workspace packages into dist at build time
- Dockerfile: replace pnpm prod-deps stage with a lean 'deps' stage
  that strips @stripe/* deps from package.json and uses npm install,
  since those are now bundled
- apps/sync-engine/package.json: simplify build/dev scripts to just
  'tsup' (now reads from tsup.config.ts); add @hono/swagger-ui,
  @hono/zod-openapi, googleapis
- ci.yml: fix docker-test.sh path (scripts/ → tests/)
- pnpm-lock.yaml: update for new deps

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
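The tsup change above might look roughly like this (entry path and format are assumptions; only the noExternal pattern comes from the commit message):

```typescript
// tsup.config.ts — sketch, not the repo's actual config.
import { defineConfig } from 'tsup'

export default defineConfig({
  entry: ['src/index.ts'],
  format: ['esm'],
  // Bundle every @stripe/* workspace package into dist so the Docker deps
  // stage never has to resolve workspace:* specifiers at install time.
  noExternal: [/^@stripe\//],
})
```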
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
pnpm --filter @stripe/sync-engine... builds sync-engine AND all of its
workspace dependencies in dependency order. tsup's noExternal needs the
compiled dist of workspace packages to exist before it can bundle them.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
The filtered build with ... suffix failed because tsup couldn't
resolve cross-package imports during bundling.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Replace the node -e hack (stripping @stripe/* deps + npm install) with
pnpm deploy --prod --legacy, which handles workspace→real version
conversion and uses pnpm-lock.yaml for deterministic dep resolution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
- docker stop: use -t 0 flag so Docker sends SIGKILL immediately
  instead of waiting 10 s for graceful shutdown, which raced the
  execFileSync 10 s timeout and caused ETIMEDOUT on cleanup
- pnpm deploy: replace --legacy flag with inject-workspace-packages=true
  (.npmrc + lockfile) for proper workspace package injection in deploy
  without requiring the legacy linker

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
docker.test.ts builds a Docker image in beforeAll (~90 s), blocking
the worker thread. Vitest's default rpcTimeout is 5 s, so the main
process declares the worker unresponsive and throws
"[vitest-worker]: Timeout calling onTaskUpdate" even though all tests
pass. Raising rpcTimeout/testTimeout/hookTimeout to 300 s / 180 s
gives the Docker build enough headroom.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
docker.test.ts runs execSync(docker build) in beforeAll, blocking
the worker event loop for ~90 s. With the default pool='threads'
(worker_threads), the blocked event loop prevents Vitest's internal
RPC heartbeat, causing "Timeout calling onTaskUpdate".

Using pool='forks' (child_process.fork + IPC) avoids this because IPC
communication doesn't rely on the worker's event loop being responsive.
Also remove the ineffective rpcTimeout option.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
execSync blocks the worker event loop for ~90 s during the Docker
image build, preventing Vitest's internal RPC heartbeat and causing
"Timeout calling onTaskUpdate". Replace with async spawn + Promise
so the event loop stays responsive throughout the build.

Also simplify vitest.config.ts to testTimeout/hookTimeout only.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
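The execSync → async spawn change can be sketched like this; the helper name and the Docker invocation are illustrative, not the repo's actual code:

```typescript
import { spawn } from 'node:child_process'

// Run a command off the event loop so Vitest's RPC heartbeat keeps flowing.
// Resolves with the exit code; rejects if the process fails to start.
function run(cmd: string, args: string[]): Promise<number> {
  return new Promise((resolve, reject) => {
    const child = spawn(cmd, args, { stdio: 'inherit' })
    child.on('error', reject)
    child.on('close', (code) => resolve(code ?? -1))
  })
}
```

A beforeAll hook would then `await run(...)` for the Docker build instead of blocking in execSync for ~90 s.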
Four-level discovery model: in-process import, @stripe/* npm scope,
explicit config mapping, and PATH auto-discovery. Includes trust
boundaries, configuration modes (local vs all), and resolution order.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…covery

Replace the undifferentiated --connector-discovery=local/all pattern with
three orthogonal flags: --connectors-from-command-map, --connectors-from-path,
and --connectors-from-npm. Each controls exactly one resolution strategy.

Key changes:
- loader.ts: new ConnectorsFrom interface; createConnectorResolver now takes
  (registered, connectorsFrom?) as separate args; resolution order is
  registered → commandMap → path → npm
- subprocess.ts: spawnSource/spawnDestination now accept a command string that
  may include base args (e.g. "npx @stripe/source-stripe"), split on whitespace
  at spawn time — unifying all three strategies as command strings
- cli.ts: add three flags to addOptions() and serve command
- sync-command/check-command: wire flags, npm defaults to on for CLI
- serve-command: wire flags, npm defaults to off for serve
- docs: rewrite connector-discovery.md, update connector-loading.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
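The resolution order described above (registered → commandMap → path → npm) can be sketched as a pure lookup. The types and command-string formats here are assumptions — the real ConnectorsFrom interface and createConnectorResolver in loader.ts differ:

```typescript
// Hypothetical shapes for illustration only.
type ConnectorsFrom = {
  commandMap?: Record<string, string>
  path?: boolean
  npm?: boolean
}

function resolveConnectorCommand(
  type: string,
  registered: Record<string, string>,
  from: ConnectorsFrom = {},
): string | undefined {
  // 1. in-process registered connectors always win
  if (registered[type]) return registered[type]
  // 2. explicit --connectors-from-command-map entries
  if (from.commandMap?.[type]) return from.commandMap[type]
  // 3. PATH auto-discovery (binary naming scheme assumed)
  if (from.path) return `source-${type}`
  // 4. npm fallback via npx
  if (from.npm) return `npx @stripe/source-${type}`
  return undefined
}
```

Each flag enables exactly one strategy, so turning one off never changes how the others resolve.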
…est connectors

- source-exec.ts: sourceExec(cmd) — former spawnSource, now correctly pipes
  $stdin to subprocess stdin so subprocess sources receive live input events
- destination-exec.ts: destinationExec(cmd) — former spawnDestination
- subprocess.ts: now purely internal helpers (spawnAndCollect, spawnAndStream,
  spawnWithStdin, splitCmd) exported for use by the two exec files
- source-test.ts / destination-test.ts: rename testSource→sourceTest,
  testDestination→destinationTest, testSourceSpec→sourceTestSpec, etc.
  following the <role><Variant> naming convention
- All callers updated across the monorepo

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
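The splitCmd helper named above is presumably a whitespace split along these lines (return shape assumed):

```typescript
// Sketch: a command string like "npx @stripe/source-stripe" is split on
// whitespace at spawn time, unifying all discovery strategies as strings.
function splitCmd(cmd: string): { bin: string; args: string[] } {
  const [bin = '', ...args] = cmd.trim().split(/\s+/)
  return { bin, args }
}
```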
…ishing

The @stripe scope is owned by Stripe on npm — rename to @tx-stripe so we
can test publishing to the public registry.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…ublishable

- Add Verdaccio service to compose.yml with custom config allowing
  unauthenticated publish for @stripe/* packages
- Replace "private": true with "version": "0.1.0" in store-postgres
  and stateful-sync so they can be published

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…ationFromExec

Factory functions that produce a connector from an exec command should follow
the create*From* convention already used in the codebase (e.g. createEngineFromParams).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
New CI job publishes all workspace packages to a local Verdaccio registry
and verifies `npx @stripe/sync-engine` works from a clean directory:
- --help exits 0
- --version returns output
- check command loads connectors (fails on credentials, not loading)

The publish script rewrites workspace:* to real versions and uses the
Verdaccio REST API directly to avoid npm interactive auth prompts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
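The workspace:* rewrite step mentioned above could be as simple as the following; the function name and shapes are hypothetical, and the real publish script may also handle workspace:^ ranges:

```typescript
// Rewrite workspace:* dependency ranges to concrete versions before publish.
// Hypothetical helper — illustrates the idea, not the actual script.
function rewriteWorkspaceDeps(
  deps: Record<string, string>,
  versions: Record<string, string>,
): Record<string, string> {
  return Object.fromEntries(
    Object.entries(deps).map(([name, range]): [string, string] =>
      range.startsWith('workspace:') ? [name, versions[name] ?? '0.0.0'] : [name, range],
    ),
  )
}
```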
- Add profiles: [verdaccio] so `docker compose up -d --wait` in other
  jobs doesn't start/wait on Verdaccio
- Fix healthcheck to use 0.0.0.0 (container binds there, not localhost)
- Replace GH Actions services: block with docker compose --profile
  since service containers can't mount volumes from the checkout

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
engine.write() and engine.run() now yield all DestinationOutput messages
(state, log, error) instead of only StateMessage. Previously, collect()
in pipeline.ts swallowed logs and errors, routing them to callbacks that
the API passed as {} — silently dropping all destination feedback.

Also fixes the API passing empty callbacks {} instead of undefined,
which caused source-side messages to be silently dropped too.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
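The fix above boils down to forwarding every destination message instead of filtering to state. A stand-in sketch (the DestinationOutput union is assumed from the commit message):

```typescript
// Assumed union — the real DestinationOutput type lives in the protocol package.
type DestinationOutput =
  | { type: 'state'; state: { data: unknown } }
  | { type: 'log'; log: { message: string } }
  | { type: 'error'; error: { message: string } }

// After the fix: yield everything and let callers filter. Previously collect()
// kept only state messages and routed logs/errors to empty {} callbacks.
async function* forwardAll(
  messages: AsyncIterable<DestinationOutput>,
): AsyncIterable<DestinationOutput> {
  for await (const msg of messages) yield msg
}
```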
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Make client_id and client_secret optional in the spec with fallback to
GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET env vars. Export an envVars map
for discoverability. Config values take priority over env vars.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
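The config-over-env priority described above amounts to a nullish fallback; the helper name is hypothetical:

```typescript
// Config value wins; env var is the fallback. Sketch only — the real spec
// wiring for GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET may differ.
const resolveCredential = (
  configValue: string | undefined,
  envVar: string,
  env: Record<string, string | undefined> = process.env,
): string | undefined => configValue ?? env[envVar]
```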
Exports now resolve to TypeScript source (./src/) so tsx and bun
can import workspace packages without building first. The
publishConfig.exports overrides to dist/ during pnpm publish so
npm consumers still get built JS.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
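In package.json terms, the dual mapping described above might look like this (paths and condition names are assumptions — only the src-by-default / dist-on-publish split comes from the commit message):

```json
{
  "exports": { ".": "./src/index.ts" },
  "publishConfig": {
    "exports": {
      ".": {
        "types": "./dist/index.d.ts",
        "default": "./dist/index.js"
      }
    }
  }
}
```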
Local dev: bin entries point to src/*.ts with #!/usr/bin/env tsx so
pnpm exec / npx resolves to TypeScript directly — no build needed.

Published: publishConfig.bin overrides to dist/*.js. Build scripts
sed-replace tsx→node shebangs in dist so npm consumers use node.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Deploys stripe/sync-engine:v2 from Docker Hub as a second ECS service
alongside the existing mock server. Runs on Fargate ARM64, connected to
the same RDS instance via DATABASE_URL.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
tonyxiao and others added 26 commits April 3, 2026 09:04
… todos

- Split docs/plans/ into active/ and completed/ subfolders
- Add docs/design/ for architectural docs (ideas, design proposals)
- Rename all legacy plan-NNN/idea-NNN files with YYYY-MM-DD prefix
- Add docs/changelog.md (shipped features) and docs/todos.md (replaces backlog.md)
- Delete docs/plans/backlog.md (merged into todos.md)
- Add e2e/docs.test.ts enforcing YYYY-MM-DD-*.md naming across all three dirs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
* feat(protocol): Zod .describe() for OpenAPI field descriptions + executed-plan convention

- Convert all JSDoc comments on Zod fields in protocol.ts to .describe()
  calls so descriptions appear in the generated OpenAPI spec. Covers
  Stream, ConfiguredStream, ConfiguredCatalog, ConnectorSpecification,
  CheckResult, and all message types (RecordMessage, StateMessage,
  CatalogMessage, LogMessage, ErrorMessage, StreamStatusMessage).
- Add .describe() to inline schemas in service app.ts / createSchemas.ts
  (StreamConfig, LogEntry, Pipeline, PipelineWithStatusSchema.status).
- Regenerate OpenAPI specs: engine 73 → 123 description fields,
  service 290 → 348 description fields.
- Add executed-plan convention to AGENTS.md: non-trivial PRs should
  have a plan artifact in docs/plans/YYYY-MM-DD-slug.md.
- Archive engine interface refactor plan (PR #233) to docs/plans/.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: move Zod .describe() rule to principles.md, keep AGENTS.md light

AGENTS.md should be an index/map — detailed rules belong in
docs/architecture/principles.md.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: AGENTS.md is an index — add rule 0 to keep it light

Before adding anything to AGENTS.md, check for a better home in
docs/architecture/ first.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs on rebased v2

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…ts inside engine (#233)

* refactor(engine): per-call PipelineConfig on Engine interface

Move PipelineConfig from construction time to per-method-call on the
Engine interface. createEngine(resolver) and createRemoteEngine(url) now
take only their core dependency; pipeline config + state flow in on each
call.

- Engine interface: setup/teardown/check/discover/read/write/sync all
  take PipelineConfig as first arg; ReadOpts/SyncOpts carry state and
  stateLimit per call
- createEngine(resolver): resolver only; config validation + catalog
  discovery happen per call; adds discover() method
- createRemoteEngine(url): URL only; state/stateLimit in per-call opts
- app.ts: one Engine created at startup, reused across all routes
- activities.ts: one Engine created at factory time, reused across all
  activity calls
- Fix: variable capture bug in read() caused self-referential generator
  deadlock (let output was captured by its own reassigned IIFE closure)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): Zod-inferred SyncOpts/SetupResult, drop ReadOpts

- Convert SyncOpts and SetupResult from manual interfaces to Zod schemas
  with z.infer<> types (consistent with PipelineConfig in protocol)
- Remove ReadOpts (was identical to SyncOpts); all callers now use SyncOpts
- readIntoQueue and stateHeaders/queryParams/post in remote-engine use
  SyncOpts & { input? } instead of repeating field definitions inline

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol): remove deprecated SyncEngineParams alias

SyncEngineParams was a deprecated alias for PipelineConfig. Update the
one remaining callsite (engine.test.ts) to use PipelineConfig directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): add listConnectors() to Engine interface

Move connector listing from app.ts (calling resolver directly) onto the
Engine interface so the HTTP layer delegates uniformly through the engine
like every other route. Remote engine implements it via GET /connectors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): ConnectorInfo as Zod schema

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): prefix Engine interface methods by resource

pipelineRead/Write/Sync/Setup/Teardown/Check, sourceDiscover, connectorList
on the Engine interface + matching operationId snake_case in app.ts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): snake_case interface + /meta/* endpoints

- Rename all Engine interface methods to snake_case:
  pipelineSetup→pipeline_setup, pipelineTeardown→pipeline_teardown,
  pipelineCheck→pipeline_check, sourceDiscover→source_discover,
  pipelineRead→pipeline_read, pipelineWrite→pipeline_write,
  pipelineSync→pipeline_sync

- Replace connectorList() with four /meta/* endpoints:
  GET /meta/sources          → meta_sources()
  GET /meta/sources/:type    → meta_source(type)
  GET /meta/destinations     → meta_destinations()
  GET /meta/destinations/:type → meta_destination(type)

- Update all callsites: app.ts, remote-engine.ts, activities.ts,
  all tests, dashboard api.ts + PipelineCreate.tsx

- Regenerate openapi.json and openapi.d.ts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): move takeLimits inside engine, remove from HTTP layer

- SyncParams.timeLimitMs → SyncParams.timeLimit (seconds, matching SyncOpts)
- Remove takeLimits() from /read and /sync HTTP handlers — pass stateLimit/timeLimit via SyncOpts instead
- engine.pipeline_read() and pipeline_sync() always apply takeLimits, emitting eof:complete on natural completion
- Regenerate OpenAPI spec with /meta/* endpoints
- Update engine.test.ts to expect eof:complete from direct engine calls

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(ci): remove prettier from openapi generation to fix drift check

The --check mode generates JSON to a tmpdir path (outside __generated__),
so prettier formats it. But the committed files are in __generated__/ which
is excluded from prettier. This caused a permanent format mismatch.

Both JSON files were already in their canonical format (engine: compact from
curl, service: JSON.stringify(spec, null, 2)), so the prettier step was
redundant and caused the drift check to always fail.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): rename meta_sources→meta_sources_list, return {data:[]}

- Rename Engine interface methods: meta_sources → meta_sources_list,
  meta_destinations → meta_destinations_list (symmetry with list semantics)
- Add ConnectorListItem schema (ConnectorInfo + type field) for list responses
- Return {data: ConnectorListItem[]} from list endpoints (array wrapped in
  standard envelope, nesting lives inside the Engine, not the HTTP layer)
- Update operationIds: meta_sources_list, meta_destinations_list
- Fix e2e/connector-loading.test.sh: check → pipeline-check command
- Regenerate openapi.json and openapi.d.ts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…rkflow (#237)

* feat(destination): Google Sheets destination with setup/teardown

- Full setup/teardown lifecycle for Google Sheets destination
- Sheet writer with batched upserts and header management
- Scripts for manual testing via engine HTTP server
- Source-stripe: expose setup() config updates for destination wiring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* Add a dedicated Sheets workflow for row-index upserts (#228)

* Add dedicated Sheets workflow for row-index upserts

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Document Google Sheets row-index workflow rationale

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Refactor service workflows into dedicated modules

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Fix Sheets worker Kafka wiring and header growth

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Keep Sheets row metadata out of engine pipeline

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Split Temporal activities into per-file modules

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Check only spreadsheet id for Sheets pipeline updates

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Merge Temporal activity helpers into _shared

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Remove workflow barrel and rename shared to _shared

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Remove unused Google Sheets helper scripts

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Use protocol PipelineConfig in workflow types

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Restore Temporal workflow entrypoint

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Use activities folder entrypoint

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Serialize service Temporal tests

Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Revert "Serialize service Temporal tests"

This reverts commit c8f48b5.

---------

Co-authored-by: codex <noreply@openai.com>

* fix(service): update activity files to use snake_case Engine API

After merging v2, the activity files used old camelCase API methods
(engine.setup, engine.read, engine.sync, engine.write, engine.teardown)
and the old 3-arg createRemoteEngine signature. Update to match v2's
refactored Engine interface:
- createRemoteEngine(url) → single arg
- engine.pipeline_setup/teardown/read/write/sync
- SyncOpts type for opts parameters

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: codex <noreply@openai.com>
…imit through workflow (#239)

* refactor(service): use eof message to detect read completion instead of state comparison

Replace deepEqual(state, before) with eof?.reason === 'complete' in both
sync and read-write workflow modes. The eof terminal message is the
authoritative signal from the engine; state equality is unreliable when
the last page returns records but no new state keys.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
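The completion check described above reduces to a one-liner; the eof message shape is assumed to follow the engine's terminal message:

```typescript
// Assumed eof message shape — reason values from the commit message.
type EofMessage = { reason: 'complete' | 'limit' }

// EOF is the authoritative completion signal; state equality was unreliable
// when the last page returned records but no new state keys.
const isReadComplete = (eof: EofMessage | undefined): boolean =>
  eof?.reason === 'complete'
```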

* feat(service): thread timeLimit through workflow to backfill activity calls

Add timeLimit to pipelineWorkflow opts, pass it to syncImmediate and
readIntoQueue on backfill pages, and thread it through continueAsNew so
it survives history rollovers. Event-driven paths (input batch) are not
time-limited since they are already bounded by batch size.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: explain half-duplex HTTP streaming constraint in remote engine (DDR-007)

Add a JSDoc note on createRemoteEngine and a decision record explaining
why duplex:'half' is used, why full-duplex isn't needed today, and where
to look if requirements change.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs(ddr-007): remove incorrect claim that full-duplex contradicts the pipeline model

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(service): add eof to drainMessages and activity return types after refactor

The activities refactor split activities.ts into individual files but
dropped eof tracking from drainMessages and the readIntoQueue/syncImmediate
return types. Re-add eof to all three so the workflow can use
eof.reason === 'complete' for read completion detection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Adds a `changes` job using dorny/paths-filter to detect whether any
non-docs files changed. The `build`, `build_amd64`, `publish_npm`, and
`publish_dockerhub` jobs are gated on this output; `build_manifest`,
`e2e_docker`, `e2e_service`, and `publish_dockerhub` cascade via `needs`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
State machine for pipeline lifecycle (starting → running → paused/error →
deleting → deleted), internal sub-states (backfilling, streaming,
reconciling), auto-pause on permanent errors, and webhook events.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
* feat(protocol): envelope messages + everything-is-a-stream

Airbyte-aligned protocol rewrite:
- Envelope messages: { type: 'record', record: { stream, data, emitted_at } }
- TraceMessage replaces ErrorMessage + StreamStatusMessage
- New message types: spec, connection_status, control, trace
- All Source/Destination methods return AsyncIterable (everything is a stream)
- setup() emits ControlMessage instead of returning Partial<Config>
- CLI subcommands uniformly stream NDJSON

Updates protocol package + all 3 connectors. Engine layer not yet updated.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
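The envelope shape quoted above can be written out as a type plus a constructor; makeRecord is a hypothetical helper, and emitted_at is assumed to be epoch milliseconds:

```typescript
// Envelope record message — field names from the commit message.
type RecordMessage = {
  type: 'record'
  record: { stream: string; data: Record<string, unknown>; emitted_at: number }
}

const makeRecord = (
  stream: string,
  data: Record<string, unknown>,
): RecordMessage => ({
  type: 'record',
  record: { stream, data, emitted_at: Date.now() },
})
```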

* fixup! feat(protocol): envelope messages + everything-is-a-stream

Committed-By-Agent: claude

* feat(engine): update engine + service for envelope protocol

- source-exec/destination-exec: all commands use spawnAndStream (async iterable)
- pipeline.ts: envelope field access (msg.record.stream, msg.state.data, etc.)
- engine.ts: async createEngine, collectSpec/collectCatalog/collectControls
- resolver.ts: async createConnectorResolver, async configSchemaFromSpec
- remote-engine.ts: ConnectionStatusPayload, CatalogPayload types
- source-test/destination-test: async generator envelope messages
- service activities: envelope field access throughout
- app.ts, serve-command, cli: async resolver/app initialization

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix: google-sheets test fixes + format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix: update service, e2e, supabase for envelope protocol + fix lint

- Service app.test.ts: async createConnectorResolver
- Service workflow.test.ts: stale dist rebuild
- Service activities: envelope field access (read-into-queue-with-state)
- E2e conformance: async spec() via collectSpec
- E2e stripe-to-postgres: async createEngine + envelope state access
- Supabase: async discover/setup + envelope state access
- Fix lint (unused var in supabase)
- Format all files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(ci): await createConnectorResolver in generate-openapi.sh

createConnectorResolver was made async in this PR but the script
wasn't committed with the await keyword.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(engine): source_discover returns AsyncIterable<DiscoverOutput>

Convert Engine.source_discover from Promise<CatalogPayload> to
AsyncIterable<DiscoverOutput> (CatalogMessage | LogMessage | TraceMessage),
making it consistent with spec/check/read which all stream messages.

- Engine interface and createEngine implementation yield directly from
  connector.discover() without buffering
- /discover HTTP endpoint now streams NDJSON (same pattern as /read)
- remote-engine.ts streams NDJSON from /discover and yields DiscoverOutput
- Internal callers (pipeline_setup/read/write) use collectCatalog() to
  buffer when they need the final catalog
- service discover-catalog activity uses parseNdjsonStream + collectCatalog
- Regenerate OpenAPI specs for updated /discover response type

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
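NDJSON framing as used by the streaming /discover endpoint is one JSON object per line. A simplified stand-in for parseNdjsonStream (the real helper operates on a streaming response body, not a complete string):

```typescript
// Parse a buffered NDJSON chunk: one JSON value per non-empty line.
function parseNdjsonLines(chunk: string): unknown[] {
  return chunk
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line))
}
```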

* fix(dashboard): update discover to parse NDJSON stream

/discover now returns NDJSON instead of JSON. Replace the openapi-fetch
call with a raw fetch that reads NDJSON lines and extracts the catalog
message.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): rename SyncOpts → SourceReadOptions + add JSDoc to engine.ts

SyncOpts was ambiguous — the options object only applies to the source read
phase (state, stateLimit, timeLimit). SourceReadOptions makes the scope clear.

Also adds JSDoc to the Engine interface, its methods, exported Zod schemas,
buildCatalog(), and createEngine().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ipelineWorkflow (#243)

* docs: plan for structured stream state + per-stream reset signal

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: plan for Airbyte-aligned protocol v2 (envelope messages, TraceMessage, NDJSON everywhere)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): inline Google Sheets helpers + rename readIntoQueueWithState

Move withRowKey, compactGoogleSheetsMessages, addRowNumbers, and
augmentGoogleSheetsCatalog out of _shared.ts and into the single activity
that uses each, eliminating the @stripe/sync-destination-google-sheets
coupling from the generic shared file.

Rename readIntoQueueWithState → readGoogleSheetsIntoQueue (and its factory +
file) so the activity name reflects its Google Sheets-only scope.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): rename workflows + remove read-write mode from pipelineWorkflow

- Rename pipeline.ts → pipeline-workflow.ts (same export name pipelineWorkflow)
- Rename pipeline-google-sheets.ts → google-sheet-pipeline-workflow.ts,
  export pipelineGoogleSheetsWorkflow → googleSheetPipelineWorkflow
- Remove read-write mode (separate readLoop/writeLoop) from pipelineWorkflow;
  the workflow now only handles the syncImmediate (Postgres) path
- Remove readIntoQueue/writeFromQueue proxy exports from workflows/_shared.ts
- Update all call sites (app.ts, app.test.ts, workflow.test.ts)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(service): weekly reconciliation via condition timeout in pipelineWorkflow

After the initial full sync completes (EOF reached), block on a timed
condition instead of waiting indefinitely. If no webhook event arrives
within reconInterval (default 1 week), reset readComplete so the next
loop iteration re-syncs from the latest state cursor until EOF again.

State (syncState) is preserved across recon cycles — only the EOF marker
(readComplete) resets, so each recon picks up incrementally from the
last known cursor.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
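The recon wait above can be illustrated with a stand-in for Temporal's condition(fn, timeout) — the real workflow uses @temporalio/workflow's condition, and the 50 ms below stands in for ONE_WEEK_MS:

```typescript
// Polling stand-in for Temporal's condition(); resolves true when fn() becomes
// true, false on timeout. Illustration only — not how Temporal implements it.
async function condition(fn: () => boolean, timeoutMs: number): Promise<boolean> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    if (fn()) return true
    await new Promise((r) => setTimeout(r, 5))
  }
  return fn()
}

// Sketch of the recon cycle: if no webhook event arrives before the timeout,
// reset the EOF marker so the loop re-syncs from the last state cursor.
async function reconWait(state: { readComplete: boolean; pendingEvents: number }) {
  const gotEvent = await condition(() => state.pendingEvents > 0, 50 /* stand-in for ONE_WEEK_MS */)
  if (!gotEvent) state.readComplete = false
}
```

Only the EOF marker resets — syncState is preserved, so each recon is incremental from the last cursor.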

* feat(service): add simplestPipelineWorkflow (backfill + recon, no setup/webhooks)

Minimal pipeline workflow for sources that don't need credential setup or
webhook-driven incremental sync. Runs syncImmediate to EOF (backfill), then
waits reconInterval (default 1 week) before resetting the EOF marker and
re-syncing from the latest state cursor. No setup/teardown, no inputQueue.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): hardcode ONE_WEEK_MS, remove reconInterval opt

reconInterval was never modified at runtime, so threading it through
continueAsNew was unnecessary boilerplate. Hardcode ONE_WEEK_MS directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): rename readComplete → backfillComplete in simplestPipelineWorkflow

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): simplify workflow loops — split conditions + maybeContinueAsNew

Replace the combined condition expression with separate if-statements,
one per state: paused-wait, idle-wait (recon timer), and active work.
Each branch has a single clear purpose and its own await condition().

Also rename tickIteration() → maybeContinueAsNew() to better reflect
what it does.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): rename simplestPipelineWorkflow → backfillPipelineWorkflow

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(workflow): remove phase state from pipelineWorkflow + fix GS test overrides

Assume setup is idempotent — drop the `phase` opt that existed solely to
skip setup on continueAsNew continuations. Always run setup at workflow
start; remove `phase` from PipelineWorkflowOpts and from the continueAsNew
call. Simplifies statusQuery to always return phase:'running'.

Also rename per-test activity overrides readIntoQueueWithState →
readGoogleSheetsIntoQueue in workflow.test.ts, and remove the now-invalid
"skips setup when phase is running" test.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(service): update RecordMessage field access for v2 nested structure

v2 changed RecordMessage from flat {stream, data} to nested
{record: {stream, data}}. Update withRowKey, compactGoogleSheetsMessages,
and addRowNumbers to use .record.stream / .record.data.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
* docs: plan for structured stream state + per-stream reset signal

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: plan for Airbyte-aligned protocol v2 (envelope messages, TraceMessage, NDJSON everywhere)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): rename connector list field data → items

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi after data → items rename

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(dashboard): update getSources/getDestinations to items field

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): update app.test.ts assertions for items rename

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): OAS {type} path params + prettier-format generated specs

- Change /meta/sources/:type and /meta/destinations/:type to use OAS
  {type} format in app.ts so generated spec and openapi-fetch types match
- Pipe generated specs through prettier for readable git diffs
- Regenerate openapi.json and openapi.d.ts to reflect both changes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(test): update path assertions to OAS {type} format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): rename RPC paths to underscore style + fix meta operationIds

RPC endpoints are not REST resources — use flat underscore paths at root level:
  /setup       → /pipeline_setup
  /teardown    → /pipeline_teardown
  /check       → /pipeline_check
  /read        → /pipeline_read
  /write       → /pipeline_write
  /sync        → /pipeline_sync
  /discover    → /source_discover

/meta/* paths unchanged (those are RESTful).

Also rename inconsistent operationIds:
  meta_source      → meta_sources_get
  meta_destination → meta_destinations_get

Generated files need regeneration after this commit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(dashboard): revert as any cast on source_discover (will type once spec regenerated)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): SourceReadOptions with catalog + pipeline_sync delegates to read

- Replace SyncOpts with SourceReadOptions { catalog?, state?, stateLimit?, timeLimit? }
  Caller can pass a pre-built ConfiguredCatalog to skip discover in pipeline_read
- Move takeLimits into pipeline_read (where the stream originates)
- pipeline_write captures eof from its message input and re-emits it, so
  eof propagates through to sync callers without a redundant takeLimits
- pipeline_write accepts optional ConfiguredCatalog to avoid double-discover
- pipeline_sync: no pipeline_setup call; discovers once, builds both catalogs,
  delegates to pipeline_write(pipeline, pipeline_read(pipeline, {...opts, catalog}), filteredCatalog)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(service): rename SyncOpts → SourceReadOptions in activity files

catalog is now part of SourceReadOptions directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI spec after path renames

- /setup → /pipeline_setup
- /teardown → /pipeline_teardown
- /check → /pipeline_check
- /discover → /source_discover
- /read → /pipeline_read
- /write → /pipeline_write
- /sync → /pipeline_sync
- operationId meta_source → meta_sources_get
- operationId meta_destination → meta_destinations_get

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* test(engine): fix test failures after route renames + setup/eof changes

- Update `/read` and `/sync` paths to `/pipeline_read` and `/pipeline_sync` in app.test.ts
- Add pipeline_setup() call before pipeline_sync() in sync.test.ts (setup no longer implicit)
- Update pipeline_write test: expects [state, eof] (length 2) not [state] (length 1)
- Fix pipeline_sync no-input test: expects [eof] (length 1) not length 2

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* test(e2e): call pipeline_setup before pipeline_sync in e2e tests

pipeline_setup is no longer called inside pipeline_sync, so tests must
call it explicitly to create the postgres schema and tables.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(ci): regenerate openapi specs + update docker.test.sh for /pipeline_* paths

- Regenerate apps/engine/src/__generated__/openapi.{json,d.ts} to match
  the merged app.ts (which now has envelope protocol types from v2 + /pipeline_* paths)
- Regenerate apps/service/src/__generated__/openapi.{json,d.ts}
- Update e2e/docker.test.sh: /read → /pipeline_read, /write → /pipeline_write,
  /setup → /pipeline_setup (align with connector-list-items path naming)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(e2e): update smokescreen test to use /pipeline_* routes

Routes were renamed from /read, /setup, /write to /pipeline_read,
/pipeline_setup, /pipeline_write in the engine API refactor.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
* feat(service): restore FS pipeline store as authoritative source of truth

Add back the file-system pipeline store so pipeline config survives Temporal
workflow failures, continueAsNew gaps, and restarts.

- `stores.ts`: PipelineStore interface
- `stores-fs.ts`: filePipelineStore (JSON files per pipeline in a directory)
- `app.ts`: routes read/write from FS store when present; Temporal used only
  for workflow lifecycle + status queries
- `cli.ts`: --data-dir flag on serve command; DATA_DIR env var for CLI subcommands

When --data-dir is omitted the service falls back to Temporal-only mode for
backwards compatibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): workflows and activities reference pipelines by ID only

Activities receive pipelineId instead of PipelineConfig, look up config
from the PipelineStore. Workflows carry only the pipeline ID through
continueAsNew — no more full pipeline object in Temporal history.

- Activities: all 8 refactored to accept pipelineId, look up via store
- setup() writes config mutations (webhook endpoints) back to store
- Workflows: pipelineId input, no in-memory pipeline, no configQuery
- updateSignal simplified to { paused?: boolean } (config in store)
- stores.ts: add update() method and toConfig() utility

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): API and CLI use PipelineStore as source of truth

- createApp requires PipelineStore (no Temporal-only fallback)
- POST /pipelines: write to store, start workflow with just pipelineId
- GET /pipelines: read from store, merge Temporal status
- PATCH: write to store, signal workflow (no config payload)
- Pause/resume: read pipeline from store for response
- Delete: signal workflow teardown, then remove from store
- CLI: --data-dir is required on serve, worker, and webhook commands
- Worker: threads PipelineStore through to createActivities

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* test(service): update tests for pipelineId-only workflows

- app.test.ts: createApp requires PipelineStore, use memoryPipelineStore
- workflow.test.ts: workflows receive pipelineId string, remove config query test
- cli.test.ts: worker command requires --data-dir
- Add stores-memory.ts: in-memory PipelineStore for testing

All 20 tests pass (19 + 1 skipped).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* ci: add --data-dir to service and worker Docker containers

The service and worker CLI now require --data-dir. Use a shared Docker
volume so both containers access the same pipeline store.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…it, nested configs, typed pipeline_read (#247)

* refactor(engine): rename meta_source/meta_destination → meta_sources_get/meta_destinations_get

Consistent naming with meta_sources_list/meta_destinations_list. Also
remove `as any` casts in remote-engine.ts now that OpenAPI types match.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(connectors): move full spec into spec.ts default export

Each connector's spec.ts now exports default ConnectorSpecification
(config + stream_state where applicable). The spec() method in each
index.ts yields the default directly instead of inlining z.toJSONSchema
calls. Stripe's stream state Zod schemas move from index.ts to spec.ts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(source-stripe): rename camelCase state fields to snake_case

pageCursor → page_cursor, numSegments → num_segments, inFlight → in_flight
in SegmentState, BackfillState, StripeStreamState and all Zod schemas.

Also expands principle #4 (snake_case on the wire) to explicitly call out
connector spec schemas (config, stream_state, input).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): rename timeLimitMs → timeLimit, simplify pipeline_sync

takeLimits opts now take timeLimit in seconds (consistent with the HTTP
query param name). The ms conversion happens inside takeLimits.

Also: drop the setup call from pipeline_sync (callers set up pipelines
explicitly), remove redundant explicit return-type annotations now that
the object satisfies Engine.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): fix tests broken by timeLimitMs→timeLimit and meta route renames

- pipeline.test.ts: update timeLimitMs → timeLimit, adjust values to seconds
- remote-engine.test.ts: meta_source → meta_sources_get, meta_destination → meta_destinations_get
- engine.ts: restore correct pipeline_sync body (limits on write output, not read)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): restore explicit Engine type annotation to fix circular ref

`satisfies Engine` on a self-referencing object literal causes TS7022/7023
implicit-any errors. Revert to `const engine: Engine = { ... }`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(engine): typed pipeline_read request body with source-key wrapping

Each NDJSON line in the pipeline_read body must now be wrapped under the
source type key for sources that declare an `input` JSON Schema in their
spec (e.g. {"stripe": {...webhookEvent...}}). The handler unwraps the key
and validates the payload against the input schema before passing it to
source.read().$stdin.

Sources without an input schema (e.g. test fixtures) continue to receive
the body as-is for backward compatibility.

On the OAS side, injectConnectorSchemas now:
- Injects a StripeWebhookInput (etc.) component schema from spec.input
- Builds a SourceInput oneOf where each variant wraps the input schema
  under the source type key
- Patches /pipeline_read POST with a required:false application/x-ndjson
  requestBody referencing SourceInput

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
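The unwrap step can be sketched as follows, assuming a wrapped NDJSON line like `{"stripe": {...webhookEvent...}}`; the function name and error message are illustrative, not the engine's actual helper:

```typescript
// Sketch of the source-key unwrap performed per NDJSON line in pipeline_read.
// The payload under the source type key is what gets validated against the
// source's input schema and passed to source.read().$stdin.
function unwrapSourceInput(
  line: string,
  sourceType: string,
): Record<string, unknown> {
  const parsed = JSON.parse(line) as Record<string, unknown>;
  const payload = parsed[sourceType];
  if (payload === undefined) {
    throw new Error(`expected input wrapped under "${sourceType}" key`);
  }
  return payload as Record<string, unknown>;
}
```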

* refactor(protocol): nested connector config envelope { type, [type]: payload }

Restructures source and destination configs from flat to nested, matching
the existing message protocol pattern (e.g. RecordMessage, StateMessage):

  Before: { "type": "stripe", "api_key": "sk_test_..." }
  After:  { "type": "stripe", "stripe": { "api_key": "sk_test_..." } }

Changes:
- engine.ts: add configPayload() helper; replace 9 spread-destructuring
  sites with configPayload(pipeline.source/destination)
- createSchemas.ts: build discriminated union variants as
  z.object({ type: z.literal(name), [name]: obj }) instead of
  obj.extend({ type: z.literal(name) })
- service/app.ts: update google-sheets spreadsheet_id guard to access
  nested destination['google-sheets']?.spreadsheet_id
- protocol.ts: update PipelineConfig JSDoc to document envelope format
- All test files (10): update pipeline config literals to nested format
- dashboard/PipelineCreate.tsx: use [sourceType]/[destType] computed keys

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
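A plausible shape for the `configPayload()` helper described above — given a nested connector config `{ type, [type]: payload }`, return the payload; the real engine.ts implementation may differ:

```typescript
// A connector config in the nested envelope format,
// e.g. { "type": "stripe", "stripe": { "api_key": "sk_test_..." } }.
interface ConnectorConfig {
  type: string;
  [key: string]: unknown;
}

// Extract the payload nested under the config's own type key.
function configPayload(config: ConnectorConfig): Record<string, unknown> {
  const payload = config[config.type];
  if (typeof payload !== "object" || payload === null) {
    throw new Error(`missing "${config.type}" payload in connector config`);
  }
  return payload as Record<string, unknown>;
}
```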

* chore: format

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(e2e): update stripe-to-postgres tests for nested config envelope

Update makePipeline() calls to use the nested { type, [type]: payload }
connector config format introduced in a89f303.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi specs

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): nested SourceConfig/DestinationConfig schemas in OAS spec; SourceInput discriminated union

- injectConnectorSchemas: build StripeSourceConfig as { type, stripe: {...} }
  instead of flat { type, api_key, ... } — matches the wire format
- SourceInput oneOf: add discriminator { propertyName: "type" } and include
  a type enum field in each variant, matching the SourceConfig pattern
- Regenerate openapi.json + openapi.d.ts

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): x-pipeline required; rename StripeWebhookInput→StripeEventInput; regen spec

- Remove .optional() from xPipelineHeader — header is always required
- Rename StripeWebhookInput → StripeEventInput in connectorInputSchemaName
- Update app.test.ts missing-header assertion (Zod now validates before handler)
- Regenerate openapi.json + openapi.d.ts

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): invert OAS schema names to role-first convention

SourceStripe / DestinationPostgres / DestinationGoogleSheets instead of
StripeSourceConfig / PostgresDestinationConfig — matches package naming
convention (@stripe/sync-source-stripe → SourceStripe).

StripeEventInput → StripeSourceInput (later: Source${pascal}Input pattern).
Update tests and regenerate openapi.json + openapi.d.ts.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(e2e): update shell tests for nested connector config envelope

Update all shell E2E test scripts to use the nested config format
introduced in a89f303:
  Before: {"source":{"type":"stripe","api_key":"..."}}
  After:  {"source":{"type":"stripe","stripe":{"api_key":"..."}}}

Files updated: docker.test.sh, smokescreen.test.sh,
connector-loading.test.sh, publish.test.sh

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(e2e): add pipeline_setup before pipeline_sync in backfill test

pipeline_sync no longer calls pipeline_setup internally (removed in
bff449a). The backfill test must call setup explicitly to create the
destination schema/tables before syncing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): SourceInput as proper discriminated union with named $ref variants

SourceStripeInput is now the full wrapper schema { type, stripe: {...event...} },
mirroring how SourceStripe wraps the config payload. SourceInput.oneOf uses
$ref instead of inline objects, enabling proper code generation.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): polymorphism lives only at union level; raw Config schemas are payload-only

- SourceStripeConfig / DestinationPostgresConfig / DestinationGoogleSheetsConfig
  contain only the raw connector payload (no type field, no nested wrapper key)
- SourceConfig / DestinationConfig / SourceInput discriminated unions define
  the { type, [name]: { $ref: ...Config } } envelope inline — polymorphism at top only
- Rename SourceStripe → SourceStripeConfig, DestinationPostgres → DestinationPostgresConfig, etc.
- Update tests and regenerate openapi.json + openapi.d.ts

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): use const instead of enum for single-value type discriminators in OAS

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(source-stripe): cache discover results in-memory by api_version

Adds an in-memory Map<api_version, CatalogPayload> cache to source-stripe's
discover() so that pipeline_sync (which calls discover twice — once in
pipeline_read, once in pipeline_write) doesn't repeat the SpecParser.parse +
catalogFromOpenApi work. For the bundled spec this is CPU-only (no HTTP), so
the cache eliminates redundant computation rather than network calls.

Also expands the buildCatalog JSDoc in engine.ts to document the catalog
hydration design: we store only the user's minimal stream selection in
PipelineConfig.streams and re-discover at runtime, so the catalog always
reflects the current API shape. Contrast with Airbyte which persists the
full ConfiguredAirbyteCatalog after the initial discover.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
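The cache amounts to simple memoization keyed by api_version; a minimal sketch, with `CatalogPayload` and the catalog builder as stand-ins for the real source-stripe types and functions:

```typescript
// Stand-in for the real catalog payload type.
type CatalogPayload = { streams: string[] };

const discoverCache = new Map<string, CatalogPayload>();

// Memoized discover: the second call for the same api_version returns the
// cached catalog instead of repeating the SpecParser/catalogFromOpenApi work.
function discover(
  apiVersion: string,
  buildCatalogFor: (v: string) => CatalogPayload,
): CatalogPayload {
  const cached = discoverCache.get(apiVersion);
  if (cached) return cached;
  const catalog = buildCatalogFor(apiVersion);
  discoverCache.set(apiVersion, catalog);
  return catalog;
}
```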

* refactor(protocol): convert per-command output types to Zod schemas

SpecOutput, CheckOutput, DiscoverOutput, SetupOutput, and TeardownOutput
were plain TypeScript type aliases (union of message types). Convert them
to proper Zod discriminated unions with .meta({ id }) so they produce
correct OAS schemas. The inferred types remain identical.

Use DiscoverOutput in the engine's /source_discover route so the OAS spec
accurately reflects that discover only emits catalog, log, and trace
messages — not the full Message union.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(scripts): generate engine OAS spec in-process instead of via subprocess

Replace the subprocess + port-binding + curl approach with a direct
in-process app.request('/openapi.json') call, matching the pattern
already used for the service spec. Eliminates ~6s of startup/polling
overhead. The engine spec is structurally identical; the formatting
change comes from prettier normalizing the in-process JSON output.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(hono-zod-openapi): JSON content header support via .meta({ param: { content } })

Header fields annotated with .meta({ param: { content: 'application/json' } })
on a z.string().transform(JSON.parse).pipe(schema) chain get:

- Runtime: zValidator parses JSON via transform, validates via pipe (no
  custom middleware needed — standard zValidator handles everything)
- OAS spec: parameter uses content: { 'application/json': { schema } }
  with the pipe output type instead of schema: { type: 'string' }

Includes 8 unit tests covering runtime validation + OAS spec generation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(engine): createConnectorSchemas — typed Zod unions with .meta({ id })

Builds SourceConfig, DestinationConfig, SourceInput discriminated unions
and PipelineConfig from the connector resolver. Each schema gets a
.meta({ id }) annotation for automatic OAS component registration by
zod-openapi. Individual config schemas (SourceStripeConfig, etc.) contain
only the raw connector payload — the { type, [name]: payload } envelope
is at the union level.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): wire typed Zod schemas into route definitions, delete parseSyncParams + injectConnectorSchemas

Routes now use z.string().transform(JSON.parse).pipe(TypedPipelineConfig)
with .meta({ param: { content: 'application/json' } }) for X-Pipeline and
X-State headers. This gives:
- Runtime: Zod validates the full typed config (discriminated union)
- OAS spec: auto-generated content encoding + named component schemas

Changes:
- app.ts: replace parseSyncParams with c.req.valid('header')['x-pipeline'],
  build typed schemas from createConnectorSchemas(resolver), remove
  injectConnectorSchemas/addDiscriminators from /openapi.json handler
- createSchemas.ts: add connectorVariantName for named discriminated union
  variants (enables discriminator.mapping in OAS)
- hono-zod-openapi: pass pipe output Zod schemas to createDocument for
  full recursive component discovery (fixes missing nested schemas)
- app.test.ts: update assertions for new validation behavior

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi specs

Schemas now auto-generated from Zod .meta({ id }) annotations via
createDocument — no more injectConnectorSchemas post-processing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): simplify pipeline_read input validation using SourceInput schema

Replace the manual rawInputJsonSchema lookup + z.fromJSONSchema + unwrap
logic with the pre-built SourceInput discriminated union from
createConnectorSchemas. Each NDJSON line is validated via SourceInput.parse()
then unwrapped — ~15 lines replaced with 3.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(scripts): rewrite OAS generation to run from TS source via bun

- No build step required: reads TypeScript directly via bun
- Two-phase: generate specs to temp files, diff against committed, exit
  early if nothing changed (~1s no-change path vs ~4s before)
- Phase 2 (openapi-typescript) only runs when specs actually changed,
  with both .d.ts generations running in parallel
- Add "type": "module" to root package.json (repo is ESM-only; this
  also makes tsx work correctly for scripts in the repo root)
- Extract inline node -e script to scripts/generate-openapi-specs.ts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* test(engine): add SourceInput validation tests for pipeline_read

Tests the SourceInput.parse() path with a source that has rawInputJsonSchema:
- Valid wrapped input: accepted, unwrapped data passed to source
- Invalid input (wrong type): stream errors with Zod validation failure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): delete injectConnectorSchemas and naming helpers from openapi-utils

All connector schema injection is now handled by Zod .meta({ id })
annotations in createConnectorSchemas — no post-hoc spec patching needed.

Deleted: injectConnectorSchemas(), connectorSchemaName(),
connectorInputSchemaName(), capitalize().

Kept: endpointTable() (used by engine + service), addDiscriminators()
(still used by service until it adopts .meta({ id }) on its schemas).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor: adopt .meta({ id }) in service createSchemas, delete addDiscriminators

Service's createSchemas now uses .meta({ id }) on all discriminated union
variants + unions, matching the engine pattern. zod-openapi auto-generates
discriminator.mapping — no post-processing needed.

addDiscriminators() deleted from openapi-utils.ts (no callers remain).
openapi-utils.ts is now just endpointTable() (11 lines).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(hono-zod-openapi): eliminate *Output duplicate schemas via outputIdSuffix: ''

zod-openapi generates FooOutput duplicates when the same schema appears in
both request and response positions. Setting outputIdSuffix: '' in
createDocument options prevents this at the source.

Also: use raw $ref objects for NDJSON content schemas in route definitions
(not auto-validated anyway), register Zod schemas once via components in
getOpenAPI31Document. Removes the post-processing band-aid entirely.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): resolve dev script import failures by adding --conditions bun

tsx doesn't recognize the "bun" export condition by default, so workspace
packages resolved to stale dist/ builds instead of TS source. Passing
--conditions bun makes Node use the source entry points directly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: fix dev command in AGENTS.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi specs (Output duplicates eliminated)

-3119 lines from removing *Output duplicate schemas. outputIdSuffix: ''
in the library prevents zod-openapi from generating separate input/output
schema variants.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor: inline discriminated union variants in SourceConfig/DestinationConfig

Remove .meta({ id }) from variant wrappers so they inline into oneOf
instead of generating separate SourceStripe/DestinationPostgres named
schemas. Payload schemas (SourceStripeConfig, etc.) stay as named $refs.

Delete unused connectorVariantName/connectorInputVariantName helpers.
Regenerate specs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(engine): restore addDiscriminators for inline oneOf variants

zod-openapi only generates discriminator.mapping for named ($ref) variants.
Inline variants (SourceConfig, DestinationConfig, SourceInput) need the
addDiscriminators post-processing to add discriminator: { propertyName: "type" }.

Reverted the meta({ discriminator }) approach (doesn't propagate through
createDocument). addDiscriminators is a lightweight spec walker, not the
heavy injectConnectorSchemas it replaced.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
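A lightweight spec walker of the kind described above can be sketched as a recursive pass that stamps `discriminator: { propertyName: "type" }` onto any schema node carrying a `oneOf`; this is an illustrative reconstruction, not the actual addDiscriminators implementation:

```typescript
// Walk a generated OpenAPI document and add discriminator.propertyName to
// every oneOf schema that lacks one. Mutates the spec in place.
function addDiscriminators(node: unknown): void {
  if (typeof node !== "object" || node === null) return;
  const obj = node as Record<string, unknown>;
  if (Array.isArray(obj["oneOf"]) && obj["discriminator"] === undefined) {
    obj["discriminator"] = { propertyName: "type" };
  }
  for (const value of Object.values(obj)) addDiscriminators(value);
}
```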

* feat(engine): document and test optional requestBody on pipeline_read and pipeline_sync

- Add requestBody: { required: false } to pipeline_read and pipeline_sync routes
  using SourceInput schema when available, falling back to Message
- Add spec tests verifying optional requestBody and SourceInput schema ref
- Add runtime tests for pipeline_sync push mode (raw unwrapped input)
- pipeline_read validates and unwraps SourceInput envelope in handler
- pipeline_sync passes stream as-is (no handler-level unwrapping)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(hono-zod-openapi): auto-generate discriminator.propertyName via override

zod-openapi only emits discriminator when variants have .meta({ id }) for
the mapping. But OAS 3.1 only requires propertyName. The library now uses
createDocument's override option to inject discriminator: { propertyName }
for any z.discriminatedUnion schema — works for both named $ref and
inline variants.

Deletes addDiscriminators from openapi-utils.ts and both app handlers.
openapi-utils.ts is now just endpointTable() (11 lines).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: add pre-push hook to verify OpenAPI specs are up to date

Runs ./scripts/generate-openapi.sh --check before push. Catches stale
specs locally instead of waiting for CI.

Install: git config core.hooksPath .githooks

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol): align ControlMessage with Airbyte, add MessageBase envelope, stream utilities

- Add MessageBase with _emitted_by (origin tag) and _ts (ISO 8601 timestamp)
  as engine-injected envelope metadata on all 9 message types
- Rename control_type: 'config_update' → 'connector_config' (Airbyte alignment)
- Add SyncOutput union (DestinationOutput | ControlMessage) for pipeline_sync
- Add stream-utils: channel(), merge(), split(), map() async iterable primitives
- Refactor pipeline_sync to fork read stream via split() + merge():
  data+eof → destination, source signals (control/log/trace) → caller
  Fully streaming, no buffering
- Service syncImmediate now collects and persists control config patches
- Update protocol-comparison doc to reflect alignment

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
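Minimal sketches of two of the primitives named above, `map()` and `merge()`; the real stream-utils module (including `channel()` and `split()`) is more involved, so these are illustrative only:

```typescript
// Transform each item of an async iterable.
async function* map<T, U>(
  src: AsyncIterable<T>,
  fn: (t: T) => U,
): AsyncGenerator<U> {
  for await (const item of src) yield fn(item);
}

// Interleave multiple async iterables, yielding from whichever source
// resolves first — fully streaming, no buffering.
async function* merge<T>(...sources: AsyncIterable<T>[]): AsyncGenerator<T> {
  const iters = sources.map((s) => s[Symbol.asyncIterator]());
  const advance = (it: AsyncIterator<T>, idx: number) =>
    it.next().then((res) => ({ res, idx }));
  const pending = new Map(iters.map((it, i) => [i, advance(it, i)]));
  while (pending.size > 0) {
    const { res, idx } = await Promise.race(pending.values());
    if (res.done) {
      pending.delete(idx);
    } else {
      yield res.value;
      pending.set(idx, advance(iters[idx]!, idx));
    }
  }
}
```

In the pipeline_sync refactor, `split()` forks the read stream by predicate and `merge()` recombines the destination's output with the source's control/log/trace signals for the caller.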

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(ci): fall back to npx tsx when bun is not available

The generate-openapi.sh script hardcoded `bun`, which isn't installed
in the CI runner. It now tries bun first and falls back to npx tsx.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(e2e): accept Zod discriminator error for unknown connector type

The TypedPipelineConfig discriminated union rejects unknown connector
types at header validation before the engine sees them. The test now
accepts either "not found" (engine-level) or "No matching discriminator"
(Zod validation) as valid rejection of unknown connectors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): move createRemoteEngine into ActivitiesContext as engine

Every activity was creating a new remote engine on each call. Now the
engine client is created once in ActivitiesContext and shared across all
activities. JSDoc documents it as a drop-in replacement for a local engine.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor: snake_case SourceReadOptions, remove toConfig, rename syncOpts

- SourceReadOptions: stateLimit → state_limit, timeLimit → time_limit
- Remove toConfig() — inline destructure `const { id: _, ...config } = pipeline`
- Rename syncOpts → readOpts in activities
- Update all usages across engine, service, workflows, and tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): remove unnecessary type casts in activities

drainMessages and collectError now accept Message instead of
Record<string, unknown>, eliminating the need for casts at call sites.
Use discriminated union narrowing instead of manual casts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
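
The narrowing pattern in miniature, with simplified stand-ins for the protocol message shapes (these types are illustrative, not the real ones):

```typescript
// Simplified stand-ins for the protocol's Message union.
type Message =
  | { type: 'record'; record: { stream: string; data: unknown } }
  | { type: 'trace'; trace: { type: 'error' | 'stream_status'; message?: string } }
  | { type: 'log'; log: { level: string; message: string } };

// No `as` casts needed: checking the discriminant narrows the type.
function collectError(msg: Message): string | undefined {
  if (msg.type === 'trace' && msg.trace.type === 'error') return msg.trace.message;
  return undefined;
}

const err = collectError({ type: 'trace', trace: { type: 'error', message: 'boom' } });
const none = collectError({ type: 'log', log: { level: 'info', message: 'ok' } });
```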

* refactor: remove SetupResult, setup activity returns void

The workflow never uses the setup return value. The activity now persists
config patches internally and returns void. SetupResult Zod schema removed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol): stream pipeline_check/setup/teardown, generic collect helpers

Engine interface: pipeline_check, pipeline_setup, and pipeline_teardown now
return AsyncIterable<Message> streams (tagged with _emitted_by) instead of
collapsed Promise types. Every pipeline operation is now a stream.

Protocol helpers: replace 5 named collectors (collectSpec, collectCatalog,
collectConnectionStatus, collectControls, drainStream) with 3 generic helpers:
- collectMessages(stream, ...types) — collect all messages of given type(s)
- collectFirst(stream, type) — collect first message, throw if missing
- drain(stream) — consume all, throw on trace errors

HTTP routes: /pipeline_check (now POST), /pipeline_setup, /pipeline_teardown
all return NDJSON streams instead of JSON.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
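
Hedged sketches of the three generic helpers — signatures are inferred from the descriptions above, not copied from the repo:

```typescript
type Msg = { type: string; [key: string]: unknown };

// collectMessages(stream, ...types): gather every message of the given types.
async function collectMessages(stream: AsyncIterable<Msg>, ...types: string[]): Promise<Msg[]> {
  const out: Msg[] = [];
  for await (const m of stream) if (types.includes(m.type)) out.push(m);
  return out;
}

// collectFirst(stream, type): first matching message, throw if absent.
async function collectFirst(stream: AsyncIterable<Msg>, type: string): Promise<Msg> {
  for await (const m of stream) if (m.type === type) return m;
  throw new Error(`stream ended without a '${type}' message`);
}

// drain(stream): consume everything, throwing on trace errors.
async function drain(stream: AsyncIterable<Msg>): Promise<void> {
  for await (const m of stream) {
    const trace = m.trace as { type?: string; message?: unknown } | undefined;
    if (m.type === 'trace' && trace?.type === 'error') throw new Error(String(trace.message));
  }
}

async function* setupStream(): AsyncGenerator<Msg> {
  yield { type: 'log', log: { message: 'creating table' } };
  yield { type: 'control', control: {} };
  yield { type: 'connection_status', connection_status: { status: 'SUCCEEDED' } };
}

const first = await collectFirst(setupStream(), 'connection_status');
const picked = await collectMessages(setupStream(), 'control', 'log');
await drain(setupStream()); // no trace errors, resolves cleanly
```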

* refactor(service): rename pipelines → pipelineStore for clarity

Consistent naming: the variable is a store, not the pipelines themselves.
API route paths (/pipelines) and operation IDs unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): use SourceInput type alias instead of unknown[]

A locally defined type alias `SourceInput = unknown` improves readability —
it makes clear these are source input events (e.g. webhook payloads), not
arbitrary data.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): simplify check/setup/teardown stream composition

- merge() now accepts falsy values (false | null | undefined) and skips
  them, so conditional streams can be passed directly without an array
  + filter workaround
- pipeline_check/setup/teardown no longer need a local streams[] array
  or push() helper — streams are composed inline as merge() arguments
- discover-catalog activity: destructure source/streams directly from
  pipeline instead of spreading away id

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
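
A sketch of a falsy-tolerant merge(), interleaving via Promise.race (the real utility also adds the no-op .catch() guards mentioned elsewhere in this PR):

```typescript
// merge() that skips falsy inputs so conditional streams compose inline.
async function* merge<T>(
  ...sources: Array<AsyncIterable<T> | false | null | undefined>
): AsyncGenerator<T> {
  const iters = sources
    .filter((s): s is AsyncIterable<T> => Boolean(s))
    .map((s) => s[Symbol.asyncIterator]());
  const pull = (idx: number) => iters[idx].next().then((res) => ({ res, idx }));
  const pending = new Map<number, Promise<{ res: IteratorResult<T>; idx: number }>>();
  iters.forEach((_, idx) => pending.set(idx, pull(idx)));
  while (pending.size > 0) {
    const { res, idx } = await Promise.race(pending.values());
    if (res.done) pending.delete(idx);
    else {
      yield res.value;
      pending.set(idx, pull(idx));
    }
  }
}

async function* of<T>(...items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Conditional stream passed directly — no streams[] array + filter workaround.
const includeTeardown = false as boolean;
const out: string[] = [];
for await (const m of merge(of('check:src', 'check:dst'), includeTeardown && of('teardown'))) {
  out.push(m);
}
```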

* fix(test): drain pipeline_setup AsyncIterable in sync lifecycle test

pipeline_setup returns AsyncIterable<SetupOutput>, not a Promise.
Using plain `await` on an AsyncIterable doesn't iterate it, so
setup never ran and the destination table wasn't created.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(service): remove stale cast in write-google-sheets-from-queue

collectError now accepts Message directly; the old `as unknown as
Record<string, unknown>` cast was left over from before the protocol
refactor.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: add dev:engine shortcut to root package.json

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(service): update test to use renamed pipelineStore option

Commit 9caee1d renamed `pipelines` to `pipelineStore` in AppOptions
but the test was not updated, causing all CRUD tests to 500.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(schemas): add connectorUnionId helper, regenerate OpenAPI specs

- Add `connectorUnionId(role)` helper to ensure both engine and service
  createSchemas use a shared function for union schema IDs
- Regenerate OpenAPI specs to match current naming convention
  (SourceStripe not SourceStripeConfig, Source/Destination/Pipeline
   not SourceConfig/DestinationConfig/PipelineConfig)
- Update tests to match regenerated schema names

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(schemas): add Config suffix to OAS schema names, regenerate specs

Rename OAS component schemas: Source→SourceConfig, Destination→DestinationConfig,
Pipeline→PipelineConfig, SourceStripe→SourceStripeConfig, etc. via connectorSchemaName()
and connectorUnionId(). Update tests and regenerate OpenAPI specs. Fix formatting.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(e2e): drain pipeline_setup AsyncIterable in stripe-to-postgres tests

pipeline_setup now returns AsyncIterable<SetupOutput> — bare `await`
does not consume the iterable, so setup never executes and tables are
never created. Use drain() to fully consume the stream.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…SourceInput envelope (#248)

* feat(protocol): add SyncState + discriminated StatePayload (stream | global)

- SyncState = { streams, global } replaces Record<string, unknown>
- StatePayload is now z.union([StreamStatePayload, GlobalStatePayload])
- StreamStatePayload defaults state_type to 'stream' for backward compat
- Source.read() and SyncParams.state updated to accept SyncState
- stateMsg helper supports both old { stream, data } and new { state_type: 'global', data }
- stateStream helper returns undefined for global state

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
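
A sketch of how a state consumer might fold the discriminated StatePayload into SyncState — per-stream and global slots each replaced wholesale (types are simplified stand-ins, not the real Zod schemas):

```typescript
type SyncState = { streams: Record<string, unknown>; global?: unknown };
type StatePayload =
  | { state_type?: 'stream'; stream: string; data: unknown } // default variant
  | { state_type: 'global'; data: unknown };

function applyState(state: SyncState, payload: StatePayload): SyncState {
  if (payload.state_type === 'global') {
    return { ...state, global: payload.data }; // replace global wholesale
  }
  // Absent state_type defaults to 'stream'; replace that stream's slot.
  return { ...state, streams: { ...state.streams, [payload.stream]: payload.data } };
}

let state: SyncState = { streams: {} };
state = applyState(state, { stream: 'customers', data: { cursor: 'cus_9' } });
state = applyState(state, { state_type: 'global', data: { events_after: 1700000000 } });
```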

* feat(engine): update StateStore interface — get() → SyncState, add setGlobal()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(engine): route global/stream state in persistState; pass global through enforceCatalog

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(engine): SourceReadOptions.state → SyncState

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(engine): pretty-print logs in dev; add build info to /health

- LOG_PRETTY=true in dev script enables pino-pretty via transport
- /health now returns GIT_COMMIT, COMMIT_URL, BUILD_DATE from env

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi specs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(engine): X-State header accepts SyncState with old-format backward compat

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(state-postgres): _global reserved row + setGlobal(); get() returns SyncState

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(service): thread SyncState through workflows + drainMessages

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi specs (temporary simplified x-state header)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(source-stripe): update state access to state.streams (SyncState type infra)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: restore full x-state header; add plan + protocol-comparison docs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix: update all state consumers for SyncState type (destinations, CLI, google-sheets workflow, tests)

- destination-postgres/google-sheets: narrow state_type before accessing .stream
- protocol/cli.ts: wrap legacy flat state in { streams, global }
- app.ts: use transform+pipe instead of z.union for zod-openapi compat
- google-sheet-pipeline-workflow: thread SyncState through read/write state
- read-google-sheets-into-queue: accumulate SyncState
- workflow.test.ts: expect state.streams.customers instead of state.customers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix: update workflow test mocks to use SyncState format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(service): hardcode syncImmediate limits — state_limit: 100, time_limit: 10s

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi specs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* fix: update supabase edge function + read-into-queue for SyncState; add copilot review plan

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(engine): source_discover uses X-Source header instead of X-Pipeline

The endpoint only needs source config, not a full pipeline. Removes the
fake `destination: { type: '_' }` hack from remote-engine.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate openapi specs

Committed-By-Agent: claude

* fix: use fetch HTTP client for Stripe SDK, fix error handling in NDJSON streams

- Use Stripe.createFetchHttpClient() to avoid WebPlatformFunctions crash
  when running under tsx --conditions bun
- Make ndjsonResponse accept a typed onError callback instead of emitting
  an off-protocol { type: 'error' } message
- logApiStream now yields a proper TraceMessage on failure instead of
  re-throwing (errors stay in the NDJSON stream)
- Suppress unhandled promise rejections in merge() by adding no-op .catch()
  to pending promises

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol): rename stream-utils to async-iterable-utils

Rename to reflect that these are generic async iterable combinators,
not stream-specific. Add JSDoc error-handling docs to each function
and fix split() swallowing unhandled rejections.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(source-stripe): expose supported API versions via config JSON Schema

Change api_version from z.string() to z.enum(SUPPORTED_API_VERSIONS) so
the config JSON Schema includes an enum of supported versions. Clients
read GET /meta/sources/stripe → config_schema.properties.api_version.enum
to discover available versions. Zero protocol changes, zero new endpoints.

DDR-008 documents the decision and alternatives considered.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(openapi): rename bundled-spec.json → oas/2026-03-25.dahlia.json

Use the actual API version as the filename inside an oas/ directory.
This supports bundling multiple versions in the future — each gets its
own {version}.json file. Build script copies the whole oas/ dir to dist/.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol): role-prefixed message types + Stripe polymorphism pattern

- ControlPayload: control_type 'connector_config' → 'source_config' | 'destination_config'
  with matching sub-hash fields (Stripe {type, [type]: payload} pattern)
- StateMessage → SourceStateMessage (type: 'source_state', source_state: payload)
- ConnectorSpecification: stream_state → source_state_stream, input → source_input,
  added source_state_global
- New SourceInputMessage protocol type
- Principle 10: Stripe polymorphism pattern
- Decision record: docs/plans/control-config-polymorphism.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: update AGENTS.md for SourceStateMessage + OpenAPI regen workflow

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: format client.test.ts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* revert: remove createFetchHttpClient() from Stripe client options

Breaks smokescreen E2E test — will fix in a separate commit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol): full config replacement for control messages

Switch from shallow-merge to full-replacement semantics for control
messages, aligning with Airbyte's actual implementation. Connectors
now emit the complete updated config; the orchestrator replaces the
stored config wholesale.

- Add sourceControlMsg/destinationControlMsg helpers to protocol
- Connectors emit full config: `{ ...config, ...updates }`
- Service consumers replace instead of merge (last config wins)
- OAS spec: ControlMessage source_config/destination_config now
  $ref actual connector config schemas (e.g. SourceStripeConfig)
- Add protocol control.test.ts + app.test.ts assertion for typed OAS

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
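
A sketch combining the `{type, [type]: payload}` envelope with full-replacement semantics — the connector emits its complete updated config and the orchestrator stores it wholesale (helper names mirror the commit; shapes are assumptions):

```typescript
type SourceConfig = Record<string, unknown>;
type ControlPayload =
  | { control_type: 'source_config'; source_config: SourceConfig }
  | { control_type: 'destination_config'; destination_config: Record<string, unknown> };

// Connector side: emit the full config, not a patch.
function sourceControlMsg(config: SourceConfig) {
  return {
    type: 'control' as const,
    control: { control_type: 'source_config' as const, source_config: config },
  };
}

// Orchestrator side: last full config wins — replace, don't merge.
function applyControl(stored: { source: SourceConfig }, control: ControlPayload) {
  if (control.control_type === 'source_config') {
    return { ...stored, source: control.source_config };
  }
  return stored;
}

const stored = { source: { type: 'stripe', api_key: 'sk_old', api_version: '2026-03-25.dahlia' } };
const msg = sourceControlMsg({ ...stored.source, api_key: 'sk_new' });
const next = applyControl(stored, msg.control);
```

Because the connector spreads its current config before emitting, untouched fields like `api_version` survive the wholesale replacement.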

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor: SourceInput envelope format { type: 'source_input', source_input }

Align SourceInput with the message envelope pattern used by all other
protocol messages. The connector-specific payload is now unwrapped from
source_input instead of a connector-keyed field.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(source-stripe): replace Stripe Node SDK with typed fetch client

Remove the `stripe` npm package from source-stripe and engine. All Stripe
API calls now use a typed fetch wrapper with Zod-validated responses:

- `makeClient()` returns `{ getAccount, listEvents, listWebhookEndpoints,
  createWebhookEndpoint, deleteWebhookEndpoint }` using raw fetch
- Webhook signature verification via HMAC-SHA256 (`verifyWebhookSignature`)
  replaces `stripe.webhooks.constructEvent()`
- Service webhook handler now verifies signatures at the HTTP boundary
  before forwarding to Temporal
- Stripe API types (StripeAccount, StripeWebhookEndpoint, StripeApiList)
  defined as Zod schemas in packages/openapi
- StripeEvent schema (renamed from webhookEventSchema) stays in source-stripe
- Dead `stripe` param removed from `processStripeEvent` signature
- SDK remains only as devDependency in service/e2e for test fixtures

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
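
A sketch of Stripe-style signature verification with HMAC-SHA256 over `${timestamp}.${payload}` — parameter names and the tolerance default are assumptions; the repo's helper may differ:

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

function verifyWebhookSignature(
  payload: string,
  sigHeader: string, // e.g. "t=1700000000,v1=abc123..."
  secret: string,
  toleranceSec = 300,
  nowSec = Math.floor(Date.now() / 1000),
): boolean {
  let timestamp: string | undefined;
  const candidates: string[] = [];
  for (const part of sigHeader.split(',')) {
    const [key, value] = part.split('=', 2);
    if (key === 't') timestamp = value;
    if (key === 'v1') candidates.push(value);
  }
  if (!timestamp || candidates.length === 0) return false;
  if (Math.abs(nowSec - Number(timestamp)) > toleranceSec) return false; // replay guard
  const expected = createHmac('sha256', secret).update(`${timestamp}.${payload}`).digest('hex');
  // Constant-time comparison against each v1 candidate.
  return candidates.some(
    (c) => c.length === expected.length && timingSafeEqual(Buffer.from(c), Buffer.from(expected)),
  );
}

// Forge a valid header for demonstration:
const secret = 'whsec_test';
const payload = '{"type":"customer.created"}';
const t = Math.floor(Date.now() / 1000);
const v1 = createHmac('sha256', secret).update(`${t}.${payload}`).digest('hex');
const ok = verifyWebhookSignature(payload, `t=${t},v1=${v1}`, secret);
const bad = verifyWebhookSignature(payload, `t=${t},v1=${v1}`, 'whsec_wrong');
```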

* chore: use bun --watch in dev scripts when bun is available

Add x:watch helper script to engine and service packages that detects
bun availability and uses bun --watch, falling back to tsx --watch.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: update pnpm-lock.yaml

Committed-By-Agent: claude

* fix: update lockfile after removing stripe SDK dependency

The stripe@^17.7.0 dep was removed from apps/engine/package.json but
pnpm-lock.yaml was not regenerated, breaking all CI jobs on frozen install.
Also fixes formatting in createSchemas.ts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs: update todos

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(source-stripe): make StripeAccountSchema.created optional

The /v1/account endpoint doesn't always return created (e.g. for some
platform/test accounts). The field is only used with a ?? fallback in
getAccountCreatedTimestamp, so making it optional is correct.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* Fix source_discover swallowing errors, add proxy and discover tests

- Wrap source_discover route in logApiStream so network/spec errors
  are emitted as trace error messages instead of silently closing
- Add tests for POST /source_discover: happy path, error propagation,
  missing header 400
- Add fetchWithProxy call-site tests covering proxy, no-proxy, localhost
  bypass, and NO_PROXY domain bypass

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(source-stripe): replace magic timestamp with named constant

Replace 1293840000 with STRIPE_LAUNCH_TIMESTAMP derived from the ISO date
'2011-01-01T00:00:00Z', with a comment explaining the fallback rationale.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…refactors (#250)

* Accept any api_version string, advertise known versions via anyOf enum

Replace z.enum(SUPPORTED_API_VERSIONS) with z.string() so the engine
accepts non-bundled versions (CDN-fetched). Advertise known versions
in the JSON schema via anyOf so z.fromJSONSchema in the resolver
produces a union that accepts any string rather than a strict enum.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(openapi): generate SUPPORTED_API_VERSIONS from oas/ directory

Replace hardcoded BUNDLED_API_VERSION / SUPPORTED_API_VERSIONS in
specFetchHelper.ts with a generated src/versions.ts derived from the
files present in packages/openapi/oas/*.json.

- Add scripts/generate-versions.mjs — scans oas/, writes src/versions.ts
- Hook into build: node scripts/generate-versions.mjs && tsc
- specFetchHelper.ts imports from ./src/versions.js (no more hardcodes)
- Revert api_version from z.string() back to z.enum(SUPPORTED_API_VERSIONS)
- Remove anyOf JSON Schema hack; enum is now the direct source of truth
- Update spec.test.ts to assert strict enum validation

Adding a new bundled API version is now: drop the .json into oas/,
run pnpm --filter @stripe/sync-openapi build. Same oas/ directory
powers the docs CDN (docs/scripts/generate-stripe-specs.mjs).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(openapi): fetch SUPPORTED_API_VERSIONS from CDN manifest at build time

generate-versions.mjs now fetches stripe-sync.dev/stripe-api-specs/manifest.json
to populate SUPPORTED_API_VERSIONS (51 versions) while BUNDLED_API_VERSION
stays derived from the single oas/*.json file.

The CDN manifest is produced by docs/scripts/generate-stripe-specs.mjs —
same source, shared contract.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs with 51-version api_version enum

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* feat(openapi): build SUPPORTED_API_VERSIONS from committed oas/manifest.json

Remove network fetch at build time. Instead:
- oas/manifest.json is committed alongside the bundled spec
- generate-versions.mjs reads it locally (reproducible, no network)
- docs/scripts/generate-stripe-specs.mjs now also writes manifest to
  packages/openapi/oas/manifest.json when updating the CDN

To pick up new Stripe API versions:
  1. Run docs/scripts/generate-stripe-specs.mjs
  2. Commit updated oas/manifest.json
  3. Run pnpm --filter @stripe/sync-openapi build

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(openapi): move CDN spec generation into packages/openapi/scripts/

- Add scripts/generate-all-specs.mjs (heavyweight): moved from
  docs/scripts/generate-stripe-specs.mjs; walks stripe/openapi git
  history, writes spec files + manifest.json to <outputDir> for CDN,
  and updates src/versions.ts as a side effect
- Keep scripts/generate-versions.mjs (lightweight): standalone utility
  to bootstrap src/versions.ts from just the bundled oas/ file
- Remove build-time version generation from package.json — src/versions.ts
  is a committed file updated by the heavyweight script; build = tsc only
- Remove oas/manifest.json — not needed in the package
- docs/scripts/generate-stripe-specs.mjs → thin wrapper that delegates
  to packages/openapi/scripts/generate-all-specs.mjs

packages/openapi is now the source of truth for Stripe API version
discovery. CDN invokes generate-all-specs.mjs with an output directory.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): remove generic queue activities, fix global state replacement

- Delete read-into-queue.ts and write-from-queue.ts — unused generic
  queue activities; pipelineWorkflow and backfillPipelineWorkflow use
  syncImmediate directly, and googleSheetPipelineWorkflow has its own
  GS-specific queue activities
- Fix global source_state handling: replace wholesale (state.global = data)
  instead of merging (Object.assign) — consistent with stream state
  which already does per-stream replacement

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(openapi): rename generate-all-specs → generate-specs, add --versions filter

- Rename generate-all-specs.mjs → generate-specs.mjs
- Add --versions flag to generate specific versions only (e.g. for
  updating the bundled spec in oas/):
    node scripts/generate-specs.mjs oas --versions 2026-03-25.dahlia
  Without the flag, all versions are generated (CDN use case).
- Update docs/scripts/generate-stripe-specs.mjs wrapper reference
- Update generate-versions.mjs comment reference

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(openapi): resolve [object Object] content key in header params

- Fix getParamContentType() to extract the media type string from the
  content object rather than returning the whole object, which coerced
  to '[object Object]' as a property key
- Rename x-state header to x-source-state for clarity
- Add .meta({ id: 'SourceState' }) to SyncState for a named OAS schema
- Regenerate openapi.json / openapi.d.ts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(state): activity owns state merge, workflow holds full copy

Activities now receive the current full SyncState as initial state and
return the updated full SyncState. Workflows simply assign result.state
directly — no merge logic in the workflow layer.

- drainMessages accepts initialState and starts from it
- syncImmediate passes readOpts.state to drainMessages
- readGoogleSheetsIntoQueue starts state from readOpts.state
- writeGoogleSheetsFromQueue accepts opts.state and starts from it
- All workflows: syncState = result.state (no spread merge)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol): rename SyncState → SourceState, backfillComplete → reconcileComplete

- Rename SyncState Zod schema and type to SourceState across all packages
  (.meta({ id: 'SourceState' }) was already correct)
- Rename backfillComplete → reconcileComplete in backfill workflow
- Add reconcileComplete to BackfillPipelineWorkflowOpts so it survives continueAsNew

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs (SourceState description update)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* docs(todos): clean up orphan items, promote active work to Now

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…ons (#252)

Add `"customConditions": ["bun"]` to tsconfig.base.json so TypeScript
resolves workspace package imports through the existing "bun" export
condition (pointing to source .ts files) instead of "types" (pointing
to dist/*.d.ts). With this change, `tsc --noEmit` and `pnpm build` pass
with no dist/ directories present.

Update scripts/ts-run Node fallback to pass --conditions bun to tsx,
so the same source-resolution behaviour works without bun installed.

Update docs/slides demo scripts to invoke connectors and the engine
via scripts/ts-run instead of node dist/ paths.

External consumers and published packages are unaffected — they do not
set customConditions, so they continue to resolve via "types"/"import".


Committed-By-Agent: claude

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…251)

* feat(service): pipeline state machine — desired_status + workflow_status

Two orthogonal stored fields replace the live Temporal query for status:

- `desired_status` (active/paused/deleted) — written by API via PATCH
- `workflow_status` (setup/backfill/ready/paused/teardown/error) — written by Temporal via activity
- `status` — derived on read from the two fields (e.g. pausing, resuming, tearing_down)

API changes:
- All lifecycle changes via PATCH { desired_status: "..." }
- Transition validation: deleted is terminal (409)
- Immediate synchronous response (no more live Temporal queries)

Workflow changes:
- Generic update signal (no payload) — workflow reads desired_status from store
- New activities: getDesiredStatus, updateWorkflowStatus
- Workflows write workflow_status at every transition point
- Remove deleteSignal — unify to single update signal
- Remove statusQuery/stateQuery (no longer needed for user-facing status)

Dashboard: derive Pipeline type from generated OpenAPI spec, update API client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
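
An illustrative derivation of the user-facing status from the two stored fields — the exact transition table beyond the examples given (pausing, resuming, tearing_down) is an assumption:

```typescript
type DesiredStatus = 'active' | 'paused' | 'deleted';
type WorkflowStatus = 'setup' | 'backfill' | 'ready' | 'paused' | 'teardown' | 'error';

function deriveStatus(desired: DesiredStatus, workflow: WorkflowStatus): string {
  if (desired === 'deleted') return workflow === 'teardown' ? 'tearing_down' : 'deleted';
  if (desired === 'paused') return workflow === 'paused' ? 'paused' : 'pausing';
  if (workflow === 'paused') return 'resuming'; // desired active, not yet resumed
  return workflow; // setup | backfill | ready | error pass through
}
```

Because both inputs are stored fields, the API can answer status reads synchronously with no live Temporal query.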

* test(service): update app.test.ts for new status model and single PATCH lifecycle

- Remove checks for /pipelines/{id}/pause and /pipelines/{id}/resume paths
- Check status is a string instead of status.paused boolean
- Use PATCH with desired_status instead of POST /pause and /resume

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(service): replace updateSignal+getDesiredStatus with desiredStatusSignal payload

Signal now carries the desired_status value directly, eliminating the
`updated` flag, `refreshDesiredStatus()` helper, and `getDesiredStatus`
activity from all three pipeline workflows. Callers pass desired_status
explicitly on workflow start and signal only when desired_status changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* test(e2e): fix status assertion for flat string model

status is now a flat string (e.g. 'backfilling'), not an object with .phase

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(protocol,service): SourceInputMessage envelope type + pipeline status rename

- protocol: `SourceInput = unknown` → typed `SourceInputMessage` Zod schema
  with wire envelope `{ type: 'source_input', source_input: unknown }`
- service: rename `workflow_status` field on Pipeline → `status`
- service: rename `updateWorkflowStatus` activity → `updatePipelineStatus`
- service: remove derived `status` field and `deriveStatus` helper; the
  stored `status` (WorkflowStatus) is now returned directly from the API
- workflows: internal input queues typed as `SourceInputMessage[]`;
  signal handlers typed accordingly
- `update-workflow-status.ts` → `update-pipeline-status.ts`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(service): fix pre-existing test failures

- workflow.test.ts: remove orphaned `_desiredStatus = 'active'` in beforeEach
  (variable was never declared — leftover from a previous refactor)
- app.test.ts: update workflow start args assertion to match current two-arg
  form `[pipelineId, { desiredStatus }]`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: apply linter formatting to pipeline-workflow.ts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* chore: regenerate OpenAPI specs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(workflow): setStatus dedup + sourceState accumulation in activity

- Add setStatus() helper that skips updatePipelineStatus when already at target status
- Rename syncState→sourceState, readComplete→eofCompleted for clarity
- Drop manual spread-merge; drainMessages already accumulates state from initialState
- Simplify while loop: while(true) + break on deleted, remove redundant pre-loop check

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
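
The setStatus() dedup helper in sketch form, with the Temporal activity stubbed out for demonstration:

```typescript
const calls: string[] = [];
async function updatePipelineStatus(status: string): Promise<void> {
  calls.push(status); // stands in for the real Temporal activity
}

let currentStatus: string | undefined;
async function setStatus(target: string): Promise<void> {
  if (currentStatus === target) return; // skip redundant activity round-trip
  await updatePipelineStatus(target);
  currentStatus = target;
}

await setStatus('backfill');
await setStatus('backfill'); // deduped — no second activity call
await setStatus('ready');
```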

* feat(service): add DELETE /pipelines/{id} endpoint

Restores the DELETE route removed during the state machine refactor.
Signals desired_status='deleted' to the running workflow, then removes
the pipeline from the store and returns {id, deleted: true}.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* fix(dashboard): use x-source header for source_discover (not x-pipeline)

source_discover accepts x-source (JSON-encoded source config), not x-pipeline.
The old x-pipeline with destination: { type: '_' } was both wrong and would
fail schema validation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude

* refactor(workflow): simplify pipeline workflow + rename syncImmediate → pipelineSync

- Flatten workflow to explicit status transitions (no refreshStatus)
- Single setup block, single teardown block
- liveLoop owns pause/resume status, backfillLoop owns backfill/ready transitions
- Remove CancellationScope, shouldExit, awaitActive, LANE_POLL_MS
- Rename syncImmediate activity to pipelineSync across all files
- Rename sync-immediate.ts → pipeline-sync.ts

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude

* Clarify pipeline workflow state transitions

Refactor the Temporal pipeline workflow around explicit workflow state, root-owned lifecycle control, and co-located lane wait helpers. Rename pipeline setup/teardown activities for consistency, update mocked-activity workflow tests to match the current signal contract, and capture the resulting workflow-writing principles in service docs.

Constraint: Keep external PipelineStatus values unchanged while improving internal workflow state modeling
Rejected: Add deleted to PipelineStatus | broader API/schema change than this refactor
Confidence: medium
Scope-risk: moderate
Directive: Keep desiredStatus as requested intent and workflow state as actual execution state; do not collapse them back into one field without re-evaluating pause/delete semantics
Tested: pnpm --filter @stripe/sync-service build
Tested: pnpm --filter @stripe/sync-service test -- src/__tests__/workflow.test.ts
Not-tested: Full repository test suite
Not-tested: GitHub Actions CI
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Cover pipeline workflow edge cases with mocked Temporal tests

Expand the mocked-activity workflow coverage to match the current signal contract and actual lane behavior. The suite now verifies live source input delivery, concurrent live/reconcile progress, phase-driven status transitions, and queueing of live events across pause/resume.

Constraint: Keep the tests aligned with observable workflow behavior rather than brittle batch timing assumptions
Rejected: Exact live batch count assertions under concurrent signal delivery | timing-sensitive and not part of the contract
Confidence: high
Scope-risk: narrow
Directive: When the workflow changes, update these tests around behavior and status transitions first; avoid reintroducing stale signal names or activity names
Tested: pnpm --filter @stripe/sync-service test -- src/__tests__/workflow.test.ts
Tested: pnpm --filter @stripe/sync-service test
Tested: pnpm lint
Not-tested: GitHub Actions CI
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>

* Relax live-drain workflow assertions to match contract

Keep the mocked Temporal coverage focused on observable behavior instead of a specific concurrent batch count. The live-drain case still verifies multiple live batches, at least one batch overlapping an in-flight reconcile slice, and delivery of all queued events.

Constraint: CI timing differs from local Temporal test timing, so exact overlap counts are not stable
Rejected: Keep requiring two overlapping live batches | flaky under CI scheduler variability
Confidence: high
Scope-risk: narrow
Directive: Assert on delivered events and overlap existence, not exact batch segmentation, for concurrent workflow tests
Tested: pnpm --filter @stripe/sync-service test -- src/__tests__/workflow.test.ts
Tested: pnpm --filter @stripe/sync-service test
Tested: pnpm lint
Not-tested: GitHub Actions CI
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
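
The directive above can be illustrated with a toy model of live batches. The batch shapes are invented for the example; the real tests use the workflow's actual types.

```typescript
// Illustrative sketch of the assertion style: assert on delivered events and
// on overlap *existence*, never on exact batch segmentation, which is
// scheduler-dependent and flaky under CI.
interface LiveBatch {
  events: string[]
  reconcileInFlight: boolean // was a reconcile slice running when this batch landed?
}

// Two schedules of the same workload: different segmentation, same delivered
// events, each with at least one batch overlapping a reconcile slice.
const scheduleA: LiveBatch[] = [
  { events: ['e1', 'e2'], reconcileInFlight: true },
  { events: ['e3'], reconcileInFlight: false },
]
const scheduleB: LiveBatch[] = [
  { events: ['e1'], reconcileInFlight: false },
  { events: ['e2'], reconcileInFlight: true },
  { events: ['e3'], reconcileInFlight: false },
]

function contractHolds(batches: LiveBatch[]): boolean {
  const delivered = batches.flatMap((b) => b.events).sort()
  const allDelivered = JSON.stringify(delivered) === JSON.stringify(['e1', 'e2', 'e3'])
  const overlapExists = batches.some((b) => b.reconcileInFlight)
  // Note: no assertion on batches.length, the timing-sensitive quantity.
  return allDelivered && overlapExists
}
```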

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: codex <noreply@openai.com>
The backfill logic was consolidated into pipelineWorkflow's
state machine in #251. The standalone backfill workflow is no
longer referenced or needed.

Committed-By-Agent: claude
…read activity

- Split readLoop into liveEventLoop + reconcileLoop + writeLoop
- Single GoogleSheetWorkflowState object replaces scattered booleans
- derivePipelineStatus() + setState() pattern matches pipeline-workflow.ts
- EOF-based completion detection replaces deepEqual
- Rename readGoogleSheetsIntoQueue → readIntoQueue (now generic)
- Move row key/number augmentation from read to write activity
- Treat SourceState as atomic in workflow; activities own merge logic
- Rename RowIndex → RowIndexMap, state → sourceState for clarity

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
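
The single-state-object pattern above can be sketched as follows. The field names, status values, and EOF flag are illustrative assumptions, not the real GoogleSheetWorkflowState schema.

```typescript
// Hypothetical sketch: one state object replaces scattered booleans, and
// derivePipelineStatus() computes the externally visible status from it.
// EOF-based completion is a flag set by the read loop, replacing deepEqual
// snapshot comparisons.
interface GoogleSheetWorkflowState {
  setupDone: boolean
  eofReached: boolean // set when the source signals end-of-feed
  pendingWrites: boolean
  paused: boolean
}

type PipelineStatus = 'setup' | 'backfill' | 'paused' | 'ready'

function derivePipelineStatus(s: GoogleSheetWorkflowState): PipelineStatus {
  if (!s.setupDone) return 'setup'
  if (s.paused) return 'paused'
  if (!s.eofReached || s.pendingWrites) return 'backfill'
  return 'ready'
}

let workflowState: GoogleSheetWorkflowState = {
  setupDone: false, eofReached: false, pendingWrites: false, paused: false,
}

// setState() merges a partial update, then reports the derived status once.
// The real version would also call the updatePipelineStatus activity.
function setState(patch: Partial<GoogleSheetWorkflowState>): PipelineStatus {
  workflowState = { ...workflowState, ...patch }
  return derivePipelineStatus(workflowState)
}
```

Because status is always derived, the three loops never disagree about what to report; they only patch the state they own.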
…signatures

- Postgres test: use pipelineId arg (not full config), add pipelineStore,
  fix signal names (source_input, desired_status)
- Google Sheets test: use googleSheetPipelineWorkflow, nested destination
  envelope format, pre-populate in-memory pipeline store
- CLI: default --data-dir to ~/.stripe-sync for all commands

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…sage utility

- Rename RowIndex → RowIndexMap, rowAssignments → rowIndexMap for clarity
- Extract immutable mergeStateMessage() utility in activities/_shared.ts
- Remove redundant state variable aliases in write activity
- Use assignment pattern (state = mergeStateMessage(state, msg)) consistently

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
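
A minimal sketch of the immutable merge utility, assuming a per-stream state message shape (the real message type lives in activities/_shared.ts and may differ):

```typescript
// Hedged sketch of mergeStateMessage(): returns a new state object rather
// than mutating, enabling the assignment pattern
// `state = mergeStateMessage(state, msg)` used consistently in activities.
interface StateMessage {
  stream: string
  state: Record<string, unknown>
}

type SourceState = Record<string, Record<string, unknown>>

function mergeStateMessage(state: SourceState, msg: StateMessage): SourceState {
  return { ...state, [msg.stream]: { ...state[msg.stream], ...msg.state } }
}

let sourceState: SourceState = { rows: { cursor: 'a' } }
const next = mergeStateMessage(sourceState, { stream: 'rows', state: { cursor: 'b' } })
```

Immutability is what makes the merge safe to share across activities: the previous snapshot stays intact if a later step fails.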
Follow snake_case API convention for the google sheets destination
type identifier (registry key, type discriminator, nested config key).
Package/directory names unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Copilot AI review requested due to automatic review settings April 6, 2026 20:17

tonyxiao commented Apr 6, 2026

Closing in favor of #253 which targets v2

@tonyxiao tonyxiao closed this Apr 6, 2026

Copilot AI left a comment


Pull request overview

This PR introduces a broader “engine + service + dashboard + supabase integration” structure and refactors the Google Sheets Temporal workflow to follow the shared pipeline workflow pattern, with queue-backed reads/writes and improved workflow state handling.

Changes:

  • Refactors the Google Sheets Temporal workflow into 3 parallel loops (live events, reconcile/read, write) with unified workflow state + status derivation.
  • Adds Supabase edge functions for Stripe webhook ingestion and cron-driven sync, plus bundling/tests for edge function code.
  • Adds new engine/service/dashboard apps, CLI entrypoints, Docker targets, and supporting utilities/tests/config.

Reviewed changes

Copilot reviewed 126 out of 640 changed files in this pull request and generated 13 comments.

Per-file summary (file — description):
apps/supabase/src/lib.ts Updates re-export to use ESM .js extension.
apps/supabase/src/index.ts Adds a barrel entrypoint exporting package modules.
apps/supabase/src/edge-functions/stripe-webhook.ts Adds Stripe webhook handler edge function using sync connectors.
apps/supabase/src/edge-functions/stripe-sync.ts Adds cron-like sync edge function with PG-backed state and auth.
apps/supabase/src/edge-functions/deno.json Updates Deno import map to use pg.
apps/supabase/src/edge-function-code.ts Adds raw-bundled edge function source exports for deployment.
apps/supabase/src/tests/supabase.e2e.test.ts Adds Supabase E2E tests covering webhook + backfill flows.
apps/supabase/src/tests/install.e2e.test.ts Adds install/uninstall E2E test harness (not CI-enabled).
apps/supabase/src/tests/edge-runtime.smoke.test.ts Adds dockerized edge-runtime smoke test.
apps/supabase/src/tests/deploy.e2e.test.ts Adds deploy/invoke/delete edge function E2E test (not CI-enabled).
apps/supabase/src/tests/bundle.test.ts Adds tests asserting edge-function code is bundled into dist output.
apps/supabase/package.json Introduces publishable @stripe/sync-integration-supabase package manifest.
apps/supabase/build.mjs Adds esbuild-based bundling pipeline including custom ?raw loader.
apps/service/vitest.integration.config.ts Adds Vitest integration test config.
apps/service/vitest.config.ts Adds Vitest unit test config.
apps/service/tsconfig.json Adds TypeScript build configuration for service app.
apps/service/src/temporal/workflows/pipeline-workflow.ts Adds generic pipeline workflow implementation (live + reconcile).
apps/service/src/temporal/workflows/index.ts Exports workflow entrypoints.
apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts Refactors Google Sheets workflow into live/reconcile/write loops.
apps/service/src/temporal/workflows/_shared.ts Centralizes workflow signals + proxied activities types.
apps/service/src/temporal/worker.ts Adds worker factory wiring activities and workflows path.
apps/service/src/temporal/activities/write-google-sheets-from-queue.ts Adds queue consumer → Google Sheets writer activity with row augmentation.
apps/service/src/temporal/activities/update-pipeline-status.ts Adds activity to persist pipeline status updates.
apps/service/src/temporal/activities/read-into-queue.ts Adds generic read-into-Kafka activity for queue-backed workflows.
apps/service/src/temporal/activities/pipeline-teardown.ts Adds pipeline teardown activity wrapper.
apps/service/src/temporal/activities/pipeline-sync.ts Adds pipeline sync activity wrapper that persists connector config updates.
apps/service/src/temporal/activities/pipeline-setup.ts Adds pipeline setup activity wrapper that persists connector config updates.
apps/service/src/temporal/activities/index.ts Registers activities for the Temporal worker.
apps/service/src/temporal/activities/discover-catalog.ts Adds catalog discovery activity (uses source discover + selection).
apps/service/src/temporal/activities/_shared.ts Adds shared activity context (remote engine + Kafka queue utilities).
apps/service/src/logger.ts Adds pino logger with redaction defaults.
apps/service/src/lib/utils.ts Adds retry policy + constants (+ deepEqual utility).
apps/service/src/lib/stores.ts Defines PipelineStore interface.
apps/service/src/lib/stores-memory.ts Adds in-memory PipelineStore implementation for tests.
apps/service/src/lib/stores-fs.ts Adds filesystem-backed PipelineStore implementation.
apps/service/src/lib/createSchemas.ts Adds service-level schema factory (desired/workflow statuses + pipeline schemas).
apps/service/src/index.ts Adds service barrel exports (schemas, app factory, Temporal bindings).
apps/service/src/cli.ts Adds service CLI commands (serve/worker/webhook + OpenAPI-driven commands).
apps/service/src/cli.test.ts Adds unit tests for CLI worker flag plumbing.
apps/service/src/api/index.ts Adds API barrel exports.
apps/service/src/tests/openapi.test.ts Adds OpenAPI schema validation test for service spec.
apps/service/package.json Introduces publishable @stripe/sync-service package manifest.
apps/engine/vitest.config.ts Adds Vitest config for engine app.
apps/engine/tsconfig.json Adds TypeScript build configuration for engine app.
apps/engine/src/serve-command.ts Adds engine serve action with connector discovery flags.
apps/engine/src/logger.ts Adds pino logger with optional pretty transport and redaction.
apps/engine/src/lib/state-store.ts Adds StateStore interface + readonly implementation.
apps/engine/src/lib/source-test.ts Adds test source connector for engine tests.
apps/engine/src/lib/source-exec.ts Adds subprocess-backed source connector wrapper.
apps/engine/src/lib/select-state-store.ts Adds optional destination-colocated state store resolver.
apps/engine/src/lib/resolver.test.ts Adds resolver tests (specifier, bin resolution, caching, fallback).
apps/engine/src/lib/remote-engine.ts Adds HTTP remote-engine implementing Engine interface over OpenAPI+NDJSON.
apps/engine/src/lib/remote-engine.test.ts Adds remote-engine integration tests against in-process HTTP server.
apps/engine/src/lib/pipeline.ts Adds pipeline composition helpers (catalog enforcement, limits, logging, pipe).
apps/engine/src/lib/ndjson.ts Adds NDJSON parsing/streaming utilities.
apps/engine/src/lib/ndjson.test.ts Adds tests covering NDJSON parsing for strings/chunks/streams.
apps/engine/src/lib/index.ts Adds engine library barrel exports.
apps/engine/src/lib/exec.test.ts Adds tests covering exec wrappers using real built connector bins.
apps/engine/src/lib/exec-helpers.ts Adds subprocess helpers (spawn + NDJSON streaming + splitCmd).
apps/engine/src/lib/destination-test.ts Adds test destination connector for engine tests.
apps/engine/src/lib/destination-filter.ts Adds catalog selection filter (fields + primary keys).
apps/engine/src/lib/destination-filter.test.ts Adds tests for catalog selection filtering.
apps/engine/src/lib/destination-exec.ts Adds subprocess-backed destination connector wrapper.
apps/engine/src/lib/default-connectors.ts Adds default bundled connectors (stripe/postgres/google_sheets).
apps/engine/src/lib/createSchemas.ts Adds connector-driven Zod schema factory + OpenAPI naming helpers.
apps/engine/src/index.ts Adds engine public entrypoint exports (library + app factory).
apps/engine/src/cli/supabase.ts Adds CLI commands for Supabase install/uninstall integration.
apps/engine/src/cli/index.ts Adds CLI bin entrypoint.
apps/engine/src/cli/command.ts Adds CLI program builder with connector discovery flags + OpenAPI CLI.
apps/engine/src/api/openapi-utils.ts Adds OpenAPI patch helpers (endpoint table + ControlMessage patching).
apps/engine/src/api/index.ts Adds standalone “engine API” entrypoint server.
apps/engine/src/tests/sync.test.ts Adds dockerized Postgres tests for sync checkpoints/resume.
apps/engine/src/tests/stripe-to-postgres.test.ts Adds stripe-mock → postgres integration tests + selective sync/backfill tests.
apps/engine/src/tests/openapi.test.ts Adds OpenAPI spec validation and schema sanity tests.
apps/engine/src/tests/docker.test.ts Adds Docker image build/run tests for engine distribution.
apps/engine/package.json Introduces publishable @stripe/sync-engine app package manifest.
apps/dashboard/vitest.config.ts Adds Vitest config for dashboard.
apps/dashboard/vite.config.ts Adds Vite config including proxying to engine/service APIs.
apps/dashboard/tsconfig.json Adds TS config with OpenAPI type paths for engine/service.
apps/dashboard/src/pages/PipelineList.tsx Adds pipeline list UI with delete and status badge.
apps/dashboard/src/pages/PipelineDetail.tsx Adds pipeline detail UI with pause/resume/delete and stream table list.
apps/dashboard/src/main.tsx Adds dashboard React entrypoint.
apps/dashboard/src/lib/utils.ts Adds Tailwind class merge helper.
apps/dashboard/src/lib/stream-groups.ts Adds stream grouping/filter heuristics for UI and helpers.
apps/dashboard/src/lib/stream-groups.test.ts Adds tests for stream grouping/filtering.
apps/dashboard/src/lib/api.ts Adds typed API clients for engine/service via openapi-fetch.
apps/dashboard/src/index.css Adds Tailwind entrypoint CSS import.
apps/dashboard/src/components/StreamSelector.tsx Adds grouped stream selector UI with expand/search/select-all.
apps/dashboard/src/components/JsonSchemaForm.tsx Adds JSON-schema-driven config form renderer.
apps/dashboard/src/App.tsx Adds minimal SPA routing and page switching.
apps/dashboard/playwright.config.ts Adds Playwright E2E configuration.
apps/dashboard/package.json Introduces dashboard app manifest and test scripts.
apps/dashboard/index.html Adds dashboard HTML entrypoint.
apps/dashboard/e2e/pipeline-create.test.ts Adds E2E test for full pipeline creation flow.
apps/dashboard/e2e/global-teardown.ts Adds E2E teardown for engine server.
apps/dashboard/e2e/global-setup.ts Adds E2E setup to boot engine API during e2e.
apps/dashboard/Dockerfile Adds dashboard dev Dockerfile.
apps/dashboard/.gitignore Adds dashboard-specific ignore rules.
README.md Removes top-level README content (repo docs likely moved elsewhere).
Dockerfile Replaces single-image Dockerfile with multi-target engine/service images.
AGENTS.md Replaces agent notes with expanded monorepo architecture + conventions.
.vscode/settings.json Updates workspace settings (Deno paths + Ruby LSP config).
.vscode/extensions.json Adds VS Code extension recommendations.
.verdaccio/config.yaml Adds Verdaccio configuration for local npm registry.
.ruby-version Pins Ruby version for infra tooling.
.prettierignore Expands ignore list (docs/out, terraform, generated files).
.npmrc Adds registry indirection for @stripe/* via environment config.
.github/workflows/release.yml Reworks release workflow to “promote” packages by SHA.
.github/workflows/docs.yml Removes docs publish workflow.
.github/workflows/audit.yml Adds scheduled repo audit workflow gated by secret availability.
.githooks/pre-push Adds pre-push hook to ensure OpenAPI specs are generated/checked.
.eslintrc.js Removes root ESLint config (likely migrated elsewhere).
.dockerignore Adjusts dockerignore entries (no longer ignores dist/).
Files not reviewed (2)
  • apps/engine/src/generated/openapi.json: Language not supported
  • apps/service/src/generated/openapi.json: Language not supported


rowIndexMap,
sourceState: syncState,
})


Copilot AI Apr 6, 2026

The workflow ignores result.errors coming back from writeGoogleSheetsFromQueue. Since the activity currently returns errors instead of throwing, write failures can be silently dropped and the workflow will continue as if the queue was handled successfully. Recommended fix: either (mandatory) make the activity throw when errors.length > 0, or (also acceptable) handle result.errors here by failing the workflow / updating pipeline status to error and stopping the loops—otherwise you risk silent data loss.

Suggested change
if (result.errors.length > 0) {
  throw new Error(
    `writeGoogleSheetsFromQueue failed with ${result.errors.length} error(s): ${result.errors
      .map((error) => String(error))
      .join('; ')}`,
  )
}

Comment on lines +162 to +166
if (result.written > 0) {
  syncState = result.state
} else {
  await setState({ pendingWrites: false })
}

Copilot AI Apr 6, 2026

The workflow ignores result.errors coming back from writeGoogleSheetsFromQueue. Since the activity currently returns errors instead of throwing, write failures can be silently dropped and the workflow will continue as if the queue was handled successfully. Recommended fix: either (mandatory) make the activity throw when errors.length > 0, or (also acceptable) handle result.errors here by failing the workflow / updating pipeline status to error and stopping the loops—otherwise you risk silent data loss.

const destinationConfig = googleSheetsConfigSchema.parse(config.destination)
const filteredCatalog = augmentGoogleSheetsCatalog(opts.catalog)
const destination = createGoogleSheetsDestination()
const errors: RunResult['errors'] = []

Copilot AI Apr 6, 2026

This activity collects connector error traces into errors but never throws. In Temporal, returning an errors array is easy to ignore (and currently is ignored by the workflow), so failures may not trigger retries and can lead to silent partial sync. Mandatory fix: if errors.length > 0, throw an Error (ideally preserving failure_type / stream in the message) so the activity fails and Temporal retry policy applies.

}
}
}


Copilot AI Apr 6, 2026

This activity collects connector error traces into errors but never throws. In Temporal, returning an errors array is easy to ignore (and currently is ignored by the workflow), so failures may not trigger retries and can lead to silent partial sync. Mandatory fix: if errors.length > 0, throw an Error (ideally preserving failure_type / stream in the message) so the activity fails and Temporal retry policy applies.

Suggested change
if (errors.length > 0) {
  const details = errors
    .map((error) => {
      const errorRecord = error as Record<string, unknown>
      const failureType =
        typeof errorRecord.failure_type === 'string' ? errorRecord.failure_type : 'unknown_failure'
      const stream = typeof errorRecord.stream === 'string' ? errorRecord.stream : 'unknown_stream'
      return `${failureType}:${stream}`
    })
    .join(', ')
  throw new Error(`Google Sheets write failed with connector errors: ${details}`)
}

await consumer.subscribe({ topic, fromBeginning: true })

try {
  await new Promise<void>((resolve) => {

Copilot AI Apr 6, 2026

KafkaJS consumer.run() returns a Promise that can reject (e.g., broker errors). Here it’s not awaited or captured, so failures can surface as unhandled promise rejections and the activity may continue/commit offsets in an undefined state. Recommended fix (mandatory for robustness): capture the Promise from consumer.run(), ensure it’s awaited (or stopped/cancelled deterministically on finish()), and propagate errors so the activity fails and retries cleanly.
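
The capture-and-propagate fix the comment asks for can be sketched with a stand-in consumer; `FakeConsumer` and `drainTopic` are invented names for illustration, not the real activity code.

```typescript
// Hedged sketch: capture the promise from consumer.run() and race it against
// the drain-complete promise, so a broker error rejects the activity instead
// of surfacing as an unhandled promise rejection.
interface FakeConsumer {
  run(): Promise<void>
  disconnect(): Promise<void>
}

async function drainTopic(consumer: FakeConsumer, done: Promise<void>): Promise<void> {
  const runPromise = consumer.run() // do NOT fire-and-forget this
  try {
    // Whichever settles first wins; a rejected runPromise fails the drain.
    await Promise.race([runPromise, done])
  } finally {
    await consumer.disconnect() // deterministic teardown on either path
  }
}

// Demo: a consumer whose run() rejects, like a broker failure.
const failing: FakeConsumer = {
  run: () => Promise.reject(new Error('broker unavailable')),
  disconnect: async () => {},
}

let caught = ''
try {
  await drainTopic(failing, new Promise(() => {})) // done never resolves
} catch (err) {
  caught = (err as Error).message
}
```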

Comment on lines +1 to +10
// @ts-ignore
import setupCodeRaw from './edge-functions/stripe-setup.ts?raw'
// @ts-ignore
import webhookCodeRaw from './edge-functions/stripe-webhook.ts?raw'
// @ts-ignore
import syncCodeRaw from './edge-functions/stripe-sync.ts?raw'

export const setupFunctionCode = setupCodeRaw as string
export const webhookFunctionCode = webhookCodeRaw as string
export const syncFunctionCode = syncCodeRaw as string

Copilot AI Apr 6, 2026

Relying on @ts-ignore for ?raw imports suppresses type-checking broadly and can mask real issues. Recommended fix: add a narrow ambient module declaration for *.ts?raw (or *?raw) so these imports are typed as string without suppressing TypeScript errors.

Suggested change
// @ts-ignore
import setupCodeRaw from './edge-functions/stripe-setup.ts?raw'
// @ts-ignore
import webhookCodeRaw from './edge-functions/stripe-webhook.ts?raw'
// @ts-ignore
import syncCodeRaw from './edge-functions/stripe-sync.ts?raw'
export const setupFunctionCode = setupCodeRaw as string
export const webhookFunctionCode = webhookCodeRaw as string
export const syncFunctionCode = syncCodeRaw as string

becomes:

declare module './edge-functions/stripe-setup.ts?raw' {
  const content: string
  export default content
}
declare module './edge-functions/stripe-webhook.ts?raw' {
  const content: string
  export default content
}
declare module './edge-functions/stripe-sync.ts?raw' {
  const content: string
  export default content
}
import setupCodeRaw from './edge-functions/stripe-setup.ts?raw'
import webhookCodeRaw from './edge-functions/stripe-webhook.ts?raw'
import syncCodeRaw from './edge-functions/stripe-sync.ts?raw'
export const setupFunctionCode = setupCodeRaw
export const webhookFunctionCode = webhookCodeRaw
export const syncFunctionCode = syncCodeRaw

const destType = String(pipeline.destination?.type ?? 'unknown')
const phase = pipeline.status ?? 'unknown'
const paused = pipeline.desired_status === 'paused'
const iteration = 0

Copilot AI Apr 6, 2026

iteration is hard-coded to 0, so the iteration UI can never render and the variable is effectively dead code. Recommended fix: either remove this block entirely or wire it to a real value from the service response (if available) so it reflects actual workflow iteration/continue-as-new behavior.

Suggested change
const iteration = 0

becomes:

const iteration =
  typeof (pipeline as Pipeline & { iteration?: number }).iteration === 'number'
    ? (pipeline as Pipeline & { iteration?: number }).iteration
    : undefined

<StatusBadge phase={phase} paused={paused} />
</div>
<p className="text-sm text-gray-500">{pipeline.id}</p>
{iteration > 0 && <p className="text-xs text-gray-400">Iteration {iteration}</p>}

Copilot AI Apr 6, 2026

iteration is hard-coded to 0, so the iteration UI can never render and the variable is effectively dead code. Recommended fix: either remove this block entirely or wire it to a real value from the service response (if available) so it reflects actual workflow iteration/continue-as-new behavior.

Suggested change (remove this line):

{iteration > 0 && <p className="text-xs text-gray-400">Iteration {iteration}</p>}

Comment on lines +9 to +13
try {
  await context.pipelineStore.update(pipelineId, { status })
} catch {
  // Pipeline may have been removed — no-op
}

Copilot AI Apr 6, 2026

Catching and swallowing all errors will also hide operational failures (e.g., filesystem permission issues, transient I/O errors), leaving workflow status stale and making incidents harder to detect. Recommended fix: only suppress the specific 'not found' case (if your PipelineStore distinguishes it), and otherwise log or rethrow so failures are observable.
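
The narrower handling the comment suggests can be sketched as follows. `NotFoundError` and the store shape are assumptions for illustration; the real PipelineStore may signal "not found" differently.

```typescript
// Hedged sketch: suppress only the "not found" case and rethrow everything
// else, so I/O and permission failures stay observable.
class NotFoundError extends Error {}

interface PipelineStore {
  update(id: string, patch: { status: string }): Promise<void>
}

async function updatePipelineStatus(
  store: PipelineStore, pipelineId: string, status: string,
): Promise<void> {
  try {
    await store.update(pipelineId, { status })
  } catch (err) {
    if (err instanceof NotFoundError) return // pipeline removed: benign no-op
    throw err // operational failures propagate to Temporal retry
  }
}

// Demo stores: one missing pipeline, one with a real I/O failure.
const missing: PipelineStore = {
  update: async () => { throw new NotFoundError('no such pipeline') },
}
const broken: PipelineStore = {
  update: async () => { throw new Error('EACCES: permission denied') },
}

await updatePipelineStatus(missing, 'pl_1', 'ready') // swallowed
let rethrown = ''
await updatePipelineStatus(broken, 'pl_1', 'ready').catch((e) => { rethrown = (e as Error).message })
```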

Comment on lines 1 to 5
db
test
dist
node_modules
.tours
.vscode

Copilot AI Apr 6, 2026

With dist/ no longer excluded, Docker build context can grow significantly (especially in a monorepo), slowing local and CI builds and reducing layer cache efficiency. Recommended fix: re-add dist/ (and any other build outputs) to .dockerignore, since the Dockerfile already builds and copies artifacts from build stages.
