In [1]:
!pip install kafka-python
!pip install avro-python3



In [2]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [3]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [4]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [5]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [6]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [7]:
submitproducer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [8]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))
resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))

In [9]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [10]:
# Starting transaction consumer
for msg in txconsumer:
    
    # Setting up token and verification ID
    token = 'e6463ec70f455d19607d8c75135e4b68'
    vid = 'V458404'
    
    # Extracting transaction details
    txid = (msg.value)['txid']
    payer = (msg.value)['payer']
    payee = (msg.value)['payee']
    amount = (msg.value)['amount']

    # Constructing transaction data with verification ID, transaction ID, and signature
    transaction_data = {'vid': vid, 'txid': txid,'signature': gen_signature(txid, payer, payee, amount, token)}
    
    # Serializing transaction data
    serialized_data = serialize(submitschema, transaction_data)
    
    # Sending serialized transaction data to submitproducer
    submitproducer.send('submit', serialized_data)

    # Waiting for verification result
    for res in resultconsumer:
        if res.value['vid'] == vid and res.value['txid'] == txid:
            print(res.value)
            print()
            print('timestamp: ',res.value['timestamp'])
            print('vid: ',res.value['vid'])
            print('txid: ',res.value['txid'])
            print('checksum: ',res.value['checksum'])
            print()
            print('-------------------SUCCESS------------------------',end='\n\n\n')
            break


{'timestamp': 1711640859, 'vid': 'V458404', 'txid': 'TX09360', 'checksum': '8c7c27dd615e2c8f0ab6eaaa06b39d87', 'code': 200, 'message': 'Confirm'}

timestamp:  1711640859
vid:  V458404
txid:  TX09360
checksum:  8c7c27dd615e2c8f0ab6eaaa06b39d87

-------------------SUCCESS------------------------


{'timestamp': 1711640865, 'vid': 'V458404', 'txid': 'TX04451', 'checksum': '41ed79d48da67e2458e64f8c691c6dac', 'code': 200, 'message': 'Confirm'}

timestamp:  1711640865
vid:  V458404
txid:  TX04451
checksum:  41ed79d48da67e2458e64f8c691c6dac

-------------------SUCCESS------------------------


{'timestamp': 1711640874, 'vid': 'V458404', 'txid': 'TX02156', 'checksum': 'cb597ba145009afbef1e299f2a086a2d', 'code': 200, 'message': 'Confirm'}

timestamp:  1711640874
vid:  V458404
txid:  TX02156
checksum:  cb597ba145009afbef1e299f2a086a2d

-------------------SUCCESS------------------------


{'timestamp': 1711640883, 'vid': 'V458404', 'txid': 'TX02699', 'checksum': 'f8c80a95bde415197a45d75806c244fe'