Skip to content

Commit

Permalink
- tokenHandler port from go-nats (called from createConnection())
Browse files Browse the repository at this point in the history
  • Loading branch information
andriy-bulynko committed Apr 17, 2019
1 parent e5f4558 commit c21fc5b
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 21 deletions.
81 changes: 61 additions & 20 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ const VERSION = '1.2.10',
// Errors
BAD_AUTHENTICATION = 'BAD_AUTHENTICATION',
BAD_AUTHENTICATION_MSG = 'User and Token can not both be provided',
BAD_AUTHENTICATION_TH_NOT_FUNC_MSG = 'tokenHandler must be a function returning a Promise',
BAD_AUTHENTICATION_T_AND_TH_MSG = 'token and tokenHandler cannot both be provided',
BAD_AUTHENTICATION_TH_FAILED_MSG_PREFIX = 'tokenHandler call failed: ',
BAD_JSON = 'BAD_JSON',
BAD_JSON_MSG = 'Message should be a non-circular JSON-serializable value',
BAD_MSG = 'BAD_MSG',
Expand Down Expand Up @@ -282,6 +285,7 @@ Client.prototype.parseOptions = function(opts) {
this.assignOption(opts, 'user');
this.assignOption(opts, 'pass');
this.assignOption(opts, 'token');
this.assignOption(opts, 'tokenHandler');
this.assignOption(opts, 'password', 'pass');
this.assignOption(opts, 'verbose');
this.assignOption(opts, 'pedantic');
Expand Down Expand Up @@ -326,12 +330,21 @@ Client.prototype.parseOptions = function(opts) {

// Set token as needed if in options.
this.token = options.token;
this.tokenHandler = options.tokenHandler;

// Authentication - make sure authentication is valid.
if (this.user && this.token) {
throw (new NatsError(BAD_AUTHENTICATION_MSG, BAD_AUTHENTICATION));
}

if (this.tokenHandler && (typeof this.tokenHandler !== 'function')) {
throw (new NatsError(BAD_AUTHENTICATION_TH_NOT_FUNC_MSG, BAD_AUTHENTICATION));
}

if (this.tokenHandler && this.token) {
throw (new NatsError(BAD_AUTHENTICATION_T_AND_TH_MSG, BAD_AUTHENTICATION));
}

// Encoding - make sure its valid.
if (Buffer.isEncoding(options.encoding)) {
this.encoding = options.encoding;
Expand Down Expand Up @@ -644,6 +657,27 @@ Client.prototype.scheduleHeartbeat = function() {
}, this.options.pingInterval, this);
};

/**
* Schedule a reconnect if configured, else close
*
* @api private
*/
Client.prototype.reconnectOrClose = function() {
const stream = this.stream;
this.closeStream();
if (stream && stream.bytesRead > 0) {
this.emit('disconnect');
}
if (this.closed === true ||
this.options.reconnect === false ||
((this.reconnects >= this.options.maxReconnectAttempts) && this.options.maxReconnectAttempts !== -1)) {
this.cleanupTimers();
this.emit('close');
} else {
this.scheduleReconnect();
}
};

/**
* Properly setup a stream event handlers.
*
Expand All @@ -662,19 +696,8 @@ Client.prototype.setupHandlers = function() {
this.scheduleHeartbeat();
});

stream.on('close', (hadError) => {
this.closeStream();
if(stream.bytesRead > 0) {
this.emit('disconnect');
}
if (this.closed === true ||
this.options.reconnect === false ||
((this.reconnects >= this.options.maxReconnectAttempts) && this.options.maxReconnectAttempts !== -1)) {
this.cleanupTimers();
this.emit('close');
} else {
this.scheduleReconnect();
}
stream.on('close', () => {
this.reconnectOrClose();
});

stream.on('error', (exception) => {
Expand Down Expand Up @@ -827,13 +850,31 @@ Client.prototype.createConnection = function() {
if (this.stream) {
this.stream.removeAllListeners();
this.stream.destroy();
this.stream = null;
}

const doCreateConnection = () => {
// Create the stream
this.stream = net.createConnection(this.url.port, this.url.hostname);
// this change makes it a bit faster on Linux, slightly worse on OS X
this.stream.setNoDelay(true);
// Setup the proper handlers.
this.setupHandlers();
};

if (this.tokenHandler) {
this.tokenHandler().then((token) => {
this.token = token;
doCreateConnection();
}, (err) => {
this.emit('error', new NatsError(BAD_AUTHENTICATION_TH_FAILED_MSG_PREFIX + err, BAD_AUTHENTICATION, err));
setImmediate(() => {
this.reconnectOrClose();
});
});
} else {
doCreateConnection();
}
// Create the stream
this.stream = net.createConnection(this.url.port, this.url.hostname);
// this change makes it a bit faster on Linux, slightly worse on OS X
this.stream.setNoDelay(true);
// Setup the proper handlers.
this.setupHandlers();
};

/**
Expand Down Expand Up @@ -905,7 +946,7 @@ Client.prototype.cleanupTimers = function() {
* @api private
*/
Client.prototype.closeStream = function() {
if (this.stream !== null) {
if (this.stream) {
this.stream.destroy();
this.stream = null;
}
Expand Down
132 changes: 131 additions & 1 deletion test/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ describe('Token Authorization', function() {

// Shutdown our server after we are done
after(function(done) {
server = nsc.stop_server(server, done);
nsc.stop_server(server, done);
});

it('should fail to connect with no credentials ', function(done) {
Expand Down Expand Up @@ -146,3 +146,133 @@ describe('Token Authorization', function() {
});
});
});

describe('tokenHandler Authorization', function() {
const PORT = 1421;
const flags = ['--auth', 'token1'];
const noAuthUrl = 'nats://localhost:' + PORT;
let server;

// Start up our own nats-server
before(function(done) {
server = nsc.start_server(PORT, flags, done);
});

// Shutdown our server after we are done
after(function(done) {
nsc.stop_server(server, done);
});

it('should connect using tokenHandler instead of plain old token', function(done) {
const nc = NATS.connect({
'url': noAuthUrl,
'tokenHandler': () => new Promise((resolve) => {
setTimeout(() => {
resolve('token1');
}, 500);
})
});
nc.on('connect', (nc) => {
setTimeout(() => {
nc.close();
done();
}, 100);
});
});

it('should fail to connect if tokenHandler is not a function', function(done) {
(function() {
const nc = NATS.connect({
'url': noAuthUrl,
'tokenHandler': 'token1'
});
}).should.throw(/tokenHandler must be a function returning a Promise/);
done();
});

it('should fail to connect if both token and tokenHandler are provided', function(done) {
(function() {
const nc = NATS.connect({
'url': noAuthUrl,
'token': 'token1',
'tokenHandler': () => new Promise((resolve) => {
setTimeout(() => {
resolve('token1');
}, 500);
})
});
}).should.throw(/token and tokenHandler cannot both be provided/);
done();
});

it('should NOT connect if tokenHandler fails to return a token', function(done) {
this.timeout(10 * 1000);

const nc = NATS.connect({
'url': noAuthUrl,
'tokenHandler': () => new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error('no token for you!'));
}, 50);
})
});

let totalErrorCount = 0;
let tokenHandlerErrorCount = 0;
nc.on('error', (err) => {
totalErrorCount++;
if ((/^NatsError: tokenHandler call failed: .+$/).test(err.toString())) {
tokenHandlerErrorCount++;
}
});
nc.on('close', () => {
tokenHandlerErrorCount.should.be.greaterThan(0);
totalErrorCount.should.equal(tokenHandlerErrorCount);
done();
});
});

it('tokenHandler errors can be recovered from', function(done) {
this.timeout(10 * 1000);

let tokenHandlerCallCount = 0;
const nc = NATS.connect({
'url': noAuthUrl,
'tokenHandler': () => new Promise((resolve, reject) => {
tokenHandlerCallCount++;
setTimeout(() => {
if (tokenHandlerCallCount == 2) {
reject(new Error('no token for you!'));
} else {
resolve('token1');
}
}, 50);
})
});

let totalErrorCount = 0;
let tokenHandlerErrorCount = 0;
nc.on('error', (err) => {
totalErrorCount++;
if ((/^NatsError: tokenHandler call failed: .+$/).test(err.toString())) {
tokenHandlerErrorCount++;
}
});
nc.on('connect', () => {
setTimeout(() => {
nsc.stop_server(server, () => {
server = nsc.start_server(PORT, flags);
});
}, 100);
});
nc.on('reconnect', (nc) => {
setTimeout(() => {
nc.close();
tokenHandlerCallCount.should.equal(3);
tokenHandlerErrorCount.should.equal(1);
totalErrorCount.should.equal(tokenHandlerErrorCount);
done();
}, 100);
});
});
});

0 comments on commit c21fc5b

Please sign in to comment.