A framework to simplify the implementation of an event-bus oriented microservices architecture.
$ npm install msb --save
var msb = require('msb');
See examples of the patterns below.
Every message-broker pattern in this module is based on one of these publish/subscribe patterns. Producers publish messages to a topic on the microservice bus, and the messages are delivered to subscribed consumers.
- Multiple producers can publish messages on the same topic.
- Producers have no control over how many times their messages are consumed.
- It is the responsibility of consuming services to define how they want to consume messages from a specific topic.
- You can combine different consumer patterns on a single topic.
It's pretty easy to get started with publishing messages. You just need to specify a topic and provide a message.
msb.publisher('test:pubsub')
.publish(message);
Publisher wraps the message internally according to the envelope schema.
There is a set of optional parameters that may be useful. You can define them using the following methods:
- .withTtl() - a time-to-live to ensure messages cannot be delivered after this amount of milliseconds. This value is sensitive to synchronisation of clocks between producers and consumers.
- .withTags() - a list of tags for extra information.
- .withExchangeType() - allows you to specify the exchange type (RabbitMQ specific).
- .withRoutingKey() - allows you to specify a routing key; should be used in conjunction with exchange type topic (RabbitMQ specific).
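The clock sensitivity of the time-to-live can be illustrated with a small sketch. This is not msb's implementation: the helper name isExpired and its parameters are hypothetical, chosen only to show why a skewed consumer clock changes the outcome.

```javascript
// Hypothetical helper: decides whether a message has outlived its TTL.
// publishedAtMs is stamped by the producer's clock, nowMs is read from
// the consumer's clock, so any skew between the two shifts the result.
function isExpired(publishedAtMs, ttlMs, nowMs) {
  return nowMs - publishedAtMs > ttlMs;
}

var publishedAt = 1000000;
var ttl = 30000;

// Clocks in sync, 20 seconds after publishing: still deliverable.
var inSync = isExpired(publishedAt, ttl, publishedAt + 20000);

// Same moment, but the consumer's clock runs 15 seconds ahead:
// the message now looks 35 seconds old and is treated as expired.
var skewed = isExpired(publishedAt, ttl, publishedAt + 20000 + 15000);

console.log(inSync, skewed);
```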
By providing a callback you can handle messages that cannot be delivered to the message broker, i.e. where an error is passed back.
Here is a complete example that uses all available options:
msb.publisher('test:pubsub')
.withTtl(30000)
.withTags(['black','red'])
.withExchangeType('topic')
.withRoutingKey('*.crazy.*')
.publish(payload, (err) => {
if (err) return console.error(err);
//...
});
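With the topic exchange type, RabbitMQ compares routing keys against binding patterns in which * stands for exactly one dot-separated word and # for zero or more words. The following is a minimal sketch of that matching rule, for illustration only; matchesRoutingKey is a hypothetical helper and is not part of msb or of any RabbitMQ client.

```javascript
// Hypothetical helper illustrating AMQP topic-exchange matching.
function matchesRoutingKey(pattern, key) {
  var p = pattern.split('.');
  var k = key.split('.');

  function match(i, j) {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' may absorb zero or more words, so try every split point.
      for (var n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    // Any other pattern word needs a key word to compare against.
    if (j === k.length) return false;
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}

console.log(matchesRoutingKey('*.crazy.*', 'quick.crazy.fox'));
console.log(matchesRoutingKey('*.crazy.*', 'crazy.fox'));
```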
This is an implementation of the Publish-Subscribe Channel pattern. All listeners will receive all messages published to this topic, as long as they are online at the time the message is published.
msb.subscriber('test:pubsub')
.createEmitter()
.on('message', (message) => console.log(message))
.on('error', console.error);
This is an implementation of the Competing Consumers pattern. Only one listener will receive each message published to the specified topic.
msb.subscriber('test:pubsub')
.withGroupId('example-string')
.createEmitter()
.on('message', (message) => console.log(message))
.on('error', console.error);
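The difference between the two consumer patterns can be pictured with a toy in-memory bus. This is only a sketch under simplifying assumptions (no broker, synchronous delivery, round-robin within a group); ToyBus and its methods are hypothetical and do not reflect how msb is implemented.

```javascript
// Toy in-memory bus, for illustration only (not msb's implementation).
function ToyBus() {
  this.topics = Object.create(null); // topic -> { groupId -> { handlers, next } }
  this.anonymous = 0; // counter used to give ungrouped listeners a private group
}

ToyBus.prototype.subscribe = function(topic, groupId, handler) {
  // Without a groupId every listener gets its own group, i.e. broadcast.
  var id = groupId === null ? 'anon-' + (this.anonymous++) : groupId;
  var groups = this.topics[topic] || (this.topics[topic] = Object.create(null));
  var group = groups[id] || (groups[id] = { handlers: [], next: 0 });
  group.handlers.push(handler);
};

ToyBus.prototype.publish = function(topic, message) {
  var groups = this.topics[topic] || {};
  Object.keys(groups).forEach(function(id) {
    var group = groups[id];
    // Each group receives the message once; members take turns (round-robin).
    var handler = group.handlers[group.next % group.handlers.length];
    group.next++;
    handler(message);
  });
};

var bus = new ToyBus();
var received = { a: [], b: [], w1: [], w2: [] };

// Two broadcast listeners: both see every message.
bus.subscribe('test:pubsub', null, function(m) { received.a.push(m); });
bus.subscribe('test:pubsub', null, function(m) { received.b.push(m); });

// Two workers sharing a group: each message goes to only one of them.
bus.subscribe('test:pubsub', 'example-string', function(m) { received.w1.push(m); });
bus.subscribe('test:pubsub', 'example-string', function(m) { received.w2.push(m); });

bus.publish('test:pubsub', 'first');
bus.publish('test:pubsub', 'second');

console.log(received);
```

Because both patterns are attached to the same topic here, the sketch also shows different consumer patterns being combined on a single topic.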
Listeners can also receive messages that were published while they were offline and queued up in the message broker.
To achieve this, declare the queue as durable using the .withDurable(true) method:
msb.subscriber('test:pubsub')
.withGroupId('example-string')
.withDurable(true)
.createEmitter()
.on('message', (message) => console.log(message))
.on('error', console.error);
Notes:
- Messages that have a time-to-live (ttl) specified will not be delivered after this time has been exceeded.
- Messages will only be queued from the first time this listener has been instantiated.
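The effect of a durable queue can be sketched with a toy model. The ToyQueue type below is hypothetical and heavily simplified (it ignores TTLs and persistence to disk); it only illustrates that messages arriving while the listener is offline are kept when the queue is durable and lost otherwise, and that nothing is queued before the listener first creates the queue.

```javascript
// Toy queue, for illustration only (not msb or RabbitMQ code).
function ToyQueue(durable) {
  this.durable = durable;
  this.backlog = [];
  this.listener = null;
}

// Called by the broker side for every message routed to this queue.
ToyQueue.prototype.deliver = function(message) {
  if (this.listener) return this.listener(message);
  // No listener online: keep the message only if the queue is durable.
  if (this.durable) this.backlog.push(message);
};

// Called when a listener comes (back) online; drains the backlog first.
ToyQueue.prototype.attach = function(listener) {
  this.listener = listener;
  while (this.backlog.length) listener(this.backlog.shift());
};

ToyQueue.prototype.detach = function() {
  this.listener = null;
};

function replay(durable) {
  var seen = [];
  var queue = new ToyQueue(durable);
  var listener = function(m) { seen.push(m); };
  queue.attach(listener);         // the queue exists from this point on
  queue.deliver('while online');
  queue.detach();                 // the listener goes offline
  queue.deliver('while offline');
  queue.attach(listener);         // the listener comes back
  return seen;
}

console.log(replay(true));
console.log(replay(false));
```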
The simplest way to do a 1-1 request is to provide only a topic and a JSON-serializable payload. Should multiple responders attempt to respond, only the first response to be received will be provided to the callback.
msb.request('example:topic', payload, function(err, payload, _fullMessage) {
if (err) return debug(err)
//...
});
Additional settings can be provided:
msb.request({
namespace: 'example:topic',
waitForResponsesMs: 1000
}, payload, function(err, payload, _fullMessage) {
if (err) return debug(err)
//...
});
A single payload is published with a return topic derived from the namespace that will ensure responses are received by this requester.
The requester will listen for multiple responses for the specified amount of time.
var requester = msb.Requester({
namespace: 'example:topic',
waitForResponsesMs: 10000 // a.k.a. responseTimeout
})
requester
.on('payload', function(payload, _fullMessage) {
//...
})
.on('error', function(err) {
debug(err)
})
.on('end', function() {
//... Note: won't fire if the requester encountered an error
})
.publish(payload)
The requester will 'end' once this number of responses have been received.
var requester = msb.Requester({
namespace: 'example:topic',
waitForResponses: 1
})
//...
Responders have the ability to change the expected number of responses or how long the requester should wait for responses from that responder. If you want to guarantee that the requester will wait for such messages (acks) to be received, you should specify a minimum time for the requester to wait.
var requester = msb.Requester({
namespace: 'example:topic',
waitForAcksMs: 1000, // a.k.a. ackTimeout
waitForResponses: 1
})
//...
In the above case, the requester will only ever end after the specified waitForAcksMs.
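How the three waiting options interact can be modelled with a small pure function. This is a simplified sketch and an assumption about the behaviour described above, not msb's collector code: shouldEnd is a hypothetical name, and the model assumes waitForAcksMs is shorter than the overall response timeout.

```javascript
// Simplified model of a requester's end condition (an assumption for
// illustration, not msb's actual collector logic).
function shouldEnd(options, responsesReceived, elapsedMs) {
  var waitForResponsesMs = options.waitForResponsesMs || 3000; // documented default
  var waitForAcksMs = options.waitForAcksMs || 0;
  var expected = options.waitForResponses;

  // The overall timeout always ends the request.
  if (elapsedMs >= waitForResponsesMs) return true;
  // While the ack window is open, late acks may still raise expectations.
  if (elapsedMs < waitForAcksMs) return false;
  // Otherwise end as soon as the expected number of responses has arrived.
  return expected > 0 && responsesReceived >= expected;
}

var options = { waitForAcksMs: 1000, waitForResponses: 1 };

console.log(shouldEnd(options, 1, 200));  // response arrived, ack window still open
console.log(shouldEnd(options, 1, 1000)); // ack window has closed
console.log(shouldEnd(options, 0, 3000)); // overall timeout reached
```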
A single response (per responder) for each incoming request.
msb.Responder.createServer({
namespace: 'example:topic'
})
.use(function(request, response, next) {
var body = {}
response.writeHead(200) // HTTP-compatible
response.end(body) // To be provided in response `payload.body`
})
.listen()
An ack is sent to ensure the requester will continue to wait for this response.
msb.Responder.createServer({
namespace: 'example:topic'
})
.use(function(request, response, next) {
var expectedResponses = 1
var expectedTimeForResponse = 3000
response.responder.sendAck(expectedResponses, expectedTimeForResponse, next)
})
.use(function(request, response, next) {
var body = {}
response.writeHead(200) // HTTP-compatible
response.end(body) // To be provided in response `payload.body`
})
.listen()
An ack is sent to ensure the requester will wait for the multiple responses being prepared.
msb.Responder.createEmitter({
namespace: 'example:topic'
})
.on('responder', function(responder) {
responder.sendAck(3, 5000)
var i = 0;
while (i++ < 3) {
var payload = {
body: {
//...
}
}
responder.send(payload, function(err) {
if (err) return debug(err)
})
}
})
Loads the provided config object over the configuration for the channelManager. E.g.
msb.configure(config); // Default channelManager, or
msb.createChannelManager().configure(config); // Additional channelManager
Note: It is recommended that you do not re-configure after publisher/subscriber channels have been created.
- MSB_SERVICE_NAME The string used to identify the type of service, also used as the default for the broker groupId. (Default: name in the package.json of the main module.)
- MSB_SERVICE_VERSION (Default: version in the package.json of the main module.)
- MSB_SERVICE_INSTANCE_ID (Default: generated universally unique 12-byte/24-char hex string.)
- MSB_BROKER_ADAPTER One of 'amqp' or 'local' (Default: 'amqp')
- MSB_BROKER_HOST, default "127.0.0.1".
- MSB_BROKER_PORT, default 5672.
- MSB_BROKER_USER_NAME, default "guest".
- MSB_BROKER_PASSWORD, default "guest".
- MSB_BROKER_VIRTUAL_HOST, default "/".
- MSB_BROKER_USE_SSL, default "false".
- MSB_BROKER_RECONNECT A boolean which determines whether the connection with the broker should be automatically recovered from failures (Default: 'false')
- MSB_CONFIG_PATH Loads the JSON/JS file at this path over the base channelManager configuration. Similar to calling channelManager.configure(config) programmatically.
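The broker variables and their documented defaults can be summarised in code. The sketch below is illustrative only: brokerSettingsFromEnv and the property names of the returned object are hypothetical and do not describe msb's internal config schema, and the host name in the usage example is made up.

```javascript
// Hypothetical helper: collects the documented broker variables into one
// object. The property names are illustrative, not msb's config schema.
function brokerSettingsFromEnv(env) {
  return {
    adapter: env.MSB_BROKER_ADAPTER || 'amqp',
    host: env.MSB_BROKER_HOST || '127.0.0.1',
    port: Number(env.MSB_BROKER_PORT || 5672),
    username: env.MSB_BROKER_USER_NAME || 'guest',
    password: env.MSB_BROKER_PASSWORD || 'guest',
    virtualHost: env.MSB_BROKER_VIRTUAL_HOST || '/',
    // Environment variables are strings, so booleans are compared as text.
    useSsl: env.MSB_BROKER_USE_SSL === 'true',
    reconnect: env.MSB_BROKER_RECONNECT === 'true'
  };
}

var defaults = brokerSettingsFromEnv({});
var custom = brokerSettingsFromEnv({
  MSB_BROKER_HOST: 'rabbit.example.internal', // made-up host name
  MSB_BROKER_PORT: '5671',
  MSB_BROKER_USE_SSL: 'true'
});

console.log(defaults);
console.log(custom);
```

In a real process the argument would be process.env.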
By default, MSB does not automatically recover a failed connection with the broker. When a connection failure occurs, an error is raised and the application process terminates. This behaviour can be overridden by setting MSB_BROKER_RECONNECT=true. The implementation has a drawback, though: reconnection happens silently, without emitting any events or logging errors, so it can be hard to tell whether a microservice is connected to the broker or is in the middle of a connection retry.
Listens to a topic on the bus and prints JSON to stdout. By default it will also listen for response topics detected on messages, and JSON is pretty-printed. For newline-delimited JSON compatibility, specify -p false.
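The two output modes differ only in how each message is serialised. As a rough sketch (formatMessage is a hypothetical helper, the sample message is made up, and the two-space indent is an assumption rather than the CLI's actual setting):

```javascript
// Hypothetical formatter mirroring the two output modes described above.
function formatMessage(message, pretty) {
  // Pretty mode indents over several lines; otherwise one line per message,
  // which is what newline-delimited JSON consumers expect.
  return pretty ? JSON.stringify(message, null, 2) : JSON.stringify(message);
}

// Made-up sample message, not msb's envelope format.
var sample = { topic: 'topic:to:listen:to', payload: { hello: 'world' } };

console.log(formatMessage(sample, true));
console.log(formatMessage(sample, false));
```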
$ node_modules/msb/bin/msb -t topic:to:listen:to
Or if globally installed, i.e. npm install msb -g:
$ msb -t topic:to:listen:to
Options:
- --topic or -t
- --follow or -f listen for following topics, empty to disable (Default: response)
- --pretty or -p set to false to use as a newline-delimited json stream, (Default: true)
- http2bus Provides HTTP endpoints for services exposed through the bus.
- bus2http Exposes HTTP endpoints through the bus.
- bus2aws Generic adapter to send messages from the bus to AWS services.
- es-archiver Archives all messages to Elasticsearch.
- msb-java Java API
RabbitMQ is the default message broker used. The AMQP adapter is tested with RabbitMQ and it implements a limited topology for simplification. One exchange is created per topic and a queue is created for every group of similar services, configured using a groupId. This means that you can have different types of services listening on the same topic, and multiple processes of the same type of service would receive a fair distribution of messages.
A responder lets you send formatted acks and responses in response to a request message received on a topic/namespace.
- timeoutMs (optional) The requester should wait until at least this amount of milliseconds has passed since the request was published before ending. (Default: previously set value or the default timeout on the requester.)
- responsesRemaining (optional) A positive value increases the number of responses the requester should wait for from this responder. A negative value reduces the number of responses the requester should wait for from this responder. Default: 1
- cb (optional) cb(err) Function that is called after transmission has completed.
- payload An object that can be converted to JSON.
- cb (optional) cb(err) Function that is called after transmission has completed.
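The effect of the timeoutMs and responsesRemaining values of an ack on a waiting requester can be pictured as a small state update. This is a simplified model and an assumption, not msb's code: applyAck and the shape of the state object are hypothetical.

```javascript
// Simplified model (an assumption, not msb's code) of how a requester could
// track acks from one responder.
function applyAck(state, ack) {
  // responsesRemaining defaults to 1, as documented above.
  var delta = ack.responsesRemaining === undefined ? 1 : ack.responsesRemaining;
  return {
    // "At least this amount of milliseconds": an ack never shortens the wait.
    waitUntilMs: ack.timeoutMs === undefined
      ? state.waitUntilMs
      : Math.max(state.waitUntilMs, ack.timeoutMs),
    // Positive values add expected responses, negative values remove them.
    expected: Math.max(0, state.expected + delta)
  };
}

var initial = { waitUntilMs: 3000, expected: 0 };
var afterFirstAck = applyAck(initial, { timeoutMs: 5000, responsesRemaining: 3 });
var afterSecondAck = applyAck(afterFirstAck, { timeoutMs: 1000, responsesRemaining: -1 });

console.log(afterFirstAck);
console.log(afterSecondAck);
```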
The request message this responder is responding to.
- options.namespace String topic name to listen on for messages.
- options.groupId Optional See channelManager.findOrCreateConsumer
- channelManager Optional channelManager. (Default: msb.channelManager)
See ResponderServer for options.
- options.namespace String topic name to listen on for requests.
- options.tags Array of Strings Add these tags to responses.
- options.responseChannelTimeoutMs Optional Number of milliseconds for the producer channel to be kept after the last publish. (Default: 15 * 60000/15 minutes)
- options.groupId Optional See channelManager.findOrCreateConsumer
(Use msb.Responder.createServer() to create instances.)
- fnOrArr Function or Array of middleware-like functions with signature:
function handler(request, response, next)
- request The payload on the incoming message.
- response ResponderResponse object.
- next Function To call if response was not fulfilled, with an error object where an error occurred.
function errorHandler(err, request, response, next)
- err Error Passed to a previous next() call.
- request, response, next as above.
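The way a middleware chain hands control from one handler to the next, and diverts to an error handler once next() receives an error, can be sketched as follows. This runner is illustrative only: runChain is a hypothetical function, and it assumes, as Express does, that error handlers are recognised by taking four arguments; whether msb applies the same rule is an assumption.

```javascript
// Minimal middleware runner, for illustration only (not msb's code).
function runChain(handlers, request, response, done) {
  var index = 0;
  function next(err) {
    while (index < handlers.length) {
      var handler = handlers[index++];
      var isErrorHandler = handler.length === 4;
      if (err && isErrorHandler) return handler(err, request, response, next);
      if (!err && !isErrorHandler) return handler(request, response, next);
      // Otherwise skip: normal handlers are bypassed while an error is
      // pending, error handlers are bypassed while there is none.
    }
    done(err);
  }
  next();
}

var trace = [];
runChain([
  function(request, response, next) { trace.push('first'); next(new Error('boom')); },
  function(request, response, next) { trace.push('skipped'); next(); },
  function(err, request, response, next) { trace.push('handled ' + err.message); next(); }
], {}, {}, function(err) { trace.push(err ? 'failed' : 'done'); });

console.log(trace);
```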
Call this to start listening for requests.
- channelManager Optional channelManager. (Default: msb.channelManager)
Passed to ResponderServer middleware-like functions. The interface is kept similar to core HttpServerResponse for convenience.
See http.
- statusCode Number Corresponding HTTP status code.
- statusMessage String Corresponding HTTP status message.
- headers Object
- body Optional String|Object|Buffer
- cb Optional Function Callback to be called when response has been successfully sent or on error.
The Responder object used to send acks and responses.
A requester is a collector component that can also publish new messages on the bus.
- options.namespace String Publish request message on this topic and listen on this appended by ':response'.
- options.tags Array of Strings Add these tags to the published message.
- options.waitForAcksMs Optional Will wait at least this amount of ms for acks, before ending.
- options.waitForResponsesMs Optional Will wait at least this amount of ms to receive the expected number of responses, before ending. (Default: 3000).
- options.waitForResponses Optional Number of responses the collector expects before either ending or timing out. (Default: Infinity/-1, i.e. only end on timeout. You will typically set this to 1.)
- options.requestChannelTimeoutMs Number of milliseconds for the producer channel to be kept after the request is published. (Default: 15 * 60000/15 minutes)
- originalMessage Optional (Object|null) Message this request should inherit tags from. Explicitly specify null to prevent inheritance from the current messageFactory context.
- payload Object Contains typical payload.
- cb Function Callback to be called on success or error.
function(payload, _message) { }
- payload Object Response message payload.
- _message Object The full response message. In most cases it should not be needed.
function(ack, _message) { }
- ack Object Response message ack.
- _message Object The full ack-containing message. In most cases it should not be needed.
Emitted either on timeout or when the expected number of responses has been received.
A collector is a component that listens for multiple response messages, with timeouts and number of responses determining its lifetime.
(For events and instantiation, see Requester.)
A simpler API for 1-1 request/responses.
- options Object See Requester common options.
- namespace or options.namespace String The namespace to send the request on.
- options.responseSchema JSON schema object, describing the expected response payload.
- options.channelManager Optional Alternative channelManager to use.
- options.originalMessage Optional See originalMessage provided to Requesters.
A function that throws a validation error if the message does not validate.
- schema JSON schema object.
- message The message to be validated.
Returns a middleware-style function, e.g. function(request, response, next) { }, to be used in a ResponderServer middleware chain, that will pass a validation error to next() for invalid incoming requests.
- payloadSchema JSON schema object, describing the incoming request.
E.g. responderServer.use(msb.validateWithSchema.middleware(payloadSchema));
Returns an event handler function, e.g. function(payload) { ... }.
- schema JSON schema object, describing the incoming event message.
- successFn Function function(payload) { } An event handler that will only be called if the incoming payload validates.
- errorFn Optional Function function(err, payload) { } A function that will be called with the validation error and original payload if the incoming message fails validation.
Note: Without an errorEventHandlerFn, errors will be emitted on the original event emitter.
E.g.
requester
.on('payload', msb.validateWithSchema.onEvent(messageSchema, function(payload) {
//...
}))
.on('error', function(err, payload) {
console.error(err);
requester.end();
});
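The wrapping behaviour of such an event handler, including the fallback to the emitter's own error event, can be sketched with Node's EventEmitter. This is not msb's implementation: the onEvent function below is a hypothetical stand-in, and a plain predicate replaces the JSON schema check to keep the example free of dependencies.

```javascript
var EventEmitter = require('events').EventEmitter;

// Sketch of an onEvent-style wrapper. A plain predicate stands in for the
// JSON schema check; msb's real validator is not reproduced here.
function onEvent(isValid, successFn, errorFn) {
  return function(payload) {
    if (isValid(payload)) return successFn.call(this, payload);
    var err = new Error('validation failed');
    if (errorFn) return errorFn.call(this, err, payload);
    // No error handler given: surface the error on the emitter itself.
    // EventEmitter calls listeners with `this` set to the emitter.
    this.emit('error', err, payload);
  };
}

var emitter = new EventEmitter();
var accepted = [];
var errors = [];

emitter
.on('payload', onEvent(
  function(payload) { return typeof payload.body === 'string'; },
  function(payload) { accepted.push(payload.body); }
))
.on('error', function(err, payload) { errors.push(err.message); });

emitter.emit('payload', { body: 'ok' });
emitter.emit('payload', { body: 42 });

console.log(accepted, errors);
```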
The channel manager enables re-use of channels listening/publishing per topic. It is an EventEmitter instance used as a singleton with the app-wide configuration.
var channelManager = msb.channelManager;
Returns a producer for this topic. Either existing or new. Corresponding channelManager events will be emitted for this producer.
Returns a consumer listening on this topic. Either existing or new. Corresponding channelManager events will be emitted for this consumer. If config.cleanupConsumers is set, these consumers will be removed as soon as there are no more listeners for them. If an app-wide schema exists, it will be checked for every incoming message.
- topic String
- options.groupId String Custom group identifier for round-robin message queue.
- options.groupId Boolean Set to false for broadcast-style message queue.
- options.autoConfirm Optional Boolean Set to false to require explicit confirmation of processed messages. (Default: true)
- topic The name of the topic a new producer has been created for.
- topic The name of the topic a message has been successfully published for.
- topic The name of the topic a new producer has been created for.
- topic The name of the topic a new producer has been created for.
- topic The name of the topic a message has been successfully published for.
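The "either existing or new" behaviour of the channel manager's find-or-create methods amounts to a per-topic cache. A minimal sketch of that idea follows; ToyRegistry is hypothetical, omits the events and broker connections of the real channelManager, and is not taken from msb.

```javascript
// Toy registry illustrating find-or-create reuse (not msb's channelManager).
function ToyRegistry(createFn) {
  this.createFn = createFn;
  this.byTopic = Object.create(null); // topic -> channel
  this.created = 0;
}

ToyRegistry.prototype.findOrCreate = function(topic) {
  if (!this.byTopic[topic]) {
    // First request for this topic: create the channel and remember it.
    this.byTopic[topic] = this.createFn(topic);
    this.created++;
  }
  return this.byTopic[topic];
};

// Forgets a channel, e.g. once it has no listeners left.
ToyRegistry.prototype.remove = function(topic) {
  delete this.byTopic[topic];
};

var producers = new ToyRegistry(function(topic) { return { topic: topic }; });
var first = producers.findOrCreate('example:topic');
var second = producers.findOrCreate('example:topic');

console.log(first === second, producers.created);
```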
(Created using channelManager.findOrCreateProducer.)
- message Either a string or an object that will be converted to JSON.
- cb cb(err) Function that is called after transmission has completed.
(Created using channelManager.findOrCreateConsumer.)
Stops listening for messages on this topic. If config.cleanupConsumers is set, and this consumer was created using channelManager.findOrCreateConsumer, it will be removed from the channelManager.
- cb Function that is called if the consumer is already consuming or once when it next starts consuming.
Confirms with the broker (where supported) that processing of this message has completed. Only works where the broker-adapter is AMQP, and where config.autoConfirm is set to false.
- message The message originally emitted by this consumer, by reference.
Confirms with the broker (where supported) that this message should not be processed, e.g. in cases such as invalid message or TTL reached. Only works where the broker-adapter is AMQP, and where config.autoConfirm is set to false.
- message The message originally emitted by this consumer, by reference.
- message A parsed object, validated using the app-wide config.schema.
- error Either an error emitted by the underlying driver, or a schema validation error.