<a href="https://colab.research.google.com/github/urmilapol/urmilapolprojects/blob/master/Copy_of_kafka.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Real-Time Data Streaming**

Adding Apache Kafka turns this lab from a simple batch ETL process into an event-driven data pipeline. Instead of "pulling" data in batches, the pipeline "streams" each event as it happens.

## **The Streaming Architecture**

In a real-world data engineering environment:

Producer: A web app sends user events to a Kafka Topic.

Stream Processor: A Python script listens to the topic and cleans the data in real time.

Sink (Storage): The processed data is written to PostgreSQL for immediate dashboarding.
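
The three stages above can be sketched without any infrastructure. The following is a minimal in-memory illustration only: `queue.Queue` stands in for the Kafka topic and a Python list stands in for the PostgreSQL table. The function names `produce` and `process` and the sample events are hypothetical, and a `None` sentinel ends the stream so the demo terminates (a real stream would not end).

```python
import queue

def produce(topic, events):
    """Producer: push raw events onto the topic (an in-memory queue here)."""
    for event in events:
        topic.put(event)
    topic.put(None)  # sentinel marking the end of this finite demo stream

def process(topic, sink):
    """Stream processor: read events one by one, keep purchases, write to the sink."""
    while True:
        event = topic.get()
        if event is None:
            break
        if event["action"] == "purchase":
            sink.append((event["user_id"], event["action"]))

topic = queue.Queue()  # stand-in for a Kafka topic (assumption for illustration)
sink = []              # stand-in for a PostgreSQL table (assumption for illustration)

# Made-up sample events with the same fields the lab uses
produce(topic, [
    {"user_id": 1001, "action": "view"},
    {"user_id": 1002, "action": "purchase"},
    {"user_id": 1003, "action": "click"},
])
process(topic, sink)
print(sink)  # [(1002, 'purchase')]
```

The point of the queue in the middle is decoupling: the producer never calls the processor directly, which is the role Kafka plays in the real pipeline.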

In [None]:
# 1. Install Postgres
!apt install -y postgresql postgresql-contrib &> /dev/null

# Start PostgreSQL, set a password for user 'postgres', and create the database
# (re-running this cell reports that the database already exists, which is harmless)
!service postgresql start
!sudo -u postgres psql -c "ALTER USER postgres WITH PASSWORD 'password123';"
!sudo -u postgres psql -c "CREATE DATABASE silver_events;"
!service postgresql restart

 * Starting PostgreSQL 14 database server
   ...done.
ALTER ROLE
ERROR:  database "silver_events" already exists
 * Restarting PostgreSQL 14 database server
   ...done.


In [None]:
# 2. Install Kafka (Downloads the binaries)
!curl -sSOL https://archive.apache.org/dist/kafka/3.1.0/kafka_2.13-3.1.0.tgz
!tar -xzf kafka_2.13-3.1.0.tgz

# 3. Start Zookeeper and Kafka Server
!./kafka_2.13-3.1.0/bin/zookeeper-server-start.sh ./kafka_2.13-3.1.0/config/zookeeper.properties > zookeeper.log 2>&1 &
!./kafka_2.13-3.1.0/bin/kafka-server-start.sh ./kafka_2.13-3.1.0/config/server.properties > kafka.log 2>&1 &

# Give Kafka a moment to start up
!sleep 10
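
A fixed `sleep 10` is a guess at how long the broker needs. As a hedged alternative, the sketch below polls the broker's TCP port until it accepts a connection. The helper name `wait_for_port` and its parameters are assumptions for illustration; port 9092 is the address the later cells connect to. An open port does not guarantee that Kafka is fully ready, so treat this as a rough readiness check only.

```python
import socket
import time

def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper: it only checks that something is listening on the port.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False

# Possible use in this notebook (assumes the broker listens on localhost:9092):
# if not wait_for_port("localhost", 9092):
#     print("Kafka did not open port 9092 in time; check kafka.log")
```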

# **This script simulates a continuous stream of user clickstream data.**

In [None]:
# Install the Kafka client first so the import below succeeds on a fresh runtime
!pip install kafka-python

import threading
import json
import time
import random
from kafka import KafkaProducer

def run_producer():
    # Each event dict is serialized to UTF-8 encoded JSON before it is sent
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )

    actions = ['click', 'view', 'add_to_cart', 'purchase']

    print("Streaming started from producer thread...")
    while True:
        data = {
            "user_id": random.randint(1000, 9999),
            "action": random.choice(actions),
            "timestamp": time.time()
        }
        producer.send('user-events', value=data)
        time.sleep(1)  # Send one event per second

# This starts the producer in the background
producer_thread = threading.Thread(target=run_producer, daemon=True)
producer_thread.start()
print("Producer is running in the background...")

Producer is running in the background...
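
To make the message format concrete, the sketch below builds one event with the same fields as the producer above and round-trips it through the same JSON encoding that `value_serializer` applies. The helper names `make_event`, `serialize`, and `deserialize` are hypothetical; no broker is needed to run it.

```python
import json
import random
import time

ACTIONS = ["click", "view", "add_to_cart", "purchase"]

def make_event():
    """Build one clickstream event with the fields used by the producer."""
    return {
        "user_id": random.randint(1000, 9999),
        "action": random.choice(ACTIONS),
        "timestamp": time.time(),
    }

def serialize(event):
    """Dict -> UTF-8 JSON bytes, mirroring the producer's value_serializer."""
    return json.dumps(event).encode("utf-8")

def deserialize(payload):
    """UTF-8 JSON bytes -> dict, the inverse step a consumer would apply."""
    return json.loads(payload.decode("utf-8"))

event = make_event()
payload = serialize(event)
print(type(payload).__name__)        # bytes
print(deserialize(payload) == event)  # True
```

Kafka itself only transports bytes, so the producer and any consumer have to agree on this encoding.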


# This script acts as the Data Engineer's "Processor." It consumes the raw stream, keeps only the "purchase" events, and writes them to PostgreSQL.
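
The filtering rule can be isolated as a small pure function, which makes it easy to check without Kafka or PostgreSQL running. This is a sketch under assumptions: `to_row` is a hypothetical helper, the sample events are made up, and the tuple layout `(user_id, action, 1)` mirrors the three-column insert used by the consumer in this notebook.

```python
def to_row(event):
    """Map a raw event to a (u_id, act, val) row, or None if it should be skipped."""
    if event.get("action") != "purchase":
        return None
    # val is a constant 1 per purchase, matching the insert in this notebook
    return (event["user_id"], event["action"], 1)

# Made-up sample stream for illustration
stream = [
    {"user_id": 7044, "action": "purchase", "timestamp": 1.0},
    {"user_id": 1234, "action": "view", "timestamp": 2.0},
    {"user_id": 4352, "action": "purchase", "timestamp": 3.0},
    {"user_id": 5678, "action": "add_to_cart", "timestamp": 4.0},
]

rows = [row for row in map(to_row, stream) if row is not None]
print(rows)  # [(7044, 'purchase', 1), (4352, 'purchase', 1)]
```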


In [None]:
from kafka import KafkaConsumer
import psycopg2
import json

# Connect to Postgres
conn = psycopg2.connect("host=localhost dbname=silver_events user=postgres password=password123")
cur = conn.cursor()

# Create the table if it doesn't exist
cur.execute("CREATE TABLE IF NOT EXISTS silver_events (u_id INTEGER, act VARCHAR(255), val INTEGER);")
conn.commit()

# Initialize Kafka Consumer
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

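print("Listening for events...")
try:
    for message in consumer:
        event = message.value

        # Logic: Only store high-value events (Purchases)
        if event['action'] == 'purchase':
            print(f"Storing Purchase: {event}")
            cur.execute(
                "INSERT INTO silver_events (u_id, act, val) VALUES (%s, %s, %s)",
                (event['user_id'], event['action'], 1)
            )
            conn.commit()
except KeyboardInterrupt:
    # Interrupting the cell is the only way out of this endless loop
    pass
finally:
    # Release the Kafka and PostgreSQL connections
    consumer.close()
    cur.close()
    conn.close()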



Listening for events...
Storing Purchase: {'user_id': 7044, 'action': 'purchase', 'timestamp': 1767784121.754121}
Storing Purchase: {'user_id': 4352, 'action': 'purchase', 'timestamp': 1767784131.758446}
Storing Purchase: {'user_id': 5594, 'action': 'purchase', 'timestamp': 1767784135.7601395}
Storing Purchase: {'user_id': 1404, 'action': 'purchase', 'timestamp': 1767784140.7622867}
Storing Purchase: {'user_id': 5848, 'action': 'purchase', 'timestamp': 1767784141.7627509}
Storing Purchase: {'user_id': 8297, 'action': 'purchase', 'timestamp': 1767784145.7642791}
Storing Purchase: {'user_id': 7883, 'action': 'purchase', 'timestamp': 1767784146.7646616}
Storing Purchase: {'user_id': 8405, 'action': 'purchase', 'timestamp': 1767784148.7655537}
Storing Purchase: {'user_id': 6585, 'action': 'purchase', 'timestamp': 1767784151.7667046}
Storing Purchase: {'user_id': 6901, 'action': 'purchase', 'timestamp': 1767784152.767161}
Storing Purchase: {'user_id': 5277, 'action': 'purchase', 'timestamp'