From 9f869cacb7ce361b34ee2f6f560c8724e00815e8 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Sun, 30 Jul 2023 16:17:57 +0200 Subject: [PATCH] Added full distribution of projections --- .../Helpdesk.Api/Helpdesk.Api.csproj | 1 - .../Core/Kafka/KafkaProducer.cs | 60 +++++++++++++++++++ .../Helpdesk.Kafka/Helpdesk.Kafka.csproj | 19 ++++++ .../Helpdesk.Kafka/Program.cs | 39 ++++++++++++ .../Properties/launchSettings.json | 14 +++++ .../appsettings.Development.json | 8 +++ .../appsettings.Production.json | 18 ++++++ .../Helpdesk.Kafka/appsettings.json | 21 +++++++ .../Helpdesk.Projections.csproj | 18 ++++++ .../Helpdesk.Projections/Program.cs | 30 ++++++++++ .../Properties/launchSettings.json | 14 +++++ .../appsettings.Development.json | 8 +++ .../appsettings.Production.json | 18 ++++++ .../Helpdesk.Projections/appsettings.json | 21 +++++++ .../Helpdesk.SignalR/.dockerignore | 6 -- .../Helpdesk.ScaledUpProjections/Helpdesk.sln | 12 ++++ 16 files changed, 300 insertions(+), 7 deletions(-) create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Core/Kafka/KafkaProducer.cs create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Helpdesk.Kafka.csproj create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Program.cs create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Properties/launchSettings.json create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Development.json create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Production.json create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.json create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Helpdesk.Projections.csproj create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Program.cs create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Properties/launchSettings.json create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Development.json create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Production.json create mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.json delete mode 100644 Sample/Helpdesk.ScaledUpProjections/Helpdesk.SignalR/.dockerignore diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Api/Helpdesk.Api.csproj b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Api/Helpdesk.Api.csproj index 85c939084..b5ba5dd68 100644 --- a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Api/Helpdesk.Api.csproj +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Api/Helpdesk.Api.csproj @@ -12,7 +12,6 @@ - diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Core/Kafka/KafkaProducer.cs b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Core/Kafka/KafkaProducer.cs new file mode 100644 index 000000000..d4c28773e --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Core/Kafka/KafkaProducer.cs @@ -0,0 +1,60 @@ +using System.Text.Json; +using Confluent.Kafka; +using Marten; +using Marten.Events; +using Marten.Events.Projections; + +namespace Helpdesk.Api.Core.Kafka; + +public class KafkaProducer: IProjection +{ + private const string DefaultConfigKey = "KafkaProducer"; + + private readonly KafkaProducerConfig config; + + public KafkaProducer(IConfiguration configuration) + { + config = configuration.GetRequiredSection(DefaultConfigKey).Get() ?? + throw new InvalidOperationException(); + } + + public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList streamsActions, + CancellationToken ct) + { + foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events)) + { + await Publish(@event.Data, ct); + } + } + + public void Apply(IDocumentOperations operations, IReadOnlyList streams) => + throw new NotImplementedException("Producer should be only used in the AsyncDaemon"); + + private async Task Publish(object @event, CancellationToken ct) + { + try + { + using var producer = new ProducerBuilder(config.ProducerConfig).Build(); + + await producer.ProduceAsync(config.Topic, + new Message + { + // store event type name in message Key + Key = @event.GetType().Name, + // serialize event to message Value + Value = JsonSerializer.Serialize(@event) + }, ct).ConfigureAwait(false); + } + catch (Exception exc) + { + Console.WriteLine(exc.Message); + throw; + } + } +} + +public class KafkaProducerConfig +{ + public ProducerConfig? ProducerConfig { get; set; } + public string? Topic { get; set; } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Helpdesk.Kafka.csproj b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Helpdesk.Kafka.csproj new file mode 100644 index 000000000..da467636c --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Helpdesk.Kafka.csproj @@ -0,0 +1,19 @@ + + + + net7.0 + enable + true + enable + + + + + + + + + + + + diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Program.cs b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Program.cs new file mode 100644 index 000000000..80e81feb3 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Program.cs @@ -0,0 +1,39 @@ +using System.Text.Json.Serialization; +using Helpdesk; +using Helpdesk.Api.Core.Kafka; +using Marten.Events.Projections; +using Microsoft.AspNetCore.Http.Json; +using Microsoft.AspNetCore.SignalR; +using Oakton; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services + .AddMartenForHelpdeskAsyncOnly( + builder.Configuration, + (options, _) => + options.Projections.Add(new KafkaProducer(builder.Configuration), ProjectionLifecycle.Async) + ); + +builder.Services + .Configure(o => o.SerializerOptions.Converters.Add(new JsonStringEnumConverter())) + .Configure(o => + o.JsonSerializerOptions.Converters.Add(new JsonStringEnumConverter())) + .AddSignalR(); + +builder.Host.ApplyOaktonExtensions(); + +var app = builder.Build(); + +app.UseCors("ClientPermission"); +app.MapHub("/hubs/incidents"); + +return await app.RunOaktonCommands(args); + +public class IncidentsHub: Hub +{ +} + +public partial class Program +{ +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Properties/launchSettings.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Properties/launchSettings.json new file mode 100644 index 000000000..c3fcd1d6a --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/Properties/launchSettings.json @@ -0,0 +1,14 @@ +{ + "profiles": { + "Helpdesk.Kafka": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "launchUrl": "swagger/index.html", + "applicationUrl": "http://localhost:5250", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Development.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Development.json new file mode 100644 index 000000000..a34cd70c5 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Production.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Production.json new file mode 100644 index 000000000..549051387 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.Production.json @@ -0,0 +1,18 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*", + "ConnectionStrings": { + "Incidents": "PORT = 5432; HOST = postgres; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'" + }, + "KafkaProducer": { + "ProducerConfig": { + "BootstrapServers": "kafka:9092" + }, + "Topic": "Incidents" + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.json new file mode 100644 index 000000000..5d37f99aa --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Kafka/appsettings.json @@ -0,0 +1,21 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*", + "ConnectionStrings": { + "Incidents": "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'" + }, + "KafkaProducer": { + "ProducerConfig": { + "BootstrapServers": "localhost:9092" + }, + "Topic": "Incidents" + }, + "Helpdesk": { + "DaemonLockId": 7777 + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Helpdesk.Projections.csproj b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Helpdesk.Projections.csproj new file mode 100644 index 000000000..7ab07c0b7 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Helpdesk.Projections.csproj @@ -0,0 +1,18 @@ + + + + net7.0 + enable + true + enable + + + + + + + + + + + diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Program.cs b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Program.cs new file mode 100644 index 000000000..9051db101 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Program.cs @@ -0,0 +1,30 @@ +using System.Text.Json.Serialization; +using Helpdesk; +using Helpdesk.Incidents.GetCustomerIncidentsSummary; +using Marten.Events.Projections; +using Microsoft.AspNetCore.Http.Json; +using Oakton; + +var builder = WebApplication.CreateBuilder(args); + +builder.Services + .AddMartenForHelpdeskAsyncOnly( + builder.Configuration, + (options, _) => + options.Projections.Add(ProjectionLifecycle.Async) + ); + +builder.Services + .Configure(o => o.SerializerOptions.Converters.Add(new JsonStringEnumConverter())) + .Configure(o => + o.JsonSerializerOptions.Converters.Add(new JsonStringEnumConverter())); + +builder.Host.ApplyOaktonExtensions(); + +var app = builder.Build(); + +return await app.RunOaktonCommands(args); + +public partial class Program +{ +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Properties/launchSettings.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Properties/launchSettings.json new file mode 100644 index 000000000..8d7814d36 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/Properties/launchSettings.json @@ -0,0 +1,14 @@ +{ + "profiles": { + "Helpdesk.Projections": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "launchUrl": "swagger/index.html", + "applicationUrl": "http://localhost:5251", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Development.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Development.json new file mode 100644 index 000000000..a34cd70c5 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Production.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Production.json new file mode 100644 index 000000000..549051387 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.Production.json @@ -0,0 +1,18 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*", + "ConnectionStrings": { + "Incidents": "PORT = 5432; HOST = postgres; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'" + }, + "KafkaProducer": { + "ProducerConfig": { + "BootstrapServers": "kafka:9092" + }, + "Topic": "Incidents" + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.json b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.json new file mode 100644 index 000000000..c79746083 --- /dev/null +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.Projections/appsettings.json @@ -0,0 +1,21 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*", + "ConnectionStrings": { + "Incidents": "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'" + }, + "KafkaProducer": { + "ProducerConfig": { + "BootstrapServers": "localhost:9092" + }, + "Topic": "Incidents" + }, + "Helpdesk": { + "DaemonLockId": 5555 + } +} diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.SignalR/.dockerignore b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.SignalR/.dockerignore deleted file mode 100644 index c8a50845c..000000000 --- a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.SignalR/.dockerignore +++ /dev/null @@ -1,6 +0,0 @@ -**/bin/ -**/obj/ -**/out/ -**/TestResults/ -**/Internal/Generated -Dockerfile diff --git a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.sln b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.sln index 66afee702..21c555927 100644 --- a/Sample/Helpdesk.ScaledUpProjections/Helpdesk.sln +++ b/Sample/Helpdesk.ScaledUpProjections/Helpdesk.sln @@ -19,6 +19,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Helpdesk", "Helpdesk\Helpde EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Helpdesk.SignalR", "Helpdesk.SignalR\Helpdesk.SignalR.csproj", "{FA50346F-88F5-47B9-8047-3A668C4826CF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Helpdesk.Projections", "Helpdesk.Projections\Helpdesk.Projections.csproj", "{52267B33-6A66-402A-BDBB-8F7ED4DF6FA8}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Helpdesk.Kafka", "Helpdesk.Kafka\Helpdesk.Kafka.csproj", "{6084EFF8-7E51-4F44-B1ED-AA5FEA000CAB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -41,6 +45,14 @@ Global {FA50346F-88F5-47B9-8047-3A668C4826CF}.Debug|Any CPU.Build.0 = Debug|Any CPU {FA50346F-88F5-47B9-8047-3A668C4826CF}.Release|Any CPU.ActiveCfg = Release|Any CPU {FA50346F-88F5-47B9-8047-3A668C4826CF}.Release|Any CPU.Build.0 = Release|Any CPU + {52267B33-6A66-402A-BDBB-8F7ED4DF6FA8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {52267B33-6A66-402A-BDBB-8F7ED4DF6FA8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {52267B33-6A66-402A-BDBB-8F7ED4DF6FA8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {52267B33-6A66-402A-BDBB-8F7ED4DF6FA8}.Release|Any CPU.Build.0 = Release|Any CPU + {6084EFF8-7E51-4F44-B1ED-AA5FEA000CAB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6084EFF8-7E51-4F44-B1ED-AA5FEA000CAB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6084EFF8-7E51-4F44-B1ED-AA5FEA000CAB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6084EFF8-7E51-4F44-B1ED-AA5FEA000CAB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution EndGlobalSection