Skip to content

Commit

Permalink
fixed some label typo and added trycatches around
Browse files Browse the repository at this point in the history
  • Loading branch information
vgenev committed Jan 7, 2019
1 parent 85c321c commit 795891d
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 190 deletions.
2 changes: 1 addition & 1 deletion src/lib/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const setupDb = () => {
Mongoose.set('useNewUrlParser', true)
Mongoose.set('useCreateIndex', true)
const connectionString = config.mongo.user ? `mongodb://${config.mongo.user}:${config.mongo.password}@${config.mongo.uri}/${config.mongo.database}` :
`mongodb://${config.mongo.uri}/${config.mongo.database}`
`mongodb://${config.mongo.uri}/${config.mongo.database}`
Mongoose.connect(`${connectionString}`, { useFindAndModify: false, useNewUrlParser: true, useCreateIndex: true })
db.on('error', console.error.bind(console, 'connection error'))
db.once('open', function callback () {
Expand Down
16 changes: 10 additions & 6 deletions src/observables/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,16 @@ const actionObservable = ({ action, params, message }) => {
}

const clearRepetitionTask = async function (actionId) { // clears the timesTriggered after delay is reached if action is still active
let action = await ActionModel.findById(actionId).populate('eventType')
let limit = await LimitModel.findOne({ type: action.eventType.limitType, name: action.eventType.name, currency: action.eventType.currency })
if (action.isActive && limit) {
action.timesTriggered = 1
action.save()
this.schedule(actionId, resetPeriod * 60 * 1000)
try {
let action = await ActionModel.findById(actionId).populate('eventType')
let limit = await LimitModel.findOne({ type: action.eventType.limitType, name: action.eventType.name, currency: action.eventType.currency })
if (action.isActive && limit) {
action.timesTriggered = 1
action.save()
this.schedule(actionId, resetPeriod * 60 * 1000)
}
} catch (err) {
throw err
}
}

Expand Down
182 changes: 113 additions & 69 deletions src/observables/centralLedgerAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,92 +48,132 @@ const getPositionsFromResponse = positions => {

const prepareCurrentPosition = (name, positions, limits, transferId, messagePayload) => {
let viewsArray = []
limits.forEach(limit => {
const percentage = 100 - (positions[limit.currency] * 100 / limit.value)
let currentPosition = {
name,
currency: limit.currency,
positionValue: positions[limit.currency],
percentage,
transferId,
messagePayload
}
viewsArray.push(currentPosition)
})
return viewsArray
try {
limits.forEach(limit => {
const percentage = 100 - (positions[limit.currency] * 100 / limit.value)
let currentPosition = {
name,
currency: limit.currency,
positionValue: positions[limit.currency],
percentage,
transferId,
messagePayload
}
viewsArray.push(currentPosition)
})
return viewsArray
} catch (err) {
throw err
}
}

// TODO prepare a check for fresh limits and update them only if its necessary

const updateLimitsFromResponse = async (name, limits) => {
let result = []
for (let limit of limits) {
let doc = await LimitModel.findOne({ name: name, currency: limit.currency, type: limit.limit.type })
let limitObject = {
name,
currency: limit.currency,
type: limit.limit.type,
value: limit.limit.value,
threshold: limit.limit.alarmPercentage
}
if (doc) {
doc.oldValue = doc.value
doc.value = limit.limit.value
await doc.save()
result.push(doc.toObject())
} else {
let document = await LimitModel.create(limitObject)
result.push(document.toObject())
try {
for (let limit of limits) {
let doc = await LimitModel.findOne({ name: name, currency: limit.currency, type: limit.limit.type })
let limitObject = {
name,
currency: limit.currency,
type: limit.limit.type,
value: limit.limit.value,
threshold: limit.limit.alarmPercentage
}
if (doc) {
doc.oldValue = doc.value
doc.value = limit.limit.value
await doc.save()
result.push(doc.toObject())
} else {
let document = await LimitModel.create(limitObject)
result.push(document.toObject())
}
}
return result
} catch (err) {
throw err
}
return result
}

const createEventsForParticipant = async (name, limits) => {
for (let limit of limits) {
let notificationActions = Enums.limitNotificationMap[limit.type]
for (let key in notificationActions) {
if (key !== 'enum') {
let eventRecord = await EventModel.findOne({ name, currency: limit.currency, limitType: limit.type, notificationEndpointType: key })
if (!eventRecord) {
const newEvent = {
name,
currency: limit.currency,
notificationEndpointType: key,
limitType: limit.type,
action: notificationActions[key].action,
templateType: notificationActions[key].templateType,
language: notificationActions[key].language
try {
for (let limit of limits) {
let notificationActions = Enums.limitNotificationMap[limit.type]
for (let key in notificationActions) {
if (key !== 'enum') {
let eventRecord = await EventModel.findOne({ name, currency: limit.currency, limitType: limit.type, notificationEndpointType: key })
if (!eventRecord) {
const newEvent = {
name,
currency: limit.currency,
notificationEndpointType: key,
limitType: limit.type,
action: notificationActions[key].action,
templateType: notificationActions[key].templateType,
language: notificationActions[key].language
}
await EventModel.create(newEvent)
}
await EventModel.create(newEvent)
}
}
}
} catch (err) {
throw err
}
}

// const updateNotificationEndpointsFromResponse = async (name, notificationEndpoints) => {
// let result = []
// let notificationEndPointObject = {}
// for (let notificationEndpoint of notificationEndpoints) {
// let notificationRecord = await NotificationEndpointModel.findOne({ name: name, type: notificationEndpoint.type })
// let action = Enums.notificationActionMap[notificationEndpoint.type] ? Enums.notificationActionMap[notificationEndpoint.type].action : ''
// let document
// if (!notificationRecord) {
// notificationEndPointObject = {
// name,
// type: notificationEndpoint.type,
// value: notificationEndpoint.value,
// action
// }
// document = await NotificationEndpointModel.create(notificationEndPointObject)
// } else {
// notificationRecord.type = notificationEndpoint.type
// notificationRecord.action = action
// document = await notificationRecord.save()
// }
// result.push(document.toObject())
// }
// return result
// }

const updateNotificationEndpointsFromResponse = async (name, notificationEndpoints) => {
let result = []
let notificationEndPointObject = {}
for (let notificationEndpoint of notificationEndpoints) {
let notificationRecord = await NotificationEndpointModel.findOne({ name: name, type: notificationEndpoint.type })
let action = Enums.notificationActionMap[notificationEndpoint.type] ? Enums.notificationActionMap[notificationEndpoint.type].action : ''
let document
if (!notificationRecord) {
notificationEndPointObject = {
name,
type: notificationEndpoint.type,
value: notificationEndpoint.value,
action
}
document = await NotificationEndpointModel.create(notificationEndPointObject)
} else {
notificationRecord.type = notificationEndpoint.type
notificationRecord.action = action
document = await notificationRecord.save()
try {
for (let notificationEndpoint of notificationEndpoints) {
let action = Enums.notificationActionMap[notificationEndpoint.type] ? Enums.notificationActionMap[notificationEndpoint.type].action : ''
let notificationRecord = await NotificationEndpointModel
.findOneAndUpdate({
name,
type: notificationEndpoint.type
},
{
name,
type: notificationEndpoint.type,
value: notificationEndpoint.value,
action
}, {
upsert: true,
new: true
})
result.push(notificationRecord.toObject())
}
result.push(document.toObject())
} catch (err) {
throw err
}
// console.log('getNotificationEndpointsFromResponse' + JSON.stringify(result))
return result
}

Expand Down Expand Up @@ -176,11 +216,15 @@ const requestLimitPerName = async (name) => {

const getLimitPerNameObservable = (name) => {
return Rx.Observable.create(async observer => {
const limitResponse = await requestLimitPerName(name)
const limits = await updateLimitsFromResponse(name, limitResponse)
await createEventsForParticipant(name, limits)
observer.next(limits)
observer.complete()
try {
const limitResponse = await requestLimitPerName(name)
const limits = await updateLimitsFromResponse(name, limitResponse)
await createEventsForParticipant(name, limits)
observer.next(limits)
observer.complete()
} catch (err) {
observer.error(err)
}
})
}

Expand Down
122 changes: 65 additions & 57 deletions src/observables/rules/ndcAdjustment.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,76 +35,84 @@ let engine = new RuleEngine.Engine()
const createRules = async (limit) => {
let rules = []
let { name, currency, type } = limit
let dbEvent = await EventModel.findOne({
name,
currency,
limitType: type,
notificationEndpointType: `${type}_ADJUSTMENT`,
isActive: true
})
try {
let dbEvent = await EventModel.findOne({
name,
currency,
limitType: type,
notificationEndpointType: `${type}_ADJUSTMENT_EMAIL`,
isActive: true
})

let conditions = {
all: [{
fact: 'name',
operator: 'equal',
value: name
}, {
fact: 'type',
operator: 'equal',
value: limit.type
}, {
fact: 'value',
operator: 'notEqual',
value: limit.oldValue
}]
}
let conditions = {
all: [{
fact: 'name',
operator: 'equal',
value: name
}, {
fact: 'type',
operator: 'equal',
value: limit.type
}, {
fact: 'value',
operator: 'notEqual',
value: limit.oldValue
}]
}

let event = {
type: `${type}_ADJUSTMENT`,
params: {
dfsp: name,
limitType: type,
value: limit.value,
currency: limit.currency,
triggeredBy: limit._id,
repetitionsAllowed: limit.repetitions,
fromEvent: dbEvent.id,
action: dbEvent.action,
notificationEndpointType: dbEvent.notificationEndpointType,
templateType: dbEvent.templateType,
language: dbEvent.language,
messageSubject: `${type} LIMIT ADJUSTMENT`
let event = {
type: `${type}_ADJUSTMENT`,
params: {
dfsp: name,
limitType: type,
value: limit.value,
currency: limit.currency,
triggeredBy: limit._id,
repetitionsAllowed: limit.repetitions,
fromEvent: dbEvent.id,
action: dbEvent.action,
notificationEndpointType: dbEvent.notificationEndpointType,
templateType: dbEvent.templateType,
language: dbEvent.language,
messageSubject: `${type} LIMIT ADJUSTMENT`
}
}
let adjustmentRule = new RuleEngine.Rule({ conditions, event })
rules.push(adjustmentRule)
return { rules, event }
} catch (err) {
throw err
}
let adjustmentRule = new RuleEngine.Rule({ conditions, event })
rules.push(adjustmentRule)
return { rules, event }
}

const ndcAdjustmentObservable = (limits) => {
for (let limit of limits) {
return Rx.Observable.create(async observer => {
let { rules, event } = await createRules(limit)
rules.forEach(rule => engine.addRule(rule))
let actions = await engine.run(limit)
if (actions.length) {
actions.forEach(action => {
observer.next({
action: 'produceToKafkaTopic',
params: action.params
try {
let { rules, event } = await createRules(limit)
rules.forEach(rule => engine.addRule(rule))
let actions = await engine.run(limit)
if (actions.length) {
actions.forEach(action => {
observer.next({
action: 'produceToKafkaTopic',
params: action.params
})
})
})
} else {
observer.next({ action: 'finish' })
let activeActions = await ActionModel.find({ fromEvent: event.params.fromEvent, isActive: true })
if (activeActions.length) {
for (let activeAction of activeActions) {
await ActionModel.findByIdAndUpdate(activeAction.id, { isActive: false })
} else {
observer.next({ action: 'finish' })
let activeActions = await ActionModel.find({ fromEvent: event.params.fromEvent, isActive: true })
if (activeActions.length) {
for (let activeAction of activeActions) {
await ActionModel.findByIdAndUpdate(activeAction.id, { isActive: false })
}
}
}
rules.forEach(rule => engine.removeRule(rule))
observer.complete()
} catch (err) {
observer.error(err)
}
rules.forEach(rule => engine.removeRule(rule))
observer.complete()
})
}
}
Expand Down
Loading

0 comments on commit 795891d

Please sign in to comment.