diff --git a/lib/amqp_client.js b/lib/amqp_client.js index 4d71271..e629078 100644 --- a/lib/amqp_client.js +++ b/lib/amqp_client.js @@ -391,7 +391,8 @@ AMQPClient.prototype._preventReconnect = function() { AMQPClient.prototype._shouldReconnect = function() { if (!this._connection || !this._reconnect) return false; - if (!this._timeouts.length && !this.policy.reconnect.forever) return false; + if (!this._timeouts || + !this._timeouts.length && !this.policy.reconnect.forever) return false; return true; }; @@ -407,8 +408,8 @@ AMQPClient.prototype._attemptReconnection = function() { // promise back to sender. As such, it will signal two uncaught // exceptions, and then dump and quit the program. Need to find a // better way to tie the reconnect promise to the original request. - if (!self._shouldReconnect()) - throw err; // rethrow + // if (!self._shouldReconnect()) + // throw err; // rethrow }); }; diff --git a/lib/errors.js b/lib/errors.js index 21e842c..cc1820d 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -201,3 +201,15 @@ errors.TransportError = function(msg) { this.name = 'AmqpTransportError'; }; util.inherits(errors.TransportError, errors.BaseError); + +/** + * Used to indicate that a link requires an active connection + * + * @extends BaseError + * @constructor + */ +errors.NotConnectedError = function(msg) { + errors.BaseError.call(this, msg); + this.name = 'AmqpNotConnectedError'; +}; +util.inherits(errors.NotConnectedError, errors.BaseError); diff --git a/lib/sender_link.js b/lib/sender_link.js index 7714400..10dcfdb 100644 --- a/lib/sender_link.js +++ b/lib/sender_link.js @@ -58,8 +58,11 @@ SenderLink.prototype.canSend = function() { * @return {Promise} */ SenderLink.prototype.send = function(msg, options) { - if (!this.session.connection) { - throw new Error('Must connect before sending'); + if (!this.session.connection || this.session.connection.connected === false) { + var state = this.state(); + if (state !== 'attached' && state !== 'attaching' && state !== 'reattaching') { + throw new errors.NotConnectedError('Link requires active connection'); + } } options = options || {}; diff --git a/test/unit/client.test.js b/test/unit/client.test.js index 73a7bbd..b91b256 100644 --- a/test/unit/client.test.js +++ b/test/unit/client.test.js @@ -7,6 +7,7 @@ var _ = require('lodash'), AMQPClient = require('../../lib').Client, MockServer = require('./mocks').Server, + errors = require('../../lib/errors'), constants = require('../../lib/constants'), frames = require('../../lib/frames'), @@ -462,6 +463,46 @@ describe('Client', function() { .catch(function(err) { done(); }); }); }); + + it('should throw an error if attempting to send with no connection', function(done) { + test.server.setResponseSequence([ + constants.amqpVersion, + new frames.OpenFrame(test.client.policy.connect.options), + new frames.BeginFrame({ + remoteChannel: 1, nextOutgoingId: 0, + incomingWindow: 2147483647, outgoingWindow: 2147483647, + handleMax: 4294967295 + }), + [ + function (prev) { + var rxAttach = frames.readFrame(prev[prev.length - 1]); + return new frames.AttachFrame({ + name: rxAttach.name, handle: 1, + role: constants.linkRole.receiver, + source: {}, target: {}, + initialDeliveryCount: 0 + }); + }, + + { delay: 100 }, + + // force detach from remote server, and force close of the connection + new frames.DetachFrame({ handle: 1, closed: true, error: 'internal-error' }), + new frames.CloseFrame({ + error: new AMQPError({ condition: ErrorCondition.ConnectionForced, description: 'test' }) + }) + ] + ]); + + test.client.connect(test.server.address()) + .then(function() { return test.client.createSender('testing'); }) + .then(function(sender) { + setTimeout(function() { + expect(function() { sender.send('testing'); }).to.throw(errors.NotConnectedError); + done(); + }, 200); + }); + }); }); describe('#reconnect()', function() {