This project implements a professional data pipeline that:

Ingests official air quality data from the Central Pollution Control Board (CPCB).

Processes the data using PySpark (Big Data Processing Engine).

Forecasts future pollution trends (2021–2026) using Spark ML (Machine Learning).

Visualizes critical insights using an interactive Plotly Dashboard.

In [8]:
# ==============================================================================
# PHASE 1: INFRASTRUCTURE & ENVIRONMENT SETUP
# ==============================================================================
# In this block, we initialize the Big Data environment.
# We install PySpark only if the runtime does not already provide it,
# then start a local Spark session.

print(">>> [INIT]  Checking Environment...")
try:
    import pyspark
    print("    |-- PySpark is already installed.")
except ImportError:
    print("    |-- Installing PySpark (This takes 10-15 seconds)...")
    !pip install pyspark -q

# Standard Imports
import os
import sys
import warnings
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio

# Spark Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import col, to_date, year, month, avg, count, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# CRITICAL: Fix for blank charts in Colab
pio.renderers.default = "colab"
warnings.filterwarnings("ignore")

# Initialize a local Spark session (all available cores, console progress bars disabled)
spark = SparkSession.builder \
    .appName("VayuDrishti_Safe_Mode") \
    .config("spark.ui.showConsoleProgress", "false") \
    .master("local[*]") \
    .getOrCreate()

print(">>> [INIT]  Spark Session Online. Ready to Process.")

>>> [INIT]  Checking Environment...
    |-- PySpark is already installed.
>>> [INIT]  Spark Session Online. Ready to Process.


Phase 2: Data Ingestion (The "Volume" V)
Here, we connect to the external data repository.

Source: city_day.csv, the daily city-level dataset from CPCB (Central Pollution Control Board), downloaded here from a public GitHub mirror.

Volume Check: We will print the exact number of rows and columns to demonstrate the dataset's size.

Data Preview: We display the first 50 rows to inspect the raw format.
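The download-if-missing step can also be written without shelling out to wget. The following is a minimal sketch using only the standard library; the helper name `fetch_if_missing` is hypothetical and not part of this notebook. It treats an empty file as missing and writes to a temporary file first, so a failed download does not leave a broken file behind.

```python
import os
import shutil
import tempfile
import urllib.request


def fetch_if_missing(url, path):
    """Download url to path unless a non-empty file already exists there.

    Returns True if a download happened, False on a cache hit.
    (Hypothetical helper, shown for illustration only.)
    """
    if os.path.exists(path) and os.path.getsize(path) > 0:
        return False  # cache hit: nothing to download

    # Write to a temporary file in the same directory, then rename it.
    # A failed download therefore never leaves a partial file at `path`.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as tmp, urllib.request.urlopen(url) as response:
            shutil.copyfileobj(response, tmp)
        os.replace(tmp_path, path)
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return True
```

In this notebook it would be called as `fetch_if_missing(dataset_url, file_name)` before `spark.read.csv`.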

In [9]:
# ==============================================================================
# PHASE 2: ROBUST DATA INGESTION
# ==============================================================================
print("\n>>> [DATA]  Ingesting CPCB (Govt of India) Dataset...")
dataset_url = "https://raw.githubusercontent.com/mimansha98/AQI-Prediction-before-and-after-Lockdown-of-COVID-19-in-India/main/city_day.csv"
file_name = "city_day.csv"

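# Download only if missing or empty (Bandwidth Optimization)
if not os.path.exists(file_name) or os.path.getsize(file_name) == 0:
    exit_code = os.system(f"wget -q -O {file_name} {dataset_url}")
    if exit_code != 0:
        # wget -O leaves an empty file behind on failure; remove it so the next run retries
        if os.path.exists(file_name):
            os.remove(file_name)
        raise RuntimeError(f"Download failed (wget exit status {exit_code})")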

# Load Data into Spark DataFrame
try:
    df_raw = spark.read.csv(file_name, header=True, inferSchema=True)

    # --- VOLUME METRICS ---
    row_count = df_raw.count()
    col_count = len(df_raw.columns)
    print(f"    |-- Data Lake Hydrated: {row_count:,} Rows across {col_count} Columns.")
    print(f"    |-- RAW DATA PREVIEW (First 50 Rows):")

    # SHOW 50 ROWS, Don't cut off text
    df_raw.show(50, truncate=False)

except Exception as e:
    print(f"!!! [ERROR] Data Load Failed: {e}")


>>> [DATA]  Ingesting CPCB (Govt of India) Dataset...
    |-- Data Lake Hydrated: 29,531 Rows across 17 Columns.
    |-- RAW DATA PREVIEW (First 50 Rows):
+---+---------+----------+-----------------+------------------+------------------+-----+------+------------------+------------------+------+------+-------+-------+------+-----------------+----------+
|_c0|City     |Date      |PM2.5            |PM10              |NO                |NO2  |NOx   |NH3               |CO                |SO2   |O3    |Benzene|Toluene|Xylene|AQI              |AQI_Bucket|
+---+---------+----------+-----------------+------------------+------------------+-----+------+------------------+------------------+------+------+-------+-------+------+-----------------+----------+
|0  |Ahmedabad|2015-01-01|67.45057794890306|118.12710293078136|0.92              |18.22|17.15 |23.483476019371967|0.92              |27.64 |133.36|0.0    |0.02   |0.0   |166.4635814889336|Moderate  |
|1  |Ahmedabad|2015-01-02|67.45057794890306|

Phase 3: ETL & Cleaning (The "Veracity" & "Variety" V)
Real-world data is messy. In this phase, we perform ETL (Extract, Transform, Load) operations:

Type Casting: Converting AQI to numbers (Double) to prevent text errors.

City Filter: Keeping only the five target cities (Mumbai, Delhi, Bengaluru, Chennai, Hyderabad).

Null Handling: Removing rows with a missing AQI or Date (The Veracity check).

Feature Extraction: Breaking dates into Year and Month to analyze seasonality (The Variety check).

We will display a random 50-row sample of the cleaned data to confirm the transformation worked.
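To make the row-level effect of these steps concrete, here is a sketch of the same logic in plain Python, applied to a few hypothetical records. The rows and the helper names `to_float` and `clean_rows` are invented for illustration; the notebook itself performs these steps with Spark DataFrame operations.

```python
from datetime import datetime

TARGET_CITIES = {"Mumbai", "Delhi", "Bengaluru", "Chennai", "Hyderabad"}


def to_float(value):
    """Cast to float, returning None for missing or non-numeric values."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def clean_rows(rows):
    """Cast AQI, keep target cities, drop incomplete rows, add Year and Month."""
    cleaned = []
    for row in rows:
        aqi = to_float(row.get("AQI"))
        if row.get("City") not in TARGET_CITIES:
            continue  # city filter
        if aqi is None or not row.get("Date"):
            continue  # null handling: AQI and Date are both required
        date = datetime.strptime(row["Date"], "%Y-%m-%d").date()
        cleaned.append({
            "City": row["City"], "Date": date, "AQI": aqi,
            "Year": date.year, "Month": date.month,
        })
    return cleaned


# Hypothetical input rows (not taken from the CPCB file)
sample_rows = [
    {"City": "Mumbai", "Date": "2019-07-13", "AQI": "82.0"},
    {"City": "Delhi", "Date": "2019-07-13", "AQI": ""},           # missing AQI
    {"City": "Ahmedabad", "Date": "2015-01-01", "AQI": "166.4"},  # not a target city
    {"City": "Chennai", "Date": "2018-08-25", "AQI": "106"},
]
cleaned = clean_rows(sample_rows)
for row in cleaned:
    print(row["City"], row["Date"], row["AQI"], row["Year"], row["Month"])
```

Only the Mumbai and Chennai rows survive: the Delhi row has no AQI and Ahmedabad is not in the target list.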

In [10]:
# ==============================================================================
# PHASE 3: ETL & CLEANING (VERACITY LAYER)
# ==============================================================================
from pyspark.sql.functions import rand # Import Randomizer

print("\n>>> [ETL]   Cleaning & transforming data...")

target_cities = ["Mumbai", "Delhi", "Bengaluru", "Chennai", "Hyderabad"]

# Pipeline: Cast Types -> Filter Cities -> Drop Nulls -> Extract Time Features
df_clean = df_raw.withColumn("AQI", col("AQI").cast(DoubleType())) \
                 .filter(col("City").isin(target_cities)) \
                 .dropna(subset=["AQI", "Date"]) \
                 .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd")) \
                 .withColumn("Year", year(col("Date"))) \
                 .withColumn("Month", month(col("Date")))

clean_count = df_clean.count()
print(f"    |-- Cleaned Data: {clean_count:,} valid sensor readings ready.")
print("    |-- PROCESSED DATA SAMPLE (Random 50 Rows):")

# FIX: Order by Random to show variety (Mumbai, Delhi, etc.) instead of just top rows
df_clean.orderBy(rand()).select("City", "Date", "AQI", "Year", "Month").show(50, truncate=False)


>>> [ETL]   Cleaning & transforming data...
    |-- Cleaned Data: 10,042 valid sensor readings ready.
    |-- PROCESSED DATA SAMPLE (Random 50 Rows):
+---------+----------+-----------------+----+-----+
|City     |Date      |AQI              |Year|Month|
+---------+----------+-----------------+----+-----+
|Bengaluru|2020-04-21|65.0             |2020|4    |
|Chennai  |2018-08-25|106.0            |2018|8    |
|Bengaluru|2019-09-17|120.0            |2019|9    |
|Chennai  |2019-06-18|108.0            |2019|6    |
|Delhi    |2019-07-13|312.0            |2019|7    |
|Hyderabad|2019-06-17|60.0             |2019|6    |
|Chennai  |2019-10-07|108.0            |2019|10   |
|Hyderabad|2016-10-27|117.0            |2016|10   |
|Bengaluru|2018-10-12|115.0            |2018|10   |
|Hyderabad|2018-11-13|134.0            |2018|11   |
|Chennai  |2017-01-30|58.0             |2017|1    |
|Mumbai   |2020-06-22|51.0             |2020|6    |
|Hyderabad|2015-08-13|89.0             |2015|8    |
|Mumbai   |2016-0

Phase 4: Predictive Analytics (The "Velocity" V)
We now use Spark ML (Machine Learning) to predict the future.

Algorithm: Linear Regression.

Logic: For each city, the model fits a straight line to the yearly average AQI from 2015–2020 and extrapolates that line to 2026.

Note on Trend: The 2020 Lockdown caused a massive dip in pollution. The model includes this "Lockdown Dip," which may cause the future forecast to look optimistic (downward trend).
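The effect of the "Lockdown Dip" on a straight-line fit can be sketched in plain Python. This assumes an ordinary (unregularized) least-squares fit on the year alone, and the yearly averages are invented for illustration; they are not CPCB values. The helper names `fit_line` and `predict` are hypothetical.

```python
def fit_line(xs, ys):
    """Ordinary least-squares fit of y = slope * x + intercept."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = y_mean - slope * x_mean
    return slope, intercept


def predict(slope, intercept, year):
    """Evaluate the fitted line at a given year."""
    return slope * year + intercept


years = [2015, 2016, 2017, 2018, 2019, 2020]

# Hypothetical yearly average AQI values (illustration only)
flat = [100.0, 100.0, 100.0, 100.0, 100.0, 100.0]      # no trend at all
with_dip = [100.0, 100.0, 100.0, 100.0, 100.0, 70.0]   # same, plus a 2020 dip

slope_flat, intercept_flat = fit_line(years, flat)
slope_dip, intercept_dip = fit_line(years, with_dip)

print(f"Slope without dip: {slope_flat:.2f} AQI points per year")
print(f"Slope with dip:    {slope_dip:.2f} AQI points per year")
print(f"2026 forecast with dip: {predict(slope_dip, intercept_dip, 2026):.1f}")
```

With only six yearly points per city, a single low year at the end of the series is enough to tilt the whole line downward, and the extrapolation to 2026 ends up below even the dip year itself.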

In [11]:
# ==============================================================================
# PHASE 4: PREDICTIVE ANALYTICS (SPARK ML)
# ==============================================================================
print("\n>>> [ML]    Training Forecasting Models (2015 -> 2026)...")

forecast_list = []

for city in target_cities:
    # 1. Prepare Data for specific city
    city_df = df_clean.filter(col("City") == city) \
                      .groupBy("Year").agg(avg("AQI").alias("Avg_AQI")) \
                      .orderBy("Year")

    # 2. Vectorization (Spark ML expects features packed into a single vector column)
    assembler = VectorAssembler(inputCols=["Year"], outputCol="features")
    train_data = assembler.transform(city_df)

    # 3. Train Model (Linear Regression)
    lr = LinearRegression(featuresCol="features", labelCol="Avg_AQI")
    model = lr.fit(train_data)

    # 4. Predict Future (2021-2026)
    future_years = [2021, 2022, 2023, 2024, 2025, 2026]
    future_df = spark.createDataFrame([(y,) for y in future_years], ["Year"])
    future_vec = assembler.transform(future_df)

    predictions = model.transform(future_vec)

    # 5. Collect Results
    rows = predictions.select("Year", "prediction").collect()
    for r in rows:
        forecast_list.append({"Year": r.Year, "City": city, "Avg_AQI": r.prediction, "Type": "AI Forecast"})

# Merge Real History + Forecasts for Visualization
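# groupBy output has no guaranteed order; sort so line charts connect years chronologically
pdf_history = df_clean.groupBy("Year", "City").agg(avg("AQI").alias("Avg_AQI")) \
                      .orderBy("City", "Year").toPandas()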
pdf_history["Type"] = "Actual Data"
pdf_forecast = pd.DataFrame(forecast_list)
pdf_final = pd.concat([pdf_history, pdf_forecast], ignore_index=True)
print("    |-- Forecasting Complete. Model Output Integrated.")


>>> [ML]    Training Forecasting Models (2015 -> 2026)...
    |-- Forecasting Complete. Model Output Integrated.


Phase 5: The Command Center (Visualization)
We present 5 Strategic Visuals:

Strategic Roadmap: Historical data + AI Forecast (distinguished by marker symbol). Note the 2020 drop.

Seasonality Audit: Mumbai's average AQI by month, to check for the "Monsoon Effect" (AQI drops in June/July).

Pollutant Correlation: Checks how closely PM2.5 tracks PM10 (PM2.5 particles are, by definition, a subset of PM10).

Veracity Matrix: A heatmap of valid readings per city and year, showing data gaps/sensor uptime.

Risk Gauge: A snapshot of Mumbai's average AQI for the most recent year in the dataset.
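The strength of the PM2.5/PM10 relationship can also be summarized as a single number, the Pearson correlation coefficient, alongside the scatter plot's trendline. The sketch below computes it in plain Python on hypothetical paired readings; the values and the helper name `pearson_r` are invented for illustration and are not taken from the dataset.

```python
import math


def pearson_r(xs, ys):
    """Pearson correlation coefficient between two equal-length sequences."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    cov = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    var_x = sum((x - x_mean) ** 2 for x in xs)
    var_y = sum((y - y_mean) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Hypothetical paired daily readings in µg/m³ (illustration only)
pm25 = [20.0, 35.0, 50.0, 65.0, 80.0]
pm10 = [45.0, 70.0, 110.0, 125.0, 170.0]

r = pearson_r(pm25, pm10)
print(f"Pearson r between PM2.5 and PM10: {r:.3f}")
```

A value close to 1 indicates that the two pollutants rise and fall together. It measures association only and says nothing about the cause.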

In [19]:
# ==============================================================================
# PHASE 5: VISUALIZATION DASHBOARD
# ==============================================================================
print("\n>>> [VIZ]   Rendering Interactive Command Center...")

# CHART 1: STRATEGIC ROADMAP (Line Chart)
fig1 = px.line(pdf_final, x="Year", y="Avg_AQI", color="City", symbol="Type",
               title="<b>1. Strategic Air Quality Roadmap (2015-2026)</b>",
               template="plotly_dark", markers=True)
fig1.add_vline(x=2020.5, line_dash="dash", line_color="white", annotation_text="Forecast Start")
fig1.show()

# CHART 2: SEASONALITY AUDIT (Bar Chart)
df_mumbai_season = df_clean.filter(col("City") == "Mumbai") \
                           .groupBy("Month").agg(avg("AQI").alias("AQI")).toPandas()
fig2 = px.bar(df_mumbai_season, x="Month", y="AQI",
              title="<b>2. Mumbai Environmental Rhythm (Seasonality)</b>",
              color="AQI", color_continuous_scale="RdYlGn_r", template="plotly_white")
fig2.show()

# CHART 3: POLLUTANT CORRELATION (Scatter)
# --- SANITIZATION STEP ---
# We rename 'PM2.5' to 'PM25' because Spark reads a dot in a column name as
# nested-field access. Backtick-quoting (col("`PM2.5`")) is an alternative.
pdf_corr = df_raw.withColumnRenamed("PM2.5", "PM25") \
                 .filter(col("City") == "Mumbai") \
                 .select("PM25", "PM10") \
                 .dropna() \
                 .sample(False, 0.1) \
                 .toPandas()

fig3 = px.scatter(pdf_corr, x="PM25", y="PM10", opacity=0.5,
                  title="<b>3. Pollutant Composition (PM2.5 vs PM10)</b>",
                  trendline="ols", template="simple_white",
                  labels={"PM25": "PM 2.5 (µg/m³)", "PM10": "PM 10 (µg/m³)"})
fig3.show()

# CHART 4: DATA VERACITY MATRIX (Heatmap)
df_veracity = df_clean.groupBy("Year", "City").count().toPandas()
fig4 = px.density_heatmap(df_veracity, x="Year", y="City", z="count",
                          title="<b>4. Data Veracity Matrix (Sensor Uptime)</b>",
                          color_continuous_scale="Viridis", template="plotly_dark")
fig4.show()

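# CHART 5: CURRENT STATUS GAUGE
# Snapshot of Mumbai's average AQI for the most recent year in the dataset (not live data)
# Sort by Year first: the grouped rows are not guaranteed to arrive in chronological order
latest_val = pdf_history[pdf_history['City']=='Mumbai'].sort_values('Year')['Avg_AQI'].iloc[-1]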
fig5 = go.Figure(go.Indicator(
    mode = "gauge+number", value = latest_val,
    title = {'text': "<b>5. Mumbai Current Risk Level (Avg)</b>"},
    gauge = {'axis': {'range': [None, 300]}, 'bar': {'color': "darkred"},
             'steps': [{'range': [0, 100], 'color': "green"}, {'range': [100, 200], 'color': "orange"}]}))
fig5.show()

print("\n✅ SYSTEM EXECUTION SUCCESSFUL.")


>>> [VIZ]   Rendering Interactive Command Center...



✅ SYSTEM EXECUTION SUCCESSFUL.
