From 263845d3dbb3a93a3356c5c7fb206fe57115b805 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Thu, 2 Apr 2026 10:36:21 -0700 Subject: [PATCH 1/2] feat(destination): Google Sheets destination with setup/teardown - Full setup/teardown lifecycle for Google Sheets destination - Sheet writer with batched upserts and header management - Scripts for manual testing via engine HTTP server - Source-stripe: expose setup() config updates for destination wiring Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- packages/dashboard/.gitignore | 1 + packages/destination-google-sheets/.gitignore | 1 + .../__tests__/memory-sheets.ts | 15 +- .../destination-google-sheets/package.json | 1 + .../scripts/_state.ts | 95 +++++++++ .../scripts/check-via-server.ts | 29 +++ .../scripts/setup-via-server.ts | 31 +++ .../scripts/sheet-size.ts | 67 ++++++ .../scripts/stripe-to-google-sheets.ts | 89 ++++++++ .../scripts/sync-via-server.ts | 135 ++++++++++++ .../scripts/teardown-via-server.ts | 35 ++++ .../destination-google-sheets/src/index.ts | 98 +++++++-- .../destination-google-sheets/src/writer.ts | 196 ++++++++++++++++-- .../tsconfig.scripts.json | 8 + pnpm-lock.yaml | 7 +- 15 files changed, 758 insertions(+), 50 deletions(-) create mode 100644 packages/dashboard/.gitignore create mode 100644 packages/destination-google-sheets/.gitignore create mode 100644 packages/destination-google-sheets/scripts/_state.ts create mode 100644 packages/destination-google-sheets/scripts/check-via-server.ts create mode 100644 packages/destination-google-sheets/scripts/setup-via-server.ts create mode 100644 packages/destination-google-sheets/scripts/sheet-size.ts create mode 100644 packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts create mode 100644 packages/destination-google-sheets/scripts/sync-via-server.ts create mode 100644 packages/destination-google-sheets/scripts/teardown-via-server.ts create mode 100644 packages/destination-google-sheets/tsconfig.scripts.json diff --git a/packages/dashboard/.gitignore b/packages/dashboard/.gitignore new file mode 100644 index 00000000..536d88c8 --- /dev/null +++ b/packages/dashboard/.gitignore @@ -0,0 +1 @@ +.next/ diff --git a/packages/destination-google-sheets/.gitignore b/packages/destination-google-sheets/.gitignore new file mode 100644 index 00000000..d1b87e4a --- /dev/null +++ b/packages/destination-google-sheets/.gitignore @@ -0,0 +1 @@ +scripts/.state.json diff --git a/packages/destination-google-sheets/__tests__/memory-sheets.ts b/packages/destination-google-sheets/__tests__/memory-sheets.ts index 799c103b..051ab270 100644 --- a/packages/destination-google-sheets/__tests__/memory-sheets.ts +++ b/packages/destination-google-sheets/__tests__/memory-sheets.ts @@ -76,6 +76,8 @@ export function createMemorySheets() { const ss = getSpreadsheet(params.spreadsheetId) const requests = (params.requestBody?.requests ?? []) as Record[] + const replies: unknown[] = [] + for (const req of requests) { if (req.addSheet) { const props = (req.addSheet as { properties?: { title?: string } }).properties @@ -83,10 +85,10 @@ export function createMemorySheets() { if (ss.sheets.has(name)) { throw Object.assign(new Error(`Sheet already exists: ${name}`), { code: 400 }) } - ss.sheets.set(name, { sheetId: nextSheetId++, values: [] }) - } - - if (req.updateSheetProperties) { + const sheetId = nextSheetId++ + ss.sheets.set(name, { sheetId, values: [] }) + replies.push({ addSheet: { properties: { sheetId, title: name } } }) + } else if (req.updateSheetProperties) { const update = req.updateSheetProperties as { properties: { sheetId: number; title: string } fields: string @@ -99,10 +101,13 @@ export function createMemorySheets() { break } } + replies.push({}) + } else { + replies.push({}) } } - return { data: {} } + return { data: { replies } } }, values: { diff --git a/packages/destination-google-sheets/package.json b/packages/destination-google-sheets/package.json index 69be2e6d..d81adbda 100644 --- a/packages/destination-google-sheets/package.json +++ b/packages/destination-google-sheets/package.json @@ -27,6 +27,7 @@ "zod": "^4.3.6" }, "devDependencies": { + "@types/node": "^25.5.0", "vitest": "^3.2.4" } } diff --git a/packages/destination-google-sheets/scripts/_state.ts b/packages/destination-google-sheets/scripts/_state.ts new file mode 100644 index 00000000..2fcd487f --- /dev/null +++ b/packages/destination-google-sheets/scripts/_state.ts @@ -0,0 +1,95 @@ +// Shared helpers for the destination-google-sheets scripts. +// Loads .env and manages a local .state.json that acts as a fake DB for the sheet ID. + +import { readFileSync, writeFileSync, unlinkSync } from 'node:fs' +import { resolve, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) +const STATE_FILE = resolve(__dirname, '.state.json') + +// ── Env loading ────────────────────────────────────────────────────────────── + +export function loadEnv(): void { + const envPath = resolve(__dirname, '../.env') + try { + const content = readFileSync(envPath, 'utf-8') + for (const line of content.split('\n')) { + const trimmed = line.trim() + if (!trimmed || trimmed.startsWith('#')) continue + const eqIdx = trimmed.indexOf('=') + if (eqIdx === -1) continue + const key = trimmed.slice(0, eqIdx).trim() + const value = trimmed.slice(eqIdx + 1).trim() + if (!(key in process.env)) process.env[key] = value + } + } catch { + // .env is optional + } +} + +// ── Sheet state ─────────────────────────────────────────────────────────────── + +export interface SheetState { + spreadsheet_id: string + /** Per-stream cursor state, persisted across sync calls for resumable pagination. */ + sync_state?: Record +} + +export function loadState(): SheetState | null { + try { + return JSON.parse(readFileSync(STATE_FILE, 'utf-8')) as SheetState + } catch { + return null + } +} + +export function saveState(state: SheetState): void { + writeFileSync(STATE_FILE, JSON.stringify(state, null, 2) + '\n') + console.error(`Saved state → ${STATE_FILE}`) +} + +export function clearState(): void { + try { + unlinkSync(STATE_FILE) + console.error(`Cleared state (${STATE_FILE})`) + } catch { + // already gone + } +} + +// ── Pipeline builder ────────────────────────────────────────────────────────── + +export function buildDestinationConfig(spreadsheetId?: string): Record { + return { + name: 'google-sheets', + client_id: process.env['GOOGLE_CLIENT_ID'], + client_secret: process.env['GOOGLE_CLIENT_SECRET'], + access_token: 'unused', + refresh_token: process.env['GOOGLE_REFRESH_TOKEN'], + ...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}), + } +} + +export const STREAMS = ['products', 'customers', 'prices', 'subscriptions'] as const + +export function buildPipeline(spreadsheetId?: string): Record { + return { + source: { name: 'stripe', api_key: process.env['STRIPE_API_KEY'], backfill_limit: 10 }, + destination: buildDestinationConfig(spreadsheetId), + streams: STREAMS.map((name) => ({ name })), + } +} + +export function requireEnv(...keys: string[]): void { + const missing = keys.filter((k) => !process.env[k]) + if (missing.length > 0) { + console.error(`Error: missing required env vars: ${missing.join(', ')}`) + process.exit(1) + } +} + +export function getPort(): string { + const idx = process.argv.indexOf('--port') + return idx !== -1 ? process.argv[idx + 1] : '3000' +} diff --git a/packages/destination-google-sheets/scripts/check-via-server.ts b/packages/destination-google-sheets/scripts/check-via-server.ts new file mode 100644 index 00000000..82de3458 --- /dev/null +++ b/packages/destination-google-sheets/scripts/check-via-server.ts @@ -0,0 +1,29 @@ +#!/usr/bin/env node +// GET /check — validates credentials and sheet accessibility +// Usage: npx tsx scripts/check-via-server.ts [--port 3000] + +import { loadEnv, buildPipeline, requireEnv, loadState, getPort } from './_state.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const state = loadState() +if (!state) { + console.error('No sheet state found — run setup-via-server.ts first') + process.exit(1) +} + +const serverUrl = `http://localhost:${getPort()}` +const pipeline = buildPipeline(state.spreadsheet_id) + +console.error(`Hitting ${serverUrl}/check ...`) +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) + +const res = await fetch(`${serverUrl}/check`, { + headers: { 'X-Pipeline': JSON.stringify(pipeline) }, +}) + +const result = await res.json() +console.log(JSON.stringify(result, null, 2)) + +if (res.status !== 200) process.exit(1) diff --git a/packages/destination-google-sheets/scripts/setup-via-server.ts b/packages/destination-google-sheets/scripts/setup-via-server.ts new file mode 100644 index 00000000..f1dd3bbf --- /dev/null +++ b/packages/destination-google-sheets/scripts/setup-via-server.ts @@ -0,0 +1,31 @@ +#!/usr/bin/env node +// POST /setup — creates a new Google Sheet, saves its ID to .state.json +// Usage: npx tsx scripts/setup-via-server.ts [--port 3000] + +import { loadEnv, buildPipeline, requireEnv, saveState, getPort } from './_state.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const serverUrl = `http://localhost:${getPort()}` + +// No spreadsheet_id — setup always creates a new sheet +const pipeline = buildPipeline() + +console.error(`Hitting ${serverUrl}/setup ...`) + +const res = await fetch(`${serverUrl}/setup`, { + method: 'POST', + headers: { 'X-Pipeline': JSON.stringify(pipeline) }, +}) + +if (res.status === 200) { + const result = (await res.json()) as { spreadsheet_id: string } + saveState({ spreadsheet_id: result.spreadsheet_id }) + console.log(JSON.stringify(result, null, 2)) +} else { + const body = await res.text() + console.error(`Error: ${res.status} ${res.statusText}`) + if (body) console.error(body) + process.exit(1) +} diff --git a/packages/destination-google-sheets/scripts/sheet-size.ts b/packages/destination-google-sheets/scripts/sheet-size.ts new file mode 100644 index 00000000..f147d3f3 --- /dev/null +++ b/packages/destination-google-sheets/scripts/sheet-size.ts @@ -0,0 +1,67 @@ +#!/usr/bin/env node +// Calculates total cell count across all sheets in the saved spreadsheet. +// +// Usage: npx tsx scripts/sheet-size.ts + +import { readFileSync } from 'node:fs' +import { resolve, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' +import { google } from 'googleapis' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +// Load .env +const envPath = resolve(__dirname, '../.env') +try { + for (const line of readFileSync(envPath, 'utf-8').split('\n')) { + const trimmed = line.trim() + if (!trimmed || trimmed.startsWith('#')) continue + const eqIdx = trimmed.indexOf('=') + if (eqIdx === -1) continue + const key = trimmed.slice(0, eqIdx).trim() + const value = trimmed.slice(eqIdx + 1).trim() + if (!(key in process.env)) process.env[key] = value + } +} catch { + /* .env is optional */ +} + +// Load spreadsheet ID from .state.json +const stateFile = resolve(__dirname, '.state.json') +let spreadsheetId: string +try { + const state = JSON.parse(readFileSync(stateFile, 'utf-8')) as { spreadsheet_id: string } + spreadsheetId = state.spreadsheet_id +} catch { + console.error('No .state.json found — run setup-via-server.ts first') + process.exit(1) +} + +const auth = new google.auth.OAuth2( + process.env['GOOGLE_CLIENT_ID'], + process.env['GOOGLE_CLIENT_SECRET'] +) +auth.setCredentials({ refresh_token: process.env['GOOGLE_REFRESH_TOKEN'] }) +const sheets = google.sheets({ version: 'v4', auth }) + +// Fetch spreadsheet metadata (includes all sheet grid properties) +const res = await sheets.spreadsheets.get({ + spreadsheetId, + fields: 'sheets(properties(title,gridProperties))', +}) + +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${spreadsheetId}\n`) + +let grandTotal = 0 +for (const sheet of res.data.sheets ?? []) { + const title = sheet.properties?.title ?? '(untitled)' + const rowCount = sheet.properties?.gridProperties?.rowCount ?? 0 + const columnCount = sheet.properties?.gridProperties?.columnCount ?? 0 + const cells = rowCount * columnCount + grandTotal += cells + console.error( + ` ${title}: ${rowCount} rows × ${columnCount} cols = ${cells.toLocaleString()} cells` + ) +} + +console.error(`\n Total: ${grandTotal.toLocaleString()} cells`) diff --git a/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts b/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts new file mode 100644 index 00000000..4144e9e9 --- /dev/null +++ b/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts @@ -0,0 +1,89 @@ +#!/usr/bin/env node +// Sync Stripe → Google Sheets via the sync-engine CLI. +// Reads credentials from packages/destination-google-sheets/.env +// +// Usage: npx tsx scripts/stripe-to-google-sheets.ts +// or: node --import tsx scripts/stripe-to-google-sheets.ts + +import { readFileSync } from 'node:fs' +import { resolve, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' +import { execFileSync, spawnSync } from 'node:child_process' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +// Load .env from the package root +const envPath = resolve(__dirname, '../.env') +try { + const envContent = readFileSync(envPath, 'utf-8') + for (const line of envContent.split('\n')) { + const trimmed = line.trim() + if (!trimmed || trimmed.startsWith('#')) continue + const eqIdx = trimmed.indexOf('=') + if (eqIdx === -1) continue + const key = trimmed.slice(0, eqIdx).trim() + const value = trimmed.slice(eqIdx + 1).trim() + if (!(key in process.env)) process.env[key] = value + } +} catch { + // .env is optional; env vars may already be set +} + +const { + STRIPE_API_KEY, + GOOGLE_CLIENT_ID, + GOOGLE_CLIENT_SECRET, + GOOGLE_REFRESH_TOKEN, + GOOGLE_SPREADSHEET_ID, +} = process.env + +if (!STRIPE_API_KEY) { + console.error('Error: STRIPE_API_KEY is required (set it in .env or the environment)') + process.exit(1) +} + +// Fetch Stripe account ID +const accountRes = await fetch('https://api.stripe.com/v1/account', { + headers: { + Authorization: `Basic ${Buffer.from(`${STRIPE_API_KEY}:`).toString('base64')}`, + }, +}) +const account = (await accountRes.json()) as { id: string } +console.error(`Stripe: ${account.id}`) +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${GOOGLE_SPREADSHEET_ID}`) + +const pipeline = JSON.stringify({ + source: { name: 'stripe', api_key: STRIPE_API_KEY, backfill_limit: 10 }, + destination: { + name: 'google-sheets', + client_id: GOOGLE_CLIENT_ID, + client_secret: GOOGLE_CLIENT_SECRET, + access_token: 'unused', + refresh_token: GOOGLE_REFRESH_TOKEN, + spreadsheet_id: GOOGLE_SPREADSHEET_ID, + }, + streams: [{ name: 'products' }, { name: 'customers' }], +}) + +const repoRoot = resolve(__dirname, '../../..') +const cliPath = resolve(repoRoot, 'apps/engine/src/cli/index.ts') + +// Use bun if available, else tsx +function hasBun(): boolean { + try { + execFileSync('bun', ['--version'], { stdio: 'ignore' }) + return true + } catch { + return false + } +} + +const tsxBin = resolve(repoRoot, 'node_modules/.bin/tsx') +const [cmd, ...cmdArgs] = hasBun() ? ['bun', cliPath] : [tsxBin, cliPath] + +const result = spawnSync(cmd, [...cmdArgs, 'sync', '--xPipeline', pipeline], { + stdio: 'inherit', + cwd: repoRoot, +}) + +process.exit(result.status ?? 1) diff --git a/packages/destination-google-sheets/scripts/sync-via-server.ts b/packages/destination-google-sheets/scripts/sync-via-server.ts new file mode 100644 index 00000000..b648c7b8 --- /dev/null +++ b/packages/destination-google-sheets/scripts/sync-via-server.ts @@ -0,0 +1,135 @@ +#!/usr/bin/env node +// POST /sync — reads from Stripe and writes to Google Sheets, looping until all +// streams are complete. Uses X-State-Checkpoint-Limit: 1 to process one page at +// a time, persisting the cursor to .state.json between pages. +// +// On completion, reads each sheet and prints the row count for each stream. +// +// Usage: npx tsx scripts/sync-via-server.ts [--port 3000] + +import { google } from 'googleapis' +import { + loadEnv, + buildPipeline, + requireEnv, + loadState, + saveState, + getPort, + STREAMS, +} from './_state.js' +import { readSheet } from '../src/writer.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const state = loadState() +if (!state) { + console.error('No sheet state found — run setup-via-server.ts first') + process.exit(1) +} + +const serverUrl = `http://localhost:${getPort()}` +console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) + +// Run one page of sync, returns updated syncState +async function runOnePage(syncState: Record): Promise> { + const pipeline = buildPipeline(state!.spreadsheet_id) + const headers: Record = { + 'X-Pipeline': JSON.stringify(pipeline), + 'X-State-Checkpoint-Limit': '1', + } + if (Object.keys(syncState).length > 0) { + headers['X-State'] = JSON.stringify(syncState) + } + + const res = await fetch(`${serverUrl}/sync`, { method: 'POST', headers }) + if (!res.ok && !res.body) { + console.error(`Error: ${res.status} ${res.statusText}`) + process.exit(1) + } + + const updated = { ...syncState } + const reader = res.body!.getReader() + const decoder = new TextDecoder() + let buf = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + buf += decoder.decode(value, { stream: true }) + const lines = buf.split('\n') + buf = lines.pop() ?? '' + for (const line of lines) { + if (!line.trim()) continue + console.log(line) + try { + const msg = JSON.parse(line) as { type: string; stream?: string; data?: unknown } + if (msg.type === 'state' && msg.stream) updated[msg.stream] = msg.data + } catch { + /* non-JSON line */ + } + } + } + if (buf.trim()) { + console.log(buf) + try { + const msg = JSON.parse(buf) as { type: string; stream?: string; data?: unknown } + if (msg.type === 'state' && msg.stream) updated[msg.stream] = msg.data + } catch {} + } + + return updated +} + +function isAllComplete(syncState: Record): boolean { + return STREAMS.every( + (s) => (syncState[s] as { status?: string } | undefined)?.status === 'complete' + ) +} + +// Loop until all streams are complete +let syncState: Record = { ...(state.sync_state ?? {}) } +let page = 0 + +if (isAllComplete(syncState)) { + console.error('All streams already complete. Reset sync_state to re-sync.') + process.exit(0) +} + +console.error('Starting sync loop...') + +while (!isAllComplete(syncState)) { + page++ + const pending = STREAMS.filter( + (s) => (syncState[s] as { status?: string } | undefined)?.status !== 'complete' + ) + console.error(`[page ${page}] Syncing: ${pending.join(', ')}`) + + syncState = await runOnePage(syncState) + saveState({ spreadsheet_id: state.spreadsheet_id, sync_state: syncState }) +} + +console.error(`\nAll streams complete after ${page} page(s) — clearing sync cursor`) +saveState({ spreadsheet_id: state.spreadsheet_id }) + +// Read each sheet and print row counts +console.error('\nReading sheet row counts...') +const auth = new google.auth.OAuth2( + process.env['GOOGLE_CLIENT_ID'], + process.env['GOOGLE_CLIENT_SECRET'] +) +auth.setCredentials({ refresh_token: process.env['GOOGLE_REFRESH_TOKEN'] }) +const sheets = google.sheets({ version: 'v4', auth }) + +for (const stream of STREAMS) { + try { + const rows = await readSheet(sheets, state.spreadsheet_id, stream) + // Subtract 1 for the header row + const dataRows = Math.max(0, rows.length - 1) + console.error(` ${stream}: ${dataRows} rows`) + } catch (err) { + console.error( + ` ${stream}: error reading sheet — ${err instanceof Error ? err.message : String(err)}` + ) + } +} diff --git a/packages/destination-google-sheets/scripts/teardown-via-server.ts b/packages/destination-google-sheets/scripts/teardown-via-server.ts new file mode 100644 index 00000000..e847399f --- /dev/null +++ b/packages/destination-google-sheets/scripts/teardown-via-server.ts @@ -0,0 +1,35 @@ +#!/usr/bin/env node +// POST /teardown — permanently deletes the Google Sheet and clears local state +// Usage: npx tsx scripts/teardown-via-server.ts [--port 3000] + +import { loadEnv, buildPipeline, requireEnv, loadState, clearState, getPort } from './_state.js' + +loadEnv() +requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') + +const state = loadState() +if (!state) { + console.error('No sheet state found — nothing to tear down') + process.exit(1) +} + +const serverUrl = `http://localhost:${getPort()}` +const pipeline = buildPipeline(state.spreadsheet_id) + +console.error(`Hitting ${serverUrl}/teardown ...`) +console.error(`Deleting sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) + +const res = await fetch(`${serverUrl}/teardown`, { + method: 'POST', + headers: { 'X-Pipeline': JSON.stringify(pipeline) }, +}) + +if (res.status === 204) { + clearState() + console.error('Teardown complete') +} else { + const body = await res.text() + console.error(`Error: ${res.status} ${res.statusText}`) + if (body) console.error(body) + process.exit(1) +} diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index 25e49db9..e5ec5e84 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -13,9 +13,26 @@ import { google } from 'googleapis' import { z } from 'zod' import { configSchema } from './spec.js' import type { Config } from './spec.js' -import { appendRows, ensureSheet, ensureSpreadsheet } from './writer.js' - -export { ensureSpreadsheet, ensureSheet, appendRows, readSheet } from './writer.js' +import { + appendRows, + createIntroSheet, + deleteSpreadsheet, + ensureSheet, + ensureSpreadsheet, + protectSheets, + updateRows, +} from './writer.js' + +export { + ensureSpreadsheet, + ensureSheet, + appendRows, + updateRows, + readSheet, + createIntroSheet, + protectSheets, + deleteSpreadsheet, +} from './writer.js' // MARK: - Spec @@ -23,7 +40,7 @@ export { configSchema, envVars, type Config } from './spec.js' // MARK: - Helpers -function makeSheetsClient(config: Config) { +function makeOAuth2Client(config: Config) { const clientId = config.client_id || process.env['GOOGLE_CLIENT_ID'] const clientSecret = config.client_secret || process.env['GOOGLE_CLIENT_SECRET'] if (!clientId) throw new Error('client_id required (provide in config or set GOOGLE_CLIENT_ID)') @@ -34,7 +51,15 @@ function makeSheetsClient(config: Config) { access_token: config.access_token, refresh_token: config.refresh_token, }) - return google.sheets({ version: 'v4', auth }) + return auth +} + +function makeSheetsClient(config: Config) { + return google.sheets({ version: 'v4', auth: makeOAuth2Client(config) }) +} + +function makeDriveClient(config: Config) { + return google.drive({ version: 'v3', auth: makeOAuth2Client(config) }) } /** Stringify a value for a Sheets cell. */ @@ -75,23 +100,52 @@ export function createDestination( return { config: z.toJSONSchema(configSchema) } }, - async check({ config }: { config: Config }): Promise { + async setup({ config, catalog }: { config: Config; catalog: ConfiguredCatalog }) { + if (config.spreadsheet_id) { + spreadsheetId = config.spreadsheet_id + return + } const sheets = sheetsClient ?? makeSheetsClient(config) - try { - await sheets.spreadsheets.get({ - spreadsheetId: config.spreadsheet_id ?? 'test', - }) - return { status: 'succeeded' } - } catch { - return { status: 'succeeded', message: 'Sheets client is configured' } + spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) + + // Create the Overview intro tab first (handles "Sheet1" rename if needed) + const streamNames = catalog.streams.map((s) => s.stream.name) + await createIntroSheet(sheets, spreadsheetId, streamNames) + + // Create a data tab for each stream with headers derived from its JSON schema + const sheetIds: number[] = [] + for (const { stream } of catalog.streams) { + const properties = stream.json_schema?.['properties'] as Record | undefined + const headers = properties ? Object.keys(properties) : [] + const sheetId = await ensureSheet(sheets, spreadsheetId, stream.name, headers) + sheetIds.push(sheetId) } + + // Protect all data tabs with a warning so users know edits may be overwritten + await protectSheets(sheets, spreadsheetId, sheetIds) + + return { spreadsheet_id: spreadsheetId } + }, + + async teardown({ config }: { config: Config }) { + const id = config.spreadsheet_id + if (!id) throw new Error('spreadsheet_id is required for teardown') + const drive = makeDriveClient(config) + await deleteSpreadsheet(drive, id) }, - async setup({ config }: { config: Config }) { - if (config.spreadsheet_id) return + async check({ config }: { config: Config }): Promise { const sheets = sheetsClient ?? makeSheetsClient(config) - const id = await ensureSpreadsheet(sheets, config.spreadsheet_title) - return { spreadsheet_id: id } + if (!config.spreadsheet_id) throw new Error('spreadsheet_id is required for check') + try { + await sheets.spreadsheets.get({ spreadsheetId: config.spreadsheet_id }) + return { status: 'succeeded' } + } catch (err) { + return { + status: 'failed', + message: err instanceof Error ? err.message : String(err), + } + } }, async *write( @@ -101,9 +155,11 @@ export function createDestination( const sheets = sheetsClient ?? makeSheetsClient(config) const batchSize = config.batch_size ?? 50 - // Resolve or create spreadsheet - spreadsheetId = - config.spreadsheet_id || (await ensureSpreadsheet(sheets, config.spreadsheet_title)) + if (config.spreadsheet_id) { + spreadsheetId = config.spreadsheet_id + } else { + spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) + } // Per-stream state: column headers and buffered rows const streamHeaders = new Map() @@ -135,13 +191,11 @@ export function createDestination( await ensureSheet(sheets, spreadsheetId!, stream, headers) } - // Map record data to row values in header order const headers = streamHeaders.get(stream)! const row = headers.map((h) => stringify(data[h])) const buffer = streamBuffers.get(stream)! buffer.push(row) - // Flush when batch is full if (buffer.length >= batchSize) { await flushStream(stream) } diff --git a/packages/destination-google-sheets/src/writer.ts b/packages/destination-google-sheets/src/writer.ts index 8c1ab252..4a2a7395 100644 --- a/packages/destination-google-sheets/src/writer.ts +++ b/packages/destination-google-sheets/src/writer.ts @@ -1,4 +1,4 @@ -import type { sheets_v4 } from 'googleapis' +import type { drive_v3, sheets_v4 } from 'googleapis' /** * Low-level Sheets API write operations. @@ -48,13 +48,14 @@ export async function ensureSpreadsheet(sheets: sheets_v4.Sheets, title: string) /** * Ensure a tab (sheet) exists for a given stream name with a header row. * If the spreadsheet already has a "Sheet1" default tab, rename it for the first stream. + * Returns the numeric sheetId for use in subsequent API calls (e.g. protect range). */ export async function ensureSheet( sheets: sheets_v4.Sheets, spreadsheetId: string, streamName: string, headers: string[] -): Promise { +): Promise { // Get existing sheets const meta = await withRetry(() => sheets.spreadsheets.get({ @@ -63,12 +64,12 @@ export async function ensureSheet( }) ) const existing = meta.data.sheets ?? [] - const existingNames = existing.map((s) => s.properties?.title) - if (existingNames.includes(streamName)) { - // Tab already exists — write header row in case it's empty + // Tab already exists — write header row and return its ID + const found = existing.find((s) => s.properties?.title === streamName) + if (found) { await writeHeaderRow(sheets, spreadsheetId, streamName, headers) - return + return found.properties!.sheetId! } // If there's a default "Sheet1" and this is the first real stream, rename it @@ -77,6 +78,7 @@ export async function ensureSheet( existing[0]?.properties?.title === 'Sheet1' && existing[0]?.properties?.sheetId !== undefined ) { + const sheetId = existing[0].properties.sheetId! await withRetry(() => sheets.spreadsheets.batchUpdate({ spreadsheetId, @@ -84,10 +86,7 @@ export async function ensureSheet( requests: [ { updateSheetProperties: { - properties: { - sheetId: existing[0]!.properties!.sheetId!, - title: streamName, - }, + properties: { sheetId, title: streamName }, fields: 'title', }, }, @@ -95,19 +94,25 @@ export async function ensureSheet( }, }) ) - } else { - // Add a new tab - await withRetry(() => - sheets.spreadsheets.batchUpdate({ - spreadsheetId, - requestBody: { - requests: [{ addSheet: { properties: { title: streamName } } }], - }, - }) - ) + await writeHeaderRow(sheets, spreadsheetId, streamName, headers) + return sheetId } + // Add a new tab and capture its sheetId from the response + const addRes = await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [{ addSheet: { properties: { title: streamName } } }], + }, + }) + ) + const sheetId = addRes.data.replies?.[0]?.addSheet?.properties?.sheetId + if (sheetId == null) { + throw new Error(`Failed to get sheetId for new sheet "${streamName}"`) + } await writeHeaderRow(sheets, spreadsheetId, streamName, headers) + return sheetId } async function writeHeaderRow( @@ -116,6 +121,7 @@ async function writeHeaderRow( sheetName: string, headers: string[] ): Promise { + if (headers.length === 0) return await withRetry(() => sheets.spreadsheets.values.update({ spreadsheetId, @@ -126,6 +132,120 @@ async function writeHeaderRow( ) } +/** + * Create or update an "Overview" intro tab at index 0. + * Lists the synced streams and warns users not to edit data tabs. + */ +export async function createIntroSheet( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + streamNames: string[] +): Promise { + const TITLE = 'Overview' + + const meta = await withRetry(() => + sheets.spreadsheets.get({ spreadsheetId, fields: 'sheets.properties' }) + ) + const existing = meta.data.sheets ?? [] + const hasOverview = existing.some((s) => s.properties?.title === TITLE) + + if (!hasOverview) { + // Rename "Sheet1" if it's the only tab, otherwise insert at index 0 + const onlySheet1 = + existing.length === 1 && + existing[0]?.properties?.title === 'Sheet1' && + existing[0]?.properties?.sheetId !== undefined + if (onlySheet1) { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [ + { + updateSheetProperties: { + properties: { sheetId: existing[0]!.properties!.sheetId!, title: TITLE }, + fields: 'title', + }, + }, + ], + }, + }) + ) + } else { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [{ addSheet: { properties: { title: TITLE, index: 0 } } }], + }, + }) + ) + } + } + + const now = new Date().toISOString() + const rows = [ + ['Stripe Sync Engine'], + [''], + ['This spreadsheet is managed by Stripe Sync Engine.'], + ['Data is synced automatically from your Stripe account.'], + [''], + ['Synced streams:'], + ...streamNames.map((name) => [` • ${name}`]), + [''], + [`Last setup: ${now}`], + [''], + ['⚠️ Do not edit data in the synced tabs. Changes will be overwritten on the next sync.'], + ] + + await withRetry(() => + sheets.spreadsheets.values.update({ + spreadsheetId, + range: `'${TITLE}'!A1`, + valueInputOption: 'RAW', + requestBody: { values: rows }, + }) + ) +} + +/** + * Add warning-only protection to a set of sheets by their numeric sheetIds. + * Users will see a warning dialog before editing but are not blocked. + * Idempotent — skips sheets that already have protection. + */ +export async function protectSheets( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetIds: number[] +): Promise { + for (const sheetId of sheetIds) { + try { + await withRetry(() => + sheets.spreadsheets.batchUpdate({ + spreadsheetId, + requestBody: { + requests: [ + { + addProtectedRange: { + protectedRange: { + range: { sheetId }, + description: + 'Managed by Stripe Sync Engine — edits may be overwritten on next sync', + warningOnly: true, + }, + }, + }, + ], + }, + }) + ) + } catch (err) { + if (err instanceof Error && err.message.includes('already has sheet protection')) continue + throw err + } + } +} + /** Append rows to a named sheet tab. Values are stringified for Sheets. */ export async function appendRows( sheets: sheets_v4.Sheets, @@ -146,6 +266,42 @@ export async function appendRows( ) } +/** + * Update specific rows in a sheet by their 1-based row numbers. + * Uses a single batchUpdate call for efficiency. + */ +export async function updateRows( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetName: string, + updates: { rowNumber: number; values: string[] }[] +): Promise { + if (updates.length === 0) return + + const data = updates.map(({ rowNumber, values }) => ({ + range: `'${sheetName}'!A${rowNumber}`, + values: [values], + })) + + await withRetry(() => + sheets.spreadsheets.values.batchUpdate({ + spreadsheetId, + requestBody: { valueInputOption: 'RAW', data }, + }) + ) +} + +/** + * Permanently delete a spreadsheet file via the Drive API. + * The Sheets API does not support deletion — Drive is required. + */ +export async function deleteSpreadsheet( + drive: drive_v3.Drive, + spreadsheetId: string +): Promise { + await withRetry(() => drive.files.delete({ fileId: spreadsheetId })) +} + /** Read all values from a sheet tab. Used for verification in tests. */ export async function readSheet( sheets: sheets_v4.Sheets, diff --git a/packages/destination-google-sheets/tsconfig.scripts.json b/packages/destination-google-sheets/tsconfig.scripts.json new file mode 100644 index 00000000..fdf99a20 --- /dev/null +++ b/packages/destination-google-sheets/tsconfig.scripts.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "noEmit": true, + "types": ["node"] + }, + "include": ["scripts/**/*"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bbc76dbd..791242e3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -461,6 +461,9 @@ importers: specifier: ^4.3.6 version: 4.3.6 devDependencies: + '@types/node': + specifier: ^25.5.0 + version: 25.5.0 vitest: specifier: ^3.2.4 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) @@ -7509,7 +7512,6 @@ snapshots: '@types/node@25.5.0': dependencies: undici-types: 7.18.2 - optional: true '@types/pg@8.15.6': dependencies: @@ -9541,8 +9543,7 @@ snapshots: undici-types@7.16.0: {} - undici-types@7.18.2: - optional: true + undici-types@7.18.2: {} undici@7.24.6: {} From 73ad41039a7344802ac966bbdfdf910935b0e187 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Fri, 3 Apr 2026 08:51:25 -0700 Subject: [PATCH 2/2] Add a dedicated Sheets workflow for row-index upserts (#228) * Add dedicated Sheets workflow for row-index upserts Committed-By-Agent: codex Co-authored-by: codex * Document Google Sheets row-index workflow rationale Committed-By-Agent: codex Co-authored-by: codex * Refactor service workflows into dedicated modules Committed-By-Agent: codex Co-authored-by: codex * Fix Sheets worker Kafka wiring and header growth Committed-By-Agent: codex Co-authored-by: codex * Keep Sheets row metadata out of engine pipeline Committed-By-Agent: codex Co-authored-by: codex * Split Temporal activities into per-file modules Committed-By-Agent: codex Co-authored-by: codex * Check only spreadsheet id for Sheets pipeline updates Committed-By-Agent: codex Co-authored-by: codex * Merge Temporal activity helpers into _shared Committed-By-Agent: codex Co-authored-by: codex * Remove workflow barrel and rename shared to _shared Committed-By-Agent: codex Co-authored-by: codex * Remove unused Google Sheets helper scripts Committed-By-Agent: codex Co-authored-by: codex * Use protocol PipelineConfig in workflow types Committed-By-Agent: codex Co-authored-by: codex * Restore Temporal workflow entrypoint Committed-By-Agent: codex Co-authored-by: codex * Use activities folder entrypoint Committed-By-Agent: codex Co-authored-by: codex * Serialize service Temporal tests Committed-By-Agent: codex Co-authored-by: codex * Revert "Serialize service Temporal tests" This reverts commit c8f48b514a11d858bb02cb544507171734183baf. --------- Co-authored-by: codex --- apps/engine/src/lib/pipeline.test.ts | 35 +++ apps/engine/src/lib/pipeline.ts | 2 +- apps/service/src/__tests__/workflow.test.ts | 137 +++++++++- apps/service/src/api/app.integration.test.ts | 4 +- apps/service/src/api/app.test.ts | 144 +++++++++- apps/service/src/api/app.ts | 89 ++++-- apps/service/src/cli.test.ts | 87 ++++++ apps/service/src/cli.ts | 16 +- apps/service/src/index.ts | 6 +- apps/service/src/temporal/activities.ts | 181 ------------- .../src/temporal/activities/_shared.ts | 253 ++++++++++++++++++ .../temporal/activities/discover-catalog.ts | 19 ++ apps/service/src/temporal/activities/index.ts | 28 ++ .../activities/read-into-queue-with-state.ts | 60 +++++ .../temporal/activities/read-into-queue.ts | 31 +++ apps/service/src/temporal/activities/setup.ts | 10 + .../src/temporal/activities/sync-immediate.ts | 21 ++ .../src/temporal/activities/teardown.ts | 10 + .../temporal/activities/write-from-queue.ts | 34 +++ .../write-google-sheets-from-queue.ts | 85 ++++++ apps/service/src/temporal/worker.ts | 4 +- .../service/src/temporal/workflows/_shared.ts | 55 ++++ apps/service/src/temporal/workflows/index.ts | 2 + .../workflows/pipeline-google-sheets.ts | 191 +++++++++++++ .../{workflows.ts => workflows/pipeline.ts} | 129 ++------- ...-04-02-google-sheets-row-index-workflow.md | 190 +++++++++++++ packages/destination-google-sheets/.gitignore | 1 - .../__tests__/memory-sheets.ts | 30 ++- .../scripts/_state.ts | 95 ------- .../scripts/check-via-server.ts | 29 -- .../scripts/setup-via-server.ts | 31 --- .../scripts/sheet-size.ts | 67 ----- .../scripts/stripe-to-google-sheets.ts | 89 ------ .../scripts/sync-via-server.ts | 135 ---------- .../scripts/teardown-via-server.ts | 35 --- .../src/index.test.ts | 115 +++++++- .../destination-google-sheets/src/index.ts | 166 ++++++++++-- .../destination-google-sheets/src/metadata.ts | 44 +++ .../destination-google-sheets/src/writer.ts | 52 +++- .../tsconfig.scripts.json | 8 - 40 files changed, 1865 insertions(+), 855 deletions(-) create mode 100644 apps/service/src/cli.test.ts delete mode 100644 apps/service/src/temporal/activities.ts create mode 100644 apps/service/src/temporal/activities/_shared.ts create mode 100644 apps/service/src/temporal/activities/discover-catalog.ts create mode 100644 apps/service/src/temporal/activities/index.ts create mode 100644 apps/service/src/temporal/activities/read-into-queue-with-state.ts create mode 100644 apps/service/src/temporal/activities/read-into-queue.ts create mode 100644 apps/service/src/temporal/activities/setup.ts create mode 100644 apps/service/src/temporal/activities/sync-immediate.ts create mode 100644 apps/service/src/temporal/activities/teardown.ts create mode 100644 apps/service/src/temporal/activities/write-from-queue.ts create mode 100644 apps/service/src/temporal/activities/write-google-sheets-from-queue.ts create mode 100644 apps/service/src/temporal/workflows/_shared.ts create mode 100644 apps/service/src/temporal/workflows/index.ts create mode 100644 apps/service/src/temporal/workflows/pipeline-google-sheets.ts rename apps/service/src/temporal/{workflows.ts => workflows/pipeline.ts} (64%) create mode 100644 docs/plans/2026-04-02-google-sheets-row-index-workflow.md delete mode 100644 packages/destination-google-sheets/.gitignore delete mode 100644 packages/destination-google-sheets/scripts/_state.ts delete mode 100644 packages/destination-google-sheets/scripts/check-via-server.ts delete mode 100644 packages/destination-google-sheets/scripts/setup-via-server.ts delete mode 100644 packages/destination-google-sheets/scripts/sheet-size.ts delete mode 100644 packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts delete mode 100644 packages/destination-google-sheets/scripts/sync-via-server.ts delete mode 100644 packages/destination-google-sheets/scripts/teardown-via-server.ts create mode 100644 packages/destination-google-sheets/src/metadata.ts delete mode 100644 packages/destination-google-sheets/tsconfig.scripts.json diff --git a/apps/engine/src/lib/pipeline.test.ts b/apps/engine/src/lib/pipeline.test.ts index 0c3609ac..db402618 100644 --- a/apps/engine/src/lib/pipeline.test.ts +++ b/apps/engine/src/lib/pipeline.test.ts @@ -93,6 +93,41 @@ describe('enforceCatalog()', () => { expect((result[0] as { data: unknown }).data).toEqual({ id: 'sub_1', status: 'active' }) }) + it('drops unknown internal fields that are not present in the catalog schema', async () => { + const msgs: Message[] = [ + { + type: 'record', + stream: 'subscriptions', + data: { + id: 'sub_1', + status: 'active', + customer: 'cus_1', + _row_key: '["sub_1"]', + _row_number: 12, + }, + emitted_at: 1, + }, + ] + const result = await drain( + enforceCatalog( + catalog([ + { + name: 'subscriptions', + json_schema: { + type: 'object', + properties: { id: { type: 'string' }, status: { type: 'string' } }, + }, + }, + ]) + )(toAsync(msgs)) + ) + expect(result).toHaveLength(1) + expect((result[0] as { data: unknown }).data).toEqual({ + id: 'sub_1', + status: 'active', + }) + }) + it('passes records through unchanged when json_schema is absent', async () => { const msgs: Message[] = [ { diff --git a/apps/engine/src/lib/pipeline.ts b/apps/engine/src/lib/pipeline.ts index 360cb598..95e731a5 100644 --- a/apps/engine/src/lib/pipeline.ts +++ b/apps/engine/src/lib/pipeline.ts @@ -25,7 +25,7 @@ export function enforceCatalog( if (props) { yield { ...msg, - data: Object.fromEntries(Object.entries(msg.data).filter(([k]) => k in props)), + data: Object.fromEntries(Object.entries(msg.data).filter(([key]) => key in props)), } } else { yield msg diff --git a/apps/service/src/__tests__/workflow.test.ts b/apps/service/src/__tests__/workflow.test.ts index eaf2db62..1fec2872 100644 --- a/apps/service/src/__tests__/workflow.test.ts +++ b/apps/service/src/__tests__/workflow.test.ts @@ -3,11 +3,11 @@ import { TestWorkflowEnvironment } from '@temporalio/testing' import { Worker } from '@temporalio/worker' import path from 'node:path' import type { PipelineConfig } from '@stripe/sync-engine' -import type { SyncActivities } from '../temporal/activities.js' -import type { RunResult } from '../temporal/activities.js' +import type { SyncActivities } from '../temporal/activities/index.js' +import type { RunResult } from '../temporal/activities/index.js' -// workflowsPath must point to compiled JS (Temporal bundles it for V8 sandbox) -const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js') +// workflowsPath points to the compiled workflow directory. +const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows') const noErrors: RunResult = { errors: [], state: {} } @@ -19,9 +19,17 @@ const testPipeline = { function stubActivities(overrides: Partial = {}): SyncActivities { return { + discoverCatalog: async () => ({ streams: [] }), setup: async () => ({}), syncImmediate: async () => noErrors, + readIntoQueueWithState: async () => ({ count: 0, state: {} }), readIntoQueue: async () => ({ count: 0, state: {} }), + writeGoogleSheetsFromQueue: async () => ({ + errors: [], + state: {}, + written: 0, + rowAssignments: {}, + }), writeFromQueue: async () => ({ errors: [], state: {}, written: 0 }), teardown: async () => {}, ...overrides, @@ -284,3 +292,124 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { }) }) }) + +describe('pipelineGoogleSheetsWorkflow (unit — stubbed activities)', () => { + it('uses the Sheets-specific read path and catalog discovery', async () => { + let discoverCalls = 0 + let readCalls = 0 + let syncCalls = 0 + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue: 'test-queue-gs-1', + workflowsPath, + activities: stubActivities({ + discoverCatalog: async () => { + discoverCalls++ + return { streams: [] } + }, + readIntoQueueWithState: async () => { + readCalls++ + return { count: 0, state: {} } + }, + syncImmediate: async () => { + syncCalls++ + return noErrors + }, + }), + }) + + await worker.runUntil(async () => { + const handle = await testEnv.client.workflow.start('pipelineGoogleSheetsWorkflow', { + args: [ + { + ...testPipeline, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + }, + }, + ], + workflowId: 'test-gs-sync-1', + taskQueue: 'test-queue-gs-1', + }) + + await new Promise((r) => setTimeout(r, 1500)) + await handle.signal('delete') + await handle.result() + + expect(discoverCalls).toBeGreaterThanOrEqual(1) + expect(readCalls).toBeGreaterThanOrEqual(1) + expect(syncCalls).toBe(0) + }) + }) + + it('passes the discovered catalog into the Sheets write activity', async () => { + const discoveredCatalog = { + streams: [ + { + stream: { + name: 'customers', + primary_key: [['id']], + json_schema: { + type: 'object', + properties: { + id: { type: 'string' }, + }, + }, + }, + sync_mode: 'full_refresh' as const, + destination_sync_mode: 'append' as const, + }, + ], + } + let readCalls = 0 + let writeCatalog: unknown + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue: 'test-queue-gs-2', + workflowsPath, + activities: stubActivities({ + discoverCatalog: async () => discoveredCatalog, + readIntoQueueWithState: async () => { + readCalls++ + return readCalls === 1 + ? { count: 1, state: { customers: { cursor: 'cus_1' } } } + : { count: 0, state: { customers: { cursor: 'cus_1' } } } + }, + writeGoogleSheetsFromQueue: async (_config, _pipelineId, opts) => { + writeCatalog = opts?.catalog + return { + errors: [], + state: { customers: { cursor: 'cus_1' } }, + written: 0, + rowAssignments: {}, + } + }, + }), + }) + + await worker.runUntil(async () => { + const handle = await testEnv.client.workflow.start('pipelineGoogleSheetsWorkflow', { + args: [ + { + ...testPipeline, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_456', + }, + }, + ], + workflowId: 'test-gs-sync-2', + taskQueue: 'test-queue-gs-2', + }) + + await new Promise((r) => setTimeout(r, 1500)) + await handle.signal('delete') + await handle.result() + + expect(writeCatalog).toEqual(discoveredCatalog) + }) + }) +}) diff --git a/apps/service/src/api/app.integration.test.ts b/apps/service/src/api/app.integration.test.ts index 0e73060c..1e64385c 100644 --- a/apps/service/src/api/app.integration.test.ts +++ b/apps/service/src/api/app.integration.test.ts @@ -11,7 +11,7 @@ import Stripe from 'stripe' import sourceStripe from '@stripe/sync-source-stripe' import destinationPostgres from '@stripe/sync-destination-postgres' import { createApp as createEngineApp, createConnectorResolver } from '@stripe/sync-engine' -import { createActivities } from '../temporal/activities.js' +import { createActivities } from '../temporal/activities/index.js' import { createApp } from './app.js' import type { paths } from '../__generated__/openapi.js' @@ -24,7 +24,7 @@ const STRIPE_API_KEY = process.env['STRIPE_API_KEY']! const POSTGRES_URL = process.env['POSTGRES_URL'] ?? process.env['DATABASE_URL']! const TASK_QUEUE = `test-app-${Date.now()}` const SCHEMA = `integration_${Date.now()}` -const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js') +const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows') const SKIP_CLEANUP = process.env['SKIP_CLEANUP'] === '1' diff --git a/apps/service/src/api/app.test.ts b/apps/service/src/api/app.test.ts index 8a0f9c87..38c6f1e0 100644 --- a/apps/service/src/api/app.test.ts +++ b/apps/service/src/api/app.test.ts @@ -1,15 +1,16 @@ -import { describe, expect, it, beforeAll, afterAll } from 'vitest' +import { describe, expect, it, beforeAll, afterAll, vi } from 'vitest' import type { WorkflowClient } from '@temporalio/client' import { TestWorkflowEnvironment } from '@temporalio/testing' import { Worker } from '@temporalio/worker' import path from 'node:path' import { createConnectorResolver, sourceTest, destinationTest } from '@stripe/sync-engine' -import type { SyncActivities, RunResult } from '../temporal/activities.js' +import destinationGoogleSheets from '@stripe/sync-destination-google-sheets' +import type { SyncActivities, RunResult } from '../temporal/activities/index.js' import { createApp } from './app.js' const resolver = createConnectorResolver({ sources: { test: sourceTest }, - destinations: { test: destinationTest }, + destinations: { test: destinationTest, 'google-sheets': destinationGoogleSheets }, }) // Lightweight app for spec/health tests (no Temporal needed) @@ -68,18 +69,60 @@ describe('GET /health', () => { }) }) +describe('POST /pipelines workflow dispatch', () => { + it('starts google-sheets pipelines on the dedicated workflow', async () => { + const start = vi.fn(async () => ({})) + const res = await createApp({ + temporal: { client: { start } as unknown as WorkflowClient, taskQueue: 'unused' }, + resolver, + }).request('/pipelines', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + source: { type: 'test' }, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Test Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + + expect(res.status).toBe(201) + expect(start).toHaveBeenCalledOnce() + expect(start).toHaveBeenCalledWith( + 'pipelineGoogleSheetsWorkflow', + expect.objectContaining({ + taskQueue: 'unused', + }) + ) + }) +}) + // --------------------------------------------------------------------------- // Pipeline CRUD + pause/resume (in-memory Temporal, stub activities) // --------------------------------------------------------------------------- -const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows.js') +const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows') const noErrors: RunResult = { errors: [], state: {} } function stubActivities(): SyncActivities { return { + discoverCatalog: async () => ({ streams: [] }), setup: async () => ({}), syncImmediate: async () => noErrors, + readIntoQueueWithState: async () => ({ count: 0, state: {} }), readIntoQueue: async () => ({ count: 0, state: {} }), + writeGoogleSheetsFromQueue: async () => ({ + errors: [], + state: {}, + written: 0, + rowAssignments: {}, + }), writeFromQueue: async () => ({ errors: [], state: {}, written: 0 }), teardown: async () => {}, } @@ -175,6 +218,99 @@ describe('pipeline CRUD', () => { await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) }) + it('rejects changing the target spreadsheet for a google-sheets pipeline', async () => { + const a = liveApp() + + const createRes = await a.request('/pipelines', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + source: { type: 'test' }, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Original Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + const created = await createRes.json() + await waitForPipeline(a, created.id) + + const updateRes = await a.request(`/pipelines/${created.id}`, { + method: 'PATCH', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_456', + spreadsheet_title: 'Replacement Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + + expect(updateRes.status).toBe(400) + expect(await updateRes.json()).toEqual({ + error: + 'Changing the target spreadsheet for a google-sheets pipeline requires recreating the pipeline', + }) + + await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) + }) + + it('allows changing spreadsheet title when spreadsheet_id is unchanged', async () => { + const a = liveApp() + + const createRes = await a.request('/pipelines', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + source: { type: 'test' }, + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Original Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + const created = await createRes.json() + await waitForPipeline(a, created.id) + + const updateRes = await a.request(`/pipelines/${created.id}`, { + method: 'PATCH', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + destination: { + type: 'google-sheets', + spreadsheet_id: 'sheet_123', + spreadsheet_title: 'Renamed Sheet', + client_id: 'client', + client_secret: 'secret', + access_token: 'token', + refresh_token: 'refresh', + }, + }), + }) + + expect(updateRes.status).toBe(200) + const updated = await updateRes.json() + expect(updated.destination.spreadsheet_id).toBe('sheet_123') + expect(updated.destination.spreadsheet_title).toBe('Renamed Sheet') + + await a.request(`/pipelines/${created.id}`, { method: 'DELETE' }) + }) + it('pause and resume return pipeline with updated status', async () => { const a = liveApp() diff --git a/apps/service/src/api/app.ts b/apps/service/src/api/app.ts index 7ce5bdfd..d98beaeb 100644 --- a/apps/service/src/api/app.ts +++ b/apps/service/src/api/app.ts @@ -6,7 +6,18 @@ import type { ConnectorResolver } from '@stripe/sync-engine' import { endpointTable, addDiscriminators } from '@stripe/sync-engine/api/openapi-utils' import { createSchemas } from '../lib/createSchemas.js' import type { Pipeline } from '../lib/createSchemas.js' -import type { WorkflowStatus } from '../temporal/workflows.js' +import type { WorkflowStatus } from '../temporal/workflows/_shared.js' + +const DEFAULT_PIPELINE_WORKFLOW = 'pipelineWorkflow' +const GOOGLE_SHEETS_PIPELINE_WORKFLOW = 'pipelineGoogleSheetsWorkflow' +const ACTIVE_PIPELINE_STATUSES = + "ExecutionStatus IN ('Running', 'Failed', 'Terminated', 'TimedOut', 'Canceled')" + +function workflowTypeForPipeline(pipeline: Pipeline): string { + return pipeline.destination.type === 'google-sheets' + ? GOOGLE_SHEETS_PIPELINE_WORKFLOW + : DEFAULT_PIPELINE_WORKFLOW +} // MARK: - Helpers @@ -113,28 +124,30 @@ export function createApp(options: AppOptions) { // Completed = soft-deleted (via delete signal). Show everything else // including failed/terminated so operators can see broken pipelines. const pipelines: Array = [] - for await (const wf of temporal.list({ - query: `WorkflowType = 'pipelineWorkflow' AND ExecutionStatus IN ('Running', 'Failed', 'Terminated', 'TimedOut', 'Canceled')`, - })) { - try { - const handle = temporal.getHandle(wf.workflowId) - const [pipeline, status] = await Promise.all([ - handle.query('config'), - handle.query('status'), - ]) - pipelines.push({ ...pipeline, status }) - } catch { - // Non-queryable (failed/terminated) — fall back to memo with derived status - const memo = wf.memo as { pipeline?: Pipeline } | undefined - if (memo?.pipeline) { - pipelines.push({ - ...memo.pipeline, - status: { - phase: wf.status.name.toLowerCase(), - paused: false, - iteration: 0, - }, - }) + for (const workflowType of [DEFAULT_PIPELINE_WORKFLOW, GOOGLE_SHEETS_PIPELINE_WORKFLOW]) { + for await (const wf of temporal.list({ + query: `WorkflowType = '${workflowType}' AND ${ACTIVE_PIPELINE_STATUSES}`, + })) { + try { + const handle = temporal.getHandle(wf.workflowId) + const [pipeline, status] = await Promise.all([ + handle.query('config'), + handle.query('status'), + ]) + pipelines.push({ ...pipeline, status }) + } catch { + // Non-queryable (failed/terminated) — fall back to memo with derived status + const memo = wf.memo as { pipeline?: Pipeline } | undefined + if (memo?.pipeline) { + pipelines.push({ + ...memo.pipeline, + status: { + phase: wf.status.name.toLowerCase(), + paused: false, + iteration: 0, + }, + }) + } } } } @@ -167,7 +180,7 @@ export function createApp(options: AppOptions) { const body = c.req.valid('json') const id = genId('pipe') const pipeline = { id, ...(body as Record) } as Pipeline - await temporal.start('pipelineWorkflow', { + await temporal.start(workflowTypeForPipeline(pipeline), { workflowId: id, taskQueue, args: [pipeline], @@ -262,6 +275,34 @@ export function createApp(options: AppOptions) { const patch = c.req.valid('json') try { const handle = temporal.getHandle(id) + const current = await handle.query('config') + const next = { + ...current, + source: patch.source ? patch.source : current.source, + destination: patch.destination ? patch.destination : current.destination, + streams: patch.streams !== undefined ? patch.streams : current.streams, + } as Pipeline + if (workflowTypeForPipeline(current) !== workflowTypeForPipeline(next)) { + return c.json( + { + error: + 'Changing destination.type between google-sheets and non-google-sheets requires recreating the pipeline', + }, + 400 + ) + } + if ( + current.destination.type === 'google-sheets' && + current.destination.spreadsheet_id !== next.destination.spreadsheet_id + ) { + return c.json( + { + error: + 'Changing the target spreadsheet for a google-sheets pipeline requires recreating the pipeline', + }, + 400 + ) + } await handle.signal('update', patch) // Brief wait for signal to be processed before querying await new Promise((r) => setTimeout(r, 200)) diff --git a/apps/service/src/cli.test.ts b/apps/service/src/cli.test.ts new file mode 100644 index 00000000..7fafbb09 --- /dev/null +++ b/apps/service/src/cli.test.ts @@ -0,0 +1,87 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +const runMock = vi.fn(async () => {}) +const createWorkerMock = vi.fn(async () => ({ run: runMock })) + +vi.mock('./temporal/worker.js', () => ({ + createWorker: createWorkerMock, +})) + +describe('worker CLI', () => { + const originalKafkaBroker = process.env['KAFKA_BROKER'] + + beforeEach(() => { + vi.clearAllMocks() + delete process.env['KAFKA_BROKER'] + }) + + afterEach(() => { + if (originalKafkaBroker === undefined) { + delete process.env['KAFKA_BROKER'] + } else { + process.env['KAFKA_BROKER'] = originalKafkaBroker + } + }) + + it('threads --kafka-broker through to createWorker', async () => { + vi.resetModules() + const { createProgram } = await import('./cli.js') + const program = (await createProgram()) as { + subCommands: Record< + string, + { + args: Record + run: (input: { args: Record }) => Promise + } + > + } + + expect(program.subCommands['worker']?.args['kafka-broker']).toBeDefined() + + await program.subCommands['worker']!.run({ + args: { + 'temporal-address': 'localhost:7233', + 'temporal-namespace': 'default', + 'temporal-task-queue': 'sync-engine', + 'engine-url': 'http://localhost:4010', + 'kafka-broker': 'localhost:9092', + }, + }) + + expect(createWorkerMock).toHaveBeenCalledWith( + expect.objectContaining({ + kafkaBroker: 'localhost:9092', + }) + ) + expect(runMock).toHaveBeenCalledOnce() + }) + + it('falls back to KAFKA_BROKER when the flag is omitted', async () => { + process.env['KAFKA_BROKER'] = 'env-broker:9092' + vi.resetModules() + const { createProgram } = await import('./cli.js') + const program = (await createProgram()) as { + subCommands: Record< + string, + { + run: (input: { args: Record }) => Promise + } + > + } + + await program.subCommands['worker']!.run({ + args: { + 'temporal-address': 'localhost:7233', + 'temporal-namespace': 'default', + 'temporal-task-queue': 'sync-engine', + 'engine-url': 'http://localhost:4010', + }, + }) + + expect(createWorkerMock).toHaveBeenCalledWith( + expect.objectContaining({ + kafkaBroker: 'env-broker:9092', + }) + ) + }) +}) diff --git a/apps/service/src/cli.ts b/apps/service/src/cli.ts index d11817b3..e3077685 100644 --- a/apps/service/src/cli.ts +++ b/apps/service/src/cli.ts @@ -92,27 +92,37 @@ const workerCmd = defineCommand({ default: 'http://localhost:4010', description: 'Sync engine URL for sync execution (default: http://localhost:4010)', }, + 'kafka-broker': { + type: 'string', + description: + 'Kafka broker for queue-backed workflows (for example localhost:9092). Can also be set via KAFKA_BROKER.', + }, }, async run({ args }) { const { createWorker } = await import('./temporal/worker.js') const taskQueue = args['temporal-task-queue'] || 'sync-engine' const namespace = args['temporal-namespace'] || 'default' const engineUrl = args['engine-url'] || 'http://localhost:4010' + const kafkaBroker = args['kafka-broker'] || process.env['KAFKA_BROKER'] const temporalAddress = args['temporal-address'] - // workflowsPath: resolve relative to the package dist directory + // workflowsPath: point Temporal at the compiled workflow directory const pkgDir = path.resolve(import.meta.dirname ?? process.cwd(), '..') - const workflowsPath = path.resolve(pkgDir, 'dist/temporal/workflows.js') + const workflowsPath = path.resolve(pkgDir, 'dist/temporal/workflows') const worker = await createWorker({ temporalAddress, namespace, taskQueue, engineUrl, + kafkaBroker, workflowsPath, }) - logger.info({ temporalAddress, namespace, taskQueue, engineUrl }, 'Starting Temporal worker') + logger.info( + { temporalAddress, namespace, taskQueue, engineUrl, kafkaBroker }, + 'Starting Temporal worker' + ) await worker.run() }, diff --git a/apps/service/src/index.ts b/apps/service/src/index.ts index c614db80..66aa28cd 100644 --- a/apps/service/src/index.ts +++ b/apps/service/src/index.ts @@ -17,8 +17,8 @@ export { createApp } from './api/app.js' export type { AppOptions } from './api/app.js' // Temporal workflow types (for consumers that need to reference them) -export { createActivities } from './temporal/activities.js' -export type { SyncActivities, RunResult } from './temporal/activities.js' -export type { WorkflowStatus } from './temporal/workflows.js' +export { createActivities } from './temporal/activities/index.js' +export type { SyncActivities, RunResult } from './temporal/activities/index.js' +export type { WorkflowStatus } from './temporal/workflows/_shared.js' export { createWorker } from './temporal/worker.js' export type { WorkerOptions } from './temporal/worker.js' diff --git a/apps/service/src/temporal/activities.ts b/apps/service/src/temporal/activities.ts deleted file mode 100644 index 10cf47a0..00000000 --- a/apps/service/src/temporal/activities.ts +++ /dev/null @@ -1,181 +0,0 @@ -import { heartbeat } from '@temporalio/activity' -import { createRemoteEngine } from '@stripe/sync-engine' -import type { PipelineConfig, Message, SetupResult } from '@stripe/sync-engine' -import { Kafka } from 'kafkajs' - -export interface RunResult { - errors: Array<{ message: string; failure_type?: string; stream?: string }> - state: Record -} - -/** Convert an array to an async iterable. */ -async function* asIterable(items: T[]): AsyncIterable { - for (const item of items) yield item -} - -/** Iterate a message stream, collecting errors/state/records and heartbeating. */ -async function drainMessages(stream: AsyncIterable>): Promise<{ - errors: RunResult['errors'] - state: Record - records: unknown[] -}> { - const errors: RunResult['errors'] = [] - const state: Record = {} - const records: unknown[] = [] - let count = 0 - - for await (const m of stream) { - count++ - if (m.type === 'error') { - errors.push({ - message: - (m.message as string) || - ((m.data as Record)?.message as string) || - 'Unknown error', - failure_type: m.failure_type as string | undefined, - stream: m.stream as string | undefined, - }) - } else if (m.type === 'state' && typeof m.stream === 'string') { - state[m.stream] = m.data - } else if (m.type === 'record') { - records.push(m) - } - if (count % 50 === 0) heartbeat({ messages: count }) - } - if (count % 50 !== 0) heartbeat({ messages: count }) - - return { errors, state, records } -} - -export function createActivities(opts: { engineUrl: string; kafkaBroker?: string }) { - const { engineUrl, kafkaBroker } = opts - - // Shared Kafka client + producer (created lazily, reused across activity calls) - let kafka: Kafka | undefined - let producerConnected: Promise | undefined - - function getKafka(): Kafka { - if (!kafka) { - if (!kafkaBroker) throw new Error('kafkaBroker is required for read-write mode') - kafka = new Kafka({ brokers: [kafkaBroker] }) - } - return kafka - } - - function getProducer(): Promise { - if (!producerConnected) { - const producer = getKafka().producer() - producerConnected = producer.connect().then(() => producer) - } - return producerConnected - } - - function topicName(pipelineId: string): string { - return `pipeline.${pipelineId}` - } - - return { - async setup(config: PipelineConfig): Promise { - const engine = createRemoteEngine(engineUrl, config) - return await engine.setup() - }, - - async syncImmediate( - config: PipelineConfig, - opts?: { input?: unknown[]; state?: Record; stateLimit?: number } - ): Promise { - const engine = createRemoteEngine(engineUrl, config, { - state: opts?.state, - stateLimit: opts?.stateLimit, - }) - const input = opts?.input?.length ? asIterable(opts.input) : undefined - const { errors, state } = await drainMessages( - engine.sync(input) as AsyncIterable> - ) - return { errors, state } - }, - - async readIntoQueue( - config: PipelineConfig, - pipelineId: string, - opts?: { input?: unknown[]; state?: Record; stateLimit?: number } - ): Promise<{ count: number; state: Record }> { - const engine = createRemoteEngine(engineUrl, config, { - state: opts?.state, - stateLimit: opts?.stateLimit, - }) - const input = opts?.input?.length ? asIterable(opts.input) : undefined - const { records, state } = await drainMessages( - engine.read(input) as AsyncIterable> - ) - - // If Kafka is configured, produce records to the pipeline topic - if (kafkaBroker && records.length > 0) { - const producer = await getProducer() - await producer.send({ - topic: topicName(pipelineId), - messages: records.map((r) => ({ value: JSON.stringify(r) })), - }) - } - - return { count: records.length, state } - }, - - async writeFromQueue( - config: PipelineConfig, - pipelineId: string, - opts?: { records?: unknown[]; maxBatch?: number } - ): Promise { - let records: unknown[] - - if (kafkaBroker) { - // Consume a batch from Kafka - const maxBatch = opts?.maxBatch ?? 50 - records = [] - const consumer = getKafka().consumer({ groupId: `pipeline.${pipelineId}` }) - await consumer.connect() - await consumer.subscribe({ topic: topicName(pipelineId), fromBeginning: false }) - - await new Promise((resolve) => { - consumer.run({ - eachMessage: async ({ message }) => { - if (message.value) { - records.push(JSON.parse(message.value.toString())) - } - if (records.length >= maxBatch) { - resolve() - } - }, - }) - // If fewer than maxBatch messages are available, resolve after a short wait - setTimeout(resolve, 2000) - }) - - await consumer.disconnect() - } else { - // In-memory mode: records passed directly - records = opts?.records ?? [] - } - - if (records.length === 0) { - return { errors: [], state: {}, written: 0 } - } - - const engine = createRemoteEngine(engineUrl, config) - const { errors, state } = await drainMessages( - engine.write(asIterable(records) as AsyncIterable) as AsyncIterable< - Record - > - ) - - return { errors, state, written: records.length } - }, - - async teardown(config: PipelineConfig): Promise { - const engine = createRemoteEngine(engineUrl, config) - await engine.teardown() - }, - } -} - -export type SyncActivities = ReturnType diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts new file mode 100644 index 00000000..1f68edf1 --- /dev/null +++ b/apps/service/src/temporal/activities/_shared.ts @@ -0,0 +1,253 @@ +import { heartbeat } from '@temporalio/activity' +import type { ConfiguredCatalog, Message, RecordMessage } from '@stripe/sync-engine' +import { Kafka } from 'kafkajs' +import type { Producer } from 'kafkajs' +import { + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + serializeRowKey, +} from '@stripe/sync-destination-google-sheets' + +export interface ActivitiesContext { + engineUrl: string + kafkaBroker?: string + getProducer(): Promise + consumeQueueBatch(pipelineId: string, maxBatch: number): Promise +} + +export function createActivitiesContext(opts: { + engineUrl: string + kafkaBroker?: string +}): ActivitiesContext { + const { engineUrl, kafkaBroker } = opts + + let kafka: Kafka | undefined + let producerConnected: Promise | undefined + + function getKafka(): Kafka { + if (!kafka) { + if (!kafkaBroker) throw new Error('kafkaBroker is required for read-write mode') + kafka = new Kafka({ brokers: [kafkaBroker] }) + } + return kafka + } + + function topicName(pipelineId: string): string { + return `pipeline.${pipelineId}` + } + + async function getProducer(): Promise { + if (!producerConnected) { + const producer = getKafka().producer() + producerConnected = producer.connect().then(() => producer) + } + return producerConnected + } + + async function consumeQueueBatch(pipelineId: string, maxBatch: number): Promise { + if (!kafkaBroker) throw new Error('kafkaBroker is required for read-write mode') + + const topic = topicName(pipelineId) + const messages: Message[] = [] + const offsets = new Map() + const consumer = getKafka().consumer({ groupId: `pipeline.${pipelineId}` }) + await consumer.connect() + await consumer.subscribe({ topic, fromBeginning: true }) + + try { + await new Promise((resolve) => { + let resolved = false + const finish = () => { + if (resolved) return + resolved = true + resolve() + } + + consumer.run({ + eachMessage: async ({ partition, message }) => { + if (message.value) { + messages.push(JSON.parse(message.value.toString()) as Message) + offsets.set(partition, (BigInt(message.offset) + 1n).toString()) + } + if (messages.length >= maxBatch) finish() + }, + }) + + setTimeout(finish, 2000) + }) + + await consumer.stop() + + if (offsets.size > 0) { + await consumer.commitOffsets( + [...offsets.entries()].map(([partition, offset]) => ({ + topic, + partition, + offset, + })) + ) + } + } finally { + await consumer.disconnect() + } + + return messages + } + + return { + engineUrl, + kafkaBroker, + getProducer, + consumeQueueBatch, + } +} + +export interface RunResult { + errors: Array<{ message: string; failure_type?: string; stream?: string }> + state: Record +} + +export async function* asIterable(items: T[]): AsyncIterable { + for (const item of items) yield item +} + +export function pipelineHeader(config: Record): string { + return JSON.stringify(config) +} + +export function collectError(message: Record): RunResult['errors'][number] | null { + if (message.type !== 'error') return null + return { + message: + (message.message as string) || + ((message.data as Record)?.message as string) || + 'Unknown error', + failure_type: message.failure_type as string | undefined, + stream: message.stream as string | undefined, + } +} + +export function withRowKey(record: RecordMessage, catalog?: ConfiguredCatalog): RecordMessage { + const primaryKey = catalog?.streams.find((stream) => stream.stream.name === record.stream)?.stream + .primary_key + if (!primaryKey) return record + return { + ...record, + data: { + ...record.data, + [ROW_KEY_FIELD]: serializeRowKey(primaryKey, record.data), + }, + } +} + +export function compactGoogleSheetsMessages(messages: Message[]): Message[] { + const compacted: Message[] = [] + let pendingOrder: string[] = [] + let pending = new Map() + + const flushPending = () => { + for (const key of pendingOrder) { + const message = pending.get(key) + if (message) compacted.push(message) + } + pendingOrder = [] + pending = new Map() + } + + for (const message of messages) { + if (message.type === 'record') { + const rowKey = + typeof message.data[ROW_KEY_FIELD] === 'string' ? message.data[ROW_KEY_FIELD] : undefined + if (!rowKey) { + compacted.push(message) + continue + } + const dedupeKey = `${message.stream}:${rowKey}` + if (!pending.has(dedupeKey)) pendingOrder.push(dedupeKey) + pending.set(dedupeKey, message) + continue + } + + if (message.type === 'state') { + flushPending() + compacted.push(message) + } + } + + flushPending() + return compacted +} + +export function addRowNumbers( + messages: Message[], + rowIndex: Record> +): Message[] { + return messages.map((message) => { + if (message.type !== 'record') return message + const rowKey = + typeof message.data[ROW_KEY_FIELD] === 'string' ? message.data[ROW_KEY_FIELD] : undefined + const rowNumber = rowKey ? rowIndex[message.stream]?.[rowKey] : undefined + if (rowNumber === undefined) return message + return { + ...message, + data: { + ...message.data, + [ROW_NUMBER_FIELD]: rowNumber, + }, + } + }) +} + +export function augmentGoogleSheetsCatalog(catalog: ConfiguredCatalog): ConfiguredCatalog { + return { + streams: catalog.streams.map((configuredStream) => { + const props = configuredStream.stream.json_schema?.properties as + | Record + | undefined + + if (!props) return configuredStream + + return { + ...configuredStream, + stream: { + ...configuredStream.stream, + json_schema: { + ...configuredStream.stream.json_schema, + properties: { + ...props, + [ROW_KEY_FIELD]: { type: 'string' }, + [ROW_NUMBER_FIELD]: { type: 'number' }, + }, + }, + }, + } + }), + } +} + +export async function drainMessages(stream: AsyncIterable>): Promise<{ + errors: RunResult['errors'] + state: Record + records: unknown[] +}> { + const errors: RunResult['errors'] = [] + const state: Record = {} + const records: unknown[] = [] + let count = 0 + + for await (const message of stream) { + count++ + const error = collectError(message) + if (error) { + errors.push(error) + } else if (message.type === 'state' && typeof message.stream === 'string') { + state[message.stream] = message.data + } else if (message.type === 'record') { + records.push(message) + } + if (count % 50 === 0) heartbeat({ messages: count }) + } + if (count % 50 !== 0) heartbeat({ messages: count }) + + return { errors, state, records } +} diff --git a/apps/service/src/temporal/activities/discover-catalog.ts b/apps/service/src/temporal/activities/discover-catalog.ts new file mode 100644 index 00000000..13c66f75 --- /dev/null +++ b/apps/service/src/temporal/activities/discover-catalog.ts @@ -0,0 +1,19 @@ +import { applySelection, buildCatalog } from '@stripe/sync-engine' +import type { ConfiguredCatalog, PipelineConfig, Stream } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { pipelineHeader } from './_shared.js' + +export function createDiscoverCatalogActivity(context: ActivitiesContext) { + return async function discoverCatalog(config: PipelineConfig): Promise { + const response = await fetch(`${context.engineUrl}/discover`, { + method: 'POST', + headers: { 'x-pipeline': pipelineHeader(config) }, + }) + if (!response.ok) { + const text = await response.text().catch(() => '') + throw new Error(`Engine /discover failed (${response.status}): ${text}`) + } + const payload = (await response.json()) as { streams: Stream[] } + return applySelection(buildCatalog(payload.streams, config.streams)) + } +} diff --git a/apps/service/src/temporal/activities/index.ts b/apps/service/src/temporal/activities/index.ts new file mode 100644 index 00000000..aaaff25c --- /dev/null +++ b/apps/service/src/temporal/activities/index.ts @@ -0,0 +1,28 @@ +import { createActivitiesContext } from './_shared.js' +import { createDiscoverCatalogActivity } from './discover-catalog.js' +import { createReadIntoQueueActivity } from './read-into-queue.js' +import { createReadIntoQueueWithStateActivity } from './read-into-queue-with-state.js' +import { createSetupActivity } from './setup.js' +import { createSyncImmediateActivity } from './sync-immediate.js' +import { createTeardownActivity } from './teardown.js' +import { createWriteFromQueueActivity } from './write-from-queue.js' +import { createWriteGoogleSheetsFromQueueActivity } from './write-google-sheets-from-queue.js' + +export type { RunResult } from './_shared.js' + +export function createActivities(opts: { engineUrl: string; kafkaBroker?: string }) { + const context = createActivitiesContext(opts) + + return { + discoverCatalog: createDiscoverCatalogActivity(context), + setup: createSetupActivity(context), + syncImmediate: createSyncImmediateActivity(context), + readIntoQueueWithState: createReadIntoQueueWithStateActivity(context), + readIntoQueue: createReadIntoQueueActivity(context), + writeGoogleSheetsFromQueue: createWriteGoogleSheetsFromQueueActivity(context), + writeFromQueue: createWriteFromQueueActivity(context), + teardown: createTeardownActivity(context), + } +} + +export type SyncActivities = ReturnType diff --git a/apps/service/src/temporal/activities/read-into-queue-with-state.ts b/apps/service/src/temporal/activities/read-into-queue-with-state.ts new file mode 100644 index 00000000..bd629092 --- /dev/null +++ b/apps/service/src/temporal/activities/read-into-queue-with-state.ts @@ -0,0 +1,60 @@ +import { heartbeat } from '@temporalio/activity' +import { createRemoteEngine } from '@stripe/sync-engine' +import type { ConfiguredCatalog, Message, PipelineConfig, RecordMessage } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, collectError, type RunResult, withRowKey } from './_shared.js' + +export function createReadIntoQueueWithStateActivity(context: ActivitiesContext) { + return async function readIntoQueueWithState( + config: PipelineConfig, + pipelineId: string, + opts?: { + input?: unknown[] + state?: Record + stateLimit?: number + catalog?: ConfiguredCatalog + } + ): Promise<{ count: number; state: Record }> { + if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') + + const engine = createRemoteEngine(context.engineUrl, config, { + state: opts?.state, + stateLimit: opts?.stateLimit, + }) + const input = opts?.input?.length ? asIterable(opts.input) : undefined + + const queued: Message[] = [] + const state: Record = {} + const errors: RunResult['errors'] = [] + let seen = 0 + + for await (const raw of engine.read(input) as AsyncIterable>) { + seen++ + const error = collectError(raw) + if (error) { + errors.push(error) + } else if (raw.type === 'record') { + queued.push(withRowKey(raw as RecordMessage, opts?.catalog)) + } else if (raw.type === 'state' && typeof raw.stream === 'string') { + state[raw.stream] = raw.data + queued.push(raw as Message) + } + if (seen % 50 === 0) heartbeat({ messages: seen }) + } + if (seen % 50 !== 0) heartbeat({ messages: seen }) + + if (errors.length > 0) { + throw new Error(errors.map((error) => error.message).join('; ')) + } + + if (queued.length > 0) { + const producer = await context.getProducer() + await producer.send({ + topic: `pipeline.${pipelineId}`, + messages: queued.map((message) => ({ value: JSON.stringify(message) })), + }) + } + + return { count: queued.length, state } + } +} diff --git a/apps/service/src/temporal/activities/read-into-queue.ts b/apps/service/src/temporal/activities/read-into-queue.ts new file mode 100644 index 00000000..f4d95b40 --- /dev/null +++ b/apps/service/src/temporal/activities/read-into-queue.ts @@ -0,0 +1,31 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, drainMessages } from './_shared.js' + +export function createReadIntoQueueActivity(context: ActivitiesContext) { + return async function readIntoQueue( + config: PipelineConfig, + pipelineId: string, + opts?: { input?: unknown[]; state?: Record; stateLimit?: number } + ): Promise<{ count: number; state: Record }> { + const engine = createRemoteEngine(context.engineUrl, config, { + state: opts?.state, + stateLimit: opts?.stateLimit, + }) + const input = opts?.input?.length ? asIterable(opts.input) : undefined + const { records, state } = await drainMessages( + engine.read(input) as AsyncIterable> + ) + + if (context.kafkaBroker && records.length > 0) { + const producer = await context.getProducer() + await producer.send({ + topic: `pipeline.${pipelineId}`, + messages: records.map((record) => ({ value: JSON.stringify(record) })), + }) + } + + return { count: records.length, state } + } +} diff --git a/apps/service/src/temporal/activities/setup.ts b/apps/service/src/temporal/activities/setup.ts new file mode 100644 index 00000000..eeb3e738 --- /dev/null +++ b/apps/service/src/temporal/activities/setup.ts @@ -0,0 +1,10 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig, SetupResult } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' + +export function createSetupActivity(context: ActivitiesContext) { + return async function setup(config: PipelineConfig): Promise { + const engine = createRemoteEngine(context.engineUrl, config) + return await engine.setup() + } +} diff --git a/apps/service/src/temporal/activities/sync-immediate.ts b/apps/service/src/temporal/activities/sync-immediate.ts new file mode 100644 index 00000000..db69c577 --- /dev/null +++ b/apps/service/src/temporal/activities/sync-immediate.ts @@ -0,0 +1,21 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, drainMessages, type RunResult } from './_shared.js' + +export function createSyncImmediateActivity(context: ActivitiesContext) { + return async function syncImmediate( + config: PipelineConfig, + opts?: { input?: unknown[]; state?: Record; stateLimit?: number } + ): Promise { + const engine = createRemoteEngine(context.engineUrl, config, { + state: opts?.state, + stateLimit: opts?.stateLimit, + }) + const input = opts?.input?.length ? asIterable(opts.input) : undefined + const { errors, state } = await drainMessages( + engine.sync(input) as AsyncIterable> + ) + return { errors, state } + } +} diff --git a/apps/service/src/temporal/activities/teardown.ts b/apps/service/src/temporal/activities/teardown.ts new file mode 100644 index 00000000..4fbaf8c5 --- /dev/null +++ b/apps/service/src/temporal/activities/teardown.ts @@ -0,0 +1,10 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { PipelineConfig } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' + +export function createTeardownActivity(context: ActivitiesContext) { + return async function teardown(config: PipelineConfig): Promise { + const engine = createRemoteEngine(context.engineUrl, config) + await engine.teardown() + } +} diff --git a/apps/service/src/temporal/activities/write-from-queue.ts b/apps/service/src/temporal/activities/write-from-queue.ts new file mode 100644 index 00000000..3015dd9e --- /dev/null +++ b/apps/service/src/temporal/activities/write-from-queue.ts @@ -0,0 +1,34 @@ +import { createRemoteEngine } from '@stripe/sync-engine' +import type { Message, PipelineConfig } from '@stripe/sync-engine' +import type { ActivitiesContext } from './_shared.js' +import { asIterable, drainMessages, type RunResult } from './_shared.js' + +export function createWriteFromQueueActivity(context: ActivitiesContext) { + return async function writeFromQueue( + config: PipelineConfig, + pipelineId: string, + opts?: { records?: unknown[]; maxBatch?: number } + ): Promise { + let records: unknown[] + + if (context.kafkaBroker) { + const maxBatch = opts?.maxBatch ?? 50 + records = await context.consumeQueueBatch(pipelineId, maxBatch) + } else { + records = opts?.records ?? [] + } + + if (records.length === 0) { + return { errors: [], state: {}, written: 0 } + } + + const engine = createRemoteEngine(context.engineUrl, config) + const { errors, state } = await drainMessages( + engine.write(asIterable(records) as AsyncIterable) as AsyncIterable< + Record + > + ) + + return { errors, state, written: records.length } + } +} diff --git a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts new file mode 100644 index 00000000..a74ce99d --- /dev/null +++ b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts @@ -0,0 +1,85 @@ +import { enforceCatalog } from '@stripe/sync-engine' +import type { ConfiguredCatalog, DestinationInput, PipelineConfig } from '@stripe/sync-engine' +import { + configSchema as googleSheetsConfigSchema, + createDestination as createGoogleSheetsDestination, + parseGoogleSheetsMetaLog, +} from '@stripe/sync-destination-google-sheets' +import type { ActivitiesContext } from './_shared.js' +import { + addRowNumbers, + asIterable, + augmentGoogleSheetsCatalog, + collectError, + compactGoogleSheetsMessages, + type RunResult, +} from './_shared.js' + +export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesContext) { + return async function writeGoogleSheetsFromQueue( + config: PipelineConfig, + pipelineId: string, + opts?: { + maxBatch?: number + rowIndex?: Record> + catalog?: ConfiguredCatalog + } + ): Promise< + RunResult & { + written: number + rowAssignments: Record> + } + > { + if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') + + const maxBatch = opts?.maxBatch ?? 50 + const queued = await context.consumeQueueBatch(pipelineId, maxBatch) + + if (queued.length === 0) { + return { errors: [], state: {}, written: 0, rowAssignments: {} } + } + + const writeBatch = addRowNumbers(compactGoogleSheetsMessages(queued), opts?.rowIndex ?? {}) + if (config.destination.type !== 'google-sheets') { + throw new Error('writeGoogleSheetsFromQueue requires a google-sheets destination') + } + if (!opts?.catalog) { + throw new Error('catalog is required for Google Sheets workflow writes') + } + + const destinationConfig = googleSheetsConfigSchema.parse(config.destination) + const filteredCatalog = augmentGoogleSheetsCatalog(opts.catalog) + const destination = createGoogleSheetsDestination() + const errors: RunResult['errors'] = [] + const state: Record = {} + const rowAssignments: Record> = {} + const input = enforceCatalog(filteredCatalog)( + asIterable(writeBatch) + ) as AsyncIterable + + for await (const raw of destination.write( + { + config: destinationConfig, + catalog: filteredCatalog, + }, + input + )) { + const error = collectError(raw) + if (error) { + errors.push(error) + } else if (raw.type === 'state' && typeof raw.stream === 'string') { + state[raw.stream] = raw.data + } else if (raw.type === 'log' && typeof raw.message === 'string') { + const meta = parseGoogleSheetsMetaLog(raw.message) + if (meta?.type === 'row_assignments') { + for (const [stream, assignments] of Object.entries(meta.assignments)) { + rowAssignments[stream] ??= {} + Object.assign(rowAssignments[stream], assignments) + } + } + } + } + + return { errors, state, written: queued.length, rowAssignments } + } +} diff --git a/apps/service/src/temporal/worker.ts b/apps/service/src/temporal/worker.ts index 3515501f..97a497be 100644 --- a/apps/service/src/temporal/worker.ts +++ b/apps/service/src/temporal/worker.ts @@ -1,5 +1,5 @@ import { NativeConnection, Worker } from '@temporalio/worker' -import { createActivities } from './activities.js' +import { createActivities } from './activities/index.js' export interface WorkerOptions { temporalAddress: string @@ -7,7 +7,7 @@ export interface WorkerOptions { taskQueue: string engineUrl: string kafkaBroker?: string - /** Path to compiled workflows.js (Temporal bundles it for V8 sandbox). */ + /** Path to a compiled workflow module or directory (Temporal bundles it for V8 sandbox). */ workflowsPath: string } diff --git a/apps/service/src/temporal/workflows/_shared.ts b/apps/service/src/temporal/workflows/_shared.ts new file mode 100644 index 00000000..9c1c0b6e --- /dev/null +++ b/apps/service/src/temporal/workflows/_shared.ts @@ -0,0 +1,55 @@ +import { defineQuery, defineSignal, proxyActivities } from '@temporalio/workflow' +import type { PipelineConfig } from '@stripe/sync-protocol' + +import type { SyncActivities } from '../activities/index.js' +import { retryPolicy } from '../../lib/utils.js' + +export interface WorkflowStatus { + phase: string + paused: boolean + iteration: number +} + +export type Pipeline = PipelineConfig & { + // Keep `id` on the workflow-local shape for now so configQuery still returns + // the full pipeline resource expected by the API and queue-backed activities + // can continue using it as the pipeline key. A cleaner split would derive + // this from Temporal workflow metadata, but that is a broader refactor. + id: string +} + +export type RowIndex = Record> + +export function toConfig(pipeline: Pipeline): PipelineConfig { + return { + source: pipeline.source, + destination: pipeline.destination, + streams: pipeline.streams, + } +} + +export const stripeEventSignal = defineSignal<[unknown]>('stripe_event') +export const updateSignal = defineSignal<[Partial]>('update') +export const deleteSignal = defineSignal('delete') + +export const statusQuery = defineQuery('status') +export const configQuery = defineQuery('config') +export const stateQuery = defineQuery>('state') + +export const { setup, teardown } = proxyActivities({ + startToCloseTimeout: '2m', + retry: retryPolicy, +}) + +export const { syncImmediate, readIntoQueue, writeFromQueue } = proxyActivities({ + startToCloseTimeout: '10m', + heartbeatTimeout: '2m', + retry: retryPolicy, +}) + +export const { discoverCatalog, readIntoQueueWithState, writeGoogleSheetsFromQueue } = + proxyActivities({ + startToCloseTimeout: '10m', + heartbeatTimeout: '2m', + retry: retryPolicy, + }) diff --git a/apps/service/src/temporal/workflows/index.ts b/apps/service/src/temporal/workflows/index.ts new file mode 100644 index 00000000..8cdb2cba --- /dev/null +++ b/apps/service/src/temporal/workflows/index.ts @@ -0,0 +1,2 @@ +export { pipelineWorkflow } from './pipeline.js' +export { pipelineGoogleSheetsWorkflow } from './pipeline-google-sheets.js' diff --git a/apps/service/src/temporal/workflows/pipeline-google-sheets.ts b/apps/service/src/temporal/workflows/pipeline-google-sheets.ts new file mode 100644 index 00000000..6fdfea9e --- /dev/null +++ b/apps/service/src/temporal/workflows/pipeline-google-sheets.ts @@ -0,0 +1,191 @@ +import { condition, continueAsNew, setHandler, sleep } from '@temporalio/workflow' +import type { ConfiguredCatalog } from '@stripe/sync-engine' + +import { + configQuery, + deleteSignal, + discoverCatalog, + Pipeline, + readIntoQueueWithState, + RowIndex, + setup, + stateQuery, + statusQuery, + stripeEventSignal, + teardown, + toConfig, + updateSignal, + WorkflowStatus, + writeGoogleSheetsFromQueue, +} from './_shared.js' +import { CONTINUE_AS_NEW_THRESHOLD, deepEqual, EVENT_BATCH_SIZE } from '../../lib/utils.js' + +export interface PipelineGoogleSheetsWorkflowOpts { + phase?: string + sourceState?: Record + readState?: Record + rowIndex?: RowIndex + catalog?: ConfiguredCatalog + pendingWrites?: boolean + inputQueue?: unknown[] + readComplete?: boolean + writeRps?: number +} + +export async function pipelineGoogleSheetsWorkflow( + pipeline: Pipeline, + opts?: PipelineGoogleSheetsWorkflowOpts +): Promise { + let paused = false + let deleted = false + const inputQueue: unknown[] = [...(opts?.inputQueue ?? [])] + let iteration = 0 + let sourceState: Record = opts?.sourceState ?? {} + let readState: Record = opts?.readState ?? { ...sourceState } + let rowIndex: RowIndex = opts?.rowIndex ?? {} + let catalog: ConfiguredCatalog | undefined = opts?.catalog + let readComplete = opts?.readComplete ?? false + let pendingWrites = opts?.pendingWrites ?? false + + setHandler(stripeEventSignal, (event: unknown) => { + inputQueue.push(event) + }) + setHandler(updateSignal, (patch: Partial) => { + if (patch.source) { + pipeline = { ...pipeline, source: patch.source } + catalog = undefined + readComplete = false + readState = { ...sourceState } + } + if (patch.destination) pipeline = { ...pipeline, destination: patch.destination } + if (patch.streams !== undefined) { + pipeline = { ...pipeline, streams: patch.streams } + catalog = undefined + readComplete = false + readState = { ...sourceState } + } + if ('paused' in (patch as Record)) { + paused = !!(patch as Record).paused + } + }) + setHandler(deleteSignal, () => { + deleted = true + }) + + const phase = opts?.phase ?? 'setup' + setHandler( + statusQuery, + (): WorkflowStatus => ({ + phase: phase === 'setup' && iteration > 0 ? 'running' : phase, + paused, + iteration, + }) + ) + setHandler(configQuery, (): Pipeline => pipeline) + setHandler(stateQuery, (): Record => sourceState) + + async function waitWhilePaused() { + await condition(() => !paused || deleted) + } + + async function tickIteration() { + iteration++ + if (iteration >= CONTINUE_AS_NEW_THRESHOLD) { + await continueAsNew(pipeline, { + phase: 'running', + sourceState, + readState, + rowIndex, + catalog, + pendingWrites, + inputQueue: inputQueue.length > 0 ? [...inputQueue] : undefined, + readComplete, + writeRps: opts?.writeRps, + }) + } + } + + if (phase !== 'running') { + const setupResult = await setup(toConfig(pipeline)) + if (setupResult.source) { + pipeline = { ...pipeline, source: { ...pipeline.source, ...setupResult.source } } + } + if (setupResult.destination) { + pipeline = { + ...pipeline, + destination: { ...pipeline.destination, ...setupResult.destination }, + } + } + catalog = await discoverCatalog(toConfig(pipeline)) + if (deleted) { + await teardown(toConfig(pipeline)) + return + } + } + + async function readLoop(): Promise { + while (!deleted) { + await waitWhilePaused() + if (deleted) break + + const config = toConfig(pipeline) + if (!catalog) catalog = await discoverCatalog(config) + + if (inputQueue.length > 0) { + const batch = inputQueue.splice(0, EVENT_BATCH_SIZE) + const { count } = await readIntoQueueWithState(config, pipeline.id, { + input: batch, + catalog, + }) + if (count > 0) pendingWrites = true + await tickIteration() + continue + } + + if (!readComplete) { + const before = readState + const { count, state: nextReadState } = await readIntoQueueWithState(config, pipeline.id, { + state: readState, + stateLimit: 1, + catalog, + }) + if (count > 0) pendingWrites = true + readState = { ...readState, ...nextReadState } + readComplete = deepEqual(readState, before) + await tickIteration() + continue + } + + await condition(() => inputQueue.length > 0 || deleted) + } + } + + async function writeLoop(): Promise { + while (!deleted) { + await waitWhilePaused() + if (deleted) break + + if (pendingWrites) { + if (!catalog) catalog = await discoverCatalog(toConfig(pipeline)) + const result = await writeGoogleSheetsFromQueue(toConfig(pipeline), pipeline.id, { + maxBatch: 50, + rowIndex, + catalog, + }) + pendingWrites = result.written > 0 + sourceState = { ...sourceState, ...result.state } + for (const [stream, assignments] of Object.entries(result.rowAssignments)) { + rowIndex[stream] ??= {} + Object.assign(rowIndex[stream], assignments) + } + if (opts?.writeRps) await sleep(Math.ceil(1000 / opts.writeRps)) + await tickIteration() + } else { + await condition(() => pendingWrites || deleted) + } + } + } + + await Promise.all([readLoop(), writeLoop()]) + await teardown(toConfig(pipeline)) +} diff --git a/apps/service/src/temporal/workflows.ts b/apps/service/src/temporal/workflows/pipeline.ts similarity index 64% rename from apps/service/src/temporal/workflows.ts rename to apps/service/src/temporal/workflows/pipeline.ts index b0bdac29..df54bbeb 100644 --- a/apps/service/src/temporal/workflows.ts +++ b/apps/service/src/temporal/workflows/pipeline.ts @@ -1,90 +1,35 @@ -import { - proxyActivities, - defineSignal, - defineQuery, - setHandler, - condition, - continueAsNew, - sleep, -} from '@temporalio/workflow' - -import type { SyncActivities } from './activities.js' +import { condition, continueAsNew, setHandler, sleep } from '@temporalio/workflow' -export interface WorkflowStatus { - phase: string - paused: boolean - iteration: number -} import { - deepEqual, - CONTINUE_AS_NEW_THRESHOLD, - EVENT_BATCH_SIZE, - retryPolicy, -} from '../lib/utils.js' - -// Setup/teardown: 2m with retry -const { setup, teardown } = proxyActivities({ - startToCloseTimeout: '2m', - retry: retryPolicy, -}) - -// Data activities: 10m with retry and heartbeat -const { syncImmediate, readIntoQueue, writeFromQueue } = proxyActivities({ - startToCloseTimeout: '10m', - heartbeatTimeout: '2m', - retry: retryPolicy, -}) - -// Pipeline type (matches lib/schemas.ts — keep in sync) -type SyncMode = 'incremental' | 'full_refresh' - -interface StreamDef { - name: string - sync_mode?: SyncMode - fields?: string[] -} - -interface Pipeline { - id: string - source: { type: string; [key: string]: unknown } - destination: { type: string; [key: string]: unknown } - streams?: StreamDef[] -} - -type PipelineConfig = { - source: { type: string; [key: string]: unknown } - destination: { type: string; [key: string]: unknown } - streams?: StreamDef[] + configQuery, + deleteSignal, + Pipeline, + readIntoQueue, + setup, + stateQuery, + statusQuery, + stripeEventSignal, + syncImmediate, + teardown, + toConfig, + updateSignal, + WorkflowStatus, + writeFromQueue, +} from './_shared.js' +import { CONTINUE_AS_NEW_THRESHOLD, deepEqual, EVENT_BATCH_SIZE } from '../../lib/utils.js' + +export interface PipelineWorkflowOpts { + phase?: string + state?: Record + mode?: 'sync' | 'read-write' + writeRps?: number + pendingWrites?: boolean + inputQueue?: unknown[] } -function toConfig(pipeline: Pipeline): PipelineConfig { - return { - source: pipeline.source, - destination: pipeline.destination, - streams: pipeline.streams, - } -} - -// Signals -export const stripeEventSignal = defineSignal<[unknown]>('stripe_event') -export const updateSignal = defineSignal<[Partial]>('update') -export const deleteSignal = defineSignal('delete') - -// Queries -export const statusQuery = defineQuery('status') -export const configQuery = defineQuery('config') -export const stateQuery = defineQuery>('state') - export async function pipelineWorkflow( pipeline: Pipeline, - opts?: { - phase?: string - state?: Record - mode?: 'sync' | 'read-write' - writeRps?: number - pendingWrites?: boolean - inputQueue?: unknown[] - } + opts?: PipelineWorkflowOpts ): Promise { let paused = false let deleted = false @@ -101,7 +46,6 @@ export async function pipelineWorkflow( // written:0 when the queue is actually empty, written:>0 when it's not). let pendingWrites = opts?.pendingWrites ?? false - // Register signal handlers (must be before any await) setHandler(stripeEventSignal, (event: unknown) => { inputQueue.push(event) }) @@ -117,7 +61,6 @@ export async function pipelineWorkflow( deleted = true }) - // Register query handlers const phase = opts?.phase ?? 'setup' setHandler( statusQuery, @@ -130,8 +73,6 @@ export async function pipelineWorkflow( setHandler(configQuery, (): Pipeline => pipeline) setHandler(stateQuery, (): Record => syncState) - // --- Helpers --- - async function waitWhilePaused() { await condition(() => !paused || deleted) } @@ -150,13 +91,10 @@ export async function pipelineWorkflow( } } - // --- Setup (first sync only) --- - const config = toConfig(pipeline) if (phase !== 'running') { const setupResult = await setup(config) - // Merge setup-provisioned fields (webhook_secret, account_id, spreadsheet_id, etc.) if (setupResult.source) { pipeline = { ...pipeline, source: { ...pipeline.source, ...setupResult.source } } } @@ -172,12 +110,7 @@ export async function pipelineWorkflow( } } - // --- Main loop --- - if (opts?.mode === 'read-write') { - // Concurrent read/write via Kafka queue — each loop runs at its own pace - // writeState: persisted pipeline state, only advanced after successful writes (source of truth) - // readState: pagination cursor for source reads, starts from writeState let writeState: Record = { ...syncState } let readState: Record = { ...writeState } @@ -188,7 +121,6 @@ export async function pipelineWorkflow( const config = toConfig(pipeline) - // Resolve events through read → Kafka if (inputQueue.length > 0) { const batch = inputQueue.splice(0, EVENT_BATCH_SIZE) const { count } = await readIntoQueue(config, pipeline.id, { input: batch }) @@ -197,7 +129,6 @@ export async function pipelineWorkflow( continue } - // Backfill one page → Kafka if (!readComplete) { const before = readState const { count, state: nextReadState } = await readIntoQueue(config, pipeline.id, { @@ -211,7 +142,6 @@ export async function pipelineWorkflow( continue } - // All caught up — wait for new events or delete await condition(() => inputQueue.length > 0 || deleted) } } @@ -230,7 +160,6 @@ export async function pipelineWorkflow( if (opts?.writeRps) await sleep(Math.ceil(1000 / opts.writeRps)) await tickIteration() } else { - // Wait until the read loop signals there's work, or we're deleted await condition(() => pendingWrites || deleted) } } @@ -238,15 +167,12 @@ export async function pipelineWorkflow( await Promise.all([readLoop(), writeLoop()]) } else { - // sync mode: combined read+write in a single activity call - while (true) { await waitWhilePaused() if (deleted) break const config = toConfig(pipeline) - // 1. Drain buffered events if (inputQueue.length > 0) { const batch = inputQueue.splice(0, EVENT_BATCH_SIZE) await syncImmediate(config, { input: batch }) @@ -254,7 +180,6 @@ export async function pipelineWorkflow( continue } - // 2. Reconciliation page if (!readComplete) { const before = syncState const result = await syncImmediate(config, { state: syncState, stateLimit: 1 }) @@ -264,11 +189,9 @@ export async function pipelineWorkflow( continue } - // 3. Wait await condition(() => inputQueue.length > 0 || deleted) } } - // Teardown on delete await teardown(toConfig(pipeline)) } diff --git a/docs/plans/2026-04-02-google-sheets-row-index-workflow.md b/docs/plans/2026-04-02-google-sheets-row-index-workflow.md new file mode 100644 index 00000000..90f30a90 --- /dev/null +++ b/docs/plans/2026-04-02-google-sheets-row-index-workflow.md @@ -0,0 +1,190 @@ +# Google Sheets Row-Index Workflow + +## Context + +The Google Sheets destination needs upsert behavior for repeated Stripe objects. +When the same object is seen again in a later sync, we want to overwrite the +existing row instead of appending a duplicate. + +Google Sheets does not provide native upsert semantics, and the destination +connector is intentionally stateless. It can read and write sheets, but it has +nowhere to persist "record X was previously written to row Y". + +That row mapping has to live somewhere durable outside the destination. + +## Constraints + +- The Google Sheets destination must remain stateless. +- We do not want to store Google-Sheets-specific metadata in + `packages/protocol`. +- We want to keep the Kafka-backed read/write split used by the service. +- The generic pipeline workflow should stay simple for destinations that do not + need row-index bookkeeping. +- Any solution has to survive workflow replay, retries, and continue-as-new. + +## Why the generic workflow was not enough + +The existing `pipelineWorkflow` keeps a single source checkpoint and assumes the +destination can consume records without extra destination-specific durable +state. + +That assumption breaks for Sheets: + +- source progress and row-index progress are not the same thing +- writes may lag reads, so source checkpoints cannot be advanced optimistically +- row assignments must survive workflow restarts and continue-as-new +- this logic only applies to one destination type + +Trying to fold all of that into the generic workflow would add Google +Sheets-specific state and branching to the default path used by every other +destination. + +## Decision + +Use a dedicated Temporal workflow for `google-sheets` pipelines. + +This workflow owns the Sheets-specific durable state: + +- `sourceState`: committed source checkpoint, only advanced after successful + writes +- `readState`: optimistic read cursor used while backfilling or processing + events +- `rowIndex`: `stream -> serialized primary key -> sheet row number` +- `catalog`: discovered stream metadata used to derive row keys + +The generic workflow remains unchanged for non-Sheets destinations. + +## Why workflow state is the right place + +Workflow state is the only place in the current architecture that satisfies all +requirements at once: + +- durable across retries and worker restarts +- local to the specific pipeline +- not exposed in the wire protocol +- safe to carry through `continueAsNew` +- able to coordinate source progress with destination progress + +This keeps connector isolation intact. The destination still only consumes +records and emits output messages. It does not learn about Temporal, Kafka, or +state storage. + +## Kafka stays in the design + +We considered bypassing the queue for Sheets, but kept Kafka for consistency +with the service's existing read/write split. + +The dedicated Sheets workflow still uses: + +1. `readIntoQueueWithState` to read from the source and enqueue ordered + `record` and `state` messages. +2. `writeGoogleSheetsFromQueue` to consume from Kafka, compact duplicate record + updates by key, inject known row numbers, and write to the destination. + +This preserves the operational model already used by the service while letting +Sheets add destination-specific bookkeeping on top. + +## Data flow + +### Read side + +- discover the configured catalog +- derive a stable `_row_key` from the stream primary key +- enqueue `record` and `state` messages to Kafka in source order +- update `readState` optimistically as source state messages arrive + +### Write side + +- consume a batch from Kafka +- compact duplicate records by `(stream, _row_key)` within the batch +- if the workflow already knows the row, inject `_row_number` +- send the records to the Google Sheets destination +- parse destination-emitted row assignments for newly appended rows +- merge those row assignments into `rowIndex` +- advance `sourceState` only after the write succeeds + +## Why `_row_key` and `_row_number` stay local to the Sheets workflow + +These fields are still needed on the write side: + +- `_row_key`: stable identifier derived from the stream primary key +- `_row_number`: known row number for updates + +But the generic engine write path should not know about them. To keep that +boundary intact, the dedicated Sheets write activity calls the Sheets +destination directly instead of routing through the generic engine `/write` +pipeline. + +Inside that activity we: + +- take the workflow-owned discovered catalog +- add the two Sheets-only metadata fields to a local copy of the catalog +- run catalog enforcement there +- pass the filtered records to the Sheets destination + +That keeps `_row_key` and `_row_number` as internal workflow transport +metadata, not engine-wide protocol behavior. The destination still strips them +before writing visible sheet cells. + +## Why the destination reports row assignments + +For new rows, the workflow does not know the final row number ahead of time. +The destination is authoritative because the sheet itself decides where appended +rows land. + +After appending, the destination emits structured metadata describing the row +assignments it observed. The workflow merges that into `rowIndex` and uses it on +future writes. + +This keeps the destination stateless while still making it the source of truth +for the exact append result. + +## API guardrails + +A Sheets pipeline's `rowIndex` is tied to a specific spreadsheet. + +Changing a live pipeline to point at a different spreadsheet would silently +reuse stale row mappings and corrupt writes. Because of that, changing the +target spreadsheet now requires recreating the pipeline. + +This is intentionally strict. + +## Alternatives considered + +### Store row numbers in `packages/protocol` + +Rejected because it would leak Google-Sheets-specific behavior into the shared +wire format and make connector isolation worse. + +### Let the destination persist its own mapping + +Rejected because the destination is intentionally stateless and has no access to +durable storage. + +### Reuse the generic workflow with destination-type conditionals + +Rejected because it pushes one destination's durability requirements into the +default path used by all destinations. + +### Make row number the primary key directly + +Rejected because row numbers are not stable source identifiers. They are derived +write locations that only become known after the destination has written data. +The stable key must come from the source primary key, not from the sheet. + +## Operational risks + +- manual row deletion or row reordering in the sheet can invalidate `rowIndex` +- the dedicated workflow adds another workflow type to service operations +- Kafka consumption still needs end-to-end coverage beyond unit and package + tests + +These are acceptable for now because the alternative was to embed Sheets +complexity into the generic pipeline path. + +## Outcome + +The implementation in PR #228 adds a dedicated Google Sheets workflow that +preserves connector isolation, keeps Kafka in the service design, stores the +minimum extra durable state needed to make row-based upserts work, and keeps +Sheets-only metadata out of the generic engine write path. diff --git a/packages/destination-google-sheets/.gitignore b/packages/destination-google-sheets/.gitignore deleted file mode 100644 index d1b87e4a..00000000 --- a/packages/destination-google-sheets/.gitignore +++ /dev/null @@ -1 +0,0 @@ -scripts/.state.json diff --git a/packages/destination-google-sheets/__tests__/memory-sheets.ts b/packages/destination-google-sheets/__tests__/memory-sheets.ts index 051ab270..adc878eb 100644 --- a/packages/destination-google-sheets/__tests__/memory-sheets.ts +++ b/packages/destination-google-sheets/__tests__/memory-sheets.ts @@ -45,6 +45,22 @@ export function createMemorySheets() { return bang >= 0 ? range.slice(0, bang) : range } + function parseStartRow(range: string): number { + const match = range.match(/(\d+)/) + return match ? Number(match[1]) : 1 + } + + function columnLabel(index: number): string { + let value = index + let label = '' + while (value > 0) { + const remainder = (value - 1) % 26 + label = String.fromCharCode(65 + remainder) + label + value = Math.floor((value - 1) / 26) + } + return label || 'A' + } + function getTab(spreadsheetId: string, range: string): SheetTab { const ss = getSpreadsheet(spreadsheetId) const name = parseSheetName(range) @@ -119,9 +135,9 @@ export function createMemorySheets() { }) { const tab = getTab(params.spreadsheetId, params.range) const rows = params.requestBody?.values ?? [] - // values.update at A1 replaces from the top + const startRow = parseStartRow(params.range) for (let i = 0; i < rows.length; i++) { - tab.values[i] = rows[i] + tab.values[startRow - 1 + i] = rows[i] } return { data: {} } }, @@ -135,8 +151,16 @@ export function createMemorySheets() { }) { const tab = getTab(params.spreadsheetId, params.range) const rows = params.requestBody?.values ?? [] + const startRow = tab.values.length + 1 tab.values.push(...rows) - return { data: {} } + const endRow = tab.values.length + return { + data: { + updates: { + updatedRange: `'${parseSheetName(params.range)}'!A${startRow}:${columnLabel(rows[0]?.length ?? 1)}${endRow}`, + }, + }, + } }, async get(params: { spreadsheetId: string; range: string }) { diff --git a/packages/destination-google-sheets/scripts/_state.ts b/packages/destination-google-sheets/scripts/_state.ts deleted file mode 100644 index 2fcd487f..00000000 --- a/packages/destination-google-sheets/scripts/_state.ts +++ /dev/null @@ -1,95 +0,0 @@ -// Shared helpers for the destination-google-sheets scripts. -// Loads .env and manages a local .state.json that acts as a fake DB for the sheet ID. - -import { readFileSync, writeFileSync, unlinkSync } from 'node:fs' -import { resolve, dirname } from 'node:path' -import { fileURLToPath } from 'node:url' - -const __dirname = dirname(fileURLToPath(import.meta.url)) -const STATE_FILE = resolve(__dirname, '.state.json') - -// ── Env loading ────────────────────────────────────────────────────────────── - -export function loadEnv(): void { - const envPath = resolve(__dirname, '../.env') - try { - const content = readFileSync(envPath, 'utf-8') - for (const line of content.split('\n')) { - const trimmed = line.trim() - if (!trimmed || trimmed.startsWith('#')) continue - const eqIdx = trimmed.indexOf('=') - if (eqIdx === -1) continue - const key = trimmed.slice(0, eqIdx).trim() - const value = trimmed.slice(eqIdx + 1).trim() - if (!(key in process.env)) process.env[key] = value - } - } catch { - // .env is optional - } -} - -// ── Sheet state ─────────────────────────────────────────────────────────────── - -export interface SheetState { - spreadsheet_id: string - /** Per-stream cursor state, persisted across sync calls for resumable pagination. */ - sync_state?: Record -} - -export function loadState(): SheetState | null { - try { - return JSON.parse(readFileSync(STATE_FILE, 'utf-8')) as SheetState - } catch { - return null - } -} - -export function saveState(state: SheetState): void { - writeFileSync(STATE_FILE, JSON.stringify(state, null, 2) + '\n') - console.error(`Saved state → ${STATE_FILE}`) -} - -export function clearState(): void { - try { - unlinkSync(STATE_FILE) - console.error(`Cleared state (${STATE_FILE})`) - } catch { - // already gone - } -} - -// ── Pipeline builder ────────────────────────────────────────────────────────── - -export function buildDestinationConfig(spreadsheetId?: string): Record { - return { - name: 'google-sheets', - client_id: process.env['GOOGLE_CLIENT_ID'], - client_secret: process.env['GOOGLE_CLIENT_SECRET'], - access_token: 'unused', - refresh_token: process.env['GOOGLE_REFRESH_TOKEN'], - ...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}), - } -} - -export const STREAMS = ['products', 'customers', 'prices', 'subscriptions'] as const - -export function buildPipeline(spreadsheetId?: string): Record { - return { - source: { name: 'stripe', api_key: process.env['STRIPE_API_KEY'], backfill_limit: 10 }, - destination: buildDestinationConfig(spreadsheetId), - streams: STREAMS.map((name) => ({ name })), - } -} - -export function requireEnv(...keys: string[]): void { - const missing = keys.filter((k) => !process.env[k]) - if (missing.length > 0) { - console.error(`Error: missing required env vars: ${missing.join(', ')}`) - process.exit(1) - } -} - -export function getPort(): string { - const idx = process.argv.indexOf('--port') - return idx !== -1 ? process.argv[idx + 1] : '3000' -} diff --git a/packages/destination-google-sheets/scripts/check-via-server.ts b/packages/destination-google-sheets/scripts/check-via-server.ts deleted file mode 100644 index 82de3458..00000000 --- a/packages/destination-google-sheets/scripts/check-via-server.ts +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env node -// GET /check — validates credentials and sheet accessibility -// Usage: npx tsx scripts/check-via-server.ts [--port 3000] - -import { loadEnv, buildPipeline, requireEnv, loadState, getPort } from './_state.js' - -loadEnv() -requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') - -const state = loadState() -if (!state) { - console.error('No sheet state found — run setup-via-server.ts first') - process.exit(1) -} - -const serverUrl = `http://localhost:${getPort()}` -const pipeline = buildPipeline(state.spreadsheet_id) - -console.error(`Hitting ${serverUrl}/check ...`) -console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) - -const res = await fetch(`${serverUrl}/check`, { - headers: { 'X-Pipeline': JSON.stringify(pipeline) }, -}) - -const result = await res.json() -console.log(JSON.stringify(result, null, 2)) - -if (res.status !== 200) process.exit(1) diff --git a/packages/destination-google-sheets/scripts/setup-via-server.ts b/packages/destination-google-sheets/scripts/setup-via-server.ts deleted file mode 100644 index f1dd3bbf..00000000 --- a/packages/destination-google-sheets/scripts/setup-via-server.ts +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env node -// POST /setup — creates a new Google Sheet, saves its ID to .state.json -// Usage: npx tsx scripts/setup-via-server.ts [--port 3000] - -import { loadEnv, buildPipeline, requireEnv, saveState, getPort } from './_state.js' - -loadEnv() -requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') - -const serverUrl = `http://localhost:${getPort()}` - -// No spreadsheet_id — setup always creates a new sheet -const pipeline = buildPipeline() - -console.error(`Hitting ${serverUrl}/setup ...`) - -const res = await fetch(`${serverUrl}/setup`, { - method: 'POST', - headers: { 'X-Pipeline': JSON.stringify(pipeline) }, -}) - -if (res.status === 200) { - const result = (await res.json()) as { spreadsheet_id: string } - saveState({ spreadsheet_id: result.spreadsheet_id }) - console.log(JSON.stringify(result, null, 2)) -} else { - const body = await res.text() - console.error(`Error: ${res.status} ${res.statusText}`) - if (body) console.error(body) - process.exit(1) -} diff --git a/packages/destination-google-sheets/scripts/sheet-size.ts b/packages/destination-google-sheets/scripts/sheet-size.ts deleted file mode 100644 index f147d3f3..00000000 --- a/packages/destination-google-sheets/scripts/sheet-size.ts +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env node -// Calculates total cell count across all sheets in the saved spreadsheet. -// -// Usage: npx tsx scripts/sheet-size.ts - -import { readFileSync } from 'node:fs' -import { resolve, dirname } from 'node:path' -import { fileURLToPath } from 'node:url' -import { google } from 'googleapis' - -const __dirname = dirname(fileURLToPath(import.meta.url)) - -// Load .env -const envPath = resolve(__dirname, '../.env') -try { - for (const line of readFileSync(envPath, 'utf-8').split('\n')) { - const trimmed = line.trim() - if (!trimmed || trimmed.startsWith('#')) continue - const eqIdx = trimmed.indexOf('=') - if (eqIdx === -1) continue - const key = trimmed.slice(0, eqIdx).trim() - const value = trimmed.slice(eqIdx + 1).trim() - if (!(key in process.env)) process.env[key] = value - } -} catch { - /* .env is optional */ -} - -// Load spreadsheet ID from .state.json -const stateFile = resolve(__dirname, '.state.json') -let spreadsheetId: string -try { - const state = JSON.parse(readFileSync(stateFile, 'utf-8')) as { spreadsheet_id: string } - spreadsheetId = state.spreadsheet_id -} catch { - console.error('No .state.json found — run setup-via-server.ts first') - process.exit(1) -} - -const auth = new google.auth.OAuth2( - process.env['GOOGLE_CLIENT_ID'], - process.env['GOOGLE_CLIENT_SECRET'] -) -auth.setCredentials({ refresh_token: process.env['GOOGLE_REFRESH_TOKEN'] }) -const sheets = google.sheets({ version: 'v4', auth }) - -// Fetch spreadsheet metadata (includes all sheet grid properties) -const res = await sheets.spreadsheets.get({ - spreadsheetId, - fields: 'sheets(properties(title,gridProperties))', -}) - -console.error(`Sheet: https://docs.google.com/spreadsheets/d/${spreadsheetId}\n`) - -let grandTotal = 0 -for (const sheet of res.data.sheets ?? []) { - const title = sheet.properties?.title ?? '(untitled)' - const rowCount = sheet.properties?.gridProperties?.rowCount ?? 0 - const columnCount = sheet.properties?.gridProperties?.columnCount ?? 0 - const cells = rowCount * columnCount - grandTotal += cells - console.error( - ` ${title}: ${rowCount} rows × ${columnCount} cols = ${cells.toLocaleString()} cells` - ) -} - -console.error(`\n Total: ${grandTotal.toLocaleString()} cells`) diff --git a/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts b/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts deleted file mode 100644 index 4144e9e9..00000000 --- a/packages/destination-google-sheets/scripts/stripe-to-google-sheets.ts +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/env node -// Sync Stripe → Google Sheets via the sync-engine CLI. -// Reads credentials from packages/destination-google-sheets/.env -// -// Usage: npx tsx scripts/stripe-to-google-sheets.ts -// or: node --import tsx scripts/stripe-to-google-sheets.ts - -import { readFileSync } from 'node:fs' -import { resolve, dirname } from 'node:path' -import { fileURLToPath } from 'node:url' -import { execFileSync, spawnSync } from 'node:child_process' - -const __dirname = dirname(fileURLToPath(import.meta.url)) - -// Load .env from the package root -const envPath = resolve(__dirname, '../.env') -try { - const envContent = readFileSync(envPath, 'utf-8') - for (const line of envContent.split('\n')) { - const trimmed = line.trim() - if (!trimmed || trimmed.startsWith('#')) continue - const eqIdx = trimmed.indexOf('=') - if (eqIdx === -1) continue - const key = trimmed.slice(0, eqIdx).trim() - const value = trimmed.slice(eqIdx + 1).trim() - if (!(key in process.env)) process.env[key] = value - } -} catch { - // .env is optional; env vars may already be set -} - -const { - STRIPE_API_KEY, - GOOGLE_CLIENT_ID, - GOOGLE_CLIENT_SECRET, - GOOGLE_REFRESH_TOKEN, - GOOGLE_SPREADSHEET_ID, -} = process.env - -if (!STRIPE_API_KEY) { - console.error('Error: STRIPE_API_KEY is required (set it in .env or the environment)') - process.exit(1) -} - -// Fetch Stripe account ID -const accountRes = await fetch('https://api.stripe.com/v1/account', { - headers: { - Authorization: `Basic ${Buffer.from(`${STRIPE_API_KEY}:`).toString('base64')}`, - }, -}) -const account = (await accountRes.json()) as { id: string } -console.error(`Stripe: ${account.id}`) -console.error(`Sheet: https://docs.google.com/spreadsheets/d/${GOOGLE_SPREADSHEET_ID}`) - -const pipeline = JSON.stringify({ - source: { name: 'stripe', api_key: STRIPE_API_KEY, backfill_limit: 10 }, - destination: { - name: 'google-sheets', - client_id: GOOGLE_CLIENT_ID, - client_secret: GOOGLE_CLIENT_SECRET, - access_token: 'unused', - refresh_token: GOOGLE_REFRESH_TOKEN, - spreadsheet_id: GOOGLE_SPREADSHEET_ID, - }, - streams: [{ name: 'products' }, { name: 'customers' }], -}) - -const repoRoot = resolve(__dirname, '../../..') -const cliPath = resolve(repoRoot, 'apps/engine/src/cli/index.ts') - -// Use bun if available, else tsx -function hasBun(): boolean { - try { - execFileSync('bun', ['--version'], { stdio: 'ignore' }) - return true - } catch { - return false - } -} - -const tsxBin = resolve(repoRoot, 'node_modules/.bin/tsx') -const [cmd, ...cmdArgs] = hasBun() ? ['bun', cliPath] : [tsxBin, cliPath] - -const result = spawnSync(cmd, [...cmdArgs, 'sync', '--xPipeline', pipeline], { - stdio: 'inherit', - cwd: repoRoot, -}) - -process.exit(result.status ?? 1) diff --git a/packages/destination-google-sheets/scripts/sync-via-server.ts b/packages/destination-google-sheets/scripts/sync-via-server.ts deleted file mode 100644 index b648c7b8..00000000 --- a/packages/destination-google-sheets/scripts/sync-via-server.ts +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env node -// POST /sync — reads from Stripe and writes to Google Sheets, looping until all -// streams are complete. Uses X-State-Checkpoint-Limit: 1 to process one page at -// a time, persisting the cursor to .state.json between pages. -// -// On completion, reads each sheet and prints the row count for each stream. -// -// Usage: npx tsx scripts/sync-via-server.ts [--port 3000] - -import { google } from 'googleapis' -import { - loadEnv, - buildPipeline, - requireEnv, - loadState, - saveState, - getPort, - STREAMS, -} from './_state.js' -import { readSheet } from '../src/writer.js' - -loadEnv() -requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') - -const state = loadState() -if (!state) { - console.error('No sheet state found — run setup-via-server.ts first') - process.exit(1) -} - -const serverUrl = `http://localhost:${getPort()}` -console.error(`Sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) - -// Run one page of sync, returns updated syncState -async function runOnePage(syncState: Record): Promise> { - const pipeline = buildPipeline(state!.spreadsheet_id) - const headers: Record = { - 'X-Pipeline': JSON.stringify(pipeline), - 'X-State-Checkpoint-Limit': '1', - } - if (Object.keys(syncState).length > 0) { - headers['X-State'] = JSON.stringify(syncState) - } - - const res = await fetch(`${serverUrl}/sync`, { method: 'POST', headers }) - if (!res.ok && !res.body) { - console.error(`Error: ${res.status} ${res.statusText}`) - process.exit(1) - } - - const updated = { ...syncState } - const reader = res.body!.getReader() - const decoder = new TextDecoder() - let buf = '' - - while (true) { - const { done, value } = await reader.read() - if (done) break - buf += decoder.decode(value, { stream: true }) - const lines = buf.split('\n') - buf = lines.pop() ?? '' - for (const line of lines) { - if (!line.trim()) continue - console.log(line) - try { - const msg = JSON.parse(line) as { type: string; stream?: string; data?: unknown } - if (msg.type === 'state' && msg.stream) updated[msg.stream] = msg.data - } catch { - /* non-JSON line */ - } - } - } - if (buf.trim()) { - console.log(buf) - try { - const msg = JSON.parse(buf) as { type: string; stream?: string; data?: unknown } - if (msg.type === 'state' && msg.stream) updated[msg.stream] = msg.data - } catch {} - } - - return updated -} - -function isAllComplete(syncState: Record): boolean { - return STREAMS.every( - (s) => (syncState[s] as { status?: string } | undefined)?.status === 'complete' - ) -} - -// Loop until all streams are complete -let syncState: Record = { ...(state.sync_state ?? {}) } -let page = 0 - -if (isAllComplete(syncState)) { - console.error('All streams already complete. Reset sync_state to re-sync.') - process.exit(0) -} - -console.error('Starting sync loop...') - -while (!isAllComplete(syncState)) { - page++ - const pending = STREAMS.filter( - (s) => (syncState[s] as { status?: string } | undefined)?.status !== 'complete' - ) - console.error(`[page ${page}] Syncing: ${pending.join(', ')}`) - - syncState = await runOnePage(syncState) - saveState({ spreadsheet_id: state.spreadsheet_id, sync_state: syncState }) -} - -console.error(`\nAll streams complete after ${page} page(s) — clearing sync cursor`) -saveState({ spreadsheet_id: state.spreadsheet_id }) - -// Read each sheet and print row counts -console.error('\nReading sheet row counts...') -const auth = new google.auth.OAuth2( - process.env['GOOGLE_CLIENT_ID'], - process.env['GOOGLE_CLIENT_SECRET'] -) -auth.setCredentials({ refresh_token: process.env['GOOGLE_REFRESH_TOKEN'] }) -const sheets = google.sheets({ version: 'v4', auth }) - -for (const stream of STREAMS) { - try { - const rows = await readSheet(sheets, state.spreadsheet_id, stream) - // Subtract 1 for the header row - const dataRows = Math.max(0, rows.length - 1) - console.error(` ${stream}: ${dataRows} rows`) - } catch (err) { - console.error( - ` ${stream}: error reading sheet — ${err instanceof Error ? err.message : String(err)}` - ) - } -} diff --git a/packages/destination-google-sheets/scripts/teardown-via-server.ts b/packages/destination-google-sheets/scripts/teardown-via-server.ts deleted file mode 100644 index e847399f..00000000 --- a/packages/destination-google-sheets/scripts/teardown-via-server.ts +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env node -// POST /teardown — permanently deletes the Google Sheet and clears local state -// Usage: npx tsx scripts/teardown-via-server.ts [--port 3000] - -import { loadEnv, buildPipeline, requireEnv, loadState, clearState, getPort } from './_state.js' - -loadEnv() -requireEnv('STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN') - -const state = loadState() -if (!state) { - console.error('No sheet state found — nothing to tear down') - process.exit(1) -} - -const serverUrl = `http://localhost:${getPort()}` -const pipeline = buildPipeline(state.spreadsheet_id) - -console.error(`Hitting ${serverUrl}/teardown ...`) -console.error(`Deleting sheet: https://docs.google.com/spreadsheets/d/${state.spreadsheet_id}`) - -const res = await fetch(`${serverUrl}/teardown`, { - method: 'POST', - headers: { 'X-Pipeline': JSON.stringify(pipeline) }, -}) - -if (res.status === 204) { - clearState() - console.error('Teardown complete') -} else { - const body = await res.text() - console.error(`Error: ${res.status} ${res.statusText}`) - if (body) console.error(body) - process.exit(1) -} diff --git a/packages/destination-google-sheets/src/index.test.ts b/packages/destination-google-sheets/src/index.test.ts index 61d44d85..5182361f 100644 --- a/packages/destination-google-sheets/src/index.test.ts +++ b/packages/destination-google-sheets/src/index.test.ts @@ -1,6 +1,13 @@ -import type { DestinationInput, DestinationOutput } from '@stripe/sync-protocol' +import type { ConfiguredCatalog, DestinationInput, DestinationOutput } from '@stripe/sync-protocol' import { afterEach, beforeEach, describe, expect, it } from 'vitest' -import { createDestination, envVars, type Config } from './index.js' +import { + createDestination, + envVars, + parseGoogleSheetsMetaLog, + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + type Config, +} from './index.js' import { readSheet } from './writer.js' import { createMemorySheets } from '../__tests__/memory-sheets.js' @@ -247,6 +254,110 @@ describe('destination-google-sheets', () => { expect(logs).toHaveLength(1) expect(logs[0]).toMatchObject({ type: 'log', level: 'info' }) }) + + it('updates existing rows and emits row assignments for new appends', async () => { + const { sheets, getData } = createMemorySheets() + const dest = createDestination(sheets) + const configuredCatalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customers', + primary_key: [['id']], + json_schema: { + type: 'object', + properties: { + id: { type: 'string' }, + name: { type: 'string' }, + }, + }, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'append', + }, + ], + } + + await collect( + dest.write( + { config: cfg(), catalog: configuredCatalog }, + toAsyncIter([ + record('customers', { + id: 'cus_1', + name: 'Alice', + [ROW_KEY_FIELD]: '["cus_1"]', + }), + ]) + ) + ) + + const output = await collect( + dest.write( + { + config: cfg({ spreadsheet_id: dest.spreadsheetId! }), + catalog: configuredCatalog, + }, + toAsyncIter([ + record('customers', { + id: 'cus_1', + name: 'Alice Updated', + [ROW_KEY_FIELD]: '["cus_1"]', + [ROW_NUMBER_FIELD]: 2, + }), + record('customers', { + id: 'cus_2', + name: 'Bob', + [ROW_KEY_FIELD]: '["cus_2"]', + }), + ]) + ) + ) + + const rows = getData(dest.spreadsheetId!, 'customers')! + expect(rows).toEqual([ + ['id', 'name'], + ['cus_1', 'Alice Updated'], + ['cus_2', 'Bob'], + ]) + + const metaLog = output.find((message) => message.type === 'log' && message.level === 'debug') + expect(metaLog).toBeDefined() + const meta = parseGoogleSheetsMetaLog((metaLog as { message: string }).message) + expect(meta).toEqual({ + type: 'row_assignments', + assignments: { customers: { '["cus_2"]': 3 } }, + }) + }) + + it('extends existing headers when a later write introduces new fields', async () => { + const { sheets, getData } = createMemorySheets() + const dest = createDestination(sheets) + + await collect( + dest.write( + { config: cfg(), catalog }, + toAsyncIter([record('customers', { id: 'cus_1', name: 'Alice' })]) + ) + ) + + await collect( + dest.write( + { config: cfg({ spreadsheet_id: dest.spreadsheetId! }), catalog }, + toAsyncIter([ + record('customers', { + id: 'cus_2', + name: 'Bob', + email: 'bob@test.invalid', + }), + ]) + ) + ) + + const rows = getData(dest.spreadsheetId!, 'customers')! + expect(rows[0]).toEqual(['id', 'name', 'email']) + expect(rows[1]).toEqual(['cus_1', 'Alice']) + expect(rows[2]).toEqual(['cus_2', 'Bob', 'bob@test.invalid']) + }) }) describe('envVars', () => { diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index e5ec5e84..7e26f9fa 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -11,6 +11,15 @@ import type { import type { sheets_v4 } from 'googleapis' import { google } from 'googleapis' import { z } from 'zod' +import { + GOOGLE_SHEETS_META_LOG_PREFIX, + formatGoogleSheetsMetaLog, + parseGoogleSheetsMetaLog, + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + serializeRowKey, + stripSystemFields, +} from './metadata.js' import { configSchema } from './spec.js' import type { Config } from './spec.js' import { @@ -20,6 +29,7 @@ import { ensureSheet, ensureSpreadsheet, protectSheets, + readHeaderRow, updateRows, } from './writer.js' @@ -28,11 +38,21 @@ export { ensureSheet, appendRows, updateRows, + readHeaderRow, readSheet, createIntroSheet, protectSheets, deleteSpreadsheet, } from './writer.js' +export { + GOOGLE_SHEETS_META_LOG_PREFIX, + formatGoogleSheetsMetaLog, + parseGoogleSheetsMetaLog, + ROW_KEY_FIELD, + ROW_NUMBER_FIELD, + serializeRowKey, + stripSystemFields, +} from './metadata.js' // MARK: - Spec @@ -77,6 +97,21 @@ function isTransient(err: unknown): boolean { return code === 429 || code >= 500 } +function extendHeaders( + existingHeaders: string[], + data: Record +): { headers: string[]; changed: boolean } { + const headers = [...existingHeaders] + let changed = false + for (const key of Object.keys(data)) { + if (!headers.includes(key)) { + headers.push(key) + changed = true + } + } + return { headers, changed } +} + // MARK: - Destination /** @@ -154,6 +189,12 @@ export function createDestination( ): AsyncIterable { const sheets = sheetsClient ?? makeSheetsClient(config) const batchSize = config.batch_size ?? 50 + const primaryKeys = new Map( + catalog.streams.map((configuredStream) => [ + configuredStream.stream.name, + configuredStream.stream.primary_key, + ]) + ) if (config.spreadsheet_id) { spreadsheetId = config.spreadsheet_id @@ -161,19 +202,85 @@ export function createDestination( spreadsheetId = await ensureSpreadsheet(sheets, config.spreadsheet_title) } - // Per-stream state: column headers and buffered rows + // Per-stream state: column headers plus buffered appends/updates. const streamHeaders = new Map() - const streamBuffers = new Map() + const appendBuffers = new Map>() + const updateBuffers = new Map>() + const rowAssignments: Record> = {} + + const ensureHeadersForRecord = async ( + streamName: string, + cleanData: Record + ): Promise => { + let headers = streamHeaders.get(streamName) + + if (!headers) { + try { + headers = await readHeaderRow(sheets, spreadsheetId!, streamName) + } catch (error) { + const code = + error instanceof Error && 'code' in error + ? (error as { code?: number }).code + : undefined + if (code !== 400 && code !== 404) throw error + headers = [] + } + + if (headers.length === 0) { + headers = Object.keys(cleanData) + await ensureSheet(sheets, spreadsheetId!, streamName, headers) + } + + streamHeaders.set(streamName, headers) + appendBuffers.set(streamName, []) + updateBuffers.set(streamName, []) + } + + const next = extendHeaders(headers, cleanData) + if (next.changed) { + await ensureSheet(sheets, spreadsheetId!, streamName, next.headers) + streamHeaders.set(streamName, next.headers) + headers = next.headers + } + + return headers + } const flushStream = async (streamName: string) => { - const buffer = streamBuffers.get(streamName) - if (!buffer || buffer.length === 0) return - await appendRows(sheets, spreadsheetId!, streamName, buffer) - streamBuffers.set(streamName, []) + const updates = updateBuffers.get(streamName) + if (updates && updates.length > 0) { + await updateRows(sheets, spreadsheetId!, streamName, updates) + updateBuffers.set(streamName, []) + } + + const appends = appendBuffers.get(streamName) + if (!appends || appends.length === 0) return + + const range = await appendRows( + sheets, + spreadsheetId!, + streamName, + appends.map((entry) => entry.row) + ) + if (range) { + const expectedEndRow = range.startRow + appends.length - 1 + if (range.endRow !== expectedEndRow) { + throw new Error( + `Append row mismatch for ${streamName}: expected ${expectedEndRow}, got ${range.endRow}` + ) + } + for (let index = 0; index < appends.length; index++) { + const rowKey = appends[index]?.rowKey + if (!rowKey) continue + rowAssignments[streamName] ??= {} + rowAssignments[streamName][rowKey] = range.startRow + index + } + } + appendBuffers.set(streamName, []) } const flushAll = async () => { - for (const streamName of streamBuffers.keys()) { + for (const streamName of new Set([...appendBuffers.keys(), ...updateBuffers.keys()])) { await flushStream(streamName) } } @@ -182,21 +289,28 @@ export function createDestination( for await (const msg of $stdin) { if (msg.type === 'record') { const { stream, data } = msg - - // First record for this stream — discover headers, create tab - if (!streamHeaders.has(stream)) { - const headers = Object.keys(data) - streamHeaders.set(stream, headers) - streamBuffers.set(stream, []) - await ensureSheet(sheets, spreadsheetId!, stream, headers) + const cleanData = stripSystemFields(data) + const headers = await ensureHeadersForRecord(stream, cleanData) + const row = headers.map((header) => stringify(cleanData[header])) + const rowNumber = + typeof data[ROW_NUMBER_FIELD] === 'number' ? data[ROW_NUMBER_FIELD] : undefined + const primaryKey = primaryKeys.get(stream) + const rowKey = + typeof data[ROW_KEY_FIELD] === 'string' + ? data[ROW_KEY_FIELD] + : primaryKey + ? serializeRowKey(primaryKey, cleanData) + : undefined + + if (rowNumber !== undefined) { + updateBuffers.get(stream)!.push({ rowNumber, values: row }) + } else { + appendBuffers.get(stream)!.push({ row, rowKey }) } - const headers = streamHeaders.get(stream)! - const row = headers.map((h) => stringify(data[h])) - const buffer = streamBuffers.get(stream)! - buffer.push(row) - - if (buffer.length >= batchSize) { + const appendCount = appendBuffers.get(stream)?.length ?? 0 + const updateCount = updateBuffers.get(stream)?.length ?? 0 + if (appendCount + updateCount >= batchSize) { await flushStream(stream) } } else if (msg.type === 'state') { @@ -223,6 +337,18 @@ export function createDestination( stack_trace: err instanceof Error ? err.stack : undefined, } yield errorMsg + return + } + + if (Object.keys(rowAssignments).length > 0) { + yield { + type: 'log', + level: 'debug', + message: formatGoogleSheetsMetaLog({ + type: 'row_assignments', + assignments: rowAssignments, + }), + } } const logMsg: LogMessage = { diff --git a/packages/destination-google-sheets/src/metadata.ts b/packages/destination-google-sheets/src/metadata.ts new file mode 100644 index 00000000..325e78bd --- /dev/null +++ b/packages/destination-google-sheets/src/metadata.ts @@ -0,0 +1,44 @@ +export const ROW_KEY_FIELD = '_row_key' +export const ROW_NUMBER_FIELD = '_row_number' +export const GOOGLE_SHEETS_META_LOG_PREFIX = '__sync_engine_google_sheets__:' + +export interface GoogleSheetsRowAssignmentsMeta { + type: 'row_assignments' + assignments: Record> +} + +function getPathValue(data: Record, path: string[]): unknown { + let current: unknown = data + for (const segment of path) { + if (!current || typeof current !== 'object') return undefined + current = (current as Record)[segment] + } + return current +} + +export function serializeRowKey(primaryKey: string[][], data: Record): string { + return JSON.stringify(primaryKey.map((path) => getPathValue(data, path))) +} + +export function stripSystemFields(data: Record): Record { + return Object.fromEntries( + Object.entries(data).filter(([key]) => key !== ROW_KEY_FIELD && key !== ROW_NUMBER_FIELD) + ) +} + +export function formatGoogleSheetsMetaLog(meta: GoogleSheetsRowAssignmentsMeta): string { + return `${GOOGLE_SHEETS_META_LOG_PREFIX}${JSON.stringify(meta)}` +} + +export function parseGoogleSheetsMetaLog( + message: string +): GoogleSheetsRowAssignmentsMeta | undefined { + if (!message.startsWith(GOOGLE_SHEETS_META_LOG_PREFIX)) return undefined + try { + return JSON.parse( + message.slice(GOOGLE_SHEETS_META_LOG_PREFIX.length) + ) as GoogleSheetsRowAssignmentsMeta + } catch { + return undefined + } +} diff --git a/packages/destination-google-sheets/src/writer.ts b/packages/destination-google-sheets/src/writer.ts index 4a2a7395..23e6fd24 100644 --- a/packages/destination-google-sheets/src/writer.ts +++ b/packages/destination-google-sheets/src/writer.ts @@ -132,6 +132,31 @@ async function writeHeaderRow( ) } +/** Read the first row from a sheet tab and treat it as headers. */ +export async function readHeaderRow( + sheets: sheets_v4.Sheets, + spreadsheetId: string, + sheetName: string +): Promise { + const res = await withRetry(() => + sheets.spreadsheets.values.get({ + spreadsheetId, + range: `'${sheetName}'!1:1`, + }) + ) + const [headerRow] = res.data.values ?? [] + return Array.isArray(headerRow) ? headerRow.map((value) => String(value)) : [] +} + +function parseUpdatedRows(updatedRange: string): { startRow: number; endRow: number } { + const match = updatedRange.match(/![A-Z]+(\d+)(?::[A-Z]+(\d+))?$/i) + if (!match) throw new Error(`Unable to parse updated range: ${updatedRange}`) + return { + startRow: Number(match[1]), + endRow: Number(match[2] ?? match[1]), + } +} + /** * Create or update an "Overview" intro tab at index 0. * Lists the synced streams and warns users not to edit data tabs. @@ -252,10 +277,10 @@ export async function appendRows( spreadsheetId: string, sheetName: string, rows: unknown[][] -): Promise { +): Promise<{ startRow: number; endRow: number } | undefined> { if (rows.length === 0) return - await withRetry(() => + const res = await withRetry(() => sheets.spreadsheets.values.append({ spreadsheetId, range: `'${sheetName}'!A1`, @@ -264,6 +289,8 @@ export async function appendRows( requestBody: { values: rows }, }) ) + const updatedRange = res.data.updates?.updatedRange + return updatedRange ? parseUpdatedRows(updatedRange) : undefined } /** @@ -278,17 +305,16 @@ export async function updateRows( ): Promise { if (updates.length === 0) return - const data = updates.map(({ rowNumber, values }) => ({ - range: `'${sheetName}'!A${rowNumber}`, - values: [values], - })) - - await withRetry(() => - sheets.spreadsheets.values.batchUpdate({ - spreadsheetId, - requestBody: { valueInputOption: 'RAW', data }, - }) - ) + for (const update of updates) { + await withRetry(() => + sheets.spreadsheets.values.update({ + spreadsheetId, + range: `'${sheetName}'!A${update.rowNumber}`, + valueInputOption: 'RAW', + requestBody: { values: [update.values] }, + }) + ) + } } /** diff --git a/packages/destination-google-sheets/tsconfig.scripts.json b/packages/destination-google-sheets/tsconfig.scripts.json deleted file mode 100644 index fdf99a20..00000000 --- a/packages/destination-google-sheets/tsconfig.scripts.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "extends": "../../tsconfig.base.json", - "compilerOptions": { - "noEmit": true, - "types": ["node"] - }, - "include": ["scripts/**/*"] -}