Skip to content

Commit

Permalink
feat: add workflow client updater for updating workflow client (#258)
Browse files Browse the repository at this point in the history
  • Loading branch information
robcao authored Jun 4, 2024
1 parent 1f6676b commit 0e24a43
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 80 deletions.
46 changes: 45 additions & 1 deletion src/Temporalio.Extensions.Hosting/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,48 @@ start instead of expecting a `ITemporalClient` to be present on the service coll
Some users may prefer to manually create the `TemporalWorker` without using host support, but still make their
activities created via the service provider. `CreateTemporalActivityDefinitions` extension methods are present on
`IServiceProvider` that will return a collection of `ActivityDefinition` instances for each activity on the type. These
can be added to the `TemporalWorkerOptions` directly.
can be added to the `TemporalWorkerOptions` directly.

## Worker Client Refresh

Some users may need to update the worker's connection to Temporal. It's desirable to do this without stopping the worker entirely, as that will evict the sticky workflow cache.

This can be done by using the `IWorkerClientUpdater`.

```csharp
using Temporalio.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Register a worker client updater.
builder.Services.AddSingleton<TemporalWorkerClientUpdater>();

// Add a hosted Temporal worker which returns a builder to add activities and workflows, along with the worker client updater.
builder.Services.
AddHostedTemporalWorker(
"my-temporal-host:7233",
"my-namespace",
"my-task-queue").
AddScopedActivities<MyActivityClass>().
AddWorkflow<MyWorkflow>().
ConfigureOptions().
Configure<TemporalWorkerClientUpdater>((options, workerClientUpdater) => options.WorkerClientUpdater = workerClientUpdater);

var host = builder.Build();

// You can have a BackgroundService periodically refresh the worker client like this.
TemporalWorkerClientUpdater workerClientUpdater = host.Services.GetRequiredService<TemporalWorkerClientUpdater>();

// Can update the TLS options if you need.
TemporalClientConnectOptions clientConnectOptions = new("my-other-temporal-host:7233")
{
Namespace = "default"
};

ITemporalClient updatedClient = await TemporalClient.ConnectAsync(clientConnectOptions).ConfigureAwait(false);

workerClientUpdater.UpdateClient(updatedClient);

// Make sure you use RunAsync and not Run, see https://github.com/temporalio/sdk-dotnet/issues/220
await host.RunAsync();
```
48 changes: 48 additions & 0 deletions src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdater.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using Temporalio.Worker;

namespace Temporalio.Extensions.Hosting
{
/// <summary>
/// Notification hub that can be used to push Temporal worker client updates to subscribing Temporal workers.
/// </summary>
public class TemporalWorkerClientUpdater
{
private readonly object clientLock = new();

private event EventHandler<IWorkerClient>? OnClientUpdatedEvent;

/// <summary>
/// Dispatches a notification to all subscribers that a new worker client should be used.
/// </summary>
/// <param name="client">The new <see cref="IWorkerClient"/> that should be pushed out to all subscribing workers.</param>
public void UpdateClient(IWorkerClient client)
{
OnClientUpdatedEvent?.Invoke(this, client);
}

/// <summary>
/// Adds a new subscriber that will be notified when a new worker client should be used.
/// </summary>
/// <param name="eventHandler">The event handler to add to the event listeners.</param>
internal void Subscribe(EventHandler<IWorkerClient> eventHandler)
{
lock (clientLock)
{
OnClientUpdatedEvent += eventHandler;
}
}

/// <summary>
/// Removes an existing subscriber from receiving notifications when a new worker client should be used.
/// </summary>
/// <param name="eventHandler">The event handler to remove from the event listeners.</param>
internal void Unsubscribe(EventHandler<IWorkerClient> eventHandler)
{
lock (clientLock)
{
OnClientUpdatedEvent -= eventHandler;
}
}
}
}
38 changes: 34 additions & 4 deletions src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ namespace Temporalio.Extensions.Hosting
/// </summary>
public class TemporalWorkerService : BackgroundService
{
// These two are mutually exclusive
// These two (newClientOptions and existingClient) are mutually exclusive
private readonly TemporalClientConnectOptions? newClientOptions;
private readonly ITemporalClient? existingClient;
private readonly TemporalWorkerOptions workerOptions;
private readonly TemporalWorkerClientUpdater? workerClientUpdater;

/// <summary>
/// Initializes a new instance of the <see cref="TemporalWorkerService"/> class using
Expand All @@ -30,8 +31,11 @@ public class TemporalWorkerService : BackgroundService
/// <param name="options">Options to use to create the worker service.</param>
public TemporalWorkerService(TemporalWorkerServiceOptions options)
{
newClientOptions = options.ClientOptions ?? throw new ArgumentException(
"Client options is required", nameof(options));
if (options.ClientOptions == null)
{
throw new ArgumentException("Client options is required", nameof(options));
}

workerOptions = options;
}

Expand Down Expand Up @@ -156,6 +160,11 @@ public TemporalWorkerService(
if (newClientOptions != null && workerOptions.LoggerFactory != null)
{
newClientOptions.LoggerFactory = workerOptions.LoggerFactory;
}

if (options.WorkerClientUpdater != null)
{
this.workerClientUpdater = options.WorkerClientUpdater;
}
}

Expand All @@ -166,7 +175,28 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
// Call connect just in case it was a lazy client (no-op if already connected)
await client.Connection.ConnectAsync().ConfigureAwait(false);
using var worker = new TemporalWorker(client, workerOptions);
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);

if (workerClientUpdater != null)
{
void SubscribeToClientUpdates(object? sender, IWorkerClient updatedClient)
{
worker!.Client = updatedClient;
}

try
{
workerClientUpdater.Subscribe(SubscribeToClientUpdates);
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
}
finally
{
workerClientUpdater.Unsubscribe(SubscribeToClientUpdates);
}
}
else
{
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public TemporalWorkerServiceOptions(string taskQueue)
/// </summary>
public TemporalClientConnectOptions? ClientOptions { get; set; }

/// <summary>
/// Gets or sets the <see cref="TemporalWorkerClientUpdater"/> that can be used to push Temporal worker client updates to the underlying <see cref="TemporalWorker"/>.
/// If not set, the worker service will not be updateable with a new Temporal worker client.
/// </summary>
public TemporalWorkerClientUpdater? WorkerClientUpdater { get; set; }

/// <inheritdoc />
public override object Clone()
{
Expand All @@ -39,6 +45,7 @@ public override object Clone()
{
options.ClientOptions = (TemporalClientConnectOptions)ClientOptions.Clone();
}

return options;
}

Expand Down
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
/// </summary>
/// <remarks>
/// When this property is set, it actually replaces the underlying client that is being used
/// by the worker. This means the next calls by the worker to Temporal (e.g. responding
/// by the worker. This means subsequent calls by the worker to Temporal (e.g. responding
/// task completion, activity heartbeat, etc) will be on this new client, but outstanding
/// calls will not be immediately interrupted.
/// </remarks>
Expand Down Expand Up @@ -255,6 +255,7 @@ protected virtual void Dispose(bool disposing)
{
activityWorker?.Dispose();
BridgeWorker.Dispose();

// Remove task tracing if not disabled and there are workflows present
if (workflowTracingEventListenerEnabled)
{
Expand Down
56 changes: 56 additions & 0 deletions tests/Temporalio.Tests/AssertMore.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,67 @@
using System.Text;
using System.Text.Json;
using Temporalio.Api.History.V1;
using Temporalio.Client;
using Xunit;

namespace Temporalio.Tests
{
public static class AssertMore
{
public static Task TaskFailureEventuallyAsync(WorkflowHandle handle, Action<WorkflowTaskFailedEventAttributes> assert)
{
return AssertMore.EventuallyAsync(async () =>
{
WorkflowTaskFailedEventAttributes? attrs = null;
await foreach (var evt in handle.FetchHistoryEventsAsync())
{
if (evt.WorkflowTaskFailedEventAttributes != null)
{
attrs = evt.WorkflowTaskFailedEventAttributes;
}
}
Assert.NotNull(attrs);
assert(attrs!);
});
}

public static Task StartedEventuallyAsync(WorkflowHandle handle)
{
return HasEventEventuallyAsync(handle, e => e.WorkflowExecutionStartedEventAttributes != null);
}

public static async Task ChildStartedEventuallyAsync(WorkflowHandle handle)
{
// Wait for started
string? childId = null;
await HasEventEventuallyAsync(
handle,
e =>
{
childId = e.ChildWorkflowExecutionStartedEventAttributes?.WorkflowExecution?.WorkflowId;
return childId != null;
});
// Check that a workflow task has completed proving child has really started
await HasEventEventuallyAsync(
handle.Client.GetWorkflowHandle(childId!),
e => e.WorkflowTaskCompletedEventAttributes != null);
}

public static Task HasEventEventuallyAsync(WorkflowHandle handle, Func<HistoryEvent, bool> predicate)
{
return EventuallyAsync(async () =>
{
await foreach (var evt in handle.FetchHistoryEventsAsync())
{
if (predicate(evt))
{
return;
}
}
Assert.Fail("Event not found");
});
}

public static Task EventuallyAsync(
Func<Task> func, TimeSpan? interval = null, int iterations = 15) =>
EventuallyAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public async Task TemporalWorkerService_ExecuteAsync_SimpleWorker()
// Add the rest of the services
services.
AddSingleton<ILoggerFactory>(loggerFactory).
AddScoped<DatabaseClient>().
// We are also adding the DB client as a keyed service to demonstrate keyed service
// support for our DI logic. This used to break because newer DI library versions
// disallowed accessing certain properties on keyed services which we access
// internally for dupe checks.
AddScoped<DatabaseClient>().
// We are also adding the DB client as a keyed service to demonstrate keyed service
// support for our DI logic. This used to break because newer DI library versions
// disallowed accessing certain properties on keyed services which we access
// internally for dupe checks.
AddKeyedScoped<DatabaseClient>("client-keyed").
AddHostedTemporalWorker(taskQueue).
AddScopedActivities<DatabaseActivities>().
Expand Down Expand Up @@ -206,6 +206,84 @@ public async Task TemporalWorkerService_ExecuteAsync_MultipleWorkers()
["scoped-other2"] = $"tq: {taskQueue2}, counter: 6",
},
result);
}

[Workflow]
public class TickingWorkflow
{
[WorkflowRun]
public async Task RunAsync()
{
// Just tick every 100ms for 10s
for (var i = 0; i < 100; i++)
{
await Workflow.DelayAsync(100);
}
}
}

[Fact]
public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient()
{
// We are going to start a second ephemeral server and then replace the client. So we will
// start a no-cache ticking workflow with the current client and confirm it has accomplished
// at least one task. Then we will start another on the other client, and confirm it gets
// started too. Then we will terminate both. We have to use a ticking workflow with only one
// poller to force a quick re-poll to recognize our client change quickly (as opposed to
// just waiting the minute for poll timeout).
await using var otherEnv = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync();

// Start both workflows on different servers
var taskQueue = $"tq-{Guid.NewGuid()}";
var handle1 = await Client.StartWorkflowAsync(
(TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));
var handle2 = await otherEnv.Client.StartWorkflowAsync(
(TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));

var bld = Host.CreateApplicationBuilder();

TemporalWorkerClientUpdater workerClientUpdater = new TemporalWorkerClientUpdater();

// Register the worker client updater.
bld.Services.AddSingleton<TemporalWorkerClientUpdater>(workerClientUpdater);

// Add the first worker with the workflow and client already DI'd, and add the worker client updater.
bld.Services.
AddSingleton(Client).
AddHostedTemporalWorker(taskQueue).
AddWorkflow<TickingWorkflow>()
.ConfigureOptions()
.Configure<TemporalWorkerClientUpdater>((options, updater) =>
{
options.WorkerClientUpdater = updater;
options.MaxCachedWorkflows = 0;
options.MaxConcurrentWorkflowTaskPolls = 1;
});

// Start the host
using var tokenSource = new CancellationTokenSource();
using var host = bld.Build();
var hostTask = Task.Run(() => host.RunAsync(tokenSource.Token));

// Confirm the first ticking workflow has completed a task but not the second workflow
await AssertMore.HasEventEventuallyAsync(handle1, e => e.WorkflowTaskCompletedEventAttributes != null);
await foreach (var evt in handle2.FetchHistoryEventsAsync())
{
Assert.Null(evt.WorkflowTaskCompletedEventAttributes);
}

// Now replace the client, which should be used fairly quickly because we should have
// timer-done poll completions every 100ms
workerClientUpdater.UpdateClient(otherEnv.Client);

// Now confirm the other workflow has started
await AssertMore.HasEventEventuallyAsync(handle2, e => e.WorkflowTaskCompletedEventAttributes != null);

// Terminate both
await handle1.TerminateAsync();
await handle2.TerminateAsync();
}

[Workflow("Workflow")]
Expand Down
Loading

0 comments on commit 0e24a43

Please sign in to comment.