In [11]:
!pip install kafka-python-ng
!pip install avro



In [12]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

In [13]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [14]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [15]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [16]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [17]:
producer = KafkaProducer(
    bootstrap_servers=[kafka_broker],
    value_serializer=lambda x: serialize(submitschema, x)
)

In [18]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))

resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))



In [19]:
VID='V918295'
VERIFICATION_TOKEN='5e77d637c1ec909c1d6c6be1f092bb9a'
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [20]:
# txproducer = KafkaProducer(
#     bootstrap_servers=[kafka_broker],
#     value_serializer=lambda x: serialize(txschema, x)
# )
# txproducer.send('transaction', {'txid':'1', 'payer':'a', 'payee':'b', 'amount':100})
# txproducer.flush()

In [None]:
def consume_transaction_and_submit(txconsumer, producer):
    print(f'Waiting for messages from topic transaction')
    try:
        for msg in txconsumer:
            transaction = msg.value
            signature = gen_signature(transaction['txid'], transaction['payer'], transaction['payee'], transaction['amount'], VERIFICATION_TOKEN)
            producer.send(
                'submit',
                {'vid':VID, 'txid': transaction['txid'], 'signature': signature}
            )
            producer.flush()
    except Exception as e:
        print(f'Error while listening to transaction: {e}')
    finally:
        producer.close()

def consume_result(resultconsumer):
    print(f'Waiting for messages from topic result')
    try:
        for msg in resultconsumer:
            result = msg.value
            if msg.value['vid'] == VID:
                print(f'{result}')
    except Exception as e:
        print(f'Error while listening to result: {e}')
    finally:
        resultconsumer.close()

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(consume_transaction_and_submit, txconsumer, producer))
    futures.append(executor.submit(consume_result, resultconsumer))

print('Exited')

Waiting for messages from topic transaction
Waiting for messages from topic result
{'timestamp': 1729986174, 'vid': 'V918295', 'txid': 'TX03326', 'checksum': '78c7f854afe43deb54fbaa1d803df806', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1729986179, 'vid': 'V918295', 'txid': 'TX07174', 'checksum': 'b8b136993fd0e5780cf9c2f9a2f95490', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1729986189, 'vid': 'V918295', 'txid': 'TX06580', 'checksum': 'e5b0fa585474aa06c718a86864631344', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1729986199, 'vid': 'V918295', 'txid': 'TX06334', 'checksum': 'b6ff8d553cf0f41f91c28a28a26f131a', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1729986204, 'vid': 'V918295', 'txid': 'TX00958', 'checksum': 'f0e22dce20c28c957732cf1be7f19e10', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1729986212, 'vid': 'V918295', 'txid': 'TX07739', 'checksum': 'd1dc3e477a8b8243aefc6fb447f2fc31', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1729986222, 'vid': 'V918295