Skip to content

Commit

Permalink
Made the context argument available to the completion handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
Salvatore Isaja committed Apr 5, 2023
1 parent d73b13c commit 9f787ca
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 121 deletions.
174 changes: 91 additions & 83 deletions DynamoScatterGather.Tests/ScatterGatherGatewayTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ public static class ScatterGatherGatewayTest
private static Task<int> DoNothingCallback() =>
Task.FromResult(0);

private static Task<int> Taskify(Action action)
{
action();
return Task.FromResult(0);
}

private static AmazonDynamoDBClient CreateDynamoDbClient() =>
new(new AmazonDynamoDBConfig { ServiceURL = DynamoDbServiceUrl });

Expand Down Expand Up @@ -85,147 +79,161 @@ public static async Task OneTimeTearDown()
public static async Task NothingToScatter()
{
var gateway = CreateScatterGatherGateway();
var gatherCompleted = false;
Task HandleCompletion() => Taskify(() => gatherCompleted = true);
var completionHandler = new CompletionHandler();
var scatterRequestId = new ScatterRequestId("requestId");

await gateway.BeginScatter(scatterRequestId, "info");
await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeTrue();
await gateway.BeginScatter(scatterRequestId, "context");
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeTrue();
completionHandler.Context.Should().Be("context");
}

[Test]
public static async Task SimpleScatterGather()
{
var gateway = CreateScatterGatherGateway();
var gatherCompleted = false;
Task HandleCompletion() => Taskify(() => gatherCompleted = true);
var completionHandler = new CompletionHandler();
var scatterRequestId = new ScatterRequestId("requestId");

await gateway.BeginScatter(scatterRequestId, "info");
await gateway.BeginScatter(scatterRequestId, "context");
await gateway.Scatter(scatterRequestId, new ScatterPartId[] { new("lorem"), new("ipsum") }, DoNothingCallback);
await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletion);
gatherCompleted.Should().BeTrue();
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeTrue();
completionHandler.Context.Should().Be("context");
}

[Test]
public static async Task GatherFasterThanScatter()
{
var gateway = CreateScatterGatherGateway();
var gatherCompleted = false;
Task HandleCompletion() => Taskify(() => gatherCompleted = true);
var completionHandler = new CompletionHandler();
var scatterRequestId = new ScatterRequestId("requestId");
await gateway.BeginScatter(scatterRequestId, "info");
await gateway.BeginScatter(scatterRequestId, "context");

await gateway.Scatter(scatterRequestId, new ScatterPartId[] { new("lorem") }, DoNothingCallback);
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeTrue();
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeTrue();
}

[Test]
public static async Task DuplicateBeforeCompletion()
{
var gateway = CreateScatterGatherGateway();
var gatherCompleted = false;
Task HandleCompletion() => Taskify(() => gatherCompleted = true);
var completionHandler = new CompletionHandler();
var scatterRequestId = new ScatterRequestId("requestId");

await gateway.BeginScatter(scatterRequestId, "info");
await gateway.BeginScatter(scatterRequestId, "context");
await gateway.Scatter(scatterRequestId, new ScatterPartId[] { new("lorem"), new("ipsum") }, DoNothingCallback);
await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletion);
gatherCompleted.Should().BeTrue();
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeTrue();
}

[Test]
public static async Task DuplicateAfterCompletion()
{
var gateway = CreateScatterGatherGateway();
var gatherCompleted = false;
Task HandleCompletion() => Taskify(() => gatherCompleted = true);
var completionHandler = new CompletionHandler();
var scatterRequestId = new ScatterRequestId("requestId");

await gateway.BeginScatter(scatterRequestId, "info");
await gateway.BeginScatter(scatterRequestId, "context");
await gateway.Scatter(scatterRequestId, new ScatterPartId[] { new("lorem"), new("ipsum") }, DoNothingCallback);
await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletion);
gatherCompleted.Should().BeTrue();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeTrue();

gatherCompleted = false;
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
completionHandler = new CompletionHandler();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
}

