Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed May 21, 2024
1 parent 8cb8cb5 commit 6499ef4
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 26 deletions.
44 changes: 26 additions & 18 deletions Core.EventStoreDB/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,30 @@ public static class EventStoreDBConfigExtensions
{
services
.AddSingleton(EventTypeMapper.Instance)
.AddSingleton(new EventStoreClient(EventStoreClientSettings.Create(eventStoreDBConfig.ConnectionString)))
.AddTransient<EventStoreDBSubscriptionToAll, EventStoreDBSubscriptionToAll>();
.AddSingleton(new EventStoreClient(EventStoreClientSettings.Create(eventStoreDBConfig.ConnectionString)));

if (options?.UseInternalCheckpointing != false)
{
services
.AddTransient<ISubscriptionCheckpointRepository, EventStoreDBSubscriptionCheckpointRepository>();
}

return services;
return services.AddHostedService(serviceProvider =>
{
var logger =
serviceProvider.GetRequiredService<ILogger<BackgroundWorker>>();
var coordinator = serviceProvider.GetRequiredService<EventStoreDBSubscriptioToAllCoordinator>();
TelemetryPropagator.UseDefaultCompositeTextMapPropagator();
return new BackgroundWorker<EventStoreDBSubscriptioToAllCoordinator>(
coordinator,
logger,
(c, ct) => c.SubscribeToAll(ct)
);
}
);
}

public static IServiceCollection AddEventStoreDBSubscriptionToAll(
Expand All @@ -62,28 +76,22 @@ public static class EventStoreDBConfigExtensions
{
services.AddScoped<EventsBatchProcessor, EventsBatchProcessor>();
services.AddScoped<IEventsBatchCheckpointer, EventsBatchCheckpointer>();
services.AddSingleton<EventStoreDBSubscriptioToAllCoordinator>();

if (checkpointToEventStoreDB)
{
services
.AddSingleton<ISubscriptionCheckpointRepository, EventStoreDBSubscriptionCheckpointRepository>();
}

return services.AddHostedService(serviceProvider =>
{
var logger =
serviceProvider.GetRequiredService<ILogger<BackgroundWorker>>();
var eventStoreDBSubscriptionToAll =
serviceProvider.GetRequiredService<EventStoreDBSubscriptionToAll>();
TelemetryPropagator.UseDefaultCompositeTextMapPropagator();
return new BackgroundWorker(
logger,
ct => eventStoreDBSubscriptionToAll.SubscribeToAll(subscriptionOptions, ct)
);
}
return services.AddKeyedSingleton<EventStoreDBSubscriptionToAll>(
$"ESDB_subscription-{subscriptionOptions.SubscriptionId}",
(sp, _) => new EventStoreDBSubscriptionToAll(
subscriptionOptions,
sp.GetRequiredService<EventStoreClient>(),
sp,
sp.GetRequiredService<ILogger<EventStoreDBSubscriptionToAll>>()
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ public static class PostgresCheckpointingConfiguration
.AddScoped<ISubscriptionCheckpointRepository, PostgresSubscriptionCheckpointRepository>()
.AddHostedService(serviceProvider =>
{
var logger = serviceProvider.GetRequiredService<ILogger<BackgroundWorker>>();
var logger = serviceProvider.GetRequiredService<ILogger<BackgroundWorker<
PostgresSubscriptionCheckpointSetup>>>();
var checkpointSetup = serviceProvider.GetRequiredService<PostgresSubscriptionCheckpointSetup>();
TelemetryPropagator.UseDefaultCompositeTextMapPropagator();
return new BackgroundWorker(
return new BackgroundWorker<PostgresSubscriptionCheckpointSetup>(
checkpointSetup,
logger,
async ct => await checkpointSetup.EnsureCheckpointsTableExist(ct).ConfigureAwait(false)
async (setup, ct) => await setup.EnsureCheckpointsTableExist(ct).ConfigureAwait(false)
);
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Core.EventStoreDB.Subscriptions;

public class EventStoreDBSubscriptioToAllCoordinator(IDictionary<string, EventStoreDBSubscriptionToAll> subscriptions)
{
public async Task SubscribeToAll(CancellationToken ct)
{
// see: https://github.com/dotnet/runtime/issues/36063
await Task.Yield();

await Task.WhenAll(subscriptions.Values.Select(s => s.SubscribeToAll(ct))).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,19 @@ public class EventStoreDBSubscriptionToAllOptions
}

public class EventStoreDBSubscriptionToAll(
EventStoreDBSubscriptionToAllOptions subscriptionOptions,
EventStoreClient eventStoreClient,
IServiceProvider serviceProvider,
ILogger<EventStoreDBSubscriptionToAll> logger
)
{
private EventStoreDBSubscriptionToAllOptions subscriptionOptions = default!;
private string SubscriptionId => subscriptionOptions.SubscriptionId;

public async Task SubscribeToAll(EventStoreDBSubscriptionToAllOptions subscriptionOptions, CancellationToken ct)
public async Task SubscribeToAll(CancellationToken ct)
{
// see: https://github.com/dotnet/runtime/issues/36063
await Task.Yield();

this.subscriptionOptions = subscriptionOptions;

logger.LogInformation("Subscription to all '{SubscriptionId}'", subscriptionOptions.SubscriptionId);

try
Expand Down Expand Up @@ -92,7 +90,7 @@ await foreach (var events in subscription.BatchAsync(subscriptionOptions.BatchSi
// Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time
Thread.Sleep(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000));

await SubscribeToAll(this.subscriptionOptions, ct).ConfigureAwait(false);
await SubscribeToAll(ct).ConfigureAwait(false);
}
}

Expand Down
8 changes: 8 additions & 0 deletions Core/BackgroundWorkers/BackgroundWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@

namespace Core.BackgroundWorkers;

public class BackgroundWorker<TService>(
TService service,
ILogger<BackgroundWorker> logger,
Func<TService, CancellationToken, Task> perform)
: BackgroundWorker(logger, ct => perform(service, ct))
{
}

public class BackgroundWorker(
ILogger<BackgroundWorker> logger,
Func<CancellationToken, Task> perform)
Expand Down
2 changes: 2 additions & 0 deletions Core/Config.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Core.Commands;
using Core.Events;
using Core.Extensions;
using Core.Ids;
using Core.OpenTelemetry;
using Core.Queries;
Expand All @@ -14,6 +15,7 @@ public static class Config
public static IServiceCollection AddCoreServices(this IServiceCollection services)
{
services
.AllowResolvingKeyedServicesAsDictionary()
.AddSingleton(TimeProvider.System)
.AddSingleton(ActivityScope.Instance)
.AddEventBus()
Expand Down
73 changes: 73 additions & 0 deletions Core/Extensions/DIExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System.Collections.ObjectModel;
using Microsoft.Extensions.DependencyInjection;

namespace Core.Extensions;

public static class DIExtensions
{
// Taken from: https://stackoverflow.com/a/77559901
public static IServiceCollection AllowResolvingKeyedServicesAsDictionary(
this IServiceCollection sc)
{
// KeyedServiceCache caches all the keys of a given type for a
// specific service type. By making it a singleton we only have
// determine the keys once, which makes resolving the dict very fast.
sc.AddSingleton(typeof(KeyedServiceCache<,>));

// KeyedServiceCache depends on the IServiceCollection to get
// the list of keys. That's why we register that here as well, as it
// is not registered by default in MS.DI.
sc.AddSingleton(sc);

// Last we make the registration for the dictionary itself, which maps
// to our custom type below. This registration must be transient, as
// the containing services could have any lifetime and this registration
// should by itself not cause Captive Dependencies.
sc.AddTransient(typeof(IDictionary<,>), typeof(KeyedServiceDictionary<,>));

// For completeness, let's also allow IReadOnlyDictionary to be resolved.
sc.AddTransient(
typeof(IReadOnlyDictionary<,>), typeof(KeyedServiceDictionary<,>));

return sc;
}

// We inherit from ReadOnlyDictionary, to disallow consumers from changing
// the wrapped dependencies while reusing all its functionality. This way
// we don't have to implement IDictionary<T,V> ourselves; too much work.
private sealed class KeyedServiceDictionary<TKey, TService>(
KeyedServiceCache<TKey, TService> keys, IServiceProvider provider)
: ReadOnlyDictionary<TKey, TService>(Create(keys, provider))
where TKey : notnull
where TService : notnull
{
private static Dictionary<TKey, TService> Create(
KeyedServiceCache<TKey, TService> keys, IServiceProvider provider)
{
var dict = new Dictionary<TKey, TService>(capacity: keys.Keys.Length);

foreach (TKey key in keys.Keys)
{
dict[key] = provider.GetRequiredKeyedService<TService>(key);
}

return dict;
}
}

private sealed class KeyedServiceCache<TKey, TService>(IServiceCollection sc)
where TKey : notnull
where TService : notnull
{
// Once this class is resolved, all registrations are guaranteed to be
// made, so we can, at that point, safely iterate the collection to get
// the keys for the service type.
public TKey[] Keys { get; } = (
from service in sc
where service.ServiceKey != null
where service.ServiceKey!.GetType() == typeof(TKey)
where service.ServiceType == typeof(TService)
select (TKey)service.ServiceKey!)
.ToArray();
}
}
2 changes: 2 additions & 0 deletions Sample/EventStoreDB/Simple/ECommerce.Core/Configuration.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Core.Events;
using Core.EventStoreDB;
using Core.Extensions;
using Core.OpenTelemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -13,6 +14,7 @@ public static class Configuration
IConfiguration configuration
) =>
services
.AllowResolvingKeyedServicesAsDictionary()
.AddSingleton<IActivityScope, ActivityScope>()
.AddEventBus()
.AddEventStoreDB(configuration);
Expand Down

0 comments on commit 6499ef4

Please sign in to comment.