Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Abstractions/ContinueAsNewOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask;

/// <summary>
/// Options for <see cref="TaskOrchestrationContext.ContinueAsNew(ContinueAsNewOptions, object?, bool)"/>.
/// </summary>
public class ContinueAsNewOptions
{
/// <summary>
/// Gets or sets the new version for the restarted orchestration instance.
/// </summary>
/// <remarks>
/// When set, the framework uses this version to route the restarted instance to the
/// appropriate orchestrator implementation. This is the safest migration point for
/// eternal orchestrations since the history is fully reset, eliminating any replay
/// conflict risk.
/// </remarks>
public string? NewVersion { get; set; }
}
41 changes: 36 additions & 5 deletions src/Abstractions/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask.Abstractions;
Expand Down Expand Up @@ -395,16 +395,16 @@ public virtual Task CallSubOrchestratorAsync(
/// replays when rebuilding state.
/// </para><para>
/// The results of any incomplete tasks will be discarded when an orchestrator calls
/// <see cref="ContinueAsNew"/>. For example, if a timer is scheduled and then <see cref="ContinueAsNew"/>
/// <see cref="ContinueAsNew(object?, bool)"/>. For example, if a timer is scheduled and then <see cref="ContinueAsNew(object?, bool)"/>
/// is called before the timer fires, the timer event will be discarded. The only exception to this
/// is external events. By default, if an external event is received by an orchestration but not yet
/// processed, the event is saved in the orchestration state unit it is received by a call to
/// processed, the event is saved in the orchestration state until it is received by a call to
/// <see cref="WaitForExternalEvent{T}(string, CancellationToken)"/>. These events will continue to remain in memory
/// even after an orchestrator restarts using <see cref="ContinueAsNew"/>. You can disable this behavior and
/// even after an orchestrator restarts using <see cref="ContinueAsNew(object?, bool)"/>. You can disable this behavior and
/// remove any saved external events by specifying <c>false</c> for the <paramref name="preserveUnprocessedEvents"/>
/// parameter value.
/// </para><para>
/// Orchestrator implementations should complete immediately after calling the <see cref="ContinueAsNew"/> method.
/// Orchestrator implementations should complete immediately after calling the <see cref="ContinueAsNew(object?, bool)"/> method.
/// </para>
/// </remarks>
/// <param name="newInput">The JSON-serializable input data to re-initialize the instance with.</param>
Expand All @@ -415,6 +415,37 @@ public virtual Task CallSubOrchestratorAsync(
/// </param>
public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true);

/// <summary>
/// Restarts the orchestration, optionally with a new version, clearing the history.
/// </summary>
/// <remarks>
/// <para>
/// This overload accepts <see cref="ContinueAsNewOptions"/> to control the restart behavior.
/// When <see cref="ContinueAsNewOptions.NewVersion"/> is set, the framework uses the new version
/// to route the restarted instance to the appropriate orchestrator implementation, enabling
/// version-based dispatch. When no version is specified (i.e., <paramref name="options"/> is
/// <c>null</c> or <c>NewVersion</c> is not set), this method behaves identically to
/// <see cref="ContinueAsNew(object?, bool)"/>.
/// </para><para>
/// The default implementation ignores <paramref name="options"/> and delegates to
/// <see cref="ContinueAsNew(object?, bool)"/>. Subclasses that support version-based
/// dispatch should override this method.
/// </para><para>
/// Orchestrator implementations should complete immediately after calling this method.
/// </para>
/// </remarks>
/// <param name="options">Options for the continue-as-new operation, including an optional new version.</param>
/// <param name="newInput">The JSON-serializable input data to re-initialize the instance with.</param>
/// <param name="preserveUnprocessedEvents">
/// If set to <c>true</c>, re-adds any unprocessed external events into the new execution
/// history when the orchestration instance restarts. If <c>false</c>, any unprocessed
/// external events will be discarded when the orchestration instance restarts.
/// </param>
public virtual void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents)
{
this.ContinueAsNew(newInput, preserveUnprocessedEvents);
}

