## Creating a pyspark instance


In [1]:
from pyspark.sql import SparkSession

team = "team17"

warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.executor.instances", 4)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

## Reading dataframes


In [2]:
weather = spark.read.format("avro").table('team17_projectdb.weather_conditions_updated')
air_quality = spark.read.format("avro").table('team17_projectdb.air_quality_region')
location_and_time = spark.read.format("avro").table('team17_projectdb.location_and_time_region')

## Changing last_updated from time to year, month, day, hour, minute


In [3]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute

location_and_time = location_and_time.select(
    "*",
    year("last_updated").alias("year"),
    month("last_updated").alias("month"),
    dayofmonth("last_updated").alias("day"),
    hour("last_updated").alias("hour"),
    minute("last_updated").alias("minute")
)

In [4]:
location_and_time.select('month').first()[0]

4

In [5]:
weather.printSchema()

root
 |-- location_id: integer (nullable = true)
 |-- temperature_celsius: string (nullable = true)
 |-- temperature_fahrenheit: string (nullable = true)
 |-- wind_mph: string (nullable = true)
 |-- wind_kph: string (nullable = true)
 |-- wind_degree: integer (nullable = true)
 |-- wind_direction: string (nullable = true)
 |-- pressure_mb: string (nullable = true)
 |-- pressure_in: string (nullable = true)
 |-- precip_mm: string (nullable = true)
 |-- precip_in: string (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- cloud: integer (nullable = true)
 |-- feels_like_celsius: string (nullable = true)
 |-- feels_like_fahrenheit: string (nullable = true)
 |-- visibility_km: string (nullable = true)
 |-- visibility_miles: string (nullable = true)
 |-- uv_index: string (nullable = true)
 |-- gust_mph: string (nullable = true)
 |-- gust_kph: string (nullable = true)
 |-- sunrise: string (nullable = true)
 |-- sunset: string (nullable = true)
 |-- moonrise: string (nullable = tr

## Converting time features into sin/cos


In [40]:
import math
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.sql.functions import sin, cos, col

class SinCosEncoder(Transformer, Params):
    inputCol = Param(Params._dummy(), "inputCol", "input column name")
    outputCols = Param(Params._dummy(), "outputCols", "output column names")

    def __init__(self, inputCol, outputCols):
        super(SinCosEncoder, self).__init__()
        self._setDefault(inputCol=inputCol, outputCols=outputCols)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)
    
    def getOutputCols(self):
        return self.getOrDefault(self.outputCols)

    def _transform(self, df):
        input_col = self.getInputCol()
        output_cols = self.getOutputCols()
        sin_col = f"{output_cols[0]}"
        cos_col = f"{output_cols[1]}"

        df = df.withColumn(sin_col, sin(2 * math.pi * col(input_col) / 12))
        df = df.withColumn(cos_col, cos(2 * math.pi * col(input_col) / 12))
        return df

class MonthEncoder(SinCosEncoder):
    def __init__(self, inputCol="month", outputCols=["monthsin", "monthcos"]):
        super().__init__(inputCol, outputCols)

class DayEncoder(SinCosEncoder):
    def __init__(self, inputCol="day", outputCols=["daysin", "daycos"]):
        super().__init__(inputCol, outputCols)

class HourEncoder(SinCosEncoder):
    def __init__(self, inputCol="hour", outputCols=["hoursin", "hourcos"]):
        super().__init__(inputCol, outputCols)

class MinuteEncoder(SinCosEncoder):
    def __init__(self, inputCol="minute", outputCols=["minutesin", "minutecos"]):
        super().__init__(inputCol, outputCols)

class SecondEncoder(SinCosEncoder):
    def __init__(self, inputCol="second", outputCols=["secondsins", "secondcos"]):
        super().__init__(inputCol, outputCols)

In [7]:
from pyspark.sql.functions import col, to_timestamp, expr
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder

In [8]:
weather = weather.withColumn("temperature_celsius", col("temperature_celsius").cast("double")) \
                 .withColumn("temperature_fahrenheit", col("temperature_fahrenheit").cast("double"))\
                 .withColumn("wind_mph", col("wind_mph").cast("double"))\
                 .withColumn("wind_kph", col("wind_kph").cast("double"))\
                 .withColumn("pressure_mb", col("pressure_mb").cast("double"))\
                 .withColumn("pressure_in", col("pressure_in").cast("double"))\
                 .withColumn("precip_mm", col("precip_mm").cast("double"))\
                 .withColumn("precip_in", col("precip_in").cast("double"))\
                 .withColumn("feels_like_celsius", col("feels_like_celsius").cast("double"))\
                 .withColumn("feels_like_fahrenheit", col("feels_like_fahrenheit").cast("double"))\
                 .withColumn("visibility_km", col("visibility_km").cast("double"))\
                 .withColumn("visibility_miles", col("visibility_miles").cast("double"))\
                 .withColumn("uv_index", col("uv_index").cast("double"))\
                 .withColumn("gust_kph", col("gust_kph").cast("double"))\
                 .withColumn("gust_mph", col("gust_mph").cast("double"))

In [41]:
month_encoder = MonthEncoder()
day_encoder = DayEncoder()
hour_encoder = HourEncoder()
minute_encoder = MinuteEncoder()

## Converting coordinates


In [44]:
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import numpy as np

class LatLonToECEF(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def _transform(self, df):
        # Earth's radius in kilometers approx 6371
        R = 6371
        df = df.withColumn("x", R * cos(col("latitude") * np.pi / 180) * cos(col("longitude") * np.pi / 180))
        df = df.withColumn("y", R * cos(col("latitude") * np.pi / 180) * sin(col("longitude") * np.pi / 180))
        df = df.withColumn("z", R * sin(col("latitude") * np.pi / 180))
        return df

ecef_encoder = LatLonToECEF()

## Joining tables


In [11]:
data = weather.join(air_quality, on="location_id", how="inner")
data = data.join(location_and_time, on="location_id", how="inner")

## Preprocessing columns


In [45]:
# We dropped temperature_fahrenheit, wind_mph, pressure_in, precip_in, feels_like_fahreheit, visibility_miles, gust_mph
# Because we already have the same features, just in metric format (kph instead of mph, celsius instead of fahrenheit, mm instead of in, and so on)
# also, wind degree and wind direction are redundant features
# city and region is not needed as we have latitude and longitude
# sun and moon features have no effect on the air quality because they are external factors, but moon_illumination has an effect
feature_cols = ["x", "y", "z",   # ECEF coordinates
                "monthsin", "monthcos", "daysin", "daycos", "hoursin", "hourcos", "minutesin", "minutecos",   # time encodings
                "temperature_celsius", "humidity", "wind_kph", "pressure_mb", "precip_mm", "cloud", "feels_like_celsius", "visibility_km", "uv_index", "gust_kph", "wind_degree", "moon_illumination"]

# Convert text categories to indices
condition_indexer = StringIndexer(inputCol="condition_text", outputCol="condition_index")
# Convert indices to one-hot encoded variables
condition_encoder = OneHotEncoder(inputCols=["condition_index"], outputCols=["condition_encoded"])

assembler = VectorAssembler(inputCols=feature_cols + ["condition_encoded"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Select one or more air quality measures as labels depending on your analysis approach
label_col = "air_quality_us_epa_index"

In [46]:
pipeline = Pipeline(stages=[month_encoder, day_encoder, hour_encoder, minute_encoder,
                            ecef_encoder, condition_indexer, condition_encoder, assembler, scaler])
model = pipeline.fit(data)
transformed_data = model.transform(data)

In [18]:
transformed_data.select("scaled_features").first()[0]

SparseVector(40, {0: 3.2733, 1: 24.4164, 2: 1.7444, 3: -0.6303, 4: 2.0372, 5: 0.7181, 6: -1.2306, 7: -1.477, 8: 0.7048, 10: 1.3824, 11: 5.148, 12: 3.8574, 13: 3.6236, 14: 186.9008, 16: 2.2142, 17: 4.7664, 18: 2.9182, 19: 1.2396, 20: 2.8293, 21: 3.1601, 22: 2.782, 24: 2.2565})

## Splitting data


In [14]:
train_data, test_data = transformed_data.randomSplit([0.7, 0.3], seed=42)
train_data.write.mode('overwrite').json("project/data/train")
test_data.write.mode('overwrite').json("project/data/test")

## Fitting model 1: RandomForestRegressor


In [25]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Define the model
rf_base = RandomForestRegressor(featuresCol='scaled_features', labelCol=label_col)

# Setup evaluation metric
rf_evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")

# Fit the model
rf_base_model = rf_base.fit(train_data)

In [26]:
# Evaluation on the test set
predictions = rf_base_model.transform(test_data)

# Evaluate the best model and print out the result
rf_base_r2 = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "r2"})
rf_base_rmse = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "rmse"})
print("Test R^2: ", rf_base_r2)
print("Test RMSE: ", rf_base_rmse)

