# SENG-550 Term Project 
### ML Classifier for Air Quality in Dublin


In [108]:
#Importing PySpark Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat, when, split, mean, lower, trim
from pyspark.ml.evaluation import BinaryClassificationEvaluator 
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

# Start (or reuse, via getOrCreate) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("PollutionClassification")
    .getOrCreate()
)

# Raw CSV export that the loading cell reads with textFile().
file_path = "dublin-city-council-co-2011p20110929-1048.csv"

In [113]:
from pyspark.sql.types import StructType, StructField, StringType

# Number of leading lines dropped before the CSV header row (indices 0..6).
# NOTE(review): presumably a title/metadata preamble in the export — confirm
# against the raw file.
SKIP_LINES = 7

# Load the raw text and drop the first SKIP_LINES lines.
# `sc` is never defined in this notebook (it is only predefined by some
# launchers such as the pyspark shell), so use the session's own context.
rdd = (
    spark.sparkContext.textFile(file_path)
    .zipWithIndex()
    .filter(lambda line_and_idx: line_and_idx[1] >= SKIP_LINES)
    .map(lambda line_and_idx: line_and_idx[0])
)

# The first remaining line is the column header; every line equal to it is
# removed from the data.
header = rdd.first()
data_rdd = rdd.filter(lambda row: row != header)

# Header column names; used only to validate each row's field count.
columns = header.split(",")
n_columns = len(columns)

# Naive comma split (quoted fields containing commas are not supported),
# with surrounding whitespace trimmed from every field.
data_split_rdd = data_rdd.map(lambda row: [x.strip() for x in row.split(",")])

# Drop (silently) any row whose field count differs from the header's.
data_split_rdd = data_split_rdd.filter(lambda row: len(row) == n_columns)

# All columns are loaded as strings; numeric parsing happens downstream.
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Wood_CO_mg_m3", StringType(), True),
    StructField("Wood_8hr_avg", StringType(), True),
    StructField("Wood_Flag", StringType(), True),
    StructField("Wood_Comment", StringType(), True),
    StructField("Col_CO_mg_m3", StringType(), True),
    StructField("Col_8hr_avg", StringType(), True),
    StructField("Col_Flag", StringType(), True),
    StructField("Col_Comment", StringType(), True),
])

# Convert RDD to DataFrame
data = spark.createDataFrame(data_split_rdd, schema=schema)

# Placeholder/status strings that mean "no valid measurement". Matching is
# case-insensitive after trimming, so '' also covers whitespace-only cells.
special_values = ['#DIV/0!', 'cal', 'CAL', 'no data', 'No data', 'ext cal', 'SERVICE', 'Cal', 'Zero Air', 'Service', '']

# Lower-cased, de-duplicated form of special_values, built once (sorted so
# the generated isin() expression is deterministic).
_special_values_lower = sorted({v.lower() for v in special_values})


def replace_special_values(column_name):
    """Return a Column that is NULL where `column_name` holds a placeholder.

    The cell value is trimmed and lower-cased before being compared with
    `special_values`; any other value (including NULL) passes through
    unchanged, in its original untrimmed form.
    """
    normalized = lower(trim(col(column_name)))
    return when(normalized.isin(_special_values_lower), None).otherwise(col(column_name))


# Null out placeholders in the measurement columns only; flag/comment
# columns are left untouched.
columns_to_clean = ['Wood_CO_mg_m3', 'Wood_8hr_avg', 'Col_CO_mg_m3', 'Col_8hr_avg']

for column in columns_to_clean:
    data = data.withColumn(column, replace_special_values(column))

# Show the DataFrame
data.show()

+----------+----+-------------+------------+---------+------------+------------+-----------+--------+-----------+
|      Date|Time|Wood_CO_mg_m3|Wood_8hr_avg|Wood_Flag|Wood_Comment|Col_CO_mg_m3|Col_8hr_avg|Col_Flag|Col_Comment|
+----------+----+-------------+------------+---------+------------+------------+-----------+--------+-----------+
|31/12/2010|  18|         NULL|        NULL|         |            |        NULL|       NULL|        |           |
|31/12/2010|  19|         NULL|        NULL|         |            |        NULL|       NULL|        |           |
|31/12/2010|  20|         NULL|        NULL|         |            |        NULL|       NULL|        |           |
|31/12/2010|  21|         NULL|        NULL|         |            |        NULL|       NULL|        |           |
|31/12/2010|  22|         NULL|        NULL|         |            |        NULL|       NULL|        |           |
|31/12/2010|  23|         NULL|        NULL|         |            |        NULL|       N

In [116]:
def classify_air_quality(col_val, *, thresholds=(0.2, 0.4, 0.6)):
    """Map a single reading to a coarse air-quality label.

    Args:
        col_val: The reading, either as a string (the cleaned DataFrame
            columns are StringType) or as a number. ``None`` means missing.
        thresholds: Exactly three ascending, inclusive upper bounds for
            "Low", "Moderate" and "High". Anything above the last bound is
            "Very High". The defaults reproduce the original cut-offs.

    Returns:
        One of "Low", "Moderate", "High", "Very High", or "Unknown" when the
        value is missing, not parseable as a number, or NaN.
    """
    import math  # local import keeps this cell independent of other cells

    if col_val is None:
        return "Unknown"
    try:
        value = float(col_val)
    except (TypeError, ValueError):
        # Any non-numeric leftover (a status string not listed in
        # special_values, say) would otherwise raise inside the Spark UDF
        # and abort the whole job.
        return "Unknown"
    if math.isnan(value):
        # NaN fails every <= comparison and would be mislabelled "Very High".
        return "Unknown"

    for label, upper in zip(("Low", "Moderate", "High"), thresholds):
        if value <= upper:
            return label
    return "Very High"


In [117]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the plain-Python classifier as a Spark UDF returning strings. Passing
# the function itself (rather than a lambda that calls it) gives the UDF the
# function's real name instead of "<lambda>".
classify_udf = udf(classify_air_quality, StringType())

# Label the 8-hour averages of both sites; NULL readings become "Unknown".
data = (
    data
    .withColumn("Wood_Quality", classify_udf(col("Wood_8hr_avg")))
    .withColumn("Col_Quality", classify_udf(col("Col_8hr_avg")))
)

# Print up to 8700 rows (show() truncates cell text longer than 20 chars by
# default). NOTE(review): 8700 looks chosen to cover the whole export — confirm.
data.show(8700)

+----------+----+-------------+------------+---------+------------+------------+-----------+--------+-----------+------------+-----------+
|      Date|Time|Wood_CO_mg_m3|Wood_8hr_avg|Wood_Flag|Wood_Comment|Col_CO_mg_m3|Col_8hr_avg|Col_Flag|Col_Comment|Wood_Quality|Col_Quality|
+----------+----+-------------+------------+---------+------------+------------+-----------+--------+-----------+------------+-----------+
|31/12/2010|  18|         NULL|        NULL|         |            |        NULL|       NULL|        |           |     Unknown|    Unknown|
|31/12/2010|  19|         NULL|        NULL|         |            |        NULL|       NULL|        |           |     Unknown|    Unknown|
|31/12/2010|  20|         NULL|        NULL|         |            |        NULL|       NULL|        |           |     Unknown|    Unknown|
|31/12/2010|  21|         NULL|        NULL|         |            |        NULL|       NULL|        |           |     Unknown|    Unknown|
|31/12/2010|  22|         N