In [1]:
!pip install -q pyspark numpy pandas pyarrow

from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, Bucketizer, ChiSqSelector
from pyspark.ml.regression import LinearRegression
from pyspark.ml.stat import Correlation, ChiSquareTest
from pyspark.ml.evaluation import RegressionEvaluator

spark = (SparkSession.builder
         .appName("SAT5165_AirQuality_Project")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

DATA_PATH = "/content/pollution_us_2000_2016.csv"
BASE_OUT = "/content/output"


In [3]:
from pyspark.sql import functions as F
from functools import reduce

DATA_PATH = "/content/pollution_us_2000_2016.csv"

df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv(DATA_PATH))

aqi_cols_all   = ["NO2 AQI","O3 AQI","SO2 AQI","CO AQI","PM2.5 AQI","PM10 AQI"]
means_pref_all = ["NO2 Mean","O3 Mean","SO2 Mean","CO Mean","PM2.5 Mean","PM10 Mean"]

aqi_present   = [c for c in aqi_cols_all   if c in df.columns]
means_present = [c for c in means_pref_all if c in df.columns]

if aqi_present:
    df = df.withColumn(
        "AQI",
        F.array_max(F.array(*[F.col(c).cast("double") for c in aqi_present]))
    )
else:
    if not means_present:
        raise ValueError("No AQI or pollutant mean columns found.")
    sum_expr = reduce(lambda a, b: a + b, [F.col(c).cast("double") for c in means_present])
    df = df.withColumn("AQI", sum_expr)

keep_cols = ["AQI","State","County","City","Date Local"] + means_present
df = df.select(*[c for c in keep_cols if c in df.columns])

df.write.mode("overwrite").parquet("/content/output/00_raw_selected.parquet")
print("Saved → /content/output/00_raw_selected.parquet")


Saved → /content/output/00_raw_selected.parquet


In [4]:
df = spark.read.parquet(f"{BASE_OUT}/00_raw_selected.parquet")

numeric_cols = [c for c, t in df.dtypes if t in ("double","float","int","bigint")]
global_medians = {}
for c in numeric_cols:
    q = df.approxQuantile(c, [0.5], 0.001)
    global_medians[c] = float(q[0]) if q and q[0] is not None else None
df = df.fillna(global_medians)

if "City" in df.columns and "Date_Local" in df.columns:
    w_roll = Window.partitionBy("State","City").orderBy(F.col("Date_Local")).rowsBetween(-2,0)
    for c in ["NO2_Mean","O3_Mean","SO2_Mean","CO_Mean"]:
        if c in df.columns:
            df = df.withColumn(f"{c}_sm", F.avg(F.col(c)).over(w_roll))
    feature_cols_clean = [f"{c}_sm" for c in ["NO2_Mean","O3_Mean","SO2_Mean","CO_Mean"] if c in df.columns]
else:
    feature_cols_clean = ["NO2_Mean","O3_Mean","SO2_Mean","CO_Mean"]

df = df.withColumn("unhealthy_label", (F.col("AQI") >= 101).cast("int"))
df.write.mode("overwrite").parquet(f"{BASE_OUT}/10_clean.parquet")
print("Cleaned data saved at:", f"{BASE_OUT}/10_clean.parquet")


Cleaned data saved at: /content/output/10_clean.parquet


In [6]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

df = spark.read.parquet("/content/output/10_clean.parquet")

# Prefer smoothed cols like "NO2 Mean_sm"; else use raw mean cols with spaces
means_with_spaces = ["NO2 Mean","O3 Mean","SO2 Mean","CO Mean","PM2.5 Mean","PM10 Mean"]
smoothed_cols = [c for c in df.columns if c.endswith("_sm")]
feature_cols = smoothed_cols if smoothed_cols else [c for c in means_with_spaces if c in df.columns]

if not feature_cols:
    raise ValueError("No feature columns found for correlation.")

# Pairwise corr with AQI
for c in feature_cols:
    val = df.stat.corr(c, "AQI")
    print(f"Correlation between {c} and AQI = {val if val is not None else 'NA'}")

# Full feature correlation matrix
corr_input = df.select(*feature_cols).dropna()
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_corr")
corr_df = assembler.transform(corr_input)
corr_mat = Correlation.corr(corr_df, "features_corr", "pearson").head()[0]
print("\nCorrelation matrix (features):")
print(corr_mat)


Correlation between NO2 Mean and AQI = 0.3056180198257939
Correlation between O3 Mean and AQI = 0.5227135895249344
Correlation between SO2 Mean and AQI = 0.20492773215226473
Correlation between CO Mean and AQI = 0.18195839797498403

