Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions consumer/config/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ export const EVENT = {
PROJECT_DRAFT_CREATED: 'project.draft-created',
PROJECT_UPDATED: 'project.updated',
PROJECT_DELETED: 'project.deleted',
CONNECT_TO_SF_FAILED: 'connect2sf.failed'
},
};
1 change: 0 additions & 1 deletion consumer/src/services/ConfigurationService.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class ConfigurationService {
},
},
}).promise();
console.log('node env: ' + process.env.NODE_ENV);
if (!result.Items.length) {
throw new Error('Configuration for AppXpressConfig not found');
}
Expand Down
27 changes: 24 additions & 3 deletions consumer/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ export function initHandlers(handlers) {
* @param {String} exchangeName the exchange name
* @param {String} queue the queue name
*/
export async function consume(channel, exchangeName, queue) {
export async function consume(channel, exchangeName, queue, publishChannel) {
channel.assertExchange(exchangeName, 'topic', { durable: true });
publishChannel.assertExchange(exchangeName, 'topic', { durable: true });
channel.assertQueue(queue, { durable: true });
const bindings = _.keys(EVENT_HANDLERS);
const bindingPromises = _.map(bindings, rk =>
Expand Down Expand Up @@ -73,8 +74,22 @@ export async function consume(channel, exchangeName, queue) {
if (e.shouldAck) {
channel.ack(msg);
} else {
// acking for debugging issue on production. this would prevent log pile up
// ack the message but copy it to other queue where no consumer is listening
// we can listen to that queue on adhoc basis when we see error case like lead not created in SF
// we can use cloudamqp console to check the messages and may be manually create SF lead
// nacking here was causing flood of messages to the worker and it keep on consuming high resources
channel.ack(msg);
try {
publishChannel.publish(
exchangeName,
EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED,
new Buffer(msg.content.toString())
);
} catch(e) {
// TODO decide if we want nack the original msg here
// for now just ignoring the error in requeue
logger.logFullError(e, `Error in publising Exchange to ${exchangeName}`);
}
}
}
});
Expand All @@ -91,7 +106,13 @@ async function start() {
debug('created connection successfully with URL: ' + config.rabbitmqURL);
const channel = await connection.createConfirmChannel();
debug('Channel confirmed...');
consume(channel, config.rabbitmq.projectsExchange, config.rabbitmq.queues.project);
const publishChannel = await connection.createConfirmChannel();
consume(
channel,
config.rabbitmq.projectsExchange,
config.rabbitmq.queues.project,
publishChannel
);
} catch (e) {
debug('Unable to connect to RabbitMQ');
}
Expand Down
14 changes: 10 additions & 4 deletions consumer/test/worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import './setup';
describe('worker', () => {
describe('consume', () => {
const queueName = 'sample-queue';
const exchangeName = EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED//'sample-exchange';
const exchangeName = 'sample-exchange';
const validMessage = {
content: JSON.stringify({ sampleData: 'foo' }),
properties: { correlationId : 'unit-tests'},
Expand All @@ -24,6 +24,7 @@ describe('worker', () => {
let rabbitConsume;
let exchangeHandlerSpy = sinon.spy();
let fakeExchangeHandlerSpy = sinon.spy();
let channelPublishSpy = sinon.spy();

beforeEach(() => {
handler = sinon.spy();
Expand Down Expand Up @@ -58,7 +59,11 @@ describe('worker', () => {
done(e);
}
},
}, exchangeName, queueName);
}, exchangeName, queueName,
{
publish: channelPublishSpy,
assertExchange
});
}

it('should consume and ack a message successfully', (done) => {
Expand Down Expand Up @@ -91,15 +96,16 @@ describe('worker', () => {
invokeConsume(done);
});

xit('should nack if error is thrown', (done) => {
it('should ack, with message being copied to temp queue, if error is thrown', (done) => {
initHandlers({
[exchangeName] : () => {
throw new Error('foo');
}
})
rabbitConsume = async (queue, fn) => {
await fn(validMessage);
nack.should.have.been.calledWith(validMessage);
ack.should.have.been.calledWith(validMessage);
channelPublishSpy.should.have.been.calledWith(exchangeName, EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, new Buffer(validMessage.content));
};
invokeConsume(done);
});
Expand Down