Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ The other configurations can be changed in `config/default.js` or by setting env
- `ALLOWED_SERVICES` the allowed calling services
- `JWT_TOKEN_SECRET` the secret to sign JWT tokens
- `JWT_TOKEN_EXPIRES_IN` the JWT token expiration
- `KAFKA_TOPIC_PREFIX` the prefix of all topics in Kafka
- `TC_EMAIL_URL` the email service URL (http://localhost:4001, if deployed locally)
- `TC_EMAIL_TOKEN` the email service authentication token (see tc-email README for details **link should be added later**)
- `TC_EMAIL_CACHE_PERIOD` the period to cache template placeholders from email service (60 min default)
Expand Down
37 changes: 0 additions & 37 deletions common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,42 +126,6 @@ function signJwtToken (payload) {
return jwt.sign(payload, config.JWT_TOKEN_SECRET, {expiresIn: config.JWT_TOKEN_EXPIRES_IN})
}

/**
* Validate the event based on the source service, type, and message.
*
* @param {Object} event the event
*/
function validateEvent (event) {
const schema = Joi.object().keys({
event: Joi.object().keys({
type: Joi
.string()
.regex(/^([a-zA-Z0-9]+\.)+[a-zA-Z0-9]+$/)
.error(createError.BadRequest(
'"type" must be a fully qualified name - dot separated string'))
.required(),
message: Joi.string().required()
})
})

const { error } = Joi.validate({event}, schema)
if (error) {
throw error
}

// The message should be a JSON-formatted string
let message
try {
message = JSON.parse(event.message)
} catch (err) {
logger.error(err)
throw createError.BadRequest(
`"message" is not a valid JSON-formatted string: ${err.message}`)
}

return message
}

/**
* Validate the event payload
*
Expand Down Expand Up @@ -201,7 +165,6 @@ module.exports = {
buildService,
verifyJwtToken,
signJwtToken,
validateEvent,
validateEventPayload,
verifyTokenScope
}
1 change: 0 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ module.exports = {
VALID_ISSUERS: process.env.VALID_ISSUERS ? process.env.VALID_ISSUERS.replace(/\\"/g, '') : null,
JWT_TOKEN_SECRET: process.env.JWT_TOKEN_SECRET || '',
JWT_TOKEN_EXPIRES_IN: process.env.JWT_TOKEN_EXPIRES_IN || '100 days',
KAFKA_TOPIC_PREFIX: process.env.KAFKA_TOPIC_PREFIX || '',
ALLOWED_SERVICES: process.env.ALLOWED_SERVICES || ['project-service', 'message-service'],
TC_EMAIL_SERVICE_URL: process.env.TC_EMAIL_SERVICE_URL,
TC_EMAIL_SERVICE_CACHE_PERIOD: process.env.TC_EMAIL_SERVICE_CACHE_PERIOD || (3600 * 1000),
Expand Down
7 changes: 1 addition & 6 deletions deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ AWS_ECS_CONTAINER_NAME=$(eval "echo \$${ENV}_AWS_ECS_CONTAINER_NAME")

LOG_LEVEL=$(eval "echo \$${ENV}_LOG_LEVEL")
JWT_TOKEN_SECRET=$(eval "echo \$${ENV}_JWT_TOKEN_SECRET")
KAFKA_TOPIC_PREFIX=$(eval "echo \$${ENV}_KAFKA_TOPIC_PREFIX")
API_VERSION=$(eval "echo \$${ENV}_API_VERSION")
ALLOWED_SERVICES=$(eval "echo \$${ENV}_ALLOWED_SERVICES")
JWT_TOKEN_EXPIRES_IN=$(eval "echo \$${ENV}_JWT_TOKEN_EXPIRES_IN")
Expand Down Expand Up @@ -124,10 +123,6 @@ make_task_def(){
"name": "JWT_TOKEN_SECRET",
"value": "%s"
},
{
"name": "KAFKA_TOPIC_PREFIX",
"value": "%s"
},
{
"name": "ALLOWED_SERVICES",
"value": "%s"
Expand Down Expand Up @@ -195,7 +190,7 @@ make_task_def(){
}
]'

task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV $KAFKA_URL "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $LOG_LEVEL $JWT_TOKEN_SECRET "$KAFKA_TOPIC_PREFIX" "$ALLOWED_SERVICES" $JWT_TOKEN_EXPIRES_IN "$API_VERSION" $PORT "$AUTH_DOMAIN" "$VALID_ISSUERS" $TC_EMAIL_SERVICE_URL "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV)
task_def=$(printf "$task_template" $AWS_ECS_CONTAINER_NAME $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $ENV $KAFKA_URL "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $LOG_LEVEL $JWT_TOKEN_SECRET "$ALLOWED_SERVICES" $JWT_TOKEN_EXPIRES_IN "$API_VERSION" $PORT "$AUTH_DOMAIN" "$VALID_ISSUERS" $TC_EMAIL_SERVICE_URL "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV)
}

register_definition() {
Expand Down
47 changes: 3 additions & 44 deletions service/MessageBusService.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@
* The Message Bus service provides operations to the remote Kafka.
*/
const createError = require('http-errors')
const Joi = require('joi')
const _ = require('lodash')
const config = require('config')
const Kafka = require('no-kafka')

const helper = require('../common/helper')

const PlaceholderService = require('./PlaceholdersService')

// Create a new producer instance with KAFKA_URL, KAFKA_CLIENT_CERT, and
// KAFKA_CLIENT_CERT_KEY environment variables
const producer = new Kafka.Producer()
Expand All @@ -28,46 +24,9 @@ async function init () {
* @param {Object} event the event to post
*/
async function postEvent (event) {
if (_.has(event, 'message')) {
const message = helper.validateEvent(event)

if (event.type.startsWith('email.')) {
let placeholders
try {
placeholders = await PlaceholderService.getAllPlaceholders(event.type)
} catch (err) {
throw createError.InternalServerError()
}

const keys = _.fromPairs(_.map(placeholders, o => [o, Joi.string().required().min(1)]))
const schema = Joi.object().keys({
data: Joi.object().keys(keys).required(),
recipients: Joi.array().items(Joi.string().email()).min(1).required(),
replyTo: Joi.string().email()
})
const { error } = Joi.validate(message, schema)
if (error) {
throw error
}
}

// Post old structure
const result = await producer.send({
topic: `${config.KAFKA_TOPIC_PREFIX}${event.type}`,
message: {
value: event.message
}
})
// Check if there is any error
const error = _.get(result, '[0].error')
if (error) {
if (error.code === 'UnknownTopicOrPartition') {
throw createError.BadRequest(`Unknown event type "${event.type}"`)
}
// var result

throw createError.InternalServerError()
}
} else if (_.has(event, 'payload')) {
if (_.has(event, 'payload')) {
helper.validateEventPayload(event)

// Post new structure
Expand All @@ -86,7 +45,7 @@ async function postEvent (event) {
throw createError.InternalServerError()
}
} else {
throw createError.BadRequest(`Expecting either old (type-message) structure or new (mimetype-payload)`)
throw createError.BadRequest(`Expecting new (mimetype-payload) structure`)
}
}

Expand Down