Skip to content

Commit

Permalink
IN-4745: emit error if message does not corresponds to schema (#213)
Browse files Browse the repository at this point in the history
* IN-4745: emit error if message does not corresponds to schema

* IN-4745: changed nodejs versions for build
  • Loading branch information
OleksiiSliusarenko authored and sergiv83 committed Apr 26, 2019
1 parent fa7943d commit 6f4ff53
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 9 deletions.
6 changes: 1 addition & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
language: node_js
node_js:
- '8'
- '7'
- '6'
- '5'
- '4'
- '0.12'
- '0.11'
- '0.10'
- iojs
after_success:
- npm run coveralls
Expand Down
1 change: 1 addition & 0 deletions lib/channelManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ channelManager.create = function() {
}

function onValidationError(err, message) {
channel.emit('error', err);
channel.rejectMessage(message);
}

Expand Down
2 changes: 2 additions & 0 deletions lib/validateWithSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,5 @@ function _errorsTrace(errors) {
});
return str;
}

module.exports.SchemaValidationError = SchemaValidationError;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "msb",
"version": "1.2.1",
"version": "1.2.2",
"description": "A framework to simplify the implementation of an event-bus oriented microservices architecture",
"license": "MIT",
"main": "index.js",
Expand Down
26 changes: 26 additions & 0 deletions test/channelManager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@ var msb = require('..');
var config = require('../lib/config');
var createChannelManager = require('../lib/channelManager').create;
var messageFactory = msb.messageFactory;
var SchemaValidationError = require('../lib/validateWithSchema').SchemaValidationError;

describe('channelManager', function() {
var adapter;
var channelManager;

before(function(done) {
process.env.NODE_ENV = 'test';
adapter = amqp.create();
done();
});

after(function(done) {
delete process.env.NODE_ENV;
done();
});

beforeEach(function(done) {
channelManager = createChannelManager();

Expand Down Expand Up @@ -212,6 +219,25 @@ describe('channelManager', function() {
done();
});

it('will emit `error` event if message does not corresponds to schema', function(done) {
simple.mock(config, 'schema', require('../schema'));
channelManager.configure(config);

var mockSubscriber = {};
simple.mock(mockSubscriber, 'on');
simple.mock(channelManager, 'createRawConsumer').returnWith(mockSubscriber);

var consumer = channelManager.findOrCreateConsumer('c:errorConsumer');

consumer.on('error', function(err) {
expect(err).to.be.an.instanceof(SchemaValidationError);
done();
});

var onMessageFn = mockSubscriber.on.calls[0].args[1];
onMessageFn({});
});

it('will listen for messages and emit a new message event', function(done) {
var mockSubscriber = {};
simple.mock(mockSubscriber, 'on');
Expand Down
15 changes: 12 additions & 3 deletions test/integration/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ describe('AMQP Integration', function() {
'rejectMessage');

var onMessageMethod = simple.mock();
var onErrorMethod = simple.mock();

consumer.on('message', onMessageMethod);
consumer.on('error', onErrorMethod);

publisher.publish([
fixtures.consumer_basic,
Expand All @@ -111,7 +113,8 @@ describe('AMQP Integration', function() {
if (err) return done(err);

setTimeout(function() {
expect(onMessageMethod.callCount).equals(2)
expect(onErrorMethod.callCount).equals(1);
expect(onMessageMethod.callCount).equals(2);
expect(onMessageMethod.calls[0].arg).deep.equals(fixtures.consumer_basic);
expect(onMessageMethod.calls[1].arg).deep.equals(fixtures.consumer_basic);
expect(_onMessageMethod.callCount).equals(3);
Expand All @@ -133,8 +136,10 @@ describe('AMQP Integration', function() {
'rejectMessage');

var onMessageMethod = simple.mock();
var onErrorMethod = simple.mock();

consumer.on('message', onMessageMethod);
consumer.on('error', onErrorMethod);

publisher.publish([
fixtures.consumer_basic,
Expand All @@ -144,7 +149,8 @@ describe('AMQP Integration', function() {
if (err) return done(err);

setTimeout(function() {
expect(onMessageMethod.callCount).equals(2)
expect(onErrorMethod.callCount).equals(1);
expect(onMessageMethod.callCount).equals(2);
expect(onMessageMethod.calls[0].arg).deep.equals(fixtures.consumer_basic);
expect(onMessageMethod.calls[1].arg).deep.equals(fixtures.consumer_basic);
expect(_onMessageMethod.callCount).equals(3);
Expand All @@ -166,8 +172,10 @@ describe('AMQP Integration', function() {
'rejectMessage');

var onMessageMethod = simple.mock();
var onErrorMethod = simple.mock();

consumer.on('message', onMessageMethod);
consumer.on('error', onErrorMethod);

publisher.publish([
fixtures.consumer_basic,
Expand All @@ -177,7 +185,8 @@ describe('AMQP Integration', function() {
if (err) return done(err);

setTimeout(function() {
expect(onMessageMethod.callCount).equals(2)
expect(onErrorMethod.callCount).equals(1);
expect(onMessageMethod.callCount).equals(2);
expect(onMessageMethod.calls[0].arg).deep.equals(fixtures.consumer_basic);
expect(onMessageMethod.calls[1].arg).deep.equals(fixtures.consumer_basic);
expect(rejectMethod.callCount).equals(1);
Expand Down

0 comments on commit 6f4ff53

Please sign in to comment.