In [1]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [2]:
def serialize(schema, obj):
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer = avro.io.DatumWriter(schema)
    writer.write(obj, encoder)
    return bytes_writer.getvalue()

In [3]:
def deserialize(schema, raw_bytes):
    bytes_reader = io.BytesIO(raw_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)

In [4]:
schema_file = 'transaction.avsc'
txschema = avro.schema.parse(open(schema_file).read())
schema_file = 'submit.avsc'
submitschema = avro.schema.parse(open(schema_file).read())
schema_file = 'result.avsc'
resultschema = avro.schema.parse(open(schema_file).read())

In [5]:
# Connect to kafka broker running in your local host (docker). Change this to your kafka broker if needed
kafka_broker = 'lab.aimet.tech:9092'

In [6]:
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [7]:
txconsumer = KafkaConsumer(
    'transaction',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(txschema, x))
resultconsumer = KafkaConsumer(
    'result',
     bootstrap_servers=[kafka_broker],
     enable_auto_commit=True,
     value_deserializer=lambda x: deserialize(resultschema, x))

In [8]:
def gen_signature(txid, payer, payee, amount, token):
    o = {'txid': txid, 'payer': payer, 'payee': payee, 'amount': amount, 'token': token}
    return hashlib.md5(json.dumps(o, sort_keys=True).encode('utf-8')).hexdigest()

In [48]:
def verify_one(vid=r'V584056',
               token=r'8a92988c02bd85f07f4e0e03b6b88c74',
               verbose=True,
               txconsumer=txconsumer,
               producer=producer,
               resultconsumer=resultconsumer,
               submitschema=submitschema,):
    # get msg
    msg = next(txconsumer)
    print(f'msg = {msg.value}')

    # gen signature
    signature = gen_signature(msg.value['txid'],
                              msg.value['payer'],
                              msg.value['payee'],
                              msg.value['amount'],
                              token)
    print(f'signature = {signature}')

    # submit
    submit = {'vid': vid,
              'txid': msg.value['txid'],
              'signature': signature}
    print(f'submit = {submit}')

    # send submit
    producer.send('submit', serialize(submitschema, submit))
    
    # get result
    for result_msg in resultconsumer:
        print('checking result : ', end='')
        if result_msg.value['txid'] == msg.value['txid'] and result_msg.value['vid'] == vid:
            print('matched!!')
            for k, v in result_msg.value.items():
                print(f'\t{k} : {v}')
            break
        else:
            print('not match')

In [49]:
verify_one()

msg = {'txid': 'TX00208', 'payer': 'A07618', 'payee': 'A95820', 'amount': 377}
signature = 3381c6d2dda951a6b5d46b7338884f7b
submit = {'vid': 'V584056', 'txid': 'TX00208', 'signature': '3381c6d2dda951a6b5d46b7338884f7b'}
checking result : matched!!
	timestamp : 1711093796
	vid : V584056
	txid : TX00208
	checksum : 97dd65a8c7efafb6d21ce1f4cf9754f6
	code : 200
	message : Confirm
