From 60fbe33f820f9478c07ef5c40e2b70c7a4a11548 Mon Sep 17 00:00:00 2001 From: israel Date: Tue, 10 Feb 2026 19:36:04 +0100 Subject: [PATCH 1/2] fix(convex): shard workflows by executionId to eliminate OCC contention The previous sharding used a deterministic hash of wfDefinitionId, which funnelled all concurrent executions of the same workflow definition into a single shard. This caused persistent OCC failures on pendingStart, runStatus, and pendingCompletion tables. Now the shard is derived from the unique executionId after insert, so concurrent starts of the same definition spread across all 4 component instances. --- .../workflow_engine/helpers/engine/shard.test.ts | 15 ++++++++++++++- .../workflow_engine/helpers/engine/shard.ts | 13 +++++++++---- .../helpers/engine/start_workflow_handler.ts | 14 ++++++++++---- .../convex/workflow_engine/internal_mutations.ts | 10 ++-------- 4 files changed, 35 insertions(+), 17 deletions(-) diff --git a/services/platform/convex/workflow_engine/helpers/engine/shard.test.ts b/services/platform/convex/workflow_engine/helpers/engine/shard.test.ts index 7611fe049..d594fe7a9 100644 --- a/services/platform/convex/workflow_engine/helpers/engine/shard.test.ts +++ b/services/platform/convex/workflow_engine/helpers/engine/shard.test.ts @@ -29,7 +29,7 @@ describe('getShardIndex', () => { expect(shards.size).toBeGreaterThan(1); }); - it('produces different shards for different definition IDs', () => { + it('produces different shards for different IDs', () => { const testIds = [ 'document_rag_sync', 'onedrive_sync', @@ -42,6 +42,19 @@ describe('getShardIndex', () => { expect(shards.size).toBeGreaterThan(1); }); + it('distributes unique execution IDs across all shards', () => { + const shardCounts = new Map(); + const total = 1000; + for (let i = 0; i < total; i++) { + const shard = getShardIndex(`jd7${i}abc${i * 31}xyz${i * 97}`); + shardCounts.set(shard, (shardCounts.get(shard) ?? 0) + 1); + } + expect(shardCounts.size).toBe(NUM_SHARDS); + for (const count of shardCounts.values()) { + expect(count).toBeGreaterThan(total / NUM_SHARDS / 3); + } + }); + it('handles special characters', () => { const ids = [ '🚀', diff --git a/services/platform/convex/workflow_engine/helpers/engine/shard.ts b/services/platform/convex/workflow_engine/helpers/engine/shard.ts index d2afb2533..b3f899ba5 100644 --- a/services/platform/convex/workflow_engine/helpers/engine/shard.ts +++ b/services/platform/convex/workflow_engine/helpers/engine/shard.ts @@ -3,15 +3,20 @@ * * Distributes workflow executions across multiple @convex-dev/workflow * component instances so that concurrent startWorkflow mutations touch - * different runStatus singletons, eliminating OCC contention. + * different runStatus/pendingStart/pendingCompletion tables, + * eliminating OCC contention. + * + * Shard is derived from the unique executionId (FNV-1a hash) so that + * concurrent starts of the SAME workflow definition spread evenly + * across all shards. */ export const NUM_SHARDS = 4; -export function getShardIndex(wfDefinitionId: string): number { +export function getShardIndex(id: string): number { let hash = 2166136261; - for (let i = 0; i < wfDefinitionId.length; i++) { - hash ^= wfDefinitionId.charCodeAt(i); + for (let i = 0; i < id.length; i++) { + hash ^= id.charCodeAt(i); hash = Math.imul(hash, 16777619) >>> 0; } return hash % NUM_SHARDS; diff --git a/services/platform/convex/workflow_engine/helpers/engine/start_workflow_handler.ts b/services/platform/convex/workflow_engine/helpers/engine/start_workflow_handler.ts index 728b82857..ea338ae49 100644 --- a/services/platform/convex/workflow_engine/helpers/engine/start_workflow_handler.ts +++ b/services/platform/convex/workflow_engine/helpers/engine/start_workflow_handler.ts @@ -7,6 +7,7 @@ import type { MutationCtx } from '../../../_generated/server'; import { createDebugLog } from '../../../lib/debug_log'; import { executeWorkflowStart } from './execute_workflow_start'; +import { getShardIndex } from './shard'; const debugLog = createDebugLog('DEBUG_WORKFLOW', '[Workflow]'); @@ -21,8 +22,7 @@ export type StartWorkflowArgs = { export async function handleStartWorkflow( ctx: MutationCtx, args: StartWorkflowArgs, - workflowManager: WorkflowManager, - shardIndex = 0, + managers: WorkflowManager[], ): Promise> { // Validate database workflow definition const wfDefinition = await ctx.db.get(args.wfDefinitionId); @@ -56,7 +56,7 @@ export async function handleStartWorkflow( rootWfDefinitionId, }); - // Pre-create execution record with pending status + // Pre-create execution record with pending status (shardIndex patched below) const executionId: Id<'wfExecutions'> = await ctx.db.insert('wfExecutions', { organizationId: args.organizationId, wfDefinitionId: args.wfDefinitionId, @@ -71,13 +71,19 @@ export async function handleStartWorkflow( triggerData: args.triggerData, metadata: '{}', workflowSlug, - shardIndex, }); + // Derive shard from the unique executionId so concurrent starts of + // the same workflow definition spread across all component instances. + const shardIndex = getShardIndex(executionId); + await ctx.db.patch(executionId, { shardIndex }); + const workflowManager = managers[shardIndex]; + // Start workflow via shared helper to avoid extra nested mutations debugLog('startWorkflow Starting workflow via helper', { executionId, wfDefinitionId: args.wfDefinitionId, + shardIndex, }); await executeWorkflowStart(ctx, { diff --git a/services/platform/convex/workflow_engine/internal_mutations.ts b/services/platform/convex/workflow_engine/internal_mutations.ts index 5a9320572..1d526b656 100644 --- a/services/platform/convex/workflow_engine/internal_mutations.ts +++ b/services/platform/convex/workflow_engine/internal_mutations.ts @@ -5,7 +5,7 @@ import { jsonValueValidator } from '../../lib/shared/schemas/utils/json-value'; import { internalMutation } from '../_generated/server'; import { workflowManagers } from './engine'; import * as EngineHelpers from './helpers/engine'; -import { getShardIndex, safeShardIndex } from './helpers/engine/shard'; +import { safeShardIndex } from './helpers/engine/shard'; import { handleStartWorkflow } from './helpers/engine/start_workflow_handler'; export const onWorkflowComplete = internalMutation({ @@ -44,12 +44,6 @@ export const startWorkflow = internalMutation({ }, returns: v.id('wfExecutions'), handler: async (ctx, args) => { - const shardIndex = getShardIndex(args.wfDefinitionId); - return await handleStartWorkflow( - ctx, - args, - workflowManagers[shardIndex], - shardIndex, - ); + return await handleStartWorkflow(ctx, args, workflowManagers); }, }); From 1919506a9081be58374435bdafdf37ff7c9daf99 Mon Sep 17 00:00:00 2001 From: israel Date: Tue, 10 Feb 2026 19:48:26 +0100 Subject: [PATCH 2/2] fix(convex): pass workflowManagers array in public mutation The public startWorkflow mutation was passing a single workflowManager instead of the managers array, which would crash on managers[shardIndex]. --- services/platform/convex/workflow_engine/mutations.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/platform/convex/workflow_engine/mutations.ts b/services/platform/convex/workflow_engine/mutations.ts index ca4cab5f6..308bb1b9a 100644 --- a/services/platform/convex/workflow_engine/mutations.ts +++ b/services/platform/convex/workflow_engine/mutations.ts @@ -4,7 +4,7 @@ import { jsonValueValidator } from '../../lib/shared/schemas/utils/json-value'; import { mutation } from '../_generated/server'; import { authComponent } from '../auth'; import { getOrganizationMember } from '../lib/rls'; -import { workflowManager } from './engine'; +import { workflowManagers } from './engine'; import { handleStartWorkflow } from './helpers/engine/start_workflow_handler'; export const startWorkflow = mutation({ @@ -28,6 +28,6 @@ export const startWorkflow = mutation({ name: authUser.name, }); - return await handleStartWorkflow(ctx, args, workflowManager); + return await handleStartWorkflow(ctx, args, workflowManagers); }, });