diff --git a/package-lock.json b/package-lock.json index c90c8ec..1d250cf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "central-event-processor", - "version": "5.2.0", + "version": "5.3.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -2888,8 +2888,7 @@ "ansi-regex": { "version": "2.1.1", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "aproba": { "version": "1.2.0", @@ -2932,8 +2931,7 @@ "code-point-at": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "concat-map": { "version": "0.0.1", @@ -2944,8 +2942,7 @@ "console-control-strings": { "version": "1.1.0", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "core-util-is": { "version": "1.0.2", @@ -3062,8 +3059,7 @@ "inherits": { "version": "2.0.3", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "ini": { "version": "1.3.5", @@ -3075,7 +3071,6 @@ "version": "1.0.0", "bundled": true, "dev": true, - "optional": true, "requires": { "number-is-nan": "^1.0.0" } @@ -3105,7 +3100,6 @@ "version": "2.3.5", "bundled": true, "dev": true, - "optional": true, "requires": { "safe-buffer": "^5.1.2", "yallist": "^3.0.0" @@ -3124,7 +3118,6 @@ "version": "0.5.1", "bundled": true, "dev": true, - "optional": true, "requires": { "minimist": "0.0.8" } @@ -3218,7 +3211,6 @@ "version": "1.4.0", "bundled": true, "dev": true, - "optional": true, "requires": { "wrappy": "1" } @@ -3304,8 +3296,7 @@ "safe-buffer": { "version": "5.1.2", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "safer-buffer": { "version": "2.1.2", @@ -3341,7 +3332,6 @@ "version": "1.0.2", "bundled": true, "dev": true, - "optional": true, "requires": { "code-point-at": "^1.0.0", "is-fullwidth-code-point": "^1.0.0", @@ -3361,7 +3351,6 @@ "version": "3.0.1", "bundled": true, "dev": true, - "optional": true, "requires": { "ansi-regex": "^2.0.0" } @@ -3405,14 +3394,12 @@ "wrappy": { "version": "1.0.2", "bundled": true, - "dev": true, - "optional": true + "dev": true }, "yallist": { "version": "3.0.3", "bundled": true, - "dev": true, - "optional": true + "dev": true } } }, diff --git a/package.json b/package.json index e5a7b97..9c3fed8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "central-event-processor", - "version": "5.3.0", + "version": "5.5.0", "description": "CEP for Mojaloop Central-Ledger to monitor the notificaion kafka topic and act on it.", "main": "app.js", "dependencies": { diff --git a/settlementSubmitter.js b/settlementSubmitter.js index 51b79c5..62c22da 100644 --- a/settlementSubmitter.js +++ b/settlementSubmitter.js @@ -1,37 +1,140 @@ const Utility = require('./src/lib/utility') -let message = { - 'id': '86ce97cf-2302-4d64-8b0e-df5ae9355dd0', - 'from': 'central-switch', - 'to': 'dfsp2', - 'content': { - 'headers': { - 'Content-Type': 'application/json', - 'Date': '2019-01-17T14:46:48.918Z', - 'FSPIOP-Source': 'central-switch', - 'FSPIOP-Destination': 'dfsp2' +let messages = [ + { + 'id': '86ce97cf-2302-4d64-8b0e-df5ae9355dd0', + 'from': 'central-switch', + 'to': 'dfsp2', + 'content': { + 'headers': { + 'Content-Type': 'application/json', + 'Date': '2019-01-17T14:46:48.918Z', + 'FSPIOP-Source': 'central-switch', + 'FSPIOP-Destination': 'dfsp2' + }, + 'payload': { + 'currency': 'USD', + 'value': 0, + 'changedDate': '2019-01-17T14:46:48.918Z' + } + }, + 'type': 'application/json', + 'metadata': { + 'event': { + 'id': '5c7ff308-35be-46b0-b187-0bf4bee72cf8', + 'type': 'notification', + 'action': 'settlement-transfer-position-change', + 'state': { + 'status': 'success', + 'code': 0, + 'description': 'action successful' + } + }, + 'protocol.createdAt': 1547736408918 + } + }, + { + 'id': '86ce97cf-2302-4d64-8b0e-df5ae9355dd0', + 'from': 'central-switch', + 'to': 'dfsp1', + 'content': { + 'headers': { + 'Content-Type': 'application/json', + 'Date': '2019-01-17T14:46:48.918Z', + 'FSPIOP-Source': 'central-switch', + 'FSPIOP-Destination': 'dfsp2' + }, + 'payload': { + 'currency': 'USD', + 'value': 0, + 'changedDate': '2019-01-17T14:46:48.918Z' + } + }, + 'type': 'application/json', + 'metadata': { + 'event': { + 'id': '5c7ff308-35be-46b0-b187-0bf4bee72cf8', + 'type': 'notification', + 'action': 'settlement-transfer-position-change', + 'state': { + 'status': 'success', + 'code': 0, + 'description': 'action successful' + } + }, + 'protocol.createdAt': 1547736408918 + } + }, + { + 'id': '86ce97cf-2302-4d64-8b0e-df5ae9355dd0', + 'from': 'central-switch', + 'to': 'dfsp11', + 'content': { + 'headers': { + 'Content-Type': 'application/json', + 'Date': '2019-01-17T14:46:48.918Z', + 'FSPIOP-Source': 'central-switch', + 'FSPIOP-Destination': 'dfsp2' + }, + 'payload': { + 'currency': 'USD', + 'value': 0, + 'changedDate': '2019-01-17T14:46:48.918Z' + } }, - 'payload': { - 'currency': 'USD', - 'value': 0, - 'changedDate': '2019-01-17T14:46:48.918Z' + 'type': 'application/json', + 'metadata': { + 'event': { + 'id': '5c7ff308-35be-46b0-b187-0bf4bee72cf8', + 'type': 'notification', + 'action': 'settlement-transfer-position-change', + 'state': { + 'status': 'success', + 'code': 0, + 'description': 'action successful' + } + }, + 'protocol.createdAt': 1547736408918 } }, - 'type': 'application/json', - 'metadata': { - 'event': { - 'id': '5c7ff308-35be-46b0-b187-0bf4bee72cf8', - 'type': 'notification', - 'action': 'settlement-transfer-position-change', - 'state': { - 'status': 'success', - 'code': 0, - 'description': 'action successful' + { + 'id': '86ce97cf-2302-4d64-8b0e-df5ae9355dd0', + 'from': 'central-switch', + 'to': 'dfsp12', + 'content': { + 'headers': { + 'Content-Type': 'application/json', + 'Date': '2019-01-17T14:46:48.918Z', + 'FSPIOP-Source': 'central-switch', + 'FSPIOP-Destination': 'dfsp2' + }, + 'payload': { + 'currency': 'USD', + 'value': 0, + 'changedDate': '2019-01-17T14:46:48.918Z' } }, - 'protocol.createdAt': 1547736408918 + 'type': 'application/json', + 'metadata': { + 'event': { + 'id': '5c7ff308-35be-46b0-b187-0bf4bee72cf8', + 'type': 'notification', + 'action': 'settlement-transfer-position-change', + 'state': { + 'status': 'success', + 'code': 0, + 'description': 'action successful' + } + }, + 'protocol.createdAt': 1547736408918 + } } -} + +] + +let count = 0 + +const timeout = ms => new Promise(res => setTimeout(res, ms)) let messageSubimtter = async (message) => { try { @@ -41,10 +144,20 @@ let messageSubimtter = async (message) => { description: 'action successful' } await Utility.produceGeneralMessage('notification', 'settlement-transfer-position-change', message, success) - console.log('Message on kafka queue') + console.log('Message on kafka queue # ', ++count) } catch (err) { console.log('Message wasnt placed on Kaka queue : ' + err) } } -messageSubimtter(message) +let flood = async (count) => { + let arr = [] + for (let i = 0; i < count; i++) { + for (let message of messages) { + // await timeout(100) + await messageSubimtter(message) + } + } +} + +flood(20) diff --git a/src/rxretries.js b/src/rxretries.js new file mode 100644 index 0000000..65c4a7d --- /dev/null +++ b/src/rxretries.js @@ -0,0 +1,22 @@ +const Rx = require('rxjs') +const { filter, flatMap, retry, delay, retryWhen, repeat, repeatWhen, catchError } = require('rxjs/operators') + +let arrO = Rx.from([1, 2, 3, 4, 5, 6, 7]) + +const mapped = arrO.pipe( + flatMap(v => { + return Rx.from([2 * v]) + }), + delay(1000), + flatMap(v => { + if (v === 8) return Rx.throwError('error') + else return Rx.from([v]) + }), + catchError(e => { + return Rx.of(undefined) + }) + ) + +mapped.subscribe({ + next: v => console.log(v) +}) diff --git a/src/setup.js b/src/setup.js index 9874865..7100718 100644 --- a/src/setup.js +++ b/src/setup.js @@ -32,7 +32,7 @@ const Consumer = require('./lib/kafka/consumer') const Utility = require('./lib/utility') const Logger = require('@mojaloop/central-services-shared').Logger const Rx = require('rxjs') -const { filter, switchMap } = require('rxjs/operators') +const { filter, flatMap, catchError } = require('rxjs/operators') const Enum = require('./lib/enum') const TransferEventType = Enum.transferEventType const TransferEventAction = Enum.transferEventAction @@ -63,7 +63,7 @@ const setup = async () => { const topicObservable = Rx.Observable.create((observer) => { consumer.on('message', async (data) => { - Logger.info(`Central-Event-Processor :: Topic ${topicName} :: Payload: \n${JSON.stringify(data.value, null, 2)}`) + // Logger.info(`Central-Event-Processor :: Topic ${topicName} :: Payload: \n${JSON.stringify(data.value, null, 2)}`) observer.next(data) if (!Consumer.isConsumerAutoCommitEnabled(topicName)) { consumer.commitMessageSync(data) @@ -73,10 +73,14 @@ const setup = async () => { const generalObservable = topicObservable .pipe(filter(data => data.value.metadata.event.action === 'commit'), - switchMap(Observables.CentralLedgerAPI.getDfspNotificationEndpointsObservable), - switchMap(Observables.CentralLedgerAPI.getPositionsObservable), - switchMap(Observables.Rules.ndcBreachObservable), - switchMap(Observables.actionObservable)) + flatMap(Observables.CentralLedgerAPI.getDfspNotificationEndpointsObservable), + flatMap(Observables.CentralLedgerAPI.getPositionsObservable), + flatMap(Observables.Rules.ndcBreachObservable), + flatMap(Observables.actionObservable), + catchError(e => { + return Rx.onErrorResumeNext(generalObservable) + }) + ) generalObservable.subscribe({ next: async ({ actionResult, message }) => { @@ -93,10 +97,13 @@ const setup = async () => { const limitAdjustmentObservable = topicObservable .pipe(filter(data => data.value.metadata.event.action === 'limit-adjustment' && 'limit' in data.value.content.payload), - switchMap(Observables.CentralLedgerAPI.getDfspNotificationEndpointsForLimitObservable), - switchMap(Observables.Store.getLimitsPerNameObservable), - switchMap(Observables.Rules.ndcAdjustmentObservable), - switchMap(Observables.actionObservable) + flatMap(Observables.CentralLedgerAPI.getDfspNotificationEndpointsForLimitObservable), + flatMap(Observables.Store.getLimitsPerNameObservable), + flatMap(Observables.Rules.ndcAdjustmentObservable), + flatMap(Observables.actionObservable), + catchError(e => { + return Rx.onErrorResumeNext(limitAdjustmentObservable) + }) ) limitAdjustmentObservable.subscribe({ @@ -113,11 +120,18 @@ const setup = async () => { const settlementTransferPositionChangeObservable = topicObservable .pipe(filter(data => data.value.metadata.event.action === 'settlement-transfer-position-change'), - switchMap(Observables.CentralLedgerAPI.getParticipantEndpointsFromResponseObservable), - switchMap(Observables.actionObservable)) + flatMap(Observables.CentralLedgerAPI.getParticipantEndpointsFromResponseObservable), + flatMap(Observables.actionObservable), + // retry() + catchError(e => { + console.error(e) + return Rx.onErrorResumeNext(settlementTransferPositionChangeObservable) + }) + ) settlementTransferPositionChangeObservable.subscribe({ next: async ({ actionResult, message }) => { + Logger.info('WE ARE IN') if (!actionResult) { Logger.info(`action unsuccessful. Publishing the message to topic ${topicName}`) // TODO we should change the state and produce error message instead of republish?