In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("AQI Calculation") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()


In [14]:
import pyarrow.parquet as pq

input_path = "C:/Users/SNEHIL/Downloads/Air quality Monitoring/air_quality_with_weather.parquet"  # your local path

pf = pq.ParquetFile(input_path)
print("num_row_groups:", pf.num_row_groups)
print("Parquet schema (arrow):")
print(pf.schema_arrow)    # shows arrow types for each column

# print each column name + type
for field in pf.schema_arrow:
    print(field.name, "->", field.type)


FileNotFoundError: [WinError 2] Failed to open local file 'C:/Users/SNEHIL/Downloads/Air quality Monitoring/air_quality_with_weather.parquet'. Detail: [Windows error 2] The system cannot find the file specified.


In [None]:
import pyarrow.parquet as pq
import pyarrow as pa
from pathlib import Path

input_path = "C:/Users/SNEHIL/Downloads/Air quality Monitoring/air_quality_with_weather.parquet"
output_path = "C:/Users/SNEHIL/Downloads/Air quality Monitoring/fixed.parquet"

reader = pq.ParquetFile(input_path)
print("Row groups:", reader.num_row_groups)

writer = None
try:
    for rg in range(reader.num_row_groups):
        print(f"Processing row group {rg+1}/{reader.num_row_groups} ...")
        table = reader.read_row_group(rg)  # pyarrow.Table for that row group

        # On first row-group, create writer with the table's schema.
        # coerce_timestamps='ms' will cast INT96/nanos to milliseconds.
        if writer is None:
            writer = pq.ParquetWriter(
                output_path,
                table.schema,
                coerce_timestamps='ms',              # cast timestamps to ms
                allow_truncated_timestamps=True      # allow truncation if needed
            )

        writer.write_table(table)

finally:
    if writer is not None:
        writer.close()


Row groups: 15
Processing row group 1/15 ...
Processing row group 2/15 ...
Processing row group 3/15 ...
Processing row group 4/15 ...
Processing row group 5/15 ...
Processing row group 6/15 ...
Processing row group 7/15 ...
Processing row group 8/15 ...
Processing row group 9/15 ...
Processing row group 10/15 ...
Processing row group 11/15 ...
Processing row group 12/15 ...
Processing row group 13/15 ...
Processing row group 14/15 ...
Processing row group 15/15 ...


In [None]:
df = spark.read.parquet("C:/Users/SNEHIL/Downloads/Air quality Monitoring/fixed.parquet")


In [None]:
df.printSchema()

