diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index ffcc61dad4..f94bd9848d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -77,12 +77,13 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) try { - // Wait for the first input before starting - // The consumer will call EnqueueMessageAsync which signals the run loop + // Wait for the first input before starting. + // The consumer will call EnqueueMessageAsync which signals the run loop. + // Note: AsyncRunHandle also signals here on checkpoint resume when there are + // already pending requests, so the first iteration can emit a PendingRequests + // halt signal even without unprocessed messages. await this._inputWaiter.WaitForInputAsync(cancellationToken: linkedSource.Token).ConfigureAwait(false); - this._runStatus = RunStatus.Running; - while (!linkedSource.Token.IsCancellationRequested) { // Start a new run-stage activity for this input→processing→halt cycle @@ -95,6 +96,13 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Events are streamed out in real-time as they happen via the event handler if (this._stepRunner.HasUnprocessedMessages) { + // Flip to Running only when there's actual work to process. + // This is intentionally inside the HasUnprocessedMessages branch so + // that stale input signals cannot transiently flip status back to + // Running after a prior halt has already been observed by callers + // (e.g. Run.ResumeAsync returning after reading an Idle halt signal). 
+                        this._runStatus = RunStatus.Running;
+
                         // Emit WorkflowStartedEvent only when there's actual work to process
                         // This avoids spurious events on timeout-only loop iterations
                         await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false);
@@ -129,9 +137,6 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
                     // Wait for next input from the consumer
                     // Works for both Idle (no work) and PendingRequests (waiting for responses)
                     await this._inputWaiter.WaitForInputAsync(linkedSource.Token).ConfigureAwait(false);
-
-                    // When signaled, resume running
-                    this._runStatus = RunStatus.Running;
                 }
             }
             catch (OperationCanceledException)
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs
index a290948ae4..c0be892e24 100644
--- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs
@@ -269,6 +269,29 @@ internal async Task Test_RunSample_Step9Async(ExecutionEnvironment environment)
         _ = await Step9EntryPoint.RunAsync(writer, environment.ToWorkflowExecutionEnvironment());
     }
 
+    /// <summary>
+    /// Stress regression test for the off-thread run-status race. Once
+    /// <c>Run.ResumeAsync</c> has returned at a halt boundary, callers must see a
+    /// stable terminal status rather than a transient <see cref="RunStatus.Running"/>.
+    /// Step9 is the canonical multi-response resume sample: before the fix in
+    /// <c>StreamingRunEventStream.RunLoopAsync</c>, its
+    /// `runStatus.Should().Be(RunStatus.Idle)` assertion failed intermittently
+    /// (roughly 1 in 10 iterations) under InProcess_OffThread.
+    /// </summary>
+    [Fact]
+    internal async Task Test_RunSample_Step9_OffThread_MultiResponseResume_StatusIsStableAsync()
+    {
+        // Repeat the sample many times so the intermittent (~1-in-10) failure is likely to surface.
+        const int StressRuns = 50;
+        for (int remaining = StressRuns; remaining > 0; remaining--)
+        {
+            using StringWriter output = new();
+            _ = await Step9EntryPoint.RunAsync(
+                output,
+                ExecutionEnvironment.InProcess_OffThread.ToWorkflowExecutionEnvironment());
+        }
+    }
+
     [Theory]
     [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
     [InlineData(ExecutionEnvironment.InProcess_OffThread)]