Skip to content

Commit

Permalink
Merge f5d6a35 into 6a5897f
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Jun 7, 2019
2 parents 6a5897f + f5d6a35 commit 24b0639
Show file tree
Hide file tree
Showing 3 changed files with 446 additions and 3 deletions.
24 changes: 24 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const BAD_REPLY: string;
export const BAD_SUBJECT: string;
export const CLIENT_CERT_REQ: string;
export const CONN_CLOSED: string;
export const CONN_DRAINING: string;
export const CONN_ERR: string;
export const INVALID_ENCODING: string;
export const NATS_PROTOCOL_ERR: string;
Expand All @@ -37,6 +38,7 @@ export const PERMISSIONS_ERR: string;
export const REQ_TIMEOUT: string;
export const SECURE_CONN_REQ: string;
export const STALE_CONNECTION_ERR: string;
export const SUB_DRAINING: string;

/**
* Create a properly formatted inbox subject.
Expand Down Expand Up @@ -129,6 +131,28 @@ declare class Client extends events.EventEmitter {
*/
unsubscribe(sid: number, max?: number):void;

/**
* Draining a subscription is similar to unsubscribe but inbound pending messages are
* not discarded. When the last in-flight message is processed, the subscription handler
* is removed.
* @param sid
* @param callback
*/
drainSubscription(sid: number, callback?:Function):void;

/**
* Drains all subscriptions. If an opt_callback is provided, the callback
* is called if there's an error with an error argument.
*
* Note that after calling drain, it is impossible to create new subscriptions
* or any requests. As soon as all messages for the draining subscriptions are
* processed, it is also impossible to publish new messages.
*
* A drained connection is closed when the opt_callback is called without arguments.
* @param callback
*/
drain(callback?:Function):void;

/**
* Set a timeout on a subscription.
*/
Expand Down
154 changes: 151 additions & 3 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ const VERSION = '1.2.10',
CLIENT_CERT_REQ_MSG = 'Server requires a client certificate.',
CONN_CLOSED = 'CONN_CLOSED',
CONN_CLOSED_MSG = 'Connection closed',
CONN_DRAINING = 'CONN_DRAINING',
CONN_DRAINING_MSG = 'Connection draining',
SUB_DRAINING = 'SUB_DRAINING',
SUB_DRAINING_MSG = 'Subscription draining',
CONN_ERR = 'CONN_ERR',
CONN_ERR_MSG_PREFIX = 'Could not connect to server: ',
INVALID_ENCODING = 'INVALID_ENCODING',
Expand Down Expand Up @@ -157,6 +161,7 @@ exports.BAD_SUBJECT = BAD_SUBJECT;
exports.BAD_MSG = BAD_MSG;
exports.BAD_REPLY = BAD_REPLY;
exports.CONN_CLOSED = CONN_CLOSED;
exports.CONN_DRAINING = CONN_DRAINING;
exports.BAD_JSON = BAD_JSON;
exports.BAD_AUTHENTICATION = BAD_AUTHENTICATION;
exports.INVALID_ENCODING = INVALID_ENCODING;
Expand All @@ -165,6 +170,7 @@ exports.NON_SECURE_CONN_REQ = NON_SECURE_CONN_REQ;
exports.CLIENT_CERT_REQ = CLIENT_CERT_REQ;
exports.NATS_PROTOCOL_ERR = NATS_PROTOCOL_ERR;
exports.REQ_TIMEOUT = REQ_TIMEOUT;
exports.SUB_DRAINING = SUB_DRAINING;


/**
Expand Down Expand Up @@ -1419,6 +1425,83 @@ Client.prototype.flush = function(opt_callback) {
}
};

/**
* Drains all subscriptions. If an opt_callback is provided, the callback
* is called if there's an error with an error argument.
*
* Note that after calling drain, it is impossible to create new subscriptions
* or any requests. As soon as all messages for the draining subscriptions are
* processed, it is also impossible to publish new messages.
*
* A drained connection is closed when the opt_callback is called without arguments.
* @param opt_callback
*/
Client.prototype.drain = function(opt_callback) {
if(this.handledClosedOrDraining(opt_callback)) {
return;
}
this.draining = true;
const subs = [];
const drains = [];
for (const sid in this.subs) {
if (this.subs.hasOwnProperty(sid)) {
const sub = this.subs[sid];
sub.sid = sid;
subs.push(sub);
}
}

subs.forEach((sub) => {
this.drainSubscription(sub.sid, () => {
drains.push(sub);
if(drains.length === subs.length) {
this.noMorePublishing = true;
this.flush(() => {
this.close();
if (typeof opt_callback === 'function') {
opt_callback();
}
});
}
});
});

// no subscriptions
if(subs.length === 0) {
this.noMorePublishing = true;
this.close();
if (typeof opt_callback === 'function') {
opt_callback();
}
}
};