[Test]
public static async Task ErrorOnCompletionHandlerAndRetry()
{
var gateway = CreateScatterGatherGateway();
var gatherCompleted = false;
Task HandleCompletion() => Taskify(() => gatherCompleted = true);
Task HandleCompletionThrowing() => throw new DivideByZeroException();
var completionHandler = new CompletionHandler();
static Task HandleCompletionThrowing(string context) => throw new DivideByZeroException();
var scatterRequestId = new ScatterRequestId("requestId");

await gateway.BeginScatter(scatterRequestId, "info");
await gateway.BeginScatter(scatterRequestId, "context");
await gateway.Scatter(scatterRequestId, new ScatterPartId[] { new("lorem"), new("ipsum") }, DoNothingCallback);
await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
var act = () => gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletionThrowing);
await act.Should().ThrowAsync<DivideByZeroException>();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletion);
gatherCompleted.Should().BeTrue();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeTrue();
}

[Test]
public static async Task RetryScatterWithNewIds()
{
var gateway = CreateScatterGatherGateway();
var gatherCompleted = false;
Task HandleCompletion() => Taskify(() => gatherCompleted = true);
var completionHandler = new CompletionHandler();
var scatterRequestId = new ScatterRequestId("requestId");

await gateway.BeginScatter(scatterRequestId, "info");
await gateway.BeginScatter(scatterRequestId, "context");
await gateway.Scatter(scatterRequestId, new ScatterPartId[] { new("lorem"), new("ipsum"), new("dolor"), new("consectetur") }, DoNothingCallback);
await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("dolor") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("ipsum") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("dolor") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.BeginScatter(scatterRequestId, "info");
await gateway.BeginScatter(scatterRequestId, "context");
await gateway.Scatter(scatterRequestId, new ScatterPartId[] { new("dolor"), new("sit"), new("amet") }, DoNothingCallback);
await gateway.EndScatter(scatterRequestId, HandleCompletion);
gatherCompleted.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("sit") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("dolor") }, HandleCompletion);
gatherCompleted.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("amet") }, HandleCompletion);
gatherCompleted.Should().BeTrue();
await gateway.EndScatter(scatterRequestId, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();

await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("sit") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("lorem") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("dolor") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeFalse();
await gateway.Gather(scatterRequestId, new ScatterPartId[] { new("amet") }, completionHandler.HandleCompletion);
completionHandler.Completed.Should().BeTrue();
}

private class CompletionHandler
{
public bool Completed { get; private set; }
public string Context { get; private set; }

public CompletionHandler()
{
Completed = false;
Context = "";
}

public Task HandleCompletion(string context)
{
Completed = true;
Context = context;
return Task.CompletedTask;
}
}
}
2 changes: 1 addition & 1 deletion DynamoScatterGather/DynamoScatterGather.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<WarningsAsErrors>nullable</WarningsAsErrors>
<Version>0.1.0</Version>
<Version>0.2.0</Version>
<Authors>Salvo Isaja</Authors>
<Company>Salvo Isaja</Company>
<Copyright>Copyright 2023 Salvatore Isaja</Copyright>
Expand Down
6 changes: 3 additions & 3 deletions DynamoScatterGather/IScatterGatherGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ namespace DynamoScatterGather;

public interface IScatterGatherGateway
{
Task BeginScatter(ScatterRequestId requestId, string info);
Task BeginScatter(ScatterRequestId requestId, string context);
Task Scatter(ScatterRequestId requestId, IEnumerable<ScatterPartId> partIds, Func<Task> callback);
Task<T> Scatter<T>(ScatterRequestId requestId, IEnumerable<ScatterPartId> partIds, Func<Task<T>> callback);
Task EndScatter(ScatterRequestId requestId, Func<Task> handleCompletion);
Task Gather(ScatterRequestId requestId, IReadOnlyCollection<ScatterPartId> partIds, Func<Task> handleCompletion);
Task EndScatter(ScatterRequestId requestId, Func<string, Task> handleCompletion);
Task Gather(ScatterRequestId requestId, IReadOnlyCollection<ScatterPartId> partIds, Func<string, Task> handleCompletion);
}
Loading

0 comments on commit 9f787ca

Please sign in to comment.