In [None]:
from kafka import KafkaConsumer
import json
import os
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv

load_dotenv()
# PostgreSQL connection
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5435")
POSTGRES_DB = os.getenv("POSTGRES_DB")

# SQLAlchemy engine
engine = create_engine(f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}')


# Define Kafka consumer
consumer = KafkaConsumer(
    os.getenv("TOPIC_NAME"),  # The topic to consume messages from
    bootstrap_servers=os.getenv("KAFKA_BROKER"),  # List of Kafka brokers to connect to
    auto_offset_reset='earliest',  # Where to start reading messages when no offset is stored ('earliest' to read from the beginning)
    enable_auto_commit=True,  # Automatically commit offsets after consuming messages
    value_deserializer=lambda x: x.decode('utf-8') if x else None  # Deserialize message values from bytes to UTF-8 strings
)

# Consume messages with error handling for non-JSON messages
for message in consumer:
    try:
        data = json.loads(msg.value)

        room_id = data.get('room_id')
        room_created_at = data.get('room_created_at')
        channel = data.get('channel')
        customer = data.get('customer', {})
        messages = data.get('messages', [])

        rows = []
        for message in messages:
            rows.append({
                "message_id": message.get("message_id"),
                "room_id": room_id,
                "room_created_at": room_created_at,
                "channel": channel,
                "customer_id": customer.get("customer_id"),
                "customer_name": customer.get("customer_name"),
                "phone": customer.get("phone"),
                "sender_type": message.get("sender_type"),
                "message_text": message.get("message_text"),
                "message_date": message.get("message_date")
            })

        df = pd.DataFrame(rows)

        # Insert into Postgres
        df.to_sql("fact_message", engine, if_exists='append', index=False)

        print(f"Inserted {len(df)} messages into Postgres.")

    except Exception as e:
        print(f"Error processing message: {e}")

