In [0]:
# List the Databricks secret scopes visible to this user -- a quick check that the
# scope read in the storage-access cell ('vijetanewscopecapstone') is available.
dbutils.secrets.listScopes()

# Accessing ADLS from Databricks

In [0]:
# Print the path of every raw Air_quality file under the /mnt location
# (presumably an ADLS mount created outside this notebook).
for file_info in dbutils.fs.ls("/mnt/vijetaadlscapstone2/Air_quality"):
    print(file_info.path)

In [0]:
import pyspark.sql.functions as F

# Give Spark the storage-account access key (read from the Databricks secret scope,
# never hard-coded) so it can read wasbs:// paths on the vijetaadlscapstone account.
storage_key = dbutils.secrets.get(scope='vijetanewscopecapstone', key='vijetasecretcapstone')
fd_storage_path = "fs.azure.account.key.vijetaadlscapstone.blob.core.windows.net"
spark.conf.set(fd_storage_path, storage_key)

# Load one raw Air_quality snapshot (JSON) from the capstone2 container.
# NOTE(review): if the payload nests rows under a `records` array, flatten it with
# F.explode('records') followed by a `.select('item.*')` before the transformations.
file_path = 'wasbs://capstone2@vijetaadlscapstone.blob.core.windows.net/Air_quality/AirQuality_2025-11-07 11:07:45'
df = spark.read.json(file_path)
df.show()

In [0]:
# Capital city of each state (plus Delhi). Only readings whose (state, city) pair
# appears here survive the inner join below, so the analysis covers capital cities only.
capitals = dict([
    ("Andhra Pradesh", "Amaravati"),
    ("Arunachal Pradesh", "Itanagar"),
    ("Assam", "Dispur"),
    ("Bihar", "Patna"),
    ("Chhattisgarh", "Raipur"),
    ("Delhi", "New Delhi"),
    ("Goa", "Panaji"),
    ("Gujarat", "Gandhinagar"),
    ("Haryana", "Chandigarh"),
    ("Himachal Pradesh", "Shimla"),
    ("Jharkhand", "Ranchi"),
    ("Karnataka", "Bengaluru"),
    ("Kerala", "Thiruvananthapuram"),
    ("Madhya Pradesh", "Bhopal"),
    ("Maharashtra", "Mumbai"),
    ("Manipur", "Imphal"),
    ("Meghalaya", "Shillong"),
    ("Mizoram", "Aizawl"),
    ("Nagaland", "Kohima"),
    ("Odisha", "Bhubaneswar"),
    ("Punjab", "Chandigarh"),
    ("Rajasthan", "Jaipur"),
    ("Sikkim", "Gangtok"),
    ("Tamil Nadu", "Chennai"),
    ("Telangana", "Hyderabad"),
    ("Tripura", "Agartala"),
    ("Uttar Pradesh", "Lucknow"),
    ("Uttarakhand", "Dehradun"),
    ("West Bengal", "Kolkata"),
])
 

# Transformations

In [0]:
# Lookup table of (state, capital city) pairs used to filter the readings.
df_capital = spark.createDataFrame(list(capitals.items()), schema=["state", "city"])
display(df_capital)

In [0]:
# Keep only readings taken in a state's capital: both state and city must match.
join_condition = (df["state"] == df_capital["state"]) & (df["city"] == df_capital["city"])
df_2 = df.join(df_capital, on=join_condition, how="inner")
display(df_2)

In [0]:
# The expression join kept both sides' state/city columns; drop the lookup table's
# copies so only the reading's own columns remain.
df_2 = df_2.drop(df_capital["state"], df_capital["city"])
display(df_2)

# Removing row-level duplicates

In [0]:
# Drop rows that are identical across every column (distinct() == dropDuplicates() with no subset).
df_2 = df_2.distinct()
display(df_2)

In [0]:
# States of India (plus Delhi) accepted by the state check.
# Names must match the `state` values in the data exactly -- no stray whitespace,
# otherwise valid rows are reported as incorrect.
states_all = [
    "Andhra Pradesh", "Arunachal Pradesh", "Assam", "Bihar", "Chhattisgarh",
    "Delhi", "Goa", "Gujarat", "Haryana", "Himachal Pradesh",
    "Jharkhand", "Karnataka", "Kerala", "Madhya Pradesh", "Maharashtra",
    "Manipur", "Meghalaya", "Mizoram", "Nagaland", "Odisha",
    "Punjab", "Rajasthan", "Sikkim", "Tamil Nadu", "Telangana",
    "Tripura", "Uttar Pradesh", "Uttarakhand", "West Bengal",
]

