In [106]:
# Install the Kafka client library that provides the KafkaConsumer / KafkaProducer classes imported below.
%pip install kafka-python



In [107]:
# Install the Avro library that provides the avro.schema / avro.io modules imported below.
# NOTE(review): avro-python3 is reportedly deprecated upstream in favour of the `avro`
# package — confirm before changing the dependency.
%pip install avro-python3



In [108]:
# import required libraries
from kafka import KafkaConsumer, KafkaProducer
import avro.schema
import avro.io
import io
import hashlib, json

In [109]:
def serialize(schema, obj):
    """Encode ``obj`` with Avro's binary encoding and return the resulting bytes.

    Args:
        schema: Schema object handed to ``avro.io.DatumWriter``.
        obj: The datum to write; it is passed to the writer unchanged.

    Returns:
        bytes: Everything the writer emitted into an in-memory buffer.
    """
    buffer = io.BytesIO()
    avro.io.DatumWriter(schema).write(obj, avro.io.BinaryEncoder(buffer))
    return buffer.getvalue()

In [110]:
def deserialize(schema, raw_bytes):
    """Decode Avro binary ``raw_bytes`` using ``schema`` and return the datum.

    Args:
        schema: Schema object handed to ``avro.io.DatumReader``.
        raw_bytes: The encoded message body.

    Returns:
        Whatever ``DatumReader.read`` produces for the decoded bytes.
    """
    source = avro.io.BinaryDecoder(io.BytesIO(raw_bytes))
    return avro.io.DatumReader(schema).read(source)

# Download the Avro Schema Files

In [111]:
# Fetch the three Avro schema files (result, submit, transaction) into the working directory.
# -nc ("no clobber") leaves a file alone when it is already present, so re-running the cell is harmless.
!wget -nc https://raw.githubusercontent.com/pvateekul/2110446_DSDE_2023s2/main/code/Week09_DataIngestion/Assignment/result.avsc
!wget -nc https://raw.githubusercontent.com/pvateekul/2110446_DSDE_2023s2/main/code/Week09_DataIngestion/Assignment/submit.avsc
!wget -nc https://raw.githubusercontent.com/pvateekul/2110446_DSDE_2023s2/main/code/Week09_DataIngestion/Assignment/transaction.avsc

File ‘result.avsc’ already there; not retrieving.

File ‘submit.avsc’ already there; not retrieving.

File ‘transaction.avsc’ already there; not retrieving.



In [112]:
def _load_schema(path):
    """Read the Avro schema file at ``path`` and return the parsed schema.

    The file is opened in a ``with`` block so its handle is closed as soon as
    the text has been read, and it is decoded as UTF-8 rather than with the
    platform's default encoding.
    """
    with open(path, encoding='utf-8') as schema_fh:
        return avro.schema.parse(schema_fh.read())


# One schema per Kafka topic used in this notebook.
txschema = _load_schema('transaction.avsc')   # decodes messages from the 'transaction' topic
submitschema = _load_schema('submit.avsc')    # encodes messages sent to the 'submit' topic
resultschema = _load_schema('result.avsc')    # decodes messages from the 'result' topic

# Kafka

In [113]:
# Address (host:port) of the Kafka broker used by the producer and both consumers below.
# The value names a specific host rather than localhost; change it to your own broker if needed.
kafka_broker = 'lab.aimet.tech:9092'

In [114]:
# Producer shared by the notebook. No value serializer is configured here, so
# callers hand `send` bytes that are already serialized (see publish_verification).
producer = KafkaProducer(bootstrap_servers=[kafka_broker])

In [115]:
def _open_consumer(topic, decode):
    """Create a KafkaConsumer on ``topic`` whose message values are run through ``decode``."""
    # NOTE(review): no group_id is passed; confirm in the kafka-python docs whether
    # enable_auto_commit has any effect without one.
    return KafkaConsumer(
        topic,
        bootstrap_servers=[kafka_broker],
        enable_auto_commit=True,
        value_deserializer=decode,
    )


# The lambdas resolve the schema globals at call time, i.e. each time a message is decoded.
txconsumer = _open_consumer('transaction', lambda payload: deserialize(txschema, payload))
resultconsumer = _open_consumer('result', lambda payload: deserialize(resultschema, payload))



