Skip to content

Commit

Permalink
Apply non-strict error handling to custom hooks.
Browse files Browse the repository at this point in the history
Previously, using a custom message hook for listeners bypassed the
convenience error transformation logic.

Also move back to named records for requests; the name has the format
"org.apache.avro.ipc.<messageName>Request" (but isn't exposed in the
protocol's registry). This required requiring message names to be valid
Avro names.
  • Loading branch information
mtth committed May 26, 2016
1 parent d3131d4 commit 5acff52
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 25 deletions.
37 changes: 22 additions & 15 deletions lib/protocols.js
Expand Up @@ -33,8 +33,7 @@ var MAP_BYTES_TYPE = types.createType({type: 'map', values: 'bytes'}, OPTS);
var STRING_TYPE = types.createType('string', OPTS);

var HANDSHAKE_REQUEST_TYPE = types.createType({
namespace: 'org.apache.avro.ipc',
name: 'HandshakeRequest',
name: 'org.apache.avro.ipc.HandshakeRequest',
type: 'record',
fields: [
{name: 'clientHash', type: {name: 'MD5', type: 'fixed', size: 16}},
Expand All @@ -45,8 +44,7 @@ var HANDSHAKE_REQUEST_TYPE = types.createType({
}, OPTS);

var HANDSHAKE_RESPONSE_TYPE = types.createType({
namespace: 'org.apache.avro.ipc',
name: 'HandshakeResponse',
name: 'org.apache.avro.ipc.HandshakeResponse',
type: 'record',
fields: [
{
Expand Down Expand Up @@ -292,7 +290,7 @@ Protocol.prototype.getSchema = function (opts) {
var namedTypes = [];
Object.keys(this._types).forEach(function (name) {
var type = this._types[name];
if (type.getName()) {
if (type.getName()) { // Skip primitives.
namedTypes.push(type);
}
}, this);
Expand Down Expand Up @@ -740,14 +738,6 @@ MessageListener.prototype._receive = function (reqBuf, adapter, cb) {
done(null, null);
} else {
handler.call(ptcl, reqEnv.request, this, function (err, res) {
var errType = serverMsg.getErrorType();
if (!self._strict) {
if (isError(err)) {
err = errType.clone(err.message, {wrapUnions: true});
} else if (err === null) {
err = undefined;
}
}
done(null, {error: err, response: res});
});
}
Expand All @@ -768,15 +758,23 @@ MessageListener.prototype._receive = function (reqBuf, adapter, cb) {
var resBuf;
if (!err) {
var errType = serverMsg.getErrorType();
var noError = resEnv.error === undefined;
var resErr = resEnv.error;
if (!self._strict) {
if (isError(resErr)) {
resErr = errType.clone(resErr.message, {wrapUnions: true});
} else if (resErr === null) {
resErr = undefined;
}
}
var noError = resErr === undefined;
try {
var header = MAP_BYTES_TYPE.toBuffer(resEnv.header || {});
resBuf = Buffer.concat([
header,
BOOLEAN_TYPE.toBuffer(!noError),
noError ?
serverMsg.getResponseType().toBuffer(resEnv.response) :
errType.toBuffer(resEnv.error)
errType.toBuffer(resErr)
]);
} catch (cause) {
err = wrapError('invalid response', cause);
Expand Down Expand Up @@ -973,13 +971,22 @@ util.inherits(StatefulListener, MessageListener);
*/
function Message(name, attrs, opts) {
opts = opts || {};

if (!types.isValidName(name)) {
throw new Error(f('invalid message name: %s', name));
}
this._name = name;

var recordName = f('org.apache.avro.ipc.%sRequest', name);
this._requestType = types.createType({
name: recordName,
type: 'record',
namespace: opts.namespace,
fields: attrs.request
}, opts);
// We remove the record from the registry to prevent it from being exported
// in the protocol's schema.
delete opts.registry[recordName];

if (!attrs.response) {
throw new Error('missing response');
Expand Down
20 changes: 10 additions & 10 deletions test/test_protocols.js
Expand Up @@ -78,17 +78,17 @@ suite('protocols', function () {
});

test('special character in name', function () {
var ptcl = createProtocol({
protocol: 'Ping',
messages: {
'ping/1': {
request: [],
response: 'string'
assert.throws(function () {
createProtocol({
protocol: 'Ping',
messages: {
'ping/1': {
request: [],
response: 'string'
}
}
}
});
var message = ptcl.getMessage('ping/1');
assert.equal(message.getResponseType().getName(true), 'string');
});
}, /invalid message name/);
});

test('get messages', function () {
Expand Down
11 changes: 11 additions & 0 deletions test/test_types.js
Expand Up @@ -2774,6 +2774,17 @@ suite('types', function () {
assert.equal(type._fields[1]._type._name, 'Id');
});

test('namespace reset with qualified name', function () {
var type = createType({
type: 'record',
name: 'earth.Human',
namespace: '',
fields: [{name: 'id', type: {type: 'fixed', name: 'Id', size: 2}}]
});
assert.equal(type._name, 'earth.Human');
assert.equal(type._fields[0]._type._name, 'Id');
});

test('absolute reference', function () {
var type = createType({
type: 'record',
Expand Down

0 comments on commit 5acff52

Please sign in to comment.