In [0]:
# State values: every row's `state` must be one of `states_all`.
# Filter on the membership test instead of overwriting `state` with it, so the
# report below lists the offending state names. A null state also counts as invalid.
alarm_states = df_2.filter(F.col("state").isNull() | ~F.col("state").isin(states_all))

if alarm_states.limit(1).count() > 0:
  print("The States are not Correct")
  alarm_states.select("state").distinct().show()
else:
  print("All States are Correct")

In [0]:
# Latitude must lie in [-90, 90] and longitude in [-180, 180].
# Values that cannot be cast to Double become null and are therefore not flagged here.
lat_deg = F.col("latitude").cast("Double")
lon_deg = F.col("longitude").cast("Double")
df_check = df_2.filter(~lat_deg.between(-90, 90) | ~lon_deg.between(-180, 180))

if df_check.count() <= 0:
  print("All Latitude and Longitude are Correct")
else:
  print("The Latitude and Longitude are not Correct")
  df_check.select("latitude", "longitude").distinct().show()

In [0]:
# Ozone, CO & SO2 range checks on the average reading.
# 'NA' averages are replaced with 0 first, so they never exceed a threshold.
df_pollutant = df_2.withColumn(
    "pollutant_avg",
    F.when(F.col("pollutant_avg") == "NA", 0).otherwise(F.col("pollutant_avg")),
)

avg_reading = F.col("pollutant_avg").cast("Double")
exceeds_threshold = (
    ((F.col("pollutant_id") == "OZONE") & (avg_reading > 100))
    | ((F.col("pollutant_id") == "CO") & (avg_reading > 7))
    | ((F.col("pollutant_id") == "SO2") & (avg_reading > 40))
)
# pollutant_id holds one value per row, so at most one branch can match: a single
# OR-filter selects the same rows as one filter per pollutant unioned together.
alarmed_pollutants = df_pollutant.filter(exceeds_threshold)

if not alarmed_pollutants.count() > 0:
  print("All pollutant values are Correct")
else:
  print("The pollutant values are not Correct")
  alarmed_pollutants.select("state", "city", "pollutant_id", "pollutant_avg").distinct().show(150)

# Write to silver (Silver layer transformations)

In [0]:
# Persist the validated, de-duplicated readings as the silver Delta table,
# replacing its previous contents.
(
    df_2.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("faampn6.vijeta_schema_airquality.air_quality_silver")
)
display(df_2)

# Adding the data fetch date as a column

In [0]:
from pyspark.sql.functions import to_date

# Derive a calendar `date` from the fetch timestamp; it is one of the grouping keys
# of the gold summary.
df_2 = df_2.withColumn("date", to_date(F.col("Datafetchtime")))
display(df_2)

In [0]:
from pyspark.sql import functions as F

# Clean the pollutant readings: 'NA' becomes 0 (as in the range checks) and every value
# is cast to double. The raw values appear to be strings (they are compared with 'NA'
# and cast before numeric comparisons). Left as strings, min()/max() would compare
# lexicographically ("100" < "20"), so the cast is what keeps the summary numeric.
# NOTE(review): mapping 'NA' to 0 pulls Min/Avg toward 0 for groups with missing
# readings; consider null instead if that is not intended.
df_clean = df_2
for pollutant_col in ("pollutant_avg", "pollutant_min", "pollutant_max"):
    df_clean = df_clean.withColumn(
        pollutant_col,
        F.when(F.col(pollutant_col) == "NA", F.lit(0.0))
         .otherwise(F.col(pollutant_col).cast("double")),
    )
display(df_clean)

In [0]:
# Gold summary: one row per state, city, pollutant and day.
# Uses the F.* aggregates rather than `from pyspark.sql.functions import min, max, avg`,
# which would shadow Python's built-in min()/max() for every later cell.
# The explicit casts make Min/Max numeric even if the pollutant columns are still
# strings here (string min/max is lexicographic).
# NOTE: Min_Pollutant/Max_Pollutant are doubles; a table created earlier with string
# columns needs its schema overwritten when rewritten.
df_summary = (
    df_clean
    .groupBy("state", "city", "pollutant_id", "date")
    .agg(
        F.min(F.col("pollutant_min").cast("double")).alias("Min_Pollutant"),
        F.max(F.col("pollutant_max").cast("double")).alias("Max_Pollutant"),
        F.avg("pollutant_avg").alias("Avg_Pollutant"),
    )
)