Correlation matrix (features):
DenseMatrix([[ 1.        , -0.43265014,  0.34818603,  0.6418281 ],
             [-0.43265014,  1.        , -0.11040144, -0.33942643],
             [ 0.34818603, -0.11040144,  1.        ,  0.21521638],
             [ 0.6418281 , -0.33942643,  0.21521638,  1.        ]])


In [10]:
!pip install -q pyspark numpy pandas pyarrow

from pyspark.sql import SparkSession, functions as F
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("AQI_Regression").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv("/content/pollution_us_2000_2016.csv"))


aqi_cols = [c for c in ["NO2 AQI","O3 AQI","SO2 AQI","CO AQI","PM2.5 AQI","PM10 AQI"] if c in df.columns]
if not aqi_cols:
    raise ValueError("No AQI columns found (e.g., 'NO2 AQI'). Check the CSV headers.")
df = df.withColumn("AQI_label", F.array_max(F.array(*[F.col(c).cast("double") for c in aqi_cols]))).dropna(subset=["AQI_label"])


feature_cols = [c for c in ["NO2 Mean","O3 Mean","SO2 Mean","CO Mean","PM2.5 Mean","PM10 Mean"] if c in df.columns]
if not feature_cols:
    raise ValueError("No pollutant mean columns found (e.g., 'NO2 Mean').")
df = df.dropna(subset=feature_cols).select(*(feature_cols + ["AQI_label"])).sample(False, 0.3, 42).cache()


train, test = df.randomSplit([0.8, 0.2], seed=42)


assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
scaler = StandardScaler(inputCol="features_vec", outputCol="features_z", withMean=True, withStd=True)
lr = LinearRegression(featuresCol="features_z", labelCol="AQI_label", predictionCol="AQI_pred", maxIter=50, regParam=0.1, elasticNetParam=0.2)

model = Pipeline(stages=[assembler, scaler, lr]).fit(train)


pred = model.transform(test).cache()
rmse = RegressionEvaluator(labelCol="AQI_label", predictionCol="AQI_pred", metricName="rmse").evaluate(pred)
mae  = RegressionEvaluator(labelCol="AQI_label", predictionCol="AQI_pred", metricName="mae").evaluate(pred)
r2   = RegressionEvaluator(labelCol="AQI_label", predictionCol="AQI_pred", metricName="r2").evaluate(pred)
print(f"RMSE: {rmse:.4f}  MAE: {mae:.4f}  R2: {r2:.4f}")


spark.createDataFrame([Row(feature=f, coefficient=float(c)) for f, c in zip(feature_cols, model.stages[-1].coefficients.toArray())]) \
     .write.mode("overwrite").parquet("/content/aqi_regression_coefficients.parquet")
pred.select(*feature_cols, "AQI_label", "AQI_pred").write.mode("overwrite").parquet("/content/aqi_regression_predictions.parquet")
spark.createDataFrame([("rmse", float(rmse)), ("mae", float(mae)), ("r2", float(r2))], ["metric","value"]) \
     .write.mode("overwrite").parquet("/content/aqi_regression_metrics.parquet")

pred.select(*feature_cols, "AQI_label", "AQI_pred").show(10, truncate=False)


RMSE: 11.6392  MAE: 7.5379  R2: 0.6266
+--------+--------+--------+--------+---------+------------------+
|NO2 Mean|O3 Mean |SO2 Mean|CO Mean |AQI_label|AQI_pred          |
+--------+--------+--------+--------+---------+------------------+
|0.0     |0.004917|2.913043|1.8875  |27.0     |3.686640716821884 |
|0.0     |0.007542|2.173913|0.795833|31.0     |2.678932294151629 |
|0.0     |0.007625|0.0     |0.879167|11.0     |1.9062933443701766|
|0.0     |0.011542|0.130435|0.483333|20.0     |5.713916569286219 |
|0.0     |0.013625|5.271429|0.379167|25.0     |10.929289100210333|
|0.0     |0.014708|0.9625  |0.8125  |33.0     |11.62991792365656 |
|0.0     |0.015444|0.0     |0.0     |14.0     |9.029265191031431 |
|0.0     |0.0175  |0.516667|0.029167|17.0     |12.16461887776179 |
|0.0     |0.019458|0.0     |0.0     |21.0     |14.38148039571611 |
|0.0     |0.019708|0.0     |0.0     |19.0     |14.714827132430404|
+--------+--------+--------+--------+---------+------------------+
only showing top 10 row

In [13]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer, VectorAssembler
from pyspark.ml.stat import ChiSquareTest

df = spark.read.parquet(f"{BASE_OUT}/10_clean.parquet")

means_with_spaces = ["NO2 Mean","O3 Mean","SO2 Mean","CO Mean","PM2.5 Mean","PM10 Mean"]


smoothed_cols = [c for c in df.columns if c.endswith("_sm")]
expected_smoothed = [f"{c}_sm" for c in means_with_spaces]
feature_cols = [c for c in smoothed_cols if c in expected_smoothed]


if not feature_cols:
    feature_cols = [c for c in means_with_spaces if c in df.columns]

if not feature_cols:
    raise ValueError("No pollutant feature columns found (neither *_sm nor raw means).")


if "unhealthy_label" not in df.columns:
    df = df.withColumn("unhealthy_label", (F.col("AQI") >= 101).cast("int"))
else:
    df = df.withColumn("unhealthy_label", F.col("unhealthy_label").cast("int"))


df = df.dropna(subset=feature_cols + ["unhealthy_label"])


bins = 4
bucketed_cols = []
for c in feature_cols:
    quants = df.approxQuantile(c, [i/bins for i in range(1, bins)], 0.01)
    splits = [-float("inf")] + quants + [float("inf")]
    b = Bucketizer(splits=splits, inputCol=c, outputCol=f"{c}_bkt").setHandleInvalid("keep")
    df = b.transform(df)
    bucketed_cols.append(f"{c}_bkt")


assembler = VectorAssembler(inputCols=bucketed_cols, outputCol="feat_cat")
chi_df = assembler.transform(df.select("unhealthy_label", *bucketed_cols)).select("feat_cat", "unhealthy_label")

res = ChiSquareTest.test(chi_df, "feat_cat", "unhealthy_label").head()
print("Chi-Square Test p-values:", res.pValues)
print("degreesOfFreedom:", res.degreesOfFreedom)
print("statistics:", res.statistics)



Chi-Square Test p-values: [0.0,0.0,0.0,0.0]
degreesOfFreedom: [3, 3, 3, 3]
statistics: [12374.238854643576,84583.46970261444,11305.897410927417,3187.5944494836167]


In [16]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array

BASE_OUT = "/content/output"

spark = SparkSession.builder.appName("SAT5165_PCA_Sucharitha").getOrCreate()

df = spark.read.parquet(f"{BASE_OUT}/10_clean.parquet")

means_with_spaces = ["NO2 Mean","O3 Mean","SO2 Mean","CO Mean","PM2.5 Mean","PM10 Mean"]
smoothed_cols = [c for c in df.columns if c.endswith("_sm")]
expected_smoothed = [f"{c}_sm" for c in means_with_spaces]
feature_cols = [c for c in smoothed_cols if c in expected_smoothed]
if not feature_cols:
    feature_cols = [c for c in means_with_spaces if c in df.columns]
if not feature_cols:
    raise ValueError("No feature columns found for PCA.")

for c in feature_cols:
    df = df.withColumn(c, F.col(c).cast(DoubleType()))
df = df.dropna(subset=feature_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
scaler = StandardScaler(inputCol="features_vec", outputCol="features_z", withMean=True, withStd=True)
k = min(4, len(feature_cols))
pca = PCA(k=k, inputCol="features_z", outputCol="pca_vec")

pipeline = Pipeline(stages=[assembler, scaler, pca])
model = pipeline.fit(df)

ev = [float(x) for x in model.stages[-1].explainedVariance]
print("Explained Variance by Principal Components:", ev)

scored = model.transform(df).withColumn("pca_arr", vector_to_array("pca_vec"))
for i in range(len(ev)):
    scored = scored.withColumn(f"pc_{i+1}", F.col("pca_arr")[i])

scored.select(*[f"pc_{i+1}" for i in range(len(ev))]).show(5, truncate=False)


Explained Variance by Principal Components: [0.5257831826862004, 0.22542532144506988, 0.16650776678531745, 0.08228372908341236]
+------------------+-------------------+-------------------+--------------------+
|pc_1              |pc_2               |pc_3               |pc_4                |
+------------------+-------------------+-------------------+--------------------+
|1.7377327499127564|0.40158725623257435|0.10359616296385081|-0.21817895883172167|
|1.7451441500301703|0.4032987370181092 |0.09608461043963472|-0.22604051704491213|
|1.7419062347177334|0.3912312407875336 |0.10764207430078364|-0.22036476356978463|
|1.7493176348351474|0.3929427215730684 |0.10013052177656755|-0.22822632178297508|
|0.6286862157917354|0.07452214209596991|-0.6099443236202216|-0.2724119531224046 |
+------------------+-------------------+-------------------+--------------------+
only showing top 5 rows

