Skip to content

Commit

Permalink
chore: setup RabbitMQ broker
Browse files Browse the repository at this point in the history
  • Loading branch information
th3hunt committed Oct 9, 2019
1 parent 23f550c commit 9d1610f
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .babelrc
Expand Up @@ -13,7 +13,7 @@
[
"module-resolver",
{
"root": ["./src"],
"root": ["./src", "./config"],
"alias": {
"logger": "./src/logger"
}
Expand Down
89 changes: 89 additions & 0 deletions config/rabbitmq.js
@@ -0,0 +1,89 @@
const EventEmitter = require('events');
const amqp = require('amqplib');

const RABBITMQ_HOST = process.env.RABBITMQ_HOST || 'localhost';
const RABBITMQ_PORT = process.env.RABBITMQ_PORT || 5672;
const RABBITMQ_USER = process.env.RABBITMQ_USER || 'devuser';
const RABBITMQ_PASS = process.env.RABBITMQ_PASS || 'devpass';
const RABBITMQ_VHOST = process.env.RABBITMQ_VHOST || '/';

/**
* A BrokkerConnector for RabbitMQ
*
* {@see BrokerConnector}
*/
class RabbitConnector extends EventEmitter {
constructor() {
super();
this.conn = null;
this.onClose = this.onClose.bind(this);
this.onError = this.onError.bind(this);
}

get connection() {
return this.conn ? Promise.resolve(this.conn) : this.connect();
}

async connect() {
const conn = await amqp.connect({
protocol: 'amqp',
hostname: RABBITMQ_HOST,
port: Number(RABBITMQ_PORT),
username: RABBITMQ_USER,
password: RABBITMQ_PASS,
vhost: RABBITMQ_VHOST
});

conn.on('error', this.onError);
conn.on('close', this.onClose);

this.conn = conn;
this.emit('connect', conn);

return conn;
}

disconnect() {
return new Promise(resolve => {
if (!this.conn) {
resolve();
}
this.conn.off('close', this.onClose);
this.conn.close().then(() => {
this.emit('close', this.conn);
this.conn = null;
resolve();
});
});
}

reconnect() {
this.emit('reconnect');
setTimeout(() => this.connect(), 500);
}

onClose() {
this.emit('close', this.conn);
this.conn = null;
this.reconnect();
}

onError(err) {
this.emit('error', err);
this.conn = null;
if (err.message !== 'Connection closing') {
this.reconnect();
}
}
}

module.exports = new RabbitConnector();

/**
* A BrokerConnector provides a persistent connection
* to a message broker with reconnect on failure guarantees.
*
* @typedef {Object} BrokerConnector
* @property {Promise<object>} connection - the connection as an object, will connect first if not already connected
* @property {function():Promise<object>} connect - create a new connection
*/
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -53,6 +53,7 @@
}
},
"dependencies": {
"amqplib": "0.5.5",
"boom": "7.3.0",
"commander": "3.0.2",
"dotenv": "8.1.0",
Expand Down
57 changes: 57 additions & 0 deletions src/broker/index.js
@@ -0,0 +1,57 @@
/* eslint-disable global-require */
import logger from 'logger';
import CreateOrderCommand from 'domain/orders/create/command';

const inputQueue = 'hexagonal.input';

class Broker {
constructor(rabbit, commandBus) {
this.rabbit = rabbit;
this.commandBus = commandBus;
}

async start() {
this.conn = await this.rabbit.connection;
this.channel = await this.conn.createChannel();
this.channel.assertQueue(inputQueue, {durable: true});
this.channel.consume(inputQueue, this.consumeMessage.bind(this));
}

async consumeMessage(msg) {
let response;
let command;

try {
const payload = JSON.parse(msg.content.toString());
const {commandType, commandBody} = payload;
switch (commandType) {
case 'CreateOrder':
command = CreateOrderCommand.buildFromJSON(commandBody);
break;
default:
throw new Error('Unknown commandType');
}
response = this.commandBus.execute(command);
} catch (error) {
logger.error(error.message);
response = error.message;
this.channel.nack(msg);
return;
}

if (msg.properties.replyTo) {
const replyOpts = {
correlationId: msg.properties.correlationId
};
this.channel.sendToQueue(
msg.properties.replyTo,
Buffer.from(JSON.stringify(response)),
replyOpts
);
}

this.channel.ack(msg);
}
}

module.exports = Broker;
14 changes: 14 additions & 0 deletions src/index.js
Expand Up @@ -7,6 +7,8 @@ import logger from 'logger';

import EventEmitter from 'events';
import createHttpServer from './http-server';
import rabbit from '../config/rabbitmq';
import Broker from './broker';
import CommandBus from './domain/commandBus';
import Registry from './domain/registry';

Expand All @@ -21,11 +23,23 @@ const commandBus = new CommandBus({

async function terminate() {
// shut down adapters one by one
logger.info('Closing RabbitMQ connections...');
await rabbit.disconnect();
process.exit(1);
}

logger.info('Launching service in %s mode...', NODE_ENV);

// Broker
rabbit.connect().then(() => {
logger.info('Connected to RabbitMQ broker');
const broker = new Broker(rabbit, commandBus);
broker.start();
rabbit.once('reconnect', () => {
setTimeout(() => broker.start(), 0);
});
});

// HTTP server
const port = process.env.HTTP_PORT || 3000;
const server = createHttpServer({commandBus});
Expand Down
67 changes: 63 additions & 4 deletions yarn.lock
Expand Up @@ -1187,6 +1187,18 @@ ajv@^6.10.0, ajv@^6.10.2, ajv@^6.5.5:
json-schema-traverse "^0.4.1"
uri-js "^4.2.2"

amqplib@0.5.5:
version "0.5.5"
resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.5.5.tgz#698f0cb577e0591954a90572fcb3b8998a76fd40"
integrity sha512-sWx1hbfHbyKMw6bXOK2k6+lHL8TESWxjAx5hG8fBtT7wcxoXNIsFxZMnFyBjxt3yL14vn7WqBDe5U6BGOadtLg==
dependencies:
bitsyntax "~0.1.0"
bluebird "^3.5.2"
buffer-more-ints "~1.0.0"
readable-stream "1.x >=1.1.9"
safe-buffer "~5.1.2"
url-parse "~1.4.3"

ansi-align@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/ansi-align/-/ansi-align-2.0.0.tgz#c36aeccba563b89ceb556f3690f0b1d9e3547f7f"
Expand Down Expand Up @@ -1482,7 +1494,16 @@ binary-extensions@^1.0.0:
resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-1.13.1.tgz#598afe54755b2868a5330d2aff9d4ebb53209b65"
integrity sha512-Un7MIEDdUC5gNpcGDV97op1Ywk748MpHcFTHoYs6qnj1Z3j7I53VG3nwZhKzoBZmbdRNnb6WRdFlwl7tSDuZGw==

bluebird@^3.5.4:
bitsyntax@~0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/bitsyntax/-/bitsyntax-0.1.0.tgz#b0c59acef03505de5a2ed62a2f763c56ae1d6205"
integrity sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==
dependencies:
buffer-more-ints "~1.0.0"
debug "~2.6.9"
safe-buffer "~5.1.2"

bluebird@^3.5.2, bluebird@^3.5.4:
version "3.7.0"
resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.0.tgz#56a6a886e03f6ae577cffedeb524f8f2450293cf"
integrity sha512-aBQ1FxIa7kSWCcmKHlcHFlT2jt6J/l4FzC7KcPELkOJOsPOb/bccdhmIrKDfXhwFrmc7vDoDrrepFvGqjyXGJg==
Expand Down Expand Up @@ -1576,6 +1597,11 @@ buffer-from@^1.0.0:
resolved "https://registry.yarnpkg.com/buffer-from/-/buffer-from-1.1.1.tgz#32713bc028f75c02fdb710d7c7bcec1f2c6070ef"
integrity sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==

buffer-more-ints@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz#ef4f8e2dddbad429ed3828a9c55d44f05c611422"
integrity sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==

bytes@3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/bytes/-/bytes-3.1.0.tgz#f6cf7933a360e0588fa9fde85651cdc7f805d1f6"
Expand Down Expand Up @@ -2107,7 +2133,7 @@ dateformat@^3.0.3:
resolved "https://registry.yarnpkg.com/dateformat/-/dateformat-3.0.3.tgz#a6e37499a4d9a9cf85ef5872044d62901c9889ae"
integrity sha512-jyCETtSl3VMZMWeRo7iY1FL19ges1t55hMo5yaam4Jrsm5EPL89UQkoQRyiI+Yf4k8r2ZpdngkV8hr1lIdjb3Q==

debug@^2.2.0, debug@^2.3.3, debug@^2.6.8, debug@^2.6.9:
debug@^2.2.0, debug@^2.3.3, debug@^2.6.8, debug@^2.6.9, debug@~2.6.9:
version "2.6.9"
resolved "https://registry.yarnpkg.com/debug/-/debug-2.6.9.tgz#5d128515df134ff327e90a4c93f4e077a536341f"
integrity sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==
Expand Down Expand Up @@ -3423,7 +3449,7 @@ inflight@^1.0.4:
once "^1.3.0"
wrappy "1"

inherits@2, inherits@2.0.4, inherits@^2.0.3, inherits@~2.0.3:
inherits@2, inherits@2.0.4, inherits@^2.0.3, inherits@~2.0.1, inherits@~2.0.3:
version "2.0.4"
resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c"
integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==
Expand Down Expand Up @@ -5760,6 +5786,11 @@ qs@~6.5.2:
resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.2.tgz#cb3ae806e8740444584ef154ce8ee98d403f3e36"
integrity sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==

querystringify@^2.1.1:
version "2.1.1"
resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-2.1.1.tgz#60e5a5fd64a7f8bfa4d2ab2ed6fdf4c85bad154e"
integrity sha512-w7fLxIRCRT7U8Qu53jQnJyPkYZIaR4n5151KMfcJlO/A9397Wxb1amJvROTK6TOnp7PfoAmg/qXiNHI+08jRfA==

quick-format-unescaped@^1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/quick-format-unescaped/-/quick-format-unescaped-1.1.2.tgz#0ca581de3174becef25ac3c2e8956342381db698"
Expand Down Expand Up @@ -5854,6 +5885,16 @@ read-pkg@^5.1.1:
parse-json "^5.0.0"
type-fest "^0.6.0"

"readable-stream@1.x >=1.1.9":
version "1.1.14"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9"
integrity sha1-fPTFTvZI44EwhMY23SB54WbAgdk=
dependencies:
core-util-is "~1.0.0"
inherits "~2.0.1"
isarray "0.0.1"
string_decoder "~0.10.x"

readable-stream@^2.0.2, readable-stream@^2.0.6, readable-stream@~2.3.6:
version "2.3.6"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.3.6.tgz#b11c27d88b8ff1fbe070643cf94b0c79ae1b0aaf"
Expand Down Expand Up @@ -6058,6 +6099,11 @@ require-main-filename@^2.0.0:
resolved "https://registry.yarnpkg.com/require-main-filename/-/require-main-filename-2.0.0.tgz#d0b329ecc7cc0f61649f62215be69af54aa8989b"
integrity sha512-NKN5kMDylKuldxYLSUfrbo5Tuzh4hd+2E8NPPX02mZtn1VuREQToYe/ZdlJy+J3uCpfaiGF05e7B8W0iXbQHmg==

requires-port@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff"
integrity sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=

requizzle@^0.2.3:
version "0.2.3"
resolved "https://registry.yarnpkg.com/requizzle/-/requizzle-0.2.3.tgz#4675c90aacafb2c036bd39ba2daa4a1cb777fded"
Expand Down Expand Up @@ -6184,7 +6230,7 @@ rxjs@^6.3.3, rxjs@^6.4.0:
dependencies:
tslib "^1.9.0"

safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1, safe-buffer@~5.1.2:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==
Expand Down Expand Up @@ -6559,6 +6605,11 @@ string_decoder@^1.1.1:
dependencies:
safe-buffer "~5.2.0"

string_decoder@~0.10.x:
version "0.10.31"
resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-0.10.31.tgz#62e203bc41766c6c28c9fc84301dab1c5310fa94"
integrity sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=

string_decoder@~1.1.1:
version "1.1.1"
resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.1.1.tgz#9cf1611ba62685d7030ae9e4ba34149c3af03fc8"
Expand Down Expand Up @@ -6992,6 +7043,14 @@ url-parse-lax@^1.0.0:
dependencies:
prepend-http "^1.0.1"

url-parse@~1.4.3:
version "1.4.7"
resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.4.7.tgz#a8a83535e8c00a316e403a5db4ac1b9b853ae278"
integrity sha512-d3uaVyzDB9tQoSXFvuSUNFibTd9zxd2bkVrDRvF5TmvWWQwqE4lgYJ5m+x1DbecWkw+LK4RNl2CU1hHuOKPVlg==
dependencies:
querystringify "^2.1.1"
requires-port "^1.0.0"

use@^3.1.0:
version "3.1.1"
resolved "https://registry.yarnpkg.com/use/-/use-3.1.1.tgz#d50c8cac79a19fbc20f2911f56eb973f4e10070f"
Expand Down

0 comments on commit 9d1610f

Please sign in to comment.