#### This script pulls the Avro-serialized data from the Kafka topic, deserializes it, and inserts each record as a document into MongoDB

###### Import the libraries

In [1]:
import pandas as pd
import datetime as dt
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
from pymongo import MongoClient

##### Define the main function, which starts the Kafka consumer; the functions that connect to the consumer and store the data are defined in the cells below

In [2]:
def main():
    """Entry point of the notebook: start the Kafka consumer loop.

    Delegates to ``connect_kafka_consumer``; takes no arguments and returns
    nothing.
    """
    connect_kafka_consumer()

##### Function to connect to the Kafka consumer, poll for records, and forward each record to MongoDB

In [3]:
def connect_kafka_consumer():
    """Consume records from the ``logistics_data`` topic and store each one in MongoDB.

    Steps:
      1. Fetch the latest schema registered under ``logistics_data-value``
         from the schema registry and build an Avro value deserializer.
      2. Create a ``DeserializingConsumer`` (group ``tst10``) and subscribe
         to ``logistics_data``.
      3. Poll in a loop; print every successfully received record and pass
         its value to ``push_data_to_mongo``.

    The loop runs until it is interrupted (``KeyboardInterrupt``); the
    consumer is closed on every exit path.

    The ``'xxxx'`` strings are placeholders for the real endpoints and
    credentials and must be filled in before running.
    """
    kafka_config = {
        'bootstrap.servers': 'xxxx',
        'sasl.mechanisms': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': 'xxxx',
        'sasl.password': 'xxxx',
        'auto.offset.reset': 'earliest'
    }

    schema_registry_client = SchemaRegistryClient({
        'url': 'xxxx',
        'basic.auth.user.info': '{}:{}'.format('xxxx', 'xxxx')
    })

    # The value schema is looked up by subject name: "<topic>-value".
    subject_name = 'logistics_data-value'
    schema_str = schema_registry_client.get_latest_version(subject_name).schema.schema_str

    key_deserialize = StringDeserializer('UTF-8')
    avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

    # Connection settings are reused as-is; only the consumer-specific
    # entries (group id and deserializers) are added on top.
    consumer_config = dict(kafka_config)
    consumer_config.update({
        'group.id': 'tst10',
        'key.deserializer': key_deserialize,
        'value.deserializer': avro_deserializer
    })
    consumer = DeserializingConsumer(consumer_config)

    consumer.subscribe(['logistics_data'])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Nothing arrived within the 1 second poll timeout.
                continue
            if msg.error():
                print('failed with error :{}'.format(msg.error()))
                # An errored message carries no usable payload, so it must
                # not fall through to the MongoDB insert.
                continue
            data = msg.value()
            print("Consumer record with key : {} and value : {}".format(msg.key(), data))
            if data is None:
                # A record without a value (e.g. a tombstone) cannot be
                # stored as a document; skip it instead of failing the loop.
                continue
            push_data_to_mongo(data)
    except KeyboardInterrupt:
        # Interrupting the kernel / Ctrl-C is the intended way to stop.
        pass
    finally:
        consumer.close()
    
        

#### Function to write a record received from the consumer to MongoDB

In [4]:
def push_data_to_mongo(data):
    """Insert one record into ``logistics_mart.logistics_collection``.

    Opens a MongoDB connection, inserts ``data`` as a single document,
    prints the id of the inserted document, and closes the connection.

    Parameters
    ----------
    data : dict
        The deserialized record to store. NOTE(review): pymongo's
        ``insert_one`` is expected to add an ``_id`` key to this dict when
        it has none — confirm if callers reuse ``data`` afterwards.

    Raises
    ------
    Any exception raised by pymongo while connecting or inserting is
    propagated; the client is closed first.
    """
    # Connect to MongoDB ('xxxx' is a placeholder for the connection string).
    url = "xxxx"
    client = MongoClient(url)
    try:
        # No existence checks are needed: the database and collection handles
        # are obtained the same way whether or not they already exist, and
        # MongoDB creates both on the first insert.
        my_collection = client['logistics_mart']['logistics_collection']

        # Insert the data into logistics_mart.logistics_collection.
        insert_result = my_collection.insert_one(data)
        print('document inserted with id :{}'.format(insert_result.inserted_id))
    finally:
        # Release the connection even when the insert fails.
        client.close()
    



#### The main function call

In [5]:
# Start the consumer only when this code is executed directly (not imported).
if __name__=='__main__':
    main()

Consumer record with key : 1 and value : {'GpsProvider': 'VAMOSYS', 'BookingID': 'VCV00014271/082021', 'Market_Regular': 'Regular', 'BookingID_Date': '8/27/2020', 'vehicle_no': 'TN30BC5917', 'Origin_Location': 'DAIMLER INDIA COMMERCIAL VEHICLES,KANCHIPURAM,TAMIL NADU', 'Destination_Location': 'DAIMLER INDIA COMMERCIAL VEHICLES,KANCHIPURAM,TAMIL NADU', 'Org_lat_lon': '12.8390,79.9540', 'Des_lat_lon': '12.8390,79.9540', 'Data_Ping_time': '40:28.0', 'Planned_ETA': '22:22.8', 'Current_Location': 'Unnamed Road, Oragadam Industrial Corridor, Vattambakkam R.F., Tamil Nadu 631605, India', 'DestinationLocation': 'DAIMLER INDIA COMMERCIAL VEHICLES,KANCHIPURAM,TAMIL NADU', 'actual_eta': '8/28/2020 12:46', 'Curr_lat': 12.83675683, 'Curr_lon': 79.95442767, 'ontime': 'G', 'delay': nan, 'OriginLocation_Code': 'CHEORADMRCCB1', 'DestinationLocation_Code': 'CHEMATDMROPA7', 'trip_start_date': '8/27/2020 16:21', 'trip_end_date': nan, 'TRANSPORTATION_DISTANCE_IN_KM': 103.0, 'vehicleType': nan, 'Minimum_kms