In [10]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [11]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [12]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [13]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [14]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = '35.240.149.229:9092'

In [15]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [16]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))
resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))

In [17]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [18]:
token = 'eb77f662d8fd62081dcfc724930c9ce4'
vid = 'V726138'
while True:
    txconsumerRecord = txconsumer.poll(timeout_ms=1000)
    if len(txconsumerRecord)!=0:
        for tp,txmessage in txconsumerRecord.items():
            for message in txmessage: 
                val=message.value #step1
                signature=gen_signature(val['txid'],val['payer'],val['payee'],val['amount'],token) #step2
                submit={'vid':vid,'signature':signature,'txid':val['txid']}
                producer.send('submit',value=serialize(submitschema,submit))#endstep2
    resultconsumerRecord =resultconsumer.poll(timeout_ms=1000)
    if len(resultconsumerRecord)!=0:
        for tp,rsmessage in resultconsumerRecord.items():
            for message in rsmessage:
                val=message.value
                print(val)        

{'timestamp': 1679730815, 'vid': 'V726138', 'txid': 'TX09564', 'checksum': '244a0a621611b6d06bdacd6f3d7ab7b6', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1679730823, 'vid': 'V726138', 'txid': 'TX04858', 'checksum': '86754f565a70baa27eaf9b1fdbaecc91', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1679730829, 'vid': 'V726138', 'txid': 'TX01977', 'checksum': '84d17e43827abb6b0ca409d894ee7bc3', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1679730834, 'vid': 'V726138', 'txid': 'TX02773', 'checksum': 'fa63c465e4e5bc6da4698a2016d6a40c', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1679730843, 'vid': 'V726138', 'txid': 'TX02824', 'checksum': '9ba03a907cd34dd59d684122dfe6377a', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1679730853, 'vid': 'V726138', 'txid': 'TX09038', 'checksum': 'e9fbc72520233d99f205473053c3f71f', 'code': 200, 'message': 'Confirm'}
{'timestamp': 1679730863, 'vid': 'V726138', 'txid': 'TX06175', 'checksum': '47262cfc19b0815653e5c6f115802edc', 'code': 200, 

KeyboardInterrupt: 