From 8b28f0fb404fed4a5e1f9c4041a0ccfc46c4c829 Mon Sep 17 00:00:00 2001 From: vikasrohit Date: Mon, 17 Apr 2017 16:30:01 +0530 Subject: [PATCH 01/16] Updated README Updated readme for deployment checklist and error handling. --- consumer/README.md | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/consumer/README.md b/consumer/README.md index 0a93f78..7bacafe 100644 --- a/consumer/README.md +++ b/consumer/README.md @@ -20,7 +20,7 @@ Env variable: `LOG_LEVEL` - **rabbitmqURL** The rabbitmq URL. Create a free account here https://www.cloudamqp.com/ and create a new instance in any region. -You can get URL by clicking on queue details button. +You can get the URL by clicking on the queue details button. For deployment in AWS, please make sure that this instance is launched in a VPC that the target AWS server can communicate with. Env variable: `RABBITMQ_URL` - **ownerId** @@ -86,7 +86,7 @@ You can use the existing cert.pem from `config` directory. Or generate a new certificate and key using a command: `openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem` -Private key of your certificate is read from environment variable, instead of reading from the config directory. So please make sure you replace all new line characters with `\n` before setting it in the environment variable. Application would add newline characters back to the key when using it to sign the requests. +**The private key of your certificate is read from an environment variable instead of from the config directory, so please make sure you replace all newline characters with `\n` before setting it in the environment variable. The application will add the newline characters back to the key when using it to sign requests.** ![Alt text](https://monosnap.com/file/tT9ZZXUH1aa1j7cFzYxaV9RjmHWCum.png) Click Save @@ -224,11 +224,19 @@ Check the Lead details in Saleforce ![Alt text](https://monosnap.com/file/PdMF97k18cBGeZjR9qOkkBe1AjYw2n.png) Lead is removed from the campaign +## Deployment Checklist +1. Make sure the AppXpressConfig table exists in DynamoDB with dripcampaignId +2. Make sure the configured RabbitMQ exchange and queue are created appropriately in CloudAMQP +3. There should be a proper mapping between the exchange and the queue specified in the configuration +4. Grant permission for the app once, with the configured user, using the URL https://login.salesforce.com/services/oauth2/authorize?client_id=[clientId]&redirect_uri=https://login.salesforce.com&response_type=code -Notes on Error Handling. -UnprocessableError is thrown if operation cannot be completed. +## CI +* All changes into dev will be built and deployed to the AWS Beanstalk environment `tc-connect2sf-dev` +* All changes into master will be built and deployed to the AWS Beanstalk environment `tc-connect2sf-prod` + +## Notes on Error Handling +`UnprocessableError` is thrown if the operation cannot be completed. For example: duplicated project id added to the queue, Lead cannot be found etc. In such situation, the message from rabbitmq will be marked as ACK (removed). -If we won't remove it from queue, the message will be stuck forever. - - +If we don't remove it from the queue, the message will be stuck forever. +For any other type of error, the message from RabbitMQ will be ACKed as well; however, it will be requeued into another queue for later inspection. Right now the consumer publishes the message content to the same RabbitMQ exchange (configured as mentioned in the Configuration section) with the routing key `connect2sf.failed`. So we have to map the exchange and routing key combination to a queue that no consumer is listening to, e.g. `tc-connect2sf.failed` is used in the dev environment. We can then inspect the messages in this queue, via the RabbitMQ manager UI, to check which messages failed and what the id of the failed project was. We can either remove those messages from the queue, if we are going to add those leads manually in Salesforce, or move them back to the original queue after fixing the deployed environment.
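To make the failed-message mapping described above concrete, here is a minimal sketch (not part of the repository) of binding and inspecting the failed queue programmatically rather than through the RabbitMQ manager UI. It assumes amqplib and the dev names used in this patch series (`dev.projects`, `tc-connect2sf.failed`, and the routing key `connect2sf.failed` as of this patch); adjust them to your environment.

```js
import amqp from 'amqplib';

async function inspectFailedQueue() {
  const connection = await amqp.connect(process.env.RABBITMQ_URL);
  const channel = await connection.createChannel();
  // Bind a queue that no consumer listens to, so failed messages accumulate there.
  await channel.assertExchange('dev.projects', 'topic', { durable: true });
  await channel.assertQueue('tc-connect2sf.failed', { durable: true });
  await channel.bindQueue('tc-connect2sf.failed', 'dev.projects', 'connect2sf.failed');
  // Peek at one failed message and put it back for manual handling.
  const msg = await channel.get('tc-connect2sf.failed', { noAck: false });
  if (msg) {
    console.log('failed message:', msg.content.toString());
    channel.nack(msg, false, true); // requeue it
  }
  await connection.close();
}

inspectFailedQueue().catch(console.error);
```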
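Similarly, for the private-key environment variable called out in bold earlier in this README diff, a tiny sketch of the round-trip; `SALESFORCE_PRIVATE_KEY` is a hypothetical variable name used only for illustration (check the repository's config for the real one).

```js
import fs from 'fs';

// Before deploying: flatten the PEM key to one line for the environment variable.
const oneLineKey = fs.readFileSync('key.pem', 'utf8').replace(/\n/g, '\\n');
// e.g. set SALESFORCE_PRIVATE_KEY to oneLineKey (hypothetical variable name)

// At runtime the application does the reverse before signing requests:
const privateKey = process.env.SALESFORCE_PRIVATE_KEY.replace(/\\n/g, '\n');
```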
From 54645e488bdfcb5989472fea7bc3434c72ac0a11 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Tue, 18 Apr 2017 12:04:34 +0530 Subject: [PATCH 02/16] Instead of publishing failed messages to a hardcoded routing key, now it publishes them to a routing key formed by adding the suffix '.failed' to the message's original routing key. --- consumer/config/constants.js | 2 +- consumer/src/worker.js | 2 +- consumer/test/worker.spec.js | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/consumer/config/constants.js b/consumer/config/constants.js index a386a32..1d4c5af 100644 --- a/consumer/config/constants.js +++ b/consumer/config/constants.js @@ -4,6 +4,6 @@ export const EVENT = { PROJECT_DRAFT_CREATED: 'project.draft-created', PROJECT_UPDATED: 'project.updated', PROJECT_DELETED: 'project.deleted', - CONNECT_TO_SF_FAILED: 'connect2sf.failed' + FAILED_SUFFIX: '.failed' }, }; diff --git a/consumer/src/worker.js b/consumer/src/worker.js index 7254c35..035b584 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -82,7 +82,7 @@ export async function consume(channel, exchangeName, queue, publishChannel) { try { publishChannel.publish( exchangeName, - EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, + key + EVENT.ROUTING_KEY.FAILED_SUFFIX, new Buffer(msg.content.toString()) ); } catch(e) { diff --git a/consumer/test/worker.spec.js b/consumer/test/worker.spec.js index 02b3102..78c8551 100644 --- a/consumer/test/worker.spec.js +++ b/consumer/test/worker.spec.js @@ -105,7 +105,8 @@ describe('worker', () => { rabbitConsume = async (queue, fn) => { await fn(validMessage); ack.should.have.been.calledWith(validMessage); - channelPublishSpy.should.have.been.calledWith(exchangeName, EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, new Buffer(validMessage.content)); + const failedRoutingKey = validMessage.fields.routingKey + EVENT.ROUTING_KEY.FAILED_SUFFIX; + channelPublishSpy.should.have.been.calledWith(exchangeName, failedRoutingKey, new Buffer(validMessage.content)); }; invokeConsume(done); }); From b23aa9ec613c9d3c2b6b5e18c35c1a1f3b93462b Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Tue, 18 Apr 2017 12:55:55 +0530 Subject: [PATCH 03/16] Using separate exchange for failed messages so that other consumers don't get duplicate messages.
--- consumer/.ebextensions/01-environment-variables.config | 3 +++ consumer/config/custom-environment-variables.json | 1 + consumer/config/test.json | 1 + consumer/src/worker.js | 2 +- consumer/test/worker.spec.js | 4 +++- 5 files changed, 9 insertions(+), 2 deletions(-) diff --git a/consumer/.ebextensions/01-environment-variables.config b/consumer/.ebextensions/01-environment-variables.config index aa74b44..0332362 100644 --- a/consumer/.ebextensions/01-environment-variables.config +++ b/consumer/.ebextensions/01-environment-variables.config @@ -26,6 +26,9 @@ option_settings: - namespace: aws:elasticbeanstalk:application:environment option_name: RABBITMQ_PROJECTS_EXCHANGE value: dev.projects + - namespace: aws:elasticbeanstalk:application:environment + option_name: RABBITMQ_CONNECT2SF_EXCHANGE + value: dev.tc-connect2sf - namespace: aws:elasticbeanstalk:application:environment option_name: QUEUE_PROJECTS value: dev.project.service diff --git a/consumer/config/custom-environment-variables.json b/consumer/config/custom-environment-variables.json index c378910..480bbd1 100644 --- a/consumer/config/custom-environment-variables.json +++ b/consumer/config/custom-environment-variables.json @@ -21,6 +21,7 @@ }, "rabbitmq" : { "projectsExchange" : "RABBITMQ_PROJECTS_EXCHANGE", + "connect2sfExchange" : "RABBITMQ_CONNECT2SF_EXCHANGE", "queues": { "project": "QUEUE_PROJECTS" } diff --git a/consumer/config/test.json b/consumer/config/test.json index 1ed3699..3e44594 100644 --- a/consumer/config/test.json +++ b/consumer/config/test.json @@ -20,6 +20,7 @@ }, "rabbitmq" : { "projectsExchange" : "dev.projects", + "connect2sfExchange": "dev.tc-connect2sf", "queues": { "project": "dev.project.service" } diff --git a/consumer/src/worker.js b/consumer/src/worker.js index 035b584..523e351 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -81,7 +81,7 @@ export async function consume(channel, exchangeName, queue, publishChannel) { channel.ack(msg); try { publishChannel.publish( - exchangeName, + config.rabbitmq.connect2sfExchange, key + EVENT.ROUTING_KEY.FAILED_SUFFIX, new Buffer(msg.content.toString()) ); diff --git a/consumer/test/worker.spec.js b/consumer/test/worker.spec.js index 78c8551..0141b92 100644 --- a/consumer/test/worker.spec.js +++ b/consumer/test/worker.spec.js @@ -4,6 +4,7 @@ import {consume, initHandlers} from '../src/worker'; import {UnprocessableError} from '../src/common/errors'; import { EVENT } from '../config/constants'; +import config from 'config'; import './setup'; describe('worker', () => { @@ -105,8 +106,9 @@ describe('worker', () => { rabbitConsume = async (queue, fn) => { await fn(validMessage); ack.should.have.been.calledWith(validMessage); + const connect2sfExchange = config.rabbitmq.connect2sfExchange; const failedRoutingKey = validMessage.fields.routingKey + EVENT.ROUTING_KEY.FAILED_SUFFIX; - channelPublishSpy.should.have.been.calledWith(exchangeName, failedRoutingKey, new Buffer(validMessage.content)); + channelPublishSpy.should.have.been.calledWith(connect2sfExchange, failedRoutingKey, new Buffer(validMessage.content)); }; invokeConsume(done); }); From 6deb8ccf5acccdee0b193f4cc4d3a770f46c6527 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Tue, 18 Apr 2017 13:26:18 +0530 Subject: [PATCH 04/16] Separate queue as well for consuming failed messages --- .../.ebextensions/01-environment-variables.config | 3 +++ consumer/config/custom-environment-variables.json | 3 ++- consumer/config/test.json | 3 ++- consumer/src/worker.js | 11 ++++++++++- 4 files changed, 17 
insertions(+), 3 deletions(-) diff --git a/consumer/.ebextensions/01-environment-variables.config b/consumer/.ebextensions/01-environment-variables.config index 0332362..c6f812f 100644 --- a/consumer/.ebextensions/01-environment-variables.config +++ b/consumer/.ebextensions/01-environment-variables.config @@ -32,6 +32,9 @@ option_settings: - namespace: aws:elasticbeanstalk:application:environment option_name: QUEUE_PROJECTS value: dev.project.service + - namespace: aws:elasticbeanstalk:application:environment + option_name: QUEUE_CONNECT2SF + value: dev.tc-connect2sf - namespace: aws:elasticbeanstalk:application:environment option_name: IDENTITY_SERVICE_URL value: TBD diff --git a/consumer/config/custom-environment-variables.json b/consumer/config/custom-environment-variables.json index 480bbd1..ce0752e 100644 --- a/consumer/config/custom-environment-variables.json +++ b/consumer/config/custom-environment-variables.json @@ -23,7 +23,8 @@ "projectsExchange" : "RABBITMQ_PROJECTS_EXCHANGE", "connect2sfExchange" : "RABBITMQ_CONNECT2SF_EXCHANGE", "queues": { - "project": "QUEUE_PROJECTS" + "project": "QUEUE_PROJECTS", + "connect2sf": "QUEUE_CONNECT2SF" } } } diff --git a/consumer/config/test.json b/consumer/config/test.json index 3e44594..23b5772 100644 --- a/consumer/config/test.json +++ b/consumer/config/test.json @@ -22,7 +22,8 @@ "projectsExchange" : "dev.projects", "connect2sfExchange": "dev.tc-connect2sf", "queues": { - "project": "dev.project.service" + "project": "dev.project.service", + "connect2sf": "dev.tc-connect2sf" } } } \ No newline at end of file diff --git a/consumer/src/worker.js b/consumer/src/worker.js index 523e351..23d2395 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -105,14 +105,23 @@ async function start() { connection = await amqp.connect(config.rabbitmqURL); debug('created connection successfully with URL: ' + config.rabbitmqURL); const channel = await connection.createConfirmChannel(); - debug('Channel confirmed...'); + debug('Channel created for projects exchange ...'); const publishChannel = await connection.createConfirmChannel(); + debug('Channel created for publishing failed messages ...'); consume( channel, config.rabbitmq.projectsExchange, config.rabbitmq.queues.project, publishChannel ); + const connect2sfChannel = await connection.createConfirmChannel(); + debug('Channel created for consuming failed messages ...'); + consume( + connect2sfChannel, + config.rabbitmq.connect2sfExchange, + config.rabbitmq.queues.connect2sf, + publishChannel + ); } catch (e) { debug('Unable to connect to RabbitMQ'); } From ef166eba71375055a9599e8f95f9bcd79fac04af Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Tue, 18 Apr 2017 14:32:37 +0530 Subject: [PATCH 05/16] updated default values for connect2sf specific queue and exchange --- consumer/.ebextensions/01-environment-variables.config | 4 ++-- consumer/config/test.json | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/consumer/.ebextensions/01-environment-variables.config b/consumer/.ebextensions/01-environment-variables.config index c6f812f..610ec7c 100644 --- a/consumer/.ebextensions/01-environment-variables.config +++ b/consumer/.ebextensions/01-environment-variables.config @@ -28,13 +28,13 @@ option_settings: value: dev.projects - namespace: aws:elasticbeanstalk:application:environment option_name: RABBITMQ_CONNECT2SF_EXCHANGE - value: dev.tc-connect2sf + value: dev.tc.connect2sf - namespace: aws:elasticbeanstalk:application:environment option_name: QUEUE_PROJECTS value: 
dev.project.service - namespace: aws:elasticbeanstalk:application:environment option_name: QUEUE_CONNECT2SF - value: dev.tc-connect2sf + value: dev.tc.connect2sf.exclusive - namespace: aws:elasticbeanstalk:application:environment option_name: IDENTITY_SERVICE_URL value: TBD diff --git a/consumer/config/test.json b/consumer/config/test.json index 23b5772..b406cf9 100644 --- a/consumer/config/test.json +++ b/consumer/config/test.json @@ -20,10 +20,10 @@ }, "rabbitmq" : { "projectsExchange" : "dev.projects", - "connect2sfExchange": "dev.tc-connect2sf", + "connect2sfExchange": "dev.tc.connect2sf", "queues": { "project": "dev.project.service", - "connect2sf": "dev.tc-connect2sf" + "connect2sf": "dev.tc.connect2sf.exclusive" } } } \ No newline at end of file From 0ffa0c0bff110e569a2a9a60e67377dc47f9a61c Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 14:26:36 +0530 Subject: [PATCH 06/16] Added scheduler worker for processing messages from failed messages queue. Added cron in docker container Added a sample cron job.. need to test --- consumer/Dockerfile | 8 +- consumer/config/scheduler-cron | 1 + consumer/src/scheduled-worker.js | 140 +++++++++++++++++++++++ consumer/src/services/ConsumerService.js | 115 +++++++++++-------- consumer/src/worker.js | 8 -- consumer/test/ConsumerService.spec.js | 12 +- consumer/test/scheduled-worker.spec.js | 134 ++++++++++++++++++++++ consumer/test/worker.spec.js | 3 +- 8 files changed, 356 insertions(+), 65 deletions(-) create mode 100644 consumer/config/scheduler-cron create mode 100644 consumer/src/scheduled-worker.js create mode 100644 consumer/test/scheduled-worker.spec.js diff --git a/consumer/Dockerfile b/consumer/Dockerfile index 0823b47..280bdf7 100644 --- a/consumer/Dockerfile +++ b/consumer/Dockerfile @@ -17,8 +17,10 @@ RUN npm install RUN npm install -g forever -EXPOSE 80 +RUN crontab config/scheduler-cron + +RUN cron -CMD forever -c "npm start" --uid "consumer" . +EXPOSE 80 -#CMD npm start \ No newline at end of file +CMD forever -c "npm start" --uid "consumer" . 
\ No newline at end of file diff --git a/consumer/config/scheduler-cron b/consumer/config/scheduler-cron new file mode 100644 index 0000000..7afbaa6 --- /dev/null +++ b/consumer/config/scheduler-cron @@ -0,0 +1 @@ +*/5 * * * * babel-node ../src/scheduled-worker.js \ No newline at end of file diff --git a/consumer/src/scheduled-worker.js b/consumer/src/scheduled-worker.js new file mode 100644 index 0000000..295ef6f --- /dev/null +++ b/consumer/src/scheduled-worker.js @@ -0,0 +1,140 @@ +/** + * The main app entry + */ + +import config from 'config'; +import amqp from 'amqplib'; +import _ from 'lodash'; +import logger from './common/logger'; +import ConsumerService from './services/ConsumerService'; +import { EVENT } from '../config/constants'; + +const debug = require('debug')('app:worker'); + +let connection; +process.once('SIGINT', () => { + close(); +}); + +let EVENT_HANDLERS = { + [EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: ConsumerService.processProjectCreated + // [EVENT.ROUTING_KEY.PROJECT_UPDATED]: ConsumerService.processProjectUpdated +} + +function close() { + try { + connection.close(); + } catch (ignore) { // eslint-ignore-line + } + process.exit(); +} + +export function initHandlers(handlers) { + EVENT_HANDLERS = handlers; +} + +/** + * Processes the given message and acks/nacks the channel + * @param {Object} channel the target channel + * @param {Object} msg the message to be processed + */ +export function processMessage(channel, msg) { + return new Promise((resolve, reject) => { + if (!msg) { + reject(new Error('Empty message. Ignoring')); + } + debug(`Consuming message in \n${msg.content}`); + const key = _.get(msg, 'fields.routingKey'); + debug('Received Message', key, msg.fields); + + let handler; + let data; + try { + handler = EVENT_HANDLERS[key]; + if (!_.isFunction(handler)) { + logger.error(`Unknown message type: ${key}, NACKing... `); + channel.nack(msg, false, false) + } + data = JSON.parse(msg.content.toString()); + } catch (ignore) { + logger.info(ignore); + logger.error('Invalid message. Ignoring'); + channel.ack(msg); + reject('Invalid message. 
Ignoring'); + } + return handler(logger, data).then(() => { + channel.ack(msg); + resolve(); + }) + .catch((e) => { + logger.logFullError(e, `Error processing message`); + if (e.shouldAck) { + channel.ack(msg); + } else { + // NACK and requeue (nack requeues by default) the message for next turn + channel.nack(msg); + } + }); + }) +} + +function assertExchangeQueues(channel, exchangeName, queue) { + channel.assertExchange(exchangeName, 'topic', { durable: true }); + channel.assertQueue(queue, { durable: true }); + const bindings = _.keys(EVENT_HANDLERS); + const bindingPromises = _.map(bindings, rk => + channel.bindQueue(queue, exchangeName, rk)); + debug('binding queue ' + queue + ' to exchange: ' + exchangeName); + return Promise.all(bindingPromises); +} + +/** + * Start the worker + */ +async function start() { + try { + console.log(config.rabbitmqURL); + connection = await amqp.connect(config.rabbitmqURL); + debug('created connection successfully with URL: ' + config.rabbitmqURL); + const connect2sfChannel = await connection.createConfirmChannel(); + debug('Channel created for consuming failed messages ...'); + assertExchangeQueues( + connect2sfChannel, + config.rabbitmq.connect2sfExchange, + config.rabbitmq.queues.connect2sf + ).then(() => { + debug('Asserted all required exchanges and queues'); + let counter = 0; + [1,2,3,4,5,6,7,8,9,10].forEach(() => { + return connect2sfChannel.get(config.rabbitmq.queues.connect2sf).then((msg) => { + if (msg) { + processMessage( + connect2sfChannel, + msg + ).then((responses) => { + counter++; + debug('Processed messages = ' + counter); + if (counter >= 10) { + close(); + } + }).catch((e) => { + counter++; + debug('Processed messages = ' + counter); + logger.logFullError(e, `Unable to process one of the messages`); + if (counter >= 10) { + close(); + } + }) + } + }) + }) + }) + + } catch (e) { + logger.logFullError(e, `Unable to connect to RabbitMQ`); + } +} + +if (!module.parent) { + start(); +} diff --git a/consumer/src/services/ConsumerService.js b/consumer/src/services/ConsumerService.js index bf79216..1b76d67 100644 --- a/consumer/src/services/ConsumerService.js +++ b/consumer/src/services/ConsumerService.js @@ -41,46 +41,44 @@ class ConsumerService { * @param {Object} projectEvent the project event */ @logAndValidate(['logger','project'], projectCreatedSchema) - //@logAndValidate(['projectEvent'], {projectEvent: projectEventSchema}) - //async processProjectCreated(projectEvent) { - async processProjectCreated(logger, project) { + processProjectCreated(logger, project) { const member = _.find(project.members, {role: memberRole, isPrimary: true}); if (!member) { throw new UnprocessableError('Cannot find primary customer'); } - const [ - campaignId, - user, - {accessToken, instanceUrl}, - ] = await Promise.all([ + return Promise.all([ ConfigurationService.getSalesforceCampaignId(), IdentityService.getUser(member.userId), SalesforceService.authenticate(), - ]); - - const lead = { - FirstName: user.firstName, - LastName: user.lastName, - Email: user.email, - LeadSource: leadSource, - Company: company, - OwnerId: config.ownerId, - TC_Connect_Project_Id__c: project.id, - }; - let leadId; - try { - leadId = await SalesforceService.createObject('Lead', lead, accessToken, instanceUrl); - } catch (e) { - if (e.response && e.response.text && duplicateRecordRegex.test(e.response.text)) { - throw new UnprocessableError(`Lead already existing for project ${project.id}`); - } - throw e; - } - const campaignMember = { - LeadId: leadId, - CampaignId: 
campaignId, - }; - await SalesforceService.createObject('CampaignMember', campaignMember, accessToken, instanceUrl); + ]).then((responses) => { + const campaignId = responses[0]; + const user = responses[1]; + const { accessToken, instanceUrl } = responses[2]; + const lead = { + FirstName: user.firstName, + LastName: user.lastName, + Email: user.email, + LeadSource: leadSource, + Company: company, + OwnerId: config.ownerId, + TC_Connect_Project_Id__c: project.id, + }; + return SalesforceService.createObject('Lead', lead, accessToken, instanceUrl) + .then((leadId) => { + const campaignMember = { + LeadId: leadId, + CampaignId: campaignId, + }; + return SalesforceService.createObject('CampaignMember', campaignMember, accessToken, instanceUrl); + }).catch( (e) => { + if (e.response && e.response.text && duplicateRecordRegex.test(e.response.text)) { + throw new UnprocessableError(`Lead already existing for project ${project.id}`); + } + throw e; + }) + }).catch((error) => { + throw error; + }); } /** @@ -88,27 +86,44 @@ class ConsumerService { * @param {Object} projectEvent the project */ @logAndValidate(['logger', 'projectEvent'], projectUpdatedSchema) - async processProjectUpdated(logger, projectEvent) { + processProjectUpdated(logger, projectEvent) { logger.debug(projectEvent) var project = projectEvent.original; - const [ - campaignId, - {accessToken, instanceUrl}, - ] = await Promise.all([ + return Promise.all([ ConfigurationService.getSalesforceCampaignId(), SalesforceService.authenticate(), - ]); - let sql = `SELECT id FROM Lead WHERE TC_Connect_Project_Id__c = '${project.id}'`; - const {records: [lead]} = await SalesforceService.query(sql, accessToken, instanceUrl); - if (!lead) { - throw new UnprocessableError(`Cannot find Lead with TC_Connect_Project_Id__c = '${project.id}'`); - } - sql = `SELECT id FROM CampaignMember WHERE LeadId = '${lead.Id}' AND CampaignId ='${campaignId}'`; - const {records: [member]} = await SalesforceService.query(sql, accessToken, instanceUrl); - if (!member) { - throw new UnprocessableError(`Cannot find CampaignMember for Lead.TC_Connect_Project_Id__c = '${project.id}'`); - } - await SalesforceService.deleteObject('CampaignMember', member.Id, accessToken, instanceUrl); + ]).then((responses) => { + const campaignId = responses[0]; + const { accessToken, instanceUrl } = responses[1]; + // queries existing lead for the project + let sql = `SELECT id FROM Lead WHERE TC_Connect_Project_Id__c = '${project.id}'`; + return SalesforceService.query(sql, accessToken, instanceUrl) + .then((response) => { + const {records: [lead]} = response; + if (!lead) { + throw new UnprocessableError(`Cannot find Lead with TC_Connect_Project_Id__c = '${project.id}'`); + } + sql = `SELECT id FROM CampaignMember WHERE LeadId = '${lead.Id}' AND CampaignId ='${campaignId}'`; + return SalesforceService.query(sql, accessToken, instanceUrl) + .then((response) => { + const {records: [member]} = response; + if (!member) { + throw new UnprocessableError(`Cannot find CampaignMember for Lead.TC_Connect_Project_Id__c = '${project.id}'`); + } + return SalesforceService.deleteObject('CampaignMember', member.Id, accessToken, instanceUrl); + }) + }) + // const {records: [lead]} = await SalesforceService.query(sql, accessToken, instanceUrl); + // if (!lead) { + // throw new UnprocessableError(`Cannot find Lead with TC_Connect_Project_Id__c = '${project.id}'`); + // } + // sql = `SELECT id FROM CampaignMember WHERE LeadId = '${lead.Id}' AND CampaignId ='${campaignId}'`; + // const {records: [member]} 
= await SalesforceService.query(sql, accessToken, instanceUrl); + // if (!member) { + // throw new UnprocessableError(`Cannot find CampaignMember for Lead.TC_Connect_Project_Id__c = '${project.id}'`); + // } + // await SalesforceService.deleteObject('CampaignMember', member.Id, accessToken, instanceUrl); + }); } } diff --git a/consumer/src/worker.js b/consumer/src/worker.js index 23d2395..ea536dc 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -114,14 +114,6 @@ async function start() { config.rabbitmq.queues.project, publishChannel ); - const connect2sfChannel = await connection.createConfirmChannel(); - debug('Channel created for consuming failed messages ...'); - consume( - connect2sfChannel, - config.rabbitmq.connect2sfExchange, - config.rabbitmq.queues.connect2sf, - publishChannel - ); } catch (e) { debug('Unable to connect to RabbitMQ'); } diff --git a/consumer/test/ConsumerService.spec.js b/consumer/test/ConsumerService.spec.js index 6be7f89..db51476 100644 --- a/consumer/test/ConsumerService.spec.js +++ b/consumer/test/ConsumerService.spec.js @@ -95,8 +95,14 @@ describe('ConsumerService', () => { id: 1, members: [], }; - await expect(ConsumerService.processProjectCreated(logger, projectWihoutMembers)) - .to.be.rejectedWith(UnprocessableError, /Cannot find primary customer/); + try { + ConsumerService.processProjectCreated(logger, projectWihoutMembers); + sinon.fail('Should be rejected'); + } catch(err) { + expect(err).to.exist + .and.be.instanceof(UnprocessableError) + .and.have.property('message').and.match(/Cannot find primary customer/); + } }); it('should throw UnprocessableError if Lead already exists', async() => { @@ -108,7 +114,7 @@ describe('ConsumerService', () => { }; throw err; }); - await expect(ConsumerService.processProjectCreated(logger,project)) + return expect(ConsumerService.processProjectCreated(logger,project)) .to.be.rejectedWith(UnprocessableError, /Lead already existing for project 1/); createObjectStub.should.have.been.called; }); diff --git a/consumer/test/scheduled-worker.spec.js b/consumer/test/scheduled-worker.spec.js new file mode 100644 index 0000000..4882c35 --- /dev/null +++ b/consumer/test/scheduled-worker.spec.js @@ -0,0 +1,134 @@ +/** + * Unit tests for worker + */ +import {processMessage, initHandlers} from '../src/scheduled-worker'; +import {UnprocessableError} from '../src/common/errors'; +import { EVENT } from '../config/constants'; +import config from 'config'; +import './setup'; +import _ from 'lodash'; + +describe('scheduled-worker', () => { + describe('consume', () => { + const queueName = 'sample-queue'; + const exchangeName = 'sample-exchange'; + const validMessage = { + content: JSON.stringify({ sampleData: 'foo' }), + properties: { correlationId : 'unit-tests'}, + fields: { routingKey: exchangeName } + }; + let handler; + let ack; + let nack; + let assertQueue; + let assertExchange; + let bindQueue; + let rabbitGet; + let exchangeHandlerSpy = sinon.spy(); + let fakeExchangeHandlerSpy = sinon.spy(); + let channelPublishSpy = sinon.spy(); + + beforeEach(() => { + handler = sinon.spy(); + ack = sinon.spy(); + nack = sinon.spy(); + assertQueue = sinon.spy(); + assertExchange = sinon.spy(); + bindQueue = sinon.spy(); + + initHandlers({ + [exchangeName] : (logger, data) => new Promise((resolve, reject) => { + if (data.rejectWithError) { + reject(new Error()); + } else if (data.rejectWithUnprocessableError) { + reject(new UnprocessableError()); + } else { + resolve(data); + } + }), + 'fakeExchange' : fakeExchangeHandlerSpy 
+ }) }); + + /** + * Invoke the worker consume method using current parameters + * @param done the mocha done function + */ + function invokeProcessMessages(message, done) { + return processMessage({ + ack, + nack, + assertExchange, + bindQueue, + assertQueue, + }, message); + } + + it('should process and ack a message successfully', () => { + invokeProcessMessages(validMessage).then(() => { + ack.should.have.been.calledWith(validMessage); + nack.should.not.have.been.called; + }).catch(() => { + sinon.fail('should not fail'); + }); + }); + + it('should ignore an empty msg', () => { + invokeProcessMessages(null).then(()=> { + sinon.fail('should not succeed'); + }).catch(() => { + ack.should.not.have.been.called; + nack.should.not.have.been.called; + }); + }); + + it('should ignore a false msg', () => { + invokeProcessMessages(false).then(()=> { + sinon.fail('should not succeed'); + }).catch(() => { + ack.should.not.have.been.called; + nack.should.not.have.been.called; + }); + }); + + it('should ack a message with invalid JSON', () => { + const msg = { content: 'foo', fields: { routingKey: exchangeName } }; + invokeProcessMessages(msg).then(()=> { + sinon.fail('should not succeed'); + }).catch(() => { + ack.should.have.been.calledWith(msg); + nack.should.not.have.been.called; + }); + }); + + it('should nack, if error is thrown', () => { + const msg = { + content: JSON.stringify({ sampleData: 'foo', rejectWithError: true }), + properties: { correlationId : 'unit-tests'}, + fields: { routingKey: exchangeName } + }; + invokeProcessMessages(msg).then(() => { + sinon.fail('should not succeed'); + }) + .catch(() => { + ack.should.not.have.been.called; + nack.should.have.been.calledWith(msg); + }); + }); + + it('should ack if error is UnprocessableError', () => { + const msg = { + content: JSON.stringify({ sampleData: 'foo', rejectWithUnprocessableError : true }), + properties: { correlationId : 'unit-tests'}, + fields: { routingKey: exchangeName } + }; + invokeProcessMessages(msg).then(() => { + sinon.fail('should not succeed'); + }) + .catch(() => { + ack.should.have.been.calledWith(msg); + nack.should.not.have.been.called; + }); + }); + }); +}); diff --git a/consumer/test/worker.spec.js b/consumer/test/worker.spec.js index 0141b92..733b77d 100644 --- a/consumer/test/worker.spec.js +++ b/consumer/test/worker.spec.js @@ -90,9 +90,10 @@ describe('worker', () => { it('should ack a message with invalid JSON', (done) => { rabbitConsume = async (queue, fn) => { - const msg = { content: 'foo' }; + const msg = { content: 'foo', fields: { routingKey : exchangeName } }; await fn(msg); ack.should.have.been.calledWith(msg); + nack.should.not.have.been.called; }; invokeConsume(done); }); From bd65a5e60cde3d477861473e37d1c8fa1d0f8339 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 14:38:47 +0530 Subject: [PATCH 07/16] Updated docker file to install cron first --- consumer/Dockerfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consumer/Dockerfile b/consumer/Dockerfile index 280bdf7..b269729 100644 --- a/consumer/Dockerfile +++ b/consumer/Dockerfile @@ -5,6 +5,8 @@ LABEL description="Topcoder Salesforce Integration" RUN apt-get update && \ apt-get upgrade -y +RUN apt-get install cron -y + # Create app directory RUN mkdir -p /usr/src/app From ad6897e12831b70ad422bdbaf71d415e129d42ca Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 14:42:50 +0530 Subject: [PATCH 08/16] New line at end of crontab file --- consumer/config/scheduler-cron | 2 +- 1 file
changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/config/scheduler-cron b/consumer/config/scheduler-cron index 7afbaa6..88ee7e1 100644 --- a/consumer/config/scheduler-cron +++ b/consumer/config/scheduler-cron @@ -1 +1 @@ -*/5 * * * * babel-node ../src/scheduled-worker.js \ No newline at end of file +*/5 * * * * babel-node ../src/scheduled-worker.js From 34ee80a6f2315ca849e05dde0d8a99d9ed7a9325 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 14:44:27 +0530 Subject: [PATCH 09/16] using same key for publishing the failed messages to separate exchange. --- consumer/src/worker.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consumer/src/worker.js b/consumer/src/worker.js index ea536dc..6b9c122 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -82,7 +82,8 @@ export async function consume(channel, exchangeName, queue, publishChannel) { try { publishChannel.publish( config.rabbitmq.connect2sfExchange, - key + EVENT.ROUTING_KEY.FAILED_SUFFIX, + // key + EVENT.ROUTING_KEY.FAILED_SUFFIX, + key, new Buffer(msg.content.toString()) ); } catch(e) { From f1727e8d9d61bd0859344acec27efc4288f38949 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 14:47:13 +0530 Subject: [PATCH 10/16] fixed unit test --- consumer/test/worker.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/test/worker.spec.js b/consumer/test/worker.spec.js index 733b77d..2552e6c 100644 --- a/consumer/test/worker.spec.js +++ b/consumer/test/worker.spec.js @@ -108,7 +108,7 @@ describe('worker', () => { await fn(validMessage); ack.should.have.been.calledWith(validMessage); const connect2sfExchange = config.rabbitmq.connect2sfExchange; - const failedRoutingKey = validMessage.fields.routingKey + EVENT.ROUTING_KEY.FAILED_SUFFIX; + const failedRoutingKey = validMessage.fields.routingKey;// + EVENT.ROUTING_KEY.FAILED_SUFFIX; channelPublishSpy.should.have.been.calledWith(connect2sfExchange, failedRoutingKey, new Buffer(validMessage.content)); }; invokeConsume(done); From c150ba356ccedeb6ced6c62235b4c912d3eee79a Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 15:05:46 +0530 Subject: [PATCH 11/16] installed babel-cli for scheduler --- consumer/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/Dockerfile b/consumer/Dockerfile index b269729..09c4231 100644 --- a/consumer/Dockerfile +++ b/consumer/Dockerfile @@ -17,7 +17,7 @@ COPY . /usr/src/app # Install app dependencies RUN npm install -RUN npm install -g forever +RUN npm install -g forever babel-cli RUN crontab config/scheduler-cron From 12064b1db0bff3aff1975b627aaef9fb3ad9cc13 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 15:16:15 +0530 Subject: [PATCH 12/16] fixed path of scheduled-worker for cron --- consumer/config/scheduler-cron | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/config/scheduler-cron b/consumer/config/scheduler-cron index 88ee7e1..d6a8f8f 100644 --- a/consumer/config/scheduler-cron +++ b/consumer/config/scheduler-cron @@ -1 +1 @@ -*/5 * * * * babel-node ../src/scheduled-worker.js +*/5 * * * * babel-node /usr/src/app/src/scheduled-worker.js From bfe23d06eec95f0ef7267db11cebec69d0efa337 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Wed, 19 Apr 2017 15:34:23 +0530 Subject: [PATCH 13/16] Fixed scheduled worker to close itself after processing all messages. 
--- consumer/src/scheduled-worker.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/consumer/src/scheduled-worker.js b/consumer/src/scheduled-worker.js index 295ef6f..2d241da 100644 --- a/consumer/src/scheduled-worker.js +++ b/consumer/src/scheduled-worker.js @@ -22,6 +22,7 @@ let EVENT_HANDLERS = { } function close() { + console.log('closing self...') try { connection.close(); } catch (ignore) { // eslint-ignore-line } @@ -60,11 +61,11 @@ export function processMessage(channel, msg) { logger.info(ignore); logger.error('Invalid message. Ignoring'); channel.ack(msg); - reject('Invalid message. Ignoring'); + reject(new Error('Invalid message. Ignoring')); } return handler(logger, data).then(() => { channel.ack(msg); - resolve(); + resolve(msg); }) .catch((e) => { logger.logFullError(e, `Error processing message`); @@ -74,6 +75,7 @@ export function processMessage(channel, msg) { // NACK and requeue (nack requeues by default) the message for next turn channel.nack(msg); } + reject(new Error('Error processing message')); }); }) } From e15b55e13645205696276b999005792fb02ce1f5 Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Fri, 21 Apr 2017 12:19:28 +0530 Subject: [PATCH 14/16] Fixed error with unacked messages. It was because the connection was being closed before the channel was able to ack the messages. Sleep.sleep was not doing the job because it seems to stop all threads of the process. So, setTimeout did the trick here. :) --- consumer/src/scheduled-worker.js | 68 ++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/consumer/src/scheduled-worker.js b/consumer/src/scheduled-worker.js index 2d241da..cd0259d 100644 --- a/consumer/src/scheduled-worker.js +++ b/consumer/src/scheduled-worker.js @@ -11,9 +11,17 @@ import { EVENT } from '../config/constants'; const debug = require('debug')('app:worker'); +const FETCH_LIMIT = 10; + let connection; process.once('SIGINT', () => { - close(); + debug('Received SIGINT...closing connection...') + try { + connection.close(); + } catch (ignore) { // eslint-ignore-line + logger.logFullError(ignore) + } + process.exit(); }); let EVENT_HANDLERS = { @@ -22,12 +30,12 @@ let EVENT_HANDLERS = { } function close() { - console.log('closing self...') + console.log('closing self after processing messages...') try { - connection.close(); + setTimeout(connection.close.bind(connection), 30000); } catch (ignore) { // eslint-ignore-line + logger.logFullError(ignore) } - process.exit(); } export function initHandlers(handlers) { @@ -43,6 +51,7 @@ export function processMessage(channel, msg) { return new Promise((resolve, reject) => { if (!msg) { reject(new Error('Empty message. Ignoring')); + return; } debug(`Consuming message in \n${msg.content}`); const key = _.get(msg, 'fields.routingKey'); @@ -54,28 +63,29 @@ export function processMessage(channel, msg) { handler = EVENT_HANDLERS[key]; if (!_.isFunction(handler)) { logger.error(`Unknown message type: ${key}, NACKing... `); - channel.nack(msg, false, false) + reject(new Error(`Unknown message type: ${key}`)); + return; } data = JSON.parse(msg.content.toString()); } catch (ignore) { logger.info(ignore); logger.error('Invalid message. Ignoring'); - channel.ack(msg); - reject(new Error('Invalid message. 
Ignoring')); + resolve('Invalid message. Ignoring'); + return; } return handler(logger, data).then(() => { - channel.ack(msg); resolve(msg); + return; }) .catch((e) => { - logger.logFullError(e, `Error processing message`); + // logger.logFullError(e, `Error processing message`); if (e.shouldAck) { - channel.ack(msg); + debug("Resolving for Unprocessable Error in handler..."); + resolve(msg); } else { - // NACK and requeue (nack requeues by default) the message for next turn - channel.nack(msg); + debug("Rejecting promise for error in msg processing...") + reject(new Error('Error processing message')); } - reject(new Error('Error processing message')); }); }) } @@ -97,9 +107,16 @@ async function start() { try { console.log(config.rabbitmqURL); connection = await amqp.connect(config.rabbitmqURL); + connection.on('error', (e) => { + logger.logFullError(e, `ERROR IN CONNECTION`); + }) + connection.on('close', () => { + debug('Before closing connection...') + }) debug('created connection successfully with URL: ' + config.rabbitmqURL); const connect2sfChannel = await connection.createConfirmChannel(); debug('Channel created for consuming failed messages ...'); + connect2sfChannel.prefetch(FETCH_LIMIT); assertExchangeQueues( connect2sfChannel, config.rabbitmq.connect2sfExchange, @@ -107,27 +124,38 @@ async function start() { ).then(() => { debug('Asserted all required exchanges and queues'); let counter = 0; - [1,2,3,4,5,6,7,8,9,10].forEach(() => { - return connect2sfChannel.get(config.rabbitmq.queues.connect2sf).then((msg) => { + _.range(1, 11).forEach(() => { + return connect2sfChannel.get(config.rabbitmq.queues.connect2sf). + then((msg) => { if (msg) { - processMessage( + return processMessage( connect2sfChannel, msg ).then((responses) => { counter++; - debug('Processed messages = ' + counter); - if (counter >= 10) { + debug('Processed message'); + connect2sfChannel.ack(msg); + if (counter >= FETCH_LIMIT) { close(); } }).catch((e) => { counter++; - debug('Processed messages[Error] = ' + counter); + debug('Processed message with Error'); + connect2sfChannel.nack(msg); logger.logFullError(e, `Unable to process one of the messages`); - if (counter >= 10) { + if (counter >= FETCH_LIMIT) { close(); } }) + } else { + counter++; + debug('Processed message'); + if (counter >= FETCH_LIMIT) { + close(); + } } + }).catch(() => { + console.log('get failed to consume') }) }) }) From f04eb92d2f2a6ecefcee6b88b7d83a9711413c6c Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Fri, 21 Apr 2017 15:12:12 +0530 Subject: [PATCH 15/16] trying with different base docker image to run cron --- consumer/Dockerfile | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/consumer/Dockerfile b/consumer/Dockerfile index 09c4231..63ffacc 100644 --- a/consumer/Dockerfile +++ b/consumer/Dockerfile @@ -1,9 +1,12 @@ -FROM node:6.9.4 +FROM ubuntu LABEL version="1.0" LABEL description="Topcoder Salesforce Integration" -RUN apt-get update && \ - apt-get upgrade -y + + +RUN apt-get update && apt-get upgrade -y && apt-get install -y cron logrotate curl +RUN curl -sL https://deb.nodesource.com/setup_6.x | bash - +RUN apt-get install -y nodejs RUN apt-get install cron -y @@ -21,7 +24,7 @@ RUN npm install -g forever babel-cli RUN crontab config/scheduler-cron -RUN cron +RUN service cron start EXPOSE 80 From f646e94cfae962d5bb744fb1c6671281c83e40cc Mon Sep 17 00:00:00 2001 From: Vikas Agarwal Date: Fri, 21 Apr 2017 15:46:41 +0530 Subject: [PATCH 16/16] Reverted docker base image to node Moving schedule logic to 
node via node-cron module --- consumer/Dockerfile | 15 ++++++--------- consumer/package.json | 1 + consumer/src/scheduled-worker.js | 8 ++++---- consumer/src/worker.js | 8 +++++++- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/consumer/Dockerfile b/consumer/Dockerfile index 63ffacc..a2c1d26 100644 --- a/consumer/Dockerfile +++ b/consumer/Dockerfile @@ -1,14 +1,11 @@ -FROM ubuntu +FROM node:6.9.4 LABEL version="1.0" LABEL description="Topcoder Salesforce Integration" +RUN apt-get update && \ + apt-get upgrade -y - -RUN apt-get update && apt-get upgrade -y && apt-get install -y cron logrotate curl -RUN curl -sL https://deb.nodesource.com/setup_6.x | bash - -RUN apt-get install -y nodejs - -RUN apt-get install cron -y +#RUN apt-get install cron -y # Create app directory @@ -22,9 +19,9 @@ RUN npm install RUN npm install -g forever babel-cli -RUN crontab config/scheduler-cron +#RUN crontab config/scheduler-cron -RUN service cron start +#RUN service cron start EXPOSE 80 diff --git a/consumer/package.json b/consumer/package.json index f3fa793..64c3dc3 100644 --- a/consumer/package.json +++ b/consumer/package.json @@ -46,6 +46,7 @@ "joi": "^9.0.4", "jsonwebtoken": "^7.1.7", "lodash": "^4.14.2", + "node-cron": "^1.1.3", "superagent": "^2.1.0", "superagent-promise": "^1.1.0", "winston": "^2.2.0" diff --git a/consumer/src/scheduled-worker.js b/consumer/src/scheduled-worker.js index cd0259d..b82d6eb 100644 --- a/consumer/src/scheduled-worker.js +++ b/consumer/src/scheduled-worker.js @@ -103,9 +103,9 @@ function assertExchangeQueues(channel, exchangeName, queue) { /** * Start the worker */ -async function start() { +export async function start() { try { - console.log(config.rabbitmqURL); + console.log("Scheduled Worker Connecting to RabbitMQ: " + config.rabbitmqURL.substr(-5)); connection = await amqp.connect(config.rabbitmqURL); connection.on('error', (e) => { logger.logFullError(e, `ERROR IN CONNECTION`); @@ -149,7 +149,7 @@ async function start() { }) } else { counter++; - debug('Processed message'); + debug('Processed Empty message'); if (counter >= FETCH_LIMIT) { close(); } @@ -166,5 +166,5 @@ async function start() { } if (!module.parent) { - start(); + start(); } diff --git a/consumer/src/worker.js b/consumer/src/worker.js index 6b9c122..959465c 100644 --- a/consumer/src/worker.js +++ b/consumer/src/worker.js @@ -8,6 +8,8 @@ import _ from 'lodash'; import logger from './common/logger'; import ConsumerService from './services/ConsumerService'; import { EVENT } from '../config/constants'; +import cron from 'node-cron'; +import { start as scheduleStart } from './scheduled-worker' const debug = require('debug')('app:worker'); @@ -102,7 +104,7 @@ export async function consume(channel, exchangeName, queue, publishChannel) { */ async function start() { try { - console.log(config.rabbitmqURL); + console.log("Worker Connecting to RabbitMQ: " + config.rabbitmqURL.substr(-5)); connection = await amqp.connect(config.rabbitmqURL); debug('created connection successfully with URL: ' + config.rabbitmqURL); const channel = await connection.createConfirmChannel(); @@ -122,4 +124,8 @@ async function start() { if (!module.parent) { start(); + + cron.schedule('*/1 * * * *', function(){ + scheduleStart(); + }); }
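A note on the final arrangement above: worker.js now triggers the scheduled worker every minute via node-cron, while the scheduled worker closes its own connection on a 30-second timeout after draining its batch, so a slow batch could still be running when the next tick fires. A minimal sketch of one way to guard against that, assuming the same node-cron module and the `start` export shown in the diff; the `running` flag and the 50-second release are illustrative assumptions, not part of the patch.

```js
import cron from 'node-cron';
import { start as scheduleStart } from './scheduled-worker';

let running = false; // illustrative guard, not part of the actual patch

cron.schedule('*/1 * * * *', () => {
  if (running) {
    return; // previous batch still draining; skip this tick
  }
  running = true;
  scheduleStart();
  // scheduled-worker closes its connection ~30s after finishing its batch,
  // so release the guard before the next tick (50s is an assumed safe margin)
  setTimeout(() => { running = false; }, 50000);
});
```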