/*
This example shows how to synchronize the incoming tuples using a
Barrier operator. It uses a bank deposit/debit scenario to
split the deposit/debit requests, perform that account activity,
and then combine the post-activity results with the incoming requests.
Barrier operator provides what is needed to accomplish that.
*/
namespace sample;
composite barrier_at_work {
type
BankTxSchema = uint32 accountNumber, rstring txType, float64 currentBalance, float64 txAmount;
DepositInputSchema = uint32 accountNumber, float64 currentBalance, float64 depositAmount;
DebitInputSchema = uint32 accountNumber, float64 currentBalance, float64 debitAmount;
TxResultSchema = float64 newBalance;
PostTxResultSchema = uint32 accountNumber, float64 currentBalance;
graph
// Read the bank transaction details one at a time.
stream <BankTxSchema> TxData = FileSource() {
param
file : "bank_tasks.dat";
format: csv;
} // End of FileSource.
// Apply filter to pick the deposit requests.
stream <DepositInputSchema> DepositRequest = Functor(TxData) {
param
filter: txType == "Deposit";
output
DepositRequest: depositAmount = txAmount;
} // End of Functor(TxData)
// Apply filter to pick the debit requests.
stream <DebitInputSchema> DebitRequest = Functor(TxData) {
param
filter: txType == "Debit";
output
DebitRequest: debitAmount = txAmount;
} // End of Functor(TxData)
// Compute the new balance because of the deposit activity.
stream <TxResultSchema> DepositResult = Functor(DepositRequest) {
output
DepositResult: newBalance = currentBalance + depositAmount;
} // End of Functor(DepositRequest)
// Compute the new balance because of the debit activity.
stream <TxResultSchema> DebitResult = Functor(DebitRequest) {
output
DebitResult: newBalance = currentBalance - debitAmount;
} // End of Functor(DebitRequest)
// Use a barrier to synchronize the deposit request and result.
stream <PostTxResultSchema> FinalDepositResult = Barrier(DepositRequest; DepositResult) {
output
FinalDepositResult: accountNumber = DepositRequest.accountNumber, currentBalance = DepositResult.newBalance;
} // End of Barrier(DepositRequest; DepositResult)
// Use a barrier to synchronize the debit request and result.
stream <PostTxResultSchema> FinalDebitResult = Barrier(DebitRequest; DebitResult) {
output
FinalDebitResult: accountNumber = DebitRequest.accountNumber, currentBalance = DebitResult.newBalance;
} // End of Barrier(DebitRequest; DebitResult)
// Write the deposit results to a sink file.
() as FileWriter1 = FileSink(FinalDepositResult) {
param
file: "bank_tasks_deposit.result";
} // End of FileSink(FinalDepositResult)
// Write the debit results to a sink file.
() as FileWriter2 = FileSink(FinalDebitResult) {
param
file: "bank_tasks_debit.result";
} // End of FileSink(FinalDebitResult)
} // End of composite barrier_at_work.
Files
006_barrier_at_work
Folders and files
Name | Name | Last commit date | ||
---|---|---|---|---|
parent directory.. | ||||