From 632010fdea6bcd0cc58bbd7fe60eb4228373dff5 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 29 Apr 2026 00:40:51 -0700 Subject: [PATCH] Atomically dedupe duplicate step_created/wait_created events in world-local MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Concurrent invocations producing identical correlationIds (as the snapshot runtime does by design across replays) previously both succeeded and persisted duplicate events. step_created had no guard at all; wait_created used a TOCTOU read-then-check that allowed both writers through under concurrency. Both now claim a per-(runId, correlationId) constraint file with O_CREAT|O_EXCL before writing, so the loser surfaces as EntityConflictError — which the runtime's dedup catch path already handles. --- .../fix-world-local-step-created-race.md | 5 + packages/world-local/src/storage.test.ts | 106 ++++++++++++++++++ .../world-local/src/storage/events-storage.ts | 55 +++++++-- 3 files changed, 156 insertions(+), 10 deletions(-) create mode 100644 .changeset/fix-world-local-step-created-race.md diff --git a/.changeset/fix-world-local-step-created-race.md b/.changeset/fix-world-local-step-created-race.md new file mode 100644 index 0000000000..1c9edba970 --- /dev/null +++ b/.changeset/fix-world-local-step-created-race.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-local": patch +--- + +Fix race in `events.create()` where concurrent `step_created` / `wait_created` writes with the same `correlationId` would both succeed instead of one losing with `EntityConflictError`. diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 0c3bc63048..efdee1274c 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -1987,6 +1987,112 @@ describe('Storage', () => { }); }); + describe('concurrent entity-creation races', () => { + let testRunId: string; + beforeEach(async () => { + const run = await createRun(storage, { + deploymentId: 'deployment-123', + workflowName: 'test-workflow', + input: new Uint8Array(), + }); + testRunId = run.runId; + await updateRun(storage, testRunId, 'run_started'); + }); + + it('should reject concurrent step_created with the same correlationId', async () => { + // Two concurrent step_created calls with identical correlationIds + // (as produced by the snapshot runtime's deterministic ULIDs across + // concurrent VM invocations of the same resumption) must produce + // exactly one step_created event in the log — not two. Without an + // atomic guard the second writer overwrites the entity and persists + // a duplicate event, causing downstream issues like double-queued + // step messages. + const results = await Promise.allSettled([ + createStep(storage, testRunId, { + stepId: 'step_dup_1', + stepName: 'test-step', + input: new Uint8Array([1]), + }), + createStep(storage, testRunId, { + stepId: 'step_dup_1', + stepName: 'test-step', + input: new Uint8Array([2]), + }), + ]); + + const fulfilled = results.filter((r) => r.status === 'fulfilled'); + const rejected = results.filter((r) => r.status === 'rejected'); + expect(fulfilled).toHaveLength(1); + expect(rejected).toHaveLength(1); + expect((rejected[0] as PromiseRejectedResult).reason).toMatchObject({ + name: 'EntityConflictError', + }); + + // Verify only one step_created event exists in the log. + const events = await storage.events.list({ + runId: testRunId, + pagination: {}, + }); + const stepCreatedEvents = events.data.filter( + (e) => + e.eventType === 'step_created' && e.correlationId === 'step_dup_1' + ); + expect(stepCreatedEvents).toHaveLength(1); + }); + + it('should reject concurrent wait_created with the same correlationId', async () => { + // wait_created previously used a TOCTOU read-then-check pattern that + // could let both concurrent writers through. The atomic claim now + // guarantees exactly one winner. + const results = await Promise.allSettled([ + createWait(storage, testRunId, { + waitId: 'wait_dup_1', + resumeAt: new Date('2099-01-01'), + }), + createWait(storage, testRunId, { + waitId: 'wait_dup_1', + resumeAt: new Date('2099-01-02'), + }), + ]); + + const fulfilled = results.filter((r) => r.status === 'fulfilled'); + const rejected = results.filter((r) => r.status === 'rejected'); + expect(fulfilled).toHaveLength(1); + expect(rejected).toHaveLength(1); + expect((rejected[0] as PromiseRejectedResult).reason).toMatchObject({ + name: 'EntityConflictError', + }); + + // Verify only one wait_created event exists in the log. + const events = await storage.events.list({ + runId: testRunId, + pagination: {}, + }); + const waitCreatedEvents = events.data.filter( + (e) => + e.eventType === 'wait_created' && e.correlationId === 'wait_dup_1' + ); + expect(waitCreatedEvents).toHaveLength(1); + }); + + it('should reject sequential duplicate step_created calls', async () => { + // Sequential (non-racing) duplicates must also be rejected — the + // constraint file persists across calls. + await createStep(storage, testRunId, { + stepId: 'step_seq_dup', + stepName: 'test-step', + input: new Uint8Array(), + }); + await expect( + createStep(storage, testRunId, { + stepId: 'step_seq_dup', + stepName: 'test-step', + input: new Uint8Array(), + }) + ).rejects.toMatchObject({ name: 'EntityConflictError' }); + }); + }); + describe('run terminal state validation', () => { describe('completed run', () => { it('should reject run_started on completed run', async () => { diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 63df4a81c0..d8926fc6ff 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -559,7 +559,32 @@ export function createEventsStorage( data.eventType === 'step_created' && 'eventData' in data ) { - // step_created: Creates step entity with status 'pending', attempt=0, createdAt set + // step_created: Creates step entity with status 'pending', attempt=0, createdAt set. + // Two concurrent invocations with identical correlationIds (e.g. the + // snapshot runtime's deterministic correlationIds across replays) + // must be deduped — otherwise both writes succeed and the event log + // ends up with duplicate step_created entries. Claim a per-(runId, + // correlationId) constraint file with O_CREAT|O_EXCL; the loser + // throws EntityConflictError so the runtime's existing catch path + // can swallow it and avoid double-queuing the step. + const stepCreatedLockName = tag + ? `${effectiveRunId}-${data.correlationId}.created.${tag}` + : `${effectiveRunId}-${data.correlationId}.created`; + const stepCreatedLockPath = path.join( + basedir, + '.locks', + 'steps', + stepCreatedLockName + ); + const stepCreatedClaimed = await writeExclusive( + stepCreatedLockPath, + '' + ); + if (!stepCreatedClaimed) { + throw new EntityConflictError( + `Step "${data.correlationId}" already created` + ); + } const stepData = data.eventData as { stepName: string; input: any; @@ -865,23 +890,33 @@ export function createEventsStorage( } await deleteJSON(hookPath); } else if (data.eventType === 'wait_created' && 'eventData' in data) { - // wait_created: Creates wait entity with status 'waiting' - const waitData = data.eventData as { - resumeAt?: Date; - }; + // wait_created: Creates wait entity with status 'waiting'. + // Atomic claim on a per-(runId, correlationId) constraint file + // ensures duplicate wait_created from concurrent invocations + // surfaces as EntityConflictError (replaces a prior TOCTOU + // read-then-check that could let both writers through). const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`; - const existingWait = await readJSONWithFallback( + const waitCreatedLockName = tag + ? `${waitCompositeKey}.created.${tag}` + : `${waitCompositeKey}.created`; + const waitCreatedLockPath = path.join( basedir, + '.locks', 'waits', - waitCompositeKey, - WaitSchema, - tag + waitCreatedLockName ); - if (existingWait) { + const waitCreatedClaimed = await writeExclusive( + waitCreatedLockPath, + '' + ); + if (!waitCreatedClaimed) { throw new EntityConflictError( `Wait "${data.correlationId}" already exists` ); } + const waitData = data.eventData as { + resumeAt?: Date; + }; wait = { waitId: waitCompositeKey, runId: effectiveRunId,