Skip to content

Commit

Permalink
Improve one-way message handling.
Browse files Browse the repository at this point in the history
One-way message behavior between stateful and stateless transports is
now consistent: listeners are never passed a callback (it was odd to
have the type of transport influence this, breaking separation of
concerns even).

As a side-effect, this change also improves default behavior when no
callback is passed to emitters (even for non-one-way messages). Errors
are now emitted on the corresponding message emitter rather than
directly thrown (where they couldn't be caught).

Also rename "unsupported message" to more explicit "unhandled message"
when a message is sent to a protocol which hasn't implemented a handler
for it.
  • Loading branch information
mtth committed Jan 29, 2016
1 parent 163b9d7 commit 8e91111
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 50 deletions.
47 changes: 19 additions & 28 deletions lib/protocols.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ function Protocol(name, messages, types, ptcl) {
this._handlers = Object.create(ptcl ? ptcl._handlers : null);
this._onListenerCall = function (name, req, cb) {
var handler = self._handlers[name];
if (!handler) {
cb(new Error(f('unsupported message: %s', name)));
} else {
if (handler) {
handler.call(self, req, this, cb);
} else if (cb) {
// This (listening) protocol hasn't implemented this message.
cb(new Error(f('unhandled message: %s', name)));
}
};

Expand All @@ -150,7 +151,7 @@ Protocol.prototype.subprotocol = function () {
};

Protocol.prototype.emit = function (name, req, emitter, cb) {
cb = cb || throwError; // To provide a more helpful error message.
cb = cb || function (err) { emitter.emit('error', err); };

if (
!(emitter instanceof MessageEmitter) ||
Expand Down Expand Up @@ -425,10 +426,11 @@ StatelessEmitter.prototype._emit = function (message, req, cb) {
}

readable
.pipe(new MessageDecoder(true))
.pipe(new MessageDecoder(!message.oneWay))
.on('error', done)
// This will happen when the readable stream ends before a single
// message has been decoded (e.g. on invalid response).
// This will happen when the message isn't one way and the readable
// stream ends before a single message has been decoded (e.g. on
// invalid response).
.on('data', function (buf) {
readable.unpipe(this); // Single message per readable stream.
if (self._interrupted) {
Expand All @@ -449,7 +451,9 @@ StatelessEmitter.prototype._emit = function (message, req, cb) {
done(err);
return;
}
done.apply(undefined, args);
if (!message.oneWay) {
done.apply(undefined, args);
}
});
});

Expand Down Expand Up @@ -835,7 +839,12 @@ function StatelessListener(ptcl, readableFactory, opts) {
return;
}

self.emit('_call', name, req, onResponse);
if (self._message.oneWay) {
self.emit('_call', name, req);
onResponse(null, null);
} else {
self.emit('_call', name, req, onResponse);
}
}

function onResponse(err, res) {
Expand Down Expand Up @@ -1149,23 +1158,6 @@ function safeWrite(tap, type, val) {
}
}

/**
* Default callback when not provided.
*
*/
function throwError(err) {
if (!err) {
return;
}
if (typeof err == 'object' && err.string) {
err = err.string;
}
if (typeof err == 'string') {
err = new Error(err);
}
throw err;
}

/**
* Convert an error message into a format suitable for RPC.
*
Expand Down Expand Up @@ -1264,6 +1256,5 @@ module.exports = {
streams: {
MessageDecoder: MessageDecoder,
MessageEncoder: MessageEncoder
},
throwError: throwError
}
};
47 changes: 25 additions & 22 deletions test/test_protocols.js
Original file line number Diff line number Diff line change
Expand Up @@ -1234,15 +1234,29 @@ suite('protocols', function () {
messages: {ping: {request: [], response: 'null'}}
});
setupFn(ptcl1, ptcl2, function (ee) {
ee.on('error', function () {}); // For stateful protocols.
ptcl1.emit('ping', {}, ee, function (err) {
assert(err);
ee.on('error', function (err) {
assert(/incompatible/.test(err.message));
done();
});
ptcl1.emit('ping', {}, ee);
}
);
});

test('one way message', function (done) {
var ptcl = createProtocol({
protocol: 'ptcl',
messages: {ping: {request: [], response: 'null', 'one-way': true}}
});
setupFn(ptcl, ptcl, function (ee) {
ptcl.on('ping', function (req, ee, cb) {
assert.strictEqual(cb, undefined);
done();
});
ptcl.emit('ping', {}, ee);
});
});

test('unknown message', function (done) {
var ptcl = createProtocol({protocol: 'Empty'});
setupFn(ptcl, ptcl, function (ee) {
Expand All @@ -1253,20 +1267,24 @@ suite('protocols', function () {
});
});

test('unsupported message', function (done) {
test('unhandled message', function (done) {
var ptcl = createProtocol({
protocol: 'Echo',
messages: {
echo: {
request: [{name: 'id', type: 'string'}],
response: 'string'
}
},
ping: {request: [], response: 'null', 'one-way': true}
}
});
setupFn(ptcl, ptcl, function (ee) {
ptcl.emit('echo', {id: ''}, ee, function (err) {
assert(/unsupported/.test(err.string));
done();
assert(/unhandled/.test(err.string));
ptcl.emit('ping', {}, ee);
// By definition of one-way, there is no reliable way of calling
// done exactly when ping is done, so we add a small timeout.
setTimeout(done, 100);
});
});
});
Expand Down Expand Up @@ -1334,21 +1352,6 @@ suite('protocols', function () {

});

test('throw error', function () {
assert(!tryCatch(null));
assert.equal(tryCatch(new Error('hi')), 'hi');
assert.equal(tryCatch('hi'), 'hi');
assert.equal(tryCatch({string: 'hi'}), 'hi');

function tryCatch(err) {
try {
protocols.throwError(err);
} catch (err_) {
return err_.message;
}
}
});

});

// Helpers.
Expand Down

0 comments on commit 8e91111

Please sign in to comment.