Skip to content

Commit

Permalink
fix(sender-link): throw an error if there is no active connection
Browse files Browse the repository at this point in the history
If a link is not set to reattach and the underlaying connection is
lost, the sender link should throw errors on subsequent attempts to
send.
  • Loading branch information
mbroadst committed Oct 25, 2016
1 parent 3d7e470 commit e5bac49
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 5 deletions.
7 changes: 4 additions & 3 deletions lib/amqp_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ AMQPClient.prototype._preventReconnect = function() {

AMQPClient.prototype._shouldReconnect = function() {
if (!this._connection || !this._reconnect) return false;
if (!this._timeouts.length && !this.policy.reconnect.forever) return false;
if (!this._timeouts ||
!this._timeouts.length && !this.policy.reconnect.forever) return false;
return true;
};

Expand All @@ -407,8 +408,8 @@ AMQPClient.prototype._attemptReconnection = function() {
// promise back to sender. As such, it will signal two uncaught
// exceptions, and then dump and quit the program. Need to find a
// better way to tie the reconnect promise to the original request.
if (!self._shouldReconnect())
throw err; // rethrow
// if (!self._shouldReconnect())
// throw err; // rethrow
});
};

Expand Down
12 changes: 12 additions & 0 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,15 @@ errors.TransportError = function(msg) {
this.name = 'AmqpTransportError';
};
util.inherits(errors.TransportError, errors.BaseError);

/**
* Used to indicate that a link requires an active connection
*
* @extends BaseError
* @constructor
*/
errors.NotConnectedError = function(msg) {
errors.BaseError.call(this, msg);
this.name = 'AmqpNotConnectedError';
};
util.inherits(errors.NotConnectedError, errors.BaseError);
7 changes: 5 additions & 2 deletions lib/sender_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ SenderLink.prototype.canSend = function() {
* @return {Promise}
*/
SenderLink.prototype.send = function(msg, options) {
if (!this.session.connection) {
throw new Error('Must connect before sending');
if (!this.session.connection || this.session.connection.connected === false) {
var state = this.state();
if (state !== 'attached' && state !== 'attaching' && state !== 'reattaching') {
throw new errors.NotConnectedError('Link requires active connection');
}
}

options = options || {};
Expand Down
41 changes: 41 additions & 0 deletions test/unit/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var _ = require('lodash'),
AMQPClient = require('../../lib').Client,
MockServer = require('./mocks').Server,

errors = require('../../lib/errors'),
constants = require('../../lib/constants'),
frames = require('../../lib/frames'),

Expand Down Expand Up @@ -462,6 +463,46 @@ describe('Client', function() {
.catch(function(err) { done(); });
});
});

it('should throw an error if attempting to send with no connection', function(done) {
test.server.setResponseSequence([
constants.amqpVersion,
new frames.OpenFrame(test.client.policy.connect.options),
new frames.BeginFrame({
remoteChannel: 1, nextOutgoingId: 0,
incomingWindow: 2147483647, outgoingWindow: 2147483647,
handleMax: 4294967295
}),
[
function (prev) {
var rxAttach = frames.readFrame(prev[prev.length - 1]);
return new frames.AttachFrame({
name: rxAttach.name, handle: 1,
role: constants.linkRole.receiver,
source: {}, target: {},
initialDeliveryCount: 0
});
},

{ delay: 100 },

// force detach from remote server, and force close of the connection
new frames.DetachFrame({ handle: 1, closed: true, error: 'internal-error' }),
new frames.CloseFrame({
error: new AMQPError({ condition: ErrorCondition.ConnectionForced, description: 'test' })
})
]
]);

test.client.connect(test.server.address())
.then(function() { return test.client.createSender('testing'); })
.then(function(sender) {
setTimeout(function() {
expect(function() { sender.send('testing'); }).to.throw(errors.NotConnectedError);
done();
}, 200);
});
});
});

describe('#reconnect()', function() {
Expand Down

0 comments on commit e5bac49

Please sign in to comment.