From 624b03ab834a0f20813fbffe35fdab52b06dce35 Mon Sep 17 00:00:00 2001 From: yoution Date: Sat, 31 Jul 2021 14:07:32 +0800 Subject: [PATCH] feat: add transaction for WorkPeriodService, WorkPeriodPaymentService and ResourceBookingService --- config/default.js | 3 + src/common/helper.js | 21 +++ src/esProcessors/ResourceBookingProcessor.js | 54 +++++++ .../WorkPeriodPaymentProcessor.js | 85 +++++++++++ src/esProcessors/WorkPeriodProcessor.js | 132 ++++++++++++++++++ src/services/ResourceBookingService.js | 55 +++++++- src/services/WorkPeriodPaymentService.js | 47 ++++++- src/services/WorkPeriodService.js | 73 +++++++--- 8 files changed, 441 insertions(+), 29 deletions(-) create mode 100644 src/esProcessors/ResourceBookingProcessor.js create mode 100644 src/esProcessors/WorkPeriodPaymentProcessor.js create mode 100644 src/esProcessors/WorkPeriodProcessor.js diff --git a/config/default.js b/config/default.js index cb589290..1cc0818a 100644 --- a/config/default.js +++ b/config/default.js @@ -93,6 +93,9 @@ module.exports = { KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting', // The originator value for the kafka messages KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-api', + + // topics for error + TAAS_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'taas.action.error', // topics for job service // the create job entity Kafka message topic TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create', diff --git a/src/common/helper.js b/src/common/helper.js index 7f9625be..f76ed60e 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -987,6 +987,26 @@ async function postEvent (topic, payload, options = {}) { await eventDispatcher.handleEvent(topic, { value: payload, options }) } +/** + * Send error event to Kafka + * @params {String} topic the topic name + * @params {Object} payload the payload + * @params {String} action for which operation error occurred + */ +async function postErrorEvent (topic, payload, action) { + _.set(payload, 'apiAction', action) + const client = getBusApiClient() + const message = { + topic, + originator: config.KAFKA_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload + } + logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`) + await client.postEvent(message) +} + /** * Test if an error is document missing exception * @@ -2036,6 +2056,7 @@ module.exports = { getM2MToken, getM2MUbahnToken, postEvent, + postErrorEvent, getBusApiClient, isDocumentMissingException, getProjects, diff --git a/src/esProcessors/ResourceBookingProcessor.js b/src/esProcessors/ResourceBookingProcessor.js new file mode 100644 index 00000000..e81e3ccb --- /dev/null +++ b/src/esProcessors/ResourceBookingProcessor.js @@ -0,0 +1,54 @@ +/** + * ResourceBooking Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity message + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity message + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity message + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/WorkPeriodPaymentProcessor.js b/src/esProcessors/WorkPeriodPaymentProcessor.js new file mode 100644 index 00000000..78e379c6 --- /dev/null +++ b/src/esProcessors/WorkPeriodPaymentProcessor.js @@ -0,0 +1,85 @@ +/** + * WorkPeriodPayment Processor + */ + +const config = require('config') +const helper = require('../common/helper') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + // find related resourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.workPeriodId } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.workPeriodId} "WorkPeriod" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)', + params: { workPeriodPayment: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + // find workPeriodPayment in it's parent ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods.payments', + query: { + match: { 'workPeriods.payments.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.id} "WorkPeriodPayment" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate +} diff --git a/src/esProcessors/WorkPeriodProcessor.js b/src/esProcessors/WorkPeriodProcessor.js new file mode 100644 index 00000000..2fc7261f --- /dev/null +++ b/src/esProcessors/WorkPeriodProcessor.js @@ -0,0 +1,132 @@ +/** + * WorkPeriod Processor + */ + +const helper = require('../common/helper') +const config = require('config') +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity, options) { + // Find related resourceBooking + const resourceBooking = await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.resourceBookingId + }) + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.id, + body: { + script: { + lang: 'painless', + source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)', + params: { workPeriod: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + // find workPeriod in it's parent ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + // Find related ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + const resourceBookingId = entity.key.replace('resourceBooking.id:', '') + if (resourceBookingId) { + try { + await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBookingId + }) + if (!resourceBooking) { + return + } + + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } catch (e) { + // if ResourceBooking is deleted, ignore + if (e.message === 'resource_not_found_exception') { + return + } + throw e + } + } + // if ResourceBooking is deleted, ignore, else throw error + if (resourceBooking) { + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/services/ResourceBookingService.js b/src/services/ResourceBookingService.js index 46d2fe62..a22b453c 100644 --- a/src/services/ResourceBookingService.js +++ b/src/services/ResourceBookingService.js @@ -13,6 +13,11 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/ResourceBookingProcessor') const constants = require('../../app-constants') const moment = require('moment') @@ -22,6 +27,8 @@ const WorkPeriodPayment = models.WorkPeriodPayment const esClient = helper.getESClient() const cachedModelFields = _cacheModelFields() +const sequelize = models.sequelize + /** * Get the fields of the ResourceBooking model and the nested WorkPeriod model * @returns {Array} array of field names @@ -342,9 +349,21 @@ async function createResourceBooking (currentUser, resourceBooking) { resourceBooking.id = uuid() resourceBooking.createdBy = await helper.getUserId(currentUser.userId) - const created = await ResourceBooking.create(resourceBooking) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_CREATE_TOPIC, created.toJSON()) - return created.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const created = await ResourceBooking.create(resourceBooking, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'resourcebooking.create') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_CREATE_TOPIC, entity) + return entity } createResourceBooking.schema = Joi.object().keys({ @@ -394,9 +413,22 @@ async function updateResourceBooking (currentUser, id, data) { data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await resourceBooking.update(data) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) - return updated.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await resourceBooking.update(data, { transaction: t }) + + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'resourcebooking.update') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, entity, { oldValue: oldValue }) + return entity } /** @@ -484,7 +516,16 @@ async function deleteResourceBooking (currentUser, id) { // we can't delete workperiods with paymentStatus 'partially-completed' or 'completed'. await _ensurePaidWorkPeriodsNotDeleted(id) const resourceBooking = await ResourceBooking.findById(id) - await resourceBooking.destroy() + + try { + await sequelize.transaction(async (t) => { + await resourceBooking.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'resourcebooking.delete') + throw e + } await helper.postEvent(config.TAAS_RESOURCE_BOOKING_DELETE_TOPIC, { id }) } diff --git a/src/services/WorkPeriodPaymentService.js b/src/services/WorkPeriodPaymentService.js index 5e0f6a3e..ca873620 100644 --- a/src/services/WorkPeriodPaymentService.js +++ b/src/services/WorkPeriodPaymentService.js @@ -15,6 +15,12 @@ const errors = require('../common/errors') const models = require('../models') const { WorkPeriodPaymentStatus } = require('../../app-constants') const { searchResourceBookings } = require('./ResourceBookingService') +const { + processCreate, + processUpdate +} = require('../esProcessors/WorkPeriodPaymentProcessor') + +const sequelize = models.sequelize const WorkPeriodPayment = models.WorkPeriodPayment const WorkPeriod = models.WorkPeriod @@ -89,9 +95,23 @@ async function _createSingleWorkPeriodPaymentWithWorkPeriodAndResourceBooking (w workPeriodPayment.status = WorkPeriodPaymentStatus.SCHEDULED workPeriodPayment.createdBy = createdBy - const created = await WorkPeriodPayment.create(workPeriodPayment) - await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC, created.toJSON(), { key: `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` }) - return created.dataValues + const key = `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` + + let entity + try { + await sequelize.transaction(async (t) => { + const created = await WorkPeriodPayment.create(workPeriodPayment, { transaction: t }) + entity = created.toJSON() + await processCreate({ ...entity, key }) + }) + } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.create') + } + throw err + } + await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC, entity, { key }) + return entity } /** @@ -233,9 +253,24 @@ async function updateWorkPeriodPayment (currentUser, id, data) { } } data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await workPeriodPayment.update(data) - await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) - return updated.dataValues + + const key = `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodPayment.update(data, { transaction: t }) + entity = updated.toJSON() + + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) + return entity } /** diff --git a/src/services/WorkPeriodService.js b/src/services/WorkPeriodService.js index 8d018fb0..ea2cc61f 100644 --- a/src/services/WorkPeriodService.js +++ b/src/services/WorkPeriodService.js @@ -12,12 +12,19 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/WorkPeriodProcessor') const constants = require('../../app-constants') const moment = require('moment') const WorkPeriod = models.WorkPeriod const esClient = helper.getESClient() +const sequelize = models.sequelize + // "startDate" and "endDate" should always represent one week: // "startDate" should be always Monday and "endDate" should be always Sunday of the same week. // It should not include time or timezone, only date. @@ -221,19 +228,27 @@ async function createWorkPeriod (workPeriod) { workPeriod.id = uuid.v4() workPeriod.createdBy = config.m2m.M2M_AUDIT_USER_ID - let created = null + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + + let entity try { - created = await WorkPeriod.create(workPeriod) + await sequelize.transaction(async (t) => { + const created = await WorkPeriod.create(workPeriod, { transaction: t }) + entity = created.toJSON() + await processCreate({ ...entity, key }) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.create') + } if (!_.isUndefined(err.original)) { throw new errors.BadRequestError(err.original.detail) } else { throw err } } - - await helper.postEvent(config.TAAS_WORK_PERIOD_CREATE_TOPIC, created.toJSON(), { key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) - return created.dataValues + await helper.postEvent(config.TAAS_WORK_PERIOD_CREATE_TOPIC, entity, { key }) + return entity } createWorkPeriod.schema = Joi.object().keys({ @@ -278,11 +293,26 @@ async function updateWorkPeriod (currentUser, id, data) { } data.paymentStatus = helper.calculateWorkPeriodPaymentStatus(_.assign({}, oldValue, data)) data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await workPeriod.update(data) - const updatedDataWithoutPayments = _.omit(updated.toJSON(), ['payments']) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriod.update(data, { transaction: t }) + entity = updated.toJSON() + + entity = _.omit(entity, ['payments']) + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update') + } + throw e + } const oldValueWithoutPayments = _.omit(oldValue, ['payments']) - await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, updatedDataWithoutPayments, { oldValue: oldValueWithoutPayments, key: `resourceBooking.id:${updated.resourceBookingId}` }) - return updatedDataWithoutPayments + await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: oldValueWithoutPayments, key }) + return entity } /** @@ -313,13 +343,24 @@ async function deleteWorkPeriod (id) { if (_.some(workPeriod.payments, payment => constants.ActiveWorkPeriodPaymentStatuses.indexOf(payment.status) !== -1)) { throw new errors.BadRequestError(`Can't delete WorkPeriod as it has associated WorkPeriodsPayment with one of statuses ${constants.ActiveWorkPeriodPaymentStatuses.join(', ')}`) } - await models.WorkPeriodPayment.destroy({ - where: { - workPeriodId: id - } - }) - await workPeriod.destroy() - await helper.postEvent(config.TAAS_WORK_PERIOD_DELETE_TOPIC, { id }, { key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + try { + await sequelize.transaction(async (t) => { + await models.WorkPeriodPayment.destroy({ + where: { + workPeriodId: id + }, + transaction: t + }) + await workPeriod.destroy({ transaction: t }) + await processDelete({ id, key }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'workperiod.delete') + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_DELETE_TOPIC, { id }, { key }) } deleteWorkPeriod.schema = Joi.object().keys({