Test R^2:  0.5370130382709225
Test RMSE:  0.9953526130379097


# Add just the model, then do gridsearch on it

In [27]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Define the model
rf_tuned = RandomForestRegressor(featuresCol='scaled_features', labelCol=label_col)

# Define the grid of hyperparameters to test:
#  - numTrees: number of trees in the forest.
#  - maxDepth: maximum depth of each tree.
rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf_tuned.numTrees, [20, 30]) \
    .addGrid(rf_tuned.maxDepth, [5, 10]) \
    .build()

# Setup evaluation metric
rf_tuned_evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")

# Cross-validator
rf_cv = CrossValidator(estimator=rf_tuned,
                    estimatorParamMaps=rf_paramGrid,
                    evaluator=rf_tuned_evaluator,
                    numFolds=3)

# Fit the model
rf_cvModel = rf_cv.fit(train_data)

# Choose the best model
rf_bestModel = rf_cvModel.bestModel

# Save the best model
rf_bestModel.write().overwrite().save("project/models/model1")

In [28]:
bestParams = rf_bestModel.extractParamMap()
for param, value in bestParams.items():
    if param.name == "numTrees" or param.name == "maxDepth":
      print(f"{param.name}: {value}")

maxDepth: 10
numTrees: 30


In [29]:
# Evaluation on the test set
rf_tuned_predictions = rf_bestModel.transform(test_data)
rf_tuned_predictions.select(label_col, "prediction").coalesce(1).write.mode('overwrite').option("header", "true").csv("project/output/model1_predictions.csv")