In [116]:
def gen_signature(txid, payer, payee, amount, token):
    """Return the MD5 hex digest that signs a transaction for a given token.

    The five values are placed in a dict, dumped to JSON with keys sorted so
    the text is the same regardless of insertion order, encoded as UTF-8 and
    hashed.

    Returns:
        str: 32-character lowercase hexadecimal digest.
    """
    fields = {
        'txid': txid,
        'payer': payer,
        'payee': payee,
        'amount': amount,
        'token': token,
    }
    canonical = json.dumps(fields, sort_keys=True)
    digest = hashlib.md5()
    digest.update(canonical.encode('utf-8'))
    return digest.hexdigest()

# Functions for Transaction Verification


In [117]:
def publish_verification(vid, txid, signature):
    """Serialize a verification record and send it to the ``submit`` topic.

    The record holds ``vid``, ``txid`` and ``signature`` and is encoded with
    ``submitschema`` before being handed to the module-level ``producer``.
    Nothing is returned.
    """
    payload = serialize(
        submitschema,
        {'vid': vid, 'txid': txid, 'signature': signature},
    )
    producer.send('submit', payload)

In [118]:
def receive_confirmation(vid, txid):
    """Wait on the ``result`` consumer for the outcome of one submitted verification.

    Messages for other verifiers or other transactions are skipped. The first
    message whose ``vid`` and ``txid`` both match decides the outcome.

    Args:
        vid: Verifier id the result must carry.
        txid: Transaction id the result must carry.

    Returns:
        The decoded result record when its ``code`` is 200; ``None`` when the
        matching result carries any other code, or when the consumer stops
        yielding messages before a match arrives.
    """
    for message in resultconsumer:
        confirmation = message.value
        if confirmation['vid'] != vid or confirmation['txid'] != txid:
            continue  # result belongs to someone else's submission
        if confirmation['code'] == 200:
            return confirmation
        # The submission was answered with a non-200 code. Report failure now:
        # waiting for a 200 would block forever and the caller could never
        # move on to the next transaction.
        return None
    return None

In [119]:
def verify_transaction(vid, token):
    """Sign incoming transactions until one verification is confirmed.

    For each message from the transaction consumer, a signature is generated
    from its ``txid``, ``payer``, ``payee`` and ``amount`` plus ``token``, the
    verification is published, and the confirmation is awaited. Transactions
    whose confirmation is falsy are skipped.

    Args:
        vid: Verifier id sent with every submission.
        token: Token mixed into the signature.

    Returns:
        tuple: ``(timestamp, vid, txid, checksum)`` for the first confirmed
        transaction, or ``None`` if the consumer stops before any is confirmed.
    """
    for record in txconsumer:
        tx_id = record.value['txid']
        digest = gen_signature(
            tx_id,
            record.value['payer'],
            record.value['payee'],
            record.value['amount'],
            token,
        )
        publish_verification(vid, tx_id, digest)

        ack = receive_confirmation(vid, tx_id)
        if not ack:
            continue  # not confirmed; try the next transaction

        # Only the first confirmed transaction is needed.
        return ack['timestamp'], vid, tx_id, ack['checksum']

# Set the VID and Token

In [120]:
# Export the verifier id and token as environment variables for the next cell to read.
# NOTE(review): the token is hard-coded here and echoed in the saved cell output —
# confirm whether it is sensitive before sharing or committing this notebook.
%env VID=V490990
%env TOKEN=51bb2dfe6d5f675fd4189f5c85928a12

env: VID=V490990
env: TOKEN=51bb2dfe6d5f675fd4189f5c85928a12


In [121]:
import os

# Pull the credentials from the process environment; each is None when the variable is unset.
vid = os.environ.get("VID")
token = os.environ.get("TOKEN")

In [122]:
# Run the verification loop and, if it produced a result, print one "label value" line per field.
result = verify_transaction(vid, token)
if result:
    timestamp, vid, txid, checksum = result
    print(
        *(
            f"{label} {value!s}"
            for label, value in zip(("Timestamp:", "VID:", "TXID:", "Checksum:"), result)
        ),
        sep="\n",
    )

Timestamp: 1711642064
VID: V490990
TXID: TX00468
Checksum: 150a23139ae88e2e0d8ff38b8cc0a624
