In [1]:
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


In [2]:
#  Kafka
!wget -q https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
!tar -xzf kafka_2.13-3.4.0.tgz
!mv kafka_2.13-3.4.0 kafka

In [3]:
#  Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar -xzf spark-3.4.1-bin-hadoop3.tgz


In [53]:
#  Python packages
!pip install -q kafka-python pyspark findspark confluent-kafka

import findspark
findspark.init('/content/spark-3.4.1-bin-hadoop3')

print(" Kafka and Spark installed successfully!\n")

 Kafka and Spark installed successfully!



In [54]:
# Import all required libraries
import subprocess
import time
import threading
import json
import random
from datetime import datetime, timedelta
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

print(" All libraries imported!\n")

 All libraries imported!



In [55]:
print(" Phase 2: Starting Kafka Infrastructure...\n")

# Start Zookeeper
zk_process = subprocess.Popen(
    ['./kafka/bin/zookeeper-server-start.sh', './kafka/config/zookeeper.properties'],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL
)
print(" Zookeeper started")
time.sleep(5)

 Phase 2: Starting Kafka Infrastructure...

 Zookeeper started


In [57]:
#  Kafka Server
kafka_process = subprocess.Popen(
    ['./kafka/bin/kafka-server-start.sh', './kafka/config/server.properties'],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL
)
print(" Kafka Server started")
time.sleep(10)

print("\n Kafka infrastructure ready!\n")

 Kafka Server started

 Kafka infrastructure ready!



In [58]:
print(" Phase 3: Creating Kafka Topics...\n")

class KafkaTopicManager:
    """Manage Kafka topics for traffic data pipeline"""

    def __init__(self, bootstrap_servers='localhost:9092'):
        self.admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

    def create_topic(self, topic_name, num_partitions=3, replication_factor=1):
        """Create Kafka topic with partitioning"""
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor
        )
        try:
            self.admin_client.create_topics([topic])
            print(f" Created topic: {topic_name} ({num_partitions} partitions)")
        except TopicAlreadyExistsError:
            print(f"â„¹  Topic {topic_name} already exists")

 Phase 3: Creating Kafka Topics...



In [59]:

topic_manager = KafkaTopicManager()
topic_manager.create_topic('traffic-raw-stream', num_partitions=5)
topic_manager.create_topic('weather-raw-stream', num_partitions=3)
topic_manager.create_topic('road-sensor-stream', num_partitions=4)
topic_manager.create_topic('traffic-enriched-stream', num_partitions=5)
topic_manager.create_topic('accident-predictions', num_partitions=3)
topic_manager.create_topic('high-risk-alerts', num_partitions=2)

â„¹  Topic traffic-raw-stream already exists
â„¹  Topic weather-raw-stream already exists
â„¹  Topic road-sensor-stream already exists
â„¹  Topic traffic-enriched-stream already exists
â„¹  Topic accident-predictions already exists
â„¹  Topic high-risk-alerts already exists


In [61]:
# PHASE 4: DATA PRODUCERS (SIMULATING REAL-TIME DATA SOURCES)
# ================================================================

print("="*80)
print("PHASE 4: REAL-TIME DATA PRODUCERS")
print("="*80)

