In [None]:
import os
import uuid
from datetime import datetime

from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from confluent_kafka import DeserializingConsumer, KafkaError, KafkaException
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

# Cassandra connection
#
# Credentials are read from the environment instead of being embedded in the
# source, so they never end up in version control or shared notebooks.
# Required variables: CASSANDRA_CLIENT_ID, CASSANDRA_CLIENT_SECRET.
# A missing variable raises KeyError immediately rather than failing later
# with an opaque authentication error.
# NOTE(review): any credentials that were previously committed in this file
# should be treated as leaked and rotated.
cloud_config = {
    'secure_connect_bundle': 'secure-connect-kafka-cassandra.zip',
}
auth_provider = PlainTextAuthProvider(
    os.environ['CASSANDRA_CLIENT_ID'],
    os.environ['CASSANDRA_CLIENT_SECRET'],
)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# Define Kafka configuration
#
# Secrets are read from the environment instead of being embedded in the
# source. Required variables: KAFKA_SASL_USERNAME, KAFKA_SASL_PASSWORD,
# SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET.
# A missing variable raises KeyError immediately.
# NOTE(review): any keys that were previously committed in this file should
# be treated as leaked and rotated.
kafka_config = {
    'bootstrap.servers': 'pkc-6ojv2.us-west4.gcp.confluent.cloud:9092',
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': os.environ['KAFKA_SASL_USERNAME'],
    'sasl.password': os.environ['KAFKA_SASL_PASSWORD'],
    'group.id': 'group16',
    'auto.offset.reset': 'earliest',
}

# Create a Schema Registry client (basic auth is passed as "<key>:<secret>")
schema_registry_client = SchemaRegistryClient({
    'url': 'https://psrc-zj6ny.us-east-2.aws.confluent.cloud',
    'basic.auth.user.info': '{}:{}'.format(
        os.environ['SCHEMA_REGISTRY_API_KEY'],
        os.environ['SCHEMA_REGISTRY_API_SECRET'],
    ),
})

# Fetch the latest Avro schema for the value
subject_name = 'olist_orders_dataset-value'
schema_str = schema_registry_client.get_latest_version(subject_name).schema.schema_str

# Keys are plain UTF-8 strings; values are Avro records decoded with the
# schema fetched above.
key_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

# Define the DeserializingConsumer
consumer = DeserializingConsumer({
    'bootstrap.servers': kafka_config['bootstrap.servers'],
    'security.protocol': kafka_config['security.protocol'],
    'sasl.mechanisms': kafka_config['sasl.mechanisms'],
    'sasl.username': kafka_config['sasl.username'],
    'sasl.password': kafka_config['sasl.password'],
    'key.deserializer': key_deserializer,
    'value.deserializer': avro_deserializer,
    'group.id': kafka_config['group.id'],
    'auto.offset.reset': kafka_config['auto.offset.reset'],
    # process_message() commits each offset explicitly, and only after the
    # Cassandra insert succeeded. Auto-commit defaults to enabled and would
    # also commit offsets of records whose insert failed, so turn it off.
    'enable.auto.commit': False,
})

# Subscribe to the 'olist_orders_dataset' topic
consumer.subscribe(['olist_orders_dataset'])

# Timestamp layout used by every date column in the incoming records.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Record fields that hold timestamps, in the order they are bound below.
_TIMESTAMP_FIELDS = (
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
)

# NOTE(review): 'Oorder_day_of_week' looks like a typo, but it is kept as-is
# because it has to match the column name of the existing table.
_INSERT_ORDER_CQL = (
    "INSERT INTO ecommerce.orders (order_id, customer_id, order_status, order_purchase_timestamp, "
    "order_approved_at, order_delivered_carrier_date, order_delivered_customer_date, "
    "order_estimated_delivery_date, order_hour, Oorder_day_of_week) "
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
)

# Prepared lazily on first use and then reused for every message.
_insert_order_statement = None


