Skip to content

Commit

Permalink
Feature/#306 #308 merged (#106)
Browse files Browse the repository at this point in the history
* Temp sidecar and token fix

* Delete deprecated and update first 3 scripts

* Migration scripts 9-11

* Migration scripts 12-14

* Migration scripts 15-16

* Migration scripts 22-24

* Migration scripts 17-21

* Migration scripts 17-21

* Migration scripts 17-21

* Migration scripts 25-27

* Migration scripts 34 and review fixes to ilpPacket

* Migration scripts 29

* fixed ilpCondition spelling error

* Migration scripts 31

* Migration scripts 28, 30

* Migration scripts 32-33

* Corrections and cleanup

* npm run migrate FIXES

* Fix /participant; Remove /charge, /party, /role, /fee

* Refactor: Move from domain to model & camelCase

* domain/transfer/models to models/transfer, seeds

* Changed transfer facade getById

* Transfer Prepare

* Transfer Commit

* Pre- tests clean

* Test files maintenance

* Additional clean up

* added unit test cases

* fixed dependancies into few tests

* Adding placeholder unit test files

* Fixed admin/index.js unit test

* Added tests for transferFulfilment and transferParticipant

* Tests for transferFulfilment

* Sinon.createSandbox

* Unit testing for transferExtension

* added test cases

* Unit tests for ilpPacket

* Unit tests for transfer/facade init - WIP

* fixes for tests and tweaks

* added test cases

