Skip to content

Commit

Permalink
Fixed Batching and disabled AsyncDaemon for EVentStoreDB + Marten con…
Browse files Browse the repository at this point in the history
…figuration
  • Loading branch information
oskardudycz committed May 21, 2024
1 parent 2f51b6e commit 3e59018
Show file tree
Hide file tree
Showing 19 changed files with 76 additions and 86 deletions.
1 change: 0 additions & 1 deletion Core.EventStoreDB/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ EventStoreDBSubscriptionToAllOptions subscriptionOptions
{
var subscription = new EventStoreDBSubscriptionToAll(
sp.GetRequiredService<EventStoreClient>(),
sp.GetRequiredService<ISubscriptionStoreSetup>(),
sp.GetRequiredService<ILogger<EventStoreDBSubscriptionToAll>>()
) { Options = subscriptionOptions, GetHandlers = handlers };
Expand Down
1 change: 1 addition & 0 deletions Core.EventStoreDB/Core.EventStoreDB.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<!-- This dependency is only needed for the PostgreSQL subscriptions checkpoints -->
<PackageReference Include="Npgsql" Version="8.0.3" />
<PackageReference Include="Open.ChannelExtensions" Version="8.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class SubscriptionInfo
public class EventStoreDBSubscriptioToAllCoordinator
{
private readonly IDictionary<string, SubscriptionInfo> subscriptions;
private readonly ISubscriptionStoreSetup storeSetup;
private readonly IServiceScopeFactory serviceScopeFactory;

private readonly Channel<EventBatch> events = Channel.CreateBounded<EventBatch>(
Expand All @@ -28,24 +29,30 @@ public class EventStoreDBSubscriptioToAllCoordinator
}
);

public EventStoreDBSubscriptioToAllCoordinator(IDictionary<string, EventStoreDBSubscriptionToAll> subscriptions,
IServiceScopeFactory serviceScopeFactory)
public EventStoreDBSubscriptioToAllCoordinator(
IDictionary<string, EventStoreDBSubscriptionToAll> subscriptions,
ISubscriptionStoreSetup storeSetup,
IServiceScopeFactory serviceScopeFactory
)
{
this.subscriptions =
subscriptions.ToDictionary(ks => ks.Key,
vs => new SubscriptionInfo { Subscription = vs.Value, LastCheckpoint = Checkpoint.None }
);
this.storeSetup = storeSetup;
this.serviceScopeFactory = serviceScopeFactory;
}

public ChannelReader<EventBatch> Reader => events.Reader;
public ChannelWriter<EventBatch> Writer => events.Writer;
private ChannelReader<EventBatch> Reader => events.Reader;
private ChannelWriter<EventBatch> Writer => events.Writer;

public async Task SubscribeToAll(CancellationToken ct)
{
// see: https://github.com/dotnet/runtime/issues/36063
await Task.Yield();

await storeSetup.EnsureStoreExists(ct).ConfigureAwait(false);

var tasks = subscriptions.Select(s => Task.Run(async () =>
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
Expand Down
34 changes: 17 additions & 17 deletions Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
using System.Threading.Channels;
using Core.Events;
using Core.EventStoreDB.Subscriptions.Batch;
using Core.EventStoreDB.Subscriptions.Checkpoints;
using Core.Extensions;
using EventStore.Client;
using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Open.ChannelExtensions;
using Polly;
using EventTypeFilter = EventStore.Client.EventTypeFilter;

namespace Core.EventStoreDB.Subscriptions;

using static ISubscriptionCheckpointRepository;

public class EventStoreDBSubscriptionToAllOptions
{
public required string SubscriptionId { get; init; }
Expand All @@ -27,11 +24,11 @@ public class EventStoreDBSubscriptionToAllOptions
public bool IgnoreDeserializationErrors { get; set; } = true;

public int BatchSize { get; set; } = 1;
public int BatchDeadline { get; set; } = 50;
}

public class EventStoreDBSubscriptionToAll(
EventStoreClient eventStoreClient,
ISubscriptionStoreSetup storeSetup,
ILogger<EventStoreDBSubscriptionToAll> logger
)
{
Expand Down Expand Up @@ -63,8 +60,6 @@ public async Task SubscribeToAll(Checkpoint checkpoint, ChannelWriter<EventBatch

try
{
await storeSetup.EnsureStoreExists(ct).ConfigureAwait(false);

var subscription = eventStoreClient.SubscribeToAll(
checkpoint != Checkpoint.None ? FromAll.After(checkpoint) : FromAll.Start,
Options.ResolveLinkTos,
Expand All @@ -77,16 +72,14 @@ public async Task SubscribeToAll(Checkpoint checkpoint, ChannelWriter<EventBatch

logger.LogInformation("Subscription to all '{SubscriptionId}' started", SubscriptionId);

// await foreach (var @event in subscription)
// TODO: Add proper batching here!
await foreach (var events in subscription
.BatchAsync(Options.BatchSize, TimeSpan.FromMilliseconds(100), ct)
.ConfigureAwait(false))
{
//ResolvedEvent[] events = [@event];
await cw.WriteAsync(new EventBatch(Options.SubscriptionId, events), ct)
.ConfigureAwait(false);
}

await subscription.Pipe<ResolvedEvent, EventBatch>(
cw,
events => new EventBatch(Options.SubscriptionId, events.ToArray()),
Options.BatchSize,
Options.BatchDeadline,
ct
).ConfigureAwait(false);
}
catch (RpcException rpcException) when (rpcException is { StatusCode: StatusCode.Cancelled } ||
rpcException.InnerException is ObjectDisposedException)
Expand Down Expand Up @@ -115,4 +108,11 @@ await cw.WriteAsync(new EventBatch(Options.SubscriptionId, events), ct)
await SubscribeToAll(checkpoint, cw, ct).ConfigureAwait(false);
}
}
//
// private AsyncPolicy retry = Policy.Handle<RpcException>(rpcException =>
// rpcException is { StatusCode: StatusCode.Cancelled } ||
// rpcException.InnerException is ObjectDisposedException)
// .Or<OperationCanceledException>()
// .F
// .WaitAndRetryForeverAsync(_ => TimeSpan.FromMilliseconds(500));
}
19 changes: 13 additions & 6 deletions Core.Marten/MartenConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@ public static class MartenConfigExtensions
IConfiguration configuration,
Action<StoreOptions>? configureOptions = null,
string configKey = DefaultConfigKey,
bool useExternalBus = false
bool useExternalBus = false,
bool disableAsyncDaemon = false
) =>
services.AddMarten(
configuration.GetRequiredConfig<MartenConfig>(configKey),
configureOptions,
useExternalBus
useExternalBus,
disableAsyncDaemon
);

public static MartenServiceCollectionExtensions.MartenConfigurationExpression AddMarten(
this IServiceCollection services,
MartenConfig martenConfig,
Action<StoreOptions>? configureOptions = null,
bool useExternalBus = false
bool useExternalBus = false,
bool disableAsyncDaemon = false
)
{
var config = services
Expand All @@ -67,10 +70,14 @@ public static class MartenConfigExtensions
return SetStoreOptions(martenConfig, configureOptions);
})
.UseLightweightSessions()
.ApplyAllDatabaseChangesOnStartup()
.ApplyAllDatabaseChangesOnStartup();

if (!disableAsyncDaemon)
{
//.OptimizeArtifactWorkflow()
.AddAsyncDaemon(martenConfig.DaemonMode)
.AddSubscriptionWithServices<MartenEventPublisher>(ServiceLifetime.Scoped);
config.AddAsyncDaemon(martenConfig.DaemonMode)
.AddSubscriptionWithServices<MartenEventPublisher>(ServiceLifetime.Scoped);
}

if (useExternalBus)
services.AddMartenAsyncCommandBus();
Expand Down
2 changes: 1 addition & 1 deletion Core.Testing/Core.Testing.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Open.ChannelExtensions" Version="8.0.2" />
<PackageReference Include="Open.ChannelExtensions" Version="8.3.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
1 change: 1 addition & 0 deletions Core/Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Open.ChannelExtensions" Version="8.3.0" />
<PackageReference Include="Polly" Version="8.3.1" />
<PackageReference Include="RestSharp" Version="110.2.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
Expand Down
67 changes: 21 additions & 46 deletions Core/Extensions/AsyncEnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,61 +1,36 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Open.ChannelExtensions;

namespace Core.Extensions;

public static class AsyncEnumerableExtensions
{
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
public static async IAsyncEnumerable<T[]> BatchAsync<T>(
this IAsyncEnumerable<T> source,
public static async Task Pipe<T, TResult>(
this IAsyncEnumerable<T> enumerable,
ChannelWriter<TResult> cw,
Func<List<T>, TResult> transform,
int batchSize,
TimeSpan maxBatchTime,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
int timeout,
CancellationToken ct
)
{
var batch = new List<T>();
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

try
{
await foreach (var item in source.WithCancellation(cts.Token).ConfigureAwait(false))
var channel = Channel.CreateUnbounded<T>(
new UnboundedChannelOptions
{
batch.Add(item);
if (batch.Count == 1) // Reset the timer when the first item is added
{
cts.CancelAfter(maxBatchTime); // Set or reset the deadline
}

if (batch.Count >= batchSize)
{
yield return batch.ToArray();
batch.Clear();
cts.CancelAfter(maxBatchTime); // Reset the deadline for the new batch
}
SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false
}
);

if (batch.Count > 0)
{
yield return batch.ToArray(); // Return any remaining items as a batch
}
}
catch (OperationCanceledException)
channel.Reader.Batch(batchSize).WithTimeout(timeout).PipeAsync(async batch =>
{
if (batch.Count > 0)
{
yield return batch.ToArray(); // Yield whatever is in the batch when the timeout occurs
}
// Optionally, rethrow or handle the cancellation if needed
}
finally
await cw.WriteAsync(transform(batch), ct).ConfigureAwait(false);
return batch;
});

await foreach (var @event in enumerable.WithCancellation(ct))
{
cts.Dispose(); // Ensure the CancellationTokenSource is disposed to free resources
await channel.Writer.WriteAsync(@event, ct).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ConfirmShoppingCartTests(ApiSpecification<Program> api): IClassFixt
.Then(OK)
.And()
.When(GET, URI(ctx => $"/api/ShoppingCarts/{ctx.OpenedShoppingCartId()}"))
.Until(RESPONSE_ETAG_IS(3))
.Until(RESPONSE_ETAG_IS(3), 10)
.Then(
OK,
RESPONSE_BODY<ShoppingCartDetails>((details, ctx) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class OpenShoppingCartTests(ApiSpecification<Program> api): IClassFixture
response =>
api.Given()
.When(GET, URI($"/api/ShoppingCarts/{response.GetCreatedId()}"))
.Until(RESPONSE_ETAG_IS(1))
.Until(RESPONSE_ETAG_IS(1), 10)
.Then(
OK,
RESPONSE_BODY<ShoppingCartDetails>(details =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ await api
.Then(OK)
.And()
.When(GET, URI(ctx => $"/api/ShoppingCarts/{ctx.OpenedShoppingCartId()}"))
.Until(RESPONSE_ETAG_IS(1))
.Until(RESPONSE_ETAG_IS(1), 10)
.Then(
RESPONSE_BODY<ShoppingCartDetails>((details, ctx) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class CancelShoppingCartTests(ApiSpecification<Program> api): IClassFixtu
.Then(OK)
.And()
.When(GET, URI(ctx => $"/api/ShoppingCarts/{ctx.OpenedShoppingCartId()}"))
.Until(RESPONSE_ETAG_IS(1))
.Until(RESPONSE_ETAG_IS(1), 10)
.Then(
OK,
RESPONSE_BODY<ShoppingCartDetails>((details, ctx) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ await api
await api
.Given()
.When(GET, URI($"/api/ShoppingCarts/{api.ShoppingCartId}"))
.Until(RESPONSE_ETAG_IS(1))
.Until(RESPONSE_ETAG_IS(1), 10)
.Then(
OK,
RESPONSE_BODY<ShoppingCartDetails>(details =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ await api
.Then(NO_CONTENT)
.And()
.When(GET, URI($"/api/ShoppingCarts/{api.ShoppingCartId}"))
.Until(RESPONSE_ETAG_IS(2))
.Until(RESPONSE_ETAG_IS(2), 10)
.Then(
OK,
RESPONSE_BODY<ShoppingCartDetails>(details =>
Expand Down
2 changes: 1 addition & 1 deletion Sample/EventStoreDB/ECommerce/Carts/Carts/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static class Config
public static IServiceCollection AddCartsModule(this IServiceCollection services, IConfiguration config) =>
services
// Document Part used for projections
.AddMarten(config, configKey: "ReadModel_Marten")
.AddMarten(config, configKey: "ReadModel_Marten", disableAsyncDaemon: true)
.Services
.AddCarts()
.AddEventStoreDB(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public Task Post_Should_AddProductItem_To_ShoppingCart()
.Then(OK)
.And()
.When(GET, URI(ctx => $"/api/ShoppingCarts/{ctx.OpenedShoppingCartId()}"))
.Until(RESPONSE_ETAG_IS(1))
.Until(RESPONSE_ETAG_IS(1), 10)
.Then(
RESPONSE_BODY<ShoppingCartDetails>((details, ctx) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class CancelShoppingCartTests(ApiSpecification<Program> api): IClassFixtu
.Then(OK)
.And()
.When(GET, URI(ctx => $"/api/ShoppingCarts/{ctx.OpenedShoppingCartId()}"))
.Until(RESPONSE_ETAG_IS(1))
.Until(RESPONSE_ETAG_IS(1), 10)
.Then(
OK,
RESPONSE_BODY<ShoppingCartDetails>((details, ctx) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class OpenShoppingCartTests(ShoppingCartsApplicationFactory applicationFa
response =>
API.Given()
.When(GET, URI($"/api/ShoppingCarts/{response.GetCreatedId()}"))
.Until(RESPONSE_ETAG_IS(0))
.Until(RESPONSE_ETAG_IS(0), 10)
.Then(
OK,
RESPONSE_BODY<ShoppingCartDetails>(details =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class ShoppingCartDetails
public Guid Id { get; set; }
public Guid ClientId { get; set; }
public ShoppingCartStatus Status { get; set; }
public List<ShoppingCartDetailsProductItem> ProductItems { get; set; } = default!;
public List<ShoppingCartDetailsProductItem> ProductItems { get; set; } = new();
public int Version { get; set; }
public ulong LastProcessedPosition { get; set; }
}
Expand Down

0 comments on commit 3e59018

Please sign in to comment.