Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,12 @@ internal async ValueTask ImportStateAsync(Checkpoint checkpoint)
.Select(id => this.EnsureExecutorAsync(id, tracer: null).AsTask())
.ToArray();

// Discard queued external deliveries from the superseded timeline so a runtime
// restore cannot apply stale responses after importing the checkpoint state.
while (this._queuedExternalDeliveries.TryDequeue(out _))
Comment thread
peibekwe marked this conversation as resolved.
{
}

this._nextStep = new StepContext();
this._nextStep.ImportMessages(importedState.QueuedMessages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,48 @@ internal async Task Checkpoint_Restore_WithPendingRequests_RepublishesRequestInf
"the workflow should be able to continue after the runtime restore replay");
}

/// <summary>
/// Verifies that restoring a live run clears any queued external responses from the
/// superseded timeline before importing checkpoint state.
/// </summary>
[Fact]
internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImportAsync()
{
Workflow workflow = CreateSimpleRequestWorkflow();
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment env = ExecutionEnvironment.InProcess_Lockstep.ToWorkflowExecutionEnvironment();

await using StreamingRun run = await env.WithCheckpointing(checkpointManager)
.RunStreamingAsync(workflow, "Hello");

(ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run);

await run.SendResponseAsync(pendingRequest.CreateResponse("World"));
await run.RestoreCheckpointAsync(checkpoint);

List<WorkflowEvent> restoredEvents = await ReadToHaltAsync(run);
ExternalRequest replayedRequest = restoredEvents.OfType<RequestInfoEvent>()
.Select(evt => evt.Request)
.Should()
.ContainSingle("the restored run should still be waiting for the checkpointed request")
.Subject;

restoredEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
"a queued response from the superseded timeline should not be processed after restore");
RunStatus statusAfterRestore = await run.GetStatusAsync();
statusAfterRestore.Should().Be(RunStatus.PendingRequests,
"the restored run should remain pending until a post-restore response is sent");

await run.SendResponseAsync(replayedRequest.CreateResponse("Again"));

List<WorkflowEvent> completionEvents = await ReadToHaltAsync(run);
completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
"the restored request should complete cleanly once a new response is provided");
RunStatus finalStatus = await run.GetStatusAsync();
finalStatus.Should().Be(RunStatus.Idle,
"the workflow should finish once the replayed request receives a fresh response");
}

/// <summary>
/// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
/// </summary>
Expand Down
Loading