diff --git a/package.json b/package.json index 53c851f1c..108dba4f9 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "test": "jest --coverage --no-cache", "test:unit": "jest --testMatch \"**/unit/**spec.js\" --coverage --no-cache", "test:int": "jest --testMatch \"**/integration/**spec.js\" --coverage --no-cache", - "test:amqp": "jest --testMatch \"**/integration/amqp/**spc.js\"" + "test:amqp": "jest --testMatch \"**/integration/amqp/**spc.js\" --runInBand" }, "keywords": [ "microservices", diff --git a/src/transporters/amqp.js b/src/transporters/amqp.js index 9177efdde..088edc028 100644 --- a/src/transporters/amqp.js +++ b/src/transporters/amqp.js @@ -182,7 +182,7 @@ class AmqpTransporter extends Transporter { .then(() => this.channel.close()) .then(() => this.connection.close()) .then(() => { - this.bindings = null; + this.bindings = []; this.channel = null; this.connection = null; }) diff --git a/test/integration/amqp.old/events/events.spc.js b/test/integration/amqp.old/events/events.spc.js deleted file mode 100644 index 2bc00d712..000000000 --- a/test/integration/amqp.old/events/events.spc.js +++ /dev/null @@ -1,57 +0,0 @@ -/* eslint-disable no-console */ - -const path = require("path"); -const Promise = require("bluebird"); -let { exec, debugExec, callIn } = require("../util"); - -const commonString = " received the event."; - -const findResponse = (arr) => arr - .map(string => { - const line = string - .split("\n") - .find(a => a.indexOf(commonString) > 0); - return line ? line.slice(0, line.indexOf(commonString)) : false; - }) - .filter(a => a); - -describe("Test AMQPTransporter events", () => { - //exec = debugExec; - beforeEach(() => exec("node", [path.resolve(__dirname, "..", "purge.js")])); - afterAll(() => exec("node", [path.resolve(__dirname, "..", "purge.js")])); - - it("Should send an event to all subscribed nodes.", () => { - return Promise.all([ - exec("node", [path.resolve(__dirname,"pub.js")]), - exec("node", [path.resolve(__dirname,"sub1.js")]), - exec("node", [path.resolve(__dirname,"sub2.js")]), - exec("node", [path.resolve(__dirname,"sub3.js")]), - ]) - .then((stdout) => { - const expectedReceivers = [ - "Publisher", - "Subscriber1", - "Subscriber2", - "Subscriber3", - ].sort(); - expect(findResponse(stdout).sort()).toEqual(expectedReceivers); - }); - }, 15000); - - it( "Subscribed nodes should not receive events older than 5 seconds.", () => { - return Promise.all([ - callIn(() => exec("node", [path.resolve(__dirname,"pub.js")]), 200), - exec("node", [path.resolve(__dirname,"sub1.js")]), - exec("node", [path.resolve(__dirname,"sub2.js")]), - callIn(() => exec("node", [path.resolve(__dirname,"sub3.js")]), 6000), - ]) - .then((stdout) => { - const expectedReceivers = [ - "Publisher", - "Subscriber1", - "Subscriber2", - ].sort(); - expect(findResponse(stdout).sort()).toEqual(expectedReceivers); - }); - }, 20000); -}); diff --git a/test/integration/amqp.old/events/pub.js b/test/integration/amqp.old/events/pub.js deleted file mode 100644 index ba0b09c3f..000000000 --- a/test/integration/amqp.old/events/pub.js +++ /dev/null @@ -1,23 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); - -const broker = new ServiceBroker({ - nodeID: "event-pub-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "pub", - events: { - "hello.world"() { - console.log("Publisher received the event."); - } - } -}); - -setTimeout(() => process.exit(1), 10000); - -broker.start(); -setTimeout(() => broker.broadcast("hello.world", { testing: true }), 1000); diff --git a/test/integration/amqp.old/events/sub1.js b/test/integration/amqp.old/events/sub1.js deleted file mode 100644 index 36700f3e5..000000000 --- a/test/integration/amqp.old/events/sub1.js +++ /dev/null @@ -1,22 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); - -const broker = new ServiceBroker({ - nodeID: "event-sub1-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "aService", - events: { - "hello.world": function() { - console.log("Subscriber1 received the event."); - }, - } -}); - -setTimeout(() => process.exit(1), 10000); - -broker.start(); diff --git a/test/integration/amqp.old/events/sub2.js b/test/integration/amqp.old/events/sub2.js deleted file mode 100644 index 76c2da84a..000000000 --- a/test/integration/amqp.old/events/sub2.js +++ /dev/null @@ -1,22 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); - -const broker = new ServiceBroker({ - nodeID: "event-sub2-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "aService", - events: { - "hello.world": function() { - console.log("Subscriber2 received the event."); - }, - } -}); - -setTimeout(() => process.exit(1), 10000); - -broker.start(); diff --git a/test/integration/amqp.old/events/sub3.js b/test/integration/amqp.old/events/sub3.js deleted file mode 100644 index e11c7d160..000000000 --- a/test/integration/amqp.old/events/sub3.js +++ /dev/null @@ -1,22 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); - -const broker = new ServiceBroker({ - nodeID: "event-sub3-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "bService", - events: { - "hello.world": function() { - console.log("Subscriber3 received the event."); - }, - } -}); - -setTimeout(() => process.exit(1), 10000); - -broker.start(); diff --git a/test/integration/amqp.old/purge.js b/test/integration/amqp.old/purge.js deleted file mode 100644 index 14b1d584e..000000000 --- a/test/integration/amqp.old/purge.js +++ /dev/null @@ -1,122 +0,0 @@ -/* eslint-disable no-console */ - -const queueNames = [ - "MOL.DISCONNECT.event-pub-nodeID", - "MOL.DISCONNECT.event-sub1-nodeID", - "MOL.DISCONNECT.event-sub2-nodeID", - "MOL.DISCONNECT.event-sub3-nodeID", - "MOL.DISCONNECT.five-request-nodeID", - "MOL.DISCONNECT.single-request-nodeID", - "MOL.DISCONNECT.slow-nodeID", - "MOL.DISCONNECT.timestamped-nodeID", - "MOL.DISCONNECT.too-slow-nodeID", - "MOL.DISCONNECT.worker1-nodeID", - "MOL.DISCONNECT.worker2-nodeID", - "MOL.DISCOVER.event-pub-nodeID", - "MOL.DISCOVER.event-sub1-nodeID", - "MOL.DISCOVER.event-sub2-nodeID", - "MOL.DISCOVER.event-sub3-nodeID", - "MOL.DISCOVER.five-request-nodeID", - "MOL.DISCOVER.single-request-nodeID", - "MOL.DISCOVER.slow-nodeID", - "MOL.DISCOVER.timestamped-nodeID", - "MOL.DISCOVER.too-slow-nodeID", - "MOL.DISCOVER.worker1-nodeID", - "MOL.DISCOVER.worker2-nodeID", - "MOL.EVENT.event-pub-nodeID", - "MOL.EVENT.event-sub1-nodeID", - "MOL.EVENT.event-sub2-nodeID", - "MOL.EVENT.event-sub3-nodeID", - "MOL.EVENT.five-request-nodeID", - "MOL.EVENT.single-request-nodeID", - "MOL.EVENT.slow-nodeID", - "MOL.EVENT.timestamped-nodeID", - "MOL.EVENT.too-slow-nodeID", - "MOL.EVENT.worker1-nodeID", - "MOL.EVENT.worker2-nodeID", - "MOL.HEARTBEAT.event-pub-nodeID", - "MOL.HEARTBEAT.event-sub1-nodeID", - "MOL.HEARTBEAT.event-sub2-nodeID", - "MOL.HEARTBEAT.event-sub3-nodeID", - "MOL.HEARTBEAT.five-request-nodeID", - "MOL.HEARTBEAT.single-request-nodeID", - "MOL.HEARTBEAT.slow-nodeID", - "MOL.HEARTBEAT.timestamped-nodeID", - "MOL.HEARTBEAT.too-slow-nodeID", - "MOL.HEARTBEAT.worker1-nodeID", - "MOL.HEARTBEAT.worker2-nodeID", - "MOL.INFO.event-pub-nodeID", - "MOL.INFO.event-sub1-nodeID", - "MOL.INFO.event-sub2-nodeID", - "MOL.INFO.event-sub3-nodeID", - "MOL.INFO.five-request-nodeID", - "MOL.INFO.single-request-nodeID", - "MOL.INFO.slow-nodeID", - "MOL.INFO.timestamped-nodeID", - "MOL.INFO.too-slow-nodeID", - "MOL.INFO.worker1-nodeID", - "MOL.INFO.worker2-nodeID", - "MOL.REQ.testing.hello", - "MOL.RES.event-pub-nodeID", - "MOL.RES.event-sub1-nodeID", - "MOL.RES.event-sub2-nodeID", - "MOL.RES.event-sub3-nodeID", - "MOL.RES.five-request-nodeID", - "MOL.RES.single-request-nodeID", - "MOL.RES.slow-nodeID", - "MOL.RES.timestamped-nodeID", - "MOL.RES.too-slow-nodeID", - "MOL.RES.worker1-nodeID", - "MOL.RES.worker2-nodeID", -]; -const exchanges = [ - "MOL.DISCONNECT", - "MOL.DISCOVER", - "MOL.EVENT", - "MOL.HEARTBEAT", - "MOL.INFO", -]; - -const amqp = require("amqplib"); -let connectionRef; - -amqp.connect(process.env.AMQP_URI || "amqp://guest:guest@localhost:5672") - .then(connection => { - console.info("AMQP connected!"); - connectionRef = connection; - return connection - .on("error", (err) => { - console.error("AMQP connection error!", err); - process.exit(1); - }) - .on("close", (err) => { - const crashWorthy = require("amqplib/lib/connection").isFatalError(err); - console.error("AMQP connection closed!", crashWorthy && err || ""); - process.exit(1); - }) - .createChannel(); - }) - .then((channel) => { - console.info("AMQP channel created!"); - channel - .on("close", () => { - console.warn("AMQP channel closed!"); - process.exit(1); - }) - .on("error", (error) => { - console.error("AMQP channel error!", error); - process.exit(1); - }); - - return Promise.all(queueNames.map(q => channel.deleteQueue(q))) - .then(() => Promise.all(exchanges.map(e => channel.deleteExchange(e)))); - }) - .then(() => { - console.log("Done."); - return connectionRef.close(); - }) - .then(() => process.exit(0)) - .catch((err) => { - console.error("AMQP failed to create channel!", err); - process.exit(1); - }); diff --git a/test/integration/amqp.old/rpc/fiveRequests.js b/test/integration/amqp.old/rpc/fiveRequests.js deleted file mode 100644 index 22ffc0601..000000000 --- a/test/integration/amqp.old/rpc/fiveRequests.js +++ /dev/null @@ -1,38 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); -const broker = new ServiceBroker({ - nodeID: "five-request-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.start(); - -setTimeout(() => process.exit(1), 10000); - -setTimeout(() => { - broker.call("testing.hello", { cmd: "request" + (process.argv[2] || Date.now()) }) - .then(console.log); -}, 1000); - -setTimeout(() => { - broker.call("testing.hello", { cmd: "request" + (process.argv[3] || Date.now()) }) - .then(console.log); -}, 2000); - -setTimeout(() => { - broker.call("testing.hello", { cmd: "request" + (process.argv[4] || Date.now()) }) - .then(console.log); -}, 3000); - -setTimeout(() => { - broker.call("testing.hello", { cmd: "request" + (process.argv[5] || Date.now()) }) - .then(console.log); -}, 4000); - -setTimeout(() => { - broker.call("testing.hello", { cmd: "request" + (process.argv[6] || Date.now()) }) - .then(console.log); -}, 5000); - diff --git a/test/integration/amqp.old/rpc/rpc.spc.js b/test/integration/amqp.old/rpc/rpc.spc.js deleted file mode 100644 index e82e4216b..000000000 --- a/test/integration/amqp.old/rpc/rpc.spc.js +++ /dev/null @@ -1,119 +0,0 @@ -/* eslint-disable no-console */ - -const path = require("path"); -const Promise = require("bluebird"); -const { exec, callIn } = require("../util"); - -const respondTo = "responding to"; -const receive = "received"; -const respond = "responded"; - -const findLines = (str, target) => str.split("\n").filter(a => a.includes(target)); - - -describe("Test AMQPTransporter RPC", () => { - - beforeEach(() => exec("node", [path.resolve(__dirname, "..", "purge.js")])); - afterAll(() => exec("node", [path.resolve(__dirname, "..", "purge.js")])); - - it("Only one node should receive any given request.", () => { - const messageID = Date.now(); - return Promise.all([ - exec("node", [path.resolve(__dirname, "worker1.js")]), - callIn(() => exec("node", [path.resolve(__dirname, "worker2.js")]), 1000), - callIn(() => exec("node", [path.resolve(__dirname, "singleRequest.js"), messageID]), 1000), - ]) - .then((stdout) => { - const responses = stdout - .reduce((acc, string) => { - const lines = findLines(string, respondTo); - if (lines.length) { - return acc.concat(lines); - } - return acc; - }, []); - - expect(responses).toHaveLength(1); - const oneOf = [ - `worker1 responding to request${messageID}`, - `worker2 responding to request${messageID}` - ]; - expect(oneOf).toContain(responses[0]); - }); - }, 15000); - - it("Nodes should only receive one request at a time by default.", () => { - const messageID = Date.now(); - return Promise.all([ - exec("node", [path.resolve(__dirname, "timeStampedWorker.js")]), - exec("node", [path.resolve(__dirname, "fiveRequests.js"), messageID]), - ]) - .then((stdout) => { - const responses = stdout[0] - .split("\n") - .filter(a => a.indexOf(receive) !== -1 || a.indexOf(respond) !== -1); - - expect(responses).toHaveLength(10); - - const getType = a => a.split(" ")[0]; - const getTime = a => Number(a.split(" ")[1]); - - for (let i = 0; i < 10; i++) { - expect(getType(responses[i])).toBe(i % 2 === 0 ? receive : respond); - } - - for (let i = 0; i < 9; i++) { - expect(getTime(responses[i])).toBeLessThan(getTime(responses[i + 1])); - } - }); - }, 15000); - - it("Should use availability-based load balancing", () => { - const getResponses = str => str.split("\n").filter(a => a.includes(respondTo)); - return Promise.all([ - exec("node", [path.resolve(__dirname, "slowWorker.js")]), - exec("node", [path.resolve(__dirname, "worker1.js")]), - exec("node", [path.resolve(__dirname, "fiveRequests.js")]), - ]) - .then((stdout) => { - const slowResponses = getResponses(stdout[0]); - const fastResponses = getResponses(stdout[1]); - expect(slowResponses).toHaveLength(1); - expect(fastResponses).toHaveLength(4); - }); - }, 15000); - - it("Multiple ndoes should be able to call a single action.", () => { - return Promise.all([ - exec("node", [path.resolve(__dirname, "fiveRequests.js")]), - exec("node", [path.resolve(__dirname, "singleRequest.js")]), - exec("node", [path.resolve(__dirname, "worker1.js")]), - ]) - .then((stdout) => { - const responses = findLines(stdout[2], respondTo); - expect(responses).toHaveLength(6); - }); - }, 15000); - - it("Should not have a request or response dequeued until it has been received", () => { - return Promise.all([ - exec("node", [path.resolve(__dirname, "singleRequest.js")]), - exec("node", [path.resolve(__dirname, "tooSlowWorker.js")]), - ]) - .then((stdout) => { - const responses = findLines(stdout[1], "responded"); - expect(responses).toHaveLength(0); - return exec("node", [path.resolve(__dirname, "timeStampedWorker.js")]); - }) - .then(stdout => { - const responses = findLines(stdout, "responded"); - expect(responses).toHaveLength(1); - return exec("node", [path.resolve(__dirname, "timeStampedWorker.js")]); - }) - .then(stdout => { - const responses = findLines(stdout, "responded"); - expect(responses).toHaveLength(0); - }); - }, 60000); - -}); diff --git a/test/integration/amqp.old/rpc/singleRequest.js b/test/integration/amqp.old/rpc/singleRequest.js deleted file mode 100644 index d15c39551..000000000 --- a/test/integration/amqp.old/rpc/singleRequest.js +++ /dev/null @@ -1,19 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); -const broker = new ServiceBroker({ - nodeID: "single-request-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.start(); - -setTimeout(() => process.exit(1), 10000); - -const messageID = process.argv[2] || Date.now(); - -setTimeout(() => { - broker.call("testing.hello", { cmd: "request" + messageID }) - .then(console.log); -}, 1000); diff --git a/test/integration/amqp.old/rpc/slowWorker.js b/test/integration/amqp.old/rpc/slowWorker.js deleted file mode 100644 index e2e2dd998..000000000 --- a/test/integration/amqp.old/rpc/slowWorker.js +++ /dev/null @@ -1,31 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); -const broker = new ServiceBroker({ - nodeID: "slow-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "testing", - actions: { - hello: { - params: { - cmd: { type: "string" } - }, - handler(ctx) { - console.log("slowWorker responding to", ctx.params.cmd); - return new Promise((resolve) => { - setTimeout(() => { - resolve({ msg: ctx.params.cmd, from: "slowWorker" }); - }, 10000); - }); - }, - }, - }, -}); - -broker.start(); - -setTimeout(() => process.exit(1), 10000); diff --git a/test/integration/amqp.old/rpc/timeStampedWorker.js b/test/integration/amqp.old/rpc/timeStampedWorker.js deleted file mode 100644 index 89995cbd5..000000000 --- a/test/integration/amqp.old/rpc/timeStampedWorker.js +++ /dev/null @@ -1,33 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); -const broker = new ServiceBroker({ - nodeID: "timestamped-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "testing", - actions: { - hello: { - params: { - cmd: { type: "string" } - }, - handler(ctx) { - console.log("timestampedWorker responding to", ctx.params.cmd); - console.log("received", Date.now()); - return new Promise((resolve) => { - setTimeout(() => { - console.log("responded", Date.now()); - resolve({ msg: ctx.params.cmd, from: "timestampedWorker" }); - }, 1000); - }); - }, - }, - }, -}); - -broker.start(); - -setTimeout(() => process.exit(1), 10000); diff --git a/test/integration/amqp.old/rpc/tooSlowWorker.js b/test/integration/amqp.old/rpc/tooSlowWorker.js deleted file mode 100644 index 037649903..000000000 --- a/test/integration/amqp.old/rpc/tooSlowWorker.js +++ /dev/null @@ -1,31 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); -const broker = new ServiceBroker({ - nodeID: "too-slow-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "testing", - actions: { - hello: { - params: { - cmd: { type: "string" } - }, - handler(ctx) { - return new Promise((resolve) => { - setTimeout(() => { - resolve({ msg: ctx.params.cmd, from: "too-slow" }); - console.log("responded", ctx.params.cmd); - }, 9000); - }); - }, - }, - }, -}); - -broker.start(); - -setTimeout(() => process.exit(1), 8000); diff --git a/test/integration/amqp.old/rpc/worker1.js b/test/integration/amqp.old/rpc/worker1.js deleted file mode 100644 index 8ccbf69ae..000000000 --- a/test/integration/amqp.old/rpc/worker1.js +++ /dev/null @@ -1,27 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); -const broker = new ServiceBroker({ - nodeID: "worker1-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "testing", - actions: { - hello: { - params: { - cmd: { type: "string" } - }, - handler(ctx) { - console.log("worker1 responding to", ctx.params.cmd); - return Promise.resolve({ msg: ctx.params.cmd, from: "worker1" }); - }, - }, - }, -}); - -broker.start(); - -setTimeout(() => process.exit(130), 10000); diff --git a/test/integration/amqp.old/rpc/worker2.js b/test/integration/amqp.old/rpc/worker2.js deleted file mode 100644 index f5917ce43..000000000 --- a/test/integration/amqp.old/rpc/worker2.js +++ /dev/null @@ -1,28 +0,0 @@ -/* eslint-disable no-console */ - -const { ServiceBroker } = require("../../../.."); - -const broker = new ServiceBroker({ - nodeID: "worker2-nodeID", - logger: console, - transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", -}); - -broker.createService({ - name: "testing", - actions: { - hello: { - params: { - cmd: { type: "string" } - }, - handler(ctx) { - console.log("worker2 responding to", ctx.params.cmd); - return Promise.resolve({ msg: ctx.params.cmd, from: "worker2" }); - }, - }, - }, -}); - -broker.start(); - -setTimeout(() => process.exit(1), 10000); diff --git a/test/integration/amqp.old/util.js b/test/integration/amqp.old/util.js deleted file mode 100644 index 04cfe9347..000000000 --- a/test/integration/amqp.old/util.js +++ /dev/null @@ -1,40 +0,0 @@ -/* eslint-disable security/detect-child-process */ - -/* istanbul ignore next*/ -const debugExec = (cmd, args=[]) => - new Promise(function (resolve, reject) { - require("child_process") - .spawn(cmd, args, { stdio: "inherit", shell: true }) - .on("exit", resolve) - .on("error", reject); - }); - -/* istanbul ignore next*/ -const exec = (cmd, args=[]) => - new Promise(function (resolve, reject) { - let data = ""; - const process = require("child_process") - .spawn(cmd, args, { stdio: "pipe", shell: true }); - - process - .stdout.on("data", function (chunk) { - data += chunk; - }); - process - .on("exit", () => resolve(data)) - .on("error", () => reject(data)); - }); - -/* istanbul ignore next*/ -const callIn = (cb, timeout) => - new Promise(res => { - setTimeout(() => { - cb().then(res); - }, timeout); - }); - -module.exports = { - exec, - debugExec, - callIn, -}; diff --git a/test/integration/amqp/broadcast.spc.js b/test/integration/amqp/broadcast.spc.js index 06d99385a..11d3ab8ee 100644 --- a/test/integration/amqp/broadcast.spc.js +++ b/test/integration/amqp/broadcast.spc.js @@ -35,8 +35,12 @@ function createNode(name, disableBalancer = false) { describe("Test AMQPTransporter", () => { - beforeAll(() => purge(purgeList)); - afterAll(() => purge(purgeList)); + // Delete all queues and exchanges before and after suite + beforeAll(() => purge(purgeList, true)); + afterAll(() => purge(purgeList, true)); + + // Clear all queues between each test. + afterEach(() => purge(purgeList)); describe("Test AMQPTransporter event broadcast with built-in balancer", () => { @@ -147,7 +151,6 @@ describe("Test AMQPTransporter", () => { pub.broadcastLocal("hello.world", { testing: true }); return Promise.delay(2000).catch(protectReject).then(() => { - // console.log(FLOW); expect(FLOW).toEqual([ "pub", "pub", @@ -161,10 +164,7 @@ describe("Test AMQPTransporter", () => { const purgeList = { queues: [ - "MOL-test-broadcast.EVENTB.pub.hello.world", - "MOL-test-broadcast.EVENTB.sub1.hello.world", - "MOL-test-broadcast.EVENTB.sub2.hello.world", - "MOL-test-broadcast.EVENTB.sub3.hello.world", + // Includes auto-delete queue's in case default settings change "MOL-test-broadcast.REQ.event-pub", "MOL-test-broadcast.REQ.event-sub1", "MOL-test-broadcast.REQ.event-sub2", @@ -178,6 +178,38 @@ const purgeList = { "MOL-test-broadcast.RES.event-sub1", "MOL-test-broadcast.RES.event-sub2", "MOL-test-broadcast.RES.event-sub3", + "MOL-test-broadcast.EVENTB.pub.hello.world", + "MOL-test-broadcast.EVENTB.sub1.hello.world", + "MOL-test-broadcast.EVENTB.sub2.hello.world", + "MOL-test-broadcast.EVENTB.sub3.hello.world", + "MOL-test-broadcast.DISCONNECT.event-pub", + "MOL-test-broadcast.DISCONNECT.event-sub1", + "MOL-test-broadcast.DISCONNECT.event-sub2", + "MOL-test-broadcast.DISCONNECT.event-sub3", + "MOL-test-broadcast.DISCOVER.event-pub", + "MOL-test-broadcast.DISCOVER.event-sub1", + "MOL-test-broadcast.DISCOVER.event-sub2", + "MOL-test-broadcast.DISCOVER.event-sub3", + "MOL-test-broadcast.EVENT.event-pub", + "MOL-test-broadcast.EVENT.event-sub1", + "MOL-test-broadcast.EVENT.event-sub2", + "MOL-test-broadcast.EVENT.event-sub3", + "MOL-test-broadcast.HEARTBEAT.event-pub", + "MOL-test-broadcast.HEARTBEAT.event-sub1", + "MOL-test-broadcast.HEARTBEAT.event-sub2", + "MOL-test-broadcast.HEARTBEAT.event-sub3", + "MOL-test-broadcast.INFO.event-pub", + "MOL-test-broadcast.INFO.event-sub1", + "MOL-test-broadcast.INFO.event-sub2", + "MOL-test-broadcast.INFO.event-sub3", + "MOL-test-broadcast.PING.event-pub", + "MOL-test-broadcast.PING.event-sub1", + "MOL-test-broadcast.PING.event-sub2", + "MOL-test-broadcast.PING.event-sub3", + "MOL-test-broadcast.PONG.event-pub", + "MOL-test-broadcast.PONG.event-sub1", + "MOL-test-broadcast.PONG.event-sub2", + "MOL-test-broadcast.PONG.event-sub3", ], exchanges: [ diff --git a/test/integration/amqp/emit.spc.js b/test/integration/amqp/emit.spc.js index 25f28373c..b05bdd665 100644 --- a/test/integration/amqp/emit.spc.js +++ b/test/integration/amqp/emit.spc.js @@ -38,8 +38,12 @@ function createNode(name, serviceName = "emit-handler", disableBalancer = false) describe("Test AMQPTransporter", () => { - beforeAll(() => purge(purgeList)); - afterAll(() => purge(purgeList)); + // Delete all queues and exchanges before and after suite + beforeAll(() => purge(purgeList, true)); + afterAll(() => purge(purgeList, true)); + + // Clear all queues between each test. + afterEach(() => purge(purgeList)); describe("Test AMQPTransporter event emit with built-in balancer", () => { @@ -48,7 +52,8 @@ describe("Test AMQPTransporter", () => { const sub2 = createNode("sub2"); const sub3 = createNode("sub3", "other-handler"); - beforeAll(() => { + // Reset Flow array and start services + beforeEach(() => { FLOW = []; return Promise.all([ pub.start(), @@ -58,19 +63,19 @@ describe("Test AMQPTransporter", () => { ]).delay(200); }); - afterAll(() => Promise.all([ + // Stop services and clear queues + afterEach(() => Promise.all([ pub.stop(), sub1.stop(), sub2.stop(), sub3.stop(), ])); - it("should send emit event only one service", () => { + it("should send emit event to only one service", () => { for(let i = 0; i < 6; i++) pub.emit("hello.world2", { testing: true }); return Promise.delay(2000).catch(protectReject).then(() => { - //console.log(FLOW); expect(FLOW).toHaveLength(12); expect(FLOW).toEqual(expect.arrayContaining([ "pub", @@ -91,7 +96,8 @@ describe("Test AMQPTransporter", () => { const sub2 = createNode("sub2", "emit-handler", true); const sub3 = createNode("sub3", "other-handler", true); - beforeAll(() => { + // Reset Flow array and start services + beforeEach(() => { FLOW = []; return Promise.all([ pub.start(), @@ -101,7 +107,8 @@ describe("Test AMQPTransporter", () => { ]).delay(200); }); - afterAll(() => Promise.all([ + // Stop services and clear queues + afterEach(() => Promise.all([ pub.stop(), sub1.stop(), sub2.stop(), @@ -113,7 +120,6 @@ describe("Test AMQPTransporter", () => { pub.emit("hello.world2", { testing: true }); return Promise.delay(2000).catch(protectReject).then(() => { - //console.log(FLOW); expect(FLOW).toHaveLength(12); expect(FLOW).toEqual(expect.arrayContaining([ "pub", @@ -129,8 +135,37 @@ describe("Test AMQPTransporter", () => { const purgeList = { queues: [ + // Includes auto-delete queue's in case default settings change + "MOL-test-emit.DISCONNECT.event-pub", + "MOL-test-emit.DISCONNECT.event-sub1", + "MOL-test-emit.DISCONNECT.event-sub2", + "MOL-test-emit.DISCONNECT.event-sub3", + "MOL-test-emit.DISCOVER.event-pub", + "MOL-test-emit.DISCOVER.event-sub1", + "MOL-test-emit.DISCOVER.event-sub2", + "MOL-test-emit.DISCOVER.event-sub3", + "MOL-test-emit.EVENT.event-pub", + "MOL-test-emit.EVENT.event-sub1", + "MOL-test-emit.EVENT.event-sub2", + "MOL-test-emit.EVENT.event-sub3", "MOL-test-emit.EVENTB.emit-handler.hello.world2", "MOL-test-emit.EVENTB.other-handler.hello.world2", + "MOL-test-emit.HEARTBEAT.event-pub", + "MOL-test-emit.HEARTBEAT.event-sub1", + "MOL-test-emit.HEARTBEAT.event-sub2", + "MOL-test-emit.HEARTBEAT.event-sub3", + "MOL-test-emit.INFO.event-pub", + "MOL-test-emit.INFO.event-sub1", + "MOL-test-emit.INFO.event-sub2", + "MOL-test-emit.INFO.event-sub3", + "MOL-test-emit.PING.event-pub", + "MOL-test-emit.PING.event-sub1", + "MOL-test-emit.PING.event-sub2", + "MOL-test-emit.PING.event-sub3", + "MOL-test-emit.PONG.event-pub", + "MOL-test-emit.PONG.event-sub1", + "MOL-test-emit.PONG.event-sub2", + "MOL-test-emit.PONG.event-sub3", "MOL-test-emit.REQ.event-pub", "MOL-test-emit.REQ.event-sub1", "MOL-test-emit.REQ.event-sub2", @@ -145,7 +180,6 @@ const purgeList = { "MOL-test-emit.RES.event-sub2", "MOL-test-emit.RES.event-sub3", ], - exchanges: [ "MOL-test-emit.DISCONNECT", "MOL-test-emit.DISCOVER", diff --git a/test/integration/amqp/purge.js b/test/integration/amqp/purge.js index 4195a6040..bb738b87b 100644 --- a/test/integration/amqp/purge.js +++ b/test/integration/amqp/purge.js @@ -3,43 +3,45 @@ "use strict"; const amqp = require("amqplib"); +const Promise = require("bluebird"); -module.exports = function({ queues, exchanges }) { - +// Trying to delete a non-existant queue or exchange would cause a fatal error. This would mean we +// couldn't continue clearing queues afterwards. Using a new connection for each operation allows +// us to reliably clear out AMQP queues and exchanges. +const useNewChannel = (cb) => { let connectionRef; return amqp.connect(process.env.AMQP_URI || "amqp://guest:guest@localhost:5672") .then(connection => { - //console.info("AMQP connected!"); connectionRef = connection; return connection .on("error", (err) => { - console.error("AMQP connection error!", err); - }) - .on("close", (err) => { - //const crashWorthy = require("amqplib/lib/connection").isFatalError(err); - //console.error("AMQP connection closed!", crashWorthy && err || ""); + // console.log(err); + connectionRef.close(); }) .createChannel(); }) .then((channel) => { - //console.info("AMQP channel created!"); channel - .on("close", () => { - //console.warn("AMQP channel closed!"); - }) - .on("error", (error) => { - console.error("AMQP channel error!", error); + .on("error", (err) => { + // console.log(err); + connectionRef.close(); }); - - return Promise.all(queues.map(q => channel.deleteQueue(q))) - .then(() => Promise.all(exchanges.map(e => channel.deleteExchange(e)))); + return cb(channel); }) - .then(() => { - //console.log("Done."); - return connectionRef.close(); - }) - .catch((err) => { - console.error("AMQP failed to create channel!", err); - }); + .catch(() => {}) + .then(() => connectionRef.close()) + .catch(() => {}); +}; + +const clearQueue = (q) => useNewChannel(channel => channel.purgeQueue(q)); +const deleteQueue = (q) => useNewChannel(channel => channel.deleteQueue(q)); +const deleteExchange = (e) => useNewChannel(channel => channel.deleteExchange(e)); + +module.exports = function({ queues, exchanges }, destroy = false) { + const donePromise = destroy + ? Promise.all(queues.map(deleteQueue).concat(exchanges.map(deleteExchange))) + : Promise.all(queues.map(clearQueue)); + + return donePromise.delay(2000); }; diff --git a/test/integration/amqp/rpc.spc.js b/test/integration/amqp/rpc.spc.js index f689c9eda..aed81fbca 100644 --- a/test/integration/amqp/rpc.spc.js +++ b/test/integration/amqp/rpc.spc.js @@ -11,10 +11,11 @@ function createNode(name, disableBalancer = false, service) { const broker = new ServiceBroker({ namespace: "test-rpc", nodeID: "rpc-" + name, - //logger: console, - //logLevel: "debug", transporter: process.env.AMQP_URI || "amqp://guest:guest@localhost:5672", - disableBalancer + disableBalancer, + registry: { + preferLocal: false + } }); if (service) @@ -23,242 +24,248 @@ function createNode(name, disableBalancer = false, service) { return broker; } -describe("Test AMQPTransporter", () => { - - beforeAll(() => purge(purgeList)); - afterAll(() => purge(purgeList)); - - describe("Test AMQPTransporter RPC with built-in balancer", () => { - - const client = createNode("client", false); - - const worker1 = createNode("worker1", false, { +// Build a super insecure worker. +// Calls to this kind of worker can change the response time and make nodes crash. +// Log entries are made for receiving a request, crashing, and responding to a request. +const createWorker = (number, disableBalancer, logs, options = {}) => { + // We make a nodeRef object so that the crashing node can stop itself. + const nodeRef = { + broker: createNode(`worker${number}`, disableBalancer, { name: "test", actions: { - hello() { - return Promise.resolve(`Hello from ${this.broker.nodeID}`); + hello({ params }) { + const { delay = 0, crash = false } = params; + + logs.push({ type: "receive", worker: number, timestamp: Date.now(), params }); + + if (options.canCrash && crash) { + logs.push({ type: "crash", worker: number, timestamp: Date.now(), params }); + return nodeRef.broker.stop().delay(1000); + } + + return Promise.delay(delay) + .then(() => { + const response = { type: "respond", worker: number, timestamp: Date.now(), params }; + logs.push(response); + return response; + }); } } - }); + }) + }; - const worker2 = createNode("worker2", false, { - name: "test", - actions: { - hello() { - return Promise.resolve(`Hello from ${this.broker.nodeID}`).delay(200); - } - } - }); + return nodeRef.broker; +}; - const worker3 = createNode("worker3", false, { - name: "test", - actions: { - hello() { - return Promise.resolve(`Hello from ${this.broker.nodeID}`); - } - } - }); +// Wrap all test cases in a function so that they can be easily ran with and without balancing. +const runTestCases = (logs, client, worker1, worker2, worker3) => { - beforeEach(() => { - return Promise.all([ - client.start(), - worker1.start(), - worker2.start(), - worker3.start(), - ]); - }); + // Simple function that we use everywhere + const callShortDelay = () => client.call("test.hello", { delay: 20 }); - afterEach(() => Promise.all([ - client.stop(), - worker1.stop(), + it("Only one node should receive any given request", () => { + // Ensure that messages are not broadcast to individual queues. + return callShortDelay() + .catch(protectReject).then(() => { + expect(logs).toHaveLength(2); + expect(logs.filter(a => a.type === "receive")).toHaveLength(1); + expect(logs.filter(a => a.type === "respond")).toHaveLength(1); + }); + }); + + it("Should load balance requests to available nodes.", () => { + // Ensure that messages are evenly distributed + return Promise.all(Array(12).fill().map(callShortDelay)) + .catch(protectReject).then(res => { + expect(res).toHaveLength(12); + + expect(res).toEqual(expect.arrayContaining([ + expect.objectContaining({ worker: 1 }), + expect.objectContaining({ worker: 2 }), + expect.objectContaining({ worker: 3 }), + ])); + + expect(res.map(n => n.worker).sort()).toEqual([ + 1,1,1,1, + 2,2,2,2, + 3,3,3,3 + ]); + }); + }); + + it("Nodes should only receive one request at a time by default", () => { + // Ensure that prefetch is working. This relies on message acking happening after the action + // handler runs. + return Promise.all([ worker2.stop(), worker3.stop(), - ])); - - it("Should send an event to all subscribed nodes.", () => { - return Promise.delay(2000).then(() => { - return Promise.all([ - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - ]).catch(protectReject).then(res => { - //console.log(res); - expect(res).toHaveLength(6); - expect(res).toEqual(expect.arrayContaining([ - "Hello from rpc-worker1", - "Hello from rpc-worker2", - "Hello from rpc-worker3", - ])); - expect(res.filter(n => n == "Hello from rpc-worker1")).toHaveLength(2); - expect(res.filter(n => n == "Hello from rpc-worker2")).toHaveLength(2); - expect(res.filter(n => n == "Hello from rpc-worker3")).toHaveLength(2); + ]) + .then(() => Promise.all(Array(3).fill().map(callShortDelay))) + .catch(protectReject).then(res => { + const getType = a => a.type; + const getTime = a => a.timestamp; + + logs.forEach((cur, idx) => { + // All requests should be handled by singe node + expect(cur.worker).toEqual(1); + + // Order should go from old -> new + if (logs[idx + 1] !== undefined) { + expect(getTime(cur)).toBeLessThanOrEqual(getTime(logs[idx + 1])); + } + + // If receive and respond don't alternate requests are concurrent + expect(getType(cur)).toBe(idx % 2 === 0 ? "receive" : "respond"); }); }); - }, 10000); }); - describe("Test AMQPTransporter RPC with DISABLED built-in balancer", () => { + it("Should use availability-based load balancing", () => { + // Should allow consumers to pull messages as they can handle them. + // This means that a single slow node won't slow down everything, or cause requests to be + // processed out of order + const callShortDelay = () => client.call("test.hello", { delay: 20 }); - const client = createNode("client", true); + return Promise.all([ + client.call("test.hello", { delay: 3000 }), + ...Array(8).fill().map(callShortDelay), + ]).catch(protectReject).then(res => { + const slowWorker = logs.find(a => a.params.delay === 3000).worker; + const otherWorker = logs.find(a => a.params.delay === 20).worker; - const worker1 = createNode("worker1", true, { - name: "test", - actions: { - hello() { - return Promise.resolve(`Hello from ${this.broker.nodeID}`); - } - } - }); + expect(res).toHaveLength(9); - const worker2 = createNode("worker2", true, { - name: "test", - actions: { - hello() { - return Promise.resolve(`Hello from ${this.broker.nodeID}`).delay(200); - } - } - }); + expect(res).toEqual(expect.arrayContaining([ + expect.objectContaining({ worker: 1 }), + expect.objectContaining({ worker: 2 }), + expect.objectContaining({ worker: 3 }), + ])); - const worker3 = createNode("worker3", true, { - name: "test", - actions: { - hello() { - return Promise.resolve(`Hello from ${this.broker.nodeID}`); - } - } - }); + // Slow worker should only have handled 1 request + expect(res.filter(a => a.worker === slowWorker)).toHaveLength(1); - beforeEach(() => { - return Promise.all([ - client.start(), - worker1.start(), - worker2.start(), - worker3.start(), - ]); + // The other 2 workers should have handled 4 + expect(res.filter(a => a.worker === otherWorker)).toHaveLength(4); }); + }); + + it("Messages that haven't finished processing should be retryable by other nodes.", () => { + // This requires all requests to be made to a single queue. + // This test also requires messages to be acked after the action handler finishes. + // All broker's should consume from the same queue so that messages aren't abandoned in + // node-specific queues, or tried out of order. + const crashRequest = () => { + return client.call("test.hello", { delay: 20, crash: true }) + .catch(err => ({ message: err.message, type: "error"})); + }; + + return Promise.all(Array(9).fill().map(crashRequest)) + .catch(protectReject).then((res) => { + // The responses that failed initially won't show up in res, but the messages are still in + // AMQP. If the messages are not ack'ed until processed, then another node will be able to + // handle them instead. + expect(logs.filter(a => a.type === "respond")).toHaveLength(9); + + // Check that crashing node actually crashed instead of responding + expect(logs.filter(a => a.type === "crash").length).toBeGreaterThanOrEqual(2); - afterEach(() => Promise.all([ - client.stop(), - worker1.stop(), - worker2.stop(), - worker3.stop(), - ])); - - it("Should use availability-based load balancing", () => { - return Promise.delay(2000).then(() => { - return Promise.all([ - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - client.call("test.hello"), - ]).catch(protectReject).then(res => { - //console.log(res); - expect(res).toHaveLength(6); - expect(res).toEqual(expect.arrayContaining([ - "Hello from rpc-worker1", - "Hello from rpc-worker2", - "Hello from rpc-worker3", - ])); - - // worker2 is slow, so it received only 1 request. - expect(res.filter(n => n == "Hello from rpc-worker2")).toHaveLength(1); - }); }); - }, 10000); + }, 40000); +}; + +describe("Test AMQPTransporter", () => { + + // Delete all queues and exchanges before and after suite + beforeAll(() => purge(purgeList, true)); + afterAll(() => purge(purgeList, true)); + + // Clear all queues between each test. + afterEach(() => purge(purgeList)); + + describe("Test AMQPTransporter RPC with built-in balancer", () => { + + const logs = []; + + const client = createNode("client", false); + const worker1 = createWorker(1, false, logs); + const worker2 = createWorker(2, false, logs, { canCrash: true }); + const worker3 = createWorker(3, false, logs, { canCrash: true }); + + const brokers = [client, worker1, worker2, worker3]; + + beforeEach(() => Promise.all(brokers.map(broker => broker.start()))); + + afterEach(() => { + logs.length = 0; + return Promise.all(brokers.map(broker => broker.stop())); + }); + + runTestCases(logs, client, worker1, worker2, worker3); + }); - describe("Test AMQPTransporter RPC with DISABLED built-in balancer & retried request", () => { + describe("Test AMQPTransporter RPC with DISABLED built-in balancer", () => { + + const logs = []; const client = createNode("client", true); - let worker2CB = jest.fn(); + const worker1 = createWorker(1, true, logs); + const worker2 = createWorker(2, true, logs, { canCrash: true }); + const worker3 = createWorker(3, true, logs, { canCrash: true }); - const worker1 = createNode("worker1", true, { - name: "test", - actions: { - hello(ctx) { - return Promise.resolve({ - worker: "worker1", - a: ctx.params.a - }); - } - } - }); + const brokers = [client, worker1, worker2, worker3]; - const worker2 = createNode("worker2", true, { - name: "test", - actions: { - hello() { - worker2CB(); - return worker2.stop(); - //return Promise.resolve(`Hello from ${this.broker.nodeID}`).delay(200); - } - } - }); + beforeEach(() => Promise.all(brokers.map(broker => broker.start()))); - const worker3 = createNode("worker3", true, { - name: "test", - actions: { - hello(ctx) { - return Promise.resolve({ - worker: "worker3", - a: ctx.params.a - }); - } - } + afterEach(() => { + logs.length = 0; + return Promise.all(brokers.map(broker => broker.stop())); }); - beforeEach(() => { - return Promise.all([ - client.start(), - worker1.start(), - worker2.start(), - worker3.start(), - ]); - }); + runTestCases(logs, client, worker1, worker2, worker3); - afterEach(() => Promise.all([ - client.stop(), - worker1.stop(), - //worker2.stop(), - worker3.stop(), - ])); - - it("should retry unacked requests to other node", () => { - return Promise.delay(2000).then(() => { - return Promise.all([ - client.call("test.hello", { a: 1 }), - client.call("test.hello", { a: 2 }), - client.call("test.hello", { a: 3 }), - client.call("test.hello", { a: 4 }), - client.call("test.hello", { a: 5 }), - client.call("test.hello", { a: 6 }), - ]).catch(protectReject).then(res => { - //console.log(res); - expect(res).toHaveLength(6); - expect(worker2CB).toHaveBeenCalledTimes(1); - expect(res.filter(o => o.worker == "worker1").length).toBeGreaterThan(0); - expect(res.filter(o => o.worker == "worker3").length).toBeGreaterThan(0); - // worker2 is crashed, so we didn't receive response from it. - expect(res.filter(o => o.worker == "worker2")).toHaveLength(0); - expect(res.map(o => o.a)).toEqual(expect.arrayContaining([1, 2, 3, 4, 5, 6])); - }); - }); - }, 10000); }); - }); const purgeList = { queues: [ + "MOL-test-rpc.DISCONNECT.rpc-client", + "MOL-test-rpc.DISCONNECT.rpc-worker1", + "MOL-test-rpc.DISCONNECT.rpc-worker2", + "MOL-test-rpc.DISCONNECT.rpc-worker3", + "MOL-test-rpc.DISCOVER.rpc-client", + "MOL-test-rpc.DISCOVER.rpc-worker1", + "MOL-test-rpc.DISCOVER.rpc-worker2", + "MOL-test-rpc.DISCOVER.rpc-worker3", + "MOL-test-rpc.EVENT.rpc-client", + "MOL-test-rpc.EVENT.rpc-worker1", + "MOL-test-rpc.EVENT.rpc-worker2", + "MOL-test-rpc.EVENT.rpc-worker3", + "MOL-test-rpc.HEARTBEAT.rpc-client", + "MOL-test-rpc.HEARTBEAT.rpc-worker1", + "MOL-test-rpc.HEARTBEAT.rpc-worker2", + "MOL-test-rpc.HEARTBEAT.rpc-worker3", + "MOL-test-rpc.INFO.rpc-client", + "MOL-test-rpc.INFO.rpc-worker1", + "MOL-test-rpc.INFO.rpc-worker2", + "MOL-test-rpc.INFO.rpc-worker3", + "MOL-test-rpc.PING.rpc-client", + "MOL-test-rpc.PING.rpc-worker1", + "MOL-test-rpc.PING.rpc-worker2", + "MOL-test-rpc.PING.rpc-worker3", + "MOL-test-rpc.PONG.rpc-client", + "MOL-test-rpc.PONG.rpc-worker1", + "MOL-test-rpc.PONG.rpc-worker2", + "MOL-test-rpc.PONG.rpc-worker3", + "MOL-test-rpc.REQ.$node.actions", + "MOL-test-rpc.REQ.$node.health", + "MOL-test-rpc.REQ.$node.list", + "MOL-test-rpc.REQ.$node.services", "MOL-test-rpc.REQ.rpc-client", "MOL-test-rpc.REQ.rpc-worker1", "MOL-test-rpc.REQ.rpc-worker2", "MOL-test-rpc.REQ.rpc-worker3", + "MOL-test-rpc.REQ.test.hello", "MOL-test-rpc.REQB.$node.actions", "MOL-test-rpc.REQB.$node.events", "MOL-test-rpc.REQB.$node.health", @@ -270,7 +277,6 @@ const purgeList = { "MOL-test-rpc.RES.rpc-worker2", "MOL-test-rpc.RES.rpc-worker3", ], - exchanges: [ "MOL-test-rpc.DISCONNECT", "MOL-test-rpc.DISCOVER",