Skip to content

Commit

Permalink
Merge pull request #255 from rmccallum81/delayed-shutdown
Browse files Browse the repository at this point in the history
REQUEST FOR COMMENTS: Graceful shutdown of active calls
  • Loading branch information
icebob committed May 3, 2018
2 parents 5040a2e + ad9cb20 commit 0630503
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 16 deletions.
23 changes: 21 additions & 2 deletions benchmark/suites/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,31 @@ let bench4 = benchmark.createSuite("Remote call with FakeTransporter");
b1.start().then(() => b2.start());

let c = 0;
bench4.add("Remote call echo.reply", done => {
bench4.ref("Remote call echo.reply", done => {
return b1.call("echo.reply", { a: c++ }).then(done);
});

bench4.add("Remote call echo.reply with tracking", done => {
b2.options.trackContext = true;
return b1.call("echo.reply", { a: c++ }, { trackContext: true }).then(done);
});
})();
// ----------------------------------------------------------------

let bench5 = benchmark.createSuite("Context tracking");
(function() {
let broker = createBroker();
bench5.ref("broker.call (normal)", done => {
return broker.call("math.add", { a: 4, b: 2 }).then(done);
});

bench5.add("broker.call (with trackContext)", done => {
return broker.call("math.add", { a: 4, b: 2 }, { trackContext: true }).then(done);
});

})();

module.exports = benchmark.run([bench1, bench2, bench3, bench4]);
module.exports = benchmark.run([bench1, bench2, bench3, bench4, bench5]);


/*
Expand Down
38 changes: 38 additions & 0 deletions src/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,40 @@ class Context {
this.cachedResult = false;
}

/**
* Add a context to be tracked as active
*
* @param {Context} context
*
* @private
* @memberof Service
*/
_trackContext(service) {
if ( !service._activeContexts ) {
service._activeContexts = [];
}
service._activeContexts.push(this);
this.trackedBy = service;
}

/**
* Remove a context from the list of active context
*
* @param {Context} context
*
* @private
* @memberof Service
*/
dispose() {
if ( this.trackedBy && this.trackedBy._activeContexts ) {
const contextList = this.trackedBy._activeContexts;
const contextIndex = contextList.indexOf(this);
if (contextIndex !== -1) {
contextList.splice(contextIndex, 1);
}
}
}

generateID() {
this.id = generateToken();
}
Expand Down Expand Up @@ -127,6 +161,10 @@ class Context {
ctx.requestID = ctx.id;
}

if (opts.trackContext) {
ctx._trackContext(action.service || broker);
}

return ctx;
}

