# Import data from Kaggle using API
## Click-Through Rate Prediction :
Air pollution causes approximately 7 million premature deaths annually (WHO). This dataset enables researchers and data scientists to:

Analyze global pollution disparities
Investigate health impacts of air quality
Develop predictive models for environmental monitoring

 Data Link : https://www.kaggle.com/datasets/youssefelebiary/air-quality-2024/data


In [8]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [9]:
!kaggle datasets download -d youssefelebiary/air-quality-2024

Dataset URL: https://www.kaggle.com/datasets/youssefelebiary/air-quality-2024
License(s): MIT
Downloading air-quality-2024.zip to /content
  0% 0.00/1.76M [00:00<?, ?B/s]
100% 1.76M/1.76M [00:00<00:00, 197MB/s]


In [10]:
!unzip air-quality-2024.zip -d air_quality

Archive:  air-quality-2024.zip
  inflating: air_quality/Air_Quality.csv  
  inflating: air_quality/Brasilia_Air_Quality.csv  
  inflating: air_quality/Cairo_Air_Quality.csv  
  inflating: air_quality/Dubai_Air_Quality.csv  
  inflating: air_quality/London_Air_Quality.csv  
  inflating: air_quality/New_York_Air_Quality.csv  
  inflating: air_quality/Sydney_Air_Quality.csv  


# Install pyspark lib

In [11]:
!pip install pyspark py4j




In [18]:
from pyspark.sql import SparkSession
import pandas as pd
import time


In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AirQuality").getOrCreate()


In [14]:
pandas_df = pd.read_csv("/content/air_quality/Air_Quality.csv")
spark_df = spark.read.csv("/content/air_quality/Air_Quality.csv", header=True, inferSchema=True)

In [16]:
pandas_df.columns

Index(['Date', 'City', 'CO', 'CO2', 'NO2', 'SO2', 'O3', 'PM2.5', 'PM10',
       'AQI'],
      dtype='object')

In [28]:
pandas_df.rename(columns={"PM2.5": "PM2_5"}, inplace=True)
spark_df = spark_df.withColumnRenamed("PM2.5", "PM2_5")

## Using Pandas library

In [31]:
pollutants = ["CO", "CO2", "NO2", "SO2", "O3", "PM2_5", "PM10", "AQI"]

start = time.time()

pandas_stats = (
    pandas_df.groupby("City")[pollutants]
    .agg(["mean", lambda x: x.quantile(0.25), "median", lambda x: x.quantile(0.75)])
)

# Rename columns: pollutant_mean, pollutant_q25, ...
pandas_stats.columns = [
    f"{col[0]}_{col[1] if isinstance(col[1], str) else 'q25' if col[1]==0.25 else 'q75'}"
    for col in pandas_stats.columns
]
pandas_stats = pandas_stats.reset_index()

end = time.time()
print("✅ Pandas Stats:")
print(pandas_stats.head())
print(f"⏱ Pandas Time: {end - start:.4f} seconds")




✅ Pandas Stats:
       City     CO_mean  CO_<lambda_0>  CO_median  CO_<lambda_1>    CO2_mean  \
0  Brasilia  221.450478          147.0      192.0          267.0  445.726368   
1     Cairo  293.819217          198.0      239.0          316.0  457.920398   
2     Dubai  440.464026          305.0      400.0          533.0  463.778607   
3    London  187.744194          156.0      177.0          205.0  475.114428   
4  New York  283.640027          207.0      248.0          313.0  488.358831   

   CO2_<lambda_0>  CO2_median  CO2_<lambda_1>   NO2_mean  ...  PM2_5_median  \
0           440.0       444.0           450.0   8.735633  ...           5.3   
1           449.0       454.0           462.0  32.617634  ...          20.7   
2           452.0       461.0           473.0  39.488297  ...          39.2   
3           450.0       458.0           482.0  21.797495  ...           7.8   
4           457.0       471.0           494.0  27.804360  ...          11.2   

   PM2_5_<lambda_1>   PM10_m

## Using Pyspark


In [21]:
spark_df.groupBy("City").count().orderBy("count", ascending=False).show()

+--------+-----+
|    City|count|
+--------+-----+
|   Cairo| 8784|
|  London| 8784|
|  Sydney| 8784|
|Brasilia| 8784|
|   Dubai| 8784|
|New York| 8784|
+--------+-----+



In [33]:
from pyspark.sql.functions import mean, percentile_approx
import time

# Rename columns like "PM2.5" to "PM2_5" for Spark compatibility
pollutants = ["CO", "CO2", "NO2", "SO2", "O3", "PM2.5", "PM10", "AQI"]
rename_map = {col: col.replace(".", "_") for col in pollutants}

# Apply renaming
for old_name, new_name in rename_map.items():
    spark_df = spark_df.withColumnRenamed(old_name, new_name)

# Updated pollutant names (for renamed columns)
pollutants_clean = [name.replace(".", "_") for name in pollutants]

# Create aggregate expressions for mean and quantiles
agg_exprs = []
for col in pollutants_clean:
    agg_exprs.extend([
        mean(col).alias(f"{col}_mean"),
        percentile_approx(col, 0.25).alias(f"{col}_q25"),
        percentile_approx(col, 0.5).alias(f"{col}_median"),
        percentile_approx(col, 0.75).alias(f"{col}_q75"),
    ])

# Time the operation
start = time.time()

df_stats = spark_df.groupBy("City").agg(*agg_exprs)

end = time.time()

# Show result
df_stats.show(5, truncate=False)

# Print time taken
print(f"⏱ Spark DataFrame API Time: {end - start:.4f} seconds")


+--------+------------------+------+---------+------+------------------+-------+----------+-------+------------------+-------+----------+-------+------------------+-------+----------+-------+-----------------+------+---------+------+-----------------+---------+------------+---------+------------------+--------+-----------+--------+------------------+---------+----------+---------+
|City    |CO_mean           |CO_q25|CO_median|CO_q75|CO2_mean          |CO2_q25|CO2_median|CO2_q75|NO2_mean          |NO2_q25|NO2_median|NO2_q75|SO2_mean          |SO2_q25|SO2_median|SO2_q75|O3_mean          |O3_q25|O3_median|O3_q75|PM2_5_mean       |PM2_5_q25|PM2_5_median|PM2_5_q75|PM10_mean         |PM10_q25|PM10_median|PM10_q75|AQI_mean          |AQI_q25  |AQI_median|AQI_q75  |
+--------+------------------+------+---------+------+------------------+-------+----------+-------+------------------+-------+----------+-------+------------------+-------+----------+-------+-----------------+------+---------+------