Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event bus [Proposal] #320

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/EntityInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ namespace RabbitMQ.Stream.Client;
public abstract class Info
{
public string Stream { get; }
public string ClientProvidedName { get; }

protected Info(string stream)
protected Info(string stream, string clientProvidedName)
{
Stream = stream;
ClientProvidedName = clientProvidedName;
}
}
74 changes: 74 additions & 0 deletions RabbitMQ.Stream.Client/EventBus/StreamEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System.Collections.Generic;
using RabbitMQ.Stream.Client.Reliable;

namespace RabbitMQ.Stream.Client.EventBus;

public abstract class ClientEvent : IStreamEvent
{
protected ClientEvent(IDictionary<string, string> connectionProperties, ClientParameters parameters,
EventTypes eventType, EventSeverity eventSeverity)
{
ConnectionProperties = connectionProperties;
Parameters = parameters;
EventType = eventType;
EventSeverity = eventSeverity;
}

public EventTypes EventType { get; internal set; }
public EventSeverity EventSeverity { get; internal set; }

public ClientParameters Parameters { get; }
public IDictionary<string, string> ConnectionProperties { get; }
}

public class RawProducerConnected : ClientEvent
{
public RawProducerConnected(IDictionary<string, string> connectionProperties, ClientParameters parameters,
RawProducer instance)
: base(connectionProperties, parameters, EventTypes.Connection, EventSeverity.Info)
{
Instance = instance;
}

public RawProducer Instance { get; }
}

public class RawProducerDisconnected : ClientEvent
{
public RawProducerDisconnected(IDictionary<string, string> connectionProperties,
ClientParameters parameters, RawProducer instance)
: base(connectionProperties, parameters, EventTypes.Disconnection, EventSeverity.Info)
{
Instance = instance;
}

public RawProducer Instance { get; }
}

public class ReliableBaseReconnected : IStreamEvent
{
public ReliableBaseReconnected(bool isReconnection, EventSeverity eventSeverity)
{
IsReconnection = isReconnection;
EventSeverity = eventSeverity;
}

public bool IsReconnection { get; }
public EventTypes EventType { get; } = EventTypes.Reconnection;
public EventSeverity EventSeverity { get; }
}

public class ProducerReconnected : ReliableBaseReconnected
{
public Producer Instance { get; }

public ProducerReconnected(bool isReconnection, EventSeverity eventSeverity, Producer instance) : base(
isReconnection, eventSeverity)
{
Instance = instance;
}
}
71 changes: 71 additions & 0 deletions RabbitMQ.Stream.Client/EventBus/StreamEventsBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System.Threading.Tasks;

namespace RabbitMQ.Stream.Client.EventBus;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public enum EventTypes
{
Connection,
Reconnection,
Disconnection,
}

public enum EventSeverity
{
Info,
Warning,
Error,
}

public interface IStreamEvent
{
EventTypes EventType { get; }
EventSeverity EventSeverity { get; }
}

public interface IEventBus
{
void Publish<T>(T v) where T : IStreamEvent;
void Subscribe<T>(Func<T, Task> func) where T : IStreamEvent;
}

public class StreamEventsBus : IEventBus
{
private readonly ConcurrentDictionary<Type, List<Func<IStreamEvent, Task>>> _subscriptions = new();

public void Publish<T>(T v) where T : IStreamEvent
{
var type = typeof(T);
if (_subscriptions.TryGetValue(type, out var funcs))
{
foreach (var func in funcs)
{
func(v);
}
}
}

public void Subscribe<T>(Func<T, Task> func) where T : IStreamEvent
{
var type = typeof(T);
if (!_subscriptions.TryGetValue(type, out var funcs))
{
funcs = new List<Func<IStreamEvent, Task>>();
_subscriptions.TryAdd(type, funcs);
}

funcs.Add(e => func((T)e));
}
}

public static class StreamEventsBusSingleton
{
private static readonly Lazy<StreamEventsBus> s_lazy = new(() => new StreamEventsBus());
public static StreamEventsBus Instance => s_lazy.Value;
}
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public ushort InitialCredits
public class ConsumerInfo : Info
{
public string Reference { get; }
public ConsumerInfo(string stream, string reference) : base(stream)

public ConsumerInfo(string stream, string reference, string clientProvidedName) : base(stream, clientProvidedName)
{
Reference = reference;
}
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client.EventBus;

namespace RabbitMQ.Stream.Client;

Expand Down Expand Up @@ -103,6 +104,8 @@ public record IProducerConfig : INamedEntity
/// Filter enables the chunk filter feature.
/// </summary>
public ProducerFilter Filter { get; set; } = null;

public IEventBus Events { get; set; }
}

/// <summary>
Expand All @@ -112,7 +115,7 @@ public class ProducerInfo : Info
{
public string Reference { get; }

public ProducerInfo(string stream, string reference) : base(stream)
public ProducerInfo(string stream, string reference, string clientProvidedName) : base(stream, clientProvidedName)
{
Reference = reference;
}
Expand Down
53 changes: 50 additions & 3 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,49 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string clientProvidedName) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.EventBus.ClientEvent
RabbitMQ.Stream.Client.EventBus.ClientEvent.ClientEvent(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.EventBus.EventTypes eventType, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity) -> void
RabbitMQ.Stream.Client.EventBus.ClientEvent.ConnectionProperties.get -> System.Collections.Generic.IDictionary<string, string>
RabbitMQ.Stream.Client.EventBus.ClientEvent.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.ClientEvent.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ClientEvent.Parameters.get -> RabbitMQ.Stream.Client.ClientParameters
RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Error = 2 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Info = 0 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Warning = 1 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.EventTypes.Connection = 0 -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.EventTypes.Disconnection = 2 -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.EventTypes.Reconnection = 1 -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.EventBus.IEventBus.Publish<T>(T v) -> void
RabbitMQ.Stream.Client.EventBus.IEventBus.Subscribe<T>(System.Func<T, System.Threading.Tasks.Task> func) -> void
RabbitMQ.Stream.Client.EventBus.IStreamEvent
RabbitMQ.Stream.Client.EventBus.IStreamEvent.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.IStreamEvent.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ProducerReconnected
RabbitMQ.Stream.Client.EventBus.ProducerReconnected.Instance.get -> RabbitMQ.Stream.Client.Reliable.Producer
RabbitMQ.Stream.Client.EventBus.ProducerReconnected.ProducerReconnected(bool isReconnection, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity, RabbitMQ.Stream.Client.Reliable.Producer instance) -> void
RabbitMQ.Stream.Client.EventBus.RawProducerConnected
RabbitMQ.Stream.Client.EventBus.RawProducerConnected.Instance.get -> RabbitMQ.Stream.Client.RawProducer
RabbitMQ.Stream.Client.EventBus.RawProducerConnected.RawProducerConnected(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.RawProducer instance) -> void
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected.Instance.get -> RabbitMQ.Stream.Client.RawProducer
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected.RawProducerDisconnected(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.RawProducer instance) -> void
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.IsReconnection.get -> bool
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.ReliableBaseReconnected(bool isReconnection, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.Publish<T>(T v) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.StreamEventsBus() -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.Subscribe<T>(System.Func<T, System.Threading.Tasks.Task> func) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBusSingleton
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
Expand Down Expand Up @@ -59,9 +98,12 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
RabbitMQ.Stream.Client.Info.ClientProvidedName.get -> string
RabbitMQ.Stream.Client.Info.Info(string stream, string clientProvidedName) -> void
RabbitMQ.Stream.Client.Info.Stream.get -> string
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.IProducerConfig.Events.get -> RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.IProducerConfig.Events.set -> void
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -74,7 +116,7 @@ RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference, string clientProvidedName) -> void
RabbitMQ.Stream.Client.ProducerInfo.Reference.get -> string
RabbitMQ.Stream.Client.PublishFilter
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
Expand Down Expand Up @@ -109,6 +151,10 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase.OnReconnected.get -> System.Func<bool, RabbitMQ.Stream.Client.EventBus.EventSeverity, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.Reliable.ReliableBase.OnReconnected.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Events.get -> RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Events.set -> void
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.RouteNotFoundException
Expand Down Expand Up @@ -144,5 +190,6 @@ RabbitMQ.Stream.Client.UnknownCommandException.UnknownCommandException(string s)
RabbitMQ.Stream.Client.UnsupportedOperationException
RabbitMQ.Stream.Client.UnsupportedOperationException.UnsupportedOperationException(string s) -> void
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
static RabbitMQ.Stream.Client.EventBus.StreamEventsBusSingleton.Instance.get -> RabbitMQ.Stream.Client.EventBus.StreamEventsBus
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
_initialCredits = config.InitialCredits;
_config = config;
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
Info = new ConsumerInfo(_config.Stream, _config.Reference);
Info = new ConsumerInfo(_config.Stream, _config.Reference, config.ClientProvidedName);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
Expand Down
9 changes: 7 additions & 2 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Stream.Client.EventBus;

namespace RabbitMQ.Stream.Client
{
Expand Down Expand Up @@ -69,17 +70,19 @@ public class RawProducer : AbstractEntity, IProducer, IDisposable
{
var client = await RoutingHelper<Routing>.LookupLeaderConnection(clientParameters, metaStreamInfo, logger)
.ConfigureAwait(false);

var producer = new RawProducer((Client)client, config, logger);
await producer.Init().ConfigureAwait(false);
config.Events?.Publish(new RawProducerConnected(client.ConnectionProperties,
client.Parameters, producer));

return producer;
}

private RawProducer(Client client, RawProducerConfig config, ILogger logger = null)
{
_client = client;
_config = config;
Info = new ProducerInfo(_config.Stream, _config.Reference);
Info = new ProducerInfo(_config.Stream, _config.Reference, _config.ClientProvidedName);
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
{
AllowSynchronousContinuations = false,
Expand All @@ -102,6 +105,8 @@ private async Task Init()
_client.ConnectionClosed += async reason =>
{
await Close().ConfigureAwait(false);
_config.Events?.Publish(new RawProducerDisconnected(_client.ConnectionProperties,
_client.Parameters, this));
if (_config.ConnectionClosedHandler != null)
{
await _config.ConnectionClosedHandler(reason).ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class RawSuperStreamConsumer : IConsumer, IDisposable
_streamInfos = streamInfos;
_clientParameters = clientParameters;
_logger = logger ?? NullLogger.Instance;
Info = new ConsumerInfo(_config.SuperStream, _config.Reference);
Info = new ConsumerInfo(_config.SuperStream, _config.Reference, _config.ClientProvidedName);

StartConsumers().Wait(CancellationToken.None);
}
Expand Down
9 changes: 7 additions & 2 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class RawSuperStreamProducer : IProducer, IDisposable
_config = config;
_streamInfos = streamInfos;
_clientParameters = clientParameters;
Info = new ProducerInfo(config.SuperStream, config.Reference);
Info = new ProducerInfo(config.SuperStream, config.Reference, config.ClientProvidedName);
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
{
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
Expand All @@ -88,6 +88,7 @@ private RawProducerConfig FromStreamConfig(string stream)
Reference = _config.Reference,
MaxInFlight = _config.MaxInFlight,
Filter = _config.Filter,
Events = _config.Events,
ConnectionClosedHandler = s =>
{
// In case of connection closed, we need to remove the producer from the list
Expand Down Expand Up @@ -129,7 +130,11 @@ private RawProducerConfig FromStreamConfig(string stream)
// The producer is created on demand when a message is sent to a stream
private async Task<IProducer> InitProducer(string stream)
{
var p = await RawProducer.Create(_clientParameters, FromStreamConfig(stream), _streamInfos[stream], _logger)
var c = _clientParameters with
{
ClientProvidedName = _clientParameters.ClientProvidedName + "#" + stream
};
var p = await RawProducer.Create(c, FromStreamConfig(stream), _streamInfos[stream], _logger)
.ConfigureAwait(false);
_logger?.LogDebug("Producer {ProducerReference} created for Stream {StreamIdentifier}", _config.Reference,
stream);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
{
_logger = logger ?? NullLogger<Consumer>.Instance;
_consumerConfig = consumerConfig;
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference);
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference, consumerConfig.ClientProvidedName);
}

public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
Expand Down
Loading