# Evaluate the best model and print out the result
rf_tuned_r2 = rf_tuned_evaluator.evaluate(rf_tuned_predictions, {rf_tuned_evaluator.metricName: "r2"})
rf_tuned_rmse = rf_tuned_evaluator.evaluate(rf_tuned_predictions, {rf_tuned_evaluator.metricName: "rmse"})
print("Test R^2: ", rf_tuned_r2)
print("Test RMSE: ", rf_tuned_rmse)

Test R^2:  0.7138348870917375
Test RMSE:  0.7825298696415471


# Add just the model, then do gridsearch on it

In [30]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize the GBTRegressor
gbt_base = GBTRegressor(featuresCol='scaled_features', labelCol=label_col)

gbt_evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")

# Fit the model
gbt_base_model = gbt_base.fit(train_data)

In [31]:
# Evaluation on the test set
gbt_base_predictions = gbt_base_model.transform(test_data)

# Evaluate the best model and print out the result
gbt_base_r2 = gbt_evaluator.evaluate(gbt_base_predictions, {gbt_evaluator.metricName: "r2"})
gbt_base_rmse = gbt_evaluator.evaluate(gbt_base_predictions, {gbt_evaluator.metricName: "rmse"})
print("Test R^2: ", gbt_base_r2)
print("Test RMSE: ", gbt_base_rmse)

Test R^2:  0.6554341181228047
Test RMSE:  0.858674798100258


In [32]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize the GBTRegressor
gbt_tuned = GBTRegressor(featuresCol='scaled_features', labelCol=label_col)

gbt_tuned_evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")

# Define the parameter grid for tuning
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt_tuned.maxDepth, [5, 10]) \
    .addGrid(gbt_tuned.maxIter, [20, 30]) \
    .build()

# Setup CrossValidator for model tuning
gbt_crossval = CrossValidator(estimator=gbt_tuned,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=gbt_tuned_evaluator,
                          numFolds=3)  # Use 3+ folds in practice

# Fit the model
gbt_cvModel = gbt_crossval.fit(train_data)

gbt_bestModel = gbt_cvModel.bestModel
gbt_bestModel.write().overwrite().save("project/models/model2")

In [34]:
gbt_bestParams = gbt_bestModel.extractParamMap()
for param, value in gbt_bestParams.items():
    if param.name == "maxIter" or param.name == "maxDepth":
      print(f"{param.name}: {value}")

maxDepth: 10
maxIter: 30


In [35]:
# Evaluation on the test set
gbt_tuned_predictions = gbt_bestModel.transform(test_data)
gbt_tuned_predictions.select(label_col, "prediction").coalesce(1).write.mode('overwrite').option("header", "true").csv("project/output/model2_predictions.csv")

# Evaluate the best model and print out the result
gbt_tuned_r2 = gbt_tuned_evaluator.evaluate(gbt_tuned_predictions, {gbt_tuned_evaluator.metricName: "r2"})
gbt_tuned_rmse = gbt_tuned_evaluator.evaluate(gbt_tuned_predictions, {gbt_tuned_evaluator.metricName: "rmse"})
print("Test R^2: ", gbt_tuned_r2)
print("Test RMSE: ", gbt_tuned_rmse)

Test R^2:  0.7693112052121618
Test RMSE:  0.7025962548893449


## Create a comparison dataframe

In [38]:
from pyspark.sql import Row

evaluation_data = [
    Row(model_name="RandomForestRegressor", r2_metric=rf_tuned_r2, rmse_metric=rf_tuned_rmse),
    Row(model_name="GBTRegressor", r2_metric=gbt_tuned_r2, rmse_metric=gbt_tuned_rmse)
]
evaluation_df = spark.createDataFrame(evaluation_data)

# Show DataFrame contents
evaluation_df.show()

+--------------------+------------------+------------------+
|          model_name|         r2_metric|       rmse_metric|
+--------------------+------------------+------------------+
|RandomForestRegre...|0.7138348870917375|0.7825298696415471|
|        GBTRegressor|0.7693112052121618|0.7025962548893449|
+--------------------+------------------+------------------+



In [39]:
evaluation_df.coalesce(1).write.mode("overwrite").option("header", "true").csv('output/evaluation/evaluation.csv')