Skip to content

feat(service): pipeline lifecycle state machine and workflow cleanup#251

Merged
tonyxiao merged 16 commits intov2from
pipeline-state-machine-for-real
Apr 6, 2026
Merged

feat(service): pipeline lifecycle state machine and workflow cleanup#251
tonyxiao merged 16 commits intov2from
pipeline-state-machine-for-real

Conversation

@tonyxiao
Copy link
Copy Markdown
Collaborator

@tonyxiao tonyxiao commented Apr 5, 2026

Summary

This branch turns pipeline lifecycle handling into an explicit state machine across the service API, Temporal workflows, protocol surface, and dashboard usage.

API and status model

  • add desired_status (active | paused | deleted) as the user-controlled lifecycle field
  • keep workflow-controlled execution status in the stored pipeline record and derive the user-facing status from desired_status plus workflow state
  • remove dedicated pause/resume style lifecycle endpoints in favor of lifecycle updates via PATCH /pipelines/{id}
  • add DELETE /pipelines/{id} support for pipeline teardown
  • simplify API responses so status comes from the store rather than Temporal queries / ad hoc waiting
  • regenerate OpenAPI outputs for the updated service and engine schemas

Temporal workflows and activities

  • rename workflow activities for clarity: pipelineSetup, pipelineSync, pipelineTeardown
  • refactor pipelineWorkflow around explicit persisted workflow state, explicit lifecycle transitions, and co-located live/reconcile wait helpers
  • clean up the dual-lane workflow control flow for setup, pause, reconcile, continue-as-new, and teardown
  • update the Google Sheets workflow and shared workflow helpers to match the renamed activity surface
  • add update-pipeline-status activity wiring for workflow-driven status persistence

Protocol, engine, dashboard, and connector updates

  • update protocol/service handling around the SourceInputMessage envelope and pipeline status naming
  • adjust engine API tests and schemas to match the new lifecycle/status model
  • fix the dashboard source_discover call to use x-source rather than x-pipeline
  • update Google Sheets destination code/tests and related service Docker / E2E paths affected by the new lifecycle flow

Tests and docs

  • expand mocked Temporal workflow coverage for live input delivery, concurrent live/reconcile behavior, pause/resume queueing, teardown transitions, and status progression
  • make the live-drain assertions contract-based instead of timing-fragile under CI scheduling
  • document the workflow design principles and lessons learned in docs/service/pipeline-workflow-dual-lane.md

Test plan

  • pnpm --filter @stripe/sync-service build
  • pnpm --filter @stripe/sync-service test -- src/__tests__/workflow.test.ts
  • pnpm --filter @stripe/sync-service test
  • pnpm lint
  • PR CI green

Copilot AI review requested due to automatic review settings April 5, 2026 18:04
Comment on lines +98 to +99
const cleaned = path
.replace(/\{[^}]+\}/g, '') // remove path params
const rowKey = appends[index]?.rowKey
if (!rowKey) continue
rowAssignments[streamName] ??= {}
rowAssignments[streamName][rowKey] = range.startRow + index
if (!setupDone) {
await setup(pipelineId)
catalog = await discoverCatalog(pipelineId)
setupDone = true
}

