Demo execution of a payment transaction without an atomic commit across 3 partitions.
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
cmd
docker
kafka
mock
rest
rocks
.gitignore
Gopkg.lock
Gopkg.toml
Makefile
README.md
error.go
log.go
wallet.go

README.md

Distributed Payment

Documentation Go Report Card Also have a look at distributed signup.

This project demonstrates execution of a payment transaction without an atomic commit across 3 partitions (a primer from "Designing Data-Intensive Applications" book):

  1. Alice wants to send $0.5 to Bob: the intent is stored in 💬 partition.
  2. Alice's -$0.5 outgoing payment is created in 👩 partition.
  3. Bob's +$0.5 incoming payment is persisted in 👨🏻 partition.

The idea is to write a money transfer request into wallet.transfer_request Kafka topic which is partitioned by request ID (some unique ID generated by Alice). Hence all requests with the same ID will be stored in the same Kafka partition 💬 based on consistent hashing algorithm. For example, {from: Alice, amount: 0.5, to: Bob, request_id: a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11} message is written to hash('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11') % partitions_count partition 💬. Let's have two partitions partitions_count=2 for each Kafka topic for simplicity.

A transfer-server instance appends transfer requests into wallet.transfer_request topic.

Each paymentd instance (two in our case) sequentially reads Kafka messages from its own partition of wallet.transfer_request and creates two payment instructions in wallet.payment topic:

  • {account: Alice, direction: outgoing, amount: 0.5, request_id: a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11} message goes into 👩 partition based on hash('Alice') % 2.
  • {account: Bob, direction: incoming, amount: 0.5, request_id: a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11} message goes into 👨🏻 partition based on hash('Bob') % 2.

There might be duplicate credit/debit instructions when a process crashes and restarts.

Each accountantd instance sequentially reads Kafka messages from its own partition of wallet.payment topic, deduplicates messages by request ID, and applies the changes to the balances. For example, the accountant №1 has read the following messages:

  • {account: Alice, direction: outgoing, amount: 0.5, request_id: a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}
  • {account: John, direction: incoming, amount: 99, request_id: 6ba7b810-9dad-11d1-80b4-00c04fd430c8}
  • {account: Alice, direction: outgoing, amount: 0.5, request_id: a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}

Alice's account must be deducted only once. The accountant №2 skipped a duplicate and credited Bob $0.5:

  • {account: Bob, direction: incoming, amount: 0.5, request_id: a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}
  • {account: Bob, direction: incoming, amount: 0.5, request_id: a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}

Note, request_id is generated by a client who sends money (Alice). Request IDs are kept for a certain duration (until a message ages out) or limited by storage size. Segment shared how they leverage RocksDB in Delivering Billions of Messages Exactly Once:

If the dedupe worker crashes for any reason or encounters an error from Kafka, when it re-starts it will first consult the "source of truth" for whether an event was published: the output topic. If a message was found in the output topic, but not RocksDB (or vice-versa) the dedupe worker will make the necessary repairs to keep the database and RocksDB in-sync. In essence, we're using the output topic as both our write-ahead-log, and our end source of truth, with RocksDB checkpointing and verifying it.

That inspired me to try RocksDB as a deduplication storage as well.

Get Started

We need Kafka which will have wallet.transfer_request and wallet.payment topics with 2 partitions and 1 replica. Docker Compose will take care of that. The only caveat is that you should set KAFKA_ADVERTISED_HOST_NAME.

$ cd ./docker/
$ KAFKA_ADVERTISED_HOST_NAME=$(ipconfig getifaddr en0) docker-compose up

Install dependencies using dep package manager and build all commands. Note, you need to install RocksDB first (assuming you're on Mac).

$ brew install rocksdb
$ dep ensure
$ make build

Run a transfer-server to validate transfer requests and persist them in wallet.transfer_request topic partitioned by request ID.

$ ./transfer-server

Send a money transfer request:

$ curl -i -X POST -d '{"from": "Alice", "to": "Bob", "amount": "0.5", "request_id": "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"}' \
    http://localhost:8000/api/v1/transfers
HTTP/1.1 201 Created
Content-Type: application/json
Content-Length: 96

{"request_id":"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11","from":"Alice","amount":"0.50","to":"Bob"}

Stream Processors

Since we have a transfer request in Kafka, we can run two paymentd processes for each partition to create corresponding payments.

$ ./paymentd -partition=0
$ ./paymentd -partition=1
1:0 a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11 Alice -$0.50
0:0 a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11 Bob +$0.50

Payments are printed in partition_id:offset request_id account amount format. As you can see:

  • a transfer has been stored in partition 1 (no output from ./paymentd -partition=0),
  • Alice's outgoing payment was stored in partition 1,
  • Bob's incoming payment landed at partition 0.

Payment instructions end up in wallet.payment topic's partitions. Let's process them, so Alice's and Bob's balances are updated:

$ ./accountantd -partition=0
Bob balance: 0.50 USD
$ ./accountantd -partition=1
Alice balance: -0.50 USD

Try sending a duplicate request and see if balances stay the same.

Future Work

  • Validate sender's balance before creating a transfer.
  • It will be interesting to check invariants by DInv, TLA+.