In [72]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [73]:
def serialize(schema, obj):
    """Encode ``obj`` as Avro binary according to ``schema``.

    Args:
        schema: Avro schema object handed to ``avro.io.DatumWriter``.
        obj: The datum to encode.

    Returns:
        The encoded datum as ``bytes``.
    """
    buffer = io.BytesIO()
    # The writer pushes the encoded datum through the encoder into ``buffer``.
    avro.io.DatumWriter(schema).write(obj, avro.io.BinaryEncoder(buffer))
    return buffer.getvalue()

In [74]:
def deserialize(schema, raw_bytes):
    """Decode one Avro-binary datum from ``raw_bytes`` using ``schema``.

    Args:
        schema: Avro schema object handed to ``avro.io.DatumReader``.
        raw_bytes: The encoded payload.

    Returns:
        Whatever ``DatumReader.read`` produces for the payload.
    """
    source = io.BytesIO(raw_bytes)
    datum_reader = avro.io.DatumReader(schema)
    return datum_reader.read(avro.io.BinaryDecoder(source))

In [75]:
def _load_schema(path):
    """Read the Avro schema definition stored at ``path`` and parse it.

    The file is opened in a ``with`` block so the handle is closed as soon as
    the text has been read, instead of being left for the garbage collector.

    Args:
        path: Location of an ``.avsc`` schema file.

    Returns:
        The object returned by ``avro.schema.parse`` for the file's contents.
    """
    # Avro schema files are JSON text; read them as UTF-8 instead of relying
    # on the platform's default encoding.
    with open(path, encoding='utf-8') as schema_fh:
        return avro.schema.parse(schema_fh.read())


# One schema per topic payload: incoming transactions, outgoing submissions,
# and the results consumed after a submission.
schema_file = 'transaction.avsc'
txschema = _load_schema(schema_file)
schema_file = 'submit.avsc'
submitschema = _load_schema(schema_file)
schema_file = 'result.avsc'
resultschema = _load_schema(schema_file)

In [76]:
# Address (host:port) of the Kafka broker used by the producer and consumers in this notebook.
# The value below is a named host, not localhost; change it to your own broker
# (e.g. one running locally in Docker) if needed.
kafka_broker = 'lab.aimet.tech:9092'

In [77]:
# Producer used to publish messages through the broker in `kafka_broker`.
# No value_serializer is configured here; payloads are Avro-encoded with
# serialize() before being passed to send().
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [78]:
# Consumer for the 'transaction' topic; every message value is Avro-decoded
# with the transaction schema before it reaches the caller.
# NOTE(review): no group_id is passed; per the kafka-python docs offset commits
# are disabled without a group, so enable_auto_commit may have no effect — confirm.
txconsumer = KafkaConsumer(
    'transaction',
    value_deserializer=lambda payload: deserialize(txschema, payload),
    enable_auto_commit=True,
    bootstrap_servers=[kafka_broker],
)

# Consumer for the 'result' topic; every message value is Avro-decoded with
# the result schema.
resultconsumer = KafkaConsumer(
    'result',
    value_deserializer=lambda payload: deserialize(resultschema, payload),
    enable_auto_commit=True,
    bootstrap_servers=[kafka_broker],
)

In [79]:
def gen_signature(txid, payer, payee, amount, token):
    """Return the MD5 hex digest that signs a transaction.

    The five fields are placed in a dict, dumped to JSON with keys sorted so
    the text is the same regardless of insertion order, encoded as UTF-8 and
    hashed.

    Args:
        txid: Transaction identifier.
        payer: Paying account.
        payee: Receiving account.
        amount: Transaction amount (must be JSON-serialisable).
        token: Verifier token mixed into the signature.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    fields = dict(txid=txid, payer=payer, payee=payee, amount=amount, token=token)
    canonical = json.dumps(fields, sort_keys=True)
    digest = hashlib.md5(canonical.encode('utf-8'))
    return digest.hexdigest()

In [80]:
# `verify_token` is passed to gen_signature() as the token mixed into each
# transaction signature; `vid` is sent with every submission and used to
# recognise the matching result.
# NOTE(review): the token is hard-coded in the notebook; if it is a real secret,
# load it from an environment variable or secrets store before sharing — confirm.
verify_token = '144c0db0459d005a4beef900f0c0c531'
vid = "V763258"

In [81]:
def _await_result(consumer, expected_vid, expected_txid):
    """Return the first result addressed to this verifier and transaction.

    Results for other verifiers or transactions are skipped. Returning on the
    first match (instead of iterating forever) is what lets the caller move on
    to the next transaction.

    Args:
        consumer: Iterable of messages whose ``value`` is a decoded result record.
        expected_vid: Verifier id the result must carry.
        expected_txid: Transaction id the result must carry.

    Returns:
        The matching result record, or ``None`` if the consumer stops yielding
        messages before a match is seen (presumably only when a consumer
        timeout is configured — confirm against the consumer settings).
    """
    for result_message in consumer:
        candidate = result_message.value
        if candidate.get('vid') == expected_vid and candidate.get('txid') == expected_txid:
            return candidate
    return None


# Main loop: sign each incoming transaction, submit the signature, then wait
# for the result of that submission before handling the next transaction.
for message in txconsumer:
    tx = message.value
    txid = tx['txid']
    payer = tx['payer']
    payee = tx['payee']
    amount = tx['amount']
    print("Received transaction")
    print(f"txid: {txid}, payer: {payer}, payee: {payee}, amount: {amount}\n")

    signature = gen_signature(txid, payer, payee, amount, verify_token)
    print(f"Generated signature - {signature}\n")

    verification = serialize(submitschema, {'vid': vid, 'txid': txid, 'signature': signature})
    producer.send('submit', verification)
    # send() only queues the record; flush so the submission is actually on
    # the wire before we start waiting for its result.
    producer.flush()

    result = _await_result(resultconsumer, vid, txid)
    if result is None:
        print(f"No result received for txid: {txid}")
        continue

    summary = (
        f"txid: {result.get('txid')}, timestamp: {result.get('timestamp')}, vid: {result.get('vid')}, "
        f"checksum: {result.get('checksum')}, code: {result.get('code')}, message: {result.get('message')}")
    if result.get('code') == 200:
        print("Transaction verified")
    else:
        # A non-200 result for our own submission was previously ignored,
        # leaving the loop waiting forever; report it and move on.
        print("Transaction not verified")
    print(summary)

Received transaction
txid: TX06134, payer: A13402, payee: A89022, amount: 488

Generated signature - 5ae698d4262c497629bfa91eeb3aac00

Transaction verified
txid: TX06134, timestamp: 1729929519, vid: V763258, checksum: 4c2130fb049e9878d25d94703768b553, code: 200, message: Confirm


KeyboardInterrupt: 