Skip to content

Commit

Permalink
- breaking change (notifications extension): notification buffers, pi…
Browse files Browse the repository at this point in the history
…peline and governors are now identified using arbitrary string names instead of GUIDs for more flexibility
  • Loading branch information
martinzima committed Jan 27, 2022
1 parent ff34a4e commit bbb9589
Show file tree
Hide file tree
Showing 22 changed files with 112 additions and 80 deletions.
2 changes: 1 addition & 1 deletion Common.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<VersionPrefix>1.27.2</VersionPrefix>
<VersionPrefix>1.28.0</VersionPrefix>
</PropertyGroup>

<PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ public class AggregatingBufferGovernor : IBufferGovernor
{
private readonly TimeSpan minTimeDelay;

public AggregatingBufferGovernor(Guid id, TimeSpan minTimeDelay)
public AggregatingBufferGovernor(string name, TimeSpan minTimeDelay)
{
Id = id;
Name = name;
this.minTimeDelay = minTimeDelay;
}

public Guid Id { get; }
public string Name { get; }

public async Task<MultiValueDictionary<NotificationBuffer, BufferedNotification>> SelectNotificationsForReleaseAsync(IReadRepository readRepository)
{
DateTimeOffset maxDate = Clock.Current.Now.Subtract(minTimeDelay);
var notifications = readRepository.FindAll<BufferedNotification>()
.Where(x => x.TimeQueued <= maxDate && x.Buffer.GovernorId == Id);
.Where(x => x.TimeQueued <= maxDate && x.Buffer.GovernorName == Name);
var buffers = await notifications
.Include(readRepository, x => x.Buffer.Notifications)
.Select(x => x.Buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ public Task CommitAsync()
return crudRepository.SaveChangesAsync();
}

public async Task Add(SerializedNotification serializedNotification, Guid bufferId,
DateTimeOffset timeQueued, Guid bufferGovernorId, Guid notificationPipelineId)
public async Task Add(SerializedNotification serializedNotification, string bufferName,
DateTimeOffset timeQueued, string bufferGovernorName, string notificationPipelineName)
{
NotificationBuffer buffer = await crudRepository.FindAsync<NotificationBuffer>(bufferId);
NotificationBuffer buffer = await crudRepository.FirstOrDefaultAsync<NotificationBuffer>(x => x.Name == bufferName);
if (buffer == null)
{
buffer = new NotificationBuffer(bufferId, bufferGovernorId, notificationPipelineId);
buffer = new NotificationBuffer(Guid.NewGuid(), bufferName, bufferGovernorName, notificationPipelineName);
crudRepository.Add(buffer); // TODO race condition - use AddOrUpdate instead
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ private async Task AddNotification(INotification notification)

SerializedNotification serialized = notificationSerializer.ToJson(notification);

Guid bufferId = await bufferSelector.SelectBufferIdAsync(tNotification);
var bufferId = await bufferSelector.SelectBufferIdAsync(tNotification);

await bufferedNotificationStore.Add(serialized, bufferId, Clock.Current.Now,
bufferGovernor.Id, notificationPipeline.Id);
bufferGovernor.Name, notificationPipeline.Name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Revo.Extensions.Notifications.Channels.Buffering
{
public interface IBufferGovernor
{
Guid Id { get; }
string Name { get; }
//Task AddNotificationAsync(INotification notification);
Task<MultiValueDictionary<NotificationBuffer, BufferedNotification>> SelectNotificationsForReleaseAsync(IReadRepository readRepository);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace Revo.Extensions.Notifications.Channels.Buffering
{
public interface IBufferSelector<in T> where T : INotification
{
Task<Guid> SelectBufferIdAsync(T notification);
Task<string> SelectBufferIdAsync(T notification);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ namespace Revo.Extensions.Notifications.Channels.Buffering
{
public interface IBufferedNotificationStore
{
Task Add(SerializedNotification serializedNotification, Guid bufferId,
DateTimeOffset timeQueued, Guid bufferGovernorId, Guid notificationPipelineId);
Task Add(SerializedNotification serializedNotification, string bufferName,
DateTimeOffset timeQueued, string bufferGovernorName, string notificationPipelineName);

Task CommitAsync();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Revo.Extensions.Notifications
namespace Revo.Extensions.Notifications.Channels.Buffering
{
public interface INotificationPipeline
{
Guid Id { get; }
string Name { get; }
Task ProcessNotificationsAsync(IReadOnlyCollection<INotification> notifications);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ public class NotificationPipeline : INotificationPipeline
{
private readonly IBufferedNotificationChannel[] bufferedNotificationChannels;

public NotificationPipeline(Guid id,
public NotificationPipeline(string name,
IBufferedNotificationChannel[] bufferedNotificationChannels)
{
Id = id;
Name = name;
this.bufferedNotificationChannels = bufferedNotificationChannels;
}

public Guid Id { get; }
public string Name { get; }

public async Task ProcessNotificationsAsync(IReadOnlyCollection<INotification> notifications)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public async Task HandleAsync(ProcessBufferedNotificationsJob job, CancellationT
{
MultiValueDictionary<NotificationBuffer, BufferedNotification> toRelease = await bufferGovernor
.SelectNotificationsForReleaseAsync(crudRepository);
var byPipeline = toRelease.GroupBy(x => x.Key.PipelineId);
var byPipeline = toRelease.GroupBy(x => x.Key.PipelineName);

foreach (var pipelineNotifications in byPipeline)
{
INotificationPipeline pipeline =
notificationPipelines.FirstOrDefault(x => x.Id == pipelineNotifications.Key);
notificationPipelines.FirstOrDefault(x => x.Name == pipelineNotifications.Key);
if (pipeline == null)
{
throw new InvalidOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ namespace Revo.Extensions.Notifications.Model
[TablePrefix(NamespacePrefix = "RNO", ColumnPrefix = "NBF")]
public class NotificationBuffer : ReadModelBase
{
public NotificationBuffer(Guid id, Guid governorId, Guid pipelineId)
public NotificationBuffer(Guid id, string name, string governorName, string pipelineName)
{
Id = id;
GovernorId = governorId;
PipelineId = pipelineId;
Name = name;
GovernorName = governorName;
PipelineName = pipelineName;
}

protected NotificationBuffer()
{
}

public Guid Id { get; private set; }
public Guid PipelineId { get; private set; }
public Guid GovernorId { get; set; }
public string Name { get; private set; }
public string PipelineName { get; private set; }
public string GovernorName { get; set; }
public List<BufferedNotification> Notifications { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@
<Description>Event Sourcing, CQRS and DDD framework for modern C#/.NET applications.
Configurable user notifications extension package.</Description>
</PropertyGroup>


<ItemGroup>
<None Remove="Sql\rno_2_pgsql.sql" />
<None Remove="Sql\rno_baseline_mssql.sql" />
<None Remove="Sql\rno_baseline_pgsql.sql" />
<None Remove="Sql\rno_baseline_sqlite.sql" />
</ItemGroup>

<ItemGroup>
<EmbeddedResource Include="Sql\rno_baseline_1_mssql.sql" />
<EmbeddedResource Include="Sql\rno_baseline_1_pgsql.sql" />
<EmbeddedResource Include="Sql\rno_baseline_1_sqlite.sql" />
<EmbeddedResource Include="Sql\rno_2_pgsql.sql" />
<EmbeddedResource Include="Sql\rno_baseline_mssql.sql" />
<EmbeddedResource Include="Sql\rno_baseline_pgsql.sql" />
<EmbeddedResource Include="Sql\rno_baseline_sqlite.sql" />
</ItemGroup>

<ItemGroup>
Expand Down
17 changes: 17 additions & 0 deletions Extensions/Revo.Extensions.Notifications/Sql/rno_2_pgsql.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
ALTER TABLE rno_notification_buffer
ADD COLUMN rno_nbf_name text NOT NULL DEFAULT '',
ALTER COLUMN rno_nbf_pipeline_id TYPE text,
ALTER COLUMN rno_nbf_governor_id TYPE text;

UPDATE rno_notification_buffer SET rno_nbf_name = rno_nbf_notification_buffer_id::text;

ALTER TABLE rno_notification_buffer
ALTER COLUMN rno_nbf_name DROP DEFAULT;
ALTER TABLE rno_notification_buffer
ADD UNIQUE(rno_nbf_name);

ALTER TABLE rno_notification_buffer
RENAME COLUMN rno_nbf_pipeline_id TO rno_nbf_pipeline_name;

ALTER TABLE rno_notification_buffer
RENAME COLUMN rno_nbf_governor_id TO rno_nbf_governor_name;
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
-- Revo.Extensions.Notifications SQL baseline schema for common providers (EF Core, EF6)
-- MSSQL version
-- version: 2

-- NOTIFICATIONS

CREATE TABLE [dbo].[RNO_NOTIFICATION_BUFFER] (
[RNO_NBF_NotificationBufferId] [UNIQUEIDENTIFIER] NOT NULL,
[RNO_NBF_Ordinal] [INT] IDENTITY(1,1) NOT NULL,
[RNO_NBF_PipelineId] [UNIQUEIDENTIFIER] NOT NULL,
[RNO_NBF_GovernorId] [UNIQUEIDENTIFIER] NOT NULL
[RNO_NBF_Name] [NVARCHAR] (256) NOT NULL UNIQUE,
[RNO_NBF_PipelineName] [NVARCHAR] (MAX) NOT NULL,
[RNO_NBF_GovernorName] [NVARCHAR] (MAX) NOT NULL
CONSTRAINT [RNO_NOTIFICATION_BUFFER_PK] PRIMARY KEY NONCLUSTERED ([RNO_NBF_NotificationBufferId])
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
-- Revo.Extensions.Notifications SQL baseline schema for common providers (EF Core, EF6)
-- PgSQL version
-- version: 2

-- NOTIFICATIONS

CREATE TABLE rno_notification_buffer (
rno_nbf_notification_buffer_id uuid PRIMARY KEY,
rno_nbf_pipeline_id uuid NOT NULL,
rno_nbf_governor_id uuid NOT NULL
rno_nbf_name text NOT NULL UNIQUE,
rno_nbf_pipeline_name text NOT NULL,
rno_nbf_governor_name text NOT NULL
);

CREATE TABLE rno_buffered_notification (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
-- Revo.Extensions.Notifications SQL baseline schema for common providers (EF Core, EF6)
-- SQLite version
-- version: 2

-- NOTIFICATIONS

CREATE TABLE rno_notification_buffer (
rno_nbf_notification_buffer_id uuid PRIMARY KEY,
rno_nbf_pipeline_id uuid NOT NULL,
rno_nbf_governor_id uuid NOT NULL
rno_nbf_name text NOT NULL UNIQUE,
rno_nbf_pipeline_name text NOT NULL,
rno_nbf_governor_name text NOT NULL
);

CREATE TABLE rno_buffered_notification (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ namespace Revo.Extensions.Notifications.Tests.Channels.Bufferring
public class AggregatingBufferGovernorTests
{
private readonly AggregatingBufferGovernor sut;
private readonly Guid governorId = Guid.NewGuid();
private readonly string governorName = "governor1";
private readonly InMemoryCrudRepository inMemoryCrudRepository;

public AggregatingBufferGovernorTests()
{
sut = new AggregatingBufferGovernor(governorId, TimeSpan.FromMinutes(5));
sut = new AggregatingBufferGovernor(governorName, TimeSpan.FromMinutes(5));
inMemoryCrudRepository = new InMemoryCrudRepository();
}

[Fact]
public void Id_ReturnsCorrectValue()
{
Assert.Equal(governorId, sut.Id);
Assert.Equal(governorName, sut.Name);
}

[Fact]
Expand All @@ -34,7 +34,7 @@ public async Task SelectNotificationsForReleaseAsync_ReturnsAllExpiredNotificati
FakeClock.Setup();
FakeClock.Now = DateTime.Now;

NotificationBuffer buffer1 = new NotificationBuffer(Guid.NewGuid(), governorId, Guid.NewGuid());
NotificationBuffer buffer1 = new NotificationBuffer(Guid.NewGuid(), "buffer1", governorName, "pipeline1");
inMemoryCrudRepository.Attach(buffer1);
BufferedNotification notification1 = new BufferedNotification(Guid.NewGuid(), "Notification1", "{}",
buffer1, FakeClock.Now.Subtract(TimeSpan.FromMinutes(6)));
Expand All @@ -44,14 +44,14 @@ public async Task SelectNotificationsForReleaseAsync_ReturnsAllExpiredNotificati
inMemoryCrudRepository.Attach(notification2);
buffer1.Notifications = new List<BufferedNotification>() { notification1, notification2 };

NotificationBuffer buffer2 = new NotificationBuffer(Guid.NewGuid(), governorId, Guid.NewGuid());
NotificationBuffer buffer2 = new NotificationBuffer(Guid.NewGuid(), "buffer2", governorName, "pipeline2");
inMemoryCrudRepository.Attach(buffer2);
BufferedNotification notification3 = new BufferedNotification(Guid.NewGuid(), "Notification3", "{}",
buffer2, FakeClock.Now.Subtract(TimeSpan.FromMinutes(8)));
inMemoryCrudRepository.Attach(notification3);
buffer2.Notifications = new List<BufferedNotification>() { notification3 };

NotificationBuffer buffer3 = new NotificationBuffer(Guid.NewGuid(), governorId, Guid.NewGuid());
NotificationBuffer buffer3 = new NotificationBuffer(Guid.NewGuid(), "buffer3", governorName, "pipeline3");
inMemoryCrudRepository.Attach(buffer3);
BufferedNotification notification4 = new BufferedNotification(Guid.NewGuid(), "Notification3", "{}",
buffer3, FakeClock.Now.Subtract(TimeSpan.FromMinutes(1)));
Expand All @@ -74,14 +74,14 @@ public async Task SelectNotificationsForReleaseAsync_DoesntSelectFromOtherGovern
FakeClock.Setup();
FakeClock.Now = DateTime.Now;

NotificationBuffer buffer1 = new NotificationBuffer(Guid.NewGuid(), governorId, Guid.NewGuid());
NotificationBuffer buffer1 = new NotificationBuffer(Guid.NewGuid(), "buffer1", governorName, "pipeline1");
inMemoryCrudRepository.Attach(buffer1);
BufferedNotification notification1 = new BufferedNotification(Guid.NewGuid(), "Notification1", "{}",
buffer1, FakeClock.Now.Subtract(TimeSpan.FromMinutes(6)));
inMemoryCrudRepository.Attach(notification1);
buffer1.Notifications = new List<BufferedNotification>() { notification1 };

NotificationBuffer buffer2 = new NotificationBuffer(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid());
NotificationBuffer buffer2 = new NotificationBuffer(Guid.NewGuid(), "buffer2", "governor2", "pipeline1");
inMemoryCrudRepository.Attach(buffer2);
BufferedNotification notification3 = new BufferedNotification(Guid.NewGuid(), "Notification3", "{}",
buffer2, FakeClock.Now.Subtract(TimeSpan.FromMinutes(8)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ public BufferedNotificationStoreTests()
[Fact]
public async Task Add_AddsToRepository()
{
NotificationBuffer buffer = new NotificationBuffer(Guid.NewGuid(), Guid.NewGuid(),
Guid.NewGuid());
NotificationBuffer buffer = new NotificationBuffer(Guid.NewGuid(), "buffer1",
"governor1", "pipeline1");
inMemoryCrudRepository.Attach(buffer);

DateTimeOffset now = DateTimeOffset.Now;

await sut.Add(new SerializedNotification() { NotificationClassName = "Notification1", NotificationJson = "{}" },
buffer.Id, now, Guid.NewGuid(), Guid.NewGuid());
buffer.Name, now, "governor1", "pipeline1");

Assert.Equal(1, inMemoryCrudRepository.FindAllWithAdded<BufferedNotification>().Count());
Assert.Single(inMemoryCrudRepository.FindAllWithAdded<BufferedNotification>(),
Expand All @@ -45,18 +45,17 @@ public async Task Add_AddsToRepository()
[Fact]
public async Task SendNotificationAsync_CreatesNewBuffer()
{
Guid bufferId = Guid.NewGuid();
Guid bufferGovernorId = Guid.NewGuid();
Guid notificationPipelineId = Guid.NewGuid();

await sut.Add(new SerializedNotification() { NotificationClassName = "Notification1", NotificationJson = "{}" },
bufferId, DateTime.Now, bufferGovernorId, notificationPipelineId);
"buffer1", DateTime.Now, "governor1", "pipeline1");

Assert.Equal(1, inMemoryCrudRepository.FindAllWithAdded<NotificationBuffer>().Count());
Assert.Single(inMemoryCrudRepository.FindAllWithAdded<NotificationBuffer>(),
x => x.Id == bufferId
&& x.GovernorId == bufferGovernorId
&& x.PipelineId == notificationPipelineId);
x => x.Name == "buffer1"
&& x.GovernorName == "governor1"
&& x.PipelineName == "pipeline1");
Assert.Equal(1, inMemoryCrudRepository.FindAllWithAdded<BufferedNotification>().Count());
Assert.Single(inMemoryCrudRepository.FindAllWithAdded<BufferedNotification>(),
x => x.Buffer == inMemoryCrudRepository.FindAllWithAdded<NotificationBuffer>().First());
Expand All @@ -66,7 +65,7 @@ public async Task SendNotificationAsync_CreatesNewBuffer()
public async Task OnTransactionSucceededAsync_SavesRepository()
{
sut.Add(new SerializedNotification() { NotificationClassName = "Notification1", NotificationJson = "{}" },
Guid.NewGuid(), DateTimeOffset.Now, Guid.NewGuid(), Guid.NewGuid());
"buffer1", DateTimeOffset.Now, "governor1", "pipeline1");
await sut.CommitAsync();

inMemoryCrudRepository.Received(1).SaveChangesAsync();
Expand Down

0 comments on commit bbb9589

Please sign in to comment.