diff --git a/README.md b/README.md index 68e5437..b18e28f 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,6 @@ The other configurations can be changed in `config/default.js` or by setting env - `ALLOWED_SERVICES` the allowed calling services - `JWT_TOKEN_SECRET` the secret to sign JWT tokens - `JWT_TOKEN_EXPIRES_IN` the JWT token expiration -- `KAFKA_TOPIC_PREFIX` the prefix of all topics in Kafka - `TC_EMAIL_URL` the email service URL (http://localhost:4001, if deployed locally) - `TC_EMAIL_TOKEN` the email service authentication token (see tc-email README for details **link should be added later**) - `TC_EMAIL_CACHE_PERIOD` the period to cache template placeholders from email service (60 min default) diff --git a/common/helper.js b/common/helper.js index fd32364..6e38ad5 100644 --- a/common/helper.js +++ b/common/helper.js @@ -126,42 +126,6 @@ function signJwtToken (payload) { return jwt.sign(payload, config.JWT_TOKEN_SECRET, {expiresIn: config.JWT_TOKEN_EXPIRES_IN}) } -/** - * Validate the event based on the source service, type, and message. - * - * @param {Object} event the event - */ -function validateEvent (event) { - const schema = Joi.object().keys({ - event: Joi.object().keys({ - type: Joi - .string() - .regex(/^([a-zA-Z0-9]+\.)+[a-zA-Z0-9]+$/) - .error(createError.BadRequest( - '"type" must be a fully qualified name - dot separated string')) - .required(), - message: Joi.string().required() - }) - }) - - const { error } = Joi.validate({event}, schema) - if (error) { - throw error - } - - // The message should be a JSON-formatted string - let message - try { - message = JSON.parse(event.message) - } catch (err) { - logger.error(err) - throw createError.BadRequest( - `"message" is not a valid JSON-formatted string: ${err.message}`) - } - - return message -} - /** * Validate the event payload * @@ -201,7 +165,6 @@ module.exports = { buildService, verifyJwtToken, signJwtToken, - validateEvent, validateEventPayload, verifyTokenScope } diff --git a/config/default.js b/config/default.js index 5d54655..c91b027 100644 --- a/config/default.js +++ b/config/default.js @@ -10,7 +10,6 @@ module.exports = { VALID_ISSUERS: process.env.VALID_ISSUERS ? process.env.VALID_ISSUERS.replace(/\\"/g, '') : null, JWT_TOKEN_SECRET: process.env.JWT_TOKEN_SECRET || '', JWT_TOKEN_EXPIRES_IN: process.env.JWT_TOKEN_EXPIRES_IN || '100 days', - KAFKA_TOPIC_PREFIX: process.env.KAFKA_TOPIC_PREFIX || '', ALLOWED_SERVICES: process.env.ALLOWED_SERVICES || ['project-service', 'message-service'], TC_EMAIL_SERVICE_URL: process.env.TC_EMAIL_SERVICE_URL, TC_EMAIL_SERVICE_CACHE_PERIOD: process.env.TC_EMAIL_SERVICE_CACHE_PERIOD || (3600 * 1000), diff --git a/deploy.sh b/deploy.sh index d07e2b8..f78d41b 100755 --- a/deploy.sh +++ b/deploy.sh @@ -35,7 +35,6 @@ AWS_ECS_CONTAINER_NAME=$(eval "echo \$${ENV}_AWS_ECS_CONTAINER_NAME") LOG_LEVEL=$(eval "echo \$${ENV}_LOG_LEVEL") JWT_TOKEN_SECRET=$(eval "echo \$${ENV}_JWT_TOKEN_SECRET") -KAFKA_TOPIC_PREFIX=$(eval "echo \$${ENV}_KAFKA_TOPIC_PREFIX") API_VERSION=$(eval "echo \$${ENV}_API_VERSION") ALLOWED_SERVICES=$(eval "echo \$${ENV}_ALLOWED_SERVICES") JWT_TOKEN_EXPIRES_IN=$(eval "echo \$${ENV}_JWT_TOKEN_EXPIRES_IN") @@ -124,10 +123,6 @@ make_task_def(){ "name": "JWT_TOKEN_SECRET", "value": "%s" }, - { - "name": "KAFKA_TOPIC_PREFIX", - "value": "%s" - }, { "name": "ALLOWED_SERVICES", "value": "%s" @@ -195,7 +190,7 @@ make_task_def(){ } ]' - task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV $KAFKA_URL "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $LOG_LEVEL $JWT_TOKEN_SECRET "$KAFKA_TOPIC_PREFIX" "$ALLOWED_SERVICES" $JWT_TOKEN_EXPIRES_IN "$API_VERSION" $PORT "$AUTH_DOMAIN" "$VALID_ISSUERS" $TC_EMAIL_SERVICE_URL "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV) + task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV $KAFKA_URL "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $LOG_LEVEL $JWT_TOKEN_SECRET "$ALLOWED_SERVICES" $JWT_TOKEN_EXPIRES_IN "$API_VERSION" $PORT "$AUTH_DOMAIN" "$VALID_ISSUERS" $TC_EMAIL_SERVICE_URL "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV) } register_definition() { diff --git a/service/MessageBusService.js b/service/MessageBusService.js index 3563bbc..cb21b3b 100644 --- a/service/MessageBusService.js +++ b/service/MessageBusService.js @@ -2,15 +2,11 @@ * The Message Bus service provides operations to the remote Kafka. */ const createError = require('http-errors') -const Joi = require('joi') const _ = require('lodash') -const config = require('config') const Kafka = require('no-kafka') const helper = require('../common/helper') -const PlaceholderService = require('./PlaceholdersService') - // Create a new producer instance with KAFKA_URL, KAFKA_CLIENT_CERT, and // KAFKA_CLIENT_CERT_KEY environment variables const producer = new Kafka.Producer() @@ -28,46 +24,9 @@ async function init () { * @param {Object} event the event to post */ async function postEvent (event) { - if (_.has(event, 'message')) { - const message = helper.validateEvent(event) - - if (event.type.startsWith('email.')) { - let placeholders - try { - placeholders = await PlaceholderService.getAllPlaceholders(event.type) - } catch (err) { - throw createError.InternalServerError() - } - - const keys = _.fromPairs(_.map(placeholders, o => [o, Joi.string().required().min(1)])) - const schema = Joi.object().keys({ - data: Joi.object().keys(keys).required(), - recipients: Joi.array().items(Joi.string().email()).min(1).required(), - replyTo: Joi.string().email() - }) - const { error } = Joi.validate(message, schema) - if (error) { - throw error - } - } - - // Post old structure - const result = await producer.send({ - topic: `${config.KAFKA_TOPIC_PREFIX}${event.type}`, - message: { - value: event.message - } - }) - // Check if there is any error - const error = _.get(result, '[0].error') - if (error) { - if (error.code === 'UnknownTopicOrPartition') { - throw createError.BadRequest(`Unknown event type "${event.type}"`) - } + // var result - throw createError.InternalServerError() - } - } else if (_.has(event, 'payload')) { + if (_.has(event, 'payload')) { helper.validateEventPayload(event) // Post new structure @@ -86,7 +45,7 @@ async function postEvent (event) { throw createError.InternalServerError() } } else { - throw createError.BadRequest(`Expecting either old (type-message) structure or new (mimetype-payload)`) + throw createError.BadRequest(`Expecting new (mimetype-payload) structure`) } }