# Producer

In [1]:
# import pytz
from datetime import datetime
import yaml
import logging

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer


log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger("producer")


def delivery_report(err, msg):
    """
    Delivery callback that prints the outcome of a single produce request.

    Args:
        err (KafkaError): The error that occurred, or None on success.

        msg (Message): The message that was produced or failed.

    Returns:
        None. The outcome is written to stdout only.

    Note:
        Inside this callback Message.key() and Message.value() are the
        binary payloads produced by the configured serializers, not the
        objects originally handed to produce(). To report on the original
        objects, use a bound callback or a lambda that carries them along.

    """
    if err is None:
        # Success: report where the record landed.
        outcome = 'User record {} successfully produced to {} [{}] at offset {}'.format(
            msg.key(), msg.topic(), msg.partition(), msg.offset())
    else:
        # Failure: only the key is read from the message.
        outcome = "Delivery failed for record {}: {}".format(msg.key(), err)
    print(outcome)


# Load the notebook settings from the YAML config file.
config_path = 'conf/config.yaml'
with open(config_path) as yaml_fp:
    cfg = yaml.safe_load(yaml_fp)

# The two config sections used below.
sr_cfg, kafka_cfg = cfg['schema_registry'], cfg['kafka']

# Create a Schema Registry client.
# The basic-auth value is "<a>:<b>", where both parts are read from the
# schema_registry section of the config file.
# NOTE(review): the first part comes from 'basic.auth.credentials.source' —
# confirm that this config entry really holds the user / API key.
sr_url = sr_cfg['schema.registry.url']
sr_auth_source = sr_cfg['basic.auth.credentials.source']
sr_auth_info = sr_cfg['basic.auth.user.info']
schema_registry_config = {
    'url': sr_url,
    'basic.auth.user.info': '{}:{}'.format(sr_auth_source, sr_auth_info),
}
schema_registry_client = SchemaRegistryClient(schema_registry_config)


# Fetch the latest Avro schema for the key and the value.
# Subject names come from the schema_registry section of the config.
key_subject_name = sr_cfg["subject.key"]
value_subject_name = sr_cfg["subject.value"]
try:
    key_schema_str = schema_registry_client.get_latest_version(key_subject_name).schema.schema_str
    value_schema_str = schema_registry_client.get_latest_version(value_subject_name).schema.schema_str
except Exception as e:
    logger.error(f"schema fetching error: {e}")
    # Re-raise instead of swallowing: the serializers below need both schema
    # strings, so continuing would either fail with a NameError or silently
    # reuse stale schemas left in the kernel by an earlier run.
    raise
else:
    logger.info("Schamas have been fetched from the registry")
    print(f'key_schema_str:{key_schema_str}, value_schema_str: {value_schema_str}')

# Create Avro Serializer for the key and the value
key_avro_serializer = AvroSerializer(schema_registry_client, key_schema_str)
value_avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)

# Define Kafka configuration and create a kafka producer.
# Connection/SASL settings are copied from the kafka section of the config;
# the Avro serializers built above are attached for keys and values.
kafka_config = {
    'bootstrap.servers': kafka_cfg['bootstrap.servers'],
    'sasl.mechanisms': kafka_cfg['sasl.mechanisms'],
    'security.protocol': kafka_cfg['security.protocol'],
    'sasl.username': kafka_cfg['sasl.username'],
    'sasl.password': kafka_cfg['sasl.password'],
    'key.serializer': key_avro_serializer,
    'value.serializer': value_avro_serializer
}

try:
    producer = SerializingProducer(kafka_config)
    # NOTE(review): this presumably only confirms the producer object was
    # constructed from the config; confirm whether the client has actually
    # reached the brokers at this point before relying on this log line.
    logger.info("Kafka connection has been successfully established!")
except Exception as err:
    logger.error(f"Kafka cluster connection error: {err}")
    # Re-raise instead of swallowing: without this, `producer` would be
    # undefined (or a stale object from an earlier run) for later cells.
    raise

2023-10-16 17:09:22,280 - producer - INFO - Schamas have been fetched from the registry
2023-10-16 17:09:22,311 - producer - INFO - Kafka connection has been successfully established!


key_schema_str:{"type":"record","name":"Key","namespace":"com.glowdataskills.cassandra","doc":"avro-schema for kafka key","fields":[{"name":"order_id","type":"string"},{"name":"customer_id","type":"string"}]}, value_schema_str: {"type":"record","name":"Value","namespace":"com.glowdataskills.cassandra","doc":"value avro-schema for kafka topic ecommerce-orders","fields":[{"name":"order_id","type":"string"},{"name":"customer_id","type":"string"},{"name":"order_status","type":"string"},{"name":"order_purchase_timestamp","type":"string"},{"name":"order_approved_at","type":["null","string"]},{"name":"order_delivered_carrier_date","type":["null","string"]},{"name":"order_delivered_customer_date","type":["null","string"]},{"name":"order_estimated_delivery_date","type":["null","string"]}]}


In [46]:
import pandas as pd

# Read the source CSV whose location is given by data_source.path in the config.
data_path = cfg['data_source']['path']
data = pd.read_csv(data_path)

In [47]:
# Summarise the loaded frame: columns, non-null counts, dtypes and memory usage.
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 99441 entries, 0 to 99440
Data columns (total 8 columns):
 #   Column                         Non-Null Count  Dtype 
---  ------                         --------------  ----- 
 0   order_id                       99441 non-null  object
 1   customer_id                    99441 non-null  object
 2   order_status                   99441 non-null  object
 3   order_purchase_timestamp       99441 non-null  object
 4   order_approved_at              99281 non-null  object
 5   order_delivered_carrier_date   97658 non-null  object
 6   order_delivered_customer_date  96476 non-null  object
 7   order_estimated_delivery_date  99441 non-null  object
dtypes: object(8)
memory usage: 6.1+ MB


In [48]:
# Preview the first rows of the loaded frame.
data.head()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00


In [49]:
# Look up the order_id stored under index label 0 with a single explicit
# .loc access instead of chained [] indexing.
data.loc[0, 'order_id']

'e481f51cbdc54678b7cc49136f2d6af7'