Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CancellationToken and IConsumerContext<T> into the consumer interfaces #259

Merged
merged 1 commit into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Another service (or application layer) handles the message:
```cs
public class SomeMessageConsumer : IConsumer<SomeMessage>
{
public async Task OnHandle(SomeMessage message)
public async Task OnHandle(SomeMessage message, CancellationToken cancellationToken)
{
// handle the message
}
Expand All @@ -132,7 +132,7 @@ The receiving side handles the request and replies:
```cs
public class SomeRequestHandler : IRequestHandler<SomeRequest, SomeResponse>
{
public async Task<SomeResponse> OnHandle(SomeRequest request)
public async Task<SomeResponse> OnHandle(SomeRequest request, CancellationToken cancellationToken)
{
// handle the request message and return a response
return new SomeResponse { /* ... */ };
Expand Down Expand Up @@ -211,7 +211,7 @@ The domain event handler implements the `IConsumer<T>` interface:
// domain event handler
public class OrderSubmittedHandler : IConsumer<OrderSubmittedEvent>
{
public Task OnHandle(OrderSubmittedEvent e)
public Task OnHandle(OrderSubmittedEvent e, CancellationToken cancellationToken)
{
// ...
}
Expand Down Expand Up @@ -270,7 +270,7 @@ The `SlimMessageBus` configuration for the in-memory provider looks like this:
```cs
//IServiceCollection services;

// Cofigure the message bus
// Configure the message bus
services.AddSlimMessageBus(mbb =>
{
mbb.WithProviderMemory();
Expand Down
92 changes: 62 additions & 30 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
- [Consume the request message (the request handler)](#consume-the-request-message-the-request-handler)
- [Request without response](#request-without-response)
- [Static accessor](#static-accessor)
- [Dependency resolver](#dependency-resolver)
- [Dependency Resolver](#dependency-resolver)
- [Dependency auto-registration](#dependency-auto-registration)
- [ASP.Net Core](#aspnet-core)
- [Modularization of configuration](#modularization-of-configuration)
Expand Down Expand Up @@ -123,7 +123,7 @@ Having done the SMB setup, one can then inject [`IMessageBus`](../src/SlimMessag

> The `IMessageBus` implementations are lightweight and thread-safe.

For completness, please also see the [Hybrid provider configuration](provider_hybrid.md#configuration) which might be needed if the application needs to use more than one transport.
For completeness, please also see the [Hybrid provider configuration](provider_hybrid.md#configuration) which might be needed if the application needs to use more than one transport.

## Pub/Sub communication

Expand All @@ -150,6 +150,9 @@ await bus.Publish(msg);

// OR delivered to the specified topic (or queue)
await bus.Publish(msg, "other-topic");

// pass cancellation token
await bus.Publish(msg, cancellationToken: ct);
```

> The transport plugins might introduce additional configuration options. Please check the relevant provider docs. For example, Azure Service Bus, Azure Event Hub and Kafka allow setting the partitioning key for a given message type.
Expand Down Expand Up @@ -178,7 +181,7 @@ mbb
})
```

Finally, it is possible to specify a headers modifier for the entire bus:
Finally, it is possible to specify a headers modifier for the entire bus (it will apply to all outgoing messages):

```cs
mbb
Expand All @@ -198,7 +201,7 @@ mbb.Consume<SomeMessage>(x => x
.WithConsumer<SomeConsumer>() // (1)
// if you do not want to implement the IConsumer<T> interface
// .WithConsumer<AddCommandConsumer>(nameof(AddCommandConsumer.MyHandleMethod)) // (2) uses reflection
// .WithConsumer<AddCommandConsumer>((consumer, message) => consumer.MyHandleMethod(message)) // (3) uses a delegate
// .WithConsumer<AddCommandConsumer>((consumer, message, consumerContext, cancellationToken) => consumer.MyHandleMethod(message)) // (3) uses a delegate
.Instances(1)
//.KafkaGroup("some-consumer-group")) // Kafka provider specific extensions
```
Expand All @@ -208,7 +211,7 @@ When the consumer implements the `IConsumer<SomeMessage>` interface:
```cs
public class SomeConsumer : IConsumer<SomeMessage>
{
public async Task OnHandle(SomeMessage msg)
public async Task OnHandle(SomeMessage msg, CancellationToken cancellationToken)
{
// handle the msg
}
Expand All @@ -219,7 +222,7 @@ The `SomeConsumer` needs to be registered in the DI container. The SMB runtime w

> When `.WithConsumer<TConsumer>()` is not declared, then a default consumer of type `IConsumer<TMessage>` will be assumed (since v2.0.0).

Alternatively, if you do not want to implement the `IConsumer<SomeMessage>`, then you can provide the method name (2) or a delegate that calls the consumer method (3).
Alternatively, if you do not want to implement the `IConsumer<SomeMessage>`, then you can provide the method name _(2)_ or a delegate that calls the consumer method _(3)_.
`IConsumerContext` and/or `CancellationToken` can optionally be included as parameters to be populated on invocation when taking this approach:

```cs
Expand All @@ -235,7 +238,7 @@ public class SomeConsumer
#### Start or Stop message consumption

By default message consumers are started as soon as the bus is created. This means that messages arriving on the given transport will be processed by the declared consumers.
If you want to prevent this default use the follwing setting:
If you want to prevent this default use the following setting:

```cs
mbb.AutoStartConsumersEnabled(false); // default is true
Expand All @@ -259,27 +262,49 @@ await consumerControl.Stop();

#### Consumer context (additional message information)

> Changed in version 1.15.0

The consumer can access the [`IConsumerContext`](../src/SlimMessageBus/IConsumerContext.cs) object which:

- allows to access additional message information - topic (or queue) name the message arrived on, headers, cancellation token,
- enable the transport provider to pass additional message information specific to the chosen transport.

Examples of such transport specific information are the Azure Service Bus UserProperties, or Kafka Topic-Partition offset.

To use it the consumer has to implement the [`IConsumerWithContext`](../src/SlimMessageBus/IConsumerWithContext.cs) interface:
The recommended (and newer) approach is to define a consumer type that implements `IConsumer<IConsumerContext<TMessage>>`.
For example:

```cs
// The consumer wraps the message type in IConsumerContext<T>
public class PingConsumer : IConsumer<IConsumerContext<PingMessage>>
{
public Task OnHandle(IConsumerContext<PingMessage> context, CancellationToken cancellationToken)
{
var message = context.Message; // the message (here PingMessage)
var topic = context.Path; // the topic or queue name
var headers = context.Headers; // message headers
// Kafka transport specific extension (requires SlimMessageBus.Host.Kafka package):
var transportMessage = context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
}
}

// To declare the consumer type use the .WithConsumerOfContext<TConsumer>() method
mbb.Consume<SomeMessage>(x => x
.Topic("some-topic")
.WithConsumerOfContext<PingConsumer>()
);
```

The other approach is for the consumer to implement the [`IConsumerWithContext`](../src/SlimMessageBus/IConsumerWithContext.cs) interface:

```cs
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
{
public IConsumerContext Context { get; set; }

public Task OnHandle(PingMessage message)
public Task OnHandle(PingMessage message, CancellationToken cancellationToken)
{
var topic = Context.Path; // the topic or queue name
var headers = Context.Headers; // message headers
var cancellationToken = Context.CancellationToken;
// Kafka transport specific extension (requires SlimMessageBus.Host.Kafka package):
var transportMessage = Context.GetTransportMessage();
var partition = transportMessage.TopicPartition.Partition;
Expand Down Expand Up @@ -455,7 +480,7 @@ The request handling micro-service needs to have a handler that implements `IReq
```cs
public class SomeRequestHandler : IRequestHandler<SomeRequest, SomeResponse>
{
public async Task<SomeResponse> OnHandle(SomeRequest request)
public async Task<SomeResponse> OnHandle(SomeRequest request, CancellationToken cancellationToken)
{
// handle the request
return new SomeResponse();
Expand Down Expand Up @@ -495,7 +520,7 @@ public class SomeRequest : IRequest
// The handler has to use IRequestHandler<T> interface
public class SomeRequestHandler : IRequestHandler<SomeRequest>
{
public async Task OnHandle(SomeRequest request)
public async Task OnHandle(SomeRequest request, CancellationToken cancellationToken)
{
// no response returned
}
Expand Down Expand Up @@ -528,7 +553,7 @@ This allows to easily look up the `IMessageBus` instance in the domain model lay

See [`DomainEvents`](../src/Samples/Sample.DomainEvents.WebApi/Startup.cs#L79) sample it works per-request scope and how to use it for domain events.

## Dependency resolver
## Dependency Resolver

SMB uses the [`Microsoft.Extensions.DependencyInjection`](https://www.nuget.org/packages/Microsoft.Extensions.DependencyInjection) container to obtain and manage instances of the declared consumers (class instances that implement `IConsumer<>` or `IRequestHandler<>`) or interceptors.

Expand Down Expand Up @@ -598,7 +623,7 @@ There is also an option to provide a type filter predicate. This might be helpfu
```cs
services.AddSlimMessageBus(mbb =>
{
// Register the found types that contain DomainEventHandlers in the namespacce
// Register the found types that contain DomainEventHandlers in the namespace
mbb.AddConsumersFromAssembly(Assembly.GetExecutingAssembly(), filter: (type) => type.Namespace.Contains("DomainEventHandlers"));
};
```
Expand All @@ -613,17 +638,20 @@ services.AddHttpContextAccessor(); // This is required for the SlimMessageBus.Ho
services.AddSlimMessageBus(mbb =>
{
// ...
mbb.AddAspNet(); // reqires SlimMessageBus.Host.AspNetCore
mbb.AddAspNet(); // requires SlimMessageBus.Host.AspNetCore
};
```

### Modularization of configuration

> Since version 2.0.0

The SMB bus configuration can be split into modules. This allows to keep the bus configuration alongside the relevant application module (or layer).
The SMB bus configuration can be split into modules. This allows to keep the bus configuration alongside the relevant application module (or layer):

The `services.AddSlimMessageBus(mbb => { })` can be called multiple times. The end result will be a sum of the configurations (the supplied `MessageBusBuilder` instance will be the same). Consider the example:
- The `services.AddSlimMessageBus(mbb => { })` can be called multiple times.
- The end result will be a sum of the configurations (the supplied `MessageBusBuilder` instance will be the same).

Consider the example:

```cs
// Module 1
Expand Down Expand Up @@ -651,7 +679,8 @@ services.AddSlimMessageBus(mbb =>
});
```

Before version 2.0.0 there was support for modularity using `IMessageBusConfigurator` implementation. However, the interface was deprecated in favor of the `AddSlimMessageBus()` extension method that was made additive.
Before version 2.0.0 there was support for modularity using `IMessageBusConfigurator` implementation.
However, the interface was deprecated in favor of the `AddSlimMessageBus()` extension method that was made additive.

### Auto registration of consumers and interceptors

Expand All @@ -662,7 +691,7 @@ The `mbb.AddServicesFromAssembly()` extension method performs search for any imp
- consumers `IConsumer<T>`, `IRequestHandler<T, R>` or `IRequestHandler<T>`,
- [interceptors](#interceptors)

Found types are registered (by default as `Transient`) servcices with the MSDI container.
Found types are registered (by default as `Transient`) services with the MSDI container.

```cs
services.AddSlimMessageBus(mbb =>
Expand All @@ -685,7 +714,7 @@ The `MessageType` header will be set for every published (or produced) message t

This approach allows SMB to send polymorphic message types (messages that share a common ancestry) and even send unrelated message types via the same topic/queue transport.

This mechanism should work fine with serializers that support polimorphic serialization (e.g. Newtonsoft.Json) and have that feature enabled. In such case a message type discriminator (e.g. `$type` property for Newtonsoft.Json) will be added by the serializer to the message payload, so that the deserializer on the consumer end knows to what type to deserialize the message to.
This mechanism should work fine with serializers that support polymorphic serialization (e.g. Newtonsoft.Json) and have that feature enabled. In such case a message type discriminator (e.g. `$type` property for Newtonsoft.Json) will be added by the serializer to the message payload, so that the deserializer on the consumer end knows to what type to deserialize the message to.
However, the `MessageType` header takes precedence in SMB in matching the correct consumer.

> For better interoperability, the `MessageType` header is optional. This is to support the scenario that other publishing system does not use SMB nor is able to set the header. However, in the absence of `MessageType` header the SMB consumer side, should expect only one type per topic/queue. If there were more than one message types on the same topic (or queue) SMB would not be able to infer what type actually arrived.
Expand All @@ -706,12 +735,12 @@ mbb.Produce<OrderEvent>(x => x.DefaultTopic("events"));

public class CustomerEventConsumer : IConsumer<CustomerEvent>
{
public Task OnHandle(CustomerEvent e) { }
public Task OnHandle(CustomerEvent e, CancellationToken cancellationToken) { }
}

public class OrderEventConsumer : IConsumer<OrderEvent>
{
public Task OnHandle(OrderEvent e) { }
public Task OnHandle(OrderEvent e, CancellationToken cancellationToken) { }
}

// which consume from the same topic
Expand Down Expand Up @@ -786,12 +815,12 @@ Given the following consumers:
```cs
public class CustomerEventConsumer : IConsumer<CustomerEvent>
{
public Task OnHandle(CustomerEvent e) { }
public Task OnHandle(CustomerEvent e, CancellationToken cancellationToken) { }
}

public class CustomerCreatedEventConsumer : IConsumer<CustomerCreatedEvent>
{
public Task OnHandle(CustomerCreatedEvent e) { }
public Task OnHandle(CustomerCreatedEvent e, CancellationToken cancellationToken) { }
}
```

Expand All @@ -814,7 +843,7 @@ mbb.Consume<CustomerEvent>(x =>
});
```

All the arriving polymorphic message types will be matched agaist the declared consumers types that could accept the arrived message type and they will be activated.
All the arriving polymorphic message types will be matched against the declared consumers types that could accept the arrived message type and they will be activated.

In this example:

Expand Down Expand Up @@ -1048,8 +1077,11 @@ For example, Apache Kafka requires `mbb.KafkaGroup(string)` for consumers to dec
Providers:

- [Apache Kafka](provider_kafka.md)
- [Azure Service Bus](provider_azure_servicebus.md)
- [Azure Event Hubs](provider_azure_eventhubs.md)
- [Redis](provider_redis.md)
- [Azure Service Bus](provider_azure_servicebus.md)
- [Hybrid](provider_hybrid.md)
- [MQTT](provider_mqtt.md)
- [Memory](provider_memory.md)
- [Hybrid](provider_hybrid.md)
- [RabbitMQ](provider_rabbitmq.md)
- [Redis](provider_redis.md)
- [SQL](provider_sql.md)
Loading
Loading