* fixes for tests and tweaks (#1)

* removed unused test and fixed position test

* some fixes but still broken tests

* merged projections

* merged projections second attempt

* Feature/db model changes georgi (#2)

* fixes for tests and tweaks

* removed unused test and fixed position test

* some fixes but still broken tests

* Standard fixes

* fixes for producer and consumer. need to test properly

* add producer to list
made some addons

* Disable failing test scripts

* projection fixed

* deleted transform.test.js

* updated dependencies

* unit/models/transfer/facade.test.js

* fixed testPI2/unit/domain/participant/index.test.js

* testPI2/unit/handlers/transfers/handler.test.js

* unit/handlers/transfers/handler.test.js

* Feature/db model changes georgi (#3)

* fixes for tests and tweaks

* removed unused test and fixed position test

* some fixes but still broken tests

* fixes for producer and consumer

* add producer to list

* updated dependencies

* positions model layer

* Integration tests fix Part 1

* Fix & merge rmothilal

* Disable broken integration tests

* updating migration scripts to new knex standards for table creation

* Feature/db model changes georgi (#5)

* fixes for tests and tweaks

* removed unused test and fixed position test

* some fixes but still broken tests

* fixes for producer and consumer. need to test properly

* add producer to list
made some addons

* updated dependencies

* Feature/db model changes georgi (#3)

* fixes for tests and tweaks

* removed unused test and fixed position test

* some fixes but still broken tests

* fixes for producer and consumer

* add producer to list

* updated dependencies

* positions model layer

* updating migration scripts to new knex standards for table creation

* disconnect topicName

* Feature/#355integrations

@vgenev integration tests fixes

* Cleaning up code and renaming integration test file names

* Changes according to rmothilal PR review

* WIP

* premerge with Rajiv db changes

* WIP

* WIP

* WIP

* PositionHandler::positions::commit

* Avoid knex version warning: http://knexjs.org/#Schema-timestamps

* Fixing minor bugs

* fixes for prepare position still needs rework

* .then to await

* fixes. now writes to database and updates accordingly. minor defects to fix

* fixes to allow flow to continue to transfer handler

* removal of space

* Fulfil reject scenario

* Move projection to facade transaction

* Unit tests batch 1

* Transfer facade unit test coverage 100 percent

* temporary fix for sorting transfer state change Id

* Position facade unit test coverage 100 percent

* Inserting placeholders for integration tests

* refactoring the position/facade

* moar refactoring

* fixed array.from map callback

* refactored the integration tests for testPI2/integration/domain/participant/index.test.js (#7)

* position facade refactored and debuged. changed the test prepare route to return different transfers

* small issue on the handler fixed

* fixed some string to numbers parsing issues

* moar refactoring. a small one

* Integration test handlers reworked

* Updated registerHandler functions for positions and prepare to allow for specific registration of FSPs (#8)

* Updated registerHandler functions for positions and prepare to allow for specific registration of FSPs

* Removed comment and fixed jsdoc param name

* merging

* merged and commented out few tests

* handlers integration test additions

* integration tests additions

* Finilizing integration tests work

* added unit tests (#9)

* seeds and domain/participant 100

* Adding more unit tests to improve coverage

* Reduce wait time during unit testing for time.js

* Enable sidecar and auth keychain

* Disable dev seed

* adjusted the position calculations according the latest review

* adjusted positions calculations according the latest review take2

* merged with #308

* fixed handlers tests

* unit tests running ok now

* fixed 573 tests

* integration test fix

* fixes for tests as well as making save transfer prepare a transaction

* update node package.json for cache refresh on circleci
  • Loading branch information
vgenev authored and rmothilal committed Aug 3, 2018
1 parent fb4e09a commit cde464a
Show file tree
Hide file tree
Showing 22 changed files with 885 additions and 864 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"pretest": "standard",
"standard": "standard",
"test:unit": "tape 'testPI2/unit/**/*.test.js'",
"test:int": "tape 'testPI2/integration/**/*.test.js'",
"test:all": "run-s test test:integration test:functional test:spec",
"test:xunit": "tape 'testPI2/unit/**/*.test.js' | tap-xunit",
"test:coverage": "istanbul cover tape -- 'testPI2/unit/**/*.test.js'",
Expand Down Expand Up @@ -87,7 +88,7 @@
"jsdoc": "^3.5.5",
"json2csv": "3.11.5",
"jsonwebtoken": "8.1.1",
"knex": "^0.15.0",
"knex": "^0.15.2",
"lodash": "4.17.4",
"moment": "2.21.0",
"mustache": "^2.3.0",
Expand Down
4 changes: 4 additions & 0 deletions src/domain/position/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,7 @@ exports.changeParticipantPosition = (participantCurrencyId, isIncrease, amount,
exports.generatePositionPlaceHolder = () => {
return true
}

exports.calculatePreparePositionsBatch = async (transferList) => {
return await PositionFacade.prepareChangeParticipantPositionTransaction(transferList)
}
10 changes: 2 additions & 8 deletions src/domain/transfer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,12 @@ const TransferStateChangeModel = require('../../models/transfer/transferStateCha
const TransferFulfilmentModel = require('../../models/transfer/transferFulfilment')
const SettlementFacade = require('../../models/settlement/facade')
const SettlementModel = require('../../models/settlement/settlement')
const Projection = require('./projection')
const TransferObjectTransform = require('./transform')
// const Enum = require('../../lib/enum')
const Events = require('../../lib/events')
const Errors = require('../../errors')

const prepare = async (payload, stateReason = null, hasPassedValidation = true) => {
try {
const result = await Projection.saveTransferPrepared(payload, stateReason, hasPassedValidation)
const t = TransferObjectTransform.toTransfer(result)
Events.emitTransferPrepared(t)
return {transfer: t}
return await TransferFacade.saveTransferPrepared(payload, stateReason, hasPassedValidation)
} catch (e) {
throw e
}
Expand Down Expand Up @@ -124,7 +118,7 @@ const settle = async () => {
const settledTransfers = SettlementModel.create(settlementId, 'transfer').then(() => {
return SettlementFacade.getSettleableTransfers().then(transfers => {
transfers.forEach(transfer => {
Projection.saveSettledTransfers({id: transfer.transferId, settlement_id: settlementId})
TransferFacade.saveSettledTransfers({id: transfer.transferId, settlement_id: settlementId})
})
return transfers
})
Expand Down
106 changes: 0 additions & 106 deletions src/domain/transfer/projection.js

This file was deleted.

56 changes: 53 additions & 3 deletions src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* Lazola Lucas <lazola.lucas@modusbox.com>
* Rajiv Mothilal <rajiv.mothilal@modusbox.com>
* Miguel de Barros <miguel.debarros@modusbox.com>
* Valentin Genev <valentin.genev@modusbox.com>
--------------
******/
Expand Down Expand Up @@ -67,10 +68,13 @@ const positions = async (error, messages) => {
throw error
}
let message = {}
let prepareBatch = []
try {
if (Array.isArray(messages)) {
prepareBatch = messages
message = messages[0]
} else {
prepareBatch = [messages]
message = messages
}
Logger.info('PositionHandler::positions')
Expand All @@ -80,7 +84,21 @@ const positions = async (error, messages) => {
if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.PREPARE) {
Logger.info('PositionHandler::positions::prepare')
consumer = Kafka.Consumer.getConsumer(Utility.transformAccountToTopicName(message.value.from, TransferEventType.POSITION, TransferEventAction.PREPARE))
await TransferService.saveTransferStateChange({transferId: payload.transferId, transferStateId: TransferState.RESERVED})
const { preparedMessagesList, limitAlarms } = await PositionService.calculatePreparePositionsBatch(prepareBatch)
for (let prepareMessage of preparedMessagesList) {
const { transferState, rawMessage } = prepareMessage
if (transferState.transferStateId === Enum.TransferState.RESERVED) {
await Utility.produceGeneralMessage(TransferEventType.TRANSFER, TransferEventAction.TRANSFER, rawMessage.value, Utility.ENUMS.STATE.SUCCESS)
} else {
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, rawMessage.value, Utility.createState(Utility.ENUMS.STATE.FAILURE.status, 4001, transferState.reason))
}
await consumer.commitMessageSync(rawMessage)
}
for (let limit of limitAlarms) {
Logger.info(`Limit alarm should be sent with ${limit}`)
// Publish alarm message to KafkaTopic for the Hub to consume.The Hub rather than the switch will manage this (the topic is an participantEndpoint)
}
return true
} else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.COMMIT) {
Logger.info('PositionHandler::positions::commit')
consumer = Kafka.Consumer.getConsumer(Utility.transformAccountToTopicName(message.value.from, TransferEventType.POSITION, TransferEventType.FULFIL))
Expand Down Expand Up @@ -117,10 +135,42 @@ const positions = async (error, messages) => {
}
} else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.TIMEOUT_RECEIVED) {
Logger.info('PositionHandler::positions::timeoutPrepared')
consumer = Kafka.Consumer.getConsumer(Utility.transformAccountToTopicName(message.value.from, TransferEventType.POSITION, TransferEventAction.ABORT))
const transferInfo = await TransferService.getTransferInfoToChangePosition(payload.transferId, Enum.TransferParticipantRoleType.PAYEE_DFSP, Enum.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== TransferState.EXPIRED) {
Logger.info('PositionHandler::positions::commit::validationFailed::notReceivedFulfilState')
// TODO: throw Error 2001
} else { // transfer state check success
const isIncrease = false
const transferStateChange = {
transferId: transferInfo.transferId,
transferStateId: TransferState.ABORTED,
reason: 'Client requested to use a transfer that has already expired.'
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isIncrease, transferInfo.amount, transferStateChange)
message.value.content.payload = Utility.createPrepareErrorStatus(3303, 'Client requested to use a transfer that has already expired.', message.value.content.payload.extensionList)
consumer = Kafka.Consumer.getConsumer(
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.createState(Utility.ENUMS.STATE.FAILURE.status, 4001, transferStateChange.reason))
)
}
} else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.TIMEOUT_RESERVED) {
Logger.info('PositionHandler::positions::timeout')
consumer = Kafka.Consumer.getConsumer(Utility.transformAccountToTopicName(message.value.from, TransferEventType.POSITION, TransferEventAction.ABORT))
const transferInfo = await TransferService.getTransferInfoToChangePosition(payload.transferId, Enum.TransferParticipantRoleType.PAYEE_DFSP, Enum.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== TransferState.EXPIRED) {
Logger.info('PositionHandler::positions::commit::validationFailed::notReceivedFulfilState')
// TODO: throw Error 2001
} else { // transfer state check success
const isIncrease = false
const transferStateChange = {
transferId: transferInfo.transferId,
transferStateId: TransferState.ABORTED,
reason: 'Client requested to use a transfer that has already expired.'
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isIncrease, transferInfo.amount, transferStateChange)
message.value.content.payload = Utility.createPrepareErrorStatus(3303, 'Client requested to use a transfer that has already expired.', message.value.content.payload.extensionList)
consumer = Kafka.Consumer.getConsumer(
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.createState(Utility.ENUMS.STATE.FAILURE.status, 4001, transferStateChange.reason))
)
}
} else if (message.value.metadata.event.type === TransferEventType.POSITION && message.value.metadata.event.action === TransferEventAction.FAIL) {
Logger.info('PositionHandler::positions::fail')
consumer = Kafka.Consumer.getConsumer(Utility.transformAccountToTopicName(message.value.from, TransferEventType.POSITION, TransferEventAction.ABORT))
Expand Down
13 changes: 13 additions & 0 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,19 @@ const transfer = async (error, messages) => {

await consumer.commitMessageSync(message)

return true
} else if (action.toLowerCase() === 'reject' && status.toLowerCase() === 'success') {
const consumer = Kafka.Consumer.getConsumer(Utility.transformGeneralTopicName(TransferEventType.TRANSFER, TransferEventAction.TRANSFER))

// send notification message to Payee
await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.ENUMS.STATE.SUCCESS)

// send notification message to Payer
// message.value.to = from
// await Utility.produceGeneralMessage(Utility.ENUMS.NOTIFICATION, Utility.ENUMS.EVENT, message.value, Utility.ENUMS.STATE.SUCCESS)

await consumer.commitMessageSync(message)

return true
} else {
Logger.warn('TransferService::transfer - Unknown event...nothing to do here')
Expand Down
3 changes: 3 additions & 0 deletions src/lib/enum.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ module.exports = {
EXPIRED: 'expired',
CANCELLED: 'cancelled'
},
limitType: {
NET_DEBIT_CAP: 1
},
transferEventStatus: {
SUCCESS: 'success',
FAILED: 'failed'
Expand Down
73 changes: 72 additions & 1 deletion src/models/participant/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ const getByNameAndCurrency = async (name, currencyId) => {
.innerJoin('participantCurrency AS pc', 'pc.participantId', 'participant.participantId')
.select(
'participant.*',
'pc.participantCurrencyId'
'pc.participantCurrencyId',
'pc.currencyId'
)
.first()
})
Expand All @@ -50,6 +51,27 @@ const getByNameAndCurrency = async (name, currencyId) => {
}
}

const getParticipantLimitByParticipantIdAndCurrencyId = async (participantId, currencyId) => {
try {
return await Db.participant.query(async (builder) => {
return await builder
.where({
'participant.participantId': participantId,
'pc.currencyId': currencyId
})
.innerJoin('participantCurrency AS pc', 'pc.participantId', 'participant.participantId')
.innerJoin('participantLimit AS pl', 'pl.participantCurrencyId', 'pl.participantCurrencyId')
.select(
'participant.*',
'pc.*',
'pl.*'
)
})
} catch (e) {
throw e
}
}

/**
* @function GetEndpoint
*
Expand Down Expand Up @@ -165,6 +187,52 @@ const addEndpoint = async (participantId, endpoint) => {
}
}

const getParticipantLimitByParticipantCurrencyLimit = async (participantId, currencyId, participantLimitTypeId) => {
try {
return await Db.participant.query(async (builder) => {
return await builder
.where({
'participant.participantId': participantId,
'pc.currencyId': currencyId,
'pl.participantLimitTypeId': participantLimitTypeId,
'participant.isActive': 1,
'pc.IsActive': 1,
'pl.isActive': 1
})
.innerJoin('participantCurrency AS pc', 'pc.participantId', 'participant.participantId')
.innerJoin('participantLimit AS pl', 'pl.participantCurrencyId', 'pc.participantCurrencyId')
.select(
'participant.participantID AS participantId',
'pc.currencyId AS currencyId',
'pl.participantLimitTypeId as participantLimitTypeId',
'pl.value AS value'
).first()
})
} catch (e) {
throw e
}
}

const getParticipantPositionByParticipantIdAndCurrencyId = async (participantId, currencyId) => {
try {
return await Db.participant.query(async (builder) => {
return await builder
.where({
'participant.participantId': participantId,
'pc.currencyId': currencyId
})
.innerJoin('participantCurrency AS pc', 'pc.participantId', 'participant.participantId')
.innerJoin('participantPosition AS pp', 'pp.participantCurrencyId', 'pc.participantCurrencyId')
.select(
'participant.*',
'pc.*',
'pp.*'
)
})
} catch (e) {
throw e
}
}
/**
* @function addLimitAndInitialPosition
*
Expand Down Expand Up @@ -367,9 +435,12 @@ const getParticipantLimitsByParticipantId = async (participantId, type) => {

module.exports = {
getByNameAndCurrency,
getParticipantLimitByParticipantIdAndCurrencyId,
getEndpoint,
getAllEndpoints,
addEndpoint,
getParticipantPositionByParticipantIdAndCurrencyId,
getParticipantLimitByParticipantCurrencyLimit,
addLimitAndInitialPosition,
adjustLimits,
getParticipantLimitsByCurrencyId,
Expand Down
Loading

0 comments on commit cde464a

Please sign in to comment.