From 5211533e867317c0b5abbdf02874cce40015d67b Mon Sep 17 00:00:00 2001 From: Samir Gondzetovic Date: Tue, 26 Jun 2018 15:48:59 +0100 Subject: [PATCH 1/2] deprecate old bus api format --- README.md | 1 - common/helper.js | 37 --------------------------- config/default.js | 1 - deploy.sh | 7 +----- services/MessageBusService.js | 47 ++--------------------------------- 5 files changed, 3 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index 78a4a06..9687583 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,6 @@ The other configurations can be changed in `config/default.js` or by setting env - `ALLOWED_SERVICES` the allowed calling services - `JWT_TOKEN_SECRET` the secret to sign JWT tokens - `JWT_TOKEN_EXPIRES_IN` the JWT token expiration -- `KAFKA_TOPIC_PREFIX` the prefix of all topics in Kafka - `TC_EMAIL_URL` the email service URL (http://localhost:4001, if deployed locally) - `TC_EMAIL_TOKEN` the email service authentication token (see tc-email README for details **link should be added later**) - `TC_EMAIL_CACHE_PERIOD` the period to cache template placeholders from email service (60 min default) diff --git a/common/helper.js b/common/helper.js index a5a9b5e..b31e2f8 100644 --- a/common/helper.js +++ b/common/helper.js @@ -126,42 +126,6 @@ function signJwtToken (payload) { return jwt.sign(payload, config.JWT_TOKEN_SECRET, {expiresIn: config.JWT_TOKEN_EXPIRES_IN}) } -/** - * Validate the event based on the source service, type, and message. - * - * @param {Object} event the event - */ -function validateEvent (event) { - const schema = Joi.object().keys({ - event: Joi.object().keys({ - type: Joi - .string() - .regex(/^([a-zA-Z0-9]+\.)+[a-zA-Z0-9]+$/) - .error(createError.BadRequest( - '"type" must be a fully qualified name - dot separated string')) - .required(), - message: Joi.string().required() - }) - }) - - const { error } = Joi.validate({event}, schema) - if (error) { - throw error - } - - // The message should be a JSON-formatted string - let message - try { - message = JSON.parse(event.message) - } catch (err) { - logger.error(err) - throw createError.BadRequest( - `"message" is not a valid JSON-formatted string: ${err.message}`) - } - - return message -} - /** * Validate the event payload * @@ -201,7 +165,6 @@ module.exports = { buildService, verifyJwtToken, signJwtToken, - validateEvent, validateEventPayload, verifyTokenScope } diff --git a/config/default.js b/config/default.js index 3aa2e07..9f30435 100644 --- a/config/default.js +++ b/config/default.js @@ -11,7 +11,6 @@ module.exports = { VALID_ISSUERS: process.env.VALID_ISSUERS ? process.env.VALID_ISSUERS.replace(/\\"/g, '') : null, JWT_TOKEN_SECRET: process.env.JWT_TOKEN_SECRET || '', JWT_TOKEN_EXPIRES_IN: process.env.JWT_TOKEN_EXPIRES_IN || '100 days', - KAFKA_TOPIC_PREFIX: process.env.KAFKA_TOPIC_PREFIX || '', ALLOWED_SERVICES: process.env.ALLOWED_SERVICES || ['project-service', 'message-service'], TC_EMAIL_SERVICE_URL: process.env.TC_EMAIL_SERVICE_URL, TC_EMAIL_SERVICE_CACHE_PERIOD: process.env.TC_EMAIL_SERVICE_CACHE_PERIOD || (3600 * 1000), diff --git a/deploy.sh b/deploy.sh index 60c7b78..a0228fc 100755 --- a/deploy.sh +++ b/deploy.sh @@ -35,7 +35,6 @@ AWS_ECS_CONTAINER_NAME=$(eval "echo \$${ENV}_AWS_ECS_CONTAINER_NAME") LOG_LEVEL=$(eval "echo \$${ENV}_LOG_LEVEL") JWT_TOKEN_SECRET=$(eval "echo \$${ENV}_JWT_TOKEN_SECRET") -KAFKA_TOPIC_PREFIX=$(eval "echo \$${ENV}_KAFKA_TOPIC_PREFIX") API_VERSION=$(eval "echo \$${ENV}_API_VERSION") ALLOWED_SERVICES=$(eval "echo \$${ENV}_ALLOWED_SERVICES") JWT_TOKEN_EXPIRES_IN=$(eval "echo \$${ENV}_JWT_TOKEN_EXPIRES_IN") @@ -124,10 +123,6 @@ make_task_def(){ "name": "JWT_TOKEN_SECRET", "value": "%s" }, - { - "name": "KAFKA_TOPIC_PREFIX", - "value": "%s" - }, { "name": "ALLOWED_SERVICES", "value": "%s" @@ -194,7 +189,7 @@ make_task_def(){ } } ]' - + task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV $KAFKA_URL "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $LOG_LEVEL $JWT_TOKEN_SECRET "$KAFKA_TOPIC_PREFIX" "$ALLOWED_SERVICES" $JWT_TOKEN_EXPIRES_IN "$API_VERSION" $PORT "$AUTH_DOMAIN" "$VALID_ISSUERS" $TC_EMAIL_SERVICE_URL "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV) } diff --git a/services/MessageBusService.js b/services/MessageBusService.js index b6ba92e..cb21b3b 100644 --- a/services/MessageBusService.js +++ b/services/MessageBusService.js @@ -2,15 +2,11 @@ * The Message Bus service provides operations to the remote Kafka. */ const createError = require('http-errors') -const Joi = require('joi') const _ = require('lodash') -const config = require('config') const Kafka = require('no-kafka') const helper = require('../common/helper') -const PlaceholderService = require('./PlaceholderService') - // Create a new producer instance with KAFKA_URL, KAFKA_CLIENT_CERT, and // KAFKA_CLIENT_CERT_KEY environment variables const producer = new Kafka.Producer() @@ -30,46 +26,7 @@ async function init () { async function postEvent (event) { // var result - if (_.has(event, 'message')) { - const message = helper.validateEvent(event) - - if (event.type.startsWith('email.')) { - let placeholders - try { - placeholders = await PlaceholderService.getAllPlaceholders(event.type) - } catch (err) { - throw createError.InternalServerError() - } - - const keys = _.fromPairs(_.map(placeholders, o => [o, Joi.string().required().min(1)])) - const schema = Joi.object().keys({ - data: Joi.object().keys(keys).required(), - recipients: Joi.array().items(Joi.string().email()).min(1).required(), - replyTo: Joi.string().email() - }) - const { error } = Joi.validate(message, schema) - if (error) { - throw error - } - } - - // Post old structure - const result = await producer.send({ - topic: `${config.KAFKA_TOPIC_PREFIX}${event.type}`, - message: { - value: event.message - } - }) - // Check if there is any error - const error = _.get(result, '[0].error') - if (error) { - if (error.code === 'UnknownTopicOrPartition') { - throw createError.BadRequest(`Unknown event type "${event.type}"`) - } - - throw createError.InternalServerError() - } - } else if (_.has(event, 'payload')) { + if (_.has(event, 'payload')) { helper.validateEventPayload(event) // Post new structure @@ -88,7 +45,7 @@ async function postEvent (event) { throw createError.InternalServerError() } } else { - throw createError.BadRequest(`Expecting either old (type-message) structure or new (mimetype-payload)`) + throw createError.BadRequest(`Expecting new (mimetype-payload) structure`) } } From 2aa8019034ace2c575346007dcb144e8eb1c3647 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 3 Jul 2018 15:53:32 +0530 Subject: [PATCH 2/2] fixing deployment issue. --- deploy.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy.sh b/deploy.sh index a0228fc..f78d41b 100755 --- a/deploy.sh +++ b/deploy.sh @@ -190,7 +190,7 @@ make_task_def(){ } ]' - task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV $KAFKA_URL "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $LOG_LEVEL $JWT_TOKEN_SECRET "$KAFKA_TOPIC_PREFIX" "$ALLOWED_SERVICES" $JWT_TOKEN_EXPIRES_IN "$API_VERSION" $PORT "$AUTH_DOMAIN" "$VALID_ISSUERS" $TC_EMAIL_SERVICE_URL "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV) + task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV $KAFKA_URL "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $LOG_LEVEL $JWT_TOKEN_SECRET "$ALLOWED_SERVICES" $JWT_TOKEN_EXPIRES_IN "$API_VERSION" $PORT "$AUTH_DOMAIN" "$VALID_ISSUERS" $TC_EMAIL_SERVICE_URL "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV) } register_definition() {