feat: workpool sharding to eliminate OCC contention#401
Conversation
a9bd0a0 to
618a161
Compare
📝 WalkthroughWalkthroughThis PR introduces workflow engine sharding to distribute concurrent executions across multiple manager instances, reducing optimistic concurrency control (OCC) contention. It adds a sharding layer that computes a shard index from workflow definition IDs and routes mutations to the appropriate manager. The PR also includes comprehensive test coverage for execution lifecycle, variables handling, serialization, step execution, and scheduling logic. Additionally, a new stress-test framework is introduced with multiple scenarios (concurrent starts, sustained load, payload pressure, loop contention, scheduler overlap, shard comparison) and supporting infrastructure for metrics collection and polling. Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 20
🤖 Fix all issues with AI agents
In `@services/platform/convex/workflow_engine/engine.ts`:
- Around line 131-139: cleanupComponentWorkflow indexes workflowManagers with
args.shardIndex without validation, which can produce undefined manager and
crash; validate and clamp/fallback the shard index before lookup (e.g., if
args.shardIndex is missing, negative, or >= workflowManagers.length, use 0 or
args.shardIndex % workflowManagers.length), assign the resolved index to a local
variable and use it to get manager, and if no manager is found throw or log a
clear error before calling EngineHelpers.cleanupComponentWorkflow so you never
pass undefined as manager.
In
`@services/platform/convex/workflow_engine/helpers/engine/dynamic_workflow_handler.test.ts`:
- Around line 3-20: The test in dynamic_workflow_handler.test.ts duplicates the
logic of the private function buildRetryBehaviorFromPolicy, which can drift from
the implementation; instead, either export the function (e.g.,
_buildRetryBehaviorFromPolicy) from the module and update tests to import and
assert its real behavior, or add an integration test that calls the public entry
point that uses buildRetryBehaviorFromPolicy to validate the produced retry
behavior end-to-end; update imports and test assertions accordingly so the tests
exercise the actual implementation rather than an inlined copy.
In
`@services/platform/convex/workflow_engine/helpers/engine/execute_workflow_start.ts`:
- Around line 133-136: Guard against an out-of-range shardIndex when selecting
from DYNAMIC_WORKFLOW_REFS: validate or clamp args.shardIndex before using it
(e.g., ensure it's a number between 0 and DYNAMIC_WORKFLOW_REFS.length - 1),
fall back to a safe default index (0) if invalid, and then use the validated
index when calling args.workflowManager.start so componentWorkflowId creation
can't receive undefined.
In `@services/platform/convex/workflow_engine/helpers/engine/shard.test.ts`:
- Around line 23-29: The test "distributes across multiple shards for varied
inputs" uses Math.random(), which can introduce flakiness; replace the
non-deterministic input with deterministic variations so results are
reproducible — for example, build inputs from the loop index or a fixed array of
distinct strings (e.g., `wfDef_${i}_${i}` or a predefined list) when calling
getShardIndex in the test so the Set populated in shards deterministically
contains multiple values every run.
In
`@services/platform/convex/workflow_engine/helpers/nodes/action/execute_action_node.test.ts`:
- Around line 3-23: The test duplicates production helpers (isSecureWrapper and
sanitizeActionResultForOutput) causing tautological tests; instead export those
helper functions from execute_action_node (e.g., add an internal __test__
export) or otherwise make them importable, then remove the local
re-implementations and import the real isSecureWrapper and
sanitizeActionResultForOutput into execute_action_node.test.ts; alternatively,
delete the duplicated helpers and write tests that exercise the public
execute_action_node behavior to cover the same logic.
In
`@services/platform/convex/workflow_engine/helpers/nodes/condition/execute_condition_node.test.ts`:
- Around line 67-90: Add extra unit tests to cover non-boolean return types for
executeConditionNode: add cases that call executeConditionNode with expressions
that evaluate to a number (e.g., "42"), null ("null"), and an object/array
(e.g., "{}" or "[]") and assert each throws 'Expression must return boolean';
place them alongside the existing non-boolean test in
execute_condition_node.test.ts so error handling is consistently validated for
different non-boolean types.
In
`@services/platform/convex/workflow_engine/helpers/step_execution/execute_step_by_type.test.ts`:
- Around line 42-54: The test currently asserts a locally defined actionMap
instead of the real dispatch used by executeStepByType; either import the actual
dispatch map (e.g., ACTION_MAP or whatever constant is exported from the module
that contains executeStepByType) and assert its values (including handlers like
executeLLMNode, executeTriggerNode, executeConditionNode, executeActionNode,
executeLoopNode) are unique, or replace the test with an integration-style test
that calls executeStepByType for each StepType while mocking/spying on
ctx.runAction (or the specific handler functions) to verify the correct handler
is invoked for each StepType; update the test to remove the hard-coded actionMap
and reference the real implementation symbols so changes to executeStepByType
fail the test.
- Around line 3-8: The tests currently assert local constants instead of real
behavior; extract the dispatch routing into a pure function (e.g., create and
export a getActionForStepType or mapStepTypeToAction) and unit-test that mapping
directly, or alternatively replace the assertions with an integration-style test
that constructs a mock ActionCtx which implements runAction and records calls,
then invoke executeStepByType and assert the mocked ctx.runAction was called
with the expected internal action and arguments; update tests in
execute_step_by_type.test.ts to use the exported mapping or the ActionCtx mock
(and rename the suite if you decide to keep it as documentation rather than a
true test).
- Around line 29-40: The test is tautological: it iterates over stepTypes and
asserts stepTypes contains the iterated value, which always passes; replace it
with a real behavior test by calling the function under test (executeStepByType)
with a representative MockStepDef for each value in stepTypes and asserting
expected outcomes (e.g., returned value, thrown error, or that the correct
handler was invoked), or remove the test entirely; update the test to reference
stepTypes, MockStepDef, and executeStepByType so each case validates actual
implementation behavior rather than membership in stepTypes.
- Around line 65-77: The test "should use correct config casting for each step
type" contains trivial assertions on local literals (triggerConfig,
conditionConfig) that add no value; remove those assertions and instead call the
actual code under test (e.g., the module/function responsible for step config
casting such as executeStepByType or any exported cast/normalize function used
by it) and assert its output/side-effects for each step type (trigger,
condition, action, loop, llm). Alternatively, if there is no runtime casting
logic, replace the test with TypeScript type/JSDoc-based documentation or delete
the test; ensure references to triggerConfig and conditionConfig are replaced by
invoking the real casting/validation function and asserting expected transformed
shapes (including organizationId for llm and loop fields like
collection/itemVariable/indexVariable).
In
`@services/platform/convex/workflow_engine/helpers/step_execution/merge_execution_variables.test.ts`:
- Around line 45-53: Add an immutability assertion to the test for
mergeExecutionVariables: after calling mergeExecutionVariables(base, steps)
verify that the original base object has not been mutated (e.g., base.steps is
still 'old_value' and other remains true). Locate the test case that currently
checks overwrite behavior (the it block referencing mergeExecutionVariables) and
add assertions that inspect the original base variable to ensure it remains
unchanged, ensuring callers that rely on input immutability are protected.
In `@services/platform/convex/workflows/definitions/delete_workflow.ts`:
- Around line 72-123: The loop uses execution.shardIndex directly which can be
corrupted and make workflowManagers[shardIndex] undefined; normalize and
validate the index once (e.g., derive a safeShardIndex by bounding it to
0..workflowManagers.length-1 or skip the entry if out of range) and then use
that safeShardIndex for both manager lookup (when calling manager.cancel in the
loop) and when pushing entries into cleanupEntries and later in
scheduleCleanupBatch (used by ctx.scheduler.runAfter); ensure you also skip
scheduling/cancel if no valid manager exists to avoid runtime exceptions.
In `@services/platform/stress-tests/metrics.ts`:
- Around line 45-56: The update function accepts status: string and blindly
casts it to ExecutionRecord['status'], risking invalid values; change the
signature to status: ExecutionRecord['status'] or validate allowed statuses
before assignment in update(executionId: string, status:
ExecutionRecord['status'] | string, error?: string) and reject or normalize
unknown values (e.g., throw, early return, or map to a default) prior to setting
record.status and record.completedAt; reference the update method and
ExecutionRecord type when making the change so the compiler enforces correct
status values or the runtime validates them.
In `@services/platform/stress-tests/poll.ts`:
- Around line 13-16: The ExecutionStatus interface is using a loose string type
for status; align it with the domain workflow execution status by replacing
status: string with the canonical type/enum used elsewhere (e.g.,
WorkflowExecutionStatus or ExecutionState enum) and update any callers (polling
logic) to import and use that type, keeping error?: string as-is; modify the
interface declaration for ExecutionStatus to reference the shared type (or
export a thin alias) so polling results are type-safe and consistent with
workflow execution models.
In `@services/platform/stress-tests/run-stress-test.ts`:
- Around line 116-137: Different polling methods are used: pollExecution (which
uses client.query in run-stress-test.ts) vs pollExecutionViaConvexRun (which
shells out to `npx convex run`) in scenario files; standardize by choosing one
approach and updating code accordingly — either refactor scenario files (e.g.,
concurrent-starts.ts) to call the same client-based pollExecution helper used in
run-stress-test.ts (move pollExecution into a shared helper module and import
it), or add a clear comment in each file explaining why the CLI-based
pollExecutionViaConvexRun is required for that scenario; update references to
pollExecution/pollExecutionViaConvexRun so all consumers use the chosen
implementation and remove duplicate logic.
In `@services/platform/stress-tests/scenarios/concurrent-starts.ts`:
- Around line 83-87: The current creation of pending redundantly filters
launched for executionId even though launched already contains only entries with
non-null executionId; remove the .filter((r) => r.executionId) step and build
the Map directly from launched.map((r) => [r.id, r.executionId as string]) to
simplify the code (referenced symbols: pending and launched in
concurrent-starts.ts).
In `@services/platform/stress-tests/scenarios/scheduler-overlap.ts`:
- Around line 64-72: The simulatedSchedule assignment is fragile because the
hardcoded array (used in scheduler-overlap scenario) assumes config.total === 6
and indexing with i can return undefined; update the logic in scheduler-overlap
to defensively derive the schedule list or bound the index: either build
simulatedSchedule from the config (e.g., generate or repeat entries to match
config.total), or compute a safe index (i % simulatedScheduleArray.length or
clamp i to the array length) and provide a sensible fallback value when
undefined; make the change where simulatedSchedule is assigned so references to
simulatedSchedule and the loop/index variable i are updated accordingly.
In `@services/platform/stress-tests/scenarios/shard-comparison.ts`:
- Around line 111-115: The pending Map construction redundantly filters for
executionId even though launched already contains only entries with non-null
executionId; remove the .filter((e) => e.executionId) step and build pending
directly from launched.map(e => [e.id, e.executionId as string]) so the code is
simpler and avoids an unnecessary check (referencing the pending variable and
launched array in shard-comparison.ts).
- Around line 243-250: The comparison block using avgStartA and avgStartB can
divide by zero; update the if/else around "if (avgStartB < avgStartA)" to first
check avgStartA === 0 and handle that edge: if avgStartA === 0 and avgStartB ===
0 log "No measurable improvement", if avgStartA === 0 and avgStartB > 0 log a
clear message like "Single-shard average is 0ms; improvement cannot be
computed", otherwise compute improvement as now with ((1 - avgStartB /
avgStartA) * 100).toFixed(0) and log it; reference the avgStartA/avgStartB
variables and the computed improvement variable so the guard is applied before
dividing.
In `@services/platform/stress-tests/scenarios/sustained-load.ts`:
- Around line 4-6: The scenario currently uses a hard-coded batch interval and
batch size which ignores the scenario config and ramp-up timing; update the
logic in sustained-load.ts to compute the interval from the scenario config's
rampUpSeconds (and totalDurationMinutes if present) instead of a fixed 2s value,
e.g., derive intervalMs = (rampUpSeconds * 1000) / numberOfBatches (or compute
batches from rampUpSeconds and batchSize) so the loop that calls startWorkflows
/ scheduleBatch uses this computed interval and respects rampUpSeconds and
intended 5-minute ramp; adjust any usages of batchSize, intervalMs, and the
loop/timer code so duration and total count match the header description.
618a161 to
cea1831
Compare
FNV-1a hash of wfDefinitionId distributes workflows across 4 shards to eliminate OCC contention on the single runStatus singleton.
Register 4 workflow component instances in convex.config.ts, each with independent runStatus/pendingStart/pendingCompletion tables. Route startWorkflow, cancel, and cleanup to the correct shard via persisted shardIndex on wfExecutions.
Covers dynamic workflow handler, action/condition/loop node execution, variable serialization/deserialization, step execution routing, and variable merging.
Functional smoke tests for concurrent starts, scheduler overlap, shard comparison, sustained load, payload pressure, and loop contention. Verifies core module behavior, not OCC contention.
Add safeShardIndex helper that clamps to 0 if the persisted value is out of range, negative, or undefined. Applied in cleanupComponentWorkflow, executeWorkflowStart, and cancelAndDeleteExecutionsBatch.
20c7b28 to
4196da2
Compare
Summary
wfDefinitionIddistributes workflows across 4 independent@convex-dev/workflowcomponent instances, each with isolatedrunStatus/pendingStart/pendingCompletiontablesWorkflowManagerinstances, shard-awarestartWorkflow,cancel, andcleanup;shardIndexpersisted onwfExecutionsfor backward compatibilityContext
Production Sentry reports OCC errors on
runStatus,pendingStart, andpendingCompletiontables when 6+ workflow definitions fire concurrently at:00. Root cause: allstartWorkflowmutations touch the same singleton document in a single@convex-dev/workflowcomponent instance.How it works
getShardIndex(wfDefinitionId)hashes the definition ID to a shard (0-3)startWorkflowroutes to the matchingWorkflowManageranddynamicWorkflowrefshardIndexis persisted on the execution record so cancel/cleanup uses the correct shardshardIndexdefault to shard 0 (the original component instance)Test plan