Skip to content

Commit

Permalink
Merge pull request #173 from noodlefrenzy/address-parsing
Browse files Browse the repository at this point in the history
refactor address parsing
  • Loading branch information
mbroadst committed Oct 12, 2015
2 parents d771ca4 + 119bf49 commit 1369fbc
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 170 deletions.
7 changes: 6 additions & 1 deletion lib/amqp_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,15 @@ AMQPClient.prototype.connect = function(url) {

debug('connecting to: ' + url);
self._reconnect = self.connect.bind(self, url);
var address = u.parseAddress(url);
var address = self.policy.parseAddress(url);
self._defaultQueue = address.path.substr(1);
self.policy.connect.options.hostname = address.host;
var sasl = address.user ? new Sasl() : null;
if (!!sasl && !!address.vhost) {
sasl._remoteHostname = address.vhost;
self.policy.connect.options.hostname = address.vhost;
}

self._connection = self._newConnection();
self._connection.on(Connection.Connected, function(c) {
debug('connected');
Expand Down
2 changes: 1 addition & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ Connection.prototype._connect = function(address, sasl) {
self.client = tls.connect(sslOptions);
debug('Connecting to ' + self.address.host + ':' + self.address.port + ' via TLS');
} else {
self.client = net.connect({port: self.address.port, host: self.address.host});
self.client = net.connect({ port: self.address.port, host: self.address.host });
debug('Connecting to ' + self.address.host + ':' + self.address.port + ' via straight-up sockets');
}
self.client.on(isSSL ? 'secureConnect' : 'connect', function() {
Expand Down
2 changes: 1 addition & 1 deletion lib/frames/sasl_frame.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ function SaslInit(options) {
this.hostname = up.get(options, 2);
} else {
u.assertArguments(options, ['mechanism']);
this.mechanism = options.mechanism;
this.mechanism = u.coerce(options.mechanism, AMQPSymbol);
this.initialResponse = options.initialResponse;
this.hostname = options.hostname;
}
Expand Down
20 changes: 9 additions & 11 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
'use strict';
var DefaultPolicy = require('./policies/default_policy'),
EventHubPolicy = require('./policies/event_hub_policy'),
ServiceBusQueuePolicy = require('./policies/service_bus_queue_policy'),
ServiceBusTopicPolicy = require('./policies/service_bus_topic_policy'),
translator = require('./adapters/translate_encoder'),

pu = require('./policies/policy_utilities'),
u = require('./utilities');

Expand All @@ -14,17 +9,20 @@ module.exports = {
Errors: require('./errors'),
Policy: {
Default: DefaultPolicy,
EventHub: EventHubPolicy,
ServiceBusQueue: ServiceBusQueuePolicy,
ServiceBusTopic: ServiceBusTopicPolicy,
EventHub: require('./policies/event_hub_policy'),
ServiceBusQueue: require('./policies/service_bus_queue_policy'),
ServiceBusTopic: require('./policies/service_bus_topic_policy'),
QpidJava: require('./policies/qpid_java_policy'),
Utils: pu,
merge: function(newPolicy, base) {
return u.deepMerge(newPolicy, base || DefaultPolicy);
}
},

/**
* translator, which allows you to translate from node-amqp-encoder'd values into the
* internal types used in this library. (e.g. [ 'symbol', 'symval' ] => Symbol('symval') ).
* translator, which allows you to translate from node-amqp-encoder'd
* values into the internal types used in this library. (e.g.
* [ 'symbol', 'symval' ] => Symbol('symval') ).
*/
translator: translator
translator: require('./adapters/translate_encoder')
};
43 changes: 42 additions & 1 deletion lib/policies/default_policy.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var constants = require('../constants'),
var url = require('url'),
constants = require('../constants'),
putils = require('./policy_utilities');

function containerName() {
Expand Down Expand Up @@ -38,6 +39,7 @@ module.exports = {
strategy: 'fibonacci', // || 'exponential'
forever: true
},

connect: {
options: {
containerId: containerName(),
Expand All @@ -58,6 +60,7 @@ module.exports = {
}
}
},

session: {
options: {
nextOutgoingId: constants.session.defaultOutgoingId,
Expand All @@ -68,6 +71,7 @@ module.exports = {
windowQuantum: constants.session.defaultIncomingWindow,
enableSessionFlowControl: true
},

senderLink: {
attach: {
name: linkName('sender'),
Expand All @@ -80,6 +84,7 @@ module.exports = {
encoder: function(body) { return body; },
reattach: null
},

receiverLink: {
attach: {
name: linkName('receiver'),
Expand All @@ -92,5 +97,41 @@ module.exports = {
creditQuantum: 100,
decoder: function(body) { return body; },
reattach: null
},

/**
* Parses an address for use when connecting to an AMQP 1.0 broker
*/
parseAddress: function(address) {
var parsedAddress = url.parse(address);
var result = {
host: parsedAddress.hostname,
path: parsedAddress.path || '/',
protocol: parsedAddress.protocol.slice(0, -1).toLowerCase() || 'amqp',
};

if (result.protocol !== 'amqp' && result.protocol !== 'amqps')
throw new Error('invalid protocol: ' + result.protocol);

if (!!parsedAddress.port) {
result.port = parseInt(parsedAddress.port);
} else {
switch (result.protocol.toLowerCase()) {
case 'amqp': result.port = constants.defaultPort; break;
case 'amqps': result.port = constants.defaultTlsPort; break;
}
}

result.rootUri = parsedAddress.protocol + '//';
if (!!parsedAddress.auth) {
result.rootUri += parsedAddress.auth + '@';

var userPass = parsedAddress.auth.split(':', 2);
result.user = userPass[0];
result.pass = userPass[1] || null;
}

result.rootUri += result.host + ':' + result.port;
return result;
}
};
20 changes: 20 additions & 0 deletions lib/policies/qpid_java_policy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
'use strict';

var u = require('../utilities'),
DefaultPolicy = require('./default_policy');

module.exports = u.deepMerge({
defaultSubjects: false,
parseAddress: function(address) {
var result = DefaultPolicy.parseAddress(address);

// accept vhosts
if (result.path !== '/') {
var pathParts = result.path.substr(1).split('/');
result.vhost = pathParts[0];
result.path = pathParts.slice(1).join('/');
}

return result;
}
}, DefaultPolicy);
4 changes: 3 additions & 1 deletion lib/sasl.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Sasl.prototype.headerReceived = function(header) {
debug('Server SASL Version: ' + header.toString('hex') + ' vs ' + constants.saslVersion.toString('hex'));
if (u.bufferEquals(header, constants.saslVersion)) {
this.receivedHeader = true;
// Wait for mechanisms
// Wait for mechanisms
} else {
this.callback(new errors.MalformedHeaderError('Invalid SASL Header ' + header.toString('hex')));
}
Expand All @@ -59,6 +59,8 @@ Sasl.prototype._processFrame = function(frame) {
mechanism: new AMQPSymbol('PLAIN'),
initialResponse: buf.get()
});

if (!!this._remoteHostname) initFrame.hostname = this._remoteHostname;
this.connection.sendFrame(initFrame);
} else {
throw new errors.NotImplementedError('Only supports SASL-PLAIN at the moment.');
Expand Down
62 changes: 1 addition & 61 deletions lib/utilities.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@

var _ = require('lodash'),
Int64 = require('node-int64'),

constants = require('./constants'),
errors = require('./errors'),

uuid = require('uuid'),
errors = require('./errors'),
utilities = module.exports = {};


/**
* Encodes given value into node-amqp-encoder format.
*
Expand Down Expand Up @@ -92,62 +88,6 @@ function bufferEquals(lhs, rhs, offset1, offset2, size) {

utilities.bufferEquals = bufferEquals;

// Constants
var addressRegex = new RegExp('^(amqps?)://([^:/]+)(?::([0-9]+))?(/.*)?$');
var addressWithCredentialsRegex = new RegExp('^(amqps?)://([^:]+):([^@]+)@([^:/]+)(?::([0-9]+))?(/.*)?$');

function getPort(port, protocol) {
if (port) {
var asFloat = parseFloat(port);
if (!isNaN(asFloat) && isFinite(port) && (port % 1 === 0)) {
return asFloat;
} else {
throw new Error('Invalid port: ' + port);
}
} else {
switch (protocol) {
case 'amqp':
return constants.defaultPort;
case 'amqps':
return constants.defaultTlsPort;
default:
throw new Error('Unknown Protocol ' + protocol);
}
}
}

function parseAddress(address) {
var results = addressWithCredentialsRegex.exec(address);
if (results) {
results = {
protocol: results[1],
user: decodeURIComponent(results[2]),
pass: decodeURIComponent(results[3]),
host: results[4],
port: getPort(results[5], results[1]),
path: results[6] || '/'
};
results.rootUri = results.protocol + '://' + results.user + ':' + results.pass + '@' + results.host + ':' + results.port;
} else {
results = addressRegex.exec(address);
if (results) {
results = {
protocol: results[1],
host: results[2],
port: getPort(results[3], results[1]),
path: results[4] || '/'
};
results.rootUri = results.protocol + '://' + results.host + ':' + results.port;
}
}

if (results) return results;

throw new Error('Failed to parse ' + address);
}

utilities.parseAddress = parseAddress;

// @todo: this "parsing" should be far more rigorous
utilities.parseLinkAddress = function(address, policy) {
if (policy && !policy.defaultSubjects) {
Expand Down
103 changes: 103 additions & 0 deletions test/unit/address.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"use strict";
var DefaultPolicy = require('../../lib/policies/default_policy'),
QpidJavaPolicy = require('../../lib/policies/qpid_java_policy'),
expect = require('chai').expect;

describe('Address', function() {
describe('default', function() {
[
{
description: 'amqp no port no route',
address: 'amqp://127.0.0.1',
expected: {
protocol: 'amqp', host: '127.0.0.1', port: 5672, path: '/',
rootUri: 'amqp://127.0.0.1:5672'
}
},
{
description: 'amqps no port no route',
address: 'amqps://localhost',
expected: {
protocol: 'amqps', host: 'localhost', port: 5671, path: '/',
rootUri: 'amqps://localhost:5671'
}
},
{
description: 'should match with port and with/without route (1)',
address: 'amqp://localhost:1234',
expected: {
protocol: 'amqp', host: 'localhost', port: 1234, path: '/',
rootUri: 'amqp://localhost:1234'
}
},
{
description: 'should match with port and with/without route (2)',
address: 'amqps://mq.myhost.com:1235/myroute?with=arguments&multiple=arguments',
expected: {
protocol: 'amqps', host: 'mq.myhost.com', port: 1235,
path: '/myroute?with=arguments&multiple=arguments',
rootUri: 'amqps://mq.myhost.com:1235'
}
},
{
description: 'should match ip + port',
address: 'amqp://10.42.1.193:8118/testqueue',
expected: {
protocol: 'amqp', host: '10.42.1.193', port: 8118, path: '/testqueue',
rootUri: 'amqp://10.42.1.193:8118'
}
},
{
description: 'should match credentials no port no route',
address: 'amqp://username:password@my.amqp.server',
expected: {
protocol: 'amqp', host: 'my.amqp.server', port: 5672, path: '/',
user: 'username', pass: 'password',
rootUri: 'amqp://username:password@my.amqp.server:5672'
}
},
{
description: 'should match credentials with port and route',
address: 'amqps://username:password@192.168.1.1:1234/myroute',
expected: {
protocol: 'amqps', host: '192.168.1.1', port: 1234, path: '/myroute',
user: 'username', pass: 'password',
rootUri: 'amqps://username:password@192.168.1.1:1234'
}
}
].forEach(function(testCase) {
it('should match ' + testCase.description, function() {
expect(DefaultPolicy.parseAddress(testCase.address))
.to.eql(testCase.expected);
});
});

[
{ address: 'invalid://localhost', error: 'Should validate protocol' }

].forEach(function(testCase, idx) {
it('should throw error on invalid address (' + (idx+1) + ')', function() {
expect(function() {
DefaultPolicy.parseAddress(testCase.address);
}).to.throw(Error, null, testCase.error);
});
});
});

describe('Qpid Java', function() {
it('should parse vhosts', function() {
var address = 'amqps://username:password@192.168.1.1:1234/some-vhost/topic/and/more';
expect(QpidJavaPolicy.parseAddress(address)).to.eql({
host: "192.168.1.1",
pass: "password",
path: "topic/and/more",
port: 1234,
protocol: "amqps",
rootUri: "amqps://username:password@192.168.1.1:1234",
user: "username",
vhost: "some-vhost"
});
});
});

});
Loading

0 comments on commit 1369fbc

Please sign in to comment.