Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored EventStoreDB subscriptions making it more scalable #275

Merged
merged 18 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
24370c4
Refactored Postgres checkpointing repository to take dedicated Postgr…
oskardudycz May 19, 2024
2a593e2
Introduced IEventBatchHandler to allow injecting custom batch handlin…
oskardudycz May 19, 2024
9482330
Added Core.EntityFramework project with EntityFrameworkProjection
oskardudycz May 19, 2024
1789c30
Added tests for EntityFrameworkProjection and fixed the implementation.
oskardudycz May 20, 2024
f3bdd34
Fixed handling for Add, Updates and Removes in the same batch
oskardudycz May 20, 2024
5c2ac46
Connected the generic EntityFrameworkProjection with the Simple imple…
oskardudycz May 20, 2024
8cb8cb5
Refactored Checkpointing, moved setup to hosted service
oskardudycz May 21, 2024
6499ef4
f
oskardudycz May 21, 2024
4a242c5
Added EventStoreDB subscription coordinator with handling multiple su…
oskardudycz May 21, 2024
2f51b6e
Improved batching
oskardudycz May 21, 2024
3e59018
Fixed Batching and disabled AsyncDaemon for EVentStoreDB + Marten con…
oskardudycz May 21, 2024
02e55b5
Added Collection Fixture to have Host setup only once
oskardudycz May 22, 2024
85b7824
Made collection test for the EventStoreDB + Marten samples
oskardudycz May 22, 2024
77129e4
Fixed scope handling in the batch processing
oskardudycz May 22, 2024
99ba1ac
Added long polling example
oskardudycz May 22, 2024
4e5b32c
Added Polly retry policy to SubscribeToAll
oskardudycz May 22, 2024
e6e870a
It appeared that there's no need (for now) for Channels besides the e…
oskardudycz May 22, 2024
45583fe
Cleaned up leftovers after the EventStoreDB refactorings
oskardudycz May 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Core.EntityFramework.Tests/Core.EntityFramework.Tests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<None Update="xunit.runner.json;appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Core.EntityFramework\Core.EntityFramework.csproj"/>
<ProjectReference Include="..\Core.Testing\Core.Testing.csproj"/>
</ItemGroup>

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.12.0"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0"/>
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.5.0"/>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="xunit" Version="2.8.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<!-- For PostgreSQL checkpointing tests -->
<PackageReference Include="Testcontainers.PostgreSql" Version="3.8.0"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<Content Update="appsettings.json">
<ExcludeFromSingleFile>true</ExcludeFromSingleFile>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>PreserveNewest</CopyToPublishDirectory>
</Content>
</ItemGroup>


</Project>
124 changes: 124 additions & 0 deletions Core.EntityFramework.Tests/EntityFrameworkProjectionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
using Core.EntityFramework.Projections;
using Core.EntityFramework.Tests.Fixtures;
using Core.Events;
using FluentAssertions;
using Microsoft.EntityFrameworkCore;
using Xunit;

namespace Core.EntityFramework.Tests;

public record Opened(Guid ShoppingCartId, Guid ClientId);

public record ProductItemAdded(Guid ShoppingCartId, int Count);

public record Cancelled(Guid ShoppingCartId);

public class ShoppingCartProjection: EntityFrameworkProjection<ShoppingCart, Guid, TestDbContext>
{
public ShoppingCartProjection()
{
ViewId(c => c.Id);

Creates<Opened>(Create);
Projects<ProductItemAdded>(e => e.ShoppingCartId, Apply);
Deletes<Cancelled>(e => e.ShoppingCartId);
}

private ShoppingCart Create(Opened opened) =>
new() { Id = opened.ShoppingCartId, ClientId = opened.ClientId, ProductCount = 0 };


private ShoppingCart Apply(ShoppingCart cart, ProductItemAdded added)
{
cart.ProductCount += added.Count;
return cart;
}
}