Expand Down
33 changes: 33 additions & 0 deletions src/service-broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ const defaultOptions = {
heartbeatInterval: 5,
heartbeatTimeout: 15,

trackContext: false,
gracefulStopTimeout: 2000,

disableBalancer: false,

registry: {
Expand Down Expand Up @@ -376,6 +379,7 @@ class ServiceBroker {
*/
stop() {
return Promise.resolve()
// TODO: Close subscriber to stop accepting requests
.then(() => {
// Call service `stopped` handlers
return Promise.all(this.services.map(svc => svc.stopped.call(svc)));
Expand Down Expand Up @@ -845,6 +849,11 @@ class ServiceBroker {
*/
call(actionName, params, opts = {}) {
const endpoint = this.findNextActionEndpoint(actionName, opts);

// Add trackContext option from broker options
if (opts.trackContext === undefined && this.options.trackContext)
opts.trackContext = this.options.trackContext;

if (endpoint instanceof Error)
return Promise.reject(endpoint);

Expand Down Expand Up @@ -892,6 +901,10 @@ class ServiceBroker {
* @memberof ServiceBroker
*/
callWithoutBalancer(actionName, params, opts = {}) {
// Add trackContext option from broker options
if (opts.trackContext === undefined && this.options.trackContext)
opts.trackContext = this.options.trackContext;

if (opts.timeout == null)
opts.timeout = this.options.requestTimeout || 0;

Expand Down Expand Up @@ -978,6 +991,14 @@ class ServiceBroker {
});
}

// Remove the context from the active contexts list
if (ctx.trackedBy) {
p.then(res => {
ctx.dispose();
return res;
});
}

// Error handler
p = p.catch(err => this._callErrorHandler(err, ctx, endpoint, opts));

Expand Down Expand Up @@ -1007,6 +1028,14 @@ class ServiceBroker {
if (ctx.timeout > 0 && p.timeout)
p = p.timeout(ctx.timeout);

// Remove the context from the active contexts list
if (ctx.trackedBy) {
p.then(res => {
ctx.dispose();
return res;
});
}

// Handle half-open state in circuit breaker
if (this.options.circuitBreaker.enabled && endpoint) {
p = p.then(res => {
Expand Down Expand Up @@ -1087,6 +1116,10 @@ class ServiceBroker {

err.ctx = ctx;

if (opts.trackContext) {
ctx.dispose();
}

if (nodeID != this.nodeID) {
// Remove pending request (if the request didn't reached the target service)
this.transit.removePendingRequest(ctx.id);
Expand Down
61 changes: 48 additions & 13 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,17 @@ class Service {
// Expose to call `service.actions.find({ ...params })`
this.actions[name] = (params, opts) => {
const ctx = this.broker.ContextFactory.create(this.broker, innerAction, null, params, opts || {});
return innerAction.handler(ctx);
const contextDispose = (ret) => {
if (opts.trackContext) {
ctx.dispose();
}

return ret;
};
const contextDisposeCatch = (ret) => {
return this.Promise.reject(contextDispose(ret));
};
return innerAction.handler(ctx).then(contextDispose).catch(contextDisposeCatch);
};

});
Expand Down Expand Up @@ -119,22 +129,24 @@ class Service {
event.handler = function (payload, sender, eventName) {
if (_.isFunction(handler)) {
const p = handler.apply(self, [payload, sender, eventName]);
// TODO: Track event handler started

// Handle async-await returns
if (utils.isPromise(p)) {
/* istanbul ignore next */
p.catch(err => self.logger.error(err));
}
} // TODO: Cleanup event tracking

} else if (Array.isArray(handler)) {
handler.forEach(fn => {
const p = fn.apply(self, [payload, sender, eventName]);
// TODO: Track event handler started

// Handle async-await returns
if (utils.isPromise(p)) {
/* istanbul ignore next */
p.catch(err => self.logger.error(err));
}
} // TODO: Cleanup event tracking
});
}

Expand Down Expand Up @@ -192,17 +204,30 @@ class Service {
};

this.stopped = () => {
if (_.isFunction(this.schema.stopped))
return this.Promise.method(this.schema.stopped).call(this);

if (Array.isArray(this.schema.stopped)) {
return this.schema.stopped
.reverse()
.map(fn => this.Promise.method(fn.bind(this)))
.reduce((p, fn) => p.then(fn), this.Promise.resolve());
}
return new this.Promise((resolve, reject) => {
const timeout = setTimeout(reject, this.settings.$gracefulStopTimeout || this.broker.options.gracefulStopTimeout);
const checkForContexts = () => {
if (this._getActiveContexts().length === 0) {
clearTimeout(timeout);
resolve();
} else {
setTimeout(checkForContexts, 100);
}
};
setImmediate(checkForContexts);
}).finally(() => {
if (_.isFunction(this.schema.stopped))
return this.Promise.method(this.schema.stopped).call(this);

if (Array.isArray(this.schema.stopped)) {
return this.schema.stopped
.reverse()
.map(fn => this.Promise.method(fn.bind(this)))
.reduce((p, fn) => p.then(fn), this.Promise.resolve());
}

return this.Promise.resolve();
return this.Promise.resolve();
});
};

// Call the created event handler
Expand Down Expand Up @@ -260,6 +285,16 @@ class Service {
return action;
}

/**
* Retrieve list of active contexts for the service
* @returns {Set}
* @private
* @memberof Service
*/
_getActiveContexts() {
return this._activeContexts || [];
}

/**
* Wait for other services
*
Expand Down
37 changes: 36 additions & 1 deletion test/integration/service.lifecycle.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
let ServiceBroker = require("../../src/service-broker");
const ServiceBroker = require("../../src/service-broker");
const { protectReject } = require("../unit/utils");

describe("Test Service handlers", () => {

Expand Down Expand Up @@ -88,3 +89,37 @@ describe("Test Service handlers after broker.start", () => {
});
});
});


describe("Test Service handlerswith delayed shutdown", () => {
const broker = new ServiceBroker({ nodeID: "node-1", trackContext: true });

const schema = {
name: "delayed",

actions: {
test: jest.fn()
},

stopped: jest.fn()
};

it("should called stopped", () => {
const service = broker.createService(schema);
service.schema.actions.test.mockResolvedValue(service.Promise.delay(80));
const getActiveContextsSpy = jest.spyOn(service, "_getActiveContexts");
return broker.start()
.then(() => {
broker.call("delayed.test", {});
return service.Promise.delay(10);
})
.then(() => broker.stop())
.then(() => {
expect(schema.actions.test).toHaveBeenCalledTimes(1);
expect(getActiveContextsSpy).toHaveBeenCalledTimes(2);

getActiveContextsSpy.mockReset();
getActiveContextsSpy.mockRestore();
}).catch(protectReject);
});
});
2 changes: 2 additions & 0 deletions test/unit/service-broker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ describe("Test ServiceBroker constructor", () => {
statistics: true,
heartbeatTimeout : 20,
heartbeatInterval: 5,
trackContext: false,
gracefulStopTimeout: 2000,

disableBalancer: true,
registry: {
Expand Down

0 comments on commit 0630503

Please sign in to comment.