/// <summary>
/// Creates a new GUID that is safe for replay within an orchestration or operation.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Abstractions/TaskOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public interface ITaskOrchestrator
/// </item>
/// <item>
/// Avoid infinite loops as they could cause the application to run out of memory. Instead, ensure that loops are
/// bounded or use <see cref="TaskOrchestrationContext.ContinueAsNew"/> to restart an orchestrator with a new
/// bounded or use <see cref="TaskOrchestrationContext.ContinueAsNew(object?, bool)"/> to restart an orchestrator with a new
/// input.
/// </item>
/// <item>
Expand Down
21 changes: 17 additions & 4 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@
version,
instanceId,
policy.ToDurableTaskCoreRetryOptions(),
input,
input,
options.Tags);
}
else if (options?.Retry?.Handler is AsyncRetryHandler handler)
Expand All @@ -236,7 +236,7 @@
orchestratorName.Name,
version,
instanceId,
input,
input,
options?.Tags),
orchestratorName.Name,
handler,
Expand All @@ -248,7 +248,7 @@
orchestratorName.Name,
version,
instanceId,
input,
input,
options?.Tags);
}
}
Expand Down Expand Up @@ -337,7 +337,20 @@
/// <inheritdoc/>
public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)
{
this.innerContext.ContinueAsNew(newInput);
this.ContinueAsNew(options: null, newInput, preserveUnprocessedEvents);
}

/// <inheritdoc/>
public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents)
{
if (!string.IsNullOrWhiteSpace(options?.NewVersion))
{
this.innerContext.ContinueAsNew(options.NewVersion, newInput);

Check warning on line 348 in src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 348 in src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

Dereference of a possibly null reference.

Check warning on line 348 in src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Dereference of a possibly null reference.
}
else
{
this.innerContext.ContinueAsNew(newInput);
}

if (preserveUnprocessedEvents)
{
Expand Down
30 changes: 30 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,36 @@ public async Task ContinueAsNew()
Assert.Equal(10, metadata.ReadOutputAs<int>());
}

[Fact]
public async Task ContinueAsNewWithNewVersion()
{
TaskName orchestratorName = nameof(ContinueAsNewWithNewVersion);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc<int, string>(orchestratorName, async (ctx, input) =>
{
if (input == 0)
{
// First generation: migrate to "v2"
await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None);
ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, input + 1, true);
return string.Empty;
}

// Second generation: return the version to verify it changed
return ctx.Version;
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: 0);
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
Assert.Equal("v2", metadata.ReadOutputAs<string>());
}

[Fact]
public async Task SubOrchestration()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,103 @@ static void VerifyWrapper<T>(
wrapper.GetInput<T>().Should().Be(input);
}

[Fact]
public void ContinueAsNew_WithoutVersion_CallsInnerContextWithoutVersion()
{
// Arrange
TrackingOrchestrationContext innerContext = new();
OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null);
TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input");

// Act
wrapper.ContinueAsNew("new-input", preserveUnprocessedEvents: false);

// Assert
innerContext.LastContinueAsNewInput.Should().Be("new-input");
innerContext.LastContinueAsNewVersion.Should().BeNull();
}

[Fact]
public void ContinueAsNew_WithVersion_CallsInnerContextWithVersion()
{
// Arrange
TrackingOrchestrationContext innerContext = new();
OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null);
TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input");

// Act
wrapper.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, "new-input", preserveUnprocessedEvents: false);

// Assert
innerContext.LastContinueAsNewInput.Should().Be("new-input");
innerContext.LastContinueAsNewVersion.Should().Be("v2");
}

[Fact]
public void ContinueAsNew_WithNullOptions_CallsInnerContextWithoutVersion()
{
// Arrange
TrackingOrchestrationContext innerContext = new();
OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null);
TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input");

// Act
wrapper.ContinueAsNew(options: null, newInput: "new-input", preserveUnprocessedEvents: false);

// Assert
innerContext.LastContinueAsNewInput.Should().Be("new-input");
innerContext.LastContinueAsNewVersion.Should().BeNull();
}

sealed class TrackingOrchestrationContext : OrchestrationContext
{
public TrackingOrchestrationContext()
{
this.OrchestrationInstance = new()
{
InstanceId = Guid.NewGuid().ToString(),
ExecutionId = Guid.NewGuid().ToString(),
};
}

public object? LastContinueAsNewInput { get; private set; }

public string? LastContinueAsNewVersion { get; private set; }

public override void ContinueAsNew(object input)
{
this.LastContinueAsNewInput = input;
this.LastContinueAsNewVersion = null;
}

public override void ContinueAsNew(string newVersion, object input)
{
this.LastContinueAsNewInput = input;
this.LastContinueAsNewVersion = newVersion;
}

public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input)
=> throw new NotImplementedException();

public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input)
=> throw new NotImplementedException();

public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input, IDictionary<string, string> tags)
=> throw new NotImplementedException();

public override Task<T> CreateTimer<T>(DateTime fireAt, T state)
=> throw new NotImplementedException();

public override Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken)
=> throw new NotImplementedException();

public override Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters)
=> throw new NotImplementedException();

public override void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData)
=> throw new NotImplementedException();
}

class TestOrchestrationContext : OrchestrationContext
{
public TestOrchestrationContext()
Expand Down
Loading