Skip to content
This repository has been archived by the owner on Jan 5, 2019. It is now read-only.

Commit

Permalink
Add linting
Browse files Browse the repository at this point in the history
  • Loading branch information
imbstack committed Oct 19, 2017
1 parent f3ed68b commit 80915a8
Show file tree
Hide file tree
Showing 17 changed files with 1,367 additions and 561 deletions.
3 changes: 0 additions & 3 deletions .babelrc

This file was deleted.

3 changes: 3 additions & 0 deletions .eslintrc
@@ -0,0 +1,3 @@
{
"extends": "eslint-config-taskcluster"
}
6 changes: 5 additions & 1 deletion package.json
Expand Up @@ -8,7 +8,7 @@
"compile": "babel-compile -p taskcluster src:lib test:.test",
"prepublish": "npm run compile",
"pretest": "npm run compile",
"test": "mocha ./.test/*_test.js"
"test": "mocha .test/lint.js ./.test/*_test.js"
},
"repository": {
"type": "git",
Expand All @@ -17,10 +17,14 @@
"dependencies": {
"amqplib": "^0.5.1",
"babel-compile": "^2.0.0",
"babel-eslint": "^8",
"babel-preset-taskcluster": "^3.0.0",
"debug": "^3.1.0",
"eslint-config-taskcluster": "^2.0.0",
"eslint-plugin-taskcluster": "^1.0.2",
"hawk": "^6.0.2",
"lodash": "^4.17.4",
"mocha-eslint": "^3.0.1",
"promise": "^8.0.1",
"pulse-publisher": "^2.0.0",
"slugid": "^1.1.0",
Expand Down
315 changes: 156 additions & 159 deletions src/client.js

Large diffs are not rendered by default.

23 changes: 10 additions & 13 deletions src/parsetime.js
@@ -1,5 +1,3 @@
"use strict";

// Regular expression matching:
// A years B months C days D hours E minutes F seconds
var timeExp = new RegExp([
Expand All @@ -11,28 +9,27 @@ var timeExp = new RegExp([
'(\\s*(\\d+)\\s*h((ours?)|r)?)?',
'(\\s*(\\d+)\\s*m(in(utes?)?)?)?',
'(\\s*(\\d+)\\s*s(ec(onds?)?)?)?',
'\\s*$'
'\\s*$',
].join(''), 'i');


/** Parse time string */
var parseTime = function(str) {
// Parse the string
var match = timeExp.exec(str || '');
if (!match) {
throw new Error("String: '" + str + "' isn't a time expression");
throw new Error('String: \'' + str + '\' isn\'t a time expression');
}
// Negate if needed
var neg = (match[2] === '-' ? - 1 : 1);
var neg = match[2] === '-' ? - 1 : 1;
// Return parsed values
return {
years: parseInt(match[4] || 0) * neg,
months: parseInt(match[8] || 0) * neg,
weeks: parseInt(match[11] || 0) * neg,
days: parseInt(match[15] || 0) * neg,
hours: parseInt(match[18] || 0) * neg,
minutes: parseInt(match[22] || 0) * neg,
seconds: parseInt(match[25] || 0) * neg
years: parseInt(match[4] || 0, 10) * neg,
months: parseInt(match[8] || 0, 10) * neg,
weeks: parseInt(match[11] || 0, 10) * neg,
days: parseInt(match[15] || 0, 10) * neg,
hours: parseInt(match[18] || 0, 10) * neg,
minutes: parseInt(match[22] || 0, 10) * neg,
seconds: parseInt(match[25] || 0, 10) * neg,
};
};

Expand Down
100 changes: 49 additions & 51 deletions src/pulselistener.js
Expand Up @@ -17,8 +17,8 @@ var URL = require('url');
* }
*/
var buildPulseConnectionString = function(options) {
assert(options.username, "options.username password is required");
assert(options.password, "options.password is required");
assert(options.username, 'options.username password is required');
assert(options.password, 'options.password is required');

// Construct connection string
return [
Expand All @@ -29,15 +29,15 @@ var buildPulseConnectionString = function(options) {
'@',
options.hostname || 'pulse.mozilla.org',
':',
5671 // Port for SSL
5671, // Port for SSL
].join('');
};

/** Connect to AMQP server while retrying connection establishment */
var retryConnect = function(connectionString, retries) {
return amqplib.connect(connectionString, {
noDelay: true,
timeout: 30 * 1000
timeout: 30 * 1000,
}).catch(function(err) {
if (retries > 0) {
return retryConnect(connectionString, retries - 1);
Expand All @@ -61,9 +61,9 @@ var retryConnect = function(connectionString, retries) {
* }
*/
var PulseConnection = function(options) {
assert(typeof(options) === 'object', "options is required");
assert(typeof options === 'object', 'options is required');
options = _.defaults({}, options, {
namespace: options.username || ''
namespace: options.username || '',
});

// a fake connection does notihng but signal its fake-ness to listeners
Expand All @@ -75,9 +75,9 @@ var PulseConnection = function(options) {
if (!options.connectionString) {
options.connectionString = buildPulseConnectionString(options);
} else {
assert(!options.username, "Can't take `username` along with `connectionString`");
assert(!options.password, "Can't take `password` along with `connectionString`");
assert(!options.hostname, "Can't take `hostname` along with `connectionString`");
assert(!options.username, 'Can\'t take `username` along with `connectionString`');
assert(!options.password, 'Can\'t take `password` along with `connectionString`');
assert(!options.hostname, 'Can\'t take `hostname` along with `connectionString`');
}

// If namespace was not explicitly set infer it from connection string...
Expand Down Expand Up @@ -125,16 +125,16 @@ PulseConnection.prototype.connect = function() {

// Setup error handling
conn.on('error', function(err) {
debug("Connection error in Connection: %s", err, err.stack);
debug('Connection error in Connection: %s', err, err.stack);
that.emit('error', err);
});
conn.on('close', function() {
if (!that._conn) {
return; // Forget this, if close() was called
}
debug("Connection closed unexpectedly");
debug('Connection closed unexpectedly');
that.emit('error', new Error(
"Connection closed unexpectedly, likely server initiated shutdown"
'Connection closed unexpectedly, likely server initiated shutdown'
));
});

Expand Down Expand Up @@ -162,7 +162,6 @@ PulseConnection.prototype.close = function() {
return Promise.resolve(undefined);
};


// Export PulseConnection
exports.PulseConnection = PulseConnection;

Expand Down Expand Up @@ -193,18 +192,18 @@ exports.PulseConnection = PulseConnection;
*/
var PulseListener = function(options) {
var that = this;
assert(options, "options are required");
assert(options, 'options are required');
assert(options.connection ||
options.credentials, "options.connection or credentials is required");
options.credentials, 'options.connection or credentials is required');
this._bindings = [];
this._options = _.defaults(options, {
prefetch: 5,
queueName: undefined,
maxLength: undefined
maxLength: undefined,
});

this._fake = (options.credentials && options.credentials.fake) ||
(options.connection && options.connection.fake);
this._fake = options.credentials && options.credentials.fake ||
options.connection && options.connection.fake;

// Ensure that we have connection object
this._connection = options.connection || null;
Expand Down Expand Up @@ -239,13 +238,13 @@ util.inherits(PulseListener, events.EventEmitter);
* instance of `Client`, see `createClient`.
*/
PulseListener.prototype.bind = function(binding) {
assert(typeof(binding.exchange) === 'string',
"Can't bind to unspecified exchange!");
assert(typeof(binding.routingKeyPattern) === 'string',
"routingKeyPattern is required!");
assert(typeof binding.exchange === 'string',
'Can\'t bind to unspecified exchange!');
assert(typeof binding.routingKeyPattern === 'string',
'routingKeyPattern is required!');
this._bindings.push(binding);
if(!this._fake && this._channel) {
debug("Binding %s to %s with pattern '%s'",
if (!this._fake && this._channel) {
debug('Binding %s to %s with pattern \'%s\'',
this._queueName || 'exclusive queue',
binding.exchange, binding.routingKeyPattern);
return this._channel.bindQueue(
Expand All @@ -262,7 +261,7 @@ PulseListener.prototype.bind = function(binding) {
PulseListener.prototype.connect = function() {
var that = this;

assert(!this._fake, "Fake listeners can't connect");
assert(!this._fake, 'Fake listeners can\'t connect');

// Return channel if we have one
if (this._channel) {
Expand All @@ -280,16 +279,16 @@ PulseListener.prototype.connect = function() {
// Prevent invalidation of the connection, by someone calling .close()
// this way channel.close() won't be called when .close() is called.
that._channel = null;
debug("Channel error in PulseListener: ", err.stack);
debug('Channel error in PulseListener: ', err.stack);
that.emit('error', err);
});
channel.on('close', function() {
if (!that._channel) {
return; // Ignore if close() was called
}
debug("Channel was closed unexpectedly");
debug('Channel was closed unexpectedly');
that.emit('error', new Error(
"Channel closed unexpectedly, likely server initiated shutdown"
'Channel closed unexpectedly, likely server initiated shutdown'
));
});
return channel.prefetch(that._options.prefetch);
Expand All @@ -301,8 +300,8 @@ PulseListener.prototype.connect = function() {
this._queueName = [
'queue', // Required by pulse security model
this._connection.namespace, // Required by pulse security model
this._options.queueName || 'exclusive/' + slugid.v4()
].join('/')
this._options.queueName || 'exclusive/' + slugid.v4(),
].join('/');

// Create queue
var queueCreated = channelCreated.then(function() {
Expand All @@ -322,7 +321,7 @@ PulseListener.prototype.connect = function() {
var bindingsCreated = queueCreated.then(function() {
that._channel = channel;
return Promise.all(that._bindings.map(function(binding) {
debug("Binding %s to %s with pattern %s",
debug('Binding %s to %s with pattern %s',
that._queueName || 'exclusive queue',
binding.exchange, binding.routingKeyPattern);
return channel.bindQueue(
Expand All @@ -342,16 +341,16 @@ PulseListener.prototype.connect = function() {
/** Pause consumption of messages */
PulseListener.prototype.pause = function() {
if (this._fake) {
assert(this._fakeListening, "cannot pause when not listening");
assert(this._fakeListening, 'cannot pause when not listening');
this._fakeListening = false;
return Promise.resolve();
}

if (!this._channel) {
debug("WARNING: Paused PulseListener instance was wasn't connected yet");
debug('WARNING: Paused PulseListener instance was wasn\'t connected yet');
return Promise.resolve();
}
assert(this._channel, "Can't pause when not connected");
assert(this._channel, 'Can\'t pause when not connected');
return this._channel.cancel(this._consumerTag);
};

Expand All @@ -360,7 +359,7 @@ PulseListener.prototype.resume = function() {
var that = this;

if (this._fake) {
assert(!this._fakeListening, "cannot resume when already listening");
assert(!this._fakeListening, 'cannot resume when already listening');
this._fakeListening = true;
return Promise.resolve();
}
Expand All @@ -385,10 +384,10 @@ PulseListener.prototype.resume = function() {
* }
*/
PulseListener.prototype.fakeMessage = function(message) {
assert(this._fake, "fakeMessage can only be called on a fake PulseListener");
assert(this._fakeListening, "fakeMessage must be called on a resume'd listener");
assert(this._fake, 'fakeMessage can only be called on a fake PulseListener');
assert(this._fakeListening, 'fakeMessage must be called on a resume\'d listener');
var msg = {
content: new Buffer(JSON.stringify(message.payload), "utf-8"),
content: new Buffer(JSON.stringify(message.payload), 'utf-8'),
fields: {
exchange: message.exchange,
routingKey: message.routingKey,
Expand All @@ -401,7 +400,7 @@ PulseListener.prototype.fakeMessage = function(message) {
},
};
return this._handle(msg);
}
};

/** Handle message*/
PulseListener.prototype._handle = function(msg) {
Expand All @@ -412,7 +411,7 @@ PulseListener.prototype._handle = function(msg) {
exchange: msg.fields.exchange,
routingKey: msg.fields.routingKey,
redelivered: msg.fields.redelivered,
routes: []
routes: [],
};

// Find CC'ed routes
Expand All @@ -430,7 +429,7 @@ PulseListener.prototype._handle = function(msg) {
// Find routing key reference, if any is available to us
var routingKeyReference = null;
this._bindings.forEach(function(binding) {
if(binding.exchange === message.exchange && binding.routingKeyReference) {
if (binding.exchange === message.exchange && binding.routingKeyReference) {
routingKeyReference = binding.routingKeyReference;
}
});
Expand All @@ -441,7 +440,7 @@ PulseListener.prototype._handle = function(msg) {
var routing = {};
var keys = message.routingKey.split('.');
// first handle non-multi keys from the beginning
for(var i = 0; i < routingKeyReference.length; i++) {
for (var i = 0; i < routingKeyReference.length; i++) {
var ref = routingKeyReference[i];
if (ref.multipleWords) {
break;
Expand All @@ -451,29 +450,28 @@ PulseListener.prototype._handle = function(msg) {
// If we reached a multi key
if (i < routingKeyReference.length) {
// then handle non-multi keys from the end
for(var j = routingKeyReference.length - 1; j > i; j--) {
for (var j = routingKeyReference.length - 1; j > i; j--) {
var ref = routingKeyReference[j];
if (ref.multipleWords) {
break;
}
routing[ref.name] = keys.pop();
}
// Check that we only have one multiWord routing key
assert(i == j, "i != j really shouldn't be the case");
assert(i == j, 'i != j really shouldn\'t be the case');
routing[routingKeyReference[i].name] = keys.join('.');
}

// Provide parsed routing key
message.routing = routing;
}
catch(err) {
} catch (err) {
// Ideally we should rethrow the exception. But since it's not quite
// possible to promise that `routing` (the parsed routing key) is
// available... As you can subscribe without providing a routing
// key reference.
// In short people can assume this is present in most cases, and if they
// assume this we get the error at a level where they can handle it.
debug("Failed to parse routingKey: %s for %s with err: %s, as JSON: %j",
debug('Failed to parse routingKey: %s for %s with err: %s, as JSON: %j',
message.routingKey, message.exchange, err, err, err.stack);
}
}
Expand All @@ -487,24 +485,24 @@ PulseListener.prototype._handle = function(msg) {
if (!that._fake) {
return that._channel.ack(msg);
} else {
debug("Processed fake message %j from %s", message, message.exchange);
debug('Processed fake message %j from %s', message, message.exchange);
}
}).then(null, function(err) {
debug("Failed to process message %j from %s with error: %s, as JSON: %j",
debug('Failed to process message %j from %s with error: %s, as JSON: %j',
message, message.exchange, err, err, err.stack);
if (that._fake) {
return;
}
if (message.redelivered) {
debug("Nack (without requeueing) message %j from %s",
debug('Nack (without requeueing) message %j from %s',
message, message.exchange);
return that._channel.nack(msg, false, false);
} else {
// Nack and requeue
return that._channel.nack(msg, false, true);
}
}).then(null, function(err) {
debug("CRITICAL: Failed to nack message");
debug('CRITICAL: Failed to nack message');
that.emit('error', err);
});
};
Expand Down

0 comments on commit 80915a8

Please sign in to comment.