Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added EventStoreDB E-Commerce samples version #49

Merged
merged 9 commits into from Jul 2, 2021
68 changes: 68 additions & 0 deletions Core.EventStoreDB/Config.cs
@@ -0,0 +1,68 @@
using System;
using Core.EventStoreDB.Subscriptions;
using Core.Subscriptions;
using EventStore.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Core.EventStoreDB
{
public class EventStoreDBConfig
{
public string ConnectionString { get; set; } = default!;
}

public record EventStoreDBOptions(
bool UseInternalCheckpointing = true
);

public static class EventStoreDBConfigExtensions
{
private const string DefaultConfigKey = "EventStore";

public static IServiceCollection AddEventStoreDB(this IServiceCollection services, IConfiguration config, EventStoreDBOptions? options = null)
{
var eventStoreDBConfig = config.GetSection(DefaultConfigKey).Get<EventStoreDBConfig>();

services.AddSingleton(
new EventStoreClient(EventStoreClientSettings.Create(eventStoreDBConfig.ConnectionString)));

if (options?.UseInternalCheckpointing != false)
{
services
.AddTransient<ISubscriptionCheckpointRepository, EventStoreDBSubscriptionCheckpointRepository>();
}

return services;
}

public static IServiceCollection AddEventStoreDBSubscriptionToAll(
this IServiceCollection services,
string subscriptionId,
SubscriptionFilterOptions? filterOptions = null,
Action<EventStoreClientOperationOptions>? configureOperation = null,
UserCredentials? credentials = null,
bool checkpointToEventStoreDB = true)
{
if (checkpointToEventStoreDB)
{
services
.AddTransient<ISubscriptionCheckpointRepository, EventStoreDBSubscriptionCheckpointRepository>();
}

return services.AddHostedService(serviceProvider =>
new SubscribeToAllBackgroundWorker(
serviceProvider,
serviceProvider.GetRequiredService<EventStoreClient>(),
serviceProvider.GetRequiredService<ISubscriptionCheckpointRepository>(),
serviceProvider.GetRequiredService<ILogger<SubscribeToAllBackgroundWorker>>(),
subscriptionId,
filterOptions,
configureOperation,
credentials
)
);
}
}
}
24 changes: 24 additions & 0 deletions Core.EventStoreDB/Core.EventStoreDB.csproj
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<Nullable>enable</Nullable>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>


<ItemGroup>
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="21.2.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.38.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Core\Core.csproj" />
</ItemGroup>

</Project>
41 changes: 41 additions & 0 deletions Core.EventStoreDB/Events/AggregateStreamExtensions.cs
@@ -0,0 +1,41 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Core.Aggregates;
using Core.Events;
using Core.EventStoreDB.Serialization;
using Core.Projections;
using EventStore.Client;

namespace Core.EventStoreDB.Events
{
public static class AggregateStreamExtensions
{
public static async Task<T?> AggregateStream<T>(
this EventStoreClient eventStore,
Guid id,
CancellationToken cancellationToken,
ulong? fromVersion = null
) where T : class, IProjection
{
var readResult = eventStore.ReadStreamAsync(
Direction.Forwards,
StreamNameMapper.ToStreamId<T>(id),
fromVersion ?? StreamPosition.Start,
cancellationToken: cancellationToken
);

// TODO: consider adding extension method for the aggregation and deserialisation
var aggregate = (T)Activator.CreateInstance(typeof(T), true)!;

await foreach (var @event in readResult)
{
var eventData = @event.Deserialize();

aggregate.When(eventData!);
}

return aggregate;
}
}
}
67 changes: 67 additions & 0 deletions Core.EventStoreDB/Repository/EventStoreDBRepository.cs
@@ -0,0 +1,67 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Core.Aggregates;
using Core.Events;
using Core.EventStoreDB.Events;
using Core.EventStoreDB.Serialization;
using Core.Repositories;
using EventStore.Client;

namespace Core.EventStoreDB.Repository
{
public class EventStoreDBRepository<T>: IRepository<T> where T : class, IAggregate
{
private readonly EventStoreClient eventStore;
private readonly IEventBus eventBus;

public EventStoreDBRepository(
EventStoreClient eventStoreDBClient,
IEventBus eventBus
)
{
this.eventStore = eventStoreDBClient ?? throw new ArgumentNullException(nameof(eventStoreDBClient));
this.eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken)
{
return eventStore.AggregateStream<T>(
id,
cancellationToken
);
}

public Task Add(T aggregate, CancellationToken cancellationToken)
{
return Store(aggregate, cancellationToken);
}

public Task Update(T aggregate, CancellationToken cancellationToken)
{
return Store(aggregate, cancellationToken);
}

public Task Delete(T aggregate, CancellationToken cancellationToken)
{
return Store(aggregate, cancellationToken);
}

private async Task Store(T aggregate, CancellationToken cancellationToken)
{
var events = aggregate.DequeueUncommittedEvents();

var eventsToStore = events
.Select(EventStoreDBSerializer.ToJsonEventData).ToArray();

await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
// TODO: Add proper optimistic concurrency handling
StreamState.Any,
eventsToStore,
cancellationToken: cancellationToken
);
}
}
}
30 changes: 30 additions & 0 deletions Core.EventStoreDB/Serialization/EventStoreDBSerializer.cs
@@ -0,0 +1,30 @@
using System.Text;
using Core.Events;
using Core.Reflection;
using EventStore.Client;
using Newtonsoft.Json;

namespace Core.EventStoreDB.Serialization
{
public static class EventStoreDBSerializer
{
public static T Deserialize<T>(this ResolvedEvent resolvedEvent) => (T)Deserialize(resolvedEvent);

public static object Deserialize(this ResolvedEvent resolvedEvent)
{
// get type
var eventType = EventTypeMapper.ToType(resolvedEvent.Event.EventType);

// deserialize event
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(resolvedEvent.Event.Data.Span), eventType!)!;
}

public static EventData ToJsonEventData(this IEvent @event) =>
new(
Uuid.NewUuid(),
EventTypeMapper.ToName(@event.GetType()),
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event)),
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { }))
);
}
}
@@ -0,0 +1,76 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Core.Events;
using Core.EventStoreDB.Serialization;
using Core.Subscriptions;
using EventStore.Client;

namespace Core.EventStoreDB.Subscriptions
{
public record CheckpointStored(string SubscriptionId, ulong? Position, DateTime CheckpointedAt): IEvent;

public class EventStoreDBSubscriptionCheckpointRepository: ISubscriptionCheckpointRepository
{
private readonly EventStoreClient eventStoreClient;

public EventStoreDBSubscriptionCheckpointRepository(
EventStoreClient eventStoreClient)
{
this.eventStoreClient = eventStoreClient ?? throw new ArgumentNullException(nameof(eventStoreClient));
}

public async ValueTask<ulong?> Load(string subscriptionId, CancellationToken ct)
{
var streamName = GetCheckpointStreamName(subscriptionId);

var result = eventStoreClient.ReadStreamAsync(Direction.Backwards, streamName, StreamPosition.End, 1,
cancellationToken: ct);

if (await result.ReadState == ReadState.StreamNotFound)
{
return null;
}

ResolvedEvent? @event = await result.FirstOrDefaultAsync(ct);

return @event?.Deserialize<CheckpointStored>().Position;
}

public async ValueTask Store(string subscriptionId, ulong position, CancellationToken ct)
{
var @event = new CheckpointStored(subscriptionId, position, DateTime.UtcNow);
var eventToAppend = new[] {@event.ToJsonEventData()};
var streamName = GetCheckpointStreamName(subscriptionId);

try
{
await eventStoreClient.AppendToStreamAsync(
streamName,
StreamState.StreamExists,
eventToAppend,
cancellationToken: ct
);
}
catch (WrongExpectedVersionException)
{
await eventStoreClient.SetStreamMetadataAsync(
streamName,
StreamState.NoStream,
new StreamMetadata(1),
cancellationToken: ct
);

await eventStoreClient.AppendToStreamAsync(
streamName,
StreamState.NoStream,
eventToAppend,
cancellationToken: ct
);
}
}

private static string GetCheckpointStreamName(string subscriptionId) => $"checkpoint_{subscriptionId}";
}
}