{"room_id": "300001", "room_created_at": "2024-10-22T04:42:30", "channel": "ads", "customer": {"customer_id": "cust_001", "customer_name": "Alice Johnson", "phone": "081234567891"}, "messages": [{"message_id": "msg_001", "sender_type": "customer", "message_text": "Hi! I'm interested in swimming classes", "message_date": "2024-10-22T04:42:30"}, {"message_id": "msg_002", "sender_type": "agent", "message_text": "Sure! We have classes available for 3-5 Y.O.", "message_date": "2024-10-22T04:45:30"}, {"message_id": "msg_003", "sender_type": "system", "message_text": "Booking confirmed for 24 Oct", "message_date": "2024-10-23T10:00:00"}, {"message_id": "msg_004", "sender_type": "system", "message_text": "Payment of IDR 500000 confirmed", "message_date": "2024-10-24T08:00:00"}]}
{"room_id": "300002", "room_created_at": "2024-10-22T04:47:25", "channel": "website", "customer": {"customer_id": "cust_002", "customer_name": "Bob Smith", "phone": "081234567892"}, "messages": [{"message_id": "msg_005

KeyboardInterrupt: 

In [13]:
import os
import json
import psycopg2
from psycopg2.extras import execute_values
from kafka import KafkaConsumer
import logging
from datetime import datetime

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def deserialize_json(x):
    """Deserialize message value to JSON"""
    if x is None:
        return None
    try:
        return json.loads(x.decode('utf-8'))
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON: {e}")
        return None

def create_kafka_consumer():
    """Create Kafka consumer with JSON deserialization"""
    return KafkaConsumer(
        os.getenv("TOPIC_NAME"),
        bootstrap_servers=os.getenv("KAFKA_BROKER"),
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        value_deserializer=deserialize_json
    )

def create_pg_connection():
    """Create PostgreSQL connection"""
    try:
        connection = psycopg2.connect(
            host='localhost',
            database='streaming',
            user='streaming',
            password='password',
            port=5435
        )
        connection.autocommit = False  # We'll handle transactions manually
        return connection
    except Exception as e:
        logger.error(f"Failed to connect to PostgreSQL: {e}")
        raise

def validate_room_data(room):
    """Validate room data structure"""
    required_fields = ['room_id', 'customer']
    for field in required_fields:
        if field not in room:
            logger.warning(f"Missing required field '{field}' in room data")
            return False
    
    if 'customer_id' not in room.get('customer', {}):
        logger.warning("Missing customer_id in customer data")
        return False
        
    return True

def parse_datetime(date_str):
    """Parse datetime string to datetime object"""
    if not date_str:
        return None
    
    try:
        # Handle different datetime formats
        for fmt in ['%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S']:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        
        logger.warning(f"Could not parse datetime: {date_str}")
        return None
        
    except Exception as e:
        logger.error(f"Error parsing datetime {date_str}: {e}")
        return None

def process_chat_data_batch(rooms_batch, pg_connection):
    """Process a batch of room data and insert into normalized tables"""
    if not rooms_batch:
        return
        
    try:
        cursor = pg_connection.cursor()
        
        # Prepare data for batch insertion
        customers_data = []
        rooms_data = []
        messages_data = []
        
        # Extract and prepare data
        for room in rooms_batch:
            if not validate_room_data(room):
                continue
            
            # Extract customer data
            customer = room.get('customer', {})
            customer_record = (
                customer.get('customer_id'),
                customer.get('customer_name'),
                customer.get('phone')
            )
            customers_data.append(customer_record)
            
            # Extract room data
            room_record = (
                room.get('room_id'),
                customer.get('customer_id'),
                room.get('channel'),
                parse_datetime(room.get('room_created_at'))
            )
            rooms_data.append(room_record)
            
            # Extract messages data
            for message in room.get('messages', []):
                message_record = (
                    message.get('message_id'),
                    room.get('room_id'),
                    message.get('sender_type'),
                    message.get('message_text'),
                    parse_datetime(message.get('message_date'))
                )
                messages_data.append(message_record)
        
        # Insert customers (with conflict resolution)
        if customers_data:
            execute_values(
                cursor,
                """
                INSERT INTO stream.customers (customer_id, customer_name, phone) 
                VALUES %s 
                ON CONFLICT (customer_id) 
                DO UPDATE SET 
                    customer_name = EXCLUDED.customer_name,
                    phone = EXCLUDED.phone,
                    updated_at = CURRENT_TIMESTAMP
                """,
                customers_data,
                template=None,
                page_size=100
            )
        
        # Insert rooms (with conflict resolution)
        if rooms_data:
            execute_values(
                cursor,
                """
                INSERT INTO stream.rooms (room_id, customer_id, channel, room_created_at) 
                VALUES %s 
                ON CONFLICT (room_id) DO NOTHING
                """,
                rooms_data,
                template=None,
                page_size=100
            )
        
        # Insert messages (with conflict resolution)
        if messages_data:
            execute_values(
                cursor,
                """
                INSERT INTO stream.messages (message_id, room_id, sender_type, message_text, message_date) 
                VALUES %s 
                ON CONFLICT (message_id) DO NOTHING
                """,
                messages_data,
                template=None,
                page_size=100
            )
        
        pg_connection.commit()
        
        logger.info(f"Successfully processed batch: {len(customers_data)} customers, "
                   f"{len(rooms_data)} rooms, {len(messages_data)} messages")
        
    except Exception as e:
        logger.error(f"Failed to process batch: {e}")
        pg_connection.rollback()
        raise

def process_messages_batch(consumer, pg_connection, batch_size=50):
    """Process messages in batches for better performance"""
    message_batch = []
    
    try:
        for message in consumer:
            if message.value is not None:
                # Handle both single room object and array of rooms
                rooms_data = message.value
                if isinstance(rooms_data, dict):
                    rooms_data = [rooms_data]  # Convert single object to array
                elif not isinstance(rooms_data, list):
                    logger.warning(f"Unexpected data format: {type(rooms_data)}")
                    continue
                
                message_batch.extend(rooms_data)
                
                # Process batch when it reaches the specified size
                if len(message_batch) >= batch_size:
                    process_chat_data_batch(message_batch, pg_connection)
                    message_batch = []
                    
    except KeyboardInterrupt:
        logger.info("Received interrupt signal, processing remaining messages...")
        if message_batch:
            process_chat_data_batch(message_batch, pg_connection)
    except Exception as e:
        logger.error(f"Error processing messages: {e}")
        raise

def close_connections(consumer, pg_connection):
    """Close all connections"""
    try:
        if consumer:
            consumer.close()
        if pg_connection:
            pg_connection.close()
        logger.info("Connections closed successfully")
    except Exception as e:
        logger.error(f"Error closing connections: {e}")

def validate_environment():
    """Validate required environment variables"""
    required_env_vars = [
        "TOPIC_NAME", "KAFKA_BROKER", "POSTGRES_DB", 
        "POSTGRES_USER", "POSTGRES_PASSWORD"
    ]
    
    missing_vars = [var for var in required_env_vars if not os.getenv(var)]
    if missing_vars:
        logger.error(f"Missing required environment variables: {missing_vars}")
        return False
    return True

def main():
    """Main function to run the Chat Data Processor"""
    
    # Validate environment variables
    if not validate_environment():
        return
    
    logger.info("Starting Chat Data Processor...")
    logger.info(f"Topic: {os.getenv('TOPIC_NAME')}")
    logger.info(f"Kafka Broker: {os.getenv('KAFKA_BROKER')}")
    logger.info(f"PostgreSQL Database: {os.getenv('POSTGRES_DB')}")
    
    # Initialize connections
    consumer = None
    pg_connection = None
    
    try:
        # Create connections
        consumer = create_kafka_consumer()
        pg_connection = create_pg_connection()
         
        # Start processing messages
        process_messages_batch(consumer, pg_connection)
        
    except Exception as e:
        logger.error(f"Processor failed: {e}")
        raise
    finally:
        # Clean up connections
        close_connections(consumer, pg_connection)

if __name__ == "__main__":
    main()

INFO:__main__:Starting Chat Data Processor...
INFO:__main__:Topic: chat_msgs
INFO:__main__:Kafka Broker: localhost:9092
INFO:__main__:PostgreSQL Database: messages_db
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.6.0
INFO:kafka.conn:Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('chat_msgs',)
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='chat_msgs', partition=0)]
INFO:kafka.conn:<BrokerConnection node_id=1001 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: co

CardinalityViolation: ON CONFLICT DO UPDATE command cannot affect row a second time
HINT:  Ensure that no rows proposed for insertion within the same command have duplicate constrained values.


In [None]:
def get_analytics_queries():
    """Return useful analytics queries for the chat data"""
    return {
        "messages_per_channel": """
            SELECT r.channel, COUNT(m.message_id) as message_count
            FROM rooms r
            LEFT JOIN messages m ON r.room_id = m.room_id
            GROUP BY r.channel
            ORDER BY message_count DESC;
        """,
        
        "customer_activity": """
            SELECT c.customer_name, c.phone, 
                   COUNT(DISTINCT r.room_id) as total_rooms,
                   COUNT(m.message_id) as total_messages
            FROM customers c
            LEFT JOIN rooms r ON c.customer_id = r.customer_id
            LEFT JOIN messages m ON r.room_id = m.room_id
            GROUP BY c.customer_id, c.customer_name, c.phone
            ORDER BY total_messages DESC;
        """,
        
        "daily_message_volume": """
            SELECT DATE(m.message_date) as message_date,
                   m.sender_type,
                   COUNT(*) as message_count
            FROM messages m
            WHERE m.message_date IS NOT NULL
            GROUP BY DATE(m.message_date), m.sender_type
            ORDER BY message_date DESC, sender_type;
        """,
        
        "recent_conversations": """
            SELECT r.room_id, c.customer_name, r.channel,
                   r.room_created_at, COUNT(m.message_id) as message_count,
                   MAX(m.message_date) as last_message_date
            FROM rooms r
            JOIN customers c ON r.customer_id = c.customer_id
            LEFT JOIN messages m ON r.room_id = m.room_id
            GROUP BY r.room_id, c.customer_name, r.channel, r.room_created_at
            ORDER BY last_message_date DESC
            LIMIT 20;
        """
    }