In [20]:
"""
The producer follows these steps to publish the data:
1. Connects to the MySQL server
2. Fetches the data from the table incrementally
3. Creates a Kafka connection and publishes the data to the topic
4. Writes the updated incremental timestamp back to the ETL table
"""

import time

import pandas as pd
import datetime as dt
import mysql.connector
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

# Table with one row per ETL run; the row with the highest id supplies the
# last_run_timestamp used as the incremental starting point.
etl_table = 'etl_log'
# Source table whose rows are selected and published to Kafka.
table_name = 'product_information'
# MySQL schema (database) that qualifies table_name in the queries.
schema_name = 'buy_online'


def connect_mysql():
    """
    Connect to the mysql server, read the last_run_timestamp of the most recent etl run
    and hand the cursor over to fetch_data_from_table to pull and publish the data.

    The transaction is committed only when every downstream step succeeded; the
    connection is closed in all cases.
    :return:
    :raises RuntimeError: if the etl table has no row to read the timestamp from
    """
    # NOTE(review): the credentials are hard-coded in source; consider loading
    # them from the environment or a secret store.
    config = {
        'user': 'root',
        'host': 'localhost',
        'password': 'admin',
        'database': 'buy_online',
        'raise_on_warnings': True
    }

    cnx = mysql.connector.connect(**config)
    try:
        my_cursor = cnx.cursor()

        # Get the last_run_timestamp of the etl from the database
        get_last_run_timestamp = "select last_run_timestamp from " + etl_table + " where id = (select max(id) from " + etl_table + ");"
        print(get_last_run_timestamp)
        my_cursor.execute(get_last_run_timestamp)
        rows = my_cursor.fetchall()

        # An empty etl table yields no rows; fail with a clear message instead
        # of an opaque indexing error.
        if not rows:
            raise RuntimeError(
                "No rows found in " + etl_table + "; cannot determine last_run_timestamp")

        last_run_timestamp = rows[0][0]
        print(last_run_timestamp)

        fetch_data_from_table(my_cursor, last_run_timestamp)

        # Reached only when publishing and the etl log insert both succeeded.
        cnx.commit()
    finally:
        # Always release the connection, even when a step above raised.
        cnx.close()


def fetch_data_from_table(my_cursor, last_run):
    """
    Look up the column names of the source table, build the incremental select query
    and pass it on to kafka_producer.
    :param my_cursor: open database cursor used for the metadata lookup and handed on
    :param last_run: timestamp of the previous run; rows whose created or last_updated
                     value is at or after it are selected
    :return:
    :raises ValueError: if the metadata lookup returns no columns for the table
    """
    get_data = "select column_name from information_Schema.columns where table_name='" + table_name + "' and table_schema='" + schema_name + \
               "' order by ordinal_position;"
    print(get_data)

    my_cursor.execute(get_data)
    # Each fetched row is a one-element tuple holding the column name.
    col_list = [row[0] for row in my_cursor.fetchall()]

    # Without columns the statement below would be the malformed "select from ...".
    if not col_list:
        raise ValueError(
            "No columns found for " + schema_name + "." + table_name)

    # create the select query
    # NOTE(review): "limit 10" has no "order by", and date_update later records
    # the current time as the new starting point, so rows beyond the first 10
    # may never be published - confirm whether the limit is intentional.
    sel_query = "select " + ",".join(col_list) + " from " + schema_name + "." + table_name + \
                " where created>='" + str(last_run) + \
                "' or last_updated>='" + str(last_run) + "' limit 10;"
    print(sel_query)
    print(col_list)

    kafka_producer(sel_query, my_cursor, col_list)


def kafka_producer(sel_query, my_cursor, col_list):
    """
    Execute the select query, publish the fetched rows to kafka and then log the run
    in the etl table.
    :param sel_query: select statement to execute
    :param my_cursor: open database cursor
    :param col_list: column names used to label the fetched rows
    :return:
    """
    my_cursor.execute(sel_query)
    fetched_rows = my_cursor.fetchall()
    frame = pd.DataFrame(fetched_rows, columns=col_list)

    # display is not imported in this file; presumably the IPython/Jupyter builtin.
    preview = frame.head(10)
    display(preview)

    create_kafka_connection(frame)

    # Record this run only after publishing has returned.
    date_update(my_cursor)


