Skip to content

Commit

Permalink
Add method and class ids to channel errors
Browse files Browse the repository at this point in the history
  • Loading branch information
MitMaro committed Aug 7, 2018
1 parent 120c5f9 commit 8076fa3
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
4 changes: 4 additions & 0 deletions bin/generate-defs.js
Expand Up @@ -82,6 +82,8 @@ for (var i = 0, len = defs.classes.length; i < len; i++) {
methods[name] = {
id: methodId(clazz, method),
name: name,
methodId: method.id,
clazzId: clazz.id,
clazz: clazz.name,
args: method['arguments'].map(argument),
isReply: method.answer,
Expand Down Expand Up @@ -518,6 +520,8 @@ function decoderFn(method) {

function infoObj(thing) {
var info = JSON.stringify({id: thing.id,
classId: thing.clazzId,
methodId: thing.methodId,
name: thing.name,
args: thing.args});
println('var %s = module.exports.%s = %s;',
Expand Down
26 changes: 19 additions & 7 deletions lib/channel.js
Expand Up @@ -103,9 +103,10 @@ C._rpc = function(method, fields, expect, cb) {
// We have detected a problem, so it's up to us to close the
// channel
var expectedName = methodName(expect);

var e = new Error(fmt("Expected %s; got %s",
expectedName, inspect(f, false)));
self.closeWithError(fmt('Expected %s; got %s',
self.closeWithError(f.id, fmt('Expected %s; got %s',
expectedName, methodName(f.id)),
defs.constants.UNEXPECTED_FRAME, e);
return cb(e);
Expand All @@ -124,7 +125,11 @@ C._rpc = function(method, fields, expect, cb) {
? fmt("Operation failed: %s; %s",
methodName(method), closeMsg(err))
: fmt("Channel closed by server: %s", closeMsg(err));
return cb(new Error(e));
var closeFrameError = new Error(e);
closeFrameError.code = err.fields.replyCode;
closeFrameError.classId = err.fields.classId;
closeFrameError.methodId = err.fields.methodId;
return cb(closeFrameError);
}
}

Expand Down Expand Up @@ -210,10 +215,15 @@ C.closeBecause = function(reason, code, k) {
// If we close because there's been an error, we need to distinguish
// between what we tell the server (`reason`) and what we report as
// the cause in the client (`error`).
C.closeWithError = function(reason, code, error) {
C.closeWithError = function(id, reason, code, error) {
var self = this;
this.closeBecause(reason, code, function() {
error.code = code;
// content frames and consumer errors do not provide a method a class/method ID
if (id) {
error.classId = defs.info(id).classId;
error.methodId = defs.info(id).methodId;
}
self.emit('error', error);
});
};
Expand All @@ -232,15 +242,15 @@ C.acceptMessageFrame = function(f) {
}
catch (msg) {
if (typeof msg === 'string') {
this.closeWithError(msg, defs.constants.UNEXPECTED_FRAME,
this.closeWithError(f.id, msg, defs.constants.UNEXPECTED_FRAME,
new Error(msg));
}
else if (msg instanceof Error) {
this.closeWithError('Error while processing message',
this.closeWithError(f.id, 'Error while processing message',
defs.constants.INTERNAL_ERROR, msg);
}
else {
this.closeWithError('Internal error while processing message',
this.closeWithError(f.id, 'Internal error while processing message',
defs.constants.INTERNAL_ERROR,
new Error(msg.toString()));
}
Expand Down Expand Up @@ -405,6 +415,8 @@ C.accept = function(f) {

var error = new Error(emsg);
error.code = f.fields.replyCode;
error.classId = f.fields.classId;
error.methodId = f.fields.methodId;
this.emit('error', error);

var s = stackCapture(emsg);
Expand All @@ -413,7 +425,7 @@ C.accept = function(f) {

case defs.BasicFlow:
// RabbitMQ doesn't send this, it just blocks the TCP socket
return this.closeWithError("Flow not implemented",
return this.closeWithError(f.id, "Flow not implemented",
defs.constants.NOT_IMPLEMENTED,
new Error('Flow not implemented'));

Expand Down
12 changes: 12 additions & 0 deletions test/channel.js
Expand Up @@ -128,6 +128,8 @@ test("server close", channelTest(
function(ch, done) {
ch.on('error', function(error) {
assert.strictEqual(504, error.code);
assert.strictEqual(0, error.classId);
assert.strictEqual(0, error.methodId);
succeed(done)();
});
open(ch);
Expand Down Expand Up @@ -229,6 +231,8 @@ test("Bad RPC", channelTest(
var errLatch = latch(2, done);
ch.on('error', function(error) {
assert.strictEqual(505, error.code);
assert.strictEqual(60, error.classId);
assert.strictEqual(72, error.methodId);
succeed(errLatch)();
});

Expand Down Expand Up @@ -259,6 +263,8 @@ test("RPC on closed channel", channelTest(
var close = new Promise(function(resolve) {
ch.on('error', function(error) {
assert.strictEqual(504, error.code);
assert.strictEqual(0, error.classId);
assert.strictEqual(0, error.methodId);
resolve();
});
});
Expand Down Expand Up @@ -415,6 +421,8 @@ test("bad delivery", channelTest(
var errorAndClose = latch(2, done);
ch.on('error', function(error) {
assert.strictEqual(505, error.code);
assert.strictEqual(60, error.classId);
assert.strictEqual(60, error.methodId);
succeed(errorAndClose)();
});
ch.on('close', succeed(errorAndClose));
Expand Down Expand Up @@ -469,6 +477,8 @@ test("bad consumer", channelTest(
});
ch.on('error', function(error) {
assert.strictEqual(541, error.code);
assert.strictEqual(undefined, error.classId);
assert.strictEqual(undefined, error.methodId);
succeed(errorAndClose)();
});
ch.on('close', succeed(errorAndClose));
Expand All @@ -488,6 +498,8 @@ test("bad send in consumer", channelTest(
ch.on('close', succeed(errorAndClose));
ch.on('error', function(error) {
assert.strictEqual(541, error.code);
assert.strictEqual(undefined, error.classId);
assert.strictEqual(undefined, error.methodId);
succeed(errorAndClose)();
});

Expand Down

0 comments on commit 8076fa3

Please sign in to comment.