diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5ffb4c0d1..7e75d0d01 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -54,14 +54,62 @@ jobs:
         run: pnpm nx affected -t build --configuration=production --parallel --exclude=playground --base="$NX_BASE" --head="$NX_HEAD"
 
-# ────────────────────────────────── 2. DEPLOY PLAYGROUND ───────────────────────────
+# ─────────────────────────────────────── 2. EDGE-WORKER E2E ──────────────────────────────────────
+  edge-worker-e2e:
+    runs-on: ubuntu-latest
+    env:
+      NX_CLOUD_ACCESS_TOKEN: ${{ secrets.NX_CLOUD_ACCESS_TOKEN }}
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      - uses: ./.github/actions/setup
+
+      - name: Setup Deno
+        uses: denoland/setup-deno@v2
+        with:
+          deno-version: '1.45.2'
+
+      - name: Install sqruff
+        uses: quarylabs/install-sqruff-cli-action@main
+
+      - name: Setup Atlas
+        uses: ariga/setup-atlas@master
+        with:
+          cloud-token: ${{ secrets.ATLAS_CLOUD_TOKEN }}
+
+      - name: Set Nx SHAs for affected commands
+        uses: nrwl/nx-set-shas@v4
+
+      - name: Verify NX_BASE and NX_HEAD are set
+        run: echo "BASE=$NX_BASE HEAD=$NX_HEAD"
+
+      - name: Check if edge-worker e2e tests are affected
+        id: check-affected
+        run: |
+          if pnpm nx show projects --affected -t test:e2e --base="$NX_BASE" --head="$NX_HEAD" | grep -q "^edge-worker$"; then
+            echo "affected=true" >> $GITHUB_OUTPUT
+            echo "Edge-worker e2e tests are affected by changes"
+          else
+            echo "affected=false" >> $GITHUB_OUTPUT
+            echo "Edge-worker e2e tests are not affected by changes - skipping"
+          fi
+
+      - name: Run edge-worker e2e tests
+        if: steps.check-affected.outputs.affected == 'true'
+        run: pnpm nx affected -t test:e2e --parallel --base="$NX_BASE" --head="$NX_HEAD"
+
+
+# ────────────────────────────────── 3. DEPLOY PLAYGROUND ───────────────────────────
   deploy-playground:
-    needs: build-and-test
-    if: >-
-      ${{
-        (github.event_name == 'pull_request') ||
-        (github.ref == 'refs/heads/main' && github.event_name == 'push')
-      }}
+    needs: [build-and-test, edge-worker-e2e]
+    if: false # Disabled
+    # if: >-
+    #   ${{
+    #     (github.event_name == 'pull_request') ||
+    #     (github.ref == 'refs/heads/main' && github.event_name == 'push')
+    #   }}
     runs-on: ubuntu-latest
     environment: ${{ github.event_name == 'pull_request' && 'preview' || 'production' }}
     env:
@@ -95,9 +143,9 @@ jobs:
           preview-url: https://pr-${{ github.event.pull_request.number }}--pgflow-demo.netlify.app
           production-url: https://playground.pgflow.dev
 
-# ────────────────────────────────── 3. DEPLOY WEBSITE ───────────────────────────
+# ────────────────────────────────── 4. DEPLOY WEBSITE ───────────────────────────
   deploy-website:
-    needs: build-and-test
+    needs: [build-and-test, edge-worker-e2e]
     runs-on: ubuntu-latest
     environment: ${{ github.event_name == 'pull_request' && 'preview' || 'production' }}
     env:
diff --git a/pkgs/edge-worker/CLAUDE.md b/pkgs/edge-worker/CLAUDE.md
index c056fb9b7..e3b0ceb83 100644
--- a/pkgs/edge-worker/CLAUDE.md
+++ b/pkgs/edge-worker/CLAUDE.md
@@ -53,13 +53,34 @@ The Edge Worker:
 
 ## Testing
 
-The package has different test types:
-
-- **Unit tests** (`tests/unit/`) - For testing isolated components
-- **Integration tests** (`tests/integration/`) - For testing integration with PostgreSQL
-- **E2E tests** (`tests/e2e/`) - For testing end-to-end workflows (not automated for now)
-
-Tests use Deno's built-in testing framework. Database tests require Docker to run a PostgreSQL instance.
+The package has three test types with different infrastructure requirements:
+
+- **Unit tests** (`tests/unit/`) - Testing isolated components
+  - No external dependencies required
+
+- **Integration tests** (`tests/integration/`) - Testing integration with PostgreSQL
+  - Requires: Supabase database (via `db:ensure`)
+  - Runs in CI
+
+- **E2E tests** (`tests/e2e/`) - Testing end-to-end workflows with Edge Functions
+  - Requires: Supabase database + Edge Functions server (via `serve:functions:e2e`)
+  - Uses `continuous: true` in Nx to keep server running during tests
+  - Tests worker scaling behavior, CPU limits, and concurrency (20k+ messages)
+  - Takes 2+ minutes to complete
+  - Runs in CI as separate parallel job (only when edge-worker is affected)
+  - Run locally with: `pnpm nx test:e2e edge-worker`
+
+Tests use Deno's built-in testing framework. Database tests require Docker to run a PostgreSQL instance via Supabase CLI.
+
+### E2E Test Architecture
+
+E2E tests verify real Edge Function execution:
+1. `sync-e2e-deps` copies built packages (core, dsl, edge-worker) to `supabase/functions/_vendor`
+2. `serve:functions:e2e` starts Supabase Functions server in background (continuous task)
+3. Tests make HTTP requests to spawn workers and verify behavior
+4. Tests in `tests/e2e/`:
+   - `restarts.test.ts` - Worker auto-restart when CPU clock limit hits
+   - `performance.test.ts` - Processing thousands of queued messages concurrently
 
 ## Usage Examples
 
