Skip to content

Commit

Permalink
Fix wait condition evaluation issue (#259)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jun 4, 2024
1 parent 0e24a43 commit 83670a0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 43 deletions.
77 changes: 35 additions & 42 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -651,55 +651,48 @@ private void RunOnce(bool checkConditions)
while (scheduledTasks.Count > 0)
{
// Run all tasks until empty
RunAllTasks();

// If there are any conditions, schedule a new condition-check task and the run all
// tasks again. This is a scheduled task instead of just run inline because it needs
// to be in the workflow context (i.e. current task scheduler) so the `Workflow`
// methods work properly.
if (checkConditions && conditions.Count > 0)
while (scheduledTasks.Count > 0)
{
_ = QueueNewTaskAsync(CheckConditionsAsync);
RunAllTasks();
}
}
}

private void RunAllTasks()
{
while (scheduledTasks.Count > 0)
{
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);

// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
}
}
}

private Task CheckConditionsAsync()
{
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
// Check conditions. It would be nice if we could run this in the task scheduler
// because then users could have access to the `Workflow` context in the condition
// callback. However, this cannot be done because even just running one task in the
// scheduler causes .NET to add more tasks to the scheduler. And you don't want to
// "run until empty" with the condition, because conditions may need to be retried
// based on each other. This sounds confusing but basically: can't run check
// conditions in the task scheduler comfortably but still need to access the static
// Workflow class, hence the context override.
if (checkConditions && conditions.Count > 0)
{
source.TrySetResult(null);
Workflow.OverrideContext.Value = this;
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
{
source.TrySetResult(null);
}
}
finally
{
Workflow.OverrideContext.Value = null;
}
}
}
catch (Exception e)
{
currentActivationException = e;
}
return Task.CompletedTask;
}

private void AddCommand(WorkflowCommand cmd)
Expand Down
13 changes: 12 additions & 1 deletion src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,19 @@ public static WorkflowUpdateDefinition? DynamicUpdate
/// </remarks>
public static DateTime UtcNow => Context.UtcNow;

/// <summary>
/// Gets an async local to override the context.
/// </summary>
/// <remarks>
/// This was only made available so WaitConditionAsync callbacks could have access to the
/// workflow context without running inside the task scheduler.
/// </remarks>
internal static AsyncLocal<IWorkflowContext?> OverrideContext { get; } = new();

private static IWorkflowContext Context =>
TaskScheduler.Current as IWorkflowContext ?? throw new InvalidOperationException("Not in workflow");
TaskScheduler.Current as IWorkflowContext ??
OverrideContext.Value ??
throw new InvalidOperationException("Not in workflow");

/// <summary>
/// Create an exception via lambda invoking the run method that, when thrown out of the
Expand Down
45 changes: 45 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4542,6 +4542,51 @@ await ExecuteWorkerAsync<NextRetryDelayWorkflow>(
new TemporalWorkerOptions().AddActivity(NextRetryDelayWorkflow.NextRetryDelayActivity));
}

[Workflow]
public class ConditionBounceWorkflow
{
private int workflowCounter;
private int signalCounter;

[WorkflowRun]
public async Task RunAsync()
{
while (workflowCounter < 5)
{
var counterBefore = workflowCounter;
await Workflow.WaitConditionAsync(() => workflowCounter > counterBefore);
signalCounter++;
}
}

[WorkflowSignal]
public async Task DoSignalAsync()
{
while (signalCounter < 5)
{
workflowCounter++;
var counterBefore = signalCounter;
await Workflow.WaitConditionAsync(() => signalCounter > counterBefore);
}
}
}

[Fact]
public async Task ExecuteWorkflowAsync_ConditionBounce_ProperlyReschedules()
{
await ExecuteWorkerAsync<ConditionBounceWorkflow>(
async worker =>
{
var handle = await Env.Client.StartWorkflowAsync(
(ConditionBounceWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
await AssertMore.HasEventEventuallyAsync(
handle, evt => evt.WorkflowTaskCompletedEventAttributes != null);
await handle.SignalAsync(wf => wf.DoSignalAsync());
await handle.GetResultAsync();
});
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down

0 comments on commit 83670a0

Please sign in to comment.