Skip to content

Commit

Permalink
Merge pull request #633 from AndreMaz/master
Browse files Browse the repository at this point in the history
Fixes #630 +  Backport fix #620
  • Loading branch information
AndreMaz committed Dec 17, 2019
2 parents ab62bb8 + 0fe0ae4 commit aa9afcf
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 35 deletions.
33 changes: 33 additions & 0 deletions dev/nats-wildcard.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const ServiceBroker = require("../src/service-broker");

const broker = new ServiceBroker({
nodeID: "broker-1",
transporter: {
type: "NATS"
},
metrics: true,
logLevel: "debug",
disableBalancer: true
});

broker.createService({
name: "test",

events: {
"config.site.**.changed": (payload) => {broker.logger.info(payload)},
"config.mail.**.changed": () => {},
"config.accounts.**.changed": () => {},
}
});

async function start() {
await broker.start();

broker.repl();

setInterval(() => {
broker.emit('config.site.test.changed', {data: 123})
}, 1000)
}

start();
9 changes: 5 additions & 4 deletions src/cachers/memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ class MemoryCacher extends BaseCacher {
this.logger.debug(`FOUND ${key}`);

let item = this.cache.get(key);

if (this.opts.ttl) {
// Update expire time (hold in the cache if we are using it)
item.expire = Date.now() + this.opts.ttl * 1000;
if (item.expire && item.expire < Date.now()) {
this.logger.debug(`EXPIRED ${key}`);
this.cache.delete(key);
return Promise.resolve(null);
}

return Promise.resolve(this.clone ? this.clone(item.data) : item.data);
}
return Promise.resolve(null);
Expand Down
7 changes: 2 additions & 5 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -864,11 +864,8 @@ class Transit {
if (!this.connected || !this.isReady) return Promise.resolve();

const info = this.broker.getLocalNodeInfo();

let p = Promise.resolve();
if (!nodeID)
p = this.tx.makeBalancedSubscriptions();


const p = !nodeID && this.broker.options.disableBalancer ? this.tx.makeBalancedSubscriptions() : Promise.resolve();
return p.then(() => this.publish(new Packet(P.PACKET_INFO, nodeID, {
services: info.services,
ipList: info.ipList,
Expand Down
2 changes: 1 addition & 1 deletion src/transporters/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class NatsTransporter extends Transporter {
* @memberof NatsTransporter
*/
subscribeBalancedEvent(event, group) {
const topic = `${this.prefix}.${PACKET_EVENT}B.${group}.${event}`.replace(/\*\*/g, ">");
const topic = `${this.prefix}.${PACKET_EVENT}B.${group}.${event}`.replace(/\*\*.*$/g, ">");

this.subscriptions.push(this.client.subscribe(topic, { queue: group }, (msg) => this.incomingMessage(PACKET_EVENT, msg)));
}
Expand Down
62 changes: 62 additions & 0 deletions test/unit/cachers/memory.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,68 @@ describe("Test MemoryCacher set & get", () => {

});

describe("Test MemoryCacher get() with expire", () => {

let broker = new ServiceBroker({ logger: false });
let cacher = new MemoryCacher();
cacher.init(broker);

let key = "tst123";
let data1 = {
a: 1,
b: false,
c: "Test",
d: {
e: 55
},
};

const ttlValue = 15;
const currentTime = 1487076708000;

// Solution from: https://stackoverflow.com/a/47781245/11798560
let dateNowSpy;
beforeAll(() => {
// Lock Time
dateNowSpy = jest.spyOn(Date, "now");
});

afterAll(() => {
// Unlock Time
dateNowSpy.mockRestore();
});

it("should save the data with key and a TTL value", () => {
// setting expire date -> will be called by cacher.set()
dateNowSpy.mockImplementationOnce(() => currentTime);

cacher.set(key, data1, ttlValue);
const entry = cacher.cache.get(key);
expect(entry).toBeDefined();
expect(entry.data).toBe(data1);
expect(entry.expire).toBe(currentTime + ttlValue * 1000);
});

it("should give back the data after 14 secs", () => {
// date.now() in cacher.get() will advance by 14 secs
dateNowSpy.mockImplementationOnce(() => currentTime + 14 * 1000);

return cacher.get(key).then(obj => {
expect(obj).toBeDefined();
expect(obj).toEqual(data1);
});
});

it("should remove the entry after 15 secs", () => {
// date.now() in cacher.get() will advance by 16 secs
dateNowSpy.mockImplementationOnce(() => currentTime + 16 * 1000);

return cacher.get(key).then(obj => {
expect(obj).toBeNull();
});
});
});

describe("Test MemoryCacher set & get with default cloning", () => {

let broker = new ServiceBroker({ logger: false });
Expand Down
24 changes: 23 additions & 1 deletion test/unit/transit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -971,9 +971,12 @@ describe("Test Transit.sendNodeInfo", () => {
});
});

it("should call publish with correct params if has no nodeID", () => {
it("should call publish with correct params if has no nodeID & disableBalancer: true", () => {
// Set disableBalancer option
broker.options.disableBalancer = true
transit.publish.mockClear();
broker.getLocalNodeInfo.mockClear();
transit.tx.makeBalancedSubscriptions.mockClear();

return transit.sendNodeInfo().then(() => {
expect(transit.tx.makeBalancedSubscriptions).toHaveBeenCalledTimes(1);
Expand All @@ -987,6 +990,25 @@ describe("Test Transit.sendNodeInfo", () => {
});
});

it("should call publish with correct params if has no nodeID & disableBalancer: false", () => {
// Set disableBalancer option
broker.options.disableBalancer = false
transit.publish.mockClear();
broker.getLocalNodeInfo.mockClear();
transit.tx.makeBalancedSubscriptions.mockClear();

return transit.sendNodeInfo().then(() => {
expect(transit.tx.makeBalancedSubscriptions).toHaveBeenCalledTimes(0);
expect(transit.publish).toHaveBeenCalledTimes(1);
expect(broker.getLocalNodeInfo).toHaveBeenCalledTimes(1);
const packet = transit.publish.mock.calls[0][0];
expect(packet).toBeInstanceOf(P.Packet);
expect(packet.type).toBe(P.PACKET_INFO);
expect(packet.target).toBe();
expect(packet.payload.services).toEqual([]);
});
});

});

describe("Test Transit.sendPing", () => {
Expand Down
78 changes: 54 additions & 24 deletions test/unit/transporters/nats.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,34 +174,64 @@ describe("Test NatsTransporter subscribe & publish", () => {
expect(transporter.subscriptions).toEqual([123]);
});

it("check subscribeBalancedEvent", () => {
let subCb;
transporter.client.subscribe = jest.fn((name, opts, cb) => {
subCb = cb;
return 125;
describe("Test subscribeBalancedEvent", () => {

it("check subscription & unsubscription", () => {
let subCb;
transporter.client.subscribe = jest.fn((name, opts, cb) => {
subCb = cb;
return 125;
});
transporter.incomingMessage = jest.fn();

transporter.subscribeBalancedEvent("user.created", "mail");

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.mail.user.created", { queue: "mail" }, jasmine.any(Function));

// Test subscribe callback
subCb("{ sender: \"node1\" }");
expect(transporter.incomingMessage).toHaveBeenCalledTimes(1);
expect(transporter.incomingMessage).toHaveBeenCalledWith("EVENT", "{ sender: \"node1\" }");
expect(transporter.subscriptions).toEqual([125]);

// Test unsubscribeFromBalancedCommands
transporter.client.unsubscribe = jest.fn();
transporter.client.flush = jest.fn(cb => cb());

return transporter.unsubscribeFromBalancedCommands().catch(protectReject).then(() => {
expect(transporter.subscriptions).toEqual([]);
expect(transporter.client.unsubscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.unsubscribe).toHaveBeenCalledWith(125);
expect(transporter.client.flush).toHaveBeenCalledTimes(1);
});
});
transporter.incomingMessage = jest.fn();

transporter.subscribeBalancedEvent("user.created", "mail");
it("check with '*' wildchar topic", () => {
transporter.client.subscribe = jest.fn();

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.mail.user.created", { queue: "mail" }, jasmine.any(Function));
transporter.subscribeBalancedEvent("user.*", "users");

// Test subscribe callback
subCb("{ sender: \"node1\" }");
expect(transporter.incomingMessage).toHaveBeenCalledTimes(1);
expect(transporter.incomingMessage).toHaveBeenCalledWith("EVENT", "{ sender: \"node1\" }");
expect(transporter.subscriptions).toEqual([125]);

// Test unsubscribeFromBalancedCommands
transporter.client.unsubscribe = jest.fn();
transporter.client.flush = jest.fn(cb => cb());

return transporter.unsubscribeFromBalancedCommands().catch(protectReject).then(() => {
expect(transporter.subscriptions).toEqual([]);
expect(transporter.client.unsubscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.unsubscribe).toHaveBeenCalledWith(125);
expect(transporter.client.flush).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.users.user.*", { queue: "users" }, jasmine.any(Function));
});

it("check with '**' wildchar topic", () => {
transporter.client.subscribe = jest.fn();

transporter.subscribeBalancedEvent("user.**", "users");

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.users.user.>", { queue: "users" }, jasmine.any(Function));
});

it("check with '**' wildchar (as not last) topic", () => {
transporter.client.subscribe = jest.fn();

transporter.subscribeBalancedEvent("user.**.changed", "users");

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.users.user.>", { queue: "users" }, jasmine.any(Function));
});
});

Expand Down

0 comments on commit aa9afcf

Please sign in to comment.