Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-world-local-step-created-race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-local": patch
---

Fix race in `events.create()` where concurrent `step_created` / `wait_created` writes with the same `correlationId` would both succeed instead of one losing with `EntityConflictError`.
106 changes: 106 additions & 0 deletions packages/world-local/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1987,6 +1987,112 @@ describe('Storage', () => {
});
});

describe('concurrent entity-creation races', () => {
let testRunId: string;
beforeEach(async () => {
const run = await createRun(storage, {
deploymentId: 'deployment-123',
workflowName: 'test-workflow',
input: new Uint8Array(),
});
testRunId = run.runId;
await updateRun(storage, testRunId, 'run_started');
});

it('should reject concurrent step_created with the same correlationId', async () => {
// Two concurrent step_created calls with identical correlationIds
// (as produced by the snapshot runtime's deterministic ULIDs across
// concurrent VM invocations of the same resumption) must produce
// exactly one step_created event in the log — not two. Without an
// atomic guard the second writer overwrites the entity and persists
// a duplicate event, causing downstream issues like double-queued
// step messages.
const results = await Promise.allSettled([
createStep(storage, testRunId, {
stepId: 'step_dup_1',
stepName: 'test-step',
input: new Uint8Array([1]),
}),
createStep(storage, testRunId, {
stepId: 'step_dup_1',
stepName: 'test-step',
input: new Uint8Array([2]),
}),
]);

const fulfilled = results.filter((r) => r.status === 'fulfilled');
const rejected = results.filter((r) => r.status === 'rejected');
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
expect((rejected[0] as PromiseRejectedResult).reason).toMatchObject({
name: 'EntityConflictError',
});

// Verify only one step_created event exists in the log.
const events = await storage.events.list({
runId: testRunId,
pagination: {},
});
const stepCreatedEvents = events.data.filter(
(e) =>
e.eventType === 'step_created' && e.correlationId === 'step_dup_1'
);
expect(stepCreatedEvents).toHaveLength(1);
});

it('should reject concurrent wait_created with the same correlationId', async () => {
// wait_created previously used a TOCTOU read-then-check pattern that
// could let both concurrent writers through. The atomic claim now
// guarantees exactly one winner.
const results = await Promise.allSettled([
createWait(storage, testRunId, {
waitId: 'wait_dup_1',
resumeAt: new Date('2099-01-01'),
}),
createWait(storage, testRunId, {
waitId: 'wait_dup_1',
resumeAt: new Date('2099-01-02'),
}),
]);

const fulfilled = results.filter((r) => r.status === 'fulfilled');
const rejected = results.filter((r) => r.status === 'rejected');
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
expect((rejected[0] as PromiseRejectedResult).reason).toMatchObject({
name: 'EntityConflictError',
});

// Verify only one wait_created event exists in the log.
const events = await storage.events.list({
runId: testRunId,
pagination: {},
});
const waitCreatedEvents = events.data.filter(
(e) =>
e.eventType === 'wait_created' && e.correlationId === 'wait_dup_1'
);
expect(waitCreatedEvents).toHaveLength(1);
});

it('should reject sequential duplicate step_created calls', async () => {
// Sequential (non-racing) duplicates must also be rejected — the
// constraint file persists across calls.
await createStep(storage, testRunId, {
stepId: 'step_seq_dup',
stepName: 'test-step',
input: new Uint8Array(),
});
await expect(
createStep(storage, testRunId, {
stepId: 'step_seq_dup',
stepName: 'test-step',
input: new Uint8Array(),
})
).rejects.toMatchObject({ name: 'EntityConflictError' });
});
});

describe('run terminal state validation', () => {
describe('completed run', () => {
it('should reject run_started on completed run', async () => {
Expand Down
55 changes: 45 additions & 10 deletions packages/world-local/src/storage/events-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,32 @@ export function createEventsStorage(
data.eventType === 'step_created' &&
'eventData' in data
) {
// step_created: Creates step entity with status 'pending', attempt=0, createdAt set
// step_created: Creates step entity with status 'pending', attempt=0, createdAt set.
// Two concurrent invocations with identical correlationIds (e.g. the
// snapshot runtime's deterministic correlationIds across replays)
// must be deduped — otherwise both writes succeed and the event log
// ends up with duplicate step_created entries. Claim a per-(runId,
// correlationId) constraint file with O_CREAT|O_EXCL; the loser
// throws EntityConflictError so the runtime's existing catch path
// can swallow it and avoid double-queuing the step.
const stepCreatedLockName = tag
? `${effectiveRunId}-${data.correlationId}.created.${tag}`
: `${effectiveRunId}-${data.correlationId}.created`;
const stepCreatedLockPath = path.join(
basedir,
'.locks',
'steps',
stepCreatedLockName
);
const stepCreatedClaimed = await writeExclusive(
stepCreatedLockPath,
''
);
if (!stepCreatedClaimed) {
throw new EntityConflictError(
`Step "${data.correlationId}" already created`
);
}
const stepData = data.eventData as {
stepName: string;
input: any;
Expand Down Expand Up @@ -865,23 +890,33 @@ export function createEventsStorage(
}
await deleteJSON(hookPath);
} else if (data.eventType === 'wait_created' && 'eventData' in data) {
// wait_created: Creates wait entity with status 'waiting'
const waitData = data.eventData as {
resumeAt?: Date;
};
// wait_created: Creates wait entity with status 'waiting'.
// Atomic claim on a per-(runId, correlationId) constraint file
// ensures duplicate wait_created from concurrent invocations
// surfaces as EntityConflictError (replaces a prior TOCTOU
// read-then-check that could let both writers through).
const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`;
const existingWait = await readJSONWithFallback(
const waitCreatedLockName = tag
? `${waitCompositeKey}.created.${tag}`
: `${waitCompositeKey}.created`;
const waitCreatedLockPath = path.join(
basedir,
'.locks',
'waits',
waitCompositeKey,
WaitSchema,
tag
waitCreatedLockName
);
if (existingWait) {
const waitCreatedClaimed = await writeExclusive(
waitCreatedLockPath,
''
);
if (!waitCreatedClaimed) {
throw new EntityConflictError(
`Wait "${data.correlationId}" already exists`
);
}
const waitData = data.eventData as {
resumeAt?: Date;
};
wait = {
waitId: waitCompositeKey,
runId: effectiveRunId,
Expand Down
Loading