Skip to content

Commit

Permalink
Merge branch 'master' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Apr 2, 2024
2 parents 1681116 + 6a5c249 commit 4823e38
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 37 deletions.
22 changes: 21 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,27 @@ These changes means the 0.15 cachers create different cache keys than 0.14 cache

--------------------------------------------------
<a name="Unreleased"></a>
# [Unreleased](https://github.com/moleculerjs/moleculer/compare/v0.14.29...master)
# [Unreleased](https://github.com/moleculerjs/moleculer/compare/v0.14.33...master)

--------------------------------------------------
<a name="0.14.33"></a>
# [0.14.33](https://github.com/moleculerjs/moleculer/compare/v0.14.32...v0.14.33) (2024-04-02)

## Changes
- autodetect Redis type discoverer when using redis SSL URI [#1260](https://github.com/moleculerjs/moleculer/pull/1260)
- change redis client events [#1269](https://github.com/moleculerjs/moleculer/pull/1269)
- fix transit connecting state flag [#1258](https://github.com/moleculerjs/moleculer/pull/1258)
- add hook middlewares interceptors to preserve call context with call middlewares [#1270](https://github.com/moleculerjs/moleculer/pull/1270)
- remove unnecessary clone in node update method [#1274](https://github.com/moleculerjs/moleculer/pull/1274)

--------------------------------------------------
<a name="0.14.32"></a>
# [0.14.32](https://github.com/moleculerjs/moleculer/compare/v0.14.31...v0.14.32) (2023-11-12)

## Changes
- update peer dependency for mqtt to 5.0.2 [#1236](https://github.com/moleculerjs/moleculer/pull/1236)
- update d.ts [#1245](https://github.com/moleculerjs/moleculer/pull/1245), [#1246](https://github.com/moleculerjs/moleculer/pull/1246), [#1248](https://github.com/moleculerjs/moleculer/pull/1248)
- fix stream sending logic to avoid memory leak [#1243](https://github.com/moleculerjs/moleculer/pull/1243)

--------------------------------------------------
<a name="0.14.31"></a>
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "moleculer",
"version": "0.14.31",
"version": "0.14.33",
"description": "Fast & powerful microservices framework for Node.JS",
"main": "index.js",
"exports": {
Expand Down
2 changes: 1 addition & 1 deletion src/cachers/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class RedisCacher extends BaseCacher {

this.connected = false;

this.client.on("connect", () => {
this.client.on("ready", () => {
this.connected = true;

/* istanbul ignore next */
Expand Down
13 changes: 10 additions & 3 deletions src/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class MiddlewareHandler {
this.list = [];

this.registeredHooks = {};

this.middlewareInterceptors = {};
}

add(mw) {
Expand All @@ -55,9 +57,14 @@ class MiddlewareHandler {

Object.keys(mw).forEach(key => {
if (isFunction(mw[key])) {
if (Array.isArray(this.registeredHooks[key]))
this.registeredHooks[key].push(mw[key]);
else this.registeredHooks[key] = [mw[key]];
const handle = isFunction(this.middlewareInterceptors[key])
? this.middlewareInterceptors[key](mw[key])
: mw[key];
if (Array.isArray(this.registeredHooks[key])) {
this.registeredHooks[key].push(handle);
} else {
this.registeredHooks[key] = [handle];
}
}
});

Expand Down
5 changes: 1 addition & 4 deletions src/registry/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

"use strict";

const _ = require("lodash");

/**
* Import types
*
Expand Down Expand Up @@ -70,8 +68,7 @@ class Node {
this.client = payload.client || {};
this.config = payload.config || {};

// Process services & events (should make a clone because it will manipulate the objects (add handlers))
this.services = _.cloneDeep(payload.services);
this.services = payload.services;
this.rawInfo = payload;

const newSeq = payload.seq || 1;
Expand Down
25 changes: 15 additions & 10 deletions src/registry/registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -321,28 +321,31 @@ class Registry {
_.forIn(actions, action => {
if (!this.checkActionVisibility(action, node)) return;

// Clone fields to have independent action object
const serviceAction = { ...action };

if (node.local) {
action.handler = this.broker.middlewares.wrapHandler(
serviceAction.handler = this.broker.middlewares.wrapHandler(
"localAction",
action.handler,
action
);
} else if (this.broker.transit) {
action.handler = this.broker.middlewares.wrapHandler(
serviceAction.handler = this.broker.middlewares.wrapHandler(
"remoteAction",
this.broker.transit.request.bind(this.broker.transit),
{ ...action, service }
);
}
if (this.broker.options.disableBalancer && this.broker.transit)
action.remoteHandler = this.broker.middlewares.wrapHandler(
serviceAction.remoteHandler = this.broker.middlewares.wrapHandler(
"remoteAction",
this.broker.transit.request.bind(this.broker.transit),
{ ...action, service }
);

this.actions.add(node, service, action);
service.addAction(action);
this.actions.add(node, service, serviceAction);
service.addAction(serviceAction);
});
}

Expand Down Expand Up @@ -447,15 +450,17 @@ class Registry {
*/
registerEvents(node, service, events) {
_.forIn(events, event => {
const serviceEvent = { ...event };

if (node.local)
event.handler = this.broker.middlewares.wrapHandler(
serviceEvent.handler = this.broker.middlewares.wrapHandler(
"localEvent",
event.handler,
event
serviceEvent.handler,
serviceEvent
);

this.events.add(node, service, event);
service.addEvent(event);
this.events.add(node, service, serviceEvent);
service.addEvent(serviceEvent);
});
}

Expand Down
16 changes: 16 additions & 0 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class ServiceBroker {

// Middleware handler
this.middlewares = new MiddlewareHandler(this);
this.middlewares.middlewareInterceptors["call"] = this.interceptCallMiddleware;

// Service registry
this.registry = new Registry(this);
Expand Down Expand Up @@ -408,6 +409,21 @@ class ServiceBroker {
this.metrics.set(METRIC.MOLECULER_BROKER_MIDDLEWARES_TOTAL, this.middlewares.count());
}

/**
* It is necessary to keep the context of the call when using call middleware.
*/
interceptCallMiddleware(createMiddleware) {
return next => {
let result = null;
const call = createMiddleware((...args) => (result = next(...args)));
return (...args) => {
const promise = call(...args);
if (result) promise.ctx = result.ctx;
return promise;
};
};
}

/**
* Register Moleculer Core metrics.
*/
Expand Down
5 changes: 2 additions & 3 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,9 @@ class Transit {

return this.Promise.resolve()
.then(() => {
if (this.tx.connected) {
return this.discoverer.localNodeDisconnected().then(() => this.tx.disconnect());
}
return this.tx.connected && this.discoverer.localNodeDisconnected();
})
.then(() => this.tx.disconnect())
.then(() => (this.disconnecting = false));
}

Expand Down
4 changes: 2 additions & 2 deletions src/transporters/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ class RedisTransporter extends Transporter {
let clientSub = this.getRedisClient(this.opts);
this._clientSub = clientSub; // For tests

clientSub.on("connect", () => {
clientSub.on("ready", () => {
this.logger.info("Redis-sub client is connected.");

let clientPub = this.getRedisClient(this.opts);
this._clientPub = clientPub; // For tests

clientPub.on("connect", () => {
clientPub.on("ready", () => {
this.clientSub = clientSub;
this.clientPub = clientPub;

Expand Down
61 changes: 61 additions & 0 deletions test/integration/middlewares.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,67 @@ const utils = require("../../src/utils");
const { protectReject } = require("../unit/utils");

describe("Test middleware system", () => {
describe("Test hook middleware interceptors.", () => {
const broker = new ServiceBroker({
logger: false,
validator: false,
internalMiddlewares: false,
middlewares: [
{
call(next) {
return (actionName, params, opts) => {
return next(actionName, params, opts).then(res => {
return res;
});
};
}
},
{
call(next) {
return (actionName, params, opts) => {
return next(actionName, params, opts).then(res => {
return Promise.resolve(res + "!");
});
};
}
}
]
});

broker.createService({
name: "contextDataPassedTest",
actions: {
setContextMeta: {
handler(ctx) {
ctx.meta.$metainfo = "testmeta";
}
}
}
});

broker.createService({
name: "test",
actions: {
testAction: {
async handler(ctx) {
await ctx.call("contextDataPassedTest.setContextMeta");
return ctx.meta.$metainfo;
}
}
}
});

beforeAll(() => broker.start());
afterAll(() => broker.stop());

it("The context is passed through the call middlware.", async () => {
const p = broker.call("test.testAction");
await expect(p).resolves.toBe("testmeta!");
expect(p.ctx).toBeDefined();
expect(p.ctx.meta.$metainfo).toBe("testmeta");
});
});

describe("Test with sync & async middlewares", () => {
let flow = [];
let mw1Sync = {
Expand Down
6 changes: 6 additions & 0 deletions test/unit/registry/discoverers/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ describe("Test Discoverers resolver", () => {
expect(discoverer.opts.redis).toEqual("redis://redis-server:6379");
});

it("should resolve Redis reporter from SSL connection string", () => {
const discoverer = Discoverers.resolve("rediss://redis-server:6379");
expect(discoverer).toBeInstanceOf(Discoverers.Redis);
expect(discoverer.opts.redis).toEqual("rediss://redis-server:6379");
});

it("should resolve Redis discoverer from obj", () => {
const options = { heartbeatInterval: 8 };
const discoverer = Discoverers.resolve({ type: "Redis", options });
Expand Down
26 changes: 26 additions & 0 deletions test/unit/service-broker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,32 @@ describe("Test ServiceBroker constructor", () => {
});
});

describe("Test broker.interceptCallMiddleware", () => {
const broker = new ServiceBroker({
logger: false,
transporter: null
});

it("should preserve the context of the call", async () => {
const next = () => {
const promise = Promise.resolve("next-result");
promise.ctx = { test: "test" };
return promise;
};

const createMiddleware = jest.fn(next => (...args) => {
return next(...args).then(() => "middleware-result");
});

const createInterceptedMiddleware = broker.interceptCallMiddleware(createMiddleware);

const result = createInterceptedMiddleware(next)();

expect(result.ctx).toEqual({ test: "test" });
await expect(result).resolves.toBe("middleware-result");
});
});

describe("Test broker.start", () => {
describe("without transporter", () => {
let schema;
Expand Down
24 changes: 12 additions & 12 deletions test/unit/transporters/redis.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ function itShouldTestRedisTransportConnectDisconnect(clusterMode = false) {
let p = transporter.connect().then(() => {
expect(transporter.clientSub).toBeDefined();
expect(transporter.clientSub.on).toHaveBeenCalledTimes(4);
expect(transporter.clientSub.on).toHaveBeenCalledWith("connect", expect.any(Function));
expect(transporter.clientSub.on).toHaveBeenCalledWith("ready", expect.any(Function));
expect(transporter.clientSub.on).toHaveBeenCalledWith("error", expect.any(Function));
expect(transporter.clientSub.on).toHaveBeenCalledWith("close", expect.any(Function));
expect(transporter.clientSub.on).toHaveBeenCalledWith(
Expand All @@ -151,13 +151,13 @@ function itShouldTestRedisTransportConnectDisconnect(clusterMode = false) {

expect(transporter.clientPub).toBeDefined();
expect(transporter.clientPub.on).toHaveBeenCalledTimes(3);
expect(transporter.clientPub.on).toHaveBeenCalledWith("connect", expect.any(Function));
expect(transporter.clientPub.on).toHaveBeenCalledWith("ready", expect.any(Function));
expect(transporter.clientPub.on).toHaveBeenCalledWith("error", expect.any(Function));
expect(transporter.clientPub.on).toHaveBeenCalledWith("close", expect.any(Function));
});

transporter._clientSub.onCallbacks.connect();
transporter._clientPub.onCallbacks.connect();
transporter._clientSub.onCallbacks.ready();
transporter._clientPub.onCallbacks.ready();

return p;
});
Expand All @@ -182,8 +182,8 @@ function itShouldTestRedisTransportConnectDisconnect(clusterMode = false) {
});
});

transporter._clientSub.onCallbacks.connect();
transporter._clientPub.onCallbacks.connect();
transporter._clientSub.onCallbacks.ready();
transporter._clientPub.onCallbacks.ready();
// Trigger an error
transporter._clientPub.onCallbacks.error(new Error("Ups"));
transporter._clientSub.onCallbacks.error(new Error("Ups"));
Expand All @@ -198,8 +198,8 @@ function itShouldTestRedisTransportConnectDisconnect(clusterMode = false) {
expect(transporter.onConnected).toHaveBeenCalledWith();
});

transporter._clientSub.onCallbacks.connect();
transporter._clientPub.onCallbacks.connect();
transporter._clientSub.onCallbacks.ready();
transporter._clientPub.onCallbacks.ready();

return p;
});
Expand All @@ -215,8 +215,8 @@ function itShouldTestRedisTransportConnectDisconnect(clusterMode = false) {
expect(cbPub).toHaveBeenCalledTimes(1);
});

transporter._clientSub.onCallbacks.connect(); // Trigger the `resolve`
transporter._clientPub.onCallbacks.connect(); // Trigger the `resolve`
transporter._clientSub.onCallbacks.ready(); // Trigger the `resolve`
transporter._clientPub.onCallbacks.ready(); // Trigger the `resolve`
return p;
});
}
Expand Down Expand Up @@ -247,8 +247,8 @@ function itShouldTestRedisTransportPublishSubscribe(clusterMode = false) {
transporter.incomingMessage = jest.fn();

let p = transporter.connect();
transporter._clientSub.onCallbacks.connect(); // Trigger the `resolve`
transporter._clientPub.onCallbacks.connect(); // Trigger the `resolve`
transporter._clientSub.onCallbacks.ready(); // Trigger the `resolve`
transporter._clientPub.onCallbacks.ready(); // Trigger the `resolve`
return p;
});

Expand Down

0 comments on commit 4823e38

Please sign in to comment.