Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added Kafka transport library #31

Merged
merged 38 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ebeff6c
fixed namespace
mizrael Mar 22, 2021
9a86805
bumped package version
mizrael Mar 22, 2021
c552f1c
bumped package version
mizrael Mar 22, 2021
7d8ac5f
fixed namespace
mizrael Mar 22, 2021
93a6530
minor fixes
mizrael Mar 22, 2021
93e1484
added kafka driver, first draft
mizrael Mar 22, 2021
97d4246
added kafka images
mizrael Mar 22, 2021
610411c
updated subscriber
mizrael Mar 23, 2021
e222738
added kafka config to sonarscan job
mizrael Mar 23, 2021
a312ffe
addressed sonarcloud issues
mizrael Mar 23, 2021
dafb2cd
added more tests
mizrael Mar 23, 2021
55e623b
moved classes
mizrael Mar 23, 2021
cb9af6a
excluded coverage results
mizrael Mar 23, 2021
877eecb
minor refactoring, added more tests
mizrael Mar 23, 2021
ea512f3
moved class
mizrael Mar 23, 2021
f4726e4
added more tests
mizrael Mar 23, 2021
f2ccc75
added more tests
mizrael Mar 23, 2021
ae1dcb4
added logging
mizrael Mar 23, 2021
f7f45ec
refactoring
mizrael Mar 23, 2021
ec9439b
updated tests
mizrael Mar 24, 2021
72af217
fixed project version
mizrael Mar 24, 2021
0dc029f
addressed sonarcloud issues
mizrael Mar 24, 2021
7997fbd
bumped package versions
mizrael Mar 24, 2021
4550a5e
chores
mizrael Mar 24, 2021
750e0cd
chores
mizrael Mar 24, 2021
9304e38
refactoring
mizrael Mar 24, 2021
19f04bb
fixed visibility
mizrael Mar 24, 2021
5db3c44
updated cancellationToken usage
mizrael Mar 24, 2021
f0f974e
updated timeout handling
mizrael Mar 24, 2021
7fbdfc7
added sleep
mizrael Mar 24, 2021
ab7aac5
added test
mizrael Mar 24, 2021
64396e6
removed unnecessary code
mizrael Mar 24, 2021
e8d3644
addressed sonarcloud issues
mizrael Mar 24, 2021
c84b502
updated cancellation policy
mizrael Mar 24, 2021
edb4435
updated cancellation policy
mizrael Mar 24, 2021
ad88851
added disposed check
mizrael Mar 24, 2021
c0d0e55
refactoring
mizrael Mar 24, 2021
093e635
removed unused parameter
mizrael Mar 24, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ jobs:
environment:
SA_PASSWORD: "Sup3r_p4ssword123"
ACCEPT_EULA: "Y"
- image: 'bitnami/zookeeper:3'
environment:
ALLOW_ANONYMOUS_LOGIN: yes
- image: 'bitnami/kafka:2'
environment:
KAFKA_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
ALLOW_PLAINTEXT_LISTENER: yes

steps:
- checkout
Expand All @@ -38,6 +49,17 @@ jobs:
environment:
SA_PASSWORD: "Sup3r_p4ssword123"
ACCEPT_EULA: "Y"
- image: 'bitnami/zookeeper:3'
environment:
ALLOW_ANONYMOUS_LOGIN: yes
- image: 'bitnami/kafka:2'
environment:
KAFKA_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://localhost:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
ALLOW_PLAINTEXT_LISTENER: yes

steps:
- checkout
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ obj/
*.DotSettings.user
.docker/
packages/
TestResults/
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## [2021-03-23](https://github.com/mizrael/OpenSleigh/pull/31)
### Added
- added Kafka transport library

## [2021-02-24](https://github.com/mizrael/OpenSleigh/pull/27)
### Added
- added support for compensating transactions
Expand Down
14 changes: 14 additions & 0 deletions OpenSleigh.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Cosm
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenSleigh.Persistence.Cosmos.SQL.Tests", "tests\OpenSleigh.Persistence.Cosmos.SQL.Tests\OpenSleigh.Persistence.Cosmos.SQL.Tests.csproj", "{08E996DB-3D0C-4A63-8166-BF61732D3B21}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.Kafka", "src\OpenSleigh.Transport.Kafka\OpenSleigh.Transport.Kafka.csproj", "{887358C5-9EFF-4498-A04B-E12B199EC259}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OpenSleigh.Transport.Kafka.Tests", "tests\OpenSleigh.Transport.Kafka.Tests\OpenSleigh.Transport.Kafka.Tests.csproj", "{5B363808-664B-42F4-8C38-EEFB9F05C500}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -109,6 +113,14 @@ Global
{08E996DB-3D0C-4A63-8166-BF61732D3B21}.Debug|Any CPU.Build.0 = Debug|Any CPU
{08E996DB-3D0C-4A63-8166-BF61732D3B21}.Release|Any CPU.ActiveCfg = Release|Any CPU
{08E996DB-3D0C-4A63-8166-BF61732D3B21}.Release|Any CPU.Build.0 = Release|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Debug|Any CPU.Build.0 = Debug|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Release|Any CPU.ActiveCfg = Release|Any CPU
{887358C5-9EFF-4498-A04B-E12B199EC259}.Release|Any CPU.Build.0 = Release|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5B363808-664B-42F4-8C38-EEFB9F05C500}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -128,6 +140,8 @@ Global
{13805E00-0578-4E71-AB8B-0AAFCCFD3551} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{E367B2EF-2111-4BBB-B37F-0263559E5FD2} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{08E996DB-3D0C-4A63-8166-BF61732D3B21} = {5594CC89-F905-46B2-B938-27B8050D9CA3}
{887358C5-9EFF-4498-A04B-E12B199EC259} = {86CDC8FD-5E6F-4F45-A073-9F2749192582}
{5B363808-664B-42F4-8C38-EEFB9F05C500} = {86CDC8FD-5E6F-4F45-A073-9F2749192582}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D5297242-16B4-43D7-B329-362EBCE2A5A5}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core.BackgroundServices
{
Expand All @@ -21,8 +22,15 @@ public SubscribersBackgroundService(IEnumerable<ISubscriber> subscribers, System
public override async Task StartAsync(CancellationToken cancellationToken)
{
if (!_systemInfo.PublishOnly)
await Task.WhenAll(_subscribers.Select(s => s.StartAsync(cancellationToken)));

{
var tasks = _subscribers.Select(s => s.StartAsync(cancellationToken));

await Task.Factory.StartNew(() => Task.WhenAll(tasks),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
}

await base.StartAsync(cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System.Threading;
using System.Threading.Tasks;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Core
namespace OpenSleigh.Core.Messaging
{
public interface ISubscriber
{
Expand Down
2 changes: 1 addition & 1 deletion src/OpenSleigh.Core/OpenSleigh.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.1.1</PackageVersion>
<PackageVersion>1.1.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh</Product>
Expand Down
29 changes: 13 additions & 16 deletions src/OpenSleigh.Persistence.Cosmos.SQL/Entities/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
public record OutboxMessage(Guid Id, byte[] Data, string Type)
public class OutboxMessage
{
private OutboxMessage()
{
}

public Guid Id { get; init; }
public byte[] Data { get; init; }
public string Type { get; init; }
public string PartitionKey { get; init; }

public MessageStatuses Status { get; private set; }
public Guid? LockId { get; private set; }
public DateTime? LockTime { get; private set; }
Expand All @@ -26,8 +32,11 @@ public void Release()
this.Status = OutboxMessage.MessageStatuses.Processed;
}

public static OutboxMessage New(Guid id, byte[] data, string type, Guid correlationId) => new OutboxMessage(id, data, type)
public static OutboxMessage New(Guid id, byte[] data, string type, Guid correlationId) => new OutboxMessage
{
Id = id,
Data = data,
Type = type,
Status = MessageStatuses.Pending,
PartitionKey = correlationId.ToString()
};
Expand All @@ -38,16 +47,4 @@ public enum MessageStatuses
Processed
}
}

internal class OutboxMessageStateEntityTypeConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.ToContainer("OutboxMessages")
.HasNoDiscriminator();

builder.HasKey(e => e.Id);
builder.HasPartitionKey(e => e.PartitionKey);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
[ExcludeFromCodeCoverage]
internal class OutboxMessageStateEntityTypeConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.ToContainer("OutboxMessages")
.HasNoDiscriminator();

builder.HasKey(e => e.Id);
builder.HasPartitionKey(e => e.PartitionKey);
}
}
}
14 changes: 0 additions & 14 deletions src/OpenSleigh.Persistence.Cosmos.SQL/Entities/SagaState.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
Expand All @@ -11,16 +9,4 @@ public record SagaState(string PartitionKey, Guid CorrelationId, string Type)
public DateTime? LockTime { get; set; } = null;
}

internal class SagaStateEntityTypeConfiguration : IEntityTypeConfiguration<SagaState>
{
public void Configure(EntityTypeBuilder<SagaState> builder)
{
builder.ToContainer("SagaStates")
.HasNoDiscriminator();

builder.HasKey(e => new {e.CorrelationId, e.Type});
builder.HasPartitionKey(e => e.PartitionKey);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace OpenSleigh.Persistence.Cosmos.SQL.Entities
{
[ExcludeFromCodeCoverage]
internal class SagaStateEntityTypeConfiguration : IEntityTypeConfiguration<SagaState>
{
public void Configure(EntityTypeBuilder<SagaState> builder)
{
builder.ToContainer("SagaStates")
.HasNoDiscriminator();

builder.HasKey(e => new {e.CorrelationId, e.Type});
builder.HasPartitionKey(e => e.PartitionKey);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>0.0.2</PackageVersion>
<PackageVersion>0.0.3</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Persistence.Cosmos.SQL</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Persistence.InMemory.Messaging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.0.1</PackageVersion>
<PackageVersion>1.0.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Persistence.InMemory</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.0.1</PackageVersion>
<PackageVersion>1.0.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Persistence.SQL</Product>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageVersion>1.0.1</PackageVersion>
<PackageVersion>1.0.2</PackageVersion>
<IsPackable>true</IsPackable>
<Authors>davidguida</Authors>
<Product>OpenSleigh.Transport.AzureServiceBus</Product>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using OpenSleigh.Core;
using OpenSleigh.Core.Messaging;
using System;
using System.Threading;
Expand Down
15 changes: 15 additions & 0 deletions src/OpenSleigh.Transport.Kafka/GuidDeserializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Runtime.CompilerServices;
using Confluent.Kafka;

[assembly: InternalsVisibleTo("OpenSleigh.Transport.Kafka.Tests")]
namespace OpenSleigh.Transport.Kafka
{
internal class GuidDeserializer : IDeserializer<Guid>
{
public Guid Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
return new Guid(data);
}
}
}
8 changes: 8 additions & 0 deletions src/OpenSleigh.Transport.Kafka/HeaderNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace OpenSleigh.Transport.Kafka
{
public static class HeaderNames
{
public const string MessageType = "message-type";
public const string Error = "error";
}
}
33 changes: 33 additions & 0 deletions src/OpenSleigh.Transport.Kafka/IKafkaBusConfigurationBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Microsoft.Extensions.DependencyInjection;
using OpenSleigh.Core.DependencyInjection;
using OpenSleigh.Core.Messaging;
using System;
using System.Diagnostics.CodeAnalysis;

namespace OpenSleigh.Transport.Kafka
{
public interface IKafkaBusConfigurationBuilder
{
void UseMessageNamingPolicy<TM>(QueueReferencesPolicy<TM> policy) where TM : IMessage;
}


[ExcludeFromCodeCoverage]
internal class DefaultKafkaBusConfigurationBuilder : IKafkaBusConfigurationBuilder
{
private readonly IBusConfigurator _busConfigurator;

public DefaultKafkaBusConfigurationBuilder(IBusConfigurator busConfigurator)
{
_busConfigurator = busConfigurator;
}

public void UseMessageNamingPolicy<TM>(QueueReferencesPolicy<TM> policy) where TM : IMessage
{
if (policy == null)
throw new ArgumentNullException(nameof(policy));

_busConfigurator.Services.AddSingleton(policy);
}
}
}
12 changes: 12 additions & 0 deletions src/OpenSleigh.Transport.Kafka/IKafkaMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace OpenSleigh.Transport.Kafka
{
public interface IKafkaMessageHandler
{
Task HandleAsync(ConsumeResult<Guid, byte[]> result, QueueReferences queueReferences, CancellationToken cancellationToken = default);
}
}
17 changes: 17 additions & 0 deletions src/OpenSleigh.Transport.Kafka/IKafkaPublisherExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using OpenSleigh.Core.Messaging;

namespace OpenSleigh.Transport.Kafka
{
public interface IKafkaPublisherExecutor
{
Task<DeliveryResult<Guid, byte[]>> PublishAsync(IMessage message,
string topic,
IEnumerable<Header> additionalHeaders = null,
CancellationToken cancellationToken = default);
}
}
Loading