/**
* Returns true if the client is closed or draining, caller should
* return as error was generated.
* @private
* @param opt_callback
* @returns {boolean}
*/
Client.prototype.handledClosedOrDraining = function(opt_callback) {
if (this.closed) {
if (typeof opt_callback === 'function') {
opt_callback(new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
} else {
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
}
return true;
}
if (this.draining) {
if (typeof opt_callback === 'function') {
opt_callback(new NatsError(CONN_DRAINING_MSG, CONN_DRAINING));
} else {
throw (new NatsError(CONN_DRAINING_MSG, CONN_DRAINING));
}
return true;
}
};

/**
* Publish a message to the given subject, with optional reply and callback.
*
Expand All @@ -1435,6 +1518,15 @@ Client.prototype.publish = function(subject, msg, opt_reply, opt_callback) {
subject = undefined;
}

if (this.noMorePublishing) {
if (typeof opt_callback === 'function') {
opt_callback(new NatsError(CONN_DRAINING_MSG, CONN_DRAINING));
} else {
this.emit('error', new NatsError(CONN_DRAINING_MSG, CONN_DRAINING));
}
return;
}

if (!this.options.json) {
msg = msg || EMPTY;
} else {
Expand Down Expand Up @@ -1512,9 +1604,6 @@ Client.prototype.publish = function(subject, msg, opt_reply, opt_callback) {
* @api public
*/
Client.prototype.subscribe = function(subject, opts, callback) {
if (this.closed) {
throw (new NatsError(CONN_CLOSED_MSG, CONN_CLOSED));
}
let qgroup, max;
if (typeof opts === 'function') {
callback = opts;
Expand All @@ -1524,6 +1613,11 @@ Client.prototype.subscribe = function(subject, opts, callback) {
qgroup = opts.queue;
max = opts.max;
}

if(this.handledClosedOrDraining(callback)) {
return 0;
}

this.ssid += 1;
this.subs[this.ssid] = {
'subject': subject,
Expand Down Expand Up @@ -1594,6 +1688,50 @@ Client.prototype.unsubscribe = function(sid, opt_max) {
}
};


/**
* Draining a subscription is similar to unsubscribe but inbound pending messages are
* not discarded. When the last in-flight message is processed, the subscription handler
* is removed.
* @param sid
* @param opt_callback
*/
Client.prototype.drainSubscription = function(sid, opt_callback) {
if(this.handledClosedOrDraining(opt_callback)) {
return;
}

const sub = this.subs[sid];
if (sub === undefined) {
if (typeof opt_callback === 'function') {
opt_callback();
}
return;
}
if(sub.draining) {
if (typeof opt_callback === 'function') {
opt_callback(new NatsError(SUB_DRAINING_MSG, SUB_DRAINING));
} else {
throw (new NatsError(SUB_DRAINING_MSG, SUB_DRAINING));
}
return;
}
sub.draining = true;
const proto = [UNSUB, sid + CR_LF];
this.sendCommand(proto.join(SPC));
this.flush(() => {
if(sub.timeout) {
clearTimeout(sub.timeout);
sub.timeout = null;
}
delete this.subs[sid];
this.emit('unsubscribe', sid, sub.subject);
if (typeof opt_callback === 'function') {
opt_callback();
}
});
};

/**
* Set a timeout on a subscription. The subscription is cancelled if the
* expected number of messages is reached or the timeout is reached.
Expand Down Expand Up @@ -1670,6 +1808,10 @@ Client.prototype.request = function(subject, opt_msg, opt_options, callback) {
opt_options = null;
}

if(this.handledClosedOrDraining(callback)) {
return 0;
}

opt_options = opt_options || {};
const conf = this.initMuxRequestDetails(callback, opt_options.max);
this.publish(subject, opt_msg, conf.inbox);
Expand Down Expand Up @@ -1701,6 +1843,12 @@ Client.prototype.oldRequest = function(subject, opt_msg, opt_options, callback)
callback = opt_options;
opt_options = null;
}

if (this.draining) {
callback(new NatsError(CONN_DRAINING_MSG, CONN_DRAINING));
return;
}

const inbox = this.createInbox();
const s = this.subscribe(inbox, opt_options, function(msg, reply) {
callback(msg, reply);
Expand Down

0 comments on commit 24b0639

Please sign in to comment.