let resolver: ConnectorResolver
beforeAll(async () => {
Comment on lines +8 to +15
import {
configSchema as googleSheetsConfigSchema,
createDestination as createGoogleSheetsDestination,
parseGoogleSheetsMetaLog,
ROW_KEY_FIELD,
ROW_NUMBER_FIELD,
serializeRowKey,
} from '@stripe/sync-destination-google-sheets'
import { destinationControlMsg } from '@stripe/sync-protocol'
import type { sheets_v4 } from 'googleapis'
import { google } from 'googleapis'
import { z } from 'zod'
Comment on lines +6 to +14
import {
GOOGLE_SHEETS_META_LOG_PREFIX,
formatGoogleSheetsMetaLog,
parseGoogleSheetsMetaLog,
ROW_KEY_FIELD,
ROW_NUMBER_FIELD,
serializeRowKey,
stripSystemFields,
} from './metadata.js'
serializeRowKey,
stripSystemFields,
} from './metadata.js'
import defaultSpec, { configSchema } from './spec.js'
upsert,
withPgConnectProxy,
} from '@stripe/sync-util-postgres'
import { z } from 'zod'
} from '@stripe/sync-util-postgres'
import { z } from 'zod'
import { buildCreateTableWithSchema, runSqlAdditive } from './schemaProjection.js'
import defaultSpec, { configSchema } from './spec.js'
TeardownOutput,
} from '@stripe/sync-protocol'
import { sourceControlMsg } from '@stripe/sync-protocol'
import { z } from 'zod'
} from '@stripe/sync-protocol'
import { sourceControlMsg } from '@stripe/sync-protocol'
import { z } from 'zod'
import defaultSpec, { configSchema } from './spec.js'
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a pipeline lifecycle state machine built around persisted desired_status (user intent) and workflow_status (workflow execution), removing prior reliance on Temporal query endpoints and enabling immediate, synchronous API responses.

Changes:

  • Add desired_status / workflow_status fields and a deriveStatus() helper to compute user-facing status on reads.
  • Rework Temporal workflows/activities to persist workflow transitions via updateWorkflowStatus and react to updates via signals.
  • Expand the monorepo tooling/apps: new Supabase integration edge functions, dashboard app, visualizer app, and updated Docker/GitHub workflows.

Reviewed changes

Copilot reviewed 140 out of 641 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
apps/visualizer/postcss.config.mjs Tailwind PostCSS config for the visualizer app
apps/visualizer/package.json Adds a standalone Next.js visualizer package
apps/visualizer/next.config.ts Configures static export + wasm handling for visualizer
apps/visualizer/README.md Documents visualizer workflow and generated artifacts
apps/visualizer/.gitignore Ignores Next/Vercel outputs for visualizer
apps/supabase/vitest.integration.config.ts Adds Vitest integration test config segregation
apps/supabase/vitest.e2e.config.ts Adds Vitest E2E config with longer timeout
apps/supabase/vitest.config.ts Excludes integration/E2E patterns from unit tests
apps/supabase/tsconfig.json Adds TS build config for supabase integration app
apps/supabase/src/supabase.ts Updates installer: deploys consolidated edge functions; adds skipInitialSync
apps/supabase/src/lib.ts Switches exports to ESM .js extension
apps/supabase/src/index.ts Adds barrel exports for supabase package
apps/supabase/src/edge-functions/stripe-webhook.ts Adds Stripe webhook edge function implementation
apps/supabase/src/edge-functions/stripe-sync.ts Adds sync worker edge function with pg state + debounce
apps/supabase/src/edge-functions/deno.json Updates Deno import map for pg
apps/supabase/src/edge-function-code.ts Adds raw bundling exports for edge functions
apps/supabase/src/tests/supabase.e2e.test.ts Adds end-to-end validation against real Supabase project
apps/supabase/src/tests/install.e2e.test.ts Adds install/uninstall E2E test placeholder for CI
apps/supabase/src/tests/edge-runtime.smoke.test.ts Adds smoke test for Supabase edge-runtime container
apps/supabase/src/tests/deploy.e2e.test.ts Adds deploy/invoke/delete edge function E2E test
apps/supabase/src/tests/bundle.test.ts Validates bundled edge-function code presence/shape
apps/supabase/package.json Defines supabase integration package exports/scripts
apps/supabase/build.mjs Builds/bundles edge functions into dist via esbuild
apps/service/vitest.integration.config.ts Adds service integration test config
apps/service/vitest.config.ts Adds service unit test config
apps/service/tsconfig.json Adds TS build config for service app
apps/service/src/temporal/workflows/pipeline-workflow.ts New pipeline workflow implementing desired/workflow status loop
apps/service/src/temporal/workflows/index.ts Exports workflows entrypoint
apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts Adds Google Sheets pipeline workflow with pause/delete
apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts Adds backfill-only workflow baseline
apps/service/src/temporal/workflows/_shared.ts Shared signals + activity proxies for workflows
apps/service/src/temporal/worker.ts Adds Temporal worker factory wiring activities
apps/service/src/temporal/activities/write-google-sheets-from-queue.ts Adds Sheets destination queue write activity
apps/service/src/temporal/activities/write-from-queue.ts Adds generic write-from-queue activity
apps/service/src/temporal/activities/update-workflow-status.ts Adds activity to persist workflow_status transitions
apps/service/src/temporal/activities/teardown.ts Adds teardown activity passing through to engine
apps/service/src/temporal/activities/sync-immediate.ts Adds immediate sync activity and config persistence updates
apps/service/src/temporal/activities/setup.ts Adds setup activity that persists connector-emitted config
apps/service/src/temporal/activities/read-into-queue.ts Adds read-to-Kafka activity for read/write mode
apps/service/src/temporal/activities/read-google-sheets-into-queue.ts Adds Google Sheets read-to-queue with row key enrichment
apps/service/src/temporal/activities/index.ts Provides activity factory returning Temporal activities
apps/service/src/temporal/activities/get-desired-status.ts Adds activity to read desired_status from store
apps/service/src/temporal/activities/discover-catalog.ts Adds activity to discover and apply stream selection
apps/service/src/temporal/activities/_shared.ts Adds remote engine + Kafka queue helpers for activities
apps/service/src/logger.ts Adds pino logger with redaction defaults
apps/service/src/lib/utils.ts Adds retry policy constants + deepEqual helper
apps/service/src/lib/stores.ts Defines PipelineStore interface for service persistence
apps/service/src/lib/stores-memory.ts Adds in-memory PipelineStore implementation
apps/service/src/lib/stores-fs.ts Adds file-backed PipelineStore implementation
apps/service/src/lib/createSchemas.ts Adds desired/workflow enums + deriveStatus and pipeline schemas
apps/service/src/index.ts Adds public exports for service consumers
apps/service/src/cli.test.ts Adds test verifying CLI flags thread through to worker creation
apps/service/src/api/index.ts Adds API barrel export
apps/service/src/tests/openapi.test.ts Validates generated service OpenAPI is schema-valid
apps/service/package.json Defines sync-service package, exports, deps, and scripts
apps/engine/vitest.config.ts Adds engine Vitest config exclusions/timeouts
apps/engine/tsconfig.json Adds TS build config for engine app
apps/engine/src/serve-command.ts Adds serve action to start engine HTTP API
apps/engine/src/logger.ts Adds engine logger with pretty transport option
apps/engine/src/lib/state-store.ts Introduces StateStore interface + readonly store
apps/engine/src/lib/source-test.ts Adds test source connector for protocol tests
apps/engine/src/lib/source-exec.ts Wraps subprocess connectors as Source implementations
apps/engine/src/lib/select-state-store.ts Adds optional destination-colocated state store selection
apps/engine/src/lib/resolver.test.ts Adds tests for resolver behavior and caching
apps/engine/src/lib/remote-engine.ts Adds HTTP remote engine client with NDJSON streaming
apps/engine/src/lib/pipeline.ts Adds pipeline middleware helpers (catalog enforcement, limits, etc.)
apps/engine/src/lib/ndjson.ts Adds NDJSON parsing/streaming helpers
apps/engine/src/lib/ndjson.test.ts Adds tests for NDJSON parsing helpers
apps/engine/src/lib/index.ts Adds engine library barrel exports
apps/engine/src/lib/exec.test.ts Adds tests for exec-wrapped connectors
apps/engine/src/lib/exec-helpers.ts Adds subprocess spawn helpers for NDJSON streaming
apps/engine/src/lib/destination-test.ts Adds test destination connector
apps/engine/src/lib/destination-filter.ts Adds field selection pruning for destination catalog
apps/engine/src/lib/destination-filter.test.ts Adds tests for destination catalog field pruning
apps/engine/src/lib/destination-exec.ts Wraps subprocess connectors as Destination implementations
apps/engine/src/lib/default-connectors.ts Defines default in-process connectors for engine
apps/engine/src/lib/createSchemas.ts Builds typed connector schemas for OpenAPI generation
apps/engine/src/index.ts Re-exports engine library + API helpers
apps/engine/src/cli/supabase.ts Adds engine CLI subcommand for Supabase install/uninstall
apps/engine/src/cli/index.ts Adds CLI entrypoint runner
apps/engine/src/cli/command.ts Builds CLI from OpenAPI spec + adds serve/supabase subcommands
apps/engine/src/api/openapi-utils.ts Adds helpers to patch OpenAPI schemas (control message refs)
apps/engine/src/api/index.ts Adds standalone engine API server entrypoint
apps/engine/src/tests/sync.test.ts Adds sync lifecycle tests with dockerized Postgres
apps/engine/src/tests/stripe-to-postgres.test.ts Adds stripe-mock → Postgres integration tests
apps/engine/src/tests/openapi.test.ts Validates engine OpenAPI document + schema shape
apps/engine/src/tests/docker.test.ts Adds Docker image build/run tests
apps/engine/package.json Defines engine package exports, CLI bin, deps, scripts
apps/dashboard/vitest.config.ts Adds dashboard unit test config
apps/dashboard/vite.config.ts Adds Vite config + dev proxy to engine/service APIs
apps/dashboard/tsconfig.json Adds dashboard TS config + OpenAPI type paths
apps/dashboard/src/pages/PipelineList.tsx Adds pipeline list UI
apps/dashboard/src/pages/PipelineDetail.tsx Adds pipeline detail UI with pause/resume/delete actions
apps/dashboard/src/main.tsx Adds dashboard React entrypoint
apps/dashboard/src/lib/utils.ts Adds className utility helper
apps/dashboard/src/lib/stream-groups.ts Adds stream grouping/filtering heuristics for UI
apps/dashboard/src/lib/stream-groups.test.ts Adds tests for stream grouping/filtering
apps/dashboard/src/lib/api.ts Adds engine/service API client helpers
apps/dashboard/src/index.css Adds tailwind import
apps/dashboard/src/components/StreamSelector.tsx Adds grouped stream selector component
apps/dashboard/src/components/JsonSchemaForm.tsx Adds JSON-schema-driven connector config form
apps/dashboard/src/App.tsx Adds minimal client-side router for dashboard
apps/dashboard/playwright.config.ts Adds Playwright config for dashboard E2E
apps/dashboard/package.json Defines dashboard app package + deps/tools
apps/dashboard/index.html Adds Vite HTML entry
apps/dashboard/e2e/pipeline-create.test.ts Adds UI E2E test for pipeline creation flow
apps/dashboard/e2e/global-teardown.ts Adds E2E teardown to stop local engine server
apps/dashboard/e2e/global-setup.ts Adds E2E setup that boots engine server
apps/dashboard/Dockerfile Adds docker dev container for dashboard
apps/dashboard/.gitignore Ignores Vite cache
README.md Removes legacy monorepo README content
Dockerfile Reworks Docker build to multi-target monorepo images
AGENTS.md Updates contributor/agent docs and repo map
.vscode/settings.json Updates Deno enable paths and Ruby LSP config
.vscode/extensions.json Adds VS Code extension recommendations
.verdaccio/config.yaml Adds Verdaccio config for local/private registry
.ruby-version Pins Ruby version
.prettierignore Adds ignores for docs/out, visualizer/out, terraform, generated
.npmrc Configures @stripe registry via STRIPE_NPM_REGISTRY env
.github/workflows/release.yml Changes release workflow to promote a specific SHA to npmjs
.github/workflows/docs.yml Removes docs publish workflow
.github/workflows/audit.yml Adds scheduled/manual repo audit workflow
.githooks/pre-push Adds pre-push check for generated OpenAPI specs
.eslintrc.js Removes legacy ESLint config file
.dockerignore Stops ignoring dist/ to support new Docker build strategy
Files not reviewed (2)
  • apps/engine/src/generated/openapi.json: Language not supported
  • apps/service/src/generated/openapi.json: Language not supported

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