display(df_summary)

# Saving to Gold Layer

In [0]:
# Replace the gold Delta table. overwriteSchema lets the overwrite succeed when the
# summary's columns or column types differ from the existing table's; a plain
# overwrite of a Delta table rejects a mismatched schema.
(
    df_summary.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("faampn6.vijeta_schema_airquality.air_quality_gold")
)

In [0]:
# Also publish the summary to the legacy hive_metastore. overwriteSchema lets the
# overwrite succeed when the summary schema (columns or types) has changed.
(
    df_summary.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("hive_metastore.default.Vijeta_air_quality")
)

In [0]:
# Read the gold table back to confirm what was persisted.
df_gold = spark.table("faampn6.vijeta_schema_airquality.air_quality_gold")
display(df_gold)

# Databricks Visualisations

In [0]:
import plotly.graph_objects as go


def _distinct_values(column_name):
    """Return the distinct values of `column_name` in df_summary as a Python list."""
    return [row[0] for row in df_summary.select(column_name).distinct().collect()]


# Sankey nodes are laid out as [states..., cities..., pollutants...]; each *_idx dict
# maps a label to its node position in that combined list.
states = _distinct_values("state")
cities = _distinct_values("city")
pollutants = _distinct_values("pollutant_id")

state_idx = {name: pos for pos, name in enumerate(states)}
city_idx = {name: pos for pos, name in enumerate(cities, start=len(states))}
pollutant_idx = {name: pos for pos, name in enumerate(pollutants, start=len(states) + len(cities))}

# Link weight = number of summary rows behind each state->city and city->pollutant pair.
# NOTE(review): cities are keyed by name only, so a city name that appears under two
# states becomes a single node.
state_city_links = [
    (state_idx[row["state"]], city_idx[row["city"]], row["count"])
    for row in df_summary.groupBy("state", "city").count().collect()
]
city_pollutant_links = [
    (city_idx[row["city"]], pollutant_idx[row["pollutant_id"]], row["count"])
    for row in df_summary.groupBy("city", "pollutant_id").count().collect()
]
links = state_city_links + city_pollutant_links

label = states + cities + pollutants
source = [src for src, _, _ in links]
target = [dst for _, dst, _ in links]
value = [weight for _, _, weight in links]

fig = go.Figure(data=[go.Sankey(
    node=dict(pad=15, thickness=20, line=dict(color="black", width=0.5), label=label),
    link=dict(source=source, target=target, value=value),
)])
fig.update_layout(title_text="State → City → Pollutant Sankey Diagram", font_size=10)
fig.show()

In [0]:
import plotly.express as px

# Mean of Avg_Pollutant per state, collected to pandas for plotting.
# NOTE(review): this averages across all pollutant_ids, whose readings may be on
# different scales/units -- confirm this is the intended metric.
df_map = df_summary.groupBy("state").agg(F.avg("Avg_Pollutant").alias("avg_pollutant"))
pandas_df = df_map.toPandas()

# States are matched to GeoJSON features by their ST_NM property; names that do not
# match a feature are not drawn.
choropleth_args = dict(
    geojson="https://raw.githubusercontent.com/plotly/datasets/master/india_states.geojson",
    featureidkey="properties.ST_NM",
    locations="state",
    color="avg_pollutant",
    color_continuous_scale="YlOrRd",
    title="Average Pollutant by Indian State",
)
fig = px.choropleth(pandas_df, **choropleth_args)
fig.update_geos(fitbounds="locations", visible=False)
fig.show()

In [0]:
import plotly.graph_objects as go

# Mean of Avg_Pollutant per state, collected to pandas for plotting.
df_map = df_summary.groupBy("state").agg(F.avg("Avg_Pollutant").alias("avg_pollutant"))
pandas_df = df_map.toPandas()

