Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event bus [Proposal] #320

Closed
wants to merge 7 commits into from
Closed

Event bus [Proposal] #320

wants to merge 7 commits into from

Conversation

Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Oct 4, 2023

ref: #316

The idea of this implementation is to have the possibility to subscribe to a partial event.

so like:

  StreamEventsBusSingleton.Instance.Publish<ProducerConnectEvent>(new ProducerConnectEvent(this, _config));

and somewhere to subscribe to the event based on the type, like:

 StreamEventsBusSingleton.Instance.Subscribe<ProducerConnectEvent>(p =>
 {
   Console.WriteLine($"ProducerConnectEvent , Stream {p.Config.Stream}, event type {p.EventType}, event severity 
   {p.EventSeverity}" );
});

In this way we can implement different events with detailed information and (why not ) with the class instance that raised the event, for example:

    public class ProducerConnectEvent : IStreamEvent
    {
        public ProducerConnectEvent(RawProducer producer, RawProducerConfig config)
        {
            Producer = producer;
            Config = config;
        }

        public RawProducerConfig Config { get; }

        public EventTypes EventType => EventTypes.Connection;
        public EventSeverity EventSeverity => EventSeverity.Info;
        public RawProducer Producer { get; }
    }

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@codecov
Copy link

codecov bot commented Oct 4, 2023

Codecov Report

Attention: 3 lines in your changes are missing coverage. Please review.

Comparison is base (a515ea9) 92.66% compared to head (638c3de) 92.73%.
Report is 1 commits behind head on main.

❗ Current head 638c3de differs from pull request most recent head 906fa39. Consider uploading reports for the commit 906fa39 to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #320      +/-   ##
==========================================
+ Coverage   92.66%   92.73%   +0.07%     
==========================================
  Files         113      116       +3     
  Lines        9961    10195     +234     
  Branches      825      840      +15     
==========================================
+ Hits         9230     9454     +224     
- Misses        555      562       +7     
- Partials      176      179       +3     
Files Coverage Δ
RabbitMQ.Stream.Client/EntityInfo.cs 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/EventBus/StreamEvents.cs 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/IConsumer.cs 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/IProducer.cs 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/RawConsumer.cs 84.36% <100.00%> (ø)
RabbitMQ.Stream.Client/RawProducer.cs 88.53% <100.00%> (+0.18%) ⬆️
RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs 94.30% <100.00%> (ø)
RabbitMQ.Stream.Client/RawSuperStreamProducer.cs 96.80% <100.00%> (+0.07%) ⬆️
RabbitMQ.Stream.Client/Reliable/Consumer.cs 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/Reliable/Producer.cs 84.00% <100.00%> (-0.03%) ⬇️
... and 5 more

... and 1 file with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@jonnepmyra
Copy link
Contributor

Looks good.

We will need to execute async code when the events are raised.

Do you think the we could have async subscriptions aswell? So the registered action would be a Func<IStreamEvent, Task> and that the Publish part of the lib would await our registered callback/subscriber?

@Gsantomaggio
Copy link
Member Author

have async subscriptions aswell?

Yes

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@TroelsL
Copy link
Contributor

TroelsL commented Oct 6, 2023

I think this proposal covers our operational needs. But I am not certain which events the "Reconnecting" state would entail. From the WIP code, it looks like there are two event types: Connection and Disconnection.

I assume that, if modelled as a state machine, a Producer would start off in Disconnected. It would likely only have a single transition from there to Connected. But would there only be these two states, or would Reconnecting be it's own, and if so: would there be a "Reconnection" event published?

@Gsantomaggio
Copy link
Member Author

it looks like there are two event types: Connection and Disconnection.

@TroelsL this was just a proposal. Of course, we will add other types like:

  • Reconnection ( with all the details )
  • Metadata update
  • CRC error
  • Producer Error
  • Consumer-Parse error

etc...

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio
Copy link
Member Author

It is still a work in progress.
The bus can handle the events from the RawProducer and the Producer classes and only connection/disconnection/reconnection.

So given:

rabbitmq-streams add_super_stream invoices --partitions 3

And:

 var streamSystem = await StreamSystem.Create(new StreamSystemConfig());
        await streamSystem.CreateStream(new StreamSpec("my-stream"));
        IEventBus bus = new StreamEventsBus();
        bus.Subscribe<RawProducerConnected>(async connected =>
        {
            myLogger.LogInformation(
                "The raw producer {ClientProvidedName} is connected to the stream {Stream}",
                connected.Parameters.ClientProvidedName, connected.Instance.Info.Stream);

            await Task.CompletedTask;
        });

        bus.Subscribe<RawProducerDisconnected>(async disconnected =>
        {
            myLogger.LogInformation(
                "The raw producer {ClientProvidedName} is disconnected from the stream {Stream}",
                disconnected.Parameters.ClientProvidedName, disconnected.Instance.Info.Stream);
            await Task.CompletedTask;
        });

        bus.Subscribe<ProducerReconnected>(async reconnected =>
        {
            var value = (reconnected.IsReconnection) ? "is in reconnection.." : "ended the reconnection";
            myLogger.LogInformation("The producer {ClientProvidedName} {Value} to the stream {Stream}", reconnected.Instance.Info.ClientProvidedName, value, reconnected.Instance.Info.Stream);
            await Task.CompletedTask;
        });

        var superStreamProducer = await Producer.Create(new ProducerConfig(streamSystem, SystemUtils.InvoicesExchange)
        {
            SuperStreamConfig = new SuperStreamConfig()
            {
                Routing = message => message.Properties.MessageId.ToString(),
                RoutingStrategyType = RoutingStrategyType.Hash
            },
            ClientProvidedName = "my_super_producer",
            Events = bus,
        });

        var standardProducer = await Producer.Create(new ProducerConfig(streamSystem, "my-stream")
        {
            ClientProvidedName = "my_producer",
            Events = bus,
        });


        for (ulong i = 0; i < 2000; i++)
        {
            var message = new Message(Encoding.Default.GetBytes("hello"))
            {
                Properties = new Properties() {MessageId = $"hello{i}"}
            };
            await superStreamProducer.Send(message);
            Thread.Sleep(1 * 1000);
        }
    }

The connection part for the super stream is:

[10:23:16] info: example.MyEventsTest[0] The raw producer my_super_producer#invoices-0 is connected to the stream invoices-0
[10:23:16] info: example.MyEventsTest[0] The raw producer my_super_producer#invoices-1 is connected to the stream invoices-1
[10:23:16] info: example.MyEventsTest[0] The raw producer my_super_producer#invoices-2 is connected to the stream invoices-2

for the standard stream is:

[10:23:16] info: example.MyEventsTest[0] The raw producer my_producer is connected to the stream my-stream

I case I kill the standard producer connection the events will be:

[10:23:45] info: example.MyEventsTest[0] The raw producer my_producer is disconnected from the stream my-stream
[10:23:45] info: example.MyEventsTest[0] The producer my_producer is in reconnection.. to the stream my-stream
[10:23:45] info: example.MyEventsTest[0] The raw producer my_producer is connected to the stream my-stream
[10:23:45] info: example.MyEventsTest[0] The producer my_producer ended the reconnection to the stream my-stream

The behaviour for the super stream is a bit different: the connection is re-created as soon a message is sent to the partition stream.

The basic idea is to define your bus with the subscriptions you want to listen to and handle it with different information.

The bus is an interface public interface IEventBus.So the user can write its own event logger in case the default one is not enough.

@Gsantomaggio
Copy link
Member Author

closed in favour of #336

@Zerpet Zerpet deleted the event_bus branch December 15, 2023 16:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants