Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe('getShardIndex', () => {
expect(shards.size).toBeGreaterThan(1);
});

it('produces different shards for different definition IDs', () => {
it('produces different shards for different IDs', () => {
const testIds = [
'document_rag_sync',
'onedrive_sync',
Expand All @@ -42,6 +42,19 @@ describe('getShardIndex', () => {
expect(shards.size).toBeGreaterThan(1);
});

it('distributes unique execution IDs across all shards', () => {
const shardCounts = new Map<number, number>();
const total = 1000;
for (let i = 0; i < total; i++) {
const shard = getShardIndex(`jd7${i}abc${i * 31}xyz${i * 97}`);
shardCounts.set(shard, (shardCounts.get(shard) ?? 0) + 1);
}
expect(shardCounts.size).toBe(NUM_SHARDS);
for (const count of shardCounts.values()) {
expect(count).toBeGreaterThan(total / NUM_SHARDS / 3);
}
});

it('handles special characters', () => {
const ids = [
'🚀',
Expand Down
13 changes: 9 additions & 4 deletions services/platform/convex/workflow_engine/helpers/engine/shard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@
*
* Distributes workflow executions across multiple @convex-dev/workflow
* component instances so that concurrent startWorkflow mutations touch
* different runStatus singletons, eliminating OCC contention.
* different runStatus/pendingStart/pendingCompletion tables,
* eliminating OCC contention.
*
* Shard is derived from the unique executionId (FNV-1a hash) so that
* concurrent starts of the SAME workflow definition spread evenly
* across all shards.
*/

export const NUM_SHARDS = 4;

export function getShardIndex(wfDefinitionId: string): number {
export function getShardIndex(id: string): number {
let hash = 2166136261;
for (let i = 0; i < wfDefinitionId.length; i++) {
hash ^= wfDefinitionId.charCodeAt(i);
for (let i = 0; i < id.length; i++) {
hash ^= id.charCodeAt(i);
hash = Math.imul(hash, 16777619) >>> 0;
}
return hash % NUM_SHARDS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { MutationCtx } from '../../../_generated/server';

import { createDebugLog } from '../../../lib/debug_log';
import { executeWorkflowStart } from './execute_workflow_start';
import { getShardIndex } from './shard';

const debugLog = createDebugLog('DEBUG_WORKFLOW', '[Workflow]');

Expand All @@ -21,8 +22,7 @@ export type StartWorkflowArgs = {
export async function handleStartWorkflow(
ctx: MutationCtx,
args: StartWorkflowArgs,
workflowManager: WorkflowManager,
shardIndex = 0,
managers: WorkflowManager[],
): Promise<Id<'wfExecutions'>> {
// Validate database workflow definition
const wfDefinition = await ctx.db.get(args.wfDefinitionId);
Expand Down Expand Up @@ -56,7 +56,7 @@ export async function handleStartWorkflow(
rootWfDefinitionId,
});

// Pre-create execution record with pending status
// Pre-create execution record with pending status (shardIndex patched below)
const executionId: Id<'wfExecutions'> = await ctx.db.insert('wfExecutions', {
organizationId: args.organizationId,
wfDefinitionId: args.wfDefinitionId,
Expand All @@ -71,13 +71,19 @@ export async function handleStartWorkflow(
triggerData: args.triggerData,
metadata: '{}',
workflowSlug,
shardIndex,
});

// Derive shard from the unique executionId so concurrent starts of
// the same workflow definition spread across all component instances.
const shardIndex = getShardIndex(executionId);
await ctx.db.patch(executionId, { shardIndex });
const workflowManager = managers[shardIndex];
Comment thread
Israeltheminer marked this conversation as resolved.

// Start workflow via shared helper to avoid extra nested mutations
debugLog('startWorkflow Starting workflow via helper', {
executionId,
wfDefinitionId: args.wfDefinitionId,
shardIndex,
});

await executeWorkflowStart(ctx, {
Expand Down
10 changes: 2 additions & 8 deletions services/platform/convex/workflow_engine/internal_mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { jsonValueValidator } from '../../lib/shared/schemas/utils/json-value';
import { internalMutation } from '../_generated/server';
import { workflowManagers } from './engine';
import * as EngineHelpers from './helpers/engine';
import { getShardIndex, safeShardIndex } from './helpers/engine/shard';
import { safeShardIndex } from './helpers/engine/shard';
import { handleStartWorkflow } from './helpers/engine/start_workflow_handler';

export const onWorkflowComplete = internalMutation({
Expand Down Expand Up @@ -44,12 +44,6 @@ export const startWorkflow = internalMutation({
},
returns: v.id('wfExecutions'),
handler: async (ctx, args) => {
const shardIndex = getShardIndex(args.wfDefinitionId);
return await handleStartWorkflow(
ctx,
args,
workflowManagers[shardIndex],
shardIndex,
);
return await handleStartWorkflow(ctx, args, workflowManagers);
},
});
4 changes: 2 additions & 2 deletions services/platform/convex/workflow_engine/mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { jsonValueValidator } from '../../lib/shared/schemas/utils/json-value';
import { mutation } from '../_generated/server';
import { authComponent } from '../auth';
import { getOrganizationMember } from '../lib/rls';
import { workflowManager } from './engine';
import { workflowManagers } from './engine';
import { handleStartWorkflow } from './helpers/engine/start_workflow_handler';

export const startWorkflow = mutation({
Expand All @@ -28,6 +28,6 @@ export const startWorkflow = mutation({
name: authUser.name,
});

return await handleStartWorkflow(ctx, args, workflowManager);
return await handleStartWorkflow(ctx, args, workflowManagers);
},
});
Loading