Systemic Azure Bus is a systemic component for the Azure Service Bus SDK. Its goal is to help you deal with azure bus topics and queues subscriptions and publications.
This library:
- Enforces the client to use a particular, sensible configuration
- Provides safe defaults for configuration
- Exposes an easy interface for publication/subscription
- Solves error handling
- Allows clients to easily retry, retry with exponential backoff or dead letter a failed message
- Opens/closes the connections
A typical, simple configuration looks like this:
{
connection: {
connectionString: process.env.AZURE_SERVICE_BUS_CONNECTION_STRING,
},
subscriptions: {
topicSubscriptionName: {
topic: 'myTopic',
subscription: 'myTopic.action'
},
},
publications: {
topicPublicationName: {
topic: 'myDestinationTopic',
contentType: 'application/json', // optional - default is json
},
},
}
Names that will included in the topic
and subscription
properties inside each publication / subscription object must match existing ones in your Azure service bus instance.
As for the properties topicSubscriptionName
and topicPublicationName
in this example, those are mere samples and can be fully customized based on your needs. Names you add to all objects you create inside of subscriptions
and publications
are the names that you'll use to reference a specific publication / subscription from within your source code. You can add as many topics as you need in each category.
const initBus = require('systemic-azure-bus');
const { start, stop } = initBus();
...
const api = await start({ config }); // configuration similar to the one above
In order to be able to publish messages in the service bus, the publish
method should be used to generate a function that can be called anytime to publish messages to a specific topic. We can immediatly publish a message by passing the message contents to the method returned by publish
method, as illustrated in the example below:
// Generate method to publish messages in a specific topic
const publicationId = 'topicPublicationName'; // declared in config
const publishInMyPublication = api.publish(publicationId);
// Publish a message to be consumed as soon as possible
await publishInMyPublication(messageBody);
The function returned by the publish
method can optionally receive a second parameter with an options object. The option scheduledEnqueueTimeUtc
can be used to delay the publication of a message so it's not consumed immediatly. Instead, message will be kept on the queue and published at an specific date.
// Generate method to publish messages in a specific topic
const publicationId = 'topicPublicationName'; // declared in config
const publishInMyPublication = api.publish(publicationId);
// Schedule a message to be published at a future date
const delayedPublishDate = new Date(Date.now() + 2000);
await publishInMyPublication(messageBody, { scheduledEnqueueTimeUtc: delayedPublishDate });
A message planned to be published on a future date can be cancelled anytime by using the cancelScheduledMessages
method. We can use this if our scheduled message became obsolete or it's no longer needed for whichever reason, so we prevent it from being sent to the subscribers.
To cancel a message it's necessary that we provide the id of a message, which we can obtain from the return value of the function used to publish the message.
// Generate method to publish messages in a specific topic
const publicationId = 'topicPublicationName'; // declared in config
const publishInMyPublication = api.publish(publicationId);
// Schedule a message to be published at a future date and store its id
const messageId = await publishInMyPublication(messageBody, { scheduledEnqueueTimeUtc: delayedPublishDate });
// Cancel the message
await api.cancelScheduledMessages(publicationId, messageId);
We provide a streaming API to subscribe to a topic and process messages flowing in. The example below illustrates how to use the subscribe
method to be able to listen and process messages on a given topic.
const subscriptionId = 'topicSubscriptionName'; // declared in config
// We need to define a function that will process messages received on this subscription
// The `body` property will contain the actual contents of the message that were published to the service bus
const handleReceivedMessage = ({ body, userProperties }) => {
// do something with message...
};
// We also need to define an error handling function that'll run in case we fail to process the message
const onMessageProcessingError = console.error;
// Start listening to messages in the configured subscription and determine functions that'll be used to handle incoming messages
const subscribe = api.subscribe(onMessageProcessingError);
subscribe(subscriptionId, handleReceivedMessage);
In the case we want to retrieve the rules applied to a subscription, we can use this.
let subscriptionRules = await bus.getSubscriptionRules('topicSubscriptionName');
When a message goes to DLQ (Dead Letter Queue) we can peek those messages with this operation.
const subscriptionId = 'topicSubscriptionName'; // declared in config
const deadMessage = await api.peekDlq(subscriptionId); // retrieves only one
Sometimes we need to process messages in DLQ, i.e. to purge it or to republish and reprocess them. We provide a streaming API to process them.
// Define function to handle messages
const handleReceivedMessage = ({ body, userProperties }) => {
// do something with message...
};
const subscriptionId = 'topicSubscriptionName'; // declared in config
api.processDlq(subscriptionId, handleReceivedMessage);