Skip to content

Commit 1f02f59

Browse files
Add lightweight chaos test
1 parent 6545ca9 commit 1f02f59

File tree

1 file changed

+125
-0
lines changed

1 file changed

+125
-0
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import { BackendPostgres } from "../backend-postgres/backend.js";
2+
import { DEFAULT_DATABASE_URL } from "../backend-postgres/postgres.js";
3+
import { OpenWorkflow } from "./client.js";
4+
import { Worker } from "./worker.js";
5+
import { randomInt, randomUUID } from "node:crypto";
6+
import { describe, expect, test } from "vitest";
7+
8+
const TOTAL_STEPS = 50;
9+
const WORKER_COUNT = 3;
10+
const WORKER_CONCURRENCY = 2;
11+
const STEP_DURATION_MS = 25;
12+
const CHAOS_DURATION_MS = 5000;
13+
const CHAOS_INTERVAL_MS = 200;
14+
const TEST_TIMEOUT_MS = 30_000;
15+
16+
describe("chaos test (known slow test)", () => {
17+
test(
18+
"workflow completes despite random worker deaths",
19+
async () => {
20+
const backend = await createBackend();
21+
const client = new OpenWorkflow({ backend });
22+
23+
const workflow = client.defineWorkflow(
24+
{ name: "chaos-workflow" },
25+
async ({ step }) => {
26+
const results: number[] = [];
27+
for (let i = 0; i < TOTAL_STEPS; i++) {
28+
const stepName = `step-${i.toString()}`;
29+
const result = await step.run({ name: stepName }, async () => {
30+
await sleep(STEP_DURATION_MS); // fake work
31+
return i;
32+
});
33+
results.push(result);
34+
}
35+
return results;
36+
},
37+
);
38+
39+
const workers = await Promise.all(
40+
Array.from({ length: WORKER_COUNT }, () =>
41+
createAndStartWorker(client),
42+
),
43+
);
44+
45+
const handle = await workflow.run();
46+
let workflowCompleted = false;
47+
let chaosTask: Promise<number> | null = null;
48+
49+
try {
50+
chaosTask = runChaosMonkey({
51+
client,
52+
workers,
53+
durationMs: CHAOS_DURATION_MS,
54+
intervalMs: CHAOS_INTERVAL_MS,
55+
shouldStop: () => workflowCompleted,
56+
});
57+
58+
const result = await handle.result();
59+
workflowCompleted = true;
60+
const restarts = await chaosTask;
61+
62+
expect(result).toHaveLength(TOTAL_STEPS);
63+
expect(result[TOTAL_STEPS - 1]).toBe(TOTAL_STEPS - 1);
64+
expect(restarts).toBeGreaterThan(0);
65+
} finally {
66+
workflowCompleted = true;
67+
if (chaosTask) await chaosTask;
68+
await Promise.all(workers.map((worker) => worker.stop()));
69+
await backend.stop();
70+
}
71+
},
72+
TEST_TIMEOUT_MS,
73+
);
74+
});
75+
76+
async function runChaosMonkey({
77+
client,
78+
workers,
79+
durationMs,
80+
intervalMs,
81+
shouldStop,
82+
}: {
83+
client: OpenWorkflow;
84+
workers: Worker[];
85+
durationMs: number;
86+
intervalMs: number;
87+
shouldStop: () => boolean;
88+
}): Promise<number> {
89+
const chaosEndsAt = Date.now() + durationMs;
90+
let restartCount = 0;
91+
92+
while (Date.now() < chaosEndsAt && !shouldStop()) {
93+
await sleep(intervalMs);
94+
if (workers.length === 0) {
95+
workers.push(await createAndStartWorker(client));
96+
continue;
97+
}
98+
99+
const index = randomInt(workers.length);
100+
const victim = workers.splice(index, 1)[0];
101+
await victim?.stop();
102+
103+
const replacement = await createAndStartWorker(client);
104+
workers.push(replacement);
105+
restartCount++;
106+
}
107+
108+
return restartCount;
109+
}
110+
111+
async function createBackend(): Promise<BackendPostgres> {
112+
return await BackendPostgres.connect(DEFAULT_DATABASE_URL, {
113+
namespaceId: randomUUID(),
114+
});
115+
}
116+
117+
async function createAndStartWorker(client: OpenWorkflow): Promise<Worker> {
118+
const worker = client.newWorker({ concurrency: WORKER_CONCURRENCY });
119+
await worker.start();
120+
return worker;
121+
}
122+
123+
function sleep(ms: number): Promise<void> {
124+
return new Promise((resolve) => setTimeout(resolve, ms));
125+
}

0 commit comments

Comments
 (0)