In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connected to cloud.r-project.org (108.157.173.54)] [C                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
                                                                               Get:4 https://cli.github.com/packages stable InRelease [3,917 B]
0% [2 InRelease 33.0 kB/128 kB 26%] [Waiting for headers] [Waiting for headers]                                                                               Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [2 InRelease 64.9 kB/128 kB 51%] [5 InRelease 14.2 kB/129 kB 11%] [Waiting f0% [Waiting for headers] [5 InRelease 14.2 kB/129 kB 11%]

In [2]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-active session if there is one, so re-running
# this cell does not start a second Spark application.
spark = (
    SparkSession.builder
    .appName("AirQualityPrediction")  # no trailing space in the app name shown in the Spark UI
    .getOrCreate()
)

print("Spark initialized:", spark.version)


Spark initialized: 4.0.1


In [3]:
# Quick look at the raw semicolon-separated UCI file; numeric fields still use ','
# as the decimal mark at this stage (e.g. "2,6"), so they are parsed properly later.
file_path = "/content/drive/MyDrive/finalyear_eng/spark/project/AirQualityUCI.csv"

reader = spark.read.options(header=True, inferSchema=True, sep=';')
data = reader.csv(file_path)
data.show(5)
print(f"Total rows: {data.count()}")


+----------+--------+------+-----------+--------+--------+-------------+-------+------------+-------+------------+-----------+----+----+------+----+----+
|      Date|    Time|CO(GT)|PT08.S1(CO)|NMHC(GT)|C6H6(GT)|PT08.S2(NMHC)|NOx(GT)|PT08.S3(NOx)|NO2(GT)|PT08.S4(NO2)|PT08.S5(O3)|   T|  RH|    AH|_c15|_c16|
+----------+--------+------+-----------+--------+--------+-------------+-------+------------+-------+------------+-----------+----+----+------+----+----+
|10/03/2004|18.00.00|   2,6|       1360|     150|    11,9|         1046|    166|        1056|    113|        1692|       1268|13,6|48,9|0,7578|NULL|NULL|
|10/03/2004|19.00.00|     2|       1292|     112|     9,4|          955|    103|        1174|     92|        1559|        972|13,3|47,7|0,7255|NULL|NULL|
|10/03/2004|20.00.00|   2,2|       1402|      88|     9,0|          939|    131|        1140|    114|        1555|       1074|11,9|54,0|0,7502|NULL|NULL|
|10/03/2004|21.00.00|   2,2|       1376|      80|     9,2|          948|    

# **Data Cleaning**

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace
from pyspark.sql.types import DoubleType

# Columns kept as raw strings; every other (non "_cNN") column is a numeric reading.
ID_COLS = ["Date", "Time"]
# Missing sensor values are encoded as -200 in this file.
MISSING_SENTINEL = -200

# Load everything as strings: the file uses ';' as separator and ',' as the decimal
# mark, so numeric parsing is done explicitly below. `file_path` is set in the raw
# preview cell above.
raw = spark.read.option("header", True).option("sep", ";").csv(file_path)

# Rename columns safely using toDF(), e.g. "PT08.S1(CO)" -> "PT08_S1CO", "CO(GT)" -> "COGT"
clean_cols = [c.replace('.', '_').replace('(', '').replace(')', '').replace(' ', '_') for c in raw.columns]
raw = raw.toDF(*clean_cols)


def _to_double_or_null(name):
    """Parse a decimal-comma string column to double and map the -200 sentinel to null."""
    value = regexp_replace(col(name), ",", ".").cast(DoubleType())
    return when(value == MISSING_SENTINEL, None).otherwise(value).alias(name)


# Build all columns in ONE projection instead of calling withColumn in loops (each
# call adds a projection to the plan). Unnamed trailing "_cNN" columns are dropped.
data = raw.select([
    col(c) if c in ID_COLS else _to_double_or_null(c)
    for c in raw.columns
    if not c.startswith("_c")
])

# Drop rows with null target (COGT)
data = data.dropna(subset=["COGT"])

# Show schema and preview
data.printSchema()
data.show(5)


root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- COGT: double (nullable = true)
 |-- PT08_S1CO: double (nullable = true)
 |-- NMHCGT: double (nullable = true)
 |-- C6H6GT: double (nullable = true)
 |-- PT08_S2NMHC: double (nullable = true)
 |-- NOxGT: double (nullable = true)
 |-- PT08_S3NOx: double (nullable = true)
 |-- NO2GT: double (nullable = true)
 |-- PT08_S4NO2: double (nullable = true)
 |-- PT08_S5O3: double (nullable = true)
 |-- T: double (nullable = true)
 |-- RH: double (nullable = true)
 |-- AH: double (nullable = true)

+----------+--------+----+---------+------+------+-----------+-----+----------+-----+----------+---------+----+----+------+
|      Date|    Time|COGT|PT08_S1CO|NMHCGT|C6H6GT|PT08_S2NMHC|NOxGT|PT08_S3NOx|NO2GT|PT08_S4NO2|PT08_S5O3|   T|  RH|    AH|
+----------+--------+----+---------+------+------+-----------+-----+----------+-----+----------+---------+----+----+------+
|10/03/2004|18.00.00| 2.6|   1360.0| 150.0|  11.9|    

# **Date/Time and Feature Engineering**

In [5]:
from pyspark.sql.functions import concat_ws, to_timestamp, hour, dayofweek, month

# Combine "dd/MM/yyyy" and "HH.mm.ss" into one timestamp, then derive calendar features.
stamp_format = "dd/MM/yyyy HH.mm.ss"
data = data.withColumn("datetime", to_timestamp(concat_ws(' ', col("Date"), col("Time")), stamp_format))
data = data.withColumns({
    "hour": hour("datetime"),
    "day_of_week": dayofweek("datetime"),
    "month": month("datetime"),
})

# Preview without the raw Date/Time strings. `df` is not referenced by later cells;
# downstream work continues on `data`.
df = data.drop("Date", "Time")
df.show(5)


+----+---------+------+------+-----------+-----+----------+-----+----------+---------+----+----+------+-------------------+----+-----------+-----+
|COGT|PT08_S1CO|NMHCGT|C6H6GT|PT08_S2NMHC|NOxGT|PT08_S3NOx|NO2GT|PT08_S4NO2|PT08_S5O3|   T|  RH|    AH|           datetime|hour|day_of_week|month|
+----+---------+------+------+-----------+-----+----------+-----+----------+---------+----+----+------+-------------------+----+-----------+-----+
| 2.6|   1360.0| 150.0|  11.9|     1046.0|166.0|    1056.0|113.0|    1692.0|   1268.0|13.6|48.9|0.7578|2004-03-10 18:00:00|  18|          4|    3|
| 2.0|   1292.0| 112.0|   9.4|      955.0|103.0|    1174.0| 92.0|    1559.0|    972.0|13.3|47.7|0.7255|2004-03-10 19:00:00|  19|          4|    3|
| 2.2|   1402.0|  88.0|   9.0|      939.0|131.0|    1140.0|114.0|    1555.0|   1074.0|11.9|54.0|0.7502|2004-03-10 20:00:00|  20|          4|    3|
| 2.2|   1376.0|  80.0|   9.2|      948.0|172.0|    1092.0|122.0|    1584.0|   1203.0|11.0|60.0|0.7867|2004-03-10 21:0

# **Column Name Normalization**

In [6]:
from pyspark.sql.functions import col


def _clean_name(name):
    """Apply the same name-cleaning rule used at load time: drop parentheses, '.'/' ' -> '_'."""
    return name.replace("(", "").replace(")", "").replace(".", "_").replace(" ", "_")


# Safety net: names are already clean after the Data Cleaning cell, so this is
# normally a no-op. toDF renames every column in a single projection.
data = data.toDF(*[_clean_name(c) for c in data.columns])

data.printSchema()


root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- COGT: double (nullable = true)
 |-- PT08_S1CO: double (nullable = true)
 |-- NMHCGT: double (nullable = true)
 |-- C6H6GT: double (nullable = true)
 |-- PT08_S2NMHC: double (nullable = true)
 |-- NOxGT: double (nullable = true)
 |-- PT08_S3NOx: double (nullable = true)
 |-- NO2GT: double (nullable = true)
 |-- PT08_S4NO2: double (nullable = true)
 |-- PT08_S5O3: double (nullable = true)
 |-- T: double (nullable = true)
 |-- RH: double (nullable = true)
 |-- AH: double (nullable = true)
 |-- datetime: timestamp (nullable = false)
 |-- hour: integer (nullable = false)
 |-- day_of_week: integer (nullable = false)
 |-- month: integer (nullable = false)



Drop Nulls & Ensure Numeric Columns

In [7]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Rows without a target value cannot be used for training.
data = data.dropna(subset=["COGT"])

# Leave the raw string/timestamp columns untouched; cast every other column
# (sensor readings and the integer calendar features) to DoubleType in place.
passthrough = {"Date", "Time", "datetime"}
data = data.select([
    col(name) if name in passthrough else col(name).cast(DoubleType()).alias(name)
    for name in data.columns
])


Filter Out Infinite or Corrupted Values

In [8]:
from pyspark.sql.functions import col, when

# Columns that are not numeric and must not be range-checked.
NON_NUMERIC_COLS = ["Date", "Time", "datetime"]


def _null_if_corrupted(name):
    """Null out sentinel-like values (< -100) and +Infinity (-Infinity is already < -100)."""
    value = col(name)
    return when((value < -100) | (value == float("inf")), None).otherwise(value).alias(name)


# One projection over all columns instead of a withColumn loop.
data = data.select([
    col(c) if c in NON_NUMERIC_COLS else _null_if_corrupted(c)
    for c in data.columns
])

# na.drop() without a subset removes every row that has a null (or NaN) in ANY column.
# NOTE(review): sparsely populated columns (e.g. NMHCGT) could remove a large share of
# rows here; compare data.count() before/after and consider dropping or imputing them.
data = data.na.drop()


In [9]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Model inputs: every column except the raw date/time columns and the target.
excluded = {"Date", "Time", "datetime", "COGT"}
feature_cols = [name for name in data.columns if name not in excluded]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
pipeline = Pipeline(stages=[assembler, scaler])

# NOTE(review): the scaler is fitted on the full dataset before the train/test split,
# so statistics from the test rows influence the scaling (leakage). Consider fitting
# on the training split only.
prep_model = pipeline.fit(data)
data_prep = prep_model.transform(data).select("scaled_features", "COGT")
data_prep.show(5)


+--------------------+----+
|     scaled_features|COGT|
+--------------------+----+
|[5.62408771114498...| 2.6|
|[5.34288332558773...| 2.0|
|[5.79777277281269...| 2.2|
|[5.69025344892316...| 2.2|
|[5.26017615336501...| 1.6|
+--------------------+----+
only showing top 5 rows


# **Train/Test Split**

In [10]:
from pyspark.ml.regression import LinearRegression

# Reproducible 80/20 split of the prepared (scaled_features, COGT) rows.
split_weights = [0.8, 0.2]
train_data, test_data = data_prep.randomSplit(split_weights, seed=42)


# **Linear Regression Model**

In [11]:
# Initialize Linear Regression
lr = LinearRegression(featuresCol="scaled_features", labelCol="COGT")

# Fit the model
lr_model = lr.fit(train_data)

# Print coefficients and intercept
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)


Coefficients: [0.3141766434969541,0.1681623429882212,1.034449126692926,-0.26555581935532585,0.41296035684808674,-0.017915457026001745,0.08745745414749614,-0.15703339454239415,-0.1824425378949758,-0.19599804124110645,-0.12682432204628835,0.12255673991564155,0.05802935178219222,-0.00821473401169162,0.004770619315899792]
Intercept: 0.7849807058583937


In [12]:
# Make predictions
predictions = lr_model.transform(test_data)
predictions.select("scaled_features", "COGT", "prediction").show(5)

# Evaluate using RMSE and R2
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_rmse = RegressionEvaluator(labelCol="COGT", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="COGT", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"RMSE: {rmse}")
print(f"R2: {r2}")


+--------------------+----+-------------------+
|     scaled_features|COGT|         prediction|
+--------------------+----+-------------------+
|[3.24625650974177...| 0.3| 0.4321488986077659|
|[3.32482832335335...| 0.3| 0.3169125748816603|
|[3.33723439918676...| 0.7| 0.6404230389216417|
|[3.41580621279835...| 0.7|0.46390637636746657|
|[3.45302444029857...| 0.4| 0.6016112815132286|
+--------------------+----+-------------------+
only showing top 5 rows
RMSE: 0.2148549682626931
R2: 0.9754801569611979


In [None]:
import gradio as gr
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegressionModel

# Refit the same assembler + scaler stages used for training so single UI inputs are
# transformed exactly like the training rows.
unfitted_pipeline = Pipeline(stages=[assembler, scaler])
fitted_pipeline_model = unfitted_pipeline.fit(data)

# Extract assembler and scaler models (now fitted transformers) from the fitted pipeline model
assembler_model = fitted_pipeline_model.stages[0]
scaler_model = fitted_pipeline_model.stages[1]

# --- Features used in training ---
MODEL_FEATURES = [
    "PT08_S1CO", "NMHCGT", "C6H6GT", "PT08_S2NMHC", "NOxGT",
    "PT08_S3NOx", "NO2GT", "PT08_S4NO2", "PT08_S5O3",
    "T", "RH", "AH", "hour", "day_of_week", "month"
]

# The UI builds its input frame from MODEL_FEATURES, while the assembler reads
# feature_cols. Fail fast here instead of at prediction time if they drift apart.
assert set(MODEL_FEATURES) == set(feature_cols), (
    f"MODEL_FEATURES out of sync with training features: "
    f"{sorted(set(MODEL_FEATURES) ^ set(feature_cols))}"
)

# --- Explanations for dropdowns ---
EXPLAIN = {
    "PT08_S1CO": "CO-sensitive sensor response (related to CO concentration).",
    "NMHCGT": "Total non-methane hydrocarbons (NMHC).",
    "C6H6GT": "Benzene (C6H6) concentration in µg/m³.",
    "PT08_S2NMHC": "Sensor tuned to NMHC gases.",
    "NOxGT": "Nitrogen oxides concentration in ppb.",
    "PT08_S3NOx": "Sensor tuned to NOx gases.",
    "NO2GT": "Nitrogen dioxide concentration in µg/m³.",
    "PT08_S4NO2": "Sensor tuned to NO₂ gases.",
    "PT08_S5O3": "Sensor tuned to ozone (O₃).",
    "T": "Temperature in °C.",
    "RH": "Relative humidity (%).",
    "AH": "Absolute humidity.",
    "hour": "Hour of day (0–23).",
    "day_of_week": "Day of week (1=Sunday ... 7=Saturday).",
    "month": "Month of the year (1–12)."
}

# --- Example dropdown values --- (from previous definitions)
CHOICES = {
    "PT08_S1CO": [500, 1000, 1500, 2000],
    "NMHCGT": [50, 100, 150, 200],
    "C6H6GT": [5, 10, 15, 20],
    "PT08_S2NMHC": [600, 800, 1000, 1200],
    "NOxGT": [80, 120, 160, 200],
    "PT08_S3NOx": [800, 1000, 1200, 1400],
    "NO2GT": [90, 110, 130, 150],
    "PT08_S4NO2": [1000, 1300, 1500, 1700],
    "PT08_S5O3": [800, 1000, 1200, 1400],
    "T": [5, 15, 25, 35],
    "RH": [30, 50, 70, 90],
    "AH": [0.5, 0.8, 1.0, 1.2],
    "hour": [6, 12, 18, 23],
    "day_of_week": [1, 3, 5, 7],
    "month": [1, 4, 7, 10]
}

# One dropdown per feature, in MODEL_FEATURES order (the order predict_with_conclusion
# receives the values in). The second example value is preselected.
ui_inputs = []
for feat in MODEL_FEATURES:
    label = f"{feat} — {EXPLAIN.get(feat, '')}"
    choices = CHOICES.get(feat)
    default = choices[1] if choices else None
    ui_inputs.append(gr.Dropdown(choices, label=label, value=default))

def predict_with_conclusion(*vals):
    """Predict CO(GT) for one set of UI inputs and describe the resulting air quality.

    Args:
        *vals: One value per entry of MODEL_FEATURES, in the same order as the
            Gradio input components.

    Returns:
        A multi-line string with the predicted CO(GT), its category and advice, or a
        short prompt asking the user to fill in missing inputs.
    """
    # A cleared/unselected dropdown yields None (or ""); float() would raise on it.
    if any(v is None or v == "" for v in vals):
        return "Please select a value for every input."

    # Dropdown values may arrive as numbers or strings; normalise to float.
    numeric_vals = [float(v) for v in vals]

    # One-row Spark DataFrame whose column names match the training feature names.
    input_data = pd.DataFrame([numeric_vals], columns=MODEL_FEATURES)
    input_df = spark.createDataFrame(input_data)

    # Apply the fitted assembler and scaler, then the trained Linear Regression
    # (lr_model is defined in the training cell).
    scaled_df = scaler_model.transform(assembler_model.transform(input_df))
    prediction = lr_model.transform(scaled_df).first()["prediction"]

    # NOTE(review): a linear model can output negative values, which are not physical
    # concentrations; such values fall into the "Excellent" band below.

    # --- Interpretation ---
    if prediction < 1.0:
        condition = "Excellent"
        conclusion = (
            "Air condition: Excellent — CO levels are very low.\n"
            "Health impact: Safe for all outdoor activities.\n"
            "Recommendation: No precautions needed."
        )
    elif prediction < 2.0:
        condition = "Moderate"
        conclusion = (
            "Air condition: Moderate — acceptable CO concentration.\n"
            "Health impact: Sensitive people should stay cautious.\n"
            "Recommendation: Avoid heavy outdoor exercise if sensitive."
        )
    elif prediction < 4.0:
        condition = "Unhealthy for Sensitive Groups"
        conclusion = (
            "Air condition: Unhealthy for sensitive groups — elevated CO.\n"
            "Health impact: Risk for those with heart/lung issues.\n"
            "Recommendation: Limit outdoor activities."
        )
    else:
        condition = "Poor"
        conclusion = (
            "Air condition: Poor — high CO concentration.\n"
            "Health impact: Risky for all population.\n"
            "Recommendation: Stay indoors if possible."
        )

    # Final formatted text
    result_text = (
        f"Predicted CO(GT): {prediction:.2f} mg/m³\n"
        f"Air Quality Category: {condition}\n\n"
        f"{conclusion}"
    )

    return result_text

# --- Launch Gradio --- (with a more specific title and description)
# --- Launch Gradio ---
# share=True publishes a temporary public *.gradio.live URL; debug=True keeps this
# Colab cell running so errors and logs stay visible (stop the cell to shut it down).
gr.Interface(
    fn=predict_with_conclusion,
    inputs=ui_inputs,
    outputs=gr.Textbox(label="Prediction and Air Quality Analysis", lines=10),
    title="🌫️ Air Quality Prediction (PySpark)",
    description="Predict CO(GT) concentration based on air sensor readings using a trained Linear Regression model. Choose from example values below."
).launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://81db127c2ee155f0eb.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
