Skip to content

Commit

Permalink
Merge 906fa39 into cd46117
Browse files Browse the repository at this point in the history
  • Loading branch information
Gsantomaggio authored Oct 17, 2023
2 parents cd46117 + 906fa39 commit 638c3de
Show file tree
Hide file tree
Showing 16 changed files with 456 additions and 17 deletions.
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/EntityInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ namespace RabbitMQ.Stream.Client;
public abstract class Info
{
public string Stream { get; }
public string ClientProvidedName { get; }

protected Info(string stream)
protected Info(string stream, string clientProvidedName)
{
Stream = stream;
ClientProvidedName = clientProvidedName;
}
}
74 changes: 74 additions & 0 deletions RabbitMQ.Stream.Client/EventBus/StreamEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System.Collections.Generic;
using RabbitMQ.Stream.Client.Reliable;

namespace RabbitMQ.Stream.Client.EventBus;

public abstract class ClientEvent : IStreamEvent
{
protected ClientEvent(IDictionary<string, string> connectionProperties, ClientParameters parameters,
EventTypes eventType, EventSeverity eventSeverity)
{
ConnectionProperties = connectionProperties;
Parameters = parameters;
EventType = eventType;
EventSeverity = eventSeverity;
}

public EventTypes EventType { get; internal set; }
public EventSeverity EventSeverity { get; internal set; }

public ClientParameters Parameters { get; }
public IDictionary<string, string> ConnectionProperties { get; }
}

public class RawProducerConnected : ClientEvent
{
public RawProducerConnected(IDictionary<string, string> connectionProperties, ClientParameters parameters,
RawProducer instance)
: base(connectionProperties, parameters, EventTypes.Connection, EventSeverity.Info)
{
Instance = instance;
}

public RawProducer Instance { get; }
}

public class RawProducerDisconnected : ClientEvent
{
public RawProducerDisconnected(IDictionary<string, string> connectionProperties,
ClientParameters parameters, RawProducer instance)
: base(connectionProperties, parameters, EventTypes.Disconnection, EventSeverity.Info)
{
Instance = instance;
}

public RawProducer Instance { get; }
}

public class ReliableBaseReconnected : IStreamEvent
{
public ReliableBaseReconnected(bool isReconnection, EventSeverity eventSeverity)
{
IsReconnection = isReconnection;
EventSeverity = eventSeverity;
}

public bool IsReconnection { get; }
public EventTypes EventType { get; } = EventTypes.Reconnection;
public EventSeverity EventSeverity { get; }
}

public class ProducerReconnected : ReliableBaseReconnected
{
public Producer Instance { get; }

public ProducerReconnected(bool isReconnection, EventSeverity eventSeverity, Producer instance) : base(
isReconnection, eventSeverity)
{
Instance = instance;
}
}
71 changes: 71 additions & 0 deletions RabbitMQ.Stream.Client/EventBus/StreamEventsBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System.Threading.Tasks;

namespace RabbitMQ.Stream.Client.EventBus;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public enum EventTypes
{
Connection,
Reconnection,
Disconnection,
}

public enum EventSeverity
{
Info,
Warning,
Error,
}

public interface IStreamEvent
{
EventTypes EventType { get; }
EventSeverity EventSeverity { get; }
}

public interface IEventBus
{
void Publish<T>(T v) where T : IStreamEvent;
void Subscribe<T>(Func<T, Task> func) where T : IStreamEvent;
}

public class StreamEventsBus : IEventBus
{
private readonly ConcurrentDictionary<Type, List<Func<IStreamEvent, Task>>> _subscriptions = new();

public void Publish<T>(T v) where T : IStreamEvent
{
var type = typeof(T);
if (_subscriptions.TryGetValue(type, out var funcs))
{
foreach (var func in funcs)
{
func(v);
}
}
}

public void Subscribe<T>(Func<T, Task> func) where T : IStreamEvent
{
var type = typeof(T);
if (!_subscriptions.TryGetValue(type, out var funcs))
{
funcs = new List<Func<IStreamEvent, Task>>();
_subscriptions.TryAdd(type, funcs);
}

funcs.Add(e => func((T)e));
}
}

public static class StreamEventsBusSingleton
{
private static readonly Lazy<StreamEventsBus> s_lazy = new(() => new StreamEventsBus());
public static StreamEventsBus Instance => s_lazy.Value;
}
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public ushort InitialCredits
public class ConsumerInfo : Info
{
public string Reference { get; }
public ConsumerInfo(string stream, string reference) : base(stream)

public ConsumerInfo(string stream, string reference, string clientProvidedName) : base(stream, clientProvidedName)
{
Reference = reference;
}
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client.EventBus;

namespace RabbitMQ.Stream.Client;

Expand Down Expand Up @@ -103,6 +104,8 @@ public record IProducerConfig : INamedEntity
/// Filter enables the chunk filter feature.
/// </summary>
public ProducerFilter Filter { get; set; } = null;

public IEventBus Events { get; set; }
}

/// <summary>
Expand All @@ -112,7 +115,7 @@ public class ProducerInfo : Info
{
public string Reference { get; }

public ProducerInfo(string stream, string reference) : base(stream)
public ProducerInfo(string stream, string reference, string clientProvidedName) : base(stream, clientProvidedName)
{
Reference = reference;
}
Expand Down
53 changes: 50 additions & 3 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,49 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string clientProvidedName) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.EventBus.ClientEvent
RabbitMQ.Stream.Client.EventBus.ClientEvent.ClientEvent(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.EventBus.EventTypes eventType, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity) -> void
RabbitMQ.Stream.Client.EventBus.ClientEvent.ConnectionProperties.get -> System.Collections.Generic.IDictionary<string, string>
RabbitMQ.Stream.Client.EventBus.ClientEvent.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.ClientEvent.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ClientEvent.Parameters.get -> RabbitMQ.Stream.Client.ClientParameters
RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Error = 2 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Info = 0 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventSeverity.Warning = 1 -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.EventTypes.Connection = 0 -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.EventTypes.Disconnection = 2 -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.EventTypes.Reconnection = 1 -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.EventBus.IEventBus.Publish<T>(T v) -> void
RabbitMQ.Stream.Client.EventBus.IEventBus.Subscribe<T>(System.Func<T, System.Threading.Tasks.Task> func) -> void
RabbitMQ.Stream.Client.EventBus.IStreamEvent
RabbitMQ.Stream.Client.EventBus.IStreamEvent.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.IStreamEvent.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ProducerReconnected
RabbitMQ.Stream.Client.EventBus.ProducerReconnected.Instance.get -> RabbitMQ.Stream.Client.Reliable.Producer
RabbitMQ.Stream.Client.EventBus.ProducerReconnected.ProducerReconnected(bool isReconnection, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity, RabbitMQ.Stream.Client.Reliable.Producer instance) -> void
RabbitMQ.Stream.Client.EventBus.RawProducerConnected
RabbitMQ.Stream.Client.EventBus.RawProducerConnected.Instance.get -> RabbitMQ.Stream.Client.RawProducer
RabbitMQ.Stream.Client.EventBus.RawProducerConnected.RawProducerConnected(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.RawProducer instance) -> void
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected.Instance.get -> RabbitMQ.Stream.Client.RawProducer
RabbitMQ.Stream.Client.EventBus.RawProducerDisconnected.RawProducerDisconnected(System.Collections.Generic.IDictionary<string, string> connectionProperties, RabbitMQ.Stream.Client.ClientParameters parameters, RabbitMQ.Stream.Client.RawProducer instance) -> void
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.EventSeverity.get -> RabbitMQ.Stream.Client.EventBus.EventSeverity
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.EventType.get -> RabbitMQ.Stream.Client.EventBus.EventTypes
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.IsReconnection.get -> bool
RabbitMQ.Stream.Client.EventBus.ReliableBaseReconnected.ReliableBaseReconnected(bool isReconnection, RabbitMQ.Stream.Client.EventBus.EventSeverity eventSeverity) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.Publish<T>(T v) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.StreamEventsBus() -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBus.Subscribe<T>(System.Func<T, System.Threading.Tasks.Task> func) -> void
RabbitMQ.Stream.Client.EventBus.StreamEventsBusSingleton
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
Expand Down Expand Up @@ -59,9 +98,12 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
RabbitMQ.Stream.Client.Info.ClientProvidedName.get -> string
RabbitMQ.Stream.Client.Info.Info(string stream, string clientProvidedName) -> void
RabbitMQ.Stream.Client.Info.Stream.get -> string
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.IProducerConfig.Events.get -> RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.IProducerConfig.Events.set -> void
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -74,7 +116,7 @@ RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference, string clientProvidedName) -> void
RabbitMQ.Stream.Client.ProducerInfo.Reference.get -> string
RabbitMQ.Stream.Client.PublishFilter
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
Expand Down Expand Up @@ -109,6 +151,10 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase.OnReconnected.get -> System.Func<bool, RabbitMQ.Stream.Client.EventBus.EventSeverity, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.Reliable.ReliableBase.OnReconnected.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Events.get -> RabbitMQ.Stream.Client.EventBus.IEventBus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Events.set -> void
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.RouteNotFoundException
Expand Down Expand Up @@ -144,5 +190,6 @@ RabbitMQ.Stream.Client.UnknownCommandException.UnknownCommandException(string s)
RabbitMQ.Stream.Client.UnsupportedOperationException
RabbitMQ.Stream.Client.UnsupportedOperationException.UnsupportedOperationException(string s) -> void
static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, System.Func<System.Memory<byte>, System.Threading.Tasks.Task> commandCallback, System.Func<string, System.Threading.Tasks.Task> closedCallBack, RabbitMQ.Stream.Client.SslOption sslOption, Microsoft.Extensions.Logging.ILogger logger) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Connection>
static RabbitMQ.Stream.Client.EventBus.StreamEventsBusSingleton.Instance.get -> RabbitMQ.Stream.Client.EventBus.StreamEventsBus
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
_initialCredits = config.InitialCredits;
_config = config;
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
Info = new ConsumerInfo(_config.Stream, _config.Reference);
Info = new ConsumerInfo(_config.Stream, _config.Reference, config.ClientProvidedName);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
Expand Down
9 changes: 7 additions & 2 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Stream.Client.EventBus;

namespace RabbitMQ.Stream.Client
{
Expand Down Expand Up @@ -69,17 +70,19 @@ public static async Task<IProducer> Create(
{
var client = await RoutingHelper<Routing>.LookupLeaderConnection(clientParameters, metaStreamInfo, logger)
.ConfigureAwait(false);

var producer = new RawProducer((Client)client, config, logger);
await producer.Init().ConfigureAwait(false);
config.Events?.Publish(new RawProducerConnected(client.ConnectionProperties,
client.Parameters, producer));

return producer;
}

private RawProducer(Client client, RawProducerConfig config, ILogger logger = null)
{
_client = client;
_config = config;
Info = new ProducerInfo(_config.Stream, _config.Reference);
Info = new ProducerInfo(_config.Stream, _config.Reference, _config.ClientProvidedName);
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
{
AllowSynchronousContinuations = false,
Expand All @@ -102,6 +105,8 @@ private async Task Init()
_client.ConnectionClosed += async reason =>
{
await Close().ConfigureAwait(false);
_config.Events?.Publish(new RawProducerDisconnected(_client.ConnectionProperties,
_client.Parameters, this));
if (_config.ConnectionClosedHandler != null)
{
await _config.ConnectionClosedHandler(reason).ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private RawSuperStreamConsumer(
_streamInfos = streamInfos;
_clientParameters = clientParameters;
_logger = logger ?? NullLogger.Instance;
Info = new ConsumerInfo(_config.SuperStream, _config.Reference);
Info = new ConsumerInfo(_config.SuperStream, _config.Reference, _config.ClientProvidedName);

StartConsumers().Wait(CancellationToken.None);
}
Expand Down
9 changes: 7 additions & 2 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private RawSuperStreamProducer(
_config = config;
_streamInfos = streamInfos;
_clientParameters = clientParameters;
Info = new ProducerInfo(config.SuperStream, config.Reference);
Info = new ProducerInfo(config.SuperStream, config.Reference, config.ClientProvidedName);
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
{
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
Expand All @@ -88,6 +88,7 @@ private RawProducerConfig FromStreamConfig(string stream)
Reference = _config.Reference,
MaxInFlight = _config.MaxInFlight,
Filter = _config.Filter,
Events = _config.Events,
ConnectionClosedHandler = s =>
{
// In case of connection closed, we need to remove the producer from the list
Expand Down Expand Up @@ -129,7 +130,11 @@ private RawProducerConfig FromStreamConfig(string stream)
// The producer is created on demand when a message is sent to a stream
private async Task<IProducer> InitProducer(string stream)
{
var p = await RawProducer.Create(_clientParameters, FromStreamConfig(stream), _streamInfos[stream], _logger)
var c = _clientParameters with
{
ClientProvidedName = _clientParameters.ClientProvidedName + "#" + stream
};
var p = await RawProducer.Create(c, FromStreamConfig(stream), _streamInfos[stream], _logger)
.ConfigureAwait(false);
_logger?.LogDebug("Producer {ProducerReference} created for Stream {StreamIdentifier}", _config.Reference,
stream);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
{
_logger = logger ?? NullLogger<Consumer>.Instance;
_consumerConfig = consumerConfig;
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference);
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference, consumerConfig.ClientProvidedName);
}

public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
Expand Down
Loading

0 comments on commit 638c3de

Please sign in to comment.