def create_kafka_connection(product_data):
    """
    Publish every row of product_data to the kafka topic as an avro-serialized record.

    The latest value schema is fetched from the schema registry, each row is produced
    with its first column as the message key, and delivery is confirmed per record.
    :param product_data: DataFrame whose rows are published; the first column is the key
    :return:
    :raises RuntimeError: if the broker reported a delivery failure for any record, so
                          the caller does not log the run as successful
    """
    # NOTE(review): API keys and secrets are hard-coded in source; consider
    # rotating them and loading them from the environment or a secret store.
    kafka_config = {
        'bootstrap.servers': 'pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092',
        'sasl.mechanisms': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': 'P2AIJEUTB2XQGESS',
        'sasl.password': 'l5wKwZ6dXVTKhYjRjg/LvumvUXmlurP7AB36X6g6i1PgdQsNAMtzJJQ+b/PAS9RF'
    }

    # create a schema registry client

    create_schema_registry_client = SchemaRegistryClient({
        'url': 'https://psrc-gqrvzv.southeastasia.azure.confluent.cloud',
        'basic.auth.user.info': '{}:{}'.format('NWIGQODEMAQLNVHG',
                                               'srLL0tbfwYYRjsF6LvawiZArZXVYuF1g9/rQWGzi4qz2Zm3kBrUwlX0vinzsyqjH')
    })

    # fetch the latest avro schema
    subject_name = 'kafka-topic-product-value'
    schema_str = create_schema_registry_client.get_latest_version(subject_name).schema.schema_str

    key_serializer = StringSerializer('UTF-8')
    avro_serializer = AvroSerializer(create_schema_registry_client, schema_str)

    # establish connection to the producer; the connection settings are reused
    # from kafka_config instead of being copied key by key

    producer = SerializingProducer({
        **kafka_config,
        'key.serializer': key_serializer,
        'value.serializer': avro_serializer
    })

    failed_keys = []

    def _track_delivery(err, msg):
        # Remember failed records, then report exactly as delivery_report does.
        if err is not None:
            failed_keys.append(msg.key())
        delivery_report(err, msg)

    for _, row in product_data.iterrows():
        value = row.to_dict()
        # row.iloc[0] is explicit positional access; row[0] on a label-indexed
        # Series is deprecated in recent pandas versions.
        producer.produce(topic='kafka-topic-product', key=str(row.iloc[0]), value=value,
                         on_delivery=_track_delivery)
        # Flushing per record triggers the delivery callback before moving on;
        # the pause keeps the original pacing between records.
        producer.flush()
        time.sleep(2)

    if failed_keys:
        raise RuntimeError(
            "Delivery failed for {} record(s): {}".format(len(failed_keys), failed_keys))

    print("All data successfully published")

def delivery_report(err, msg):
    """
    Delivery callback: print whether a produced record reached the topic.
    :param err: delivery error, or None when the record was delivered
    :param msg: the produced message; its key, topic, partition and offset are printed
    :return:
    """
    if err is None:
        details = (msg.key(), msg.topic(), msg.partition(), msg.offset())
        print('User record {} successfully produced to {} [{}] at offset {}'.format(*details))
    else:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))


def date_update(my_cursor):
    """
    Insert a new row into the etl table with the current local time as last_run_timestamp.
    :param my_cursor: open database cursor used to run the insert
    :return:
    """
    # NOTE(review): the timestamp is taken after publishing, not when the data
    # was selected - confirm that rows written in between cannot be skipped.
    run_stamp = str(dt.datetime.now())
    insert_statement = "insert into  " + etl_table + " (last_run_timestamp) values ('" + run_stamp + "');"
    print(insert_statement)
    my_cursor.execute(insert_statement)
    



def main():
    """
    Entry point: run one producer pass, starting with the mysql connection step
    :return:
    """
    connect_mysql()
    

# Run the producer only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()



select last_run_timestamp from etl_log where id = (select max(id) from etl_log);
1990-01-01 00:00:00
select column_name from information_Schema.columns where table_name='product_information' and table_schema='buy_online' order by ordinal_position;
select id,product_name,category,price,created,last_updated from buy_online.product_information where created>='1990-01-01 00:00:00' or last_updated>='1990-01-01 00:00:00' limit 10;
['id', 'product_name', 'category', 'price', 'created', 'last_updated']


Unnamed: 0,id,product_name,category,price,created,last_updated
0,1,Sofa,Houseold,25000.0,2024-07-28 13:32:58,2024-07-28 13:32:58
1,2,Cupboard,Shopping Mart,20000.0,2024-07-28 13:32:58,2024-07-28 13:32:58
2,3,Cupboard,Shopping Mart,10000.0,2024-07-28 13:32:58,2024-07-28 13:32:58
3,4,Wires,Office,20000.0,2024-07-28 13:32:58,2024-07-28 13:32:58
4,5,Cupboard,Educational Institutions,300.0,2024-07-28 13:32:58,2024-07-28 13:32:58
5,6,Cupboard,Houseold,25000.0,2024-07-28 13:32:58,2024-07-28 13:32:58
6,7,Adaptor,Shopping Mart,300.0,2024-07-28 13:32:58,2024-07-28 13:32:58
7,8,Sofa,Office,4000.0,2024-07-28 13:32:58,2024-07-28 13:32:58
8,9,Bottle,Houseold,790.0,2024-07-28 13:32:58,2024-07-28 13:32:58
9,10,Sofa,Educational Institutions,3000.0,2024-07-28 13:32:58,2024-07-28 13:32:58


  producer.produce(topic='kafka-topic-product',key=str(row[0]),value=value,on_delivery=delivery_report)


User record b'1' successfully produced to kafka-topic-product [3] at offset 1
User record b'2' successfully produced to kafka-topic-product [7] at offset 1
User record b'3' successfully produced to kafka-topic-product [1] at offset 3
User record b'4' successfully produced to kafka-topic-product [8] at offset 1
User record b'5' successfully produced to kafka-topic-product [6] at offset 2
User record b'6' successfully produced to kafka-topic-product [0] at offset 1
User record b'7' successfully produced to kafka-topic-product [6] at offset 3
User record b'8' successfully produced to kafka-topic-product [1] at offset 4
User record b'9' successfully produced to kafka-topic-product [9] at offset 1
User record b'10' successfully produced to kafka-topic-product [1] at offset 5
All data successfully published
insert into  etl_log (last_run_timestamp) values ('2024-07-28 17:34:22.476411');
