diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 00000000..17882f6f --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,19 @@ +## Summary + + + +- + +## How to test (optional) + + + +- + +## Related + + + +- Closes # + +> Thanks for contributing โค๏ธ diff --git a/README.md b/README.md index e674df26..053ac90c 100644 --- a/README.md +++ b/README.md @@ -12,65 +12,27 @@ This monorepo contains packages for synchronizing your Stripe account with a Pos --- -## Motivation - -Sometimes you want to analyze your billing data using SQL. Even more importantly, you want to join your billing data to your product/business data. - -This project synchronizes your Stripe account to a PostgreSQL database. It can be a new database, or an existing PostgreSQL database. - ---- - ## Quick Start -The easiest way to sync Stripe data to PostgreSQL: +The easiest way to sync Stripe data to PostgreSQL is using the CLI. It will run a full historical backfill, and optionally stay alive to stream real-time events. -```typescript -import { StripeSync } from 'stripe-experiment-sync' +### CLI (Webhook Mode) -const sync = new StripeSync({ - poolConfig: { - connectionString: process.env.DATABASE_URL, - max: 10, - }, - stripeSecretKey: process.env.STRIPE_SECRET_KEY, -}) - -// Create a managed webhook - automatically syncs all Stripe events -const webhook = await sync.webhook.findOrCreateManagedWebhook('https://example.com/stripe-webhooks') - -// Cleanup when done -await sync.close() +```bash +npx stripe-experiment-sync sync \ + --stripe-key $STRIPE_API_KEY \ + --database-url $DATABASE_URL \ + --listen-mode webhook \ + --ngrok-token $NGROK_AUTH_TOKEN ``` -### Manual Webhook Processing +### CLI (Websocket Mode) -If you need to process webhooks in your own Express/Node.js app: - -```typescript -import express from 'express' -import { StripeSync } from 'stripe-experiment-sync' - -const app = express() -const sync = new StripeSync({ - poolConfig: { - connectionString: process.env.DATABASE_URL, - max: 10, - }, - stripeSecretKey: process.env.STRIPE_SECRET_KEY, -}) - -app.post('/stripe-webhooks', express.raw({ type: 'application/json' }), async (req, res) => { - const signature = req.headers['stripe-signature'] - - try { - await sync.webhook.processWebhook(req.body, signature) - res.status(200).send({ received: true }) - } catch (error) { - res.status(400).send({ error: error.message }) - } -}) - -app.listen(3000) +```bash +npx stripe-experiment-sync sync \ + --stripe-key $STRIPE_API_KEY \ + --database-url $DATABASE_URL \ + --listen-mode websocket ``` ### Supabase Edge Functions @@ -84,24 +46,6 @@ npx stripe-experiment-sync supabase install \ --stripe-key $STRIPE_API_KEY ``` -### CLI Commands - -```bash -# Run database migrations -npx stripe-experiment-sync migrate --database-url $DATABASE_URL - -# Start local sync with ngrok tunnel -npx stripe-experiment-sync start \ - --stripe-key $STRIPE_API_KEY \ - --ngrok-token $NGROK_AUTH_TOKEN \ - --database-url $DATABASE_URL - -# Backfill historical data -npx stripe-experiment-sync backfill customer \ - --stripe-key $STRIPE_API_KEY \ - --database-url $DATABASE_URL -``` - --- ## Configuration Options @@ -127,9 +71,8 @@ npx stripe-experiment-sync backfill customer \ - Automatically runs database migrations to create the `stripe` schema with tables matching Stripe objects. - Creates managed webhooks in Stripe for automatic event synchronization. - Processes webhook events and syncs data to PostgreSQL in real-time. -- Supports backfilling historical data from Stripe. +- Supports syncing historical data from Stripe. - Tracks sync runs and provides observability into sync operations. -- Built-in retry logic for rate limits and transient errors. --- @@ -142,123 +85,19 @@ Each package has its own README with installation, configuration, and usage inst --- -## Supabase Edge Function Deployment - -Deploy the sync engine to Supabase Edge Functions for serverless operation with automatic webhook processing. See the [sync-engine README](./packages/sync-engine/README.md#supabase-deployment) for detailed instructions. - -```bash -npx stripe-experiment-sync supabase install \ - --token $SUPABASE_ACCESS_TOKEN \ - --project $SUPABASE_PROJECT_REF \ - --stripe-key $STRIPE_API_KEY -``` +## Webhook Support ---- +For the full event matrix (supported, unsupported, and caveats), see [`docs/webhook-event-support.md`](./docs/webhook-event-support.md). -## Webhook Support +## Syncing Data -- [ ] `balance.available` -- [x] `charge.captured` ๐ŸŸข -- [x] `charge.expired` ๐ŸŸข -- [x] `charge.failed` ๐ŸŸข -- [x] `charge.pending` ๐ŸŸข -- [x] `charge.refunded` ๐ŸŸข -- [x] `charge.refund.updated` ๐ŸŸก - For updates on all refunds, listen to `refund.updated` instead -- [x] `charge.succeeded` ๐ŸŸข -- [x] `charge.updated` ๐ŸŸข -- [x] `charge.dispute.closed` ๐ŸŸข -- [x] `charge.dispute.created` ๐ŸŸข -- [x] `charge.dispute.funds_reinstated` ๐ŸŸข -- [x] `charge.dispute.funds_withdrawn` ๐ŸŸข -- [x] `charge.dispute.updated` ๐ŸŸข -- [x] `checkout.session.async_payment_failed` ๐ŸŸข -- [x] `checkout.session.async_payment_succeeded` ๐ŸŸข -- [x] `checkout.session.completed` ๐ŸŸข -- [x] `credit_note.created` ๐ŸŸข -- [x] `credit_note.updated` ๐ŸŸข -- [x] `credit_note.voided` ๐ŸŸข -- [x] `customer.created` ๐ŸŸข -- [x] `customer.deleted` ๐ŸŸข -- [ ] `customer.source.created` -- [ ] `customer.source.updated` -- [x] `customer.subscription.created` ๐ŸŸข -- [x] `customer.subscription.deleted` ๐ŸŸข -- [x] `customer.subscription.paused` ๐ŸŸข -- [x] `customer.subscription.pending_update_applied` ๐ŸŸข -- [x] `customer.subscription.pending_update_expired` ๐ŸŸข -- [x] `customer.subscription.resumed` ๐ŸŸข -- [x] `customer.subscription.trial_will_end` ๐ŸŸข -- [x] `customer.subscription.updated` ๐ŸŸข -- [x] `customer.tax_id.created` ๐ŸŸข -- [x] `customer.tax_id.deleted` ๐ŸŸข -- [x] `customer.tax_id.updated` ๐ŸŸข -- [x] `customer.updated` ๐ŸŸข -- [x] `invoice.created` ๐ŸŸข -- [x] `invoice.deleted` ๐ŸŸข -- [x] `invoice.finalized` ๐ŸŸข -- [x] `invoice.finalization_failed` ๐ŸŸข -- [x] `invoice.marked_uncollectible` ๐ŸŸข -- [x] `invoice.paid` ๐ŸŸข -- [x] `invoice.payment_action_required` ๐ŸŸข -- [x] `invoice.payment_failed` ๐ŸŸข -- [x] `invoice.payment_succeeded` ๐ŸŸข -- [x] `invoice.sent` ๐ŸŸข -- [x] `invoice.upcoming` โ€” Acknowledged and skipped (preview object with no `id`) -- [x] `invoice.updated` ๐ŸŸข -- [x] `invoice.overdue` ๐ŸŸข -- [x] `invoice.overpaid` ๐ŸŸข -- [x] `invoice.will_be_due` ๐ŸŸข -- [x] `invoice.voided` ๐ŸŸข -- [ ] `issuing_authorization.request` -- [ ] `issuing_card.created` -- [ ] `issuing_cardholder.created` -- [x] `payment_intent.amount_capturable_updated` ๐ŸŸข -- [x] `payment_intent.canceled` ๐ŸŸข -- [x] `payment_intent.created` ๐ŸŸข -- [x] `payment_intent.partially_refunded` ๐ŸŸข -- [x] `payment_intent.payment_failed` ๐ŸŸข -- [x] `payment_intent.processing` ๐ŸŸข -- [x] `payment_intent.requires_action` ๐ŸŸข -- [x] `payment_intent.succeeded` ๐ŸŸข -- [x] `payment_method.attached` ๐ŸŸข -- [x] `payment_method.automatically_updated` ๐ŸŸข -- [x] `payment_method.detached` ๐ŸŸข -- [x] `payment_method.updated` ๐ŸŸข -- [x] `plan.created` ๐ŸŸข -- [x] `plan.deleted` ๐ŸŸข -- [x] `plan.updated` ๐ŸŸข -- [x] `price.created` ๐ŸŸข -- [x] `price.deleted` ๐ŸŸข -- [x] `price.updated` ๐ŸŸข -- [x] `product.created` ๐ŸŸข -- [x] `product.deleted` ๐ŸŸข -- [x] `product.updated` ๐ŸŸข -- [x] `radar.early_fraud_warning.created` ๐ŸŸข -- [x] `radar.early_fraud_warning.updated` ๐ŸŸข -- [x] `refund.created` ๐ŸŸข -- [x] `refund.failed` ๐ŸŸข -- [x] `refund.updated` ๐ŸŸข -- [x] `review.opened` ๐ŸŸข -- [x] `review.closed` ๐ŸŸข -- [x] `setup_intent.canceled` ๐ŸŸข -- [x] `setup_intent.created` ๐ŸŸข -- [x] `setup_intent.requires_action` ๐ŸŸข -- [x] `setup_intent.setup_failed` ๐ŸŸข -- [x] `setup_intent.succeeded` ๐ŸŸข -- [x] `subscription_schedule.aborted` ๐ŸŸข -- [x] `subscription_schedule.canceled` ๐ŸŸข -- [x] `subscription_schedule.completed` ๐ŸŸข -- [x] `subscription_schedule.created` ๐ŸŸข -- [x] `subscription_schedule.expiring` ๐ŸŸข -- [x] `subscription_schedule.released` ๐ŸŸข -- [x] `subscription_schedule.updated` ๐ŸŸข -- [x] `entitlements.active_entitlement_summary.updated` ๐ŸŸข +For the current supported Stripe object types, see [`packages/sync-engine/README.md#syncing-data`](./packages/sync-engine/README.md#syncing-data). --- ## Contributing -Issues and pull requests are welcome at [https://github.com/stripe-experiments/sync-engine](https://github.com/stripe-experiments/sync-engine). +Issues and pull requests are welcome at [`stripe-experiments/sync-engine`](https://github.com/stripe-experiments/sync-engine). ## License diff --git a/docs/contributing.md b/docs/contributing.md index c7f075b3..062e47a5 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -1,35 +1,27 @@ # Contributing -Contributions are welcome! This document provides guidelines for contributing to the Stripe Sync Engine project. +Thanks for your interest in Stripe Sync Engine. -## Contributing to the Docs +This project is currently accepting code contributions only. Docs are maintained by project maintainers. -Building documentation requires Python 3.8+ and uv. +## Quick Workflow -### Install Dependencies +1. Fork the repo and create a branch. +2. Make your changes. +3. Run relevant checks locally. +4. Open a pull request with a clear description. -Create a virtual environment and install mkdocs, themes, and extensions using uv. +## Development Setup -```sh -source .venv/bin/activate # On Windows: .venv\Scripts\activate -uv pip install -r docs/requirements_docs.txt -``` - -### Serving - -To serve the documentation locally, make sure your virtual environment is activated and run: +This repo uses `pnpm` workspaces. ```sh -source .venv/bin/activate # On Windows: .venv\Scripts\activate -mkdocs serve +pnpm install ``` -and visit the docs at [http://127.0.0.1:8000/](http://127.0.0.1:8000/) - -### Deploying - -If you have write access to the repository, documentation can be updated using: +Common checks from the repo root: ```sh -mkdocs gh-deploy +pnpm lint +pnpm test ``` diff --git a/docs/edge-function.md b/docs/edge-function.md index 00f5c017..102791ac 100644 --- a/docs/edge-function.md +++ b/docs/edge-function.md @@ -1,99 +1,48 @@ # With Supabase Edge Functions -Create a new [Supabase](https://supabase.com) project and create a new [Edge Function](https://supabase.com/docs/guides/functions/quickstart). +Deploy Sync Engine to Supabase using managed Edge Functions for webhook ingestion and background workers. -## Prepare your database +## Prerequisites -Make sure to run the [migrations](../packages/sync-engine/src/database/migrations/), either by executing them manually, adding them into your CI, or running this locally once: +- A Supabase project +- A Stripe account and API key +- `stripe-experiment-sync` CLI installed (or run through `npx`) +- `SUPABASE_ACCESS_TOKEN` and `SUPABASE_PROJECT_REF` -```ts -import { runMigrations } from '@supabase/stripe-sync-engine' -;(async () => { - await runMigrations({ - databaseUrl: 'postgresql://postgres:..@db..supabase.co:5432/postgre', - logger: console, - }) -})() -``` - -## Usage - -Sample code: - -```ts -// Setup type definitions for built-in Supabase Runtime APIs -import 'jsr:@supabase/functions-js/edge-runtime.d.ts' -import { StripeSync } from 'npm:@supabase/stripe-sync-engine@0.37.2' - -// Load secrets from environment variables -const stripeWebhookSecret = Deno.env.get('STRIPE_WEBHOOK_SECRET')! -const stripeSecretKey = Deno.env.get('STRIPE_SECRET_KEY')! - -// Initialize StripeSync -const stripeSync = new StripeSync({ - poolConfig: { - connectionString: Deno.env.get('DATABASE_URL')!, - max: 20, - keepAlive: true, - // optional SSL configuration - ssl: { - ca: Buffer.from(Deno.env.get('DATABASE_SSL_CA')!).toString('utf-8'), - }, - }, - stripeWebhookSecret, - stripeSecretKey, - backfillRelatedEntities: false, - autoExpandLists: true, - maxPostgresConnections: 5, -}) - -Deno.serve(async (req) => { - // Extract raw body as Uint8Array (buffer) - const rawBody = new Uint8Array(await req.arrayBuffer()) +## Deploy - const stripeSignature = req.headers.get('stripe-signature') +Use the CLI to install the Supabase integration: - await stripeSync.webhook.processWebhook(rawBody, stripeSignature) - - return new Response(null, { - status: 202, - headers: { 'Content-Type': 'application/json' }, - }) -}) +```bash +npx stripe-experiment-sync supabase install \ + --token $SUPABASE_ACCESS_TOKEN \ + --project $SUPABASE_PROJECT_REF \ + --stripe-key $STRIPE_API_KEY ``` -Deploy your Edge Function initially. - -Set up a Stripe webhook with the newly deployed Supabase Edge Function URL. +What this does: -Create a new .env file in the `supabase` directory. +- Deploys webhook and worker Edge Functions +- Runs database migrations +- Configures webhook handling for Stripe events +- Stores required secrets in Supabase -```.env -DATABASE_URL="postgresql://postgres:..@db..supabase.co:5432/postgres" -DATABASE_SSL_CA="" -STRIPE_WEBHOOK_SECRET="whsec_" -STRIPE_SECRET_KEY="sk_test_..." -``` +## Update Stripe Webhook Endpoint -Load the secrets: +After deployment, configure your Stripe webhook endpoint to point to the deployed Supabase webhook function URL. -```sh -supabase secrets set --env-file ./supabase/.env -``` +## Remove Installation -> **Note:** -> Replace `` with your actual base64-encoded certificate. +To remove the Supabase deployment: -### Generating Base64 from CA Certificate - -To generate a base64-encoded CA certificate, follow these steps: - -1. Obtain the CA certificate file (e.g., `prod-ca-2021.crt`). -2. Use the following command on Unix-based systems: +```bash +npx stripe-experiment-sync supabase uninstall \ + --token $SUPABASE_ACCESS_TOKEN \ + --project $SUPABASE_PROJECT_REF \ + --stripe-key $STRIPE_API_KEY +``` - ```sh - base64 -i prod-ca-2021.crt -o CA.base64 - ``` +## Notes -3. Open the `CA.base64` file and copy its contents. -4. Use the base64 string in your configuration or environment variables. +- Webhook mode in local development may require ngrok; Supabase deployment does not. +- For full CLI options and flags, see `packages/sync-engine/src/cli/README.md`. diff --git a/docs/index.md b/docs/index.md index e31ab8a7..5e1f614a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -2,114 +2,28 @@ # Stripe Sync Engine -Sometimes you want to analyze your billing data using SQL. Even more importantly, you want to join your billing data to your product/business data. +Sync Stripe data into PostgreSQL for analytics, reporting, and operational workflows. -This project synchronizes your Stripe account to a PostgreSQL database. It can be a new database, or an existing PostgreSQL database. +This project keeps your Stripe resources in sync with a `stripe` schema in Postgres and supports both one-off backfills and continuous sync. --- -## How it works +## How It Works ![How it works](./sync-engine-how.png) -- Creates a new schema `stripe` in a PostgreSQL database, with tables and columns matching Stripe. -- Exposes a `/webhooks` endpoint that listens to any Stripe webhooks (via the Fastify app). -- Inserts, updates, or deletes changes into the tables whenever there is a change to Stripe. +- Creates and migrates the `stripe` schema in your Postgres database +- Processes Stripe events through webhook or websocket listeners +- Upserts Stripe objects into Postgres tables designed for queryability +- Tracks sync progress with resumable status metadata -## Webhook Support +--- + +## Documentation -- [ ] `balance.available` -- [x] `charge.captured` ๐ŸŸข -- [x] `charge.expired` ๐ŸŸข -- [x] `charge.failed` ๐ŸŸข -- [x] `charge.pending` ๐ŸŸข -- [x] `charge.refunded` ๐ŸŸข -- [x] `charge.refund.updated` ๐ŸŸก - For updates on all refunds, listen to `refund.updated` instead -- [x] `charge.succeeded` ๐ŸŸข -- [x] `charge.updated` ๐ŸŸข -- [x] `charge.dispute.closed` ๐ŸŸข -- [x] `charge.dispute.created` ๐ŸŸข -- [x] `charge.dispute.funds_reinstated` ๐ŸŸข -- [x] `charge.dispute.funds_withdrawn` ๐ŸŸข -- [x] `charge.dispute.updated` ๐ŸŸข -- [x] `checkout.session.async_payment_failed` ๐ŸŸข -- [x] `checkout.session.async_payment_succeeded` ๐ŸŸข -- [x] `checkout.session.completed` ๐ŸŸข -- [x] `credit_note.created` ๐ŸŸข -- [x] `credit_note.updated` ๐ŸŸข -- [x] `credit_note.voided` ๐ŸŸข -- [x] `customer.created` ๐ŸŸข -- [x] `customer.deleted` ๐ŸŸข -- [ ] `customer.source.created` -- [ ] `customer.source.updated` -- [x] `customer.subscription.created` ๐ŸŸข -- [x] `customer.subscription.deleted` ๐ŸŸข -- [x] `customer.subscription.paused` ๐ŸŸข -- [x] `customer.subscription.pending_update_applied` ๐ŸŸข -- [x] `customer.subscription.pending_update_expired` ๐ŸŸข -- [x] `customer.subscription.resumed` ๐ŸŸข -- [x] `customer.subscription.trial_will_end` ๐ŸŸข -- [x] `customer.subscription.updated` ๐ŸŸข -- [x] `customer.tax_id.created` ๐ŸŸข -- [x] `customer.tax_id.deleted` ๐ŸŸข -- [x] `customer.tax_id.updated` ๐ŸŸข -- [x] `customer.updated` ๐ŸŸข -- [x] `invoice.created` ๐ŸŸข -- [x] `invoice.deleted` ๐ŸŸข -- [x] `invoice.finalized` ๐ŸŸข -- [x] `invoice.finalization_failed` ๐ŸŸข -- [x] `invoice.marked_uncollectible` ๐ŸŸข -- [x] `invoice.paid` ๐ŸŸข -- [x] `invoice.payment_action_required` ๐ŸŸข -- [x] `invoice.payment_failed` ๐ŸŸข -- [x] `invoice.payment_succeeded` ๐ŸŸข -- [x] `invoice.sent` ๐ŸŸข -- [x] `invoice.upcoming` โ€” Acknowledged and skipped (preview object with no `id`) -- [x] `invoice.updated` ๐ŸŸข -- [x] `invoice.overdue` ๐ŸŸข -- [x] `invoice.overpaid` ๐ŸŸข -- [x] `invoice.will_be_due` ๐ŸŸข -- [x] `invoice.voided` ๐ŸŸข -- [ ] `issuing_authorization.request` -- [ ] `issuing_card.created` -- [ ] `issuing_cardholder.created` -- [x] `payment_intent.amount_capturable_updated` ๐ŸŸข -- [x] `payment_intent.canceled` ๐ŸŸข -- [x] `payment_intent.created` ๐ŸŸข -- [x] `payment_intent.partially_refunded` ๐ŸŸข -- [x] `payment_intent.payment_failed` ๐ŸŸข -- [x] `payment_intent.processing` ๐ŸŸข -- [x] `payment_intent.requires_action` ๐ŸŸข -- [x] `payment_intent.succeeded` ๐ŸŸข -- [x] `payment_method.attached` ๐ŸŸข -- [x] `payment_method.automatically_updated` ๐ŸŸข -- [x] `payment_method.detached` ๐ŸŸข -- [x] `payment_method.updated` ๐ŸŸข -- [x] `plan.created` ๐ŸŸข -- [x] `plan.deleted` ๐ŸŸข -- [x] `plan.updated` ๐ŸŸข -- [x] `price.created` ๐ŸŸข -- [x] `price.deleted` ๐ŸŸข -- [x] `price.updated` ๐ŸŸข -- [x] `product.created` ๐ŸŸข -- [x] `product.deleted` ๐ŸŸข -- [x] `product.updated` ๐ŸŸข -- [x] `radar.early_fraud_warning.created` ๐ŸŸข -- [x] `radar.early_fraud_warning.updated` ๐ŸŸข -- [x] `refund.created` ๐ŸŸข -- [x] `refund.failed` ๐ŸŸข -- [x] `refund.updated` ๐ŸŸข -- [x] `review.opened` ๐ŸŸข -- [x] `review.closed` ๐ŸŸข -- [x] `setup_intent.canceled` ๐ŸŸข -- [x] `setup_intent.created` ๐ŸŸข -- [x] `setup_intent.requires_action` ๐ŸŸข -- [x] `setup_intent.setup_failed` ๐ŸŸข -- [x] `setup_intent.succeeded` ๐ŸŸข -- [x] `subscription_schedule.aborted` ๐ŸŸข -- [x] `subscription_schedule.canceled` ๐ŸŸข -- [x] `subscription_schedule.completed` ๐ŸŸข -- [x] `subscription_schedule.created` ๐ŸŸข -- [x] `subscription_schedule.expiring` ๐ŸŸข -- [x] `subscription_schedule.released` ๐ŸŸข -- [x] `subscription_schedule.updated` ๐ŸŸข +- [TypeScript usage](./typescript.md) +- [Docker deployment](./docker.md) +- [Supabase Edge Functions](./edge-function.md) +- [Postgres schema reference](./postgres-schema.md) +- [Webhook event support matrix](./webhook-event-support.md) +- [Contributing](./contributing.md) diff --git a/docs/roadmap.md b/docs/roadmap.md deleted file mode 100644 index 9c12e734..00000000 --- a/docs/roadmap.md +++ /dev/null @@ -1,18 +0,0 @@ -## Next Week - -- Allow table name to be customizable. -- Incremental CDC-style backfill to avoid re-pulling full historical datasets. -- Use JSONB for storing all of the data and allow user to create generated columns as necessary for future proofing. This ensures that read-only columns stay read-only because they are generated and also allow user to drop them without breaking the sync. -- Automatically creating and dropping webhooks -- Integration with ngrok as needed. - -## Next Month - -- Support additional data destinations such as MySQL, Firestore, and others. -- Dedicated documentation website to really demonstrate how this works, including support for PG Lite to allow it to work in the browser. - -## Future ideas - -- UI dashboard for monitoring sync health, failed webhooks, and retry status. -- Proxied Stripe API client that can read from the local cache first and fall back to Stripe when data is stale or missing. -- Pluggable transform layer so teams can normalize or mask Stripe data before persistence. diff --git a/docs/webhook-event-support.md b/docs/webhook-event-support.md new file mode 100644 index 00000000..6a208b5c --- /dev/null +++ b/docs/webhook-event-support.md @@ -0,0 +1,106 @@ +# Webhook Event Support + +This page tracks webhook events and their support status in Sync Engine. + +- `๐ŸŸข` Supported +- `๐ŸŸก` Partially supported or has caveats +- `๐Ÿ”ด` Not processable due to Stripe event limitations + +## Event Matrix + +- [ ] `balance.available` +- [x] `charge.captured` ๐ŸŸข +- [x] `charge.expired` ๐ŸŸข +- [x] `charge.failed` ๐ŸŸข +- [x] `charge.pending` ๐ŸŸข +- [x] `charge.refunded` ๐ŸŸข +- [x] `charge.refund.updated` ๐ŸŸก - For updates on all refunds, listen to `refund.updated` instead +- [x] `charge.succeeded` ๐ŸŸข +- [x] `charge.updated` ๐ŸŸข +- [x] `charge.dispute.closed` ๐ŸŸข +- [x] `charge.dispute.created` ๐ŸŸข +- [x] `charge.dispute.funds_reinstated` ๐ŸŸข +- [x] `charge.dispute.funds_withdrawn` ๐ŸŸข +- [x] `charge.dispute.updated` ๐ŸŸข +- [x] `checkout.session.async_payment_failed` ๐ŸŸข +- [x] `checkout.session.async_payment_succeeded` ๐ŸŸข +- [x] `checkout.session.completed` ๐ŸŸข +- [x] `credit_note.created` ๐ŸŸข +- [x] `credit_note.updated` ๐ŸŸข +- [x] `credit_note.voided` ๐ŸŸข +- [x] `customer.created` ๐ŸŸข +- [x] `customer.deleted` ๐ŸŸข +- [ ] `customer.source.created` +- [ ] `customer.source.updated` +- [x] `customer.subscription.created` ๐ŸŸข +- [x] `customer.subscription.deleted` ๐ŸŸข +- [x] `customer.subscription.paused` ๐ŸŸข +- [x] `customer.subscription.pending_update_applied` ๐ŸŸข +- [x] `customer.subscription.pending_update_expired` ๐ŸŸข +- [x] `customer.subscription.resumed` ๐ŸŸข +- [x] `customer.subscription.trial_will_end` ๐ŸŸข +- [x] `customer.subscription.updated` ๐ŸŸข +- [x] `customer.tax_id.created` ๐ŸŸข +- [x] `customer.tax_id.deleted` ๐ŸŸข +- [x] `customer.tax_id.updated` ๐ŸŸข +- [x] `customer.updated` ๐ŸŸข +- [x] `invoice.created` ๐ŸŸข +- [x] `invoice.deleted` ๐ŸŸข +- [x] `invoice.finalized` ๐ŸŸข +- [x] `invoice.finalization_failed` ๐ŸŸข +- [x] `invoice.marked_uncollectible` ๐ŸŸข +- [x] `invoice.paid` ๐ŸŸข +- [x] `invoice.payment_action_required` ๐ŸŸข +- [x] `invoice.payment_failed` ๐ŸŸข +- [x] `invoice.payment_succeeded` ๐ŸŸข +- [x] `invoice.sent` ๐ŸŸข +- [x] `invoice.upcoming` โ€” Acknowledged and skipped (preview object with no `id`) +- [x] `invoice.updated` ๐ŸŸข +- [x] `invoice.overdue` ๐ŸŸข +- [x] `invoice.overpaid` ๐ŸŸข +- [x] `invoice.will_be_due` ๐ŸŸข +- [x] `invoice.voided` ๐ŸŸข +- [ ] `issuing_authorization.request` +- [ ] `issuing_card.created` +- [ ] `issuing_cardholder.created` +- [x] `payment_intent.amount_capturable_updated` ๐ŸŸข +- [x] `payment_intent.canceled` ๐ŸŸข +- [x] `payment_intent.created` ๐ŸŸข +- [x] `payment_intent.partially_refunded` ๐ŸŸข +- [x] `payment_intent.payment_failed` ๐ŸŸข +- [x] `payment_intent.processing` ๐ŸŸข +- [x] `payment_intent.requires_action` ๐ŸŸข +- [x] `payment_intent.succeeded` ๐ŸŸข +- [x] `payment_method.attached` ๐ŸŸข +- [x] `payment_method.automatically_updated` ๐ŸŸข +- [x] `payment_method.detached` ๐ŸŸข +- [x] `payment_method.updated` ๐ŸŸข +- [x] `plan.created` ๐ŸŸข +- [x] `plan.deleted` ๐ŸŸข +- [x] `plan.updated` ๐ŸŸข +- [x] `price.created` ๐ŸŸข +- [x] `price.deleted` ๐ŸŸข +- [x] `price.updated` ๐ŸŸข +- [x] `product.created` ๐ŸŸข +- [x] `product.deleted` ๐ŸŸข +- [x] `product.updated` ๐ŸŸข +- [x] `radar.early_fraud_warning.created` ๐ŸŸข +- [x] `radar.early_fraud_warning.updated` ๐ŸŸข +- [x] `refund.created` ๐ŸŸข +- [x] `refund.failed` ๐ŸŸข +- [x] `refund.updated` ๐ŸŸข +- [x] `review.opened` ๐ŸŸข +- [x] `review.closed` ๐ŸŸข +- [x] `setup_intent.canceled` ๐ŸŸข +- [x] `setup_intent.created` ๐ŸŸข +- [x] `setup_intent.requires_action` ๐ŸŸข +- [x] `setup_intent.setup_failed` ๐ŸŸข +- [x] `setup_intent.succeeded` ๐ŸŸข +- [x] `subscription_schedule.aborted` ๐ŸŸข +- [x] `subscription_schedule.canceled` ๐ŸŸข +- [x] `subscription_schedule.completed` ๐ŸŸข +- [x] `subscription_schedule.created` ๐ŸŸข +- [x] `subscription_schedule.expiring` ๐ŸŸข +- [x] `subscription_schedule.released` ๐ŸŸข +- [x] `subscription_schedule.updated` ๐ŸŸข +- [x] `entitlements.active_entitlement_summary.updated` ๐ŸŸข diff --git a/packages/sync-engine/README.md b/packages/sync-engine/README.md index efe1888c..b24a9d2c 100644 --- a/packages/sync-engine/README.md +++ b/packages/sync-engine/README.md @@ -3,19 +3,9 @@ ![GitHub License](https://img.shields.io/github/license/stripe-experiments/sync-engine) ![NPM Version](https://img.shields.io/npm/v/stripe-experiment-sync) -A TypeScript library to synchronize Stripe data into a PostgreSQL database, designed for use in Node.js backends and serverless environments. +Sync Stripe data into PostgreSQL from the command line. -## Features - -- **Managed Webhooks:** Automatic webhook creation and lifecycle management with built-in processing -- **Real-time Sync:** Keep your database in sync with Stripe automatically -- **Backfill Support:** Sync historical data from Stripe to your database -- **Stripe Sigma:** Support for Stripe Sigma reporting data -- **Supabase Ready:** Deploy to Supabase Edge Functions with one command -- **Automatic Retries:** Built-in retry logic for rate limits and transient errors -- **Observability:** Track sync runs and monitor progress - -## Installation +## Install ```sh npm install stripe-experiment-sync stripe @@ -25,291 +15,53 @@ pnpm add stripe-experiment-sync stripe yarn add stripe-experiment-sync stripe ``` -## Quick Start - -```ts -import { StripeSync } from 'stripe-experiment-sync' - -const sync = new StripeSync({ - poolConfig: { - connectionString: 'postgres://user:pass@host:port/db', - max: 10, - }, - stripeSecretKey: 'sk_test_...', -}) - -// Create a managed webhook - no additional processing needed! -const webhook = await sync.webhook.findOrCreateManagedWebhook('https://example.com/stripe-webhooks') - -// Cleanup when done (closes PostgreSQL connection pool) -await sync.close() -``` - -## Managed Webhooks - -The Stripe Sync Engine automatically manages webhook endpoints and their processing. Once created, managed webhooks handle everything automatically - you don't need to manually process events. - -### Creating Managed Webhooks - -```typescript -// Create or reuse an existing webhook endpoint -// This webhook will automatically sync all Stripe events to your database -const webhook = await sync.webhook.findOrCreateManagedWebhook('https://example.com/stripe-webhooks') - -// Create a webhook for specific events -const webhook = await sync.createManagedWebhook('https://example.com/stripe-webhooks', { - enabled_events: ['customer.created', 'customer.updated', 'invoice.paid'], -}) - -console.log(webhook.id) // we_xxx -console.log(webhook.secret) // whsec_xxx -``` - -**โš ๏ธ Important:** Managed webhooks are tracked in the database and automatically process incoming events. You don't need to call `processWebhook()` for managed webhooks - the library handles this internally. - -### Managing Webhooks - -```typescript -// List all managed webhooks -const webhooks = await sync.webhook.listManagedWebhooks() - -// Get a specific webhook -const webhook = await sync.getManagedWebhook('we_xxx') - -// Delete a managed webhook -await sync.deleteManagedWebhook('we_xxx') -``` - -### How It Works - -**Automatic Processing:** Managed webhooks are stored in the `stripe._managed_webhooks` table. When Stripe sends events to these webhooks, they are automatically processed and synced to your database. - -**Race Condition Protection:** PostgreSQL advisory locks prevent race conditions when multiple instances call `findOrCreateManagedWebhook()` concurrently. A unique constraint on `(url, account_id)` provides additional safety. - -**Automatic Cleanup:** When you call `findOrCreateManagedWebhook()`, it will: - -1. Check if a webhook already exists for the URL in the database -2. If found, reuse the existing webhook -3. If not found, create a new webhook in Stripe and record it -4. Clean up any orphaned webhooks from previous installations - -## Manual Webhook Processing - -If you need to process webhooks outside of managed webhooks (e.g., for testing or custom integrations): - -```typescript -// Validate and process a webhook event -app.post('/stripe-webhooks', async (req, res) => { - const signature = req.headers['stripe-signature'] - const payload = req.body - - try { - await sync.webhook.processWebhook(payload, signature) - res.status(200).send({ received: true }) - } catch (error) { - res.status(400).send({ error: error.message }) - } -}) - -// Or process an event directly (no signature validation) -await sync.processEvent(stripeEvent) - -// Cleanup when done -await sync.close() -``` - -**Note:** This is only needed for custom webhook endpoints. Managed webhooks handle processing automatically. - -## Configuration - -| Option | Type | Description | -| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| `poolConfig` | object | **Required.** PostgreSQL connection pool configuration. Supports `connectionString`, `max`, `keepAlive`. See [Node-Postgres Pool API](https://node-postgres.com/apis/pool). | -| `stripeSecretKey` | string | **Required.** Stripe secret key | -| `stripeWebhookSecret` | string | Stripe webhook signing secret (only needed for manual webhook processing) | -| `stripeApiVersion` | string | Stripe API version (default: `2020-08-27`) | -| `enableSigma` | boolean | Enable Stripe Sigma reporting data sync. Default: false | -| `autoExpandLists` | boolean | Fetch all list items from Stripe (not just the default 10) | -| `backfillRelatedEntities` | boolean | Ensure related entities exist for foreign key integrity | -| `revalidateObjectsViaStripeApi` | Array | Always fetch latest data from Stripe instead of trusting webhook payload. Possible values: charge, credit_note, customer, dispute, invoice, payment_intent, payment_method, plan, price, product, refund, review, radar.early_fraud_warning, setup_intent, subscription, subscription_schedule, tax_id | -| `logger` | Logger | Logger instance (pino-compatible) | - -## Database Schema - -The library creates and manages a `stripe` schema in PostgreSQL with tables for all supported Stripe objects. - -> **Important:** The schema name is fixed as `stripe` and cannot be configured. - -> **Note:** Fields and tables prefixed with `_` are reserved for internal metadata: `_account_id`, `_last_synced_at`, `_updated_at`, `_migrations`, `_managed_webhooks`, `_sync_runs`, `_sync_obj_runs`. - -### Running Migrations - -```ts -import { runMigrations } from 'stripe-experiment-sync' - -await runMigrations({ databaseUrl: 'postgres://...' }) -``` - -### Observability - -Track sync operations with the `sync_runs` view: - -```sql -SELECT - account_id, - started_at, - closed_at, - status, -- 'running', 'complete', or 'error' - total_processed, -- Total records synced - complete_count, -- Completed object types - error_count, -- Object types with errors - running_count, -- Currently syncing - pending_count -- Not yet started -FROM stripe.sync_runs -ORDER BY started_at DESC; -``` - -## Syncing Data - -### Sync a Single Entity - -```ts -// Automatically detects entity type from ID prefix -await sync.syncSingleEntity('cus_12345') -await sync.syncSingleEntity('prod_xyz') -``` - -### Backfill Historical Data - -```ts -// Sync all products -await sync.fullSync(['product']) - -// Sync all customers -await sync.fullSync(['customer']) - -// Sync everything -await sync.fullSync() -``` - -Supported objects: `all`, `charge`, `checkout_sessions`, `credit_note`, `customer`, `customer_with_entitlements`, `dispute`, `early_fraud_warning`, `invoice`, `payment_intent`, `payment_method`, `plan`, `price`, `product`, `refund`, `setup_intent`, `subscription`, `subscription_schedules`, `tax_id`. - -The sync engine tracks cursors per account and resource, enabling incremental syncing that resumes after interruptions. - -For paged backfills, the engine keeps a separate per-run pagination cursor (`page_cursor`) while the -incremental cursor continues to track the highest `created` timestamp. - -> **Tip:** For large Stripe accounts (>10,000 objects), loop through date ranges day-by-day to avoid timeouts. - -## Account Management - -### Get Current Account - -```ts -const account = await sync.getCurrentAccount() -console.log(account.id) // acct_xxx -``` - -### List Synced Accounts - -```ts -const accounts = await sync.getAllSyncedAccounts() -``` - -### Delete Account Data +## Run Sync (CLI) -**โš ๏ธ WARNING:** This permanently deletes all synced data for an account. +Set environment variables: -```ts -// Preview deletion -const preview = await sync.dangerouslyDeleteSyncedAccountData('acct_xxx', { - dryRun: true, -}) -console.log(preview.deletedRecordCounts) - -// Actually delete -const result = await sync.dangerouslyDeleteSyncedAccountData('acct_xxx') -``` - -## Supabase Deployment - -Deploy to Supabase Edge Functions for serverless operation with automatic webhook processing: - -```bash -# Install -npx stripe-experiment-sync supabase install \ - --token $SUPABASE_ACCESS_TOKEN \ - --project $SUPABASE_PROJECT_REF \ - --stripe-key $STRIPE_API_KEY - -# Install specific version -npx stripe-experiment-sync supabase install \ - --token $SUPABASE_ACCESS_TOKEN \ - --project $SUPABASE_PROJECT_REF \ - --stripe-key $STRIPE_API_KEY \ - --package-version 1.0.15 - -# Uninstall -npx stripe-experiment-sync supabase uninstall \ - --token $SUPABASE_ACCESS_TOKEN \ - --project $SUPABASE_PROJECT_REF +```sh +export STRIPE_API_KEY=sk_live_xxx +export DATABASE_URL=postgres://... ``` -### Install Options - -- `--token ` - Supabase access token (or `SUPABASE_ACCESS_TOKEN` env) -- `--project ` - Supabase project ref (or `SUPABASE_PROJECT_REF` env) -- `--stripe-key ` - Stripe API key (or `STRIPE_API_KEY` env) -- `--package-version ` - npm package version (default: latest) -- `--worker-interval ` - Worker interval in seconds (default: 60) -- `--management-url ` - Supabase management API URL with protocol (default: https://api.supabase.com). For local testing: http://localhost:54323 - -The install command will: +Then run either command: -1. Deploy Edge Functions: `stripe-setup`, `stripe-webhook`, `stripe-worker` -2. Run database migrations to create the `stripe` schema -3. Create a managed Stripe webhook pointing to your Supabase project -4. Set up a pg_cron job for automatic background syncing - -### Required Permissions - -The Supabase access token must have the following Management API permissions: - -| Permission | Used For | -| ------------------------------ | ------------------------------------------------------------ | -| `projects_read` | Validate project access and existence | -| `edge_functions_write` | Deploy Edge Functions (setup, webhook, worker, sigma-worker) | -| `edge_functions_secrets_write` | Set Stripe API key and configuration secrets | -| `database_write` | Execute database migrations and schema setup | -| `database_read` | Check schema existence and verify installation | -| `api_gateway_keys_read` | Retrieve project's anon API key | - -**Note:** When generating a personal access token or using OAuth2, ensure these permissions are granted. For OAuth2 tokens, these correspond to the fine-grained token permissions on the Management API. - -## CLI Commands - -```bash -# Run database migrations -npx stripe-experiment-sync migrate --database-url $DATABASE_URL - -# Start local sync with ngrok tunnel -npx stripe-experiment-sync start \ +```sh +# 1) Sync everything +npx stripe-experiment-sync sync \ --stripe-key $STRIPE_API_KEY \ - --ngrok-token $NGROK_AUTH_TOKEN \ --database-url $DATABASE_URL -# Backfill specific entity type -npx stripe-experiment-sync backfill customer \ +# 2) Sync one object type +npx stripe-experiment-sync sync customer \ --stripe-key $STRIPE_API_KEY \ --database-url $DATABASE_URL - -# Enable Sigma data syncing -npx stripe-experiment-sync start \ - --stripe-key $STRIPE_API_KEY \ - --database-url $DATABASE_URL \ - --sigma ``` +> **Note:** `sync` automatically applies any pending database migrations before syncing data. + +## Supported Objects + +- `all` +- `charge` +- `checkout_sessions` +- `credit_note` +- `customer` +- `customer_with_entitlements` +- `dispute` +- `early_fraud_warning` +- `invoice` +- `payment_intent` +- `payment_method` +- `plan` +- `price` +- `product` +- `refund` +- `setup_intent` +- `subscription` +- `subscription_schedules` +- `tax_id` + ## License See [LICENSE](LICENSE) file. diff --git a/packages/sync-engine/package.json b/packages/sync-engine/package.json index 81126915..3a551ce8 100644 --- a/packages/sync-engine/package.json +++ b/packages/sync-engine/package.json @@ -32,8 +32,7 @@ "test": "vitest", "test:integration": "vitest run src/tests/integration/", "test:e2e": "pnpm run build && vitest run --config vitest.e2e.config.ts && vitest run --config vitest.e2e.webhook.config.ts", - "start": "pnpm run build && node dist/cli/index.js backfill", - "full-sync": "pnpm run build && node dist/cli/index.js full-sync", + "sync": "pnpm run build && node dist/cli/index.js sync", "generate:sigma-schema": "tsx src/sigma/schema/fetch-schema.ts" }, "files": [ diff --git a/packages/sync-engine/src/cli/README.md b/packages/sync-engine/src/cli/README.md index 0f710d11..e11c72c9 100644 --- a/packages/sync-engine/src/cli/README.md +++ b/packages/sync-engine/src/cli/README.md @@ -1,12 +1,12 @@ # stripe-experiment-sync CLI -CLI tool for syncing Stripe data to PostgreSQL with real-time webhook streaming and Supabase Edge Functions deployment. +CLI tool for syncing Stripe data to PostgreSQL with optional real-time event streaming and Supabase Edge Functions deployment. ## Features -- ๐Ÿ”„ Real-time Stripe webhook streaming to PostgreSQL +- ๐Ÿ”„ Full historical backfill from Stripe to PostgreSQL +- ๐Ÿ“ก Optional real-time event streaming via WebSocket or ngrok webhook - ๐Ÿš€ Automatic table creation and migrations -- ๐ŸŒ Built-in ngrok tunnel for local development - ๐Ÿ” Secure webhook signature verification - ๐Ÿงน Automatic cleanup on exit - โ˜๏ธ Supabase Edge Functions deployment for serverless webhook handling @@ -19,46 +19,57 @@ npm install -g stripe-experiment-sync ## Commands -### Local Development - -#### Start Webhook Server +### Run Migrations ```bash -stripe-experiment-sync start [options] +stripe-experiment-sync migrate [options] Options: - --stripe-key Stripe API key (or STRIPE_API_KEY env) - --ngrok-token ngrok auth token (or NGROK_AUTH_TOKEN env) --database-url Postgres DATABASE_URL (or DATABASE_URL env) + --sigma Create Sigma tables during migration ``` -Starts a local webhook server with ngrok tunnel for real-time Stripe event syncing. +Runs database migrations to create Stripe schema tables. -#### Run Migrations +### Sync Data ```bash -stripe-experiment-sync migrate [options] +stripe-experiment-sync sync [entityName] [options] + +Arguments: + entityName Optional Stripe entity to sync (e.g., customer, invoice, product) Options: + --stripe-key Stripe API key (or STRIPE_API_KEY env) --database-url Postgres DATABASE_URL (or DATABASE_URL env) + --sigma Enable Sigma tables + --interval Skip resync if a successful run completed within this many seconds (default: 86400) + --worker-count Number of parallel sync workers (default: 50) + --rate-limit Max requests per second (default: 25) + --listen-mode Event listener mode: websocket, webhook, or disabled (default: disabled) ``` -Runs database migrations to create Stripe schema tables. +Syncs data from Stripe to PostgreSQL. When called without an entity name, syncs all supported objects. When an entity name is provided, syncs only that entity. -#### Backfill Data +The `--listen-mode` option controls real-time event streaming after the backfill completes: -```bash -stripe-experiment-sync backfill [options] +- **`disabled`** (default) โ€” performs the backfill and exits. +- **`websocket`** โ€” connects directly to Stripe via WebSocket (no ngrok needed). After the backfill, the process stays alive streaming live changes. +- **`webhook`** โ€” creates an ngrok tunnel and Express server to receive webhook events. Requires `NGROK_AUTH_TOKEN`. After the backfill, the process stays alive streaming live changes. -Arguments: - object Stripe object to backfill (e.g., customers, products, prices) +If a successful sync run completed within the `--interval` window, the backfill is skipped entirely. + +### Monitor + +```bash +stripe-experiment-sync monitor [options] Options: - --stripe-key Stripe API key (or STRIPE_API_KEY env) --database-url Postgres DATABASE_URL (or DATABASE_URL env) + --stripe-key Stripe API key (or STRIPE_API_KEY env) ``` -Backfills historical data from Stripe to PostgreSQL. +Live display of table row counts in the stripe schema. ### Supabase Deployment @@ -68,14 +79,18 @@ Backfills historical data from Stripe to PostgreSQL. stripe-experiment-sync supabase install [options] Options: - --token Supabase access token (or SUPABASE_ACCESS_TOKEN env) - --project Supabase project ref (or SUPABASE_PROJECT_REF env) - --stripe-key Stripe API key (or STRIPE_API_KEY env) + --token Supabase access token (or SUPABASE_ACCESS_TOKEN env) + --project Supabase project ref (or SUPABASE_PROJECT_REF env) + --stripe-key Stripe API key (or STRIPE_API_KEY env) --worker-interval Worker interval in seconds (defaults to 60) - Valid values: 1-59 (seconds) or multiples of 60 up to 3540 (59 minutes) + --sync-interval Full resync interval in seconds (default: 604800 = 1 week) + --rate-limit Max Stripe API requests per second (default: 60) + --sigma Enable Sigma sync + --management-url Supabase management API URL (or SUPABASE_MANAGEMENT_URL env) + --package-version Package version to install (defaults to latest) ``` -Deploys Stripe sync engine as Supabase Edge Functions. The worker interval controls how frequently the pg_cron job invokes the worker function to process sync operations. Intervals of 1-59 seconds use pg_cron's interval format, while minute-based intervals (60, 120, 180, etc.) use cron format. +Deploys Stripe sync engine as Supabase Edge Functions. The worker interval controls how frequently the pg_cron job invokes the worker function to process sync operations. #### Uninstall from Supabase @@ -85,6 +100,7 @@ stripe-experiment-sync supabase uninstall [options] Options: --token Supabase access token (or SUPABASE_ACCESS_TOKEN env) --project Supabase project ref (or SUPABASE_PROJECT_REF env) + --management-url Supabase management API URL (or SUPABASE_MANAGEMENT_URL env) ``` Removes Stripe sync Edge Functions from Supabase. @@ -94,11 +110,11 @@ Removes Stripe sync Edge Functions from Supabase. Create a `.env` file in your project (see `.env.sample`): ```env -# Required for all commands +# Required for sync commands STRIPE_API_KEY=sk_test_... DATABASE_URL=postgresql://user:password@localhost:5432/mydb -# Required for local development (start command) +# Required for webhook listen mode NGROK_AUTH_TOKEN=your_ngrok_token # Required for Supabase deployment @@ -109,57 +125,46 @@ SUPABASE_PROJECT_REF=your_project_ref Then run commands without options: ```bash -stripe-experiment-sync start stripe-experiment-sync migrate -stripe-experiment-sync backfill customers +stripe-experiment-sync sync +stripe-experiment-sync sync customer stripe-experiment-sync supabase install ``` ## Usage Examples -### Local Development Workflow - -1. **Run migrations**: +### One-off Backfill - ```bash - stripe-experiment-sync migrate - ``` - -2. **Start webhook server**: +```bash +# Sync all data and exit +stripe-experiment-sync sync - ```bash - stripe-experiment-sync start - ``` +# Sync a specific entity +stripe-experiment-sync sync customer +``` - Output: +### Continuous Sync (Backfill + Live Streaming) - ``` - Creating tables............ โœ“ - Populating tables.......... โœ“ - Streaming live changes..... โ— [press Ctrl-C to abort] - ``` +```bash +# Backfill then stream via WebSocket (no ngrok needed) +stripe-experiment-sync sync --listen-mode websocket -3. **Backfill historical data** (optional): - ```bash - stripe-experiment-sync backfill customers - stripe-experiment-sync backfill products - ``` +# Backfill then stream via ngrok webhook +stripe-experiment-sync sync --listen-mode webhook +``` ### Supabase Deployment Workflow 1. **Deploy to Supabase**: ```bash - # Default: worker runs every 60 seconds stripe-experiment-sync supabase install - - # Custom interval: worker runs every 2 minutes - stripe-experiment-sync supabase install --worker-interval 120 ``` 2. **Update webhook endpoint in Stripe dashboard** to point to your Supabase Edge Function 3. **To remove**: + ```bash stripe-experiment-sync supabase uninstall ``` @@ -183,20 +188,6 @@ This uses the [Stripe CLI](https://stripe.com/docs/stripe-cli) to send test webh ## How It Works -### Local Development Mode - -1. **Creates Tables**: Runs database migrations to create Stripe schema tables -2. **Sets Up Tunnel**: Creates an ngrok tunnel to expose your local server -3. **Registers Webhook**: Creates a Stripe webhook endpoint listening to all events (`*`) -4. **Streams Changes**: Real-time syncing of all Stripe events to PostgreSQL - -The CLI automatically: - -- Starts an Express server with webhook handling -- Creates an ngrok tunnel for webhook delivery -- Manages webhook lifecycle (creation/cleanup) -- Verifies webhook signatures for security - ### Supabase Deployment Mode 1. **Deploys Edge Functions**: Creates webhook and worker Edge Functions diff --git a/packages/sync-engine/src/cli/commands.ts b/packages/sync-engine/src/cli/commands.ts index 4de7780a..651c1fef 100644 --- a/packages/sync-engine/src/cli/commands.ts +++ b/packages/sync-engine/src/cli/commands.ts @@ -3,7 +3,7 @@ import express from 'express' import http from 'node:http' import dotenv from 'dotenv' import { type PoolConfig } from 'pg' -import { loadConfig, type CliOptions } from './config' +import { loadConfig, type CliOptions, type ListenMode } from './config' import { StripeSync, runMigrations, @@ -12,9 +12,8 @@ import { type StripeWebhookEvent, } from '../index' import { createTunnel, type NgrokTunnel } from './ngrok' -import { SYNC_OBJECTS, type StripeObject } from '../resourceRegistry' +import { type StripeObject } from '../resourceRegistry' import { install, uninstall } from '../supabase' -import { SIGMA_INGESTION_CONFIGS } from '../sigma/sigmaIngestionConfigs' /** * Monitor command - live display of table row counts. @@ -111,193 +110,6 @@ export interface DeployOptions { export type { CliOptions } -/** - * Backfill command - backfills a specific entity type from Stripe. - */ -export async function backfillCommand(options: CliOptions, entityName: string): Promise { - let stripeSync: StripeSync | null = null - - try { - // For backfill, we only need stripe key and database URL (not ngrok token) - dotenv.config() - - // Check if sigma is enabled via CLI option or env var - const enableSigma = options.enableSigma ?? process.env.ENABLE_SIGMA === 'true' - - // Validate entity name - allow sigma table names when sigma is enabled - const sigmaTableNames = enableSigma ? Object.keys(SIGMA_INGESTION_CONFIGS) : [] - const validEntities = new Set([...SYNC_OBJECTS, ...sigmaTableNames]) - if (!validEntities.has(entityName)) { - const entityList = enableSigma - ? `${SYNC_OBJECTS.join(', ')}, and ${sigmaTableNames.length} sigma tables` - : SYNC_OBJECTS.join(', ') - console.error( - chalk.red(`Error: Invalid entity name "${entityName}". Valid entities are: ${entityList}`) - ) - process.exit(1) - } - - // Check if this is a sigma table - const isSigmaTable = sigmaTableNames.includes(entityName) - - let stripeApiKey = - options.stripeKey || process.env.STRIPE_API_KEY || process.env.STRIPE_SECRET_KEY || '' - let databaseUrl = options.databaseUrl || process.env.DATABASE_URL || '' - - if (!stripeApiKey || !databaseUrl) { - const inquirer = (await import('inquirer')).default - const questions = [] - - if (!stripeApiKey) { - questions.push({ - type: 'password', - name: 'stripeApiKey', - message: 'Enter your Stripe API key:', - mask: '*', - validate: (input: string) => { - if (!input || input.trim() === '') { - return 'Stripe API key is required' - } - if (!input.startsWith('sk_') && !input.startsWith('rk_')) { - return 'Stripe API key should start with "sk_" or "rk_"' - } - return true - }, - }) - } - - if (!databaseUrl) { - questions.push({ - type: 'password', - name: 'databaseUrl', - message: 'Enter your Postgres DATABASE_URL:', - mask: '*', - validate: (input: string) => { - if (!input || input.trim() === '') { - return 'DATABASE_URL is required' - } - if (!input.startsWith('postgres://') && !input.startsWith('postgresql://')) { - return 'DATABASE_URL should start with "postgres://" or "postgresql://"' - } - return true - }, - }) - } - - if (questions.length > 0) { - console.log(chalk.yellow('\nMissing required configuration. Please provide:')) - const answers = await inquirer.prompt(questions) - if (answers.stripeApiKey) stripeApiKey = answers.stripeApiKey - if (answers.databaseUrl) databaseUrl = answers.databaseUrl - } - } - - const config = { - stripeApiKey, - databaseUrl, - ngrokAuthToken: '', // Not needed for backfill - } - const schemaName = isSigmaTable ? 'sigma' : 'stripe' - console.log(chalk.blue(`Backfilling ${entityName} from Stripe in '${schemaName}' schema...`)) - console.log(chalk.gray(`Database: ${config.databaseUrl.replace(/:[^:@]+@/, ':****@')}`)) - - // Run migrations first (will check for legacy installations and throw if detected) - try { - const schemaName = process.env.SYNC_SCHEMA_NAME ?? undefined - const syncTablesSchemaName = process.env.SYNC_TABLES_SCHEMA_NAME ?? undefined - await runMigrations({ - databaseUrl: config.databaseUrl, - enableSigma, - stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27', - schemaName, - syncTablesSchemaName, - }) - } catch (migrationError) { - console.error(chalk.red('Failed to run migrations:')) - console.error( - migrationError instanceof Error ? migrationError.message : String(migrationError) - ) - throw migrationError - } - - // Create StripeSync instance - const poolConfig: PoolConfig = { - max: 10, - connectionString: config.databaseUrl, - keepAlive: true, - } - - stripeSync = await StripeSync.create({ - databaseUrl: config.databaseUrl, - stripeSecretKey: config.stripeApiKey, - enableSigma, - stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27', - autoExpandLists: process.env.AUTO_EXPAND_LISTS === 'true', - backfillRelatedEntities: process.env.BACKFILL_RELATED_ENTITIES !== 'false', - poolConfig, - }) - - // Run sync for the specified entity - if (entityName === 'all') { - const backfill = await stripeSync.fullSync() - const objectCount = Object.keys(backfill.totals).length - console.log( - chalk.green( - `โœ“ Backfill complete: ${backfill.totalSynced} rows synced across ${objectCount} objects` - ) - ) - if (backfill.skipped.length > 0) { - console.log( - chalk.yellow( - `Skipped ${backfill.skipped.length} Sigma tables without access: ${backfill.skipped.join(', ')}` - ) - ) - } - if (backfill.errors.length > 0) { - console.log(chalk.red(`Backfill finished with ${backfill.errors.length} errors:`)) - for (const err of backfill.errors) { - console.log(chalk.red(` - ${err.object}: ${err.message}`)) - } - } - } else { - // Use fullSync for specific objects (including sigma tables) - // Cast to allow sigma table names which aren't in SyncObject type - const result = await stripeSync.fullSync( - [entityName] as StripeObject[], - true, - 20, - 10, - true, - 0 - ) - const tableType = isSigmaTable ? '(sigma)' : '' - console.log( - chalk.green( - `โœ“ Full sync complete: ${result.totalSynced} ${entityName} ${tableType} rows synced` - ) - ) - } - - // Clean up database pool - await stripeSync.close() - } catch (error) { - if (error instanceof Error) { - console.error(chalk.red(error.message)) - } - - // Clean up database pool on error - if (stripeSync) { - try { - await stripeSync.close() - } catch { - // Ignore cleanup errors - } - } - - process.exit(1) - } -} - /** * Migration command - runs database migrations only. */ @@ -372,281 +184,181 @@ export async function migrateCommand(options: CliOptions): Promise { } } +interface EventListenerResult { + wsClient: StripeWebSocketClient | null + tunnel: NgrokTunnel | null + server: http.Server | null + webhookId: string | null +} + /** - * Main sync command - syncs Stripe data to PostgreSQL using webhooks for real-time updates. - * Supports two modes: - * - WebSocket mode (default): Direct connection to Stripe via WebSocket, no ngrok needed - * - Webhook mode: Uses ngrok tunnel + Express server (when NGROK_AUTH_TOKEN is provided) + * Sets up real-time event listening via WebSocket or ngrok webhook tunnel. + * Returns the created resources so the caller can manage their lifecycle. */ -export async function syncCommand(options: CliOptions): Promise { - let stripeSync: StripeSync - let tunnel: NgrokTunnel | null = null - let server: http.Server | null = null - let webhookId: string | null = null - let wsClient: StripeWebSocketClient | null = null - - // Setup cleanup handler - const cleanup = async (signal?: string) => { - console.log(chalk.blue(`\n\nCleaning up... (signal: ${signal || 'manual'})`)) - - // Close WebSocket client if in WebSocket mode - if (wsClient) { - try { - wsClient.close() - console.log(chalk.green('โœ“ WebSocket closed')) - } catch { - console.log(chalk.yellow('โš  Could not close WebSocket')) - } - } - - // Delete webhook endpoint if created (unless keepWebhooksOnShutdown is true) - const keepWebhooksOnShutdown = process.env.KEEP_WEBHOOKS_ON_SHUTDOWN === 'true' - if (webhookId && stripeSync && !keepWebhooksOnShutdown) { - try { - await stripeSync.webhook.deleteManagedWebhook(webhookId) - console.log(chalk.green('โœ“ Webhook cleanup complete')) - } catch { - console.log(chalk.yellow('โš  Could not delete webhook')) - } - } - - // Close server - if (server) { - try { - await new Promise((resolve, reject) => { - server!.close((err) => { - if (err) reject(err) - else resolve() - }) - }) - console.log(chalk.green('โœ“ Server stopped')) - } catch { - console.log(chalk.yellow('โš  Server already stopped')) - } - } - - // Close tunnel - if (tunnel) { - try { - await tunnel.close() - } catch { - console.log(chalk.yellow('โš  Could not close tunnel')) - } - } - - // Close database pool - if (stripeSync) { - try { - await stripeSync.close() - console.log(chalk.green('โœ“ Database pool closed')) - } catch { - console.log(chalk.yellow('โš  Could not close database pool')) - } - } +export async function setupEventListener( + stripeSync: StripeSync, + config: { stripeApiKey: string; ngrokAuthToken?: string; databaseUrl: string }, + mode: ListenMode = 'websocket' +): Promise { + const modeLabel = mode === 'websocket' ? 'WebSocket' : 'Webhook (ngrok)' + console.log(chalk.blue(`\nMode: ${modeLabel}`)) + + const databaseUrlWithoutPassword = config.databaseUrl.replace(/:[^:@]+@/, ':****@') + + if (mode === 'websocket') { + console.log(chalk.blue('\nConnecting to Stripe WebSocket...')) + + const wsClient = await createStripeWebSocketClient({ + stripeApiKey: config.stripeApiKey, + onEvent: async (event: StripeWebhookEvent) => { + try { + const payload = JSON.parse(event.event_payload) + console.log(chalk.cyan(`โ† ${payload.type}`) + chalk.gray(` (${payload.id})`)) + await stripeSync.webhook.processEvent(payload) + return { + status: 200, + event_type: payload.type, + event_id: payload.id, + databaseUrl: databaseUrlWithoutPassword, + } + } catch (err) { + console.error(chalk.red('Error processing event:'), err) + return { + status: 500, + databaseUrl: databaseUrlWithoutPassword, + error: err instanceof Error ? err.message : String(err), + } + } + }, + onReady: (secret) => { + console.log(chalk.green('โœ“ Connected to Stripe WebSocket')) + const maskedSecret = + secret.length > 14 ? `${secret.slice(0, 10)}...${secret.slice(-4)}` : '****' + console.log(chalk.gray(` Webhook secret: ${maskedSecret}`)) + }, + onError: (error) => { + console.error(chalk.red('WebSocket error:'), error.message) + }, + onClose: (code, reason) => { + console.log(chalk.yellow(`WebSocket closed: ${code} - ${reason}`)) + }, + }) - process.exit(0) + return { wsClient, tunnel: null, server: null, webhookId: null } } - // Register cleanup handlers - process.on('SIGINT', () => cleanup('SIGINT')) - process.on('SIGTERM', () => cleanup('SIGTERM')) + // Webhook mode (ngrok) + const port = 3000 + const tunnel = await createTunnel(port, config.ngrokAuthToken!) - try { - // Load configuration - const config = await loadConfig(options) + const webhookPath = process.env.WEBHOOK_PATH || '/stripe-webhooks' + console.log(chalk.blue('\nCreating Stripe webhook endpoint...')) + const webhook = await stripeSync.webhook.findOrCreateManagedWebhook(`${tunnel.url}${webhookPath}`) + const webhookId = webhook.id + const eventCount = webhook.enabled_events?.length || 0 + console.log(chalk.green(`โœ“ Webhook created: ${webhook.id}`)) + console.log(chalk.cyan(` URL: ${webhook.url}`)) + console.log(chalk.cyan(` Events: ${eventCount} supported events`)) - // Determine mode based on USE_WEBSOCKET env var or ngrok token availability - // USE_WEBSOCKET=true explicitly forces WebSocket mode (useful for tests) - const useWebSocketMode = process.env.USE_WEBSOCKET === 'true' || !config.ngrokAuthToken - const modeLabel = useWebSocketMode ? 'WebSocket' : 'Webhook (ngrok)' - console.log(chalk.blue(`\nMode: ${modeLabel}`)) + const app = express() + const webhookRoute = webhookPath + app.use(webhookRoute, express.raw({ type: 'application/json' })) - // Show command with database URL (masked) - const maskedDbUrl = config.databaseUrl.replace(/:[^:@]+@/, ':****@') - console.log(chalk.gray(`Database: ${maskedDbUrl}`)) - - // 1. Run migrations (will check for legacy installations and throw if detected) - try { - const schemaName = process.env.SYNC_SCHEMA_NAME ?? undefined - const syncTablesSchemaName = process.env.SYNC_TABLES_SCHEMA_NAME ?? undefined - await runMigrations({ - databaseUrl: config.databaseUrl, - enableSigma: config.enableSigma, - stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27', - schemaName, - syncTablesSchemaName, - }) - } catch (migrationError) { - console.error(chalk.red('Failed to run migrations:')) - console.error( - migrationError instanceof Error ? migrationError.message : String(migrationError) - ) - throw migrationError + app.post(webhookRoute, async (req, res) => { + const sig = req.headers['stripe-signature'] + if (!sig || typeof sig !== 'string') { + console.error('[Webhook] Missing stripe-signature header') + return res.status(400).send({ error: 'Missing stripe-signature header' }) } - // 2. Create StripeSync instance - const poolConfig: PoolConfig = { - max: 10, - connectionString: config.databaseUrl, - keepAlive: true, + const rawBody = req.body + if (!rawBody || !Buffer.isBuffer(rawBody)) { + console.error('[Webhook] Body is not a Buffer!') + return res.status(400).send({ error: 'Missing raw body for signature verification' }) } - stripeSync = await StripeSync.create({ - databaseUrl: config.databaseUrl, - stripeSecretKey: config.stripeApiKey, - enableSigma: config.enableSigma, - stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27', - autoExpandLists: process.env.AUTO_EXPAND_LISTS === 'true', - backfillRelatedEntities: process.env.BACKFILL_RELATED_ENTITIES !== 'false', - poolConfig, - }) - - // let's get a database URL without password for logging purposes - const databaseUrlWithoutPassword = config.databaseUrl.replace(/:[^:@]+@/, ':****@') - - if (useWebSocketMode) { - // ===== WEBSOCKET MODE ===== - console.log(chalk.blue('\nConnecting to Stripe WebSocket...')) - - wsClient = await createStripeWebSocketClient({ - stripeApiKey: config.stripeApiKey, - onEvent: async (event: StripeWebhookEvent) => { - try { - const payload = JSON.parse(event.event_payload) - console.log(chalk.cyan(`โ† ${payload.type}`) + chalk.gray(` (${payload.id})`)) - if (stripeSync) { - await stripeSync.webhook.processEvent(payload) - return { - status: 200, - event_type: payload.type, - event_id: payload.id, - databaseUrl: databaseUrlWithoutPassword, - } - } - } catch (err) { - console.error(chalk.red('Error processing event:'), err) - return { - status: 500, - databaseUrl: databaseUrlWithoutPassword, - error: err instanceof Error ? err.message : String(err), - } - } - }, - onReady: (secret) => { - console.log(chalk.green('โœ“ Connected to Stripe WebSocket')) - const maskedSecret = - secret.length > 14 ? `${secret.slice(0, 10)}...${secret.slice(-4)}` : '****' - console.log(chalk.gray(` Webhook secret: ${maskedSecret}`)) - }, - onError: (error) => { - console.error(chalk.red('WebSocket error:'), error.message) - }, - onClose: (code, reason) => { - console.log(chalk.yellow(`WebSocket closed: ${code} - ${reason}`)) - }, - }) - } else { - // ===== WEBHOOK MODE (ngrok) ===== - const port = 3000 - tunnel = await createTunnel(port, config.ngrokAuthToken!) - - // Create managed webhook endpoint - const webhookPath = process.env.WEBHOOK_PATH || '/stripe-webhooks' - console.log(chalk.blue('\nCreating Stripe webhook endpoint...')) - const webhook = await stripeSync.webhook.findOrCreateManagedWebhook( - `${tunnel.url}${webhookPath}` - ) - webhookId = webhook.id - const eventCount = webhook.enabled_events?.length || 0 - console.log(chalk.green(`โœ“ Webhook created: ${webhook.id}`)) - console.log(chalk.cyan(` URL: ${webhook.url}`)) - console.log(chalk.cyan(` Events: ${eventCount} supported events`)) - - // Create Express app and mount webhook handler - const app = express() - const webhookRoute = webhookPath - app.use(webhookRoute, express.raw({ type: 'application/json' })) - - app.post(webhookRoute, async (req, res) => { - const sig = req.headers['stripe-signature'] - if (!sig || typeof sig !== 'string') { - console.error('[Webhook] Missing stripe-signature header') - return res.status(400).send({ error: 'Missing stripe-signature header' }) - } + try { + await stripeSync.webhook.processWebhook(rawBody, sig) + return res.status(200).send({ received: true }) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + console.error('[Webhook] Processing error:', errorMessage) + return res.status(400).send({ error: errorMessage }) + } + }) + + app.use(express.json()) + app.use(express.urlencoded({ extended: false })) + app.get('/health', async (req, res) => res.status(200).json({ status: 'ok' })) + + console.log(chalk.blue(`\nStarting server on port ${port}...`)) + const server = await new Promise((resolve, reject) => { + const srv = app.listen(port, '0.0.0.0', () => resolve(srv)) + srv.on('error', reject) + }) + console.log(chalk.green(`โœ“ Server started on port ${port}`)) + + return { wsClient: null, tunnel, server, webhookId } +} - const rawBody = req.body - if (!rawBody || !Buffer.isBuffer(rawBody)) { - console.error('[Webhook] Body is not a Buffer!') - return res.status(400).send({ error: 'Missing raw body for signature verification' }) - } +/** + * Tears down all resources: event listener (WebSocket/webhook/tunnel) and database pool. + */ +export async function cleanup( + listener: EventListenerResult, + stripeSync?: StripeSync, + signal?: string +): Promise { + console.log(chalk.blue(`\n\nCleaning up... (signal: ${signal || 'manual'})`)) - try { - await stripeSync.webhook.processWebhook(rawBody, sig) - return res.status(200).send({ received: true }) - } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'Unknown error' - console.error('[Webhook] Processing error:', errorMessage) - return res.status(400).send({ error: errorMessage }) - } - }) + if (listener.wsClient) { + try { + listener.wsClient.close() + console.log(chalk.green('โœ“ WebSocket closed')) + } catch { + console.log(chalk.yellow('โš  Could not close WebSocket')) + } + } - app.use(express.json()) - app.use(express.urlencoded({ extended: false })) - app.get('/health', async (req, res) => res.status(200).json({ status: 'ok' })) + const keepWebhooksOnShutdown = process.env.KEEP_WEBHOOKS_ON_SHUTDOWN === 'true' + if (listener.webhookId && stripeSync && !keepWebhooksOnShutdown) { + try { + await stripeSync.webhook.deleteManagedWebhook(listener.webhookId) + console.log(chalk.green('โœ“ Webhook cleanup complete')) + } catch { + console.log(chalk.yellow('โš  Could not delete webhook')) + } + } - // Start Express server - console.log(chalk.blue(`\nStarting server on port ${port}...`)) + if (listener.server) { + try { await new Promise((resolve, reject) => { - server = app.listen(port, '0.0.0.0', () => resolve()) - server.on('error', reject) + listener.server!.close((err) => { + if (err) reject(err) + else resolve() + }) }) - console.log(chalk.green(`โœ“ Server started on port ${port}`)) + console.log(chalk.green('โœ“ Server stopped')) + } catch { + console.log(chalk.yellow('โš  Server already stopped')) } + } - // Run historical backfill sweep (unless disabled) - if (process.env.SKIP_BACKFILL !== 'true') { - if (!stripeSync) { - throw new Error('StripeSync not initialized.') - } - - console.log(chalk.blue('\nStarting historical backfill (parallel sweep)...')) - const backfill = await stripeSync.fullSync() - const objectCount = Object.keys(backfill.totals).length - console.log( - chalk.green( - `โœ“ Historical backfill complete: ${backfill.totalSynced} rows synced across ${objectCount} objects` - ) - ) - if (backfill.skipped.length > 0) { - console.log( - chalk.yellow( - `Skipped ${backfill.skipped.length} Sigma tables without access: ${backfill.skipped.join(', ')}` - ) - ) - } - if (backfill.errors.length > 0) { - console.log( - chalk.red( - `Historical backfill finished with ${backfill.errors.length} errors. See logs above.` - ) - ) - } - } else { - console.log(chalk.yellow('\nโญ๏ธ Skipping initial sync (SKIP_BACKFILL=true)')) + if (listener.tunnel) { + try { + await listener.tunnel.close() + } catch { + console.log(chalk.yellow('โš  Could not close tunnel')) } + } - console.log( - chalk.cyan('\nโ— Streaming live changes...') + chalk.gray(' [press Ctrl-C to abort]') - ) - - // Keep the process alive - await new Promise(() => {}) - } catch (error) { - if (error instanceof Error) { - console.error(chalk.red(error.message)) + if (stripeSync) { + try { + await stripeSync.close() + console.log(chalk.green('โœ“ Database pool closed')) + } catch { + console.log(chalk.yellow('โš  Could not close database pool')) } - await cleanup() - process.exit(1) } } @@ -655,84 +367,42 @@ export async function syncCommand(options: CliOptions): Promise { * completed within the given interval, otherwise re-syncs everything from Stripe. */ export async function fullSyncCommand( - options: CliOptions & { interval?: number; workerCount?: number; rateLimit?: number } + options: CliOptions, + entityName: string = 'all' ): Promise { - let stripeSync: StripeSync | null = null - - try { - dotenv.config() - - const enableSigma = options.enableSigma ?? process.env.ENABLE_SIGMA === 'true' - const intervalSeconds = options.interval ?? 86400 - - let stripeApiKey = - options.stripeKey || process.env.STRIPE_API_KEY || process.env.STRIPE_SECRET_KEY || '' - let databaseUrl = options.databaseUrl || process.env.DATABASE_URL || '' - - if (!stripeApiKey || !databaseUrl) { - const inquirer = (await import('inquirer')).default - const questions = [] - - if (!stripeApiKey) { - questions.push({ - type: 'password', - name: 'stripeApiKey', - message: 'Enter your Stripe API key:', - mask: '*', - validate: (input: string) => { - if (!input || input.trim() === '') { - return 'Stripe API key is required' - } - if (!input.startsWith('sk_') && !input.startsWith('rk_')) { - return 'Stripe API key should start with "sk_" or "rk_"' - } - return true - }, - }) - } + let stripeSync: StripeSync + let listener: EventListenerResult = { + wsClient: null, + tunnel: null, + server: null, + webhookId: null, + } - if (!databaseUrl) { - questions.push({ - type: 'password', - name: 'databaseUrl', - message: 'Enter your Postgres DATABASE_URL:', - mask: '*', - validate: (input: string) => { - if (!input || input.trim() === '') { - return 'DATABASE_URL is required' - } - if (!input.startsWith('postgres://') && !input.startsWith('postgresql://')) { - return 'DATABASE_URL should start with "postgres://" or "postgresql://"' - } - return true - }, - }) - } + const shutdown = async (signal?: string) => { + await cleanup(listener, stripeSync, signal) + process.exit(0) + } - if (questions.length > 0) { - console.log(chalk.yellow('\nMissing required configuration. Please provide:')) - const answers = await inquirer.prompt(questions) - if (answers.stripeApiKey) stripeApiKey = answers.stripeApiKey - if (answers.databaseUrl) databaseUrl = answers.databaseUrl - } - } + process.on('SIGINT', () => shutdown('SIGINT')) + process.on('SIGTERM', () => shutdown('SIGTERM')) - const config = { - stripeApiKey, - databaseUrl, - } + try { + const config = await loadConfig(options) + const intervalSeconds = options.interval ?? 86400 - console.log(chalk.blue('\nPerforming full resync of all Stripe data...')) - console.log(chalk.gray(`Database: ${config.databaseUrl.replace(/:[^:@]+@/, ':****@')}`)) + const maskedDbUrl = config.databaseUrl.replace(/:[^:@]+@/, ':****@') + const entityLabel = entityName ? `Stripe ${entityName} data` : 'all Stripe data' + console.log(chalk.blue(`\nPerforming full resync of ${entityLabel}...`)) + console.log(chalk.gray(`Database: ${maskedDbUrl}`)) console.log(chalk.gray(`Reconciliation interval: ${intervalSeconds}s`)) - // Run migrations first try { const schemaName = process.env.SYNC_SCHEMA_NAME ?? undefined const syncTablesSchemaName = process.env.SYNC_TABLES_SCHEMA_NAME ?? undefined await runMigrations({ databaseUrl: config.databaseUrl, - enableSigma, + enableSigma: config.enableSigma, + stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27', schemaName, syncTablesSchemaName, }) @@ -744,7 +414,6 @@ export async function fullSyncCommand( throw migrationError } - // Create StripeSync instance const poolConfig: PoolConfig = { max: 10, connectionString: config.databaseUrl, @@ -754,7 +423,7 @@ export async function fullSyncCommand( stripeSync = await StripeSync.create({ databaseUrl: config.databaseUrl, stripeSecretKey: config.stripeApiKey, - enableSigma, + enableSigma: config.enableSigma, stripeApiVersion: process.env.STRIPE_API_VERSION || '2020-08-27', autoExpandLists: process.env.AUTO_EXPAND_LISTS === 'true', backfillRelatedEntities: process.env.BACKFILL_RELATED_ENTITIES !== 'false', @@ -776,13 +445,31 @@ export async function fullSyncCommand( return } - // Run full resync + if (options.listenMode && options.listenMode !== 'disabled') { + listener = await setupEventListener(stripeSync, config, options.listenMode) + } + + if (options.listenOnly) { + console.log(chalk.yellow('Skipping initial sync (--listen-only)')) + if (options.listenMode && options.listenMode !== 'disabled') { + console.log( + chalk.cyan('\nโ— Streaming live changes...') + chalk.gray(' [press Ctrl-C to abort]') + ) + await new Promise(() => {}) + } + await shutdown() + return + } + const startTime = Date.now() + const tables = entityName !== 'all' ? [entityName as StripeObject] : undefined const result = await stripeSync.fullSync( - undefined, - undefined, + tables, + true, options.workerCount, - options.rateLimit + options.rateLimit, + true, + intervalSeconds ) const elapsed = ((Date.now() - startTime) / 1000).toFixed(1) const objectCount = Object.keys(result.totals).length @@ -805,22 +492,19 @@ export async function fullSyncCommand( } } - // Clean up database pool - await stripeSync.close() + if (options.listenMode && options.listenMode !== 'disabled') { + console.log( + chalk.cyan('\nโ— Streaming live changes...') + chalk.gray(' [press Ctrl-C to abort]') + ) + await new Promise(() => {}) + } else { + await shutdown() + } } catch (error) { if (error instanceof Error) { console.error(chalk.red(error.message)) } - - // Clean up database pool on error - if (stripeSync) { - try { - await stripeSync.close() - } catch { - // Ignore cleanup errors - } - } - + await shutdown() process.exit(1) } } diff --git a/packages/sync-engine/src/cli/config.ts b/packages/sync-engine/src/cli/config.ts index 2c4acadd..caa7a649 100644 --- a/packages/sync-engine/src/cli/config.ts +++ b/packages/sync-engine/src/cli/config.ts @@ -2,6 +2,8 @@ import dotenv from 'dotenv' import inquirer from 'inquirer' import chalk from 'chalk' +export type ListenMode = 'websocket' | 'webhook' | 'disabled' + export interface Config { stripeApiKey: string ngrokAuthToken?: string // Optional - if not provided, use WebSocket mode @@ -14,6 +16,11 @@ export interface CliOptions { ngrokToken?: string databaseUrl?: string enableSigma?: boolean + interval?: number + workerCount?: number + rateLimit?: number + listenMode?: ListenMode + listenOnly?: boolean } /** @@ -36,9 +43,7 @@ export async function loadConfig(options: CliOptions): Promise { config.databaseUrl = options.databaseUrl || process.env.DATABASE_URL || '' // Get Sigma sync option - config.enableSigma = - options.enableSigma ?? - (process.env.ENABLE_SIGMA !== undefined ? process.env.ENABLE_SIGMA === 'true' : undefined) + config.enableSigma = options.enableSigma ?? false // Prompt for missing required values const questions = [] @@ -82,15 +87,6 @@ export async function loadConfig(options: CliOptions): Promise { }) } - if (config.enableSigma === undefined) { - questions.push({ - type: 'confirm', - name: 'enableSigma', - message: 'Enable Sigma sync? (Requires Sigma access in Stripe API key)', - default: false, - }) - } - if (questions.length > 0) { console.log(chalk.yellow('\nMissing configuration. Please provide:')) const answers = await inquirer.prompt(questions) diff --git a/packages/sync-engine/src/cli/index.ts b/packages/sync-engine/src/cli/index.ts index a8d22a07..db54d703 100644 --- a/packages/sync-engine/src/cli/index.ts +++ b/packages/sync-engine/src/cli/index.ts @@ -3,13 +3,11 @@ import { Command } from 'commander' import pkg from '../../package.json' with { type: 'json' } import { - syncCommand, migrateCommand, - backfillCommand, - fullSyncCommand, monitorCommand, installCommand, uninstallCommand, + fullSyncCommand, } from './commands' const program = new Command() @@ -32,75 +30,55 @@ program }) }) -// Start command (main sync command) -program - .command('start') - .description('Start Stripe sync') - .option('--stripe-key ', 'Stripe API key (or STRIPE_API_KEY env)') - .option('--ngrok-token ', 'ngrok auth token (or NGROK_AUTH_TOKEN env)') - .option('--database-url ', 'Postgres DATABASE_URL (or DATABASE_URL env)') - .option('--sigma', 'Sync Sigma data (requires Sigma access in Stripe API key)') - .action(async (options) => { - await syncCommand({ - stripeKey: options.stripeKey, - ngrokToken: options.ngrokToken, - databaseUrl: options.databaseUrl, - enableSigma: options.sigma, - }) - }) - -// Backfill command +// Sync command program - .command('backfill ') - .description('Backfill a specific entity type from Stripe (e.g., customer, invoice, product)') + .command('sync [entityName]') + .description( + 'Re-sync from Stripe, optionally for a specific entity (e.g., customer, invoice), skipping if a successful run completed within --interval' + ) .option('--stripe-key ', 'Stripe API key (or STRIPE_API_KEY env)') .option('--database-url ', 'Postgres DATABASE_URL (or DATABASE_URL env)') + .option('--sigma', 'Enable Sigma tables') + .option( + '--interval ', + 'Skip resync if a successful run completed within this many seconds (default: 86400)', + (val) => parseInt(val, 10), + 86400 + ) .option( - '--sigma', - 'Enable Sigma tables (required for sigma table names like exchange_rates_from_usd)' + '--worker-count ', + 'Number of parallel sync workers (default: 50)', + (val) => parseInt(val, 10), + 50 ) + .option( + '--rate-limit ', + 'Max requests per second (default: 25)', + (val) => parseInt(val, 10), + 25 + ) + .option( + '--listen-mode ', + 'Event listener mode: websocket, webhook, or disabled (default: disabled)', + 'disabled' + ) + .option('--listen-only', 'Skip the initial sync and only set up the event listener') .action(async (entityName, options) => { - await backfillCommand( + await fullSyncCommand( { stripeKey: options.stripeKey, databaseUrl: options.databaseUrl, enableSigma: options.sigma, + interval: options.interval, + workerCount: options.workerCount, + rateLimit: options.rateLimit, + listenMode: options.listenMode, + listenOnly: options.listenOnly, }, entityName ) }) -// Full sync command -program - .command('full-sync') - .description( - 'Re-sync everything from Stripe, skipping if a successful run completed within --interval' - ) - .option('--stripe-key ', 'Stripe API key (or STRIPE_API_KEY env)') - .option('--database-url ', 'Postgres DATABASE_URL (or DATABASE_URL env)') - .option('--sigma', 'Enable Sigma tables') - .option( - '--interval ', - 'Skip resync if a successful run completed within this many seconds (default: 86400)', - (val) => parseInt(val, 10) - ) - .option('--worker-count ', 'Number of parallel sync workers (default: 100)', (val) => - parseInt(val, 10) - ) - .option('--rate-limit ', 'Max requests per second (default: 50)', (val) => - parseInt(val, 10) - ) - .action(async (options) => { - await fullSyncCommand({ - stripeKey: options.stripeKey, - databaseUrl: options.databaseUrl, - enableSigma: options.sigma, - interval: options.interval, - workerCount: options.workerCount, - rateLimit: options.rateLimit, - }) - }) - // Monitor command program .command('monitor') diff --git a/packages/sync-engine/src/cli/lib.ts b/packages/sync-engine/src/cli/lib.ts index f93fadf2..1aa2c85e 100644 --- a/packages/sync-engine/src/cli/lib.ts +++ b/packages/sync-engine/src/cli/lib.ts @@ -1,11 +1,4 @@ -export { - syncCommand, - migrateCommand, - backfillCommand, - fullSyncCommand, - installCommand, - uninstallCommand, -} from './commands' +export { migrateCommand, fullSyncCommand, installCommand, uninstallCommand } from './commands' export type { DeployOptions, CliOptions } from './commands' export { loadConfig } from './config' export { createTunnel } from './ngrok' diff --git a/packages/sync-engine/src/database/postgres.ts b/packages/sync-engine/src/database/postgres.ts index caea87c4..5db1c568 100644 --- a/packages/sync-engine/src/database/postgres.ts +++ b/packages/sync-engine/src/database/postgres.ts @@ -747,11 +747,15 @@ export class PostgresClient { accountId: string, triggeredBy: string, resourceNames: string[], - priorities?: Record + priorities?: Record, + segmentedSync: boolean = false ): Promise<{ accountId: string; runStartedAt: Date }> { const run = await this.getOrCreateSyncRun(accountId, triggeredBy) - await this.createObjectRuns(run.accountId, run.runStartedAt, resourceNames, priorities) + if (!segmentedSync) { + console.log({ accountId, triggeredBy, resourceNames }, 'Creating object runs') + await this.createObjectRuns(run.accountId, run.runStartedAt, resourceNames, priorities) + } return { accountId: run.accountId, runStartedAt: run.runStartedAt } } @@ -797,12 +801,19 @@ export class PostgresClient { triggeredBy: string, resourceNames: string[], interval: number = DAY, - priorities?: Record + priorities?: Record, + segmentedSync: boolean = false ): Promise<{ accountId: string; runStartedAt: Date } | null> { const completedRun = await this.getCompletedRun(accountId, interval) if (completedRun) return null - return this.joinOrCreateSyncRun(accountId, triggeredBy, resourceNames, priorities) + return this.joinOrCreateSyncRun( + accountId, + triggeredBy, + resourceNames, + priorities, + segmentedSync + ) } /** diff --git a/packages/sync-engine/src/stripeSync.ts b/packages/sync-engine/src/stripeSync.ts index a94dece0..085fc437 100644 --- a/packages/sync-engine/src/stripeSync.ts +++ b/packages/sync-engine/src/stripeSync.ts @@ -370,24 +370,14 @@ export class StripeSync { workerCount: number = 100 ): Promise { const priorities = this.buildPriorityMap(objects) - let runKey: RunKey | null - if (segmentedSync) { - runKey = await this.postgresClient.reconciliationRun( - this.accountId, - triggeredBy, - [], - interval, - priorities - ) - } else { - runKey = await this.postgresClient.reconciliationRun( - this.accountId, - triggeredBy, - tableNames, - interval, - priorities - ) - } + const runKey = await this.postgresClient.reconciliationRun( + this.accountId, + triggeredBy, + tableNames, + interval, + priorities, + segmentedSync + ) if (runKey == null) { return null } diff --git a/packages/sync-engine/src/tests/e2e/account-management.e2e.test.ts b/packages/sync-engine/src/tests/e2e/account-management.e2e.test.ts index c888785d..2d921c34 100644 --- a/packages/sync-engine/src/tests/e2e/account-management.e2e.test.ts +++ b/packages/sync-engine/src/tests/e2e/account-management.e2e.test.ts @@ -86,7 +86,7 @@ describe('Account Management E2E', () => { describe('dangerouslyDeleteSyncedAccountData()', () => { beforeAll(async () => { - runCliCommand('backfill', ['product'], { + runCliCommand('sync', ['product', '--rate-limit', '10', '--worker-count', '5'], { cwd, env: { DATABASE_URL: container.databaseUrl }, }) diff --git a/packages/sync-engine/src/tests/e2e/backfill.e2e.test.ts b/packages/sync-engine/src/tests/e2e/backfill.e2e.test.ts index caaca779..d910bf2d 100644 --- a/packages/sync-engine/src/tests/e2e/backfill.e2e.test.ts +++ b/packages/sync-engine/src/tests/e2e/backfill.e2e.test.ts @@ -1,6 +1,6 @@ /** - * Backfill E2E Test - * Tests backfill command with real Stripe data and incremental sync + * Sync E2E Test + * Tests sync command with real Stripe data and incremental sync */ import { describe, it, expect, beforeAll, afterAll } from 'vitest' import { execSync } from 'child_process' @@ -17,7 +17,7 @@ import { import { ResourceTracker } from './helpers/cleanup.js' import { runCliCommand } from './helpers/cli-process.js' -describe('Backfill E2E', () => { +describe('Sync E2E', () => { let pool: pg.Pool let container: PostgresContainer const tracker = new ResourceTracker() @@ -100,8 +100,8 @@ describe('Backfill E2E', () => { await container?.stop() }, 30000) - it('should backfill all data from Stripe', async () => { - runCliCommand('backfill', ['all'], { + it('should sync all data from Stripe', async () => { + runCliCommand('sync', ['all', '--rate-limit', '10', '--worker-count', '5'], { cwd, env: { DATABASE_URL: container.databaseUrl }, }) @@ -125,7 +125,8 @@ describe('Backfill E2E', () => { expect(priceCount).toBeGreaterThanOrEqual(3) }, 120000) - it('should save sync cursor after backfill', async () => { + it('should save sync cursor after sync', async () => { + // Get account ID from synced data const accountRow = await queryDbSingle<{ _account_id: string }>( pool, 'SELECT DISTINCT _account_id FROM stripe.products LIMIT 1' @@ -161,20 +162,24 @@ describe('Backfill E2E', () => { expect(statusRow?.status).toBe('complete') }) - it('should perform incremental sync on subsequent backfill', async () => { + it('should perform incremental sync on subsequent run', async () => { const newProduct = await stripe.products.create({ name: 'Test Product 4 - Incremental', - description: 'Integration test product 4 - created after first backfill', + description: 'Integration test product 4 - created after first sync', }) productIds.push(newProduct.id) tracker.trackProduct(newProduct.id) await sleep(2000) - runCliCommand('backfill', ['product'], { - cwd, - env: { DATABASE_URL: container.databaseUrl }, - }) + runCliCommand( + 'sync', + ['product', '--interval', '0', '--rate-limit', '10', '--worker-count', '5'], + { + cwd, + env: { DATABASE_URL: container.databaseUrl }, + } + ) const newProductInDb = await queryDbCount( pool, diff --git a/packages/sync-engine/src/tests/e2e/helpers/cli-process.ts b/packages/sync-engine/src/tests/e2e/helpers/cli-process.ts index f01771a9..957b961f 100644 --- a/packages/sync-engine/src/tests/e2e/helpers/cli-process.ts +++ b/packages/sync-engine/src/tests/e2e/helpers/cli-process.ts @@ -19,12 +19,16 @@ export class CliProcess { async start(env: Record = {}): Promise { const logStream = fs.createWriteStream(this.logFile) - this.process = spawn('node', ['dist/cli/index.js', 'start'], { - cwd: this.cwd, - env: { ...process.env, ...env }, - stdio: ['ignore', 'pipe', 'pipe'], - detached: false, - }) + this.process = spawn( + 'node', + ['dist/cli/index.js', 'sync', 'all', '--listen-mode', 'websocket', '--listen-only'], + { + cwd: this.cwd, + env: { ...process.env, ...env }, + stdio: ['ignore', 'pipe', 'pipe'], + detached: false, + } + ) this.process.stdout?.pipe(logStream) this.process.stderr?.pipe(logStream) diff --git a/packages/sync-engine/src/tests/e2e/recoverable-backfill.e2e.test.ts b/packages/sync-engine/src/tests/e2e/recoverable-backfill.e2e.test.ts index 4eab827d..73ffd9ad 100644 --- a/packages/sync-engine/src/tests/e2e/recoverable-backfill.e2e.test.ts +++ b/packages/sync-engine/src/tests/e2e/recoverable-backfill.e2e.test.ts @@ -86,11 +86,15 @@ describe('Error Recovery E2E', () => { return } - const syncProcess = spawn('node', ['dist/cli/index.js', 'backfill', 'product'], { - cwd, - env: { ...process.env, DATABASE_URL: container.databaseUrl }, - stdio: 'pipe', - }) + const syncProcess = spawn( + 'node', + ['dist/cli/index.js', 'sync', 'product', '--rate-limit', '10', '--worker-count', '5'], + { + cwd, + env: { ...process.env, DATABASE_URL: container.databaseUrl }, + stdio: 'pipe', + } + ) let status = '' let productsBeforeKill = 0 @@ -143,7 +147,7 @@ describe('Error Recovery E2E', () => { const productsAfterKill = await queryDbCount(pool, 'SELECT COUNT(*) FROM stripe.products') expect(productsAfterKill).toBeGreaterThanOrEqual(productsBeforeKill) - runCliCommand('backfill', ['product'], { + runCliCommand('sync', ['product', '--rate-limit', '10', '--worker-count', '5'], { cwd, env: { DATABASE_URL: container.databaseUrl }, }) diff --git a/packages/sync-engine/src/tests/e2e/sigma.e2e.test.ts b/packages/sync-engine/src/tests/e2e/sigma.e2e.test.ts index 79ee31ed..545e2c72 100644 --- a/packages/sync-engine/src/tests/e2e/sigma.e2e.test.ts +++ b/packages/sync-engine/src/tests/e2e/sigma.e2e.test.ts @@ -55,14 +55,18 @@ describe('Sigma E2E', () => { await container?.stop() }, 30000) - it('should backfill products (non-sigma)', async () => { - runCliCommand('backfill', ['product'], { - cwd, - env: { - DATABASE_URL: container.databaseUrl, - STRIPE_API_KEY: process.env.STRIPE_API_KEY_3!, - }, - }) + it('should sync products (non-sigma)', async () => { + runCliCommand( + 'sync', + ['product', '--interval', '0', '--rate-limit', '10', '--worker-count', '5'], + { + cwd, + env: { + DATABASE_URL: container.databaseUrl, + STRIPE_API_KEY: process.env.STRIPE_API_KEY_3!, + }, + } + ) const productCount = await queryDbCount( pool, @@ -71,14 +75,27 @@ describe('Sigma E2E', () => { expect(productCount).toBe(1) }, 60000) - it('should backfill subscription_item_change_events_v2_beta (sigma)', async () => { - runCliCommand('backfill', ['--sigma', 'subscription_item_change_events_v2_beta'], { - cwd, - env: { - DATABASE_URL: container.databaseUrl, - STRIPE_API_KEY: process.env.STRIPE_API_KEY_3!, - }, - }) + it('should sync subscription_item_change_events_v2_beta (sigma)', async () => { + runCliCommand( + 'sync', + [ + '--sigma', + 'subscription_item_change_events_v2_beta', + '--interval', + '0', + '--rate-limit', + '10', + '--worker-count', + '5', + ], + { + cwd, + env: { + DATABASE_URL: container.databaseUrl, + STRIPE_API_KEY: process.env.STRIPE_API_KEY_3!, + }, + } + ) const count = await queryDbCount( pool, @@ -87,14 +104,27 @@ describe('Sigma E2E', () => { expect(count).toBeGreaterThan(0) }, 60000) - it('should backfill exchange_rates_from_usd (sigma)', async () => { - runCliCommand('backfill', ['--sigma', 'exchange_rates_from_usd'], { - cwd, - env: { - DATABASE_URL: container.databaseUrl, - STRIPE_API_KEY: process.env.STRIPE_API_KEY_3!, - }, - }) + it('should sync exchange_rates_from_usd (sigma)', async () => { + runCliCommand( + 'sync', + [ + '--sigma', + 'exchange_rates_from_usd', + '--interval', + '0', + '--rate-limit', + '10', + '--worker-count', + '5', + ], + { + cwd, + env: { + DATABASE_URL: container.databaseUrl, + STRIPE_API_KEY: process.env.STRIPE_API_KEY_3!, + }, + } + ) const count = await queryDbCount(pool, 'SELECT COUNT(*) FROM sigma.exchange_rates_from_usd') expect(count).toBeGreaterThan(0)