In [1]:
import pandas as pd
import json
from kafka import KafkaProducer
import logging

In [2]:
# Root logger at INFO level, so library log records (e.g. kafka.conn) are shown
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)

In [3]:
# Read the embedded fact_sales CSV extract into a DataFrame
embedded_csv_path = '../../Data/fact_sales_embedded.csv'
df_sales_embedded = pd.read_csv(embedded_csv_path)

In [4]:
# Helpers that turn one flat fact_sales row into a nested MongoDB-style document
def _json_safe(value):
    """Return ``value`` in a form that serializes to standards-compliant JSON.

    * NumPy scalars (0-dimensional objects exposing ``.item()``) are unwrapped
      to the equivalent built-in Python object, which ``json.dumps`` can encode.
    * Float NaN -- how pandas represents a missing CSV cell -- becomes ``None``
      so it is serialized as ``null`` instead of the non-standard ``NaN`` token
      that ``json.dumps`` emits by default.

    Any other value is returned unchanged.
    """
    if getattr(value, "ndim", None) == 0 and hasattr(value, "item"):
        value = value.item()
    # NaN is the only float that is not equal to itself.
    if isinstance(value, float) and value != value:
        return None
    return value


def to_embedded_document(row):
    """Build the nested document for a single sales row.

    Parameters
    ----------
    row : mapping-like
        Anything indexable by column name (a pandas Series produced by
        ``DataFrame.iterrows()`` or a plain dict) that holds every column
        referenced below.

    Returns
    -------
    dict
        Document with ``date``, ``customer``, ``product`` and ``fulfillment``
        sub-documents plus top-level measures. Missing values are ``None``.

    Raises
    ------
    KeyError
        If ``row`` lacks one of the referenced columns.
    """
    def field(column):
        # Single access point so every copied value is normalized the same way.
        return _json_safe(row[column])

    return {
        # Deterministic key built from order and product ids, so re-sending the
        # same row yields the same _id (for idempotency).
        "_id": f"{row['order_id']}_{row['product_id']}",
        "order_id": field("order_id"),
        "date": {
            "order_date": field("order_date"),
            "year": field("year"),
            "month": field("month"),
            "day": field("day"),
        },
        "customer": {
            "customer_id": field("customer_id"),
            "ship_city": field("ship-city"),
            "ship_state": field("ship-state"),
            "ship_postal_code": field("ship-postal-code"),
            "ship_country": field("ship-country"),
        },
        "product": {
            "product_id": field("product_id"),
            "SKU": field("SKU"),
            "Category": field("Category"),
            "Style": field("Style"),
            "Size": field("Size"),
        },
        "fulfillment": {
            "fulfillment_id": field("fulfillment_id"),
            "Fulfilment": field("Fulfilment"),
            # The source column name carries a trailing underscore.
            "Sales_Channel": field("Sales_Channel_"),
            "ship_service_level": field("ship-service-level"),
        },
        "promotion": field("promotion-ids"),
        "status": field("status_name"),
        "courier_status": field("courier_status_name"),
        "quantity": field("quantity"),
        "amount": field("amount"),
        "currency": field("currency"),
        "B2B": field("B2B"),
        "fulfilled_by": field("fulfilled-by"),
        "timestamp": field("timestamp"),
    }

In [5]:
# Producer config: JSON-encoded values, each record dispatched on its own
def _encode_json(value):
    """Serialize a message value to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers='localhost:29092',
    value_serializer=_encode_json,
    # No delay
    linger_ms=0,
    # Disable batching
    batch_size=0,
)

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.6.0
INFO:kafka.conn:Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup


In [6]:
# Send one message at a time, waiting for each send to complete before the next.
# The try/finally guarantees the producer is closed even when building a
# document or a send fails (e.g. the 10 s wait on the send future expires);
# without it a failure would leave the producer and its connections open.
try:
    for _, row in df_sales_embedded.iterrows():
        message = to_embedded_document(row)
        producer.send('fact_sales_topic_2', message).get(timeout=10)  # Block until ack
finally:
    # NOTE: the producer cannot be reused after close(); re-run the producer
    # config cell before running this cell again.
    producer.close()

INFO:kafka.conn:<BrokerConnection node_id=1 host=127.0.0.1:29092 <connecting> [IPv4 ('127.0.0.1', 29092)]>: connecting to 127.0.0.1:29092 [('127.0.0.1', 29092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=1 host=127.0.0.1:29092 <connecting> [IPv4 ('127.0.0.1', 29092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. 
INFO:kafka.producer.kafka:Closing the Kafka producer with 4294967.0 secs timeout.
INFO:kafka.conn:<BrokerConnection node_id=1 host=127.0.0.1:29092 <connected> [IPv4 ('127.0.0.1', 29092)]>: Closing connection. 