export * from '@stripe/sync-protocol'
export { enforceCatalog, log, filterType, persistState, collect, pipe } from './pipeline.js'
export { createEngine, buildCatalog } from './engine.js'
export { SourceReadOptions, ConnectorInfo, ConnectorListItem } from './engine.js'
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SourceReadOptions, ConnectorInfo, and ConnectorListItem are imported as types elsewhere (e.g. remote-engine.ts uses import type { ... } from './engine.js'). Re-exporting them as values (export { ... }) will fail type-checking/build when they are type-only exports. Change this line to a type-only re-export (export type { ... }) to avoid emitting invalid runtime exports.

Suggested change
export { SourceReadOptions, ConnectorInfo, ConnectorListItem } from './engine.js'
export type { SourceReadOptions, ConnectorInfo, ConnectorListItem } from './engine.js'

Copilot uses AI. Check for mistakes.
): Promise<void> {
let desiredStatus = 'active'
let deleted = false
let updated = false
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workflow never reads desired_status from the store unless it first receives an updateSignal (sets updated = true). This means pipelines created with desired_status = 'paused' or 'deleted' will still start as 'active' until an update signal arrives, violating the intended state machine semantics. Make the initial desired status read unconditional (e.g., call getDesiredStatus once during startup, or initialize updated = true / remove the if (!updated) return fast-path for the first read).

Suggested change
let updated = false
let updated = true

Copilot uses AI. Check for mistakes.
Comment on lines +45 to +49
async function refreshDesiredStatus() {
if (!updated) return
updated = false
desiredStatus = await getDesiredStatus(pipelineId)
}
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workflow never reads desired_status from the store unless it first receives an updateSignal (sets updated = true). This means pipelines created with desired_status = 'paused' or 'deleted' will still start as 'active' until an update signal arrives, violating the intended state machine semantics. Make the initial desired status read unconditional (e.g., call getDesiredStatus once during startup, or initialize updated = true / remove the if (!updated) return fast-path for the first read).

Copilot uses AI. Check for mistakes.
pipelineId: string,
opts?: GoogleSheetPipelineWorkflowOpts
): Promise<void> {
let desiredStatus = 'active'
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as pipeline-workflow.ts: desired_status is only fetched after an updateSignal, so the initial store value is ignored. If a pipeline starts paused/deleted, this workflow will still proceed as active until an update signal occurs. Read desired_status unconditionally at least once before entering the main loops.

Copilot uses AI. Check for mistakes.
Comment on lines +63 to +67
async function refreshDesiredStatus() {
if (!updated) return
updated = false
desiredStatus = await getDesiredStatus(pipelineId)
}
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as pipeline-workflow.ts: desired_status is only fetched after an updateSignal, so the initial store value is ignored. If a pipeline starts paused/deleted, this workflow will still proceed as active until an update signal occurs. Read desired_status unconditionally at least once before entering the main loops.

Copilot uses AI. Check for mistakes.
// /source_discover streams NDJSON — read line-by-line and find the catalog message
const response = await fetch('/api/engine/source_discover', {
method: 'POST',
headers: { 'x-pipeline': JSON.stringify({ source, destination: { type: '_' } }) },
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/source_discover is invoked here with an x-pipeline header, but the engine client (createRemoteEngine) and typical OpenAPI shape indicate /source_discover expects an x-source header (or at least not a pipeline header). As written, discovery will likely fail or return a 4xx/5xx once the engine enforces headers. Align this request with the engine endpoint contract (send the expected header and shape).

Suggested change
headers: { 'x-pipeline': JSON.stringify({ source, destination: { type: '_' } }) },
headers: { 'x-source': JSON.stringify(source) },

Copilot uses AI. Check for mistakes.
Comment on lines +87 to +101
export async function pausePipeline(id: string): Promise<Pipeline> {
const { data, error, response } = await service.POST('/pipelines/{id}/pause', {
params: { path: { id } },
})
if (error) throw new Error(`POST /pipelines/${id}/pause: ${response.status}`)
return data as Pipeline
}

export async function resumePipeline(id: string): Promise<Pipeline> {
const { data, error, response } = await service.POST('/pipelines/{id}/resume', {
params: { path: { id } },
})
if (error) throw new Error(`POST /pipelines/${id}/resume: ${response.status}`)
return data as Pipeline
}
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description states dedicated pause/resume/delete endpoints are removed in favor of PATCH { desired_status }, but the dashboard client still calls POST /pipelines/{id}/pause and /resume. This will break the UI once the service endpoints are removed/renamed. Update the dashboard to use the new PATCH-based lifecycle API (and update the OpenAPI types accordingly).

Copilot uses AI. Check for mistakes.
Comment on lines +43 to +45
const stripeKey = Deno.env.get('STRIPE_SECRET_KEY')!
const webhookSecret = Deno.env.get('STRIPE_WEBHOOK_SECRET')!

Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These non-null assertions will cause the edge function to throw (500 with an opaque runtime error) when the env vars are not set/misconfigured. Prefer explicit validation and return a structured 500 JSON error (similar to the SUPABASE_DB_URL not set branch) so misconfiguration is diagnosable and doesn't surface as an unhandled exception.

Suggested change
const stripeKey = Deno.env.get('STRIPE_SECRET_KEY')!
const webhookSecret = Deno.env.get('STRIPE_WEBHOOK_SECRET')!
const stripeKey = Deno.env.get('STRIPE_SECRET_KEY')
if (!stripeKey) {
return jsonResponse({ error: 'STRIPE_SECRET_KEY not set' }, 500)
}
const webhookSecret = Deno.env.get('STRIPE_WEBHOOK_SECRET')
if (!webhookSecret) {
return jsonResponse({ error: 'STRIPE_WEBHOOK_SECRET not set' }, 500)
}

Copilot uses AI. Check for mistakes.
Comment on lines +58 to +79
await new Promise<void>((resolve) => {
let resolved = false
const finish = () => {
if (resolved) return
resolved = true
resolve()
}

consumer.run({
eachMessage: async ({ partition, message }) => {
if (message.value) {
messages.push(JSON.parse(message.value.toString()) as Message)
offsets.set(partition, (BigInt(message.offset) + 1n).toString())
}
if (messages.length >= maxBatch) finish()
},
})

setTimeout(finish, 2000)
})

await consumer.stop()
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The promise returned by consumer.run() is not awaited/handled. If it rejects (e.g., broker disconnect), this can become an unhandled rejection and/or leave the consumer in a bad state. Capture and await the run promise (or attach a .catch(...) that calls finish() and records/logs the error) before proceeding to consumer.stop()/commitOffsets().

Suggested change
await new Promise<void>((resolve) => {
let resolved = false
const finish = () => {
if (resolved) return
resolved = true
resolve()
}
consumer.run({
eachMessage: async ({ partition, message }) => {
if (message.value) {
messages.push(JSON.parse(message.value.toString()) as Message)
offsets.set(partition, (BigInt(message.offset) + 1n).toString())
}
if (messages.length >= maxBatch) finish()
},
})
setTimeout(finish, 2000)
})
await consumer.stop()
let runPromise: Promise<void>
await new Promise<void>((resolve, reject) => {
let resolved = false
let timer: ReturnType<typeof setTimeout>
const finish = () => {
if (resolved) return
resolved = true
clearTimeout(timer)
resolve()
}
runPromise = consumer
.run({
eachMessage: async ({ partition, message }) => {
if (message.value) {
messages.push(JSON.parse(message.value.toString()) as Message)
offsets.set(partition, (BigInt(message.offset) + 1n).toString())
}
if (messages.length >= maxBatch) finish()
},
})
.catch((error) => {
clearTimeout(timer)
if (resolved) throw error
resolved = true
reject(error)
throw error
})
timer = setTimeout(finish, 2000)
})
await consumer.stop()
await runPromise!

Copilot uses AI. Check for mistakes.
DEFAULT_SYNC_OBJECTS,
} from '@stripe/sync-source-stripe'
import destinationPostgres, { type Config as DestConfig } from '@stripe/sync-destination-postgres'
import pg from 'npm:pg@8'
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apps/supabase/src/edge-functions/deno.json defines an import map entry for pg, but this function imports npm:pg@8 directly. To keep dependency resolution consistent (and make future upgrades centralized), prefer importing via the import map (e.g., import pg from 'pg') or remove the import map entry if it's no longer intended to be used.

Suggested change
import pg from 'npm:pg@8'
import pg from 'pg'

Copilot uses AI. Check for mistakes.
@tonyxiao tonyxiao changed the base branch from main to v2 April 5, 2026 18:07
@tonyxiao tonyxiao force-pushed the pipeline-state-machine-for-real branch from 240641e to 7e5721d Compare April 5, 2026 19:22
Two orthogonal stored fields replace the live Temporal query for status:

- `desired_status` (active/paused/deleted) — written by API via PATCH
- `workflow_status` (setup/backfill/ready/paused/teardown/error) — written by Temporal via activity
- `status` — derived on read from the two fields (e.g. pausing, resuming, tearing_down)

API changes:
- All lifecycle changes via PATCH { desired_status: "..." }
- Transition validation: deleted is terminal (409)
- Immediate synchronous response (no more live Temporal queries)

Workflow changes:
- Generic update signal (no payload) — workflow reads desired_status from store
- New activities: getDesiredStatus, updateWorkflowStatus
- Workflows write workflow_status at every transition point
- Remove deleteSignal — unify to single update signal
- Remove statusQuery/stateQuery (no longer needed for user-facing status)

Dashboard: derive Pipeline type from generated OpenAPI spec, update API client

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
@tonyxiao tonyxiao force-pushed the pipeline-state-machine-for-real branch from 16959f8 to 04b5cf4 Compare April 5, 2026 19:29
tonyxiao and others added 15 commits April 5, 2026 12:39
…CH lifecycle

- Remove checks for /pipelines/{id}/pause and /pipelines/{id}/resume paths
- Check status is a string instead of status.paused boolean
- Use PATCH with desired_status instead of POST /pause and /resume

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…StatusSignal payload

Signal now carries the desired_status value directly, eliminating the
`updated` flag, `refreshDesiredStatus()` helper, and `getDesiredStatus`
activity from all three pipeline workflows. Callers pass desired_status
explicitly on workflow start and signal only when desired_status changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
status is now a flat string (e.g. 'backfilling'), not an object with .phase

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…ne status rename

- protocol: `SourceInput = unknown` → typed `SourceInputMessage` Zod schema
  with wire envelope `{ type: 'source_input', source_input: unknown }`
- service: rename `workflow_status` field on Pipeline → `status`
- service: rename `updateWorkflowStatus` activity → `updatePipelineStatus`
- service: remove derived `status` field and `deriveStatus` helper; the
  stored `status` (WorkflowStatus) is now returned directly from the API
- workflows: internal input queues typed as `SourceInputMessage[]`;
  signal handlers typed accordingly
- `update-workflow-status.ts` → `update-pipeline-status.ts`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
- workflow.test.ts: remove orphaned `_desiredStatus = 'active'` in beforeEach
  (variable was never declared — leftover from a previous refactor)
- app.test.ts: update workflow start args assertion to match current two-arg
  form `[pipelineId, { desiredStatus }]`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…ivity

- Add setStatus() helper that skips updatePipelineStatus when already at target status
- Rename syncState→sourceState, readComplete→eofCompleted for clarity
- Drop manual spread-merge; drainMessages already accumulates state from initialState
- Simplify while loop: while(true) + break on deleted, remove redundant pre-loop check

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Restores the DELETE route removed during the state machine refactor.
Signals desired_status='deleted' to the running workflow, then removes
the pipeline from the store and returns {id, deleted: true}.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
source_discover accepts x-source (JSON-encoded source config), not x-pipeline.
The old x-pipeline with destination: { type: '_' } was both wrong and would
fail schema validation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
… → pipelineSync

- Flatten workflow to explicit status transitions (no refreshStatus)
- Single setup block, single teardown block
- liveLoop owns pause/resume status, backfillLoop owns backfill/ready transitions
- Remove CancellationScope, shouldExit, awaitActive, LANE_POLL_MS
- Rename syncImmediate activity to pipelineSync across all files
- Rename sync-immediate.ts → pipeline-sync.ts

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Refactor the Temporal pipeline workflow around explicit workflow state, root-owned lifecycle control, and co-located lane wait helpers. Rename pipeline setup/teardown activities for consistency, update mocked-activity workflow tests to match the current signal contract, and capture the resulting workflow-writing principles in service docs.

Constraint: Keep external PipelineStatus values unchanged while improving internal workflow state modeling
Rejected: Add deleted to PipelineStatus | broader API/schema change than this refactor
Confidence: medium
Scope-risk: moderate
Directive: Keep desiredStatus as requested intent and workflow state as actual execution state; do not collapse them back into one field without re-evaluating pause/delete semantics
Tested: pnpm --filter @stripe/sync-service build
Tested: pnpm --filter @stripe/sync-service test -- src/__tests__/workflow.test.ts
Not-tested: Full repository test suite
Not-tested: GitHub Actions CI
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
Expand the mocked-activity workflow coverage to match the current signal contract and actual lane behavior. The suite now verifies live source input delivery, concurrent live/reconcile progress, phase-driven status transitions, and queueing of live events across pause/resume.

Constraint: Keep the tests aligned with observable workflow behavior rather than brittle batch timing assumptions
Rejected: Exact live batch count assertions under concurrent signal delivery | timing-sensitive and not part of the contract
Confidence: high
Scope-risk: narrow
Directive: When the workflow changes, update these tests around behavior and status transitions first; avoid reintroducing stale signal names or activity names
Tested: pnpm --filter @stripe/sync-service test -- src/__tests__/workflow.test.ts
Tested: pnpm --filter @stripe/sync-service test
Tested: pnpm lint
Not-tested: GitHub Actions CI
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
Keep the mocked Temporal coverage focused on observable behavior instead of a specific concurrent batch count. The live-drain case still verifies multiple live batches, at least one batch overlapping an in-flight reconcile slice, and delivery of all queued events.

Constraint: CI timing differs from local Temporal test timing, so exact overlap counts are not stable
Rejected: Keep requiring two overlapping live batches | flaky under CI scheduler variability
Confidence: high
Scope-risk: narrow
Directive: Assert on delivered events and overlap existence, not exact batch segmentation, for concurrent workflow tests
Tested: pnpm --filter @stripe/sync-service test -- src/__tests__/workflow.test.ts
Tested: pnpm --filter @stripe/sync-service test
Tested: pnpm lint
Not-tested: GitHub Actions CI
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
@tonyxiao tonyxiao changed the title feat(service): pipeline state machine — desired_status + workflow_status feat(service): pipeline lifecycle state machine and workflow cleanup Apr 6, 2026
@tonyxiao tonyxiao merged commit 4761810 into v2 Apr 6, 2026
13 checks passed
@tonyxiao tonyxiao deleted the pipeline-state-machine-for-real branch April 6, 2026 14:45
tonyxiao added a commit that referenced this pull request Apr 6, 2026
The backfill logic was consolidated into pipelineWorkflow's
state machine in #251. The standalone backfill workflow is no
longer referenced or needed.

Committed-By-Agent: claude
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants