-
-
Notifications
You must be signed in to change notification settings - Fork 43
Message Storage
Messaging is intended for message passing between one or more systems in disconnected fashion. You can send a message somewhere and current or remote system picks it up for processing later when required. This paradigm somehow fits into CQRS and Message Passing architectural ideas.
To name a few examples, Apache Kafka, RabbitMQ, Azure Service Bus are all falling into this category - essentially they are designed to pass messages. Some systems are more advanced to others of course, but most often it doesn't really matter.
FluentStorage supports many messaging providers out of the box, including Azure Service Bus Topics and Queues, Azure Event Hub and others.
There are two abstractions available - message publisher and message receiver. As the name stands, one is publishing messages, and another is receiving them on another end.
To publish messages you will usually construct an instance of IMessagePublisher
with an appropriate implementation. All the available implementations can be created using factory methods in the FluentStorage.StorageFactory.Messages
class. More methods appear in that class as you reference an assembly containing specific implementations.
Similarly, to receive messages you can use factory methods to create receivers which all implement IMessageReceiver
interface.
The primary method of this interface
Task StartMessagePumpAsync(
Func<IEnumerable<QueueMessage>, Task> onMessageAsync,
int maxBatchSize = 1,
CancellationToken cancellationToken = default);
starts a message pump that listens for incoming queue messages and calls Func<IEnumerable<QueueMessage>, Task>
as a call back to pass those messages to your code.
maxBatchSize
is a number specifying how many messages you are ready to handle at once in your callback. Choose this number carefully as specifying number too low will result in slower message processing, whereas number too large will increase RAM requirements for your software.
cancellationToken
is used to signal the message pump to stop. Not passing any parameter there will result in never stopping message pump. See example below in Use Cases for a pattern on how to use this parameter.
You can find the list of supported messaging implementations here.
FluentStorage provides built-in capability to handle large message content by allowing you to offload message content over a certain threshold to an external blob storage. It works in the following way:
- Check that message content is larger than
threshold value
. - If not, do the usual processing.
- If it is, upload message content as a blob to external storage, clear message content and add a custom header
x-sn-large
that points to the blob containing message content.
When receiving messages, it will check that x-sn-large
header is present, and if so, will download blob, set it's content as message content, and return the message to the receiver.
Blob is deleted from the blob storage only when message is confirmed by the receiver.
Large message handling works on any supported queue implementation because it's implemented in the core library itself, outside of specific queue implementation. To enable it, call .HandleLargeContent
on both publisher and receiver:
IBlobStorage offloadStorage = ...; // your blob storage for offloading content
IMessagePublisher publisher = StorageFactory.Messages
.XXXPublisher(...)
.HandleLargeContent(offloadStorage, thresholdValue);
IMessageReceiver receiver = StorageFactory.Messages
.XXXReceiver(...)
.HandleLargeContent(offloadStorage);
QueueMessage
class itself is not a serialisable entity when we talk about JSON or built-in .NET binary serialisation due to the fact it is a functionally rich structure. However, you might want to transfer the whole QueueMessage
across the wire sometimes. For these purposes you can use built-in binary methods:
var qm = new QueueMessage("id", "content");
qm.DequeueCount = 4;
qm.Properties.Add("key", "value");
byte[] wireData = qm.ToByteArray();
//transfer the bytes
QueueMessage receivedMessage = QueueMessage.FromByteArray(wireData);
These methods make sure that all of the message data is preserved, and also are backward compatible between any changes to this class.