public class EntityFrameworkProjectionTests(EfCorePostgresContainerFixture<TestDbContext> fixture)
: IClassFixture<EfCorePostgresContainerFixture<TestDbContext>>
{
private readonly TestDbContext context = fixture.DbContext;

[Fact]
public async Task Applies_Works_Separately()
{
var projection = new ShoppingCartProjection { DbContext = context };

var cartId = Guid.NewGuid();
var clientId = Guid.NewGuid();

await projection.Handle([EventEnvelope.From(new Opened(cartId, clientId))], CancellationToken.None);
await context.SaveChangesAsync();

var savedEntity = await context.ShoppingCarts.Where(e => e.Id == cartId).FirstOrDefaultAsync();
savedEntity.Should().NotBeNull();
savedEntity.Should().BeEquivalentTo(new ShoppingCart { Id = cartId, ClientId = clientId, ProductCount = 0 });

await projection.Handle([EventEnvelope.From(new ProductItemAdded(cartId, 2))], CancellationToken.None);
await context.SaveChangesAsync();

savedEntity = await context.ShoppingCarts.Where(e => e.Id == cartId).FirstOrDefaultAsync();
savedEntity.Should().NotBeNull();
savedEntity.Should().BeEquivalentTo(new ShoppingCart { Id = cartId, ClientId = clientId, ProductCount = 2 });


await projection.Handle([EventEnvelope.From(new Cancelled(cartId))], CancellationToken.None);
await context.SaveChangesAsync();

savedEntity = await context.ShoppingCarts.Where(e => e.Id == cartId).FirstOrDefaultAsync();
savedEntity.Should().BeNull();
}

[Fact]
public async Task Applies_Works_In_Batch()
{
var projection = new ShoppingCartProjection { DbContext = context };

var cartId = Guid.NewGuid();
var clientId = Guid.NewGuid();

await projection.Handle([
EventEnvelope.From(new Opened(cartId, clientId)),
EventEnvelope.From(new ProductItemAdded(cartId, 2)),
EventEnvelope.From(new ProductItemAdded(cartId, 3)),
EventEnvelope.From(new ProductItemAdded(cartId, 5))
], CancellationToken.None);
await context.SaveChangesAsync();

var savedEntity = await context.ShoppingCarts.Where(e => e.Id == cartId).FirstOrDefaultAsync();
savedEntity.Should().NotBeNull();
savedEntity.Should().BeEquivalentTo(new ShoppingCart { Id = cartId, ClientId = clientId, ProductCount = 10 });
}

[Fact]
public async Task Applies_Works_In_Batch_With_AddAndDeleteOnTheSameRecord()
{
var projection = new ShoppingCartProjection { DbContext = context };

var cartId = Guid.NewGuid();
var clientId = Guid.NewGuid();

await projection.Handle([
EventEnvelope.From(new Opened(cartId, clientId)),
EventEnvelope.From(new Cancelled(cartId))
], CancellationToken.None);
await context.SaveChangesAsync();

var savedEntity = await context.ShoppingCarts.Where(e => e.Id == cartId).FirstOrDefaultAsync();
savedEntity.Should().BeNull();
}

[Fact]
public async Task SmokeTest()
{
var entity = new ShoppingCart { Id = Guid.NewGuid(), ProductCount = 2, ClientId = Guid.NewGuid() };
context.ShoppingCarts.Add(entity);
await context.SaveChangesAsync();

var savedEntity = await context.ShoppingCarts.FindAsync(entity.Id);
Assert.NotNull(savedEntity);
savedEntity.Should().NotBeNull();
savedEntity.Should().BeEquivalentTo(entity);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Microsoft.EntityFrameworkCore;
using Core.Testing.Fixtures;
using Xunit;

namespace Core.EntityFramework.Tests.Fixtures;

public class EfCorePostgresContainerFixture<TContext>: IAsyncLifetime where TContext : DbContext
{
private readonly PostgresContainerFixture postgresContainerFixture = new();

public TContext DbContext { get; private set; } = default!;

public async Task InitializeAsync()
{
await postgresContainerFixture.InitializeAsync().ConfigureAwait(false);

var optionsBuilder = new DbContextOptionsBuilder<TContext>()
.UseNpgsql(postgresContainerFixture.DataSource);

DbContext = (TContext)Activator.CreateInstance(typeof(TContext), optionsBuilder.Options)!;

await DbContext.Database.MigrateAsync().ConfigureAwait(false);
}

public async Task DisposeAsync()
{
await DbContext.DisposeAsync().ConfigureAwait(false);
await postgresContainerFixture.DisposeAsync().ConfigureAwait(false);
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace Core.EntityFramework.Tests.Migrations
{
/// <inheritdoc />
public partial class InitialCreate : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "ShoppingCarts",
columns: table => new
{
Id = table.Column<Guid>(type: "uuid", nullable: false),
ClientId = table.Column<Guid>(type: "uuid", nullable: false),
ProductCount = table.Column<int>(type: "integer", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_ShoppingCarts", x => x.Id);
});
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "ShoppingCarts");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// <auto-generated />
using System;
using Core.EntityFramework.Tests;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;

#nullable disable

namespace Core.EntityFramework.Tests.Migrations
{
[DbContext(typeof(TestDbContext))]
partial class TestDbContextModelSnapshot : ModelSnapshot
{
protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "8.0.4")
.HasAnnotation("Relational:MaxIdentifierLength", 63);

NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);

modelBuilder.Entity("Core.EntityFramework.Tests.ShoppingCart", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid");

b.Property<Guid>("ClientId")
.HasColumnType("uuid");

b.Property<int>("ProductCount")
.HasColumnType("integer");

b.HasKey("Id");

b.ToTable("ShoppingCarts");
});
#pragma warning restore 612, 618
}
}
}
51 changes: 51 additions & 0 deletions Core.EntityFramework.Tests/TestDbContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Design;
using Microsoft.Extensions.Configuration;

namespace Core.EntityFramework.Tests;

public class ShoppingCart
{
public Guid Id { get; set; }
public Guid ClientId { get; set; }
public int ProductCount { get; set; }
}

public class TestDbContext(DbContextOptions<TestDbContext> options): DbContext(options)
{
public DbSet<ShoppingCart> ShoppingCarts { get; set; } = default!;

protected override void OnModelCreating(ModelBuilder modelBuilder) =>
modelBuilder.Entity<ShoppingCart>();
}

public class TestDbContextFactory: IDesignTimeDbContextFactory<TestDbContext>
{
public TestDbContext CreateDbContext(params string[] args)
{
var optionsBuilder = new DbContextOptionsBuilder<TestDbContext>();

if (optionsBuilder.IsConfigured)
return new TestDbContext(optionsBuilder.Options);

//Called by parameterless ctor Usually Migrations
var environmentName = Environment.GetEnvironmentVariable("EnvironmentName") ?? "Development";

var connectionString =
new ConfigurationBuilder()
.SetBasePath(AppContext.BaseDirectory)
.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{environmentName}.json", optional: true, reloadOnChange: false)
.AddEnvironmentVariables()
.Build()
.GetConnectionString("TestDb");

optionsBuilder.UseNpgsql(connectionString);

return new TestDbContext(optionsBuilder.Options);
}

public static TestDbContext Create()
=> new TestDbContextFactory().CreateDbContext();
}

Loading
Loading