From 77085383e31c780dc4453a5b91636d882d47cd98 Mon Sep 17 00:00:00 2001 From: Copilot Date: Sat, 14 Mar 2026 08:30:30 +0000 Subject: [PATCH] fix: update InMemoryOrchestrationBackend suspend/resume to set instance status The in-memory backend's suspend() and resume() methods queued the appropriate history events but never updated instance.status, causing getOrchestrationState() to return incorrect status. In the real DTS sidecar, calling the suspend/resume RPCs immediately transitions the orchestration status. This fix aligns the in-memory backend with that behavior: - suspend() now sets status to ORCHESTRATION_STATUS_SUSPENDED and notifies state waiters - resume() now transitions status from SUSPENDED back to RUNNING and notifies state waiters Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/testing/in-memory-backend.ts | 14 +++ .../test/in-memory-backend.spec.ts | 116 ++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/packages/durabletask-js/src/testing/in-memory-backend.ts b/packages/durabletask-js/src/testing/in-memory-backend.ts index 2c96acb..1af9222 100644 --- a/packages/durabletask-js/src/testing/in-memory-backend.ts +++ b/packages/durabletask-js/src/testing/in-memory-backend.ts @@ -175,6 +175,10 @@ export class InMemoryOrchestrationBackend { return; } + // Update status immediately to match real sidecar behavior, where the + // suspend RPC transitions the orchestration to SUSPENDED right away. + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED; + const event = pbh.newSuspendEvent(); instance.pendingEvents.push(event); instance.lastUpdatedAt = new Date(); @@ -182,6 +186,8 @@ export class InMemoryOrchestrationBackend { if (!this.orchestrationQueueSet.has(instanceId)) { this.enqueueOrchestration(instanceId); } + + this.notifyWaiters(instanceId); } /** @@ -193,6 +199,12 @@ export class InMemoryOrchestrationBackend { throw new Error(`Orchestration instance '${instanceId}' not found`); } + // Transition from SUSPENDED back to RUNNING to match real sidecar behavior. + // Only update if the instance was actually suspended. + if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED) { + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING; + } + const event = pbh.newResumeEvent(); instance.pendingEvents.push(event); instance.lastUpdatedAt = new Date(); @@ -200,6 +212,8 @@ export class InMemoryOrchestrationBackend { if (!this.orchestrationQueueSet.has(instanceId)) { this.enqueueOrchestration(instanceId); } + + this.notifyWaiters(instanceId); } /** diff --git a/packages/durabletask-js/test/in-memory-backend.spec.ts b/packages/durabletask-js/test/in-memory-backend.spec.ts index 66fe6a9..c4701e4 100644 --- a/packages/durabletask-js/test/in-memory-backend.spec.ts +++ b/packages/durabletask-js/test/in-memory-backend.spec.ts @@ -377,4 +377,120 @@ describe("In-Memory Backend", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); expect(state?.serializedOutput).toEqual(JSON.stringify(42)); }); + + describe("suspend and resume status", () => { + it("should update status to SUSPENDED when suspend is called", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + await client.suspendOrchestration(id); + + const state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + }); + + it("should update status to RUNNING when resume is called after suspend", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + await client.suspendOrchestration(id); + let state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + + await client.resumeOrchestration(id); + state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.RUNNING); + }); + + it("should complete successfully after suspend and resume", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const val: number = yield ctx.waitForExternalEvent("proceed"); + return val * 2; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + // Suspend the orchestration + await client.suspendOrchestration(id); + let state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + + // Send an event while suspended (will be buffered) + await client.raiseOrchestrationEvent(id, "proceed", 21); + + // Resume the orchestration + await client.resumeOrchestration(id); + + // Wait for completion — the buffered event should be processed + state = await client.waitForOrchestrationCompletion(id, true, 10); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + }); + + it("should be idempotent when suspend is called twice", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + // Call suspend twice — should not throw + await client.suspendOrchestration(id); + await client.suspendOrchestration(id); + + const state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + }); + + it("should notify state waiters on suspend", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + // Set up a waiter for SUSPENDED status, then suspend + const suspendedPromise = backend.waitForState( + id, + (inst) => backend.toClientStatus(inst.status) === OrchestrationStatus.SUSPENDED, + 5000, + ); + + await client.suspendOrchestration(id); + + const suspendedInstance = await suspendedPromise; + expect(suspendedInstance).toBeDefined(); + expect(backend.toClientStatus(suspendedInstance!.status)).toEqual(OrchestrationStatus.SUSPENDED); + }); + }); });