diff --git a/pkgs/edge-worker/deno.lock b/pkgs/edge-worker/deno.lock
index 347e5e681..5ad3386ae 100644
--- a/pkgs/edge-worker/deno.lock
+++ b/pkgs/edge-worker/deno.lock
@@ -4,16 +4,19 @@
     "specifiers": {
       "jsr:@deno-library/progress": "jsr:@deno-library/progress@1.5.1",
       "jsr:@henrygd/queue@^1.0.7": "jsr:@henrygd/queue@1.0.7",
+      "jsr:@std/assert": "jsr:@std/assert@0.224.0",
       "jsr:@std/assert@^0.224.0": "jsr:@std/assert@0.224.0",
       "jsr:@std/async@^0.224.0": "jsr:@std/async@0.224.2",
+      "jsr:@std/crypto": "jsr:@std/crypto@1.0.5",
       "jsr:@std/fmt@1.0.3": "jsr:@std/fmt@1.0.3",
       "jsr:@std/fmt@^0.224.0": "jsr:@std/fmt@0.224.0",
       "jsr:@std/internal@^0.224.0": "jsr:@std/internal@0.224.0",
       "jsr:@std/io@0.225.0": "jsr:@std/io@0.225.0",
       "jsr:@std/io@^0.225.0": "jsr:@std/io@0.225.0",
       "npm:@henrygd/queue@^1.0.7": "npm:@henrygd/queue@1.0.7",
-      "npm:@supabase/supabase-js@^2.39.0": "npm:@supabase/supabase-js@2.55.0",
-      "npm:@supabase/supabase-js@^2.47.10": "npm:@supabase/supabase-js@2.55.0",
+      "npm:@supabase/supabase-js@^2.39.0": "npm:@supabase/supabase-js@2.74.0",
+      "npm:@supabase/supabase-js@^2.47.10": "npm:@supabase/supabase-js@2.74.0",
+      "npm:@types/deno@^2.3.0": "npm:@types/deno@2.3.0",
       "npm:@types/node@~18.16.20": "npm:@types/node@18.16.20",
       "npm:postgres@3.4.5": "npm:postgres@3.4.5",
       "npm:supabase@2.21.1": "npm:supabase@2.21.1"
@@ -39,6 +42,9 @@
       "@std/async@0.224.2": {
         "integrity": "4d277d6e165df43d5e061ba0ef3edfddb8e8d558f5b920e3e6b1d2614b44d074"
       },
+      "@std/crypto@1.0.5": {
+        "integrity": "0dcfbb319fe0bba1bd3af904ceb4f948cde1b92979ec1614528380ed308a3b40"
+      },
       "@std/fmt@0.224.0": {
         "integrity": "e20e9a2312a8b5393272c26191c0a68eda8d2c4b08b046bad1673148f1d69851"
       },
@@ -66,14 +72,14 @@ "minipass": "minipass@7.1.2"
        }
      },
-     "@supabase/auth-js@2.71.1": {
-       "integrity": "sha512-mMIQHBRc+SKpZFRB2qtupuzulaUhFYupNyxqDj5Jp/LyPvcWvjaJzZzObv6URtL/O6lPxkanASnotGtNpS3H2Q==",
+     "@supabase/auth-js@2.74.0": {
+       "integrity": "sha512-EJYDxYhBCOS40VJvfQ5zSjo8Ku7JbTICLTcmXt4xHMQZt4IumpRfHg11exXI9uZ6G7fhsQlNgbzDhFN4Ni9NnA==",
        "dependencies": {
          "@supabase/node-fetch": "@supabase/node-fetch@2.6.15"
        }
      },
-     "@supabase/functions-js@2.4.5": {
-       "integrity": "sha512-v5GSqb9zbosquTo6gBwIiq7W9eQ7rE5QazsK/ezNiQXdCbY+bH8D9qEaBIkhVvX4ZRW5rP03gEfw5yw9tiq4EQ==",
+     "@supabase/functions-js@2.74.0": {
+       "integrity": "sha512-VqWYa981t7xtIFVf7LRb9meklHckbH/tqwaML5P3LgvlaZHpoSPjMCNLcquuLYiJLxnh2rio7IxLh+VlvRvSWw==",
        "dependencies": {
          "@supabase/node-fetch": "@supabase/node-fetch@2.6.15"
        }
      }
@@ -84,14 +90,14 @@
         "whatwg-url": "whatwg-url@5.0.0"
       }
     },
-     "@supabase/postgrest-js@1.19.4": {
-       "integrity": "sha512-O4soKqKtZIW3olqmbXXbKugUtByD2jPa8kL2m2c1oozAO11uCcGrRhkZL0kVxjBLrXHE0mdSkFsMj7jDSfyNpw==",
+     "@supabase/postgrest-js@2.74.0": {
+       "integrity": "sha512-9Ypa2eS0Ib/YQClE+BhDSjx7OKjYEF6LAGjTB8X4HucdboGEwR0LZKctNfw6V0PPIAVjjzZxIlNBXGv0ypIkHw==",
        "dependencies": {
          "@supabase/node-fetch": "@supabase/node-fetch@2.6.15"
        }
      },
-     "@supabase/realtime-js@2.15.1": {
-       "integrity": "sha512-edRFa2IrQw50kNntvUyS38hsL7t2d/psah6om6aNTLLcWem0R6bOUq7sk7DsGeSlNfuwEwWn57FdYSva6VddYw==",
+     "@supabase/realtime-js@2.74.0": {
+       "integrity": "sha512-K5VqpA4/7RO1u1nyD5ICFKzWKu58bIDcPxHY0aFA7MyWkFd0pzi/XYXeoSsAifnD9p72gPIpgxVXCQZKJg1ktQ==",
        "dependencies": {
          "@supabase/node-fetch": "@supabase/node-fetch@2.6.15",
          "@types/phoenix": "@types/phoenix@1.6.6",
@@ -99,23 +105,27 @@
         "ws": "ws@8.18.3"
       }
     },
-     "@supabase/storage-js@2.10.5": {
-       "integrity": "sha512-9O7DfVI+v+exwPIKFGlCn2gigx441ojHnJ5QpHPegvblPdZQnnmCrL+A0JHj7wD/e+f9fnLu/DBKAtLYrLtftw==",
+     "@supabase/storage-js@2.74.0": {
+       "integrity": "sha512-o0cTQdMqHh4ERDLtjUp1/KGPbQoNwKRxUh6f8+KQyjC5DSmiw/r+jgFe/WHh067aW+WU8nA9Ytw9ag7OhzxEkQ==",
        "dependencies": {
          "@supabase/node-fetch": "@supabase/node-fetch@2.6.15"
        }
      },
-     "@supabase/supabase-js@2.55.0": {
-       "integrity": "sha512-Y1uV4nEMjQV1x83DGn7+Z9LOisVVRlY1geSARrUHbXWgbyKLZ6/08dvc0Us1r6AJ4tcKpwpCZWG9yDQYo1JgHg==",
+     "@supabase/supabase-js@2.74.0": {
+       "integrity": "sha512-IEMM/V6gKdP+N/X31KDIczVzghDpiPWFGLNjS8Rus71KvV6y6ueLrrE/JGCHDrU+9pq5copF3iCa0YQh+9Lq9Q==",
        "dependencies": {
-         "@supabase/auth-js": "@supabase/auth-js@2.71.1",
-         "@supabase/functions-js": "@supabase/functions-js@2.4.5",
+         "@supabase/auth-js": "@supabase/auth-js@2.74.0",
+         "@supabase/functions-js": "@supabase/functions-js@2.74.0",
          "@supabase/node-fetch": "@supabase/node-fetch@2.6.15",
-         "@supabase/postgrest-js": "@supabase/postgrest-js@1.19.4",
-         "@supabase/realtime-js": "@supabase/realtime-js@2.15.1",
-         "@supabase/storage-js": "@supabase/storage-js@2.10.5"
+         "@supabase/postgrest-js": "@supabase/postgrest-js@2.74.0",
+         "@supabase/realtime-js": "@supabase/realtime-js@2.74.0",
+         "@supabase/storage-js": "@supabase/storage-js@2.74.0"
        }
      },
+     "@types/deno@2.3.0": {
+       "integrity": "sha512-/4SyefQpKjwNKGkq9qG3Ln7MazfbWKvydyVFBnXzP5OQA4u1paoFtaOe1iHKycIWHHkhYag0lPxyheThV1ijzw==",
+       "dependencies": {}
+     },
      "@types/node@18.16.19": {
        "integrity": "sha512-IXl7o+R9iti9eBW4Wg2hx1xQDig183jj7YLn8F7udNceyfkbn1ZxmzZXuak20gR40D7pIkIY1kYGx5VIGbaHKA==",
        "dependencies": {}
diff --git a/pkgs/edge-worker/project.json b/pkgs/edge-worker/project.json
index c89e93de7..b4af3401e 100644
--- a/pkgs/edge-worker/project.json
+++ b/pkgs/edge-worker/project.json
@@ -100,7 +100,6 @@
       "cache": false,
       "inputs": [
         "{workspaceRoot}/pkgs/core/supabase/migrations/**/*.sql",
-        "{projectRoot}/sql/**/*.sql",
         "^production"
       ],
       "options": {
@@ -109,7 +108,6 @@
         "mkdir -p supabase/migrations/",
         "rm -f supabase/migrations/*.sql",
         "cp ../core/supabase/migrations/*.sql supabase/migrations/",
-        "cp sql/*_*.sql supabase/migrations/",
         "supabase db reset"
       ],
       "parallel": false
@@ -123,7 +121,7 @@
       "options": {
         "cwd": "pkgs/edge-worker",
         "commands": [
-          "supabase functions serve --env-file supabase/functions/.env"
+          "supabase functions serve --env-file supabase/functions/.env --no-verify-jwt"
         ],
         "parallel": false
       }
@@ -132,11 +130,7 @@
       "executor": "nx:run-commands",
       "local": true,
       "dependsOn": ["^verify-migrations", "^dump-realtime-schema"],
-      "inputs": [
-        "default",
-        "^production",
-        "{projectRoot}/scripts/ensure-db"
-      ],
+      "inputs": ["default", "^production", "{projectRoot}/scripts/ensure-db"],
       "options": {
         "cwd": "pkgs/edge-worker",
         "commands": ["./scripts/ensure-db"],
@@ -169,9 +163,34 @@
         "parallel": false
       }
     },
+    "sync-e2e-deps": {
+      "executor": "nx:run-commands",
+      "dependsOn": ["^build"],
+      "local": true,
+      "inputs": ["^production"],
+      "options": {
+        "cwd": "pkgs/edge-worker",
+        "commands": ["./scripts/sync-e2e-deps.sh"],
+        "parallel": false
+      }
+    },
+    "serve:functions:e2e": {
+      "executor": "nx:run-commands",
+      "continuous": true,
+      "dependsOn": ["sync-e2e-deps", "supabase:start"],
+      "local": true,
+      "cache": false,
+      "options": {
+        "cwd": "pkgs/edge-worker",
+        "commands": [
+          "supabase functions serve --env-file supabase/functions/.env --import-map supabase/functions/deno.json --no-verify-jwt"
+        ],
+        "parallel": false
+      }
+    },
     "test:e2e": {
       "executor": "nx:run-commands",
-      "dependsOn": ["db:ensure", "^build"],
+      "dependsOn": ["supabase:reset", "serve:functions:e2e"],
       "local": true,
       "inputs": ["default", "^production"],
       "options": {
diff --git a/pkgs/edge-worker/scripts/sync-e2e-deps.sh b/pkgs/edge-worker/scripts/sync-e2e-deps.sh
new file mode 100755
index 000000000..f8afd7d2a
--- /dev/null
+++ b/pkgs/edge-worker/scripts/sync-e2e-deps.sh
@@ -0,0 +1,73 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+EDGE_WORKER_DIR="$(dirname "$SCRIPT_DIR")"
+MONOREPO_ROOT="$(cd "$EDGE_WORKER_DIR/../.." && pwd)"
+VENDOR_DIR="$EDGE_WORKER_DIR/supabase/functions/_vendor"
+
+echo "🔄 Syncing edge function dependencies for e2e tests..."
+
+# Clean and create vendor directory
+rm -rf "$VENDOR_DIR"
+mkdir -p "$VENDOR_DIR/@pgflow"
+
+# Verify builds succeeded
+if [ ! -d "$MONOREPO_ROOT/pkgs/core/dist" ]; then
+  echo "❌ Error: core package build failed - dist directory not found"
+  exit 1
+fi
+
+if [ ! -d "$MONOREPO_ROOT/pkgs/dsl/dist" ]; then
+  echo "❌ Error: dsl package build failed - dist directory not found"
+  exit 1
+fi
+
+# Copy core package
+echo "📋 Copying @pgflow/core..."
+mkdir -p "$VENDOR_DIR/@pgflow/core"
+cp -r "$MONOREPO_ROOT/pkgs/core/dist/"* "$VENDOR_DIR/@pgflow/core/"
+cp "$MONOREPO_ROOT/pkgs/core/package.json" "$VENDOR_DIR/@pgflow/core/"
+
+# Copy dsl package
+echo "📋 Copying @pgflow/dsl..."
+mkdir -p "$VENDOR_DIR/@pgflow/dsl"
+cp -r "$MONOREPO_ROOT/pkgs/dsl/dist/"* "$VENDOR_DIR/@pgflow/dsl/"
+cp "$MONOREPO_ROOT/pkgs/dsl/package.json" "$VENDOR_DIR/@pgflow/dsl/"
+
+# Copy edge-worker source (not built) - preserving directory structure
+echo "📋 Copying @pgflow/edge-worker..."
+mkdir -p "$VENDOR_DIR/@pgflow/edge-worker" +# Copy the entire src directory to maintain relative imports +cp -r "$MONOREPO_ROOT/pkgs/edge-worker/src" "$VENDOR_DIR/@pgflow/edge-worker/" + +# Simple fix: replace .js with .ts in imports +find "$VENDOR_DIR/@pgflow/edge-worker" -name "*.ts" -type f -exec sed -i 's/\.js"/\.ts"/g' {} + +find "$VENDOR_DIR/@pgflow/edge-worker" -name "*.ts" -type f -exec sed -i "s/\.js'/\.ts'/g" {} + + +# Create a redirect index.ts at the root that points to src/index.ts +cat > "$VENDOR_DIR/@pgflow/edge-worker/index.ts" << 'EOF' +// Re-export from the src directory to maintain compatibility +export * from './src/index.ts'; +EOF + +# Create _internal.ts redirect as well since edge-worker exports this path +cat > "$VENDOR_DIR/@pgflow/edge-worker/_internal.ts" << 'EOF' +// Re-export from the src directory to maintain compatibility +export * from './src/_internal.ts'; +EOF + +# Verify key files exist +if [ ! -f "$VENDOR_DIR/@pgflow/core/index.js" ]; then + echo "⚠️ Warning: @pgflow/core/index.js not found after copy" +fi + +if [ ! -f "$VENDOR_DIR/@pgflow/dsl/index.js" ]; then + echo "⚠️ Warning: @pgflow/dsl/index.js not found after copy" +fi + +if [ ! -f "$VENDOR_DIR/@pgflow/edge-worker/src/index.ts" ]; then + echo "⚠️ Warning: @pgflow/edge-worker/src/index.ts not found after copy" +fi + +echo "✅ Dependencies synced to $VENDOR_DIR" diff --git a/pkgs/edge-worker/sql/990_active_workers.sql b/pkgs/edge-worker/sql/990_active_workers.sql deleted file mode 100644 index 95b2b4cf8..000000000 --- a/pkgs/edge-worker/sql/990_active_workers.sql +++ /dev/null @@ -1,11 +0,0 @@ --- Active workers are workers that have sent a heartbeat in the last 6 seconds -create or replace view pgflow.active_workers as -select - worker_id, - queue_name, - function_name, - started_at, - stopped_at, - last_heartbeat_at -from pgflow.workers -where last_heartbeat_at > now() - make_interval(secs => 6); diff --git a/pkgs/edge-worker/sql/991_inactive_workers.sql b/pkgs/edge-worker/sql/991_inactive_workers.sql deleted file mode 100644 index 60c6617ef..000000000 --- a/pkgs/edge-worker/sql/991_inactive_workers.sql +++ /dev/null @@ -1,12 +0,0 @@ --- Inactive workers are workers that have not sent --- a heartbeat in the last 6 seconds -create or replace view pgflow.inactive_workers as -select - worker_id, - queue_name, - function_name, - started_at, - stopped_at, - last_heartbeat_at -from pgflow.workers -where last_heartbeat_at < now() - make_interval(secs => 6); diff --git a/pkgs/edge-worker/sql/992_spawn_worker.sql b/pkgs/edge-worker/sql/992_spawn_worker.sql deleted file mode 100644 index e82e411ac..000000000 --- a/pkgs/edge-worker/sql/992_spawn_worker.sql +++ /dev/null @@ -1,68 +0,0 @@ -create extension if not exists pg_net; - --- Calls edge function asynchronously, requires Vault secrets to be set: --- - supabase_anon_key --- - app_url -create or replace function pgflow.call_edgefn_async( - function_name text, - body text -) -returns bigint -language plpgsql -volatile -set search_path to pgflow -as $$ -declare - request_id bigint; -begin - IF function_name IS NULL OR function_name = '' THEN - raise exception 'function_name cannot be null or empty'; - END IF; - - WITH secret as ( - select decrypted_secret AS supabase_anon_key - from vault.decrypted_secrets - where name = 'supabase_anon_key' - ), - settings AS ( - select decrypted_secret AS app_url - from vault.decrypted_secrets - where name = 'app_url' - ) - select net.http_post( - url => (select app_url from settings) || '/functions/v1/' || function_name, 
- body => jsonb_build_object('body', body), - headers := jsonb_build_object( - 'Authorization', 'Bearer ' || (select supabase_anon_key from secret) - ) - ) into request_id; - - return request_id; -end; -$$; - --- Spawn a new worker asynchronously via edge function --- --- It is intended to be used in a cron job that ensures continuos operation -create or replace function pgflow.spawn( - function_name text -) returns integer as $$ -declare - p_function_name text := function_name; - v_active_count integer; -begin - SELECT COUNT(*) - INTO v_active_count - FROM pgflow.active_workers AS aw - WHERE aw.function_name = p_function_name; - - IF v_active_count < 1 THEN - raise notice 'Spawning new worker: %', p_function_name; - PERFORM pgflow.call_edgefn_async(p_function_name, ''); - return 1; - ELSE - raise notice 'Worker Exists for queue: NOT spawning new worker for queue: %', p_function_name; - return 0; - END IF; -end; -$$ language plpgsql; diff --git a/pkgs/edge-worker/supabase/config.toml b/pkgs/edge-worker/supabase/config.toml index 61adea00f..c850dd51b 100644 --- a/pkgs/edge-worker/supabase/config.toml +++ b/pkgs/edge-worker/supabase/config.toml @@ -18,7 +18,7 @@ max_client_conn = 250 [db.seed] enabled = true -sql_paths = ['./seed.sql'] +sql_paths = [] [edge_runtime] enabled = true diff --git a/pkgs/edge-worker/supabase/functions/.gitignore b/pkgs/edge-worker/supabase/functions/.gitignore new file mode 100644 index 000000000..92d8b7757 --- /dev/null +++ b/pkgs/edge-worker/supabase/functions/.gitignore @@ -0,0 +1 @@ +_vendor/ diff --git a/pkgs/edge-worker/supabase/functions/cpu_intensive/deno.json b/pkgs/edge-worker/supabase/functions/cpu_intensive/deno.json deleted file mode 100644 index 49c45ff6c..000000000 --- a/pkgs/edge-worker/supabase/functions/cpu_intensive/deno.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "unstable": ["sloppy-imports"], - "imports": { - "@henrygd/queue": "jsr:@henrygd/queue@^1.0.7", - "@std/assert": "jsr:@std/assert@^0.224.0", - "@std/async": "jsr:@std/async@^0.224.0", - "@std/log": "jsr:@std/log@^0.224.13", - "@std/testing/mock": "jsr:@std/testing/mock@^0.224.0", - "postgres": "npm:postgres@3.4.5", - "@pgflow/core": "../../../../core/dist/index.js", - "@pgflow/dsl": "../../../../dsl/dist/index.js", - "@pgflow/edge-worker": "../../../src/index.ts" - } -} diff --git a/pkgs/edge-worker/supabase/functions/deno.json b/pkgs/edge-worker/supabase/functions/deno.json new file mode 100644 index 000000000..a8405e35f --- /dev/null +++ b/pkgs/edge-worker/supabase/functions/deno.json @@ -0,0 +1,20 @@ +{ + "unstable": ["sloppy-imports"], + "imports": { + "@pgflow/core": "./_vendor/@pgflow/core/index.js", + "@pgflow/core/": "./_vendor/@pgflow/core/", + "@pgflow/dsl": "./_vendor/@pgflow/dsl/index.js", + "@pgflow/dsl/": "./_vendor/@pgflow/dsl/", + "@pgflow/edge-worker": "./_vendor/@pgflow/edge-worker/index.ts", + "@pgflow/edge-worker/": "./_vendor/@pgflow/edge-worker/", + "@pgflow/edge-worker/_internal": "./_vendor/@pgflow/edge-worker/_internal.ts", + "@henrygd/queue": "jsr:@henrygd/queue@^1.0.7", + "@std/assert": "jsr:@std/assert@^0.224.0", + "@std/async": "jsr:@std/async@^0.224.0", + "@std/crypto": "jsr:@std/crypto@^1.0.5", + "@std/log": "jsr:@std/log@^0.224.13", + "@std/testing/mock": "jsr:@std/testing@^0.224.0/mock", + "@supabase/supabase-js": "jsr:@supabase/supabase-js@^2.49.4", + "postgres": "npm:postgres@3.4.5" + } +} diff --git a/pkgs/edge-worker/supabase/functions/max_concurrency/deno.json b/pkgs/edge-worker/supabase/functions/max_concurrency/deno.json deleted file mode 100644 
index 49c45ff6c..000000000
--- a/pkgs/edge-worker/supabase/functions/max_concurrency/deno.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "unstable": ["sloppy-imports"],
-  "imports": {
-    "@henrygd/queue": "jsr:@henrygd/queue@^1.0.7",
-    "@std/assert": "jsr:@std/assert@^0.224.0",
-    "@std/async": "jsr:@std/async@^0.224.0",
-    "@std/log": "jsr:@std/log@^0.224.13",
-    "@std/testing/mock": "jsr:@std/testing/mock@^0.224.0",
-    "postgres": "npm:postgres@3.4.5",
-    "@pgflow/core": "../../../../core/dist/index.js",
-    "@pgflow/dsl": "../../../../dsl/dist/index.js",
-    "@pgflow/edge-worker": "../../../src/index.ts"
-  }
-}
diff --git a/pkgs/edge-worker/supabase/seed.sql b/pkgs/edge-worker/supabase/seed.sql
deleted file mode 100644
index 7c286c816..000000000
--- a/pkgs/edge-worker/supabase/seed.sql
+++ /dev/null
@@ -1,2 +0,0 @@
-select vault.create_secret('http://host.docker.internal:50321', 'app_url');
-select vault.create_secret('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0', 'supabase_anon_key');
diff --git a/pkgs/edge-worker/tests/config.ts b/pkgs/edge-worker/tests/config.ts
new file mode 100644
index 000000000..b3e1c381b
--- /dev/null
+++ b/pkgs/edge-worker/tests/config.ts
@@ -0,0 +1,32 @@
+/**
+ * Test configuration - single source of truth for URLs and connection strings
+ *
+ * Values are hardcoded for now. In future, could run `supabase status -o json`
+ * to get dynamic values.
+ *
+ * Ports match those defined in supabase/config.toml
+ */
+
+/**
+ * E2E test configuration
+ * Uses Supabase instance (same as edge functions connect to)
+ */
+export const e2eConfig = {
+  get apiUrl() {
+    return 'http://127.0.0.1:50321';
+  },
+
+  get dbUrl() {
+    return 'postgresql://postgres:postgres@127.0.0.1:50322/postgres';
+  },
+};
+
+/**
+ * Integration test configuration
+ * Uses Docker Compose database (port 5432)
+ */
+export const integrationConfig = {
+  get dbUrl() {
+    return 'postgresql://postgres:postgres@127.0.0.1:5432/postgres';
+  },
+};
diff --git a/pkgs/edge-worker/tests/db.ts b/pkgs/edge-worker/tests/db.ts
index d38e57473..2a4244563 100644
--- a/pkgs/edge-worker/tests/db.ts
+++ b/pkgs/edge-worker/tests/db.ts
@@ -1,4 +1,5 @@
 import postgres from 'postgres';
+import { integrationConfig } from './config.ts';
 
 function createSql(dbUrl: string) {
   return postgres(dbUrl, {
@@ -13,7 +14,7 @@ export function withTransaction(
   callback: (sql: postgres.Sql) => Promise
 ) {
-  const dbUrl = `postgresql://postgres:postgres@127.0.0.1:5432/postgres`;
+  const dbUrl = integrationConfig.dbUrl;
   const localSql = createSql(dbUrl);
 
   return async () => {
@@ -59,7 +60,7 @@ export function withPgNoTransaction(
   callback: (sql: postgres.Sql) => Promise
 ) {
-  const dbUrl = 'postgresql://postgres:postgres@127.0.0.1:5432/postgres';
+  const dbUrl = integrationConfig.dbUrl;
   const sql = createSql(dbUrl);
 
   return async () => {
diff --git a/pkgs/edge-worker/tests/e2e/_helpers.ts b/pkgs/edge-worker/tests/e2e/_helpers.ts
index a06583e73..2b3c559c4 100644
--- a/pkgs/edge-worker/tests/e2e/_helpers.ts
+++ b/pkgs/edge-worker/tests/e2e/_helpers.ts
@@ -2,6 +2,7 @@ import { createSql } from '../sql.ts';
 import { delay } from '@std/async';
 import ProgressBar from 'jsr:@deno-library/progress';
 import { dim } from 'https://deno.land/std@0.224.0/fmt/colors.ts';
+import { e2eConfig } from '../config.ts';
 
 interface WaitForOptions {
   pollIntervalMs?: number;
@@ -102,19 +103,23 @@ export async function waitForSeqToIncrementBy(
   const startVal = await seqLastValue(seqName);
   let lastVal = startVal;
 
-  return await waitFor(
-    async () => {
-      lastVal = await seqLastValue(seqName);
-      progress.render(lastVal);
-      const incrementedBy = lastVal - startVal;
-
-      return incrementedBy >= value ? lastVal : false;
-    },
-    {
-      ...options,
-      description: `sequence ${seqName} to reach value ${value}`,
-    }
-  );
+  try {
+    return await waitFor(
+      async () => {
+        lastVal = await seqLastValue(seqName);
+        progress.render(lastVal);
+        const incrementedBy = lastVal - startVal;
+
+        return incrementedBy >= value ? lastVal : false;
+      },
+      {
+        ...options,
+        description: `sequence ${seqName} to reach value ${value}`,
+      }
+    );
+  } finally {
+    progress.end();
+  }
 }
 
 export async function waitForActiveWorker() {
@@ -122,9 +127,27 @@
     async () => {
       const sql = createSql();
       try {
-        const [{ has_active: hasActiveWorker }] =
-          await sql`SELECT count(*) > 0 AS has_active FROM pgflow.active_workers`;
-        log('waiting for active worker ', hasActiveWorker);
+        const workers = await sql`
+          SELECT worker_id, function_name, last_heartbeat_at, started_at
+          FROM pgflow.workers
+          ORDER BY started_at DESC
+          LIMIT 5
+        `;
+        log(
+          'workers in DB:',
+          workers.length,
+          workers.map((w) => ({
+            fn: w.function_name,
+            hb: w.last_heartbeat_at,
+            started: w.started_at,
+          }))
+        );
+
+        const [{ has_active: hasActiveWorker }] = await sql`
+          SELECT count(*) > 0 AS has_active
+          FROM pgflow.workers
+          WHERE last_heartbeat_at >= NOW() - INTERVAL '6 seconds'
+        `;
         return hasActiveWorker;
       } finally {
         await sql.end();
@@ -147,12 +170,75 @@ export async function fetchWorkers(functionName: string) {
 }
 
 export async function startWorker(workerName: string) {
-  const sql = createSql();
-  try {
-    await sql`SELECT pgflow.spawn(${workerName}::text)`;
-  } finally {
-    await sql.end();
+  log(`Starting worker: ${workerName}`);
+
+  // Trigger the edge function via HTTP request
+  const apiUrl = e2eConfig.apiUrl;
+  const url = `${apiUrl}/functions/v1/${workerName}`;
+
+  log(`Fetching ${url}`);
+
+  // Retry logic for server startup
+  let lastError: Error | null = null;
+  const maxRetries = 10;
+  const retryDelayMs = 1000;
+
+  for (let i = 0; i < maxRetries; i++) {
+    try {
+      const response = await fetch(url);
+
+      const body = await response.text();
+      log(
+        `Response: ${response.status} ${response.statusText}`,
+        body.substring(0, 200)
+      );
+
+      if (response.ok) {
+        await waitForActiveWorker();
+        log('worker spawned!');
+        return;
+      }
+
+      lastError = new Error(
+        `Failed to start worker ${workerName}: ${response.status} ${response.statusText}\n${body}`
+      );
+
+      // Retry on 404 (function not ready yet) or 502/503 (server starting)
+      if (
+        response.status === 404 ||
+        response.status === 502 ||
+        response.status === 503
+      ) {
+        log(
+          `Retry ${i + 1}/${maxRetries}: ${response.status} ${
+            response.statusText
+          }`
+        );
+        await delay(retryDelayMs);
+        continue;
+      }
+
+      // Other errors - fail immediately
+      throw lastError;
+    } catch (err) {
+      lastError = err as Error;
+      if (
+        err instanceof TypeError &&
+        err.message.includes('error sending request')
+      ) {
+        // Connection error - server not ready
+        log(`Retry ${i + 1}/${maxRetries}: Connection error`);
+        await delay(retryDelayMs);
+        continue;
+      }
+      throw err;
+    }
   }
-  await waitForActiveWorker();
-  log('worker spawned!');
+
+  throw (
+    lastError ||
+    new Error(
+      `Failed to start worker ${workerName} after ${maxRetries} retries`
+    )
+  );
 }
diff --git a/pkgs/edge-worker/tests/e2e/performance.test.ts b/pkgs/edge-worker/tests/e2e/performance.test.ts
index 70950216a..554a009a2 100644
--- a/pkgs/edge-worker/tests/e2e/performance.test.ts
+++ b/pkgs/edge-worker/tests/e2e/performance.test.ts
@@ -11,7 +11,11 @@ const MESSAGES_TO_SEND = 20000;
 const WORKER_NAME = 'max_concurrency';
 
 Deno.test(
-  'worker can handle tens of thousands of jobs queued at once',
+  {
+    name: 'worker can handle tens of thousands of jobs queued at once',
+    sanitizeOps: false, // Progress bar uses async writes that don't complete before test ends
+    sanitizeResources: false,
+  },
   async () => {
     await withSql(async (sql) => {
       await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;
@@ -26,8 +30,9 @@ Deno.test(
       try {
         const [{ worker_count }] = await tempSql`
           SELECT COUNT(*)::integer AS worker_count
-          FROM pgflow.active_workers
+          FROM pgflow.workers
           WHERE function_name = ${WORKER_NAME}
+            AND last_heartbeat_at >= NOW() - INTERVAL '6 seconds'
         `;
 
         log('worker_count', worker_count);
diff --git a/pkgs/edge-worker/tests/e2e/restarts.test.ts b/pkgs/edge-worker/tests/e2e/restarts.test.ts
index afbdca784..0253ca021 100644
--- a/pkgs/edge-worker/tests/e2e/restarts.test.ts
+++ b/pkgs/edge-worker/tests/e2e/restarts.test.ts
@@ -14,7 +14,13 @@ const WORKER_NAME = 'cpu_intensive';
 // single message and amount of messages to send
 const MESSAGES_TO_SEND = 30;
 
-Deno.test('should spawn next worker when CPU clock limit hits', async () => {
+Deno.test(
+  {
+    name: 'should spawn next worker when CPU clock limit hits',
+    sanitizeOps: false, // Progress bar uses async writes that don't complete before test ends
+    sanitizeResources: false,
+  },
+  async () => {
   await withSql(async (sql) => {
     await sql`CREATE SEQUENCE IF NOT EXISTS test_seq`;
     await sql`ALTER SEQUENCE test_seq RESTART WITH 1`;
@@ -26,10 +32,8 @@ Deno.test('should spawn next worker when CPU clock limit hits', async () => {
     await sql`SELECT pgmq.create(${WORKER_NAME})`;
     await sql`
       DELETE FROM pgflow.workers
-      WHERE worker_id IN (
-        SELECT worker_id
-        FROM pgflow.inactive_workers
-      )`;
+      WHERE last_heartbeat_at < NOW() - INTERVAL '6 seconds'
+    `;
 
     await startWorker(WORKER_NAME);
     await sendBatch(MESSAGES_TO_SEND, WORKER_NAME);
diff --git a/pkgs/edge-worker/tests/integration/_helpers.ts b/pkgs/edge-worker/tests/integration/_helpers.ts
index a796fc306..ba2aa9602 100644
--- a/pkgs/edge-worker/tests/integration/_helpers.ts
+++ b/pkgs/edge-worker/tests/integration/_helpers.ts
@@ -8,9 +8,10 @@ import { PgflowSqlClient } from '@pgflow/core';
 import type { PlatformAdapter, CreateWorkerFn } from '../../src/platform/types.ts';
 import type { SupabaseResources, SupabaseEnv } from '@pgflow/dsl/supabase';
 import { createServiceSupabaseClient } from '../../src/core/supabase-utils.ts';
+import { integrationConfig } from '../config.ts';
 
 const DEFAULT_TEST_SUPABASE_ENV: SupabaseEnv = {
-  EDGE_WORKER_DB_URL: 'postgresql://postgres:postgres@127.0.0.1:5432/postgres',
+  EDGE_WORKER_DB_URL: integrationConfig.dbUrl,
   SUPABASE_URL: 'https://test.supabase.co',
   SUPABASE_ANON_KEY: 'test-anon-key',
   SUPABASE_SERVICE_ROLE_KEY: 'test-service-key',
diff --git a/pkgs/edge-worker/tests/sql.ts b/pkgs/edge-worker/tests/sql.ts
index a99f6aac7..242185ad6 100644
--- a/pkgs/edge-worker/tests/sql.ts
+++ b/pkgs/edge-worker/tests/sql.ts
@@ -1,6 +1,8 @@
 import postgres from 'postgres';
+import { e2eConfig } from './config.ts';
 
-const DB_URL = 'postgresql://postgres:postgres@127.0.0.1:5432/postgres';
+// Use the Supabase database (same as edge workers connect to)
+const DB_URL = e2eConfig.dbUrl;
 
 export function createSql() {
   return postgres(DB_URL, {