# Approximate (lat, lon) centre of each state / union territory. States without an
# entry are dropped from the plot below.
state_coords = {
    "Andhra Pradesh": (15.9129, 79.7400),
    "Arunachal Pradesh": (28.2180, 94.7278),
    "Assam": (26.2006, 92.9376),
    "Bihar": (25.0961, 85.3131),
    "Chhattisgarh": (21.2787, 81.8661),
    "Goa": (15.2993, 74.1240),
    "Gujarat": (22.2587, 71.1924),
    "Haryana": (29.0588, 76.0856),
    "Himachal Pradesh": (31.1048, 77.1734),
    "Jharkhand": (23.6102, 85.2799),
    "Karnataka": (15.3173, 75.7139),
    "Kerala": (10.8505, 76.2711),
    "Madhya Pradesh": (22.9734, 78.6569),
    "Maharashtra": (19.7515, 75.7139),
    "Manipur": (24.6637, 93.9063),
    "Meghalaya": (25.4670, 91.3662),
    "Mizoram": (23.1645, 92.9376),
    "Nagaland": (26.1584, 94.5624),
    "Odisha": (20.9517, 85.0985),
    "Punjab": (31.1471, 75.3412),
    "Rajasthan": (27.0238, 74.2179),
    "Sikkim": (27.5330, 88.5122),
    "Tamil Nadu": (11.1271, 78.6569),
    "Telangana": (18.1124, 79.0193),
    "Tripura": (23.9408, 91.9882),
    "Uttar Pradesh": (26.8467, 80.9462),
    "Uttarakhand": (30.0668, 79.0193),
    "West Bengal": (22.9868, 87.8550),
    "Delhi": (28.7041, 77.1025),
    "Jammu and Kashmir": (33.7782, 76.5762),
    "Ladakh": (34.1526, 77.5771),
    "Puducherry": (11.9416, 79.8083),
    "Chandigarh": (30.7333, 76.7794),
    "Andaman and Nicobar Islands": (11.7401, 92.6586),
    "Dadra and Nagar Haveli and Daman and Diu": (20.1809, 73.0169),
    "Lakshadweep": (10.5667, 72.6417)
}

# Look each state up once, then split the (lat, lon) pair into two columns;
# unknown states get (None, None) and are dropped.
coord_pairs = pandas_df["state"].map(lambda name: state_coords.get(name, (None, None)))
pandas_df["lat"] = coord_pairs.map(lambda pair: pair[0])
pandas_df["lon"] = coord_pairs.map(lambda pair: pair[1])
pandas_df = pandas_df.dropna(subset=["lat", "lon"])

hover_text = pandas_df["state"] + "<br>Avg Pollutant: " + pandas_df["avg_pollutant"].round(2).astype(str)
marker_style = dict(
    size=12,
    color=pandas_df["avg_pollutant"],
    colorscale="YlOrRd",
    colorbar_title="Avg Pollutant",
    line_color='black',
    line_width=0.5,
    sizemode='area',
)

fig = go.Figure(go.Scattergeo(
    lon=pandas_df["lon"],
    lat=pandas_df["lat"],
    text=hover_text,
    marker=marker_style,
))

# Orthographic "globe" projection centred on India.
fig.update_geos(
    projection_type="orthographic",
    projection_scale=2.5,
    center=dict(lat=22, lon=80),
    showcountries=True,
    showcoastlines=True,
    showland=True,
    landcolor="rgb(217, 217, 217)",
    countrycolor="rgb(204, 204, 204)",
)
fig.update_layout(title_text="Globe View: Average Pollutant by Indian State")

fig.show()

In [0]:
import plotly.graph_objects as go

# Prepare data: average pollutant by state, highest first.
df_map = df_summary.select("state", "Avg_Pollutant").groupBy("state").avg("Avg_Pollutant").withColumnRenamed("avg(Avg_Pollutant)", "avg_pollutant")
pandas_df = df_map.toPandas().sort_values("avg_pollutant", ascending=False)

# Waterfall bars default to measure='relative': each bar starts where the previous
# one ended, so the bar tops would show a running total of the state averages.
# Marking every bar 'absolute' starts each one at 0, so its height is that state's
# own average, matching the y-axis label.
fig = go.Figure(go.Waterfall(
    x=pandas_df["state"],
    y=pandas_df["avg_pollutant"],
    measure=["absolute"] * len(pandas_df),
    text=pandas_df["avg_pollutant"].round(2).astype(str),
    connector={"line": {"color": "#0074D9"}},
    decreasing={"marker": {"color": "#1f77b4"}},
    increasing={"marker": {"color": "#339af0"}},
    totals={"marker": {"color": "#003366"}}
))

fig.update_layout(
    title="Waterfall Chart: Average Pollutant by Indian State",
    xaxis_title="State",
    yaxis_title="Average Pollutant",
    showlegend=False
)
fig.show()