# ----------------- TRAFFIC DATA PRODUCER -----------------
class TrafficDataProducer:
    """
    Producer 1: Real-time traffic data from road sensors
    Simulates: Traffic volume, speed, road conditions
    """

    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip',
            batch_size=16384,
            linger_ms=10
        )
        self.locations = [f'LOC_{i:03d}' for i in range(1, 51)]
        self.road_types = ['Highway', 'Urban', 'Rural', 'Residential']
        self.road_conditions = ['Dry', 'Wet', 'Icy', 'Under Construction']
        self.event_count = 0

    def generate_traffic_data(self):
        """Generate realistic traffic data"""
        hour = datetime.now().hour

        # Traffic volume varies by time
        if 7 <= hour <= 9 or 17 <= hour <= 19:  # Rush hours
            traffic_volume = random.randint(70, 100)
            avg_speed = random.randint(20, 50)
        elif 22 <= hour or hour <= 5:  # Night
            traffic_volume = random.randint(0, 30)
            avg_speed = random.randint(60, 80)
        else:  # Normal hours
            traffic_volume = random.randint(30, 70)
            avg_speed = random.randint(40, 70)

        data = {
            'event_id': f'TRF_{self.event_count:08d}',
            'location_id': random.choice(self.locations),
            'latitude': round(random.uniform(12.9, 13.1), 6),
            'longitude': round(random.uniform(77.5, 77.7), 6),
            'timestamp': datetime.now().isoformat(),
            'road_type': random.choice(self.road_types),
            'traffic_volume': traffic_volume,
            'avg_speed': avg_speed,
            'speed_limit': random.choice([30, 40, 50, 60, 70, 80]),
            'road_condition': random.choice(self.road_conditions),
            'num_vehicles': random.randint(0, 50),
            'heavy_vehicles': random.randint(0, 10),
            'construction_zone': random.choice([0, 1]) if random.random() < 0.1 else 0,
            'school_zone': random.choice([0, 1]) if random.random() < 0.05 else 0
        }

        self.event_count += 1
        return data

    def produce_stream(self, num_events=1000, delay=0.05):
        """Produce continuous stream of traffic data"""
        print("\n Traffic Producer: Streaming data to Kafka...\n")

        for i in range(num_events):
            data = self.generate_traffic_data()
            self.producer.send('traffic-raw-stream', value=data)

            if (i + 1) % 200 == 0:
                print(f"   Sent {i + 1} traffic events...")

            time.sleep(delay)

        self.producer.flush()
        print(f"\n Traffic Producer: Sent {num_events} events\n")


PHASE 4: REAL-TIME DATA PRODUCERS


In [62]:
# ----------------- WEATHER DATA PRODUCER -----------------
class WeatherDataProducer:
    """
    Producer 2: Real-time weather data
    Simulates: Weather conditions, temperature, visibility
    """

    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip'
        )
        self.locations = [f'LOC_{i:03d}' for i in range(1, 51)]
        self.weather_conditions = ['Clear', 'Rain', 'Fog', 'Snow', 'Cloudy']
        self.event_count = 0

    def generate_weather_data(self):
        """Generate realistic weather data"""
        weather = random.choice(self.weather_conditions)

        # Weather-dependent values
        if weather == 'Clear':
            visibility = random.randint(80, 100)
            temperature = random.uniform(20, 35)
            precipitation = 0
        elif weather == 'Rain':
            visibility = random.randint(30, 70)
            temperature = random.uniform(15, 25)
            precipitation = random.uniform(5, 30)
        elif weather == 'Fog':
            visibility = random.randint(10, 40)
            temperature = random.uniform(10, 20)
            precipitation = 0
        elif weather == 'Snow':
            visibility = random.randint(20, 50)
            temperature = random.uniform(-5, 10)
            precipitation = random.uniform(10, 40)
        else:  # Cloudy
            visibility = random.randint(60, 90)
            temperature = random.uniform(18, 28)
            precipitation = 0

        data = {
            'event_id': f'WTH_{self.event_count:08d}',
            'location_id': random.choice(self.locations),
            'timestamp': datetime.now().isoformat(),
            'weather_condition': weather,
            'temperature': round(temperature, 2),
            'visibility': visibility,
            'precipitation': round(precipitation, 2),
            'humidity': random.randint(30, 95),
            'wind_speed': round(random.uniform(0, 30), 2),
            'atmospheric_pressure': round(random.uniform(980, 1020), 2)
        }

        self.event_count += 1
        return data

    def produce_stream(self, num_events=500, delay=0.1):
        """Produce continuous stream of weather data"""
        print("\nðŸ“¤ Weather Producer: Streaming data to Kafka...\n")

        for i in range(num_events):
            data = self.generate_weather_data()
            self.producer.send('weather-raw-stream', value=data)

            if (i + 1) % 100 == 0:
                print(f"   Sent {i + 1} weather events...")

            time.sleep(delay)

        self.producer.flush()
        print(f"\n Weather Producer: Sent {num_events} events\n")


