# In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.types as T

# Start (or reuse) the Spark session used by the whole analysis
session_builder = SparkSession.builder.appName("HCHO_Analysis")
spark = session_builder.getOrCreate()

# All input files are read the same way: the first row is treated as the
# header and column types are inferred from the data.
csv_options = {"header": True, "inferSchema": True}
col_mat_nuw = spark.read.csv("col_mat_nuw_output.csv", **csv_options)
mon_kur_jaf = spark.read.csv("mon_kur_jaf_output.csv", **csv_options)
kan = spark.read.csv("kan_output.csv", **csv_options)

# Stack the three DataFrames into one.
# NOTE: union() matches columns by position, not by name, so the three files
# must share the same column order.
data = col_mat_nuw.union(mon_kur_jaf)
data = data.union(kan)

# Data Preprocessing
# Cast first, then drop nulls: readings that cannot be parsed as numbers
# become null during the cast and have to be removed as well.
hcho_typed = data.withColumn("HCHO reading", col("HCHO reading").cast(T.DoubleType())) \
                 .dropna(subset=["HCHO reading"])

# approxQuantile is a DataFrame action that returns plain Python floats, so
# the quartiles can be used as constants in the filter below. The last
# argument is the relative error of the approximation (0.0 would be exact
# but more expensive).
quartiles = hcho_typed.approxQuantile("HCHO reading", [0.25, 0.75], 0.01)

if len(quartiles) == 2:
    q1, q3 = quartiles
    iqr = q3 - q1
    # Keep readings inside the Tukey fences [Q1 - 1.5*IQR, Q3 + 1.5*IQR];
    # between() is inclusive on both ends.
    cleaned_data = hcho_typed.filter(
        col("HCHO reading").between(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
    )
else:
    # No usable readings, so no quartiles could be computed; nothing to filter.
    cleaned_data = hcho_typed

# Descriptive statistics
# Per-location mean, sample standard deviation and number of readings.
# "count" is the number of non-null readings in each group (an approximate
# distinct count would under-report whenever values repeat).
summary_stats = cleaned_data.groupBy("Location").agg(
    avg("HCHO reading").alias("mean"),
    stddev("HCHO reading").alias("std_dev"),
    count("HCHO reading").alias("count"),
)
summary_stats.show()

# Visualize data distribution
import matplotlib.pyplot as plt
import seaborn as sns

# Only the two plotted columns are converted to a pandas DataFrame;
# toPandas() collects the selected rows onto the driver.
plot_columns = ["HCHO reading", "Location"]
pdf = cleaned_data.select(*plot_columns).toPandas()

# Histogram of the readings, colored by location, with a KDE curve overlaid
sns.histplot(pdf, x="HCHO reading", hue="Location", kde=True)
plt.show()

# Spatio-Temporal Analysis
# ...

# Machine Learning
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor

# Predictor columns that are packed into the single "features" vector.
# TODO: extend this list with the remaining predictor columns.
feature_cols = ["Temperature", "Precipitation"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# NOTE(review): merged_data is not defined in the visible part of this
# script; it presumably comes from the spatio-temporal step and must contain
# the feature columns plus "HCHO reading" - confirm before running.
data_ml = assembler.transform(merged_data)

# 80/20 train/test split; the fixed seed keeps the split reproducible
train_data, test_data = data_ml.randomSplit([0.8, 0.2], seed=42)

# Gradient-boosted trees regressor predicting the HCHO reading
gbt = GBTRegressor(labelCol="HCHO reading", featuresCol="features")
model = gbt.fit(train_data)

# Evaluate model performance on the held-out split using RMSE
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="HCHO reading", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error: {rmse:.2f}")

# Communication and Insights
# ...