Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ module.exports = {
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
// The originator value for the kafka messages
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-api',

// topics for error
TAAS_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'taas.action.error',
// topics for job service
// the create job entity Kafka message topic
TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create',
Expand Down
21 changes: 21 additions & 0 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,26 @@ async function postEvent (topic, payload, options = {}) {
await eventDispatcher.handleEvent(topic, { value: payload, options })
}

/**
* Send error event to Kafka
* @params {String} topic the topic name
* @params {Object} payload the payload
* @params {String} action for which operation error occurred
*/
async function postErrorEvent (topic, payload, action) {
_.set(payload, 'apiAction', action)
const client = getBusApiClient()
const message = {
topic,
originator: config.KAFKA_MESSAGE_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': 'application/json',
payload
}
logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
await client.postEvent(message)
}

/**
* Test if an error is document missing exception
*
Expand Down Expand Up @@ -2036,6 +2056,7 @@ module.exports = {
getM2MToken,
getM2MUbahnToken,
postEvent,
postErrorEvent,
getBusApiClient,
isDocumentMissingException,
getProjects,
Expand Down
54 changes: 54 additions & 0 deletions src/esProcessors/ResourceBookingProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* ResourceBooking Processor
*/

const helper = require('../common/helper')
const config = require('config')

const esClient = helper.getESClient()

/**
* Process create entity message
* @param {Object} entity entity object
*/
async function processCreate (entity) {
await esClient.create({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: entity.id,
body: entity,
refresh: 'wait_for'
})
}

/**
* Process update entity message
* @param {Object} entity entity object
*/
async function processUpdate (entity) {
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: entity.id,
body: {
doc: entity
},
refresh: 'wait_for'
})
}

/**
* Process delete entity message
* @param {Object} entity entity object
*/
async function processDelete (entity) {
await esClient.delete({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: entity.id,
refresh: 'wait_for'
})
}

module.exports = {
processCreate,
processUpdate,
processDelete
}
85 changes: 85 additions & 0 deletions src/esProcessors/WorkPeriodPaymentProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* WorkPeriodPayment Processor
*/

const config = require('config')
const helper = require('../common/helper')

const esClient = helper.getESClient()

/**
* Process create entity
* @param {Object} entity entity object
*/
async function processCreate (entity) {
// find related resourceBooking
const resourceBooking = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
body: {
query: {
nested: {
path: 'workPeriods',
query: {
match: { 'workPeriods.id': entity.workPeriodId }
}
}
}
}
})
if (!resourceBooking.body.hits.total.value) {
throw new Error(`id: ${entity.workPeriodId} "WorkPeriod" not found`)
}
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.body.hits.hits[0]._id,
body: {
script: {
lang: 'painless',
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)',
params: { workPeriodPayment: entity }
}
},
refresh: 'wait_for'
})
}

/**
* Process update entity
* @param {Object} entity entity object
*/
async function processUpdate (entity) {
// find workPeriodPayment in it's parent ResourceBooking
const resourceBooking = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
body: {
query: {
nested: {
path: 'workPeriods.payments',
query: {
match: { 'workPeriods.payments.id': entity.id }
}
}
}
}
})
if (!resourceBooking.body.hits.total.value) {
throw new Error(`id: ${entity.id} "WorkPeriodPayment" not found`)
}
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.body.hits.hits[0]._id,
body: {
script: {
lang: 'painless',
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)',
params: { data: entity }
}
},
refresh: 'wait_for'
})
}

module.exports = {
processCreate,
processUpdate
}
132 changes: 132 additions & 0 deletions src/esProcessors/WorkPeriodProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* WorkPeriod Processor
*/

const helper = require('../common/helper')
const config = require('config')
const esClient = helper.getESClient()

/**
* Process create entity
* @param {Object} entity entity object
*/
async function processCreate (entity, options) {
// Find related resourceBooking
const resourceBooking = await esClient.getSource({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: entity.resourceBookingId
})
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.body.id,
body: {
script: {
lang: 'painless',
source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)',
params: { workPeriod: entity }
}
},
refresh: 'wait_for'
})
}

/**
* Process update entity
* @param {Object} entity entity object
*/
async function processUpdate (entity) {
// find workPeriod in it's parent ResourceBooking
const resourceBooking = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
body: {
query: {
nested: {
path: 'workPeriods',
query: {
match: { 'workPeriods.id': entity.id }
}
}
}
}
})
if (!resourceBooking.body.hits.total.value) {
throw new Error(`id: ${entity.id} "WorkPeriod" not found`)
}
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.body.hits.hits[0]._id,
body: {
script: {
lang: 'painless',
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)',
params: { data: entity }
}
},
refresh: 'wait_for'
})
}

/**
* Process delete entity
* @param {Object} entity entity object
*/
async function processDelete (entity) {
// Find related ResourceBooking
const resourceBooking = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
body: {
query: {
nested: {
path: 'workPeriods',
query: {
match: { 'workPeriods.id': entity.id }
}
}
}
}
})
if (!resourceBooking.body.hits.total.value) {
const resourceBookingId = entity.key.replace('resourceBooking.id:', '')
if (resourceBookingId) {
try {
await esClient.getSource({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBookingId
})
if (!resourceBooking) {
return
}

throw new Error(`id: ${entity.id} "WorkPeriod" not found`)
} catch (e) {
// if ResourceBooking is deleted, ignore
if (e.message === 'resource_not_found_exception') {
return
}
throw e
}
}
// if ResourceBooking is deleted, ignore, else throw error
if (resourceBooking) {
throw new Error(`id: ${entity.id} "WorkPeriod" not found`)
}
}
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.body.hits.hits[0]._id,
body: {
script: {
lang: 'painless',
source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)',
params: { data: entity }
}
},
refresh: 'wait_for'
})
}

module.exports = {
processCreate,
processUpdate,
processDelete
}
Loading