root
 |-- location_id: long (nullable = true)
 |-- sensors_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- datetime: timestamp_ntz (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- value_co: double (nullable = true)
 |-- value_no2: double (nullable = true)
 |-- value_o3: double (nullable = true)
 |-- value_pm10: double (nullable = true)
 |-- value_pm25: double (nullable = true)
 |-- value_so2: double (nullable = true)
 |-- datetime_hour: timestamp_ntz (nullable = true)
 |-- lat_round: double (nullable = true)
 |-- lon_round: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- humidity: long (nullable = true)
 |-- units: string (nullable = true)



In [None]:
breakpoints = {
    "pm25": [(0, 30, 0, 50), (31, 60, 51, 100), (61, 90, 101, 200), (91, 120, 201, 300), (121, 250, 301, 400), (251, 9999, 401, 500)],
    "pm10": [(0, 50, 0, 50), (51, 100, 51, 100), (101, 250, 101, 200), (251, 350, 201, 300), (351, 430, 301, 400), (431, 9999, 401, 500)],
    "no2":  [(0, 40, 0, 50), (41, 80, 51, 100), (81, 180, 101, 200), (181, 280, 201, 300), (281, 400, 301, 400), (401, 9999, 401, 500)],
    "so2":  [(0, 40, 0, 50), (41, 80, 51, 100), (81, 380, 101, 200), (381, 800, 201, 300), (801, 1600, 301, 400), (1601, 9999, 401, 500)],
    "co":   [(0, 1, 0, 50), (1.1, 2, 51, 100), (2.1, 10, 101, 200), (10.1, 17, 201, 300), (17.1, 34, 301, 400), (34.1, 9999, 401, 500)],
    "o3":   [(0, 50, 0, 50), (51, 100, 51, 100), (101, 168, 101, 200), (169, 208, 201, 300), (209, 748, 301, 400), (749, 9999, 401, 500)]
}

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def compute_aqi(cp, pollutant):
    if cp is None:
        return None
    for bp_lo, bp_hi, i_lo, i_hi in breakpoints[pollutant]:
        if bp_lo <= cp <= bp_hi:
            return ((i_hi - i_lo) / (bp_hi - bp_lo)) * (cp - bp_lo) + i_lo
    return None

# Register UDFs
for pol in breakpoints.keys():
    spark.udf.register(f"aqi_{pol}", lambda x, pol=pol: compute_aqi(x, pol), DoubleType())

In [None]:
from pyspark.sql.functions import col, greatest

df = df.withColumn("aqi_pm25",  udf(lambda x: compute_aqi(x, "pm25"), DoubleType())(col("value_pm25")))
df = df.withColumn("aqi_pm10",  udf(lambda x: compute_aqi(x, "pm10"), DoubleType())(col("value_pm10")))
df = df.withColumn("aqi_no2",   udf(lambda x: compute_aqi(x, "no2"), DoubleType())(col("value_no2")))
df = df.withColumn("aqi_so2",   udf(lambda x: compute_aqi(x, "so2"), DoubleType())(col("value_so2")))
df = df.withColumn("aqi_co",    udf(lambda x: compute_aqi(x, "co"), DoubleType())(col("value_co")))
df = df.withColumn("aqi_o3",    udf(lambda x: compute_aqi(x, "o3"), DoubleType())(col("value_o3")))

# Final AQI
df = df.withColumn("AQI", greatest("aqi_pm25", "aqi_pm10", "aqi_no2", "aqi_so2", "aqi_co", "aqi_o3"))

In [None]:
df.printSchema()

root
 |-- location_id: long (nullable = true)
 |-- sensors_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- datetime: timestamp_ntz (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- value_co: double (nullable = true)
 |-- value_no2: double (nullable = true)
 |-- value_o3: double (nullable = true)
 |-- value_pm10: double (nullable = true)
 |-- value_pm25: double (nullable = true)
 |-- value_so2: double (nullable = true)
 |-- datetime_hour: timestamp_ntz (nullable = true)
 |-- lat_round: double (nullable = true)
 |-- lon_round: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- humidity: long (nullable = true)
 |-- units: string (nullable = true)
 |-- aqi_pm25: double (nullable = true)
 |-- aqi_pm10: double (nullable = tr

In [None]:
!python --version

Python 3.12.1


In [None]:
!pip install pyspark==3.5.1 findspark


In [None]:
import pandas as pd
df = pd.read_parquet("C:/Users/SNEHIL/Downloads/Air quality Monitoring/fixed.parquet")


In [None]:
df.head()

Unnamed: 0,location_id,sensors_id,location,datetime,lat,lon,year,month,day,time,...,value_pm10,value_pm25,value_so2,datetime_hour,lat_round,lon_round,temperature,wind,humidity,units
0,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 16:00:00,26.519,80.233,2013,12,14,16:00:00,...,0.0,106.5,0.0,2013-12-14 16:00:00,26.519,80.233,22.2,3.8,40,µg/m³
1,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 17:00:00,26.519,80.233,2013,12,14,17:00:00,...,0.0,127.6,0.0,2013-12-14 17:00:00,26.519,80.233,20.6,7.2,46,µg/m³
2,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 18:00:00,26.519,80.233,2013,12,14,18:00:00,...,0.0,124.0,0.0,2013-12-14 18:00:00,26.519,80.233,18.8,9.0,52,µg/m³
3,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 19:00:00,26.519,80.233,2013,12,14,19:00:00,...,0.0,84.9,0.0,2013-12-14 19:00:00,26.519,80.233,17.2,9.4,59,µg/m³
4,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 20:00:00,26.519,80.233,2013,12,14,20:00:00,...,0.0,36.8,0.0,2013-12-14 20:00:00,26.519,80.233,15.8,8.2,65,µg/m³


In [None]:
import pandas as pd
import numpy as np

# --- 1. Breakpoints Dictionary (Remains the same) ---
breakpoints = {
    "pm25": [(0, 30, 0, 50), (31, 60, 51, 100), (61, 90, 101, 200), (91, 120, 201, 300), (121, 250, 301, 400), (251, 9999, 401, 500)],
    "pm10": [(0, 50, 0, 50), (51, 100, 51, 100), (101, 250, 101, 200), (251, 350, 201, 300), (351, 430, 301, 400), (431, 9999, 401, 500)],
    "no2":  [(0, 40, 0, 50), (41, 80, 51, 100), (81, 180, 101, 200), (181, 280, 201, 300), (281, 400, 301, 400), (401, 9999, 401, 500)],
    "so2":  [(0, 40, 0, 50), (41, 80, 51, 100), (81, 380, 101, 200), (381, 800, 201, 300), (801, 1600, 301, 400), (1601, 9999, 401, 500)],
    "co":   [(0, 1, 0, 50), (1.1, 2, 51, 100), (2.1, 10, 101, 200), (10.1, 17, 201, 300), (17.1, 34, 301, 400), (34.1, 9999, 401, 500)],
    "o3":   [(0, 50, 0, 50), (51, 100, 51, 100), (101, 168, 101, 200), (169, 208, 201, 300), (209, 748, 301, 400), (749, 9999, 401, 500)]
}

# --- 2. The Core AQI Calculation Function (Remains the same) ---

def compute_aqi(cp, pollutant):
    """
    Computes the Air Quality Index (AQI) for a given pollutant concentration (Cp).
    """
    if cp is None or pd.isna(cp):
        return None
    
    # Ensure Cp is treated as a float for calculations
    cp = float(cp)
    
    # Check for division by zero risk (Bphi - Bplo == 0)
    for bp_lo, bp_hi, i_lo, i_hi in breakpoints[pollutant]:
        if bp_hi == bp_lo:
             continue # Skip invalid breakpoint ranges
             
        if bp_lo <= cp <= bp_hi:
            # The AQI formula: I = [(Ihi - Ilo) / (Bphi - Bplo)] * (Cp - Bplo) + Ilo
            return ((i_hi - i_lo) / (bp_hi - bp_lo)) * (cp - bp_lo) + i_lo
            
    return None


# --- 3. Pandas Implementation (Replaces Spark Transformations) ---


# List of pollutants and their corresponding DataFrame columns
pollutants = {
    "pm25": "value_pm25",
    "pm10": "value_pm10",
    "no2":  "value_no2",
    "so2":  "value_so2",
    "co":   "value_co",
    "o3":   "value_o3"
}

# 3a. Calculate individual AQI columns using .apply()
aqi_columns = []
for pol, col_name in pollutants.items():
    aqi_col_name = f"aqi_{pol}"
    
    # Use .apply() to execute the function row-wise on the concentration column.
    # The 'lambda' handles passing the pollutant name for the breakpoints lookup.
    df[aqi_col_name] = df[col_name].apply(lambda cp: compute_aqi(cp, pol))
    
    aqi_columns.append(aqi_col_name)

# 3b. Final AQI calculation (Replaces Spark's 'greatest')
# Use the .max(axis=1) method on the subset of AQI columns.
# skipna=True ensures that if one AQI value is None/NaN, the max of the others is taken.
df["AQI"] = df[aqi_columns].max(axis=1, skipna=True)

# print(df) # Uncomment to see the resulting DataFrame

In [None]:
df.head()

Unnamed: 0,location_id,sensors_id,location,datetime,lat,lon,year,month,day,time,...,wind,humidity,units,aqi_pm25,aqi_pm10,aqi_no2,aqi_so2,aqi_co,aqi_o3,AQI
0,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 16:00:00,26.519,80.233,2013,12,14,16:00:00,...,3.8,40,µg/m³,253.913793,0.0,0.0,0.0,0.0,0.0,253.913793
1,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 17:00:00,26.519,80.233,2013,12,14,17:00:00,...,7.2,46,µg/m³,306.065116,0.0,0.0,0.0,0.0,0.0,306.065116
2,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 18:00:00,26.519,80.233,2013,12,14,18:00:00,...,9.0,52,µg/m³,303.302326,0.0,0.0,0.0,0.0,0.0,303.302326
3,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 19:00:00,26.519,80.233,2013,12,14,19:00:00,...,9.4,59,µg/m³,182.589655,0.0,0.0,0.0,0.0,0.0,182.589655
4,12,23,SPARTAN - IIT Kanpur-12,2013-12-14 20:00:00,26.519,80.233,2013,12,14,20:00:00,...,8.2,65,µg/m³,60.8,0.0,0.0,0.0,0.0,0.0,60.8


In [None]:
# List the columns you want to remove
columns_to_drop = [
    'aqi_pm25', 
    'aqi_pm10', 
    'aqi_no2', 
    'aqi_so2', 
    'aqi_co', 
    'aqi_o3'
]

df.drop(columns=columns_to_drop, axis=1, inplace=True)

print("Individual AQI columns successfully dropped from df.")

MemoryError: Unable to allocate 1.33 GiB for an array with shape (12, 14841314) and data type float64

In [None]:
import os
import pandas as pd

# Define the file path
file_path = r"C:\Users\SNEHIL\Downloads\Air quality Monitoring\air_quality_with_weather1.parquet"

# --- Recommended Practice: Ensure the directory exists ---
# This step prevents an error if the 'Air quality Monitoring' folder doesn't exist yet.
output_dir = os.path.dirname(file_path)
if output_dir and not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Created directory: {output_dir}")

# Write the DataFrame to the Parquet file
try:
    df.to_parquet(
        path=file_path, 
        engine='pyarrow',      # Highly recommended engine for Parquet
        compression='snappy',  # A fast, efficient compression codec
        index=False            # Set to False unless you specifically need to save the index
    )
    print(f"\nDataFrame successfully written to Parquet file:\n{file_path}")

except ImportError:
    print("\nERROR: The 'pyarrow' or 'fastparquet' library is required to write Parquet files.")
    print("Please install one of them: `pip install pyarrow`")
except Exception as e:
    print(f"\nAn error occurred while writing the file: {e}")


DataFrame successfully written to Parquet file:
C:\Users\SNEHIL\Downloads\Air quality Monitoring\air_quality_with_weather.parquet


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV Loader").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000020B4E2E1610>


In [None]:
stations_df = spark.read.csv("./stations.csv", header=True, inferSchema=True)
stations_df.show()

+---------+--------------------+-----------------+--------------+------+
|StationId|         StationName|             City|         State|Status|
+---------+--------------------+-----------------+--------------+------+
|    AP001|Secretariat, Amar...|        Amaravati|Andhra Pradesh|Active|
|    AP002|Anand Kala Kshetr...|Rajamahendravaram|Andhra Pradesh|  NULL|
|    AP003|Tirumala, Tirupat...|         Tirupati|Andhra Pradesh|  NULL|
|    AP004|PWD Grounds, Vija...|       Vijayawada|Andhra Pradesh|  NULL|
|    AP005|GVM Corporation, ...|    Visakhapatnam|Andhra Pradesh|Active|
|    AS001|Railway Colony, G...|         Guwahati|         Assam|Active|
|    BR001|Collectorate, Gay...|             Gaya|         Bihar|  NULL|
|    BR002|SFTI Kusdihra, Ga...|             Gaya|         Bihar|  NULL|
|    BR003|Industrial Area, ...|          Hajipur|         Bihar|  NULL|
|    BR004|Muzaffarpur Colle...|      Muzaffarpur|         Bihar|  NULL|
|    BR005|DRM Office Danapu...|            Patna| 

In [None]:
from pyspark.sql.functions import regexp_replace

stations_df = stations_df.withColumn("StationName", regexp_replace("StationName", "-.*", ""))
stations_df.show(n=5)

+---------+--------------------+-----------------+--------------+------+
|StationId|         StationName|             City|         State|Status|
+---------+--------------------+-----------------+--------------+------+
|    AP001|Secretariat, Amar...|        Amaravati|Andhra Pradesh|Active|
|    AP002|Anand Kala Kshetr...|Rajamahendravaram|Andhra Pradesh|  NULL|
|    AP003| Tirumala, Tirupati |         Tirupati|Andhra Pradesh|  NULL|
|    AP004|PWD Grounds, Vija...|       Vijayawada|Andhra Pradesh|  NULL|
|    AP005|GVM Corporation, ...|    Visakhapatnam|Andhra Pradesh|Active|
+---------+--------------------+-----------------+--------------+------+
only showing top 5 rows


In [None]:
import pandas as pd

stations_df = pd.read_csv('./stations.csv')

In [None]:
stations_df.head()

Unnamed: 0,StationId,StationName,City,State,Status
0,AP001,"Secretariat, Amaravati - APPCB",Amaravati,Andhra Pradesh,Active
1,AP002,"Anand Kala Kshetram, Rajamahendravaram - APPCB",Rajamahendravaram,Andhra Pradesh,
2,AP003,"Tirumala, Tirupati - APPCB",Tirupati,Andhra Pradesh,
3,AP004,"PWD Grounds, Vijayawada - APPCB",Vijayawada,Andhra Pradesh,
4,AP005,"GVM Corporation, Visakhapatnam - APPCB",Visakhapatnam,Andhra Pradesh,Active


In [None]:
stations_df["StationName"] = stations_df["StationName"].str.replace("-.*", "", regex=True)
print(stations_df.head(5))

  StationId                              StationName               City  \
0     AP001                  Secretariat, Amaravati           Amaravati   
1     AP002  Anand Kala Kshetram, Rajamahendravaram   Rajamahendravaram   
2     AP003                      Tirumala, Tirupati            Tirupati   
3     AP004                 PWD Grounds, Vijayawada          Vijayawada   
4     AP005          GVM Corporation, Visakhapatnam       Visakhapatnam   

            State  Status  
0  Andhra Pradesh  Active  
1  Andhra Pradesh     NaN  
2  Andhra Pradesh     NaN  
3  Andhra Pradesh     NaN  
4  Andhra Pradesh  Active  


In [None]:
import pandas as pd
from geopy.geocoders import Nominatim
import time
# --- NEW IMPORT ---
from tqdm.auto import tqdm

# Enable tqdm progress bar for pandas operations
tqdm.pandas()

# --- Setup for geocoding ---
# Initialize the geolocator once outside the function for efficiency
geolocator = Nominatim(user_agent="air_quality_app")
# Define the rate-limit delay
DELAY_SECONDS = 2 # Increased delay to be safe with Nominatim

def geocode_location_safe(location):
    """
    Geocodes a location string and returns (longitude, latitude).
    Includes a delay for rate limiting.
    """
    if pd.isna(location) or location.strip() == "":
        return None, None

    try:
        # NOTE: Using the globally defined geolocator
        location_data = geolocator.geocode(location)
        if location_data:
            # Add a small delay to avoid hitting API rate limits
            time.sleep(DELAY_SECONDS)
            return (location_data.longitude, location_data.latitude)
        return (None, None)
    except Exception as e:
        # Wait even if there's an error to respect rate limits
        time.sleep(DELAY_SECONDS)
        return (None, None)


# --- Apply geocoding logic to the pandas DataFrame ---

# 1. Create the full location string
stations_df["City"] = stations_df["City"].fillna("")
stations_df["State"] = stations_df["State"].fillna("")
stations_df["location_full"] = stations_df["StationName"] + ", " + stations_df["City"] + ", " + stations_df["State"]

# 2. Apply geocoding for the full location
# --- ADDED PROGRESS BAR HERE (using .progress_apply) ---
print("Starting geocoding for Full Location...")
stations_df[["longitude_full", "latitude_full"]] = stations_df["location_full"].progress_apply(
    lambda x: geocode_location_safe(x)
).apply(pd.Series)

# 3. Create the City & State location string
stations_df["location_city_state"] = stations_df["City"] + ", " + stations_df["State"]

# 4. Apply geocoding for City & State, but ONLY where full geocoding failed (lon is NaN)
failed_mask = stations_df["longitude_full"].isna()
rows_to_geocode = failed_mask.sum()

print(f"\nStarting geocoding for City/State fallback on {rows_to_geocode} rows...")

# Only apply the function to the rows where the full geocoding failed
# --- ADDED PROGRESS BAR HERE (using .progress_apply on the subset) ---
stations_df.loc[failed_mask, ["longitude_city_state", "latitude_city_state"]] = stations_df.loc[failed_mask, "location_city_state"].progress_apply(
    lambda x: geocode_location_safe(x)
).apply(pd.Series)

# Fill remaining non-failed rows with NaN to ensure consistent column structure
stations_df["longitude_city_state"] = stations_df["longitude_city_state"].fillna(pd.NA)
stations_df["latitude_city_state"] = stations_df["latitude_city_state"].fillna(pd.NA)


# 5. Combine the results
stations_df["longitude"] = stations_df["longitude_full"].combine_first(stations_df["longitude_city_state"])
stations_df["latitude"] = stations_df["latitude_full"].combine_first(stations_df["latitude_city_state"])

# 6. Drop the intermediate columns
columns_to_drop = [
    "location_full", "longitude_full", "latitude_full",
    "location_city_state", "longitude_city_state", "latitude_city_state"
]
stations_df = stations_df.drop(columns=columns_to_drop)

print("\nFinal DataFrame structure:")
print(stations_df.head(5))
print(stations_df.info())

  from .autonotebook import tqdm as notebook_tqdm


Starting geocoding for Full Location...


100%|██████████| 230/230 [07:28<00:00,  1.95s/it]



Starting geocoding for City/State fallback on 94 rows...


100%|██████████| 94/94 [03:43<00:00,  2.37s/it]


Final DataFrame structure:
  StationId                              StationName               City  \
0     AP001                  Secretariat, Amaravati           Amaravati   
1     AP002  Anand Kala Kshetram, Rajamahendravaram   Rajamahendravaram   
2     AP003                      Tirumala, Tirupati            Tirupati   
3     AP004                 PWD Grounds, Vijayawada          Vijayawada   
4     AP005          GVM Corporation, Visakhapatnam       Visakhapatnam   

            State  Status  longitude   latitude  
0  Andhra Pradesh  Active  80.485951  16.537386  
1  Andhra Pradesh     NaN        NaN        NaN  
2  Andhra Pradesh     NaN  79.349752  13.679524  
3  Andhra Pradesh     NaN        NaN        NaN  
4  Andhra Pradesh  Active        NaN        NaN  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 230 entries, 0 to 229
Data columns (total 7 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   StationId    230 non-null   




In [None]:
stations_df['latitude'].isnull().sum()

94

In [None]:
stations_df.to_csv('./stations_df1.csv', index=False) 

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Weather Loader").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000016F9A0FC470>


In [None]:
historic_df = spark.read.csv("./transformation and cleaning/air_quality_final_20250929_105402.csv", header=True, inferSchema=True)

# Filter out rows with year less than 2000 and missing aqi values
historic_df = historic_df.filter((historic_df.year >= 2000) & (historic_df.aqi.isNotNull()))

# Drop the specified columns
historic_df = historic_df.drop("_c21", "geocoding_source")

historic_df.show()

+--------------+----------------+----------------------+------------------------------+--------------------+----+----+---------+------+----+-----+---+-------------------+----+------+------+---+------------+----------+----------+
|_sampling_date|           state|city_town_village_area|location_of_monitoring_station|    type_of_location| so2| no2|rspm_pm10|pm_2_5|year|month|day|               time|hour|minute|second|aqi|aqi_category|  latitude| longitude|
+--------------+----------------+----------------------+------------------------------+--------------------+----+----+---------+------+----+-----+---+-------------------+----+------+------+---+------------+----------+----------+
|7/1/2014 12:48|       Meghalaya|                 Dawki|          Terrace building,...|Residential, Rura...| 2.0|11.0|     52.0|  NULL|2014|    7|  1|2025-10-03 12:48:04|  12|    48|     4| 52|Satisfactory|25.1856343|92.0215717|
|1/1/2014 12:25|             Goa|                Panaji|          Infront of Old GS.

In [53]:
historic_df = spark.read.csv("./rest_missing.csv",header=True,inferSchema=True)

In [54]:
# Get unique location-year pairs using Spark DataFrame operations
unique_loc_year_spark = historic_df.select('latitude', 'longitude', 'year').distinct()

# Show the count of unique location-year pairs
print("Total unique (lat,lon,year) pairs:", unique_loc_year_spark.count())

# Display the first few rows of the unique location-year pairs
unique_loc_year_spark.show(n=5)

Total unique (lat,lon,year) pairs: 193
+----------+----------+----+
|  latitude| longitude|year|
+----------+----------+----+
| 9.0014992|76.5347516|2011|
|27.4844597|94.9019447|2015|
|32.2504936|77.1881383|2014|
|18.5004949|73.8529037|2004|
|22.3474121|82.5340303|2009|
+----------+----------+----+
only showing top 5 rows


In [None]:
import os
import pandas as pd
import requests
import time
import json
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

# Initialize Spark session
spark = SparkSession.builder.appName("WeatherFetcher").getOrCreate()

# Schema for final Spark DataFrame
weather_schema = StructType([
    StructField("datetime", TimestampType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("wind", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True)
])

# ----------------------------
# Function to fetch weather
# ----------------------------
def fetch_weather(lat, lon, year, max_retries=5):
    lat = float(lat)
    lon = float(lon)
    year = int(year)

    start = f"{year}-01-01"
    end   = f"{year}-12-31"

    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start,
        "end_date": end,
        "hourly": ["temperature_2m", "windspeed_10m", "relative_humidity_2m"],
        "timezone": "Asia/Kolkata"
    }

    retries = 0
    while retries < max_retries:
        try:
            r = requests.get("https://archive-api.open-meteo.com/v1/archive", params=params, timeout=60)
            r.raise_for_status()
            data = r.json()

            # Case 1: API responds but has no data
            if "hourly" not in data or not data["hourly"].get("time"):
                return None, {"lat": lat, "lon": lon, "year": year, "error": "no_data_available"}

            pdf = pd.DataFrame({
                "datetime": pd.to_datetime(data["hourly"]["time"]),
                "temperature": data["hourly"]["temperature_2m"],
                "wind": data["hourly"]["windspeed_10m"],
                "humidity": data["hourly"]["relative_humidity_2m"],
                "lat": lat,
                "lon": lon
            })
            return pdf, None  # ✅ success

        except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
            retries += 1
            if retries >= max_retries:
                return None, {"lat": lat, "lon": lon, "year": year, "error": f"api_failure: {str(e)}"}
            time.sleep(2 ** retries)
        except Exception as e:
            return None, {"lat": lat, "lon": lon, "year": year, "error": f"unexpected: {str(e)}"}

    return None, {"lat": lat, "lon": lon, "year": year, "error": "unknown_failure"}


# ----------------------------
# MAIN PIPELINE
# ----------------------------
# Get your unique (lat, lon, year) pairs from Spark
pairs = unique_loc_year_spark.collect()
print(f"Total tasks: {len(pairs)}")

results = []
errors = []  # store failed requests

# Run parallel fetching
with ThreadPoolExecutor(max_workers=5) as executor:  # adjust workers depending on CPU/internet
    futures = {
        executor.submit(fetch_weather, row.latitude, row.longitude, row.year): row 
        for row in pairs
    }

    for future in tqdm(as_completed(futures), total=len(futures), desc="Fetching weather"):
        pdf, error = future.result()
        if pdf is not None:
            results.append(pdf)
        if error is not None:
            errors.append(error)

# ----------------------------
# Convert results to Spark DataFrame
# ----------------------------
if results:
    all_weather_pd1 = pd.concat(results, ignore_index=True)
    print("✅ Weather data saved successfully!")
else:
    print("⚠️ No data fetched.")

# ----------------------------
# Save errors separately
# ----------------------------
if errors:
    error_df = pd.DataFrame(errors)
    print(f"⚠️ {len(errors)} requests failed. Errors saved to weather_fetch_errors.parquet")
else:
    print("✅ No errors encountered!")


Total tasks: 193


Fetching weather: 100%|██████████| 193/193 [06:22<00:00,  1.98s/it]

⚠️ No data fetched.
⚠️ 193 requests failed. Errors saved to weather_fetch_errors.parquet





In [62]:
print(error_df.head())

         lat        lon  year  \
0  18.500495  73.852904  2004   
1  22.347412  82.534030  2009   
2  21.149813  79.082056  2010   
3  26.180598  91.753943  2010   
4  26.296772  73.035143  2005   

                                               error  
0  429 Client Error: Too Many Requests for url: h...  
1  429 Client Error: Too Many Requests for url: h...  
2  429 Client Error: Too Many Requests for url: h...  
3  429 Client Error: Too Many Requests for url: h...  
4  429 Client Error: Too Many Requests for url: h...  


In [None]:
weather_df = spark.createDataFrame(all_weather_pd, schema=weather_schema)

    # Save fewer parquet files
weather_df.coalesce(10).write.mode("overwrite").parquet(
        "C:/Users/SNEHIL/Downloads/Air_quality_Monitoring/final_weather_data_all"
)

In [56]:
all_weather_pd.head()

Unnamed: 0,datetime,temperature,wind,humidity,lat,lon,year
0,2013-01-01 00:00:00,11.2,7.9,81,26.400882,80.400005,2013
1,2013-01-01 01:00:00,10.6,8.0,82,26.400882,80.400005,2013
2,2013-01-01 02:00:00,10.1,7.3,84,26.400882,80.400005,2013
3,2013-01-01 03:00:00,9.5,6.4,85,26.400882,80.400005,2013
4,2013-01-01 04:00:00,9.0,5.9,87,26.400882,80.400005,2013


In [10]:
# Assuming 'datetime' column is already in datetime format
all_weather_pd['year'] = all_weather_pd['datetime'].dt.year 


In [11]:
uni_all_weather_pd = all_weather_pd[['lat','lon','year']].drop_duplicates()

In [None]:
uni_all_weather_pd

(278, 3)

In [19]:
from pyspark.sql.functions import col

# Perform a left anti join to find the non-matching combinations
non_matching_combinations = unique_loc_year_spark.join(
    uni_all_weather_spark,
    on=(
        (unique_loc_year_spark["latitude"] == uni_all_weather_spark["lat"]) &
        (unique_loc_year_spark["longitude"] == uni_all_weather_spark["lon"]) &
        (unique_loc_year_spark["year"] == uni_all_weather_spark["year"])
    ),
    how="left_anti" # Use "left_anti" for this specific purpose
)




In [24]:
all_weather_pd.shape

(2436600, 7)

In [69]:
# Save the DataFrame with explicit options for better Spark compatibility
# Use 'fastparquet' as the engine and specify the timestamp format
# OR use pyarrow with specific version and format options

import pandas as pd
import pyarrow.parquet

# Using pyarrow (recommended):
df_dropped.to_parquet(
    'all_weather_pd_compatible_final.parquet', 
    engine='pyarrow', 
    version='1.0',  # Use an older, more compatible Parquet version
    allow_truncated_timestamps=True # Allows conversion from nanoseconds to microseconds/milliseconds
)

# Optional: Using fastparquet (sometimes resolves issues pyarrow can't):
# all_weather_pd.to_parquet(
#     'all_weather_pd_compatible.parquet', 
#     engine='fastparquet',
#     times='int96' # This is an older, often-compatible timestamp format
# )

In [64]:
all_weather_pd = pd.read_parquet("./all_weather_pd_compatible.parquet")

In [65]:
all_weather_pd1 = pd.read_parquet("./all_weather_pd_compatible1.parquet")

In [66]:
all_weather_pd2 = pd.read_parquet("./all_weather_pd_compatible2.parquet")

In [67]:
all_weather_combined_pd = pd.concat([all_weather_pd,all_weather_pd1, all_weather_pd2], ignore_index=True)
display(all_weather_combined_pd.head())

Unnamed: 0,datetime,temperature,wind,humidity,lat,lon,year
0,2013-01-01 00:00:00,25.2,7.1,89,9.500665,76.412414,
1,2013-01-01 01:00:00,25.1,6.0,89,9.500665,76.412414,
2,2013-01-01 02:00:00,24.7,4.7,91,9.500665,76.412414,
3,2013-01-01 03:00:00,24.3,3.5,93,9.500665,76.412414,
4,2013-01-01 04:00:00,24.1,4.6,94,9.500665,76.412414,


In [68]:
df_dropped = all_weather_combined_pd.drop('year', axis=1) # or axis='columns'
print(df_dropped)

                    datetime  temperature  wind  humidity        lat  \
0        2013-01-01 00:00:00         25.2   7.1        89   9.500665   
1        2013-01-01 01:00:00         25.1   6.0        89   9.500665   
2        2013-01-01 02:00:00         24.7   4.7        91   9.500665   
3        2013-01-01 03:00:00         24.3   3.5        93   9.500665   
4        2013-01-01 04:00:00         24.1   4.6        94   9.500665   
...                      ...          ...   ...       ...        ...   
21868459 2012-12-31 19:00:00         23.8   7.9        52  22.300123   
21868460 2012-12-31 20:00:00         22.5   7.7        56  22.300123   
21868461 2012-12-31 21:00:00         21.4   7.8        59  22.300123   
21868462 2012-12-31 22:00:00         20.4   7.4        62  22.300123   
21868463 2012-12-31 23:00:00         19.4   7.4        64  22.300123   

                lon  
0         76.412414  
1         76.412414  
2         76.412414  
3         76.412414  
4         76.412414  
...

In [63]:
error_df.to_csv('./error.csv')

In [45]:
kk = pd.read_csv('./unique.csv')

In [29]:
kk1 =  pd.read_csv('./missing.csv')

In [31]:
kk1.head()

Unnamed: 0,Latitude,Longitude,Year
0,17.356045,78.455428,2015
1,31.292231,75.567888,2012
2,20.02974,79.186276,2015
3,26.667798,83.364233,2012
4,26.23253,81.23227,2012


In [46]:
kk.head()

Unnamed: 0,lat,lon,year
0,26.400882,80.400005,2013
1,26.667798,83.364233,2012
2,9.926115,78.114098,2007
3,26.23253,81.23227,2012
4,31.292231,75.567888,2012


In [48]:
import pandas as pd



# Perform a left merge with an indicator
merged_df = kk1.merge(
    kk,
    left_on=['Latitude', 'Longitude', 'Year'],
    right_on=['lat', 'lon', 'year'],
    how='left',
    indicator=True
)

# Filter for rows that only exist in kk1 (the left DataFrame)
rows_only_in_kk1 = merged_df[merged_df['_merge'] == 'left_only']

# Optional: Clean up the result to show only the columns from kk1
rows_only_in_kk1 = rows_only_in_kk1[kk1.columns]

print("Rows in kk1 that are not present in kk:")
print(rows_only_in_kk1)


Rows in kk1 that are not present in kk:
      Latitude  Longitude  Year
89    9.001499  76.534752  2011
245  27.484460  94.901945  2015
246  32.250494  77.188138  2014
249  18.500495  73.852904  2004
250  22.347412  82.534030  2009
..         ...        ...   ...
465  22.230926  84.867870  2008
466  23.631732  93.349318  2014
467  28.657152  77.227513  2005
468  28.657152  77.227513  2006
469  17.330726  76.835254  2005

[193 rows x 3 columns]


In [15]:
weather_df = spark.read.parquet('./all_weather_pd_compatible.parquet')

In [16]:
weather_df.show()

+-------------------+-----------+----+--------+---------+----------+
|           datetime|temperature|wind|humidity|      lat|       lon|
+-------------------+-----------+----+--------+---------+----------+
|2013-01-01 00:00:00|       25.2| 7.1|      89|9.5006651|76.4124143|
|2013-01-01 01:00:00|       25.1| 6.0|      89|9.5006651|76.4124143|
|2013-01-01 02:00:00|       24.7| 4.7|      91|9.5006651|76.4124143|
|2013-01-01 03:00:00|       24.3| 3.5|      93|9.5006651|76.4124143|
|2013-01-01 04:00:00|       24.1| 4.6|      94|9.5006651|76.4124143|
|2013-01-01 05:00:00|       23.9| 4.1|      94|9.5006651|76.4124143|
|2013-01-01 06:00:00|       23.6| 3.5|      96|9.5006651|76.4124143|
|2013-01-01 07:00:00|       24.2| 4.7|      95|9.5006651|76.4124143|
|2013-01-01 08:00:00|       26.1| 5.4|      85|9.5006651|76.4124143|
|2013-01-01 09:00:00|       27.2| 4.5|      79|9.5006651|76.4124143|
|2013-01-01 10:00:00|       28.4| 2.6|      73|9.5006651|76.4124143|
|2013-01-01 11:00:00|       29.4| 

In [52]:
rows_only_in_kk1.to_csv('./rest_missing.csv')