def _parse_timestamp(raw):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime; pass None through."""
    if raw is None:
        return None
    return datetime.strptime(raw, _TIMESTAMP_FORMAT)


def _get_insert_statement():
    """Return the prepared INSERT statement, preparing it on the first call only."""
    global _insert_order_statement
    if _insert_order_statement is None:
        _insert_order_statement = session.prepare(_INSERT_ORDER_CQL)
    return _insert_order_statement


def process_message(message):
    """Transform one consumed order record and insert it into ecommerce.orders.

    The timestamp strings are converted to datetime objects, the purchase
    hour and weekday name are derived from the purchase timestamp, and
    'order_id' / 'customer_id' are converted to UUIDs. After a successful
    insert the message offset is committed to Kafka.

    Records that cannot be transformed (no value, malformed timestamp,
    invalid UUID) are reported on stdout and skipped without being inserted
    or committed. Errors raised by the insert or the commit are reported on
    stdout and swallowed so that one bad record does not stop the consumer.

    Args:
        message: consumed Kafka message exposing key() and value(); value()
            is expected to be the deserialized record as a dict.

    Returns:
        None.
    """
    key = message.key()
    value = message.value()

    # A message without a payload has nothing to insert.
    if value is None:
        print(f"Skipping record {key}: message has no value")
        return

    # Convert every timestamp column; a None field stays None.
    try:
        timestamps = [_parse_timestamp(value[field]) for field in _TIMESTAMP_FIELDS]
    except ValueError as err:
        print(f"Invalid timestamp in record {key}: {err}")
        return

    # Derived columns; left as None when the purchase timestamp is absent
    # instead of crashing on attribute access.
    order_purchase_timestamp = timestamps[0]
    if order_purchase_timestamp is None:
        purchase_hour = None
        purchase_day_of_week = None
    else:
        purchase_hour = order_purchase_timestamp.hour
        purchase_day_of_week = order_purchase_timestamp.strftime('%A')

    # Convert 'order_id' to a valid UUID format
    try:
        order_id = uuid.UUID(value['order_id'])
    except (ValueError, TypeError, AttributeError):
        # Not a valid UUID (or not a string at all): skip the message
        print(f"Invalid 'order_id': {value['order_id']}")
        return

    # Convert 'customer_id' to a valid UUID format
    try:
        customer_id = uuid.UUID(value['customer_id'])
    except (ValueError, TypeError, AttributeError):
        # Not a valid UUID (or not a string at all): skip the message
        print(f"Invalid 'customer_id': {value['customer_id']}")
        return

    try:
        # Ingest the transformed data into the 'orders' table in Cassandra
        bound_statement = _get_insert_statement().bind((
            order_id,
            customer_id,
            value['order_status'],
            *timestamps,
            purchase_hour,
            purchase_day_of_week,
        ))
        bound_statement.consistency_level = ConsistencyLevel.QUORUM

        session.execute(bound_statement)

        print(f'Record {key} inserted successfully !!')

        # Manually commit the offset to Kafka
        consumer.commit(message)

    except Exception as err:
        # Per-record boundary: report the failure and keep consuming.
        print(f"Exception occured while inserting {key} into the table: {err}")
    

def _report_kafka_error(error):
    """Print a consume-side Kafka error; end-of-partition is informational only."""
    if error.code() == KafkaError._PARTITION_EOF:
        # End of partition event, not an error
        print('Reached end of partition')
    else:
        print('Error while consuming: {}'.format(error))


# Continually read messages from Kafka until interrupted (Ctrl+C / kernel interrupt)
try:
    while True:
        try:
            msg = consumer.poll(1.0)
        except KafkaException as exc:
            # DeserializingConsumer.poll() raises on consume and
            # deserialization errors instead of returning a message with
            # error() set. Report the error and keep consuming so that one
            # bad record does not terminate the loop. args[0] carries the
            # underlying KafkaError.
            _report_kafka_error(exc.args[0])
            continue

        # poll() timed out without a message
        if msg is None:
            continue

        # Kept for consumers that report errors on the message itself.
        if msg.error():
            _report_kafka_error(msg.error())
            continue

        print('Successfully consumed record with key {} and value {}'.format(msg.key(), msg.value()))
        process_message(msg)

except KeyboardInterrupt:
    pass

finally:
    # Always release both clients, even if closing the consumer fails.
    try:
        consumer.close()
    finally:
        cluster.shutdown()
    

Successfully consumed record with key b0830fb4747a6c6d20dea0b8c802d7ef_53cdb2fc8bc7dce0b6741e2150273451 and value {'order_id': '53cdb2fc8bc7dce0b6741e2150273451', 'customer_id': 'b0830fb4747a6c6d20dea0b8c802d7ef', 'order_status': 'delivered', 'order_purchase_timestamp': '2018-07-24 20:41:37', 'order_approved_at': '2018-07-26 03:24:27', 'order_delivered_carrier_date': '2018-07-26 14:31:00', 'order_delivered_customer_date': '2018-08-07 15:27:45', 'order_estimated_delivery_date': '2018-08-13 00:00:00', 'OrderHour': None, 'OrderDayOfWeek': None}
Record b0830fb4747a6c6d20dea0b8c802d7ef_53cdb2fc8bc7dce0b6741e2150273451 inserted successfully !!
Successfully consumed record with key f88197465ea7920adcdbec7375364d82_949d5b44dbf5de918fe9c16f97b45f8a and value {'order_id': '949d5b44dbf5de918fe9c16f97b45f8a', 'customer_id': 'f88197465ea7920adcdbec7375364d82', 'order_status': 'delivered', 'order_purchase_timestamp': '2017-11-18 19:28:06', 'order_approved_at': '2017-11-18 19:45:59', 'order_delivered

Record 8b212b9525f9e74e85e37ed6df37693e_dd78f560c270f1909639c11b925620ea inserted successfully !!
Successfully consumed record with key cce89a605105b148387c52e286ac8335_91b2a010e1e45e6ba3d133fa997597be and value {'order_id': '91b2a010e1e45e6ba3d133fa997597be', 'customer_id': 'cce89a605105b148387c52e286ac8335', 'order_status': 'delivered', 'order_purchase_timestamp': '2018-05-02 11:45:38', 'order_approved_at': '2018-05-03 12:55:01', 'order_delivered_carrier_date': '2018-05-10 16:16:00', 'order_delivered_customer_date': '2018-05-16 20:56:24', 'order_estimated_delivery_date': '2018-05-23 00:00:00', 'OrderHour': None, 'OrderDayOfWeek': None}
Record cce89a605105b148387c52e286ac8335_91b2a010e1e45e6ba3d133fa997597be inserted successfully !!
Successfully consumed record with key 52142aa69d8d0e1247ab0cada0f76023_1790eea0b567cf50911c057cf20f90f9 and value {'order_id': '1790eea0b567cf50911c057cf20f90f9', 'customer_id': '52142aa69d8d0e1247ab0cada0f76023', 'order_status': 'delivered', 'order_purcha

Record 8ab97904e6daea8866dbdbc4fb7aad2c_ad21c59c0840e6cb83a9ceb5573f8159 inserted successfully !!
Successfully consumed record with key 503740e9ca751ccdda7ba28e9ab8f608_a4591c265e18cb1dcee52889e2d8acc3 and value {'order_id': 'a4591c265e18cb1dcee52889e2d8acc3', 'customer_id': '503740e9ca751ccdda7ba28e9ab8f608', 'order_status': 'delivered', 'order_purchase_timestamp': '2017-07-09 21:57:05', 'order_approved_at': '2017-07-09 22:10:13', 'order_delivered_carrier_date': '2017-07-11 14:58:04', 'order_delivered_customer_date': '2017-07-26 10:57:55', 'order_estimated_delivery_date': '2017-08-01 00:00:00', 'OrderHour': None, 'OrderDayOfWeek': None}
Record 503740e9ca751ccdda7ba28e9ab8f608_a4591c265e18cb1dcee52889e2d8acc3 inserted successfully !!
Successfully consumed record with key ed0271e0b7da060a393796590e7b737a_136cce7faa42fdb2cefd53fdc79a6098 and value {'order_id': '136cce7faa42fdb2cefd53fdc79a6098', 'customer_id': 'ed0271e0b7da060a393796590e7b737a', 'order_status': 'invoiced', 'order_purchas

Record 816f8653d5361cbf94e58c33f2502a5c_989225ba6d0ebd5873335f7e01de2ae7 inserted successfully !!
Successfully consumed record with key cf8ffeddf027932e51e4eae73b384059_b276e4f8c0fb86bd82fce576f21713e0 and value {'order_id': 'b276e4f8c0fb86bd82fce576f21713e0', 'customer_id': 'cf8ffeddf027932e51e4eae73b384059', 'order_status': 'delivered', 'order_purchase_timestamp': '2018-07-29 23:34:51', 'order_approved_at': '2018-07-29 23:45:15', 'order_delivered_carrier_date': '2018-07-30 14:43:00', 'order_delivered_customer_date': '2018-07-31 22:48:50', 'order_estimated_delivery_date': '2018-08-06 00:00:00', 'OrderHour': None, 'OrderDayOfWeek': None}
Record cf8ffeddf027932e51e4eae73b384059_b276e4f8c0fb86bd82fce576f21713e0 inserted successfully !!
Successfully consumed record with key f5458ddc3545711efa883dd7ae7c4497_60550084e6b4c0cb89a87df1f3e5ebd9 and value {'order_id': '60550084e6b4c0cb89a87df1f3e5ebd9', 'customer_id': 'f5458ddc3545711efa883dd7ae7c4497', 'order_status': 'delivered', 'order_purcha