In [1]:
# import required libraries
%pip install kafka-python avro
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting avro
  Downloading avro-1.11.3.tar.gz (90 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.6/90.6 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: avro
  Building wheel for avro (pyproject.toml) ... [?25l[?25hdone
  Created wheel for avro: filename=avro-1.11.3-py2.py3-none-any.whl size=123913 sha256=0ae96186582f5469a23ed7fdb674201e9d897932a63f5b10bfa42372bb52d2f7
  Stored in directory: /root/.cache/pip/wheels/1d/f6/41/0e0399396af07060e64d4e32c8bd259b48b98a4a114df31294
Successfully built avro
Installing collected packages: kafka-py

In [2]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [3]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [4]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [5]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [6]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [7]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))

resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))



In [8]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [9]:
# result = None
for message in txconsumer:
    transaction = message.value
    txid = transaction["txid"]
    payer = transaction["payer"]
    payee = transaction["payee"]
    amount = transaction["amount"]
    token = "2953a3965bab2f5319c1037c251bd081"
    signature = gen_signature(txid, payer, payee, amount, token)

    # Prepare submission message
    verification_data = {"vid": "V414883", "txid": txid, "signature": signature}
    submit_message = serialize(submitschema, verification_data)

    # Submit verification message
    producer.send("submit", submit_message)

    # Wait for result
    for result_message in resultconsumer:
        result = result_message.value
        if result['txid'] == verification_data['txid'] :
          break
    if result:
      print(result)
      break
    else:
      print(verification_data['txid'])

{'timestamp': 1711554743, 'vid': 'V208736', 'txid': 'TX01550', 'checksum': '25b529b87d08269c287a96f0b81db99f', 'code': 200, 'message': 'Confirm'}
