Skip to content

Commit

Permalink
Synchronous reads for improved performance for large payloads (#134)
Browse files Browse the repository at this point in the history
* Workaround for dotnet/SqlClient#593
* Added test to show perf problem
* Fix usage of DbDataReader to use Sync method over Async

Co-authored-by: Benjamin Hugot <benjmain.hugot@younited-credit.com>
Co-authored-by: Chris Gillum <cgillum@microsoft.com>
  • Loading branch information
3 people committed Jan 29, 2023
1 parent 8e28b17 commit 22af387
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.1.1

### Updates

* Synchronous reads for improved performance for large payloads ([#134](https://github.com/microsoft/durabletask-mssql/pull/134)) - contributed by [@bhugot](https://github.com/bhugot)

## v1.1.0

### New
Expand Down
42 changes: 27 additions & 15 deletions src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ public override Task DeleteAsync(bool deleteInstanceStore)
int longestWaitTime = 0;
var messages = new List<TaskMessage>(capacity: batchSize);
var eventPayloadMappings = new EventPayloadMap(capacity: batchSize);
while (await reader.ReadAsync(cancellationToken))

// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
while (reader.Read())
{
cancellationToken.ThrowIfCancellationRequested();

TaskMessage message = reader.GetTaskMessage();
messages.Add(message);
Guid? payloadId = reader.GetPayloadId();
Expand Down Expand Up @@ -187,7 +191,8 @@ public override Task DeleteAsync(bool deleteInstanceStore)
// Result #2: The runtime status of the orchestration instance
if (await reader.NextResultAsync(cancellationToken))
{
bool instanceExists = await reader.ReadAsync(cancellationToken);
// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
bool instanceExists = reader.Read();
string instanceId;
OrchestrationStatus? currentStatus;

Expand Down Expand Up @@ -252,7 +257,7 @@ public override Task DeleteAsync(bool deleteInstanceStore)
IList<HistoryEvent> history;
if (await reader.NextResultAsync(cancellationToken))
{
history = await ReadHistoryEventsAsync(reader, executionIdFilter: null, cancellationToken);
history = ReadHistoryEvents(reader, executionIdFilter: null, cancellationToken);
}
else
{
Expand Down Expand Up @@ -402,11 +407,13 @@ public override async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestra
// removes the need for a DB access and also ensures that a work-item can't spam the error logs in a tight loop.
public override Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
    // Intentionally a no-op: completes immediately without accessing the database.
    return Task.CompletedTask;
}

public override async Task<TaskActivityWorkItem?> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
public override async Task<TaskActivityWorkItem?> LockNextTaskActivityWorkItem(
TimeSpan receiveTimeout,
CancellationToken shutdownCancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
while (!shutdownCancellationToken.IsCancellationRequested)
{
using SqlConnection connection = await this.GetAndOpenConnectionAsync();
using SqlConnection connection = await this.GetAndOpenConnectionAsync(shutdownCancellationToken);
using SqlCommand command = this.GetSprocCommand(connection, $"{this.settings.SchemaName}._LockNextTask");

DateTime lockExpiration = DateTime.UtcNow.Add(this.settings.WorkItemLockTimeout);
Expand All @@ -418,10 +425,10 @@ public override async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestra
command,
this.traceHelper,
instanceId: null,
cancellationToken);
if (!await reader.ReadAsync())
shutdownCancellationToken);
if (!await reader.ReadAsync(shutdownCancellationToken))
{
await this.activityBackoffHelper.WaitAsync(cancellationToken);
await this.activityBackoffHelper.WaitAsync(shutdownCancellationToken);
continue;
}

Expand Down Expand Up @@ -596,7 +603,8 @@ public override async Task SendTaskOrchestrationMessageAsync(TaskMessage message
instanceId,
cancellationToken);

if (await reader.ReadAsync(cancellationToken))
// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
if (reader.Read())
{
OrchestrationState state = reader.GetOrchestrationState();
return state;
Expand All @@ -615,18 +623,22 @@ public override async Task<string> GetOrchestrationHistoryAsync(string instanceI

using DbDataReader reader = await SqlUtils.ExecuteReaderAsync(command, this.traceHelper, instanceId);

List<HistoryEvent> history = await ReadHistoryEventsAsync(reader, executionIdFilter);
List<HistoryEvent> history = ReadHistoryEvents(reader, executionIdFilter);
return JsonConvert.SerializeObject(history);
}

static async Task<List<HistoryEvent>> ReadHistoryEventsAsync(
static List<HistoryEvent> ReadHistoryEvents(
DbDataReader reader,
string? executionIdFilter = null,
CancellationToken cancellationToken = default)
{
var history = new List<HistoryEvent>(capacity: 128);
while (await reader.ReadAsync(cancellationToken))

// Synchronous reads have significantly better performance: https://github.com/dotnet/SqlClient/issues/593
while (reader.Read())
{
cancellationToken.ThrowIfCancellationRequested();

string executionId = SqlUtils.GetExecutionId(reader)!;
HistoryEvent e = reader.GetHistoryEvent(isOrchestrationHistory: true);
if (executionIdFilter == null)
Expand Down Expand Up @@ -820,7 +832,7 @@ public async Task<IReadOnlyCollection<OrchestrationState>> GetManyOrchestrations
cancellationToken);

var results = new List<OrchestrationState>(query.PageSize);
while (await reader.ReadAsync(cancellationToken))
while (!cancellationToken.IsCancellationRequested && reader.Read())
{
OrchestrationState state = reader.GetOrchestrationState();
results.Add(state);
Expand Down Expand Up @@ -872,7 +884,7 @@ public async Task<int> GetRecommendedReplicaCountAsync(int? currentReplicaCount
command.Parameters.Add("@MaxConcurrentOrchestrations", SqlDbType.Int).Value = this.MaxConcurrentTaskOrchestrationWorkItems;
command.Parameters.Add("@MaxConcurrentActivities", SqlDbType.Int).Value = this.MaxConcurrentTaskActivityWorkItems;

int recommendedReplicaCount = (int)await command.ExecuteScalarAsync();
int recommendedReplicaCount = (int)await command.ExecuteScalarAsync(cancellationToken);
if (currentReplicaCount != null && currentReplicaCount != recommendedReplicaCount)
{
this.traceHelper.ReplicaCountChangeRecommended(currentReplicaCount.Value, recommendedReplicaCount);
Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>1</MinorVersion>
<PatchVersion>0</PatchVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
2 changes: 1 addition & 1 deletion test/DurableTask.SqlServer.AzureFunctions.Tests/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static class Utils
return status;
}

await Task.Delay(TimeSpan.FromMilliseconds(500));
await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
}

cancellationToken.ThrowIfCancellationRequested();
Expand Down
44 changes: 42 additions & 2 deletions test/DurableTask.SqlServer.Tests/Integration/StressTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace DurableTask.SqlServer.Tests.Integration
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.SqlServer.Tests.Utils;
using Xunit;
Expand Down Expand Up @@ -32,13 +33,13 @@ public StressTests(ITestOutputHelper output)
// This test has previously been used to uncover various deadlock issues by stressing the code paths
// related to foreign keys that point to the Instances and Payloads tables.
// Example: https://github.com/microsoft/durabletask-mssql/issues/45
[Theory]
[Theory(Timeout = 1_000_000)]
[InlineData(10)]
[InlineData(2000)]
public async Task ParallelSubOrchestrations(int subOrchestrationCount)
{
const string SubOrchestrationName = "SubOrchestration";

this.testService.RegisterInlineOrchestration<DateTime, string>(
orchestrationName: SubOrchestrationName,
version: "",
Expand Down Expand Up @@ -72,5 +73,44 @@ public async Task ParallelSubOrchestrations(int subOrchestrationCount)
// On slower CI machines, this test could take several minutes to complete.
await testInstance.WaitForCompletion(TimeSpan.FromMinutes(5));
}

/// <summary>
/// Stress test that fans out to <paramref name="subOrchestrationCount"/> sub-orchestrations in parallel,
/// passing each one an input string of roughly 10 Mi characters, and then waits (up to one minute)
/// for the parent orchestration to complete.
/// </summary>
/// <remarks>
/// Intended to exercise the code paths that read large payloads back from the database;
/// see https://github.com/dotnet/SqlClient/issues/593 for the underlying performance problem.
/// </remarks>
/// <param name="subOrchestrationCount">The number of sub-orchestrations to schedule in parallel.</param>
[Theory(Timeout = 100_000)]
[InlineData(10)]
public async Task ParallelWithBigPayload(int subOrchestrationCount)
{
    const string SubOrchestrationName = "SubOrchestration";
    const int PayloadLength = 10 * 1024 * 1024;

    // Build the payload in a single allocation instead of joining ten million one-character strings.
    string bigString = new string('1', PayloadLength);

    // Each sub-orchestration receives the large string as its input and simply returns the current time.
    this.testService.RegisterInlineOrchestration<DateTime, string>(
        orchestrationName: SubOrchestrationName,
        version: "",
        implementation: async (ctx, input) =>
        {
            await ctx.CreateTimer(DateTime.MinValue, input);
            return ctx.CurrentUtcDateTime;
        });

    // The parent orchestration is registered under this test's own name so that it is
    // distinguishable from the one used by ParallelSubOrchestrations.
    TestInstance<int> testInstance = await this.testService.RunOrchestration(
        input: 1,
        orchestrationName: nameof(ParallelWithBigPayload),
        implementation: async (ctx, input) =>
        {
            var listInstances = new List<Task<DateTime>>(capacity: subOrchestrationCount);
            for (int i = 0; i < subOrchestrationCount; i++)
            {
                // Prefix the payload with the index so every sub-orchestration gets a distinct input.
                Task<DateTime> instance = ctx.CreateSubOrchestrationInstance<DateTime>(
                    name: SubOrchestrationName,
                    version: "",
                    instanceId: $"suborchestration[{i}]",
                    input: $"{i}-{bigString}");
                listInstances.Add(instance);
            }

            DateTime[] results = await Task.WhenAll(listInstances);
            return new List<DateTime>(results);
        });

    await testInstance.WaitForCompletion(TimeSpan.FromMinutes(1));
}
}
}

0 comments on commit 22af387

Please sign in to comment.