In [63]:

class RoadSensorProducer:
    """
    Producer 3: IoT road sensors
    Simulates: Road quality, surface conditions, lighting
    """

    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.locations = [f'LOC_{i:03d}' for i in range(1, 51)]
        self.lighting_conditions = ['Daylight', 'Dark', 'Dusk/Dawn', 'Street Lights']
        self.event_count = 0

    def generate_sensor_data(self):
        """Generate road sensor data"""
        hour = datetime.now().hour

        # Lighting based on time
        if 6 <= hour <= 18:
            lighting = 'Daylight'
        elif 19 <= hour <= 21 or 5 <= hour <= 6:
            lighting = 'Dusk/Dawn'
        else:
            lighting = random.choice(['Dark', 'Street Lights'])

        data = {
            'event_id': f'SEN_{self.event_count:08d}',
            'sensor_id': f'SENSOR_{random.randint(1, 100):03d}',
            'location_id': random.choice(self.locations),
            'timestamp': datetime.now().isoformat(),
            'lighting_condition': lighting,
            'road_surface_quality': random.uniform(0, 10),  # 0=poor, 10=excellent
            'lane_markings_visible': random.choice([0, 1]),
            'traffic_signal_present': random.choice([0, 1]),
            'pedestrian_crossing': random.choice([0, 1]),
            'road_width_meters': round(random.uniform(6, 20), 2),
            'number_of_lanes': random.randint(1, 4),
            'curve_present': random.choice([0, 1]) if random.random() < 0.3 else 0,
            'slope_grade': round(random.uniform(-10, 10), 2)
        }

        self.event_count += 1
        return data

    def produce_stream(self, num_events=800, delay=0.03):
        """Produce continuous stream of sensor data"""
        print("\nðŸ“¤ Sensor Producer: Streaming data to Kafka...\n")

        for i in range(num_events):
            data = self.generate_sensor_data()
            self.producer.send('road-sensor-stream', value=data)

            if (i + 1) % 200 == 0:
                print(f"   Sent {i + 1} sensor events...")

            time.sleep(delay)

        self.producer.flush()
        print(f"\n Sensor Producer: Sent {num_events} events\n")


In [79]:
# Start all producers in parallel threads
traffic_producer = TrafficDataProducer()
weather_producer = WeatherDataProducer()
sensor_producer = RoadSensorProducer()

traffic_thread = threading.Thread(target=traffic_producer.produce_stream, args=(1000, 0.03))
weather_thread = threading.Thread(target=weather_producer.produce_stream, args=(500, 0.06))
sensor_thread = threading.Thread(target=sensor_producer.produce_stream, args=(800, 0.04))

traffic_thread.daemon = True
weather_thread.daemon = True
sensor_thread.daemon = True

print("\n Starting all producers in parallel...\n")
traffic_thread.start()
weather_thread.start()
sensor_thread.start()


time.sleep(5)


Exception in thread Thread-23 (produce_stream):
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipython-input-1326548535.py", line 68, in produce_stream
  File "/tmp/ipython-input-1326548535.py", line 46, in generate_traffic_data
Exception in thread Thread-24 (produce_stream):
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipython-input-926756882.py", line 65, in produce_stream
  File "/tmp/ipython-input-926756882.py", line 49, in generate_weather_data
  File "/content/spark-3.4.1-bin-hadoop3/python/pyspark/sql/utils.py", line 160, in wrapped
Exception in thread Thread-25 (produce_stream):
Traceback 


 Starting all producers in parallel...


 Traffic Producer: Streaming data to Kafka...


ðŸ“¤ Weather Producer: Streaming data to Kafka...


ðŸ“¤ Sensor Producer: Streaming data to Kafka...



In [65]:
# PHASE 5: SPARK SESSION INITIALIZATION
# ================================================================

print("\n" + "="*80)
print("PHASE 5: INITIALIZING SPARK SESSION")
print("="*80 + "\n")

spark = SparkSession.builder \
    .appName("TrafficAccidentPredictionPipeline") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.default.parallelism", "10") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("âœ… Spark Session initialized")
print(f"   Spark Version: {spark.version}")
print(f"   Available Cores: {spark.sparkContext.defaultParallelism}")
print()


PHASE 5: INITIALIZING SPARK SESSION

âœ… Spark Session initialized
   Spark Version: 3.4.1
   Available Cores: 10



In [66]:
 # PHASE 6: SPARK STREAMING CONSUMERS - BRONZE LAYER
# ================================================================

print("="*80)
print("PHASE 6: SPARK STREAMING - BRONZE LAYER (RAW DATA)")
print("="*80 + "\n")

print("  Reading streams from Kafka...\n")


PHASE 6: SPARK STREAMING - BRONZE LAYER (RAW DATA)

  Reading streams from Kafka...



In [72]:
import os, findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
findspark.init()


In [70]:
# Simulate a live stream (acts like Kafka messages)
import socket, time, json, threading, random

def accident_stream_server():
    server = socket.socket()
    server.bind(("localhost", 9999))
    server.listen(1)
    conn, _ = server.accept()
    while True:
        msg = {
            "timestamp": time.time(),
            "location": random.choice(["Delhi","Mumbai","Chennai","Kolkata"]),
            "weather": random.choice(["Rainy","Clear","Foggy","Sunny"]),
            "vehicles_involved": random.randint(1,5),
            "severity": random.choice(["Low","Medium","High"])
        }
        conn.send((json.dumps(msg) + "\n").encode("utf-8"))
        time.sleep(2)

threading.Thread(target=accident_stream_server).start()


In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("AccidentPipeline").getOrCreate()

schema = (StructType()
          .add("timestamp", DoubleType())
          .add("location", StringType())
          .add("weather", StringType())
          .add("vehicles_involved", IntegerType())
          .add("severity", StringType()))

bronze = (spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load())

from pyspark.sql.functions import from_json, col

bronze_parsed = bronze.select(from_json(col("value"), schema).alias("data")).select("data.*")


bronze_query = (bronze_parsed
                .writeStream
                .format("parquet")
                .option("path", "/content/bronze")
                .option("checkpointLocation", "/content/chk_bronze")
                .outputMode("append")
                .start())


In [73]:
from pyspark.sql.functions import when

silver = (bronze_parsed
          .withColumn("weather", when(col("weather")=="Foggy", "Low Visibility").otherwise(col("weather")))
          .withColumn("accident_index", col("vehicles_involved") *
                      when(col("severity")=="High", 3)
                      .when(col("severity")=="Medium", 2)
                      .otherwise(1)))

silver_query = (silver
                .writeStream
                .format("parquet")
                .option("path", "/content/silver")
                .option("checkpointLocation", "/content/chk_silver")
                .outputMode("append")
                .start())


In [78]:
from pyspark.sql.functions import col, from_unixtime, avg, count, window


silver_fixed = silver.withColumn("event_time", from_unixtime(col("timestamp")).cast("timestamp"))


gold = (silver_fixed
        .withWatermark("event_time", "1 minute")
        .groupBy(
            window(col("event_time"), "1 minute"),
            col("location")
        )
        .agg(
            avg("accident_index").alias("avg_accident_index"),
            count("*").alias("record_count")
        )
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("location"),
            col("avg_accident_index"),
            col("record_count")
        ))


gold_query = (gold
              .writeStream
              .format("parquet")
              .option("path", "/content/gold")
              .option("checkpointLocation", "/content/chk_gold")
              .outputMode("append")
              .start())
