This project processes payments using Hazelcast Jet. It reads payments from a payments.csv file and runs them through a Jet pipeline that debulks bulk transactions, enriches them with customer data, filters out invalid transactions, and aggregates amounts by merchant ID. The processed payments are then printed to the console.
- Reads payments from a CSV file.
- Handles bulk payments by debulking them into individual transactions.
- Enriches customer data using Hazelcast IMap.
- Aggregates payments based on Merchant ID.
- Filters out invalid transactions.
- Writes the processed payments to the console.
- Java 17 or later installed
- Apache Maven installed
- Git installed
git clone https://github.com/phpavan/PaymentProcessor.git
cd PaymentProcessor
mvn clean package
java -jar target/hazelcast-payment-processor-1.0-SNAPSHOT.jar
The application processes a CSV file with the following format:
TXN001,123,M001,100,USD,1712551800
TXN002,456,M002,200,EUR,1712551860
TXN006,111,M001,150,USD,1712551980
TXN007,789,M003,"[{\"txn_id\":\"TXN004\",\"amount\":50},{\"txn_id\":\"TXN005\",\"amount\":30}]",USD,1712551920
TXN003,123,M002,100,USD,1712551800
TXN004,456,M003,200,EUR,1712551860
TXN005,111,M001,150,USD,1712551980
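The sample rows can be read with a small quote-aware splitter. The following is a minimal sketch in plain Java, not the project's actual `parsePayment`. It assumes the six columns are transaction ID, customer ID, merchant ID, amount (or a JSON array for bulk rows), currency, and timestamp. The source does not name its columns, so the `Payment` record and its field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class PaymentLineParser {

    // Field names are assumed from the sample rows; the source does not name its columns.
    record Payment(String txnId, String customerId, String merchantId,
                   String amountField, String currency, long timestamp) {
        // A bulk row carries a JSON array in the amount column instead of a number.
        boolean isBulk() {
            return amountField.startsWith("[");
        }
    }

    // Splits one CSV line on commas, honoring double-quoted fields and \" escapes inside them.
    static List<String> splitCsvLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes && c == '\\' && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                current.append('"'); // escaped quote inside a quoted field
                i++;
            } else if (c == '"') {
                inQuotes = !inQuotes; // opening or closing quote, not part of the value
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString()); // unquoted comma ends the field
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    static Payment parsePayment(String line) {
        List<String> f = splitCsvLine(line);
        if (f.size() != 6) {
            throw new IllegalArgumentException("Expected 6 fields but got " + f.size() + ": " + line);
        }
        return new Payment(f.get(0), f.get(1), f.get(2), f.get(3), f.get(4), Long.parseLong(f.get(5)));
    }

    public static void main(String[] args) {
        System.out.println(parsePayment("TXN001,123,M001,100,USD,1712551800"));
    }
}
```

Keeping the amount column as a raw string lets a later step decide whether the row is a single payment or a bulk payment that needs expanding.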
The Hazelcast Jet Pipeline follows these steps:
- Read Payments from CSV: Reads payment transactions from `payments.csv` using Hazelcast Jet's file source.
- Parse Payments: Converts CSV records into `Payment` objects.
- Debulk Transactions: Identifies bulk payments and expands them into individual transactions.
- Enrich with Customer Data: Uses Hazelcast IMap to map customer IDs to their names.
- Filter Invalid Transactions: Removes payments with invalid or zero amounts.
- Aggregate Payments by Merchant: Groups payments by Merchant ID and sums the transaction amounts.
- Write Output to Console: Prints the processed payments to the console.
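The debulk, filter, and aggregate steps can be illustrated without a cluster. The sketch below uses plain Java streams as a stand-in for the Jet stages, so it shows the logic only, not the project's code. The `Payment` record, the regex that stands in for a real JSON parser, and the choice to treat negative amounts as invalid are all assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PipelineStepsSketch {

    // Hypothetical record; rawAmount holds either a number or a JSON array of child transactions.
    record Payment(String txnId, String customerId, String merchantId, String rawAmount) {
        boolean isBulk() {
            return rawAmount.startsWith("[");
        }

        // Returns NaN for anything that is not a plain number, so the filter step can drop it.
        double amount() {
            try {
                return Double.parseDouble(rawAmount);
            } catch (NumberFormatException e) {
                return Double.NaN;
            }
        }
    }

    // Matches one {"txn_id":"...","amount":N} entry; a stand-in for a real JSON parser.
    private static final Pattern CHILD =
            Pattern.compile("\\{\"txn_id\":\"([^\"]+)\",\"amount\":(-?[0-9.]+)\\}");

    // Debulk: a bulk payment becomes one Payment per child entry; others pass through unchanged.
    static Stream<Payment> debulk(Payment p) {
        if (!p.isBulk()) {
            return Stream.of(p);
        }
        Matcher m = CHILD.matcher(p.rawAmount());
        Stream.Builder<Payment> children = Stream.builder();
        while (m.find()) {
            children.add(new Payment(m.group(1), p.customerId(), p.merchantId(), m.group(2)));
        }
        return children.build();
    }

    // Filter: keep only finite, strictly positive amounts (rejecting negatives is an assumption).
    static boolean isValid(Payment p) {
        double a = p.amount();
        return Double.isFinite(a) && a > 0;
    }

    // Aggregate: sum the amounts per merchant ID, sorted by merchant for stable output.
    static Map<String, Double> totalsByMerchant(List<Payment> payments) {
        return payments.stream()
                .flatMap(PipelineStepsSketch::debulk)
                .filter(PipelineStepsSketch::isValid)
                .collect(Collectors.groupingBy(Payment::merchantId, TreeMap::new,
                        Collectors.summingDouble(Payment::amount)));
    }

    public static void main(String[] args) {
        List<Payment> payments = List.of(
                new Payment("TXN001", "123", "M001", "100"),
                new Payment("TXN007", "789", "M003",
                        "[{\"txn_id\":\"TXN004\",\"amount\":50},{\"txn_id\":\"TXN005\",\"amount\":30}]"),
                new Payment("TXN008", "111", "M001", "0")); // zero amount, dropped by the filter
        System.out.println(totalsByMerchant(payments)); // prints {M001=100.0, M003=80.0}
    }
}
```

Child transactions inherit the customer and merchant IDs of the bulk payment here, which is one plausible reading of "debulking"; the project may carry different fields across.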
- `HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);`: Creates a new Hazelcast instance.
- `IMap<String, String> customerData = hz.getMap("customer-info");`: Stores customer data in a distributed map.
- `pipeline.readFrom(Sources.files("payments.csv"))`: Reads the CSV file as input.
- `.map(PaymentProcessor::parsePayment)`: Converts CSV lines into `Payment` objects.
- `.flatMap(PaymentProcessor::debulkTransaction)`: Expands bulk transactions.
- `.mapUsingIMap("customer-info", Payment::getCustomerId, (payment, customerName) -> {...})`: Enriches transactions with customer names.
- `.groupingKey(Payment::getMerchantId).aggregate(AggregateOperations.summingDouble(Payment::getAmount))`: Aggregates payments by merchant.
- `.writeTo(Sinks.fromProcessor("consoleSink", ProcessorMetaSupplier.of(...)))`: Outputs results to the console.
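The body of the enrichment lambda is elided above, so the following is only a guess at its shape. It uses a `ConcurrentHashMap` as a local stand-in for the `customer-info` map (`IMap` is itself a `java.util.Map`), and the `EnrichedPayment` record, the `"UNKNOWN"` placeholder, and the sample customer names are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class EnrichmentSketch {

    // Hypothetical records; the project's Payment class may look different.
    record Payment(String txnId, String customerId, String merchantId, double amount) {}

    record EnrichedPayment(Payment payment, String customerName) {}

    // Same shape as the (payment, customerName) lambda: combine the item with its lookup result.
    static EnrichedPayment enrich(Payment payment, String customerName) {
        // The lookup yields null for an unknown customer; the placeholder is an assumption.
        String name = customerName != null ? customerName : "UNKNOWN";
        return new EnrichedPayment(payment, name);
    }

    // Looks up the customer ID in the map and passes the result to enrich.
    static EnrichedPayment lookupAndEnrich(Map<String, String> customerData, Payment payment) {
        return enrich(payment, customerData.get(payment.customerId()));
    }

    public static void main(String[] args) {
        Map<String, String> customerData = new ConcurrentHashMap<>();
        customerData.put("123", "Alice"); // invented sample data
        customerData.put("456", "Bob");

        Payment known = new Payment("TXN001", "123", "M001", 100.0);
        Payment unknown = new Payment("TXN006", "111", "M001", 150.0);

        System.out.println(lookupAndEnrich(customerData, known).customerName());   // prints Alice
        System.out.println(lookupAndEnrich(customerData, unknown).customerName()); // prints UNKNOWN
    }
}
```

Keeping `enrich` as a pure two-argument function means the same logic could be passed to the pipeline stage and also exercised in a unit test without starting Hazelcast.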
When you run the application, the processed payments will be displayed in the console, showing individual transactions and aggregated totals.
Feel free to fork the repo, make improvements, and submit pull requests!
This project is licensed under the MIT License.
For any questions or issues, reach out via the GitHub repository: PaymentProcessor.