Skip to content

Conversation

@jumski
Copy link
Contributor

@jumski jumski commented Jan 7, 2025

This PR imports Supaworker code from my private monorepo ("Feedwise") in which it was spawned as a dependency.

jumski added 30 commits January 3, 2025 02:27
Squashed commit of the following:

commit 210c52e7a37ea2a728572a4789958c9fb3afa1c5
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Fri Jan 3 02:19:10 2025 +0100

    refactor(pgflow-worker-2): simplify message handler type

    Removed explicit Json type annotation from the message parameter in the Supaworker.start function ca
    ll, relying on the inferred type from the Supaworker class.

    Update handler type to use generic MessagePayload

commit 56e190ceadf7d4794ca4c94cba327eaf6ba3456a
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Fri Jan 3 01:05:58 2025 +0100

    refactor(supaworker): convert Supaworker to a class with static start method

    Refactored the Supaworker function into a class with a static start method to improve encapsulation
    and reusability. Updated the SupaworkerConfig type to extend WorkerConfig, allowing for more flexibl
    e configuration options.

    refactor(pgflow-worker-2): update Supaworker usage to new class method

    Modified the pgflow-worker-2 to use the new Supaworker.start method, improving the structure and rea
    dability of the code. Added type safety for the message parameter using the Json type.

commit 5f2a9b59c06313fd66f82f728357b2cdb4ee0f62
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Fri Jan 3 01:02:20 2025 +0100

    feat(docs): add pg_polymorphic_dispatch3.md for PostgreSQL workflow engine design

    Introduce a new documentation file outlining the design principles and implementation approach for b
    uilding an extensible PostgreSQL workflow engine. The document covers core design principles, implem
    entation strategies, best practices, and example extensions to guide developers in creating modular
    and future-proof workflow systems.

commit 7ade2f4d31d7e8d201b4876c494b065fd086f659
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Thu Jan 2 22:50:20 2025 +0100

    feat(supaworker): introduce Supaworker function for message handling

    - Added a new Supaworker function in `_supaworker/index.ts` to handle messages from a specified queu
    e.
    - Ensures Supaworker is instantiated only once and manages worker lifecycle.
    - Simplified `pgflow-worker-2` by utilizing the new Supaworker function for message processing.

commit 9d734bc679f15750aa5de1dbcce2e2042271876e
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Thu Jan 2 00:59:55 2025 +0100

    feat(supabase): add cleanup steps to supaworker.sql

    Added SQL commands to clear the `pgmq.q_pgflow` and `pgmq.a_pgflow` tables and reset the `pgflow_tes
    t_seq` sequence. This ensures a clean state before running the worker.

commit 448abcb6383f558d6abc1ce1fc46175ccc25c8ee
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Thu Jan 2 00:58:55 2025 +0100

    feat(supabase): add query to calculate supaworker jobs per second

    Added a new SQL query to calculate the number of jobs processed per second by supaworker. The query
    computes the time difference between the maximum last heartbeat and the minimum start time of inacti
    ve workers, then calculates the jobs per second based on this duration.

commit 98249ea991425c6fbecb75ec071ef77a46c1f761
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Thu Jan 2 00:42:31 2025 +0100

    refactor(supaworker): rename function from `spawn_new_worker` to `spawn`

    Updated the function name from `spawn_new_worker` to `spawn` across various files including Queries.
    ts, supaworker.sql, pgmq_tests.sql, and types.d.ts for consistency and clarity.

commit d97659228042709ff671982ef38ccfc4d6e99e92
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Thu Jan 2 00:19:20 2025 +0100

    refactor(supabase): remove pgflow_pgmq schema and related functions

    - Deleted the `pgflow_pgmq` schema and its associated functions `start_edgefn_worker` and `stop_edge
    fn_worker` from the migration file `20241217111641_pgflow_pgmq.sql`.
    - Removed `pgflow_pgmq` related types from `types.d.ts`.
    - Updated `20241220130305_supaworker.sql` to include `pgmq` extension creation and `pgflow` queue se
    tup.

commit 49698894b637520e9b980d9de6cda0bef8863e20
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Thu Jan 2 00:09:49 2025 +0100

    feat(supaworker.sql): add new supaworker SQL script for worker management

    Introduced a new SQL script to manage supaworker operations, including spawning new workers and sche
    duling cron jobs. This script provides a structured approach to handle worker lifecycle and runtime
    metrics.

commit b622d4d13ba592b33ca88490f9e09604a7d65d77
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Wed Jan 1 23:39:44 2025 +0100

    refactor(supabase): rename schema from pgflow_supabase to supaworker

    Updated the schema name in Queries.ts and migration SQL files to reflect the new naming convention.
    This change ensures consistency across the codebase and aligns with the updated schema structure.

commit e50e864f630dcc5c0e4923c6545a2159992d7cdc
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Wed Jan 1 23:35:04 2025 +0100

    refactor(pgflow): extract supaworker to separate lib directory for easier export to monorepo
Introduced a new `Heartbeat` class to manage the periodic sending of heartbeats in the Worker. This
refactor removes the heartbeat logic from the Worker class, encapsulating it within the Heartbeat cl
ass for better separation of concerns and code readability. The heartbeat interval is set to 5000ms.
… message

The check for `controller.signal.aborted` was removed as it is redundant. The message is now archive
d directly after handling, ensuring consistent processing flow.
Added a new optional configuration parameter `maxPgConnections` to the `WorkerConfig` interface, all
owing customization of the maximum number of PostgreSQL connections. If not specified, it defaults t
o 4.
Changed the return type of the handler function in the Supaworker class to accept any promise, enhan
cing flexibility in handling different message payloads.
Refactored the Worker class to consolidate the initialization of WorkerConfig properties into a sing
le `config` object. This change simplifies the constructor by reducing repetitive code and ensures d
efault values are applied consistently. Additionally, removed redundant initializations and commente
d out a debug log statement.
…anagement

- Added `ExecutionQueue` class to manage and execute message tasks efficiently.
- Introduced `Logger` class for consistent logging with worker ID context.
- Refactored `Worker` class to utilize `ExecutionQueue` for handling message execution.
- Replaced inline logging with `Logger` for better log management.
- Removed redundant active executor management logic from `Worker`.
Replaced the ExecutionQueue with a new ExecutionController to manage message execution. This change
introduces a more robust handling of concurrent executions with a maximum concurrency limit and impr
oved abort handling.
Fixes high CPU usage - it is a backport from 1.5.0. Slight adjustment were needed: ", s.headers" was
removed as we do not have this column in 1.4.4.

Function added in supaworker schema.
Removed the executeMessage method and directly used executionController.start in the message process
ing loop to simplify the code and reduce indirection.
Introduced a new configuration option `maxConcurrency` to the Worker class, allowing control over th
e maximum number of concurrent operations. Updated the pgflow-worker-2 to utilize this new option, r
eplacing the previous `batchSize` parameter.
Squashed commit of the following:

commit 22fbf434efa204a5c0524ec02ec095d66bef849c
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Sun Jan 5 20:31:06 2025 +0100

    fix(worker): change acknowledge methods to private

    Refactored `acknowledgeStart` and `acknowledgeStop` methods to be private, ensuring encapsulation wi
    thin the Worker class.

commit 6123fb88257724c9bcf8bd69684084072b06a348
Author: Wojtek Majewski <wojciech.majewski@pm.me>
Date:   Sun Jan 5 20:09:54 2025 +0100

    refactor(supaworker): remove waitUntil from Worker

    The waitUntil function and its associated comments were removed from Worker.ts as they were not bein
    g used. This simplifies the code and removes unnecessary TypeScript ignore comments.

    Adjusted the configuration parameters for maxConcurrency, batchSize, maxPgConnections, and maxPollSe
    conds to optimize performance and resource usage.
- Updated `Heartbeat.ts` to accept an optional `functionName` parameter in the `send` method.
- Modified `Queries.ts` to handle the `functionName` parameter when sending a heartbeat.
- Enhanced `Worker.ts` to store and pass the edge function name to the heartbeat.
- Adjusted `index.ts` to extract and set the function name from the HTTP request path.
- Updated the database schema in `supaworker.sql` to include `edge_fn_name` in the `workers` table.
…pawn

- Changed `edgeFunctionName` to be a readonly property in `Worker.ts`.
- Updated `index.ts` to conditionally spawn a new edge function using `edgeFunctionName` if it is de
fined.
Modified the MessageExecutor to use the setVt method for re-queuing messages with a visibility timeo
ut instead of sending the message again. Added setVt method to Queue class to handle visibility time
out updates.
@nx-cloud
Copy link

nx-cloud bot commented Jan 7, 2025

View your CI Pipeline Execution ↗ for commit 2df643f.

Command Status Duration Result
nx affected -t lint test build ❌ Failed 1s View ↗

☁️ Nx Cloud last updated this comment at 2025-01-07 16:56:49 UTC

@jumski jumski merged commit 9fcaa0d into main Jan 7, 2025
1 of 2 checks passed
@jumski jumski deleted the chore/import-supaworker branch January 7, 2025 17:00
graphite-app bot pushed a commit that referenced this pull request Nov 3, 2025
…Testing

 (#304)

## Summary

This PR addresses **critical bugs** in pgflow's realtime broadcasting system and dramatically improves test coverage for broadcast events. Two unreferenced CTEs in `start_ready_steps()` are never executed due to PostgreSQL's query optimizer, causing:

1. **`step:started`** **events never broadcast** - breaks real-time DAG visualization
2. **`step:completed`** **events for empty maps never broadcast** - breaks observability for edge cases

Additionally, existing integration tests were too weak to catch these bugs. This PR adds comprehensive test coverage using new event matchers that verify exact event sequences, payloads, and counts.

## Root Cause: PostgreSQL CTE Optimization

PostgreSQL's query optimizer **does not execute unreferenced CTEs with SELECT statements** - even when those SELECTs call functions with side effects like `realtime.send()`.

**Critical distinction:**

- ✅ **CTEs with INSERT/UPDATE/DELETE** - ALWAYS executed (side effects assumed)
- ❌ **CTEs with SELECT only** - SKIPPED if unreferenced (no side effects assumed)

In `pkgs/core/schemas/0100_function_start_ready_steps.sql`:

```sql
-- This CTE is NEVER executed (unreferenced SELECT)
broadcast_events AS (
  SELECT realtime.send(...)  -- ❌ Never runs!
  FROM started_step_states
),

-- This CTE is ALSO never executed (unreferenced SELECT)
broadcast_empty_completed AS (
  SELECT realtime.send(...)  -- ❌ Never runs!
  FROM completed_empty_steps
),

-- Only this INSERT executes
INSERT INTO pgflow.step_tasks (...)
SELECT ... FROM sent_messages;  -- ✅ Runs, but CTEs above don't
```

## Bugs Discovered

### Bug 1: `step:started` Events Never Broadcast

**Impact:**

- Clients never receive `step:started` events during flow execution
- DAG visualizations only update when steps complete (not when they begin)
- Active step tracking (`flowState.activeStep`) never updates
- WebSocket inspection shows only `step:*:completed` events

**Affected Code:** `pkgs/core/schemas/0100_function_start_ready_steps.sql:144-162`

The `broadcast_events` CTE that sends `step:started` messages is unreferenced and never executes.

### Bug 2: Empty Map `step:completed` Events Never Broadcast

**Impact:**

- Empty map steps (arrays with length 0) complete silently with no broadcast
- Clients miss completion events for edge case flows
- Breaks observability for map steps with empty input arrays

**Affected Code:** `pkgs/core/schemas/0100_function_start_ready_steps.sql:45-64`

The `broadcast_empty_completed` CTE is unreferenced and never executes.

## Historical Context

This is the **same bug pattern** that affected `step:failed` events, fixed in commit `2b1ea777` (June 19, 2025):

> "fix: address bug where step:failed event was not broadcast due to CTE optimization"

**Related commits:**

- `2b1ea777` - Fixed `step:failed` broadcast with PERFORM statements
- `220c8672` - PR #161: "Fix step:failed events not being broadcast"
- `ab17a0c5` - Ensured step:failed events are broadcast
- `fe18250a` - Addressed broadcasting bug due to CTE optimization

The fix for `step:failed` used explicit `PERFORM` statements to force execution. However, this pattern was **never applied** to `step:started` or empty map broadcasts.

## Why Tests Didn't Catch This

The existing integration test in `pkgs/client/__tests__/integration/real-flow-execution.test.ts` only verified that **some** events were received:

```typescript
// OLD TEST (too weak)
let stepEventCount = 0;
step.on('*', () => { stepEventCount++; });
// ...
expect(stepEventCount).toBeGreaterThan(0);  // ❌ Passes with only 1 event!
```

This passes even if only `step:completed` is broadcast (count = 1), without checking for `step:started`.

## Changes in This PR

### 1\. Test Infrastructure Improvements

Added comprehensive event matchers to `pkgs/client/__tests__/helpers/test-utils.ts`:

- `toHaveReceivedEvent(type, payload?)` - Verify specific event was received with optional payload matching
- `toNotHaveReceivedEvent(type)` - Verify event was NOT received
- `toHaveReceivedEventCount(type, count)` - Verify exact event count
- `toHaveReceivedTotalEvents(count)` - Verify total event count
- `toHaveReceivedEventSequence(types[])` - Verify exact event sequence
- `toHaveReceivedInOrder(type1, type2)` - Verify ordering of two events

These matchers enable precise verification of broadcast behavior.

### 2\. Client Unit Test Coverage (100+ New Test Cases)

Enhanced `pkgs/client/__tests__/FlowRun.test.ts` and `FlowStep.test.ts` with:

**FlowRun Event Tests:**

- ✅ Comprehensive payload validation for `run:started`, `run:completed`, `run:failed`
- ✅ Event lifecycle verification (started → completed, started → failed)
- ✅ Duplicate event rejection (same status transitions)
- ✅ Foreign-run event protection
- ✅ `waitForStatus(Failed)` with timeout/abort support

**FlowStep Event Tests:**

- ✅ Comprehensive payload validation for all step event types
- ✅ Event sequence verification (started → completed, started → failed)
- ✅ Empty map edge case (completed ONLY, no started)
- ✅ Duplicate event rejection
- ✅ Foreign-step event protection
- ✅ `waitForStatus(Started)` edge cases for empty maps

**Key improvements:**

- All event assertions now use event matchers (not just count checks)
- Payload validation ensures correct data in events
- Event sequence verification catches ordering bugs
- Edge case coverage for empty maps and error conditions

### 3\. Integration Test Coverage (4 New Tests)

Added critical integration tests to `pkgs/client/__tests__/integration/real-flow-execution.test.ts`:

#### Test 1: **CRITICAL -** **`step:started`** **Broadcast Verification** (CURRENTLY FAILING)

```typescript
it('CRITICAL: broadcasts step:started events (CTE optimization bug check)', ...)
```

This test **WILL FAIL** until Bug #1 is fixed. It specifically verifies:

- `step:started` event is broadcast when `start_ready_steps()` executes
- Event payload contains correct `run_id`, `step_slug`, `status`
- Event sequence is `['step:started', 'step:completed']`
- Both events received exactly once

**Why it fails:** The `broadcast_events` CTE is unreferenced and never executes.

#### Test 2: Empty Map Steps Edge Case

```typescript
it('empty map steps: skip step:started and go straight to step:completed', ...)
```

Verifies the **expected behavior** for empty map steps:

- NO `step:started` event (empty maps skip started state)
- Only `step:completed` event is broadcast
- Event payload has correct status and empty array output

**Note:** This test will ALSO FAIL due to Bug #2 (`broadcast_empty_completed` unreferenced).

#### Test 3: Enhanced Event Verification

```typescript
it('receives broadcast events during flow execution', ...)
```

Updated to use event matchers instead of weak count checks:

- Verifies exact event types (`run:completed`, `step:completed`)
- Validates event payloads (run_id, flow_slug, status, output)
- Counts events to ensure no duplicates

#### Test 4: `waitForStatus(Started)` Behavior

```typescript
it('waitForStatus(Started): waits for step to reach Started status', ...)
```

Verifies that:

- Root steps are started immediately by `start_flow()`
- `waitForStatus(Started)` resolves immediately if already started
- Step has `started_at` timestamp when in Started status

### 4\. Documentation Updates

Updated `.claude/skills/pgtap-testing/SKILL.md` description to be more comprehensive:

- Added all common phrasings users might use to trigger testing skill
- Emphasized realtime event testing patterns
- Made skill activation more reliable

## Failing Tests (To Be Fixed in Follow-up Commit)

The following tests are **EXPECTED TO FAIL** until the SQL bug is fixed:

1. ❌ `CRITICAL: broadcasts step:started events` - Fails because `broadcast_events` CTE never executes
2. ❌ `empty map steps: skip step:started and go straight to step:completed` - Fails because `broadcast_empty_completed` CTE never executes

## Solution (Not Included in This PR)

The fix will use the same pattern as the `step:failed` fix from commit `2b1ea777`:

**Option 1: Reference CTEs to force execution**

```sql
broadcast_events AS (
  SELECT realtime.send(...) as broadcast_result
  FROM started_step_states
)

INSERT INTO pgflow.step_tasks (...)
SELECT ... FROM sent_messages
-- Force CTE execution by referencing it
WHERE EXISTS (SELECT 1 FROM broadcast_events WHERE false);
```

The `WHERE EXISTS (...WHERE false)` ensures:

- CTE **must be evaluated** (referenced)
- **Zero performance impact** (WHERE false = no filtering)
- **No change to INSERT behavior**

**Option 2: Use PERFORM statements (like step:failed fix)**

```sql
-- Move broadcasts out of CTE
PERFORM realtime.send(...)
FROM started_step_states;

-- Then do INSERT
INSERT INTO pgflow.step_tasks ...
```

## Testing Strategy

This PR follows a **test-first approach**:

1. ✅ **Add comprehensive test infrastructure** (event matchers)
2. ✅ **Add failing tests that document expected behavior**
3. ⬜ **Fix SQL bugs** (follow-up commit)
4. ⬜ **Verify all tests pass**

## Impact Assessment

### Before This PR

- **Test Coverage:** Weak event counting (any event = pass)
- **Bug Detection:** Failed to catch missing broadcasts
- **Observability:** Client applications missing critical events
- **Real-time UX:** DAG visualizations only update on completion

### After This PR (Tests Only)

- **Test Coverage:** 100+ new test cases with event matchers
- **Bug Detection:** Failing tests document exact bugs
- **Documentation:** Clear understanding of event lifecycles
- **Regression Prevention:** Future CTE bugs will be caught

### After SQL Fix (Follow-up)

- **All Tests Pass:** Green CI with comprehensive coverage
- **Full Observability:** All broadcast events working correctly
- **Real-time UX:** DAG visualizations update during execution
- **Production Ready:** Robust event system with test coverage

## Files Changed

### Test Infrastructure

- `pkgs/client/__tests__/helpers/test-utils.ts` - Event matchers already exist (no changes)

### Unit Tests (Enhanced)

- `pkgs/client/__tests__/FlowRun.test.ts` - Added lifecycle, payload, edge case tests
- `pkgs/client/__tests__/FlowStep.test.ts` - Added lifecycle, payload, empty map tests

### Integration Tests (New)

- `pkgs/client/__tests__/integration/real-flow-execution.test.ts` - Added 4 new tests

### Documentation

- `.claude/skills/pgtap-testing/SKILL.md` - Enhanced skill description

### SQL (Bug Location - Not Fixed in This PR)

- `pkgs/core/schemas/0100_function_start_ready_steps.sql` - Contains both bugs (lines 45-64, 144-162)

## Next Steps

1. **Merge this PR** - Establishes test coverage and failing tests
2. **Fix SQL bugs** - Apply CTE reference pattern or PERFORM statements
3. **Verify tests pass** - All new tests should turn green
4. **Audit other SQL functions** - Check for similar unreferenced CTE patterns
5. **Consider linting rule** - Detect unreferenced CTEs with function calls

## Checklist

- [x] Added comprehensive event matchers to test utils
- [x] Enhanced FlowRun unit tests with event verification
- [x] Enhanced FlowStep unit tests with event verification
- [x] Added failing integration test for `step:started` broadcast
- [x] Added failing integration test for empty map broadcast
- [x] Updated documentation (skill descriptions)
- [x] Documented root cause and historical context
- [ ] Fixed SQL bugs (follow-up commit)
- [ ] All tests passing (after SQL fix)

## Related Issues

This PR addresses the same class of bug as:

- Commit `2b1ea777` - `step:failed` broadcast fix
- PR #161 - "Fix step:failed events not being broadcast"

## Breaking Changes

None. This PR only adds tests and documentation.

## Migration Required

None. SQL changes will come in follow-up commit.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants