INITIAL SET UP

Import necessary modules

In [1]:
from pyspark.sql import functions as F  # Built-in SQL-style functions for DataFrame operations
from pyspark.sql.types import StringType, IntegerType, FloatType, ArrayType  # Data types for defining or casting schema
from pyspark.ml.feature import VectorAssembler, ChiSqSelector, MinMaxScaler  # Tools for feature engineering and scaling
from pyspark.ml.linalg import Vectors, DenseVector  # Linear algebra utilities for vectors and matrices
from pyspark.sql.functions import array, lit, when, col, udf  # Functions for column transformations and UDF creation
from pyspark.ml import Pipeline  # Utility to chain multiple ML stages into a single workflow
from pyspark.ml.stat import Correlation  # Tool to compute feature correlations
from pyspark.ml.regression import LinearRegression  # Linear regression model for predictive analysis

import os  # Operating system operations (e.g., setting environment variables)
import shutil  # File and directory operations (e.g., copy, move, delete)
import pandas as pd  # Pandas library for data manipulation and analysis (used to save the final preprocessed file)


Start PySpark session

In [2]:
# initialize a PySpark SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("ColabSpark").getOrCreate()

25/01/13 19:33:32 WARN Utils: Your hostname, student-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/01/13 19:33:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/13 19:33:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Import the **dataset**

In [4]:
# Define the folder path containing CSV files
folder_path = "./data"

# Read all CSV files in the folder
spark_df = spark.read.csv(f"{folder_path}/*.csv", header=True, inferSchema=True)

# Show the data
spark_df.show()


25/01/13 19:33:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1987|   10|        14|        3|    741|       730|    912|       849|           PS|     1451

**Drop the forbiden attributes**, since they provide no useful information for the model:

*   ArrTime
*   ActualElapsedTime
*   AirTime
*   TaxiIn
*   Diverted
*   CarrierDelay
*   WeatherDelay
*   NASDelay
*   SecurityDelay
*   LateAircraftDelay











In [5]:
# define columns to drop
columns_to_drop = [
    "ArrTime",
    "ActualElapsedTime",
    "AirTime",
    "TaxiIn",
    "Diverted",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay"
]

# drop these columns
spark_df = spark_df.drop(*columns_to_drop)

# S=show updated schema
print("\nAfter dropping attributes:")
spark_df.printSchema()

# print a sample of the data
spark_df.show(20)


After dropping attributes:
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)

+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+


Drop columns TailNum, TaxiOut and CancellationCode because of missing values and Year because it has the same value for every observation

In [6]:
# drop the specified columns and reassign to the same DataFrame
spark_df = spark_df.drop("TailNum", "TaxiOut", "CancellationCode", "Year", "FlightNum", "Cancelled")

# verify the changes
spark_df.printSchema()


root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)



Transform **from double** to **integer** because it creates issues when calculating min/max values (transformation done **only for this purpose**, all numerical attributes to be transformed to float later)

In [7]:
# columns to cast to IntegerType
columns_to_cast = ["ArrDelay", "DepTime", "DepDelay", "Distance"]

# cast the columns
for column in columns_to_cast:
    spark_df = spark_df.withColumn(column, F.col(column).cast(IntegerType()))

Count **number of missing values** for each attribute

In [8]:
# count missing values (NA, NaN, null) for each column
na_counts = spark_df.select([
    F.count(F.when(F.col(c).isNull() | F.isnan(c) | F.col(c).eqNullSafe("NA"), 1)).alias(c)
    for c in spark_df.columns
])

# show the number of missing values for each attribute
na_counts.show()




+-----+----------+---------+-------+----------+----------+-------------+--------------+--------+--------+------+----+--------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|UniqueCarrier|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|
+-----+----------+---------+-------+----------+----------+-------------+--------------+--------+--------+------+----+--------+
|    0|         0|        0|  19685|         0|         0|            0|             0|   23500|   19685|     0|   0|    1015|
+-----+----------+---------+-------+----------+----------+-------------+--------------+--------+--------+------+----+--------+



                                                                                

**Drop** the observations **containing NA / Nan or null values** (in DepTime, ArrDelay, DepDelay, Distance)

In [9]:
# define the attributes that have missing values
columns_to_check = ['DepTime', 'ArrDelay', 'DepDelay', 'Distance']

# filter out rows with missing values in the specified columns
spark_df = spark_df.filter(
    ~F.col('DepTime').isNull() & # check for "Null" values
    ~F.isnan('DepTime') & # check fot "NaN" values
    ~F.col('DepTime').eqNullSafe("NA") &  # Check for "NA" as well

    ~F.col('ArrDelay').isNull() &
    ~F.isnan('ArrDelay') &
    ~F.col('ArrDelay').eqNullSafe("NA") &

    ~F.col('DepDelay').isNull() &
    ~F.isnan('DepDelay') &
    ~F.col('DepDelay').eqNullSafe("NA") &

    ~F.col('Distance').isNull() &
    ~F.isnan('Distance') &
    ~F.col('Distance').eqNullSafe("NA")
)

# repeat verification
na_counts = spark_df.select([
    F.count(F.when(F.col(c).isNull() | F.isnan(c) | F.col(c).eqNullSafe("NA"), 1)).alias(c)
    for c in spark_df.columns
])

# show the number of missing values for each attribute after filtering them
na_counts.show()




+-----+----------+---------+-------+----------+----------+-------------+--------------+--------+--------+------+----+--------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|UniqueCarrier|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|
+-----+----------+---------+-------+----------+----------+-------------+--------------+--------+--------+------+----+--------+
|    0|         0|        0|      0|         0|         0|            0|             0|       0|       0|     0|   0|       0|
+-----+----------+---------+-------+----------+----------+-------------+--------------+--------+--------+------+----+--------+



                                                                                

Check for **negative values**

In [10]:
# iterate through all columns and check for negative values
negative_values_count = 0
for column in spark_df.columns:
    # filter for negative values in each column
    negative_count_in_column = spark_df.filter(F.col(column) < 0).count()
    negative_values_count += negative_count_in_column
    if negative_count_in_column > 0:
        print(f"Column '{column}' contains {negative_count_in_column} negative values.")

# print the total number of negative values in the entire dataset
print(f"Total number of negative values in the dataset: {negative_values_count}")


                                                                                

Column 'CRSElapsedTime' contains 2 negative values.


                                                                                

Column 'ArrDelay' contains 419377 negative values.


                                                                                

Column 'DepDelay' contains 199262 negative values.




Total number of negative values in the dataset: 618641


                                                                                

Select only the observations containing positive values to work with

In [11]:
spark_df = spark_df.filter(
    (F.col('CRSElapsedTime') >= 0) &
    (F.col('DepDelay') >= 0) &
    (F.col('ArrDelay') >= 0)
)

spark_df.select("ArrDelay", "DepDelay", "CRSElapsedTime").describe().show()



+-------+------------------+------------------+-----------------+
|summary|          ArrDelay|          DepDelay|   CRSElapsedTime|
+-------+------------------+------------------+-----------------+
|  count|            787864|            787864|           787864|
|   mean|18.510105805062803|13.072652132855417|98.62637841048709|
| stddev|27.389488534449285| 26.91391999649335|59.77147241542005|
|    min|                 0|                 0|                0|
|    max|              1033|              1439|             1440|
+-------+------------------+------------------+-----------------+



                                                                                

**New Variables**

1. Carrier_Avrg_Delay - the average delay for each of the carrier.

Instead of having a code (*string value*), we will now have a *numerical* one that provides meaningful information.

**Replace** (old) **UniqueCarrier** with (new) **Carrier_Avrg_Delay**.

In [12]:
# create new attribute Carrier_Avrg_Delay

# calculate the average (using the mean) delay for each carrier
carrier_avg_delay = spark_df.groupBy("UniqueCarrier").agg({"ArrDelay": "mean"}) \
    .withColumnRenamed("avg(ArrDelay)", "Carrier_Avrg_Delay")

# join the average delay back to the main DataFrame
spark_df = spark_df.join(carrier_avg_delay, on="UniqueCarrier", how="left")

# drop the original UniqueCarrier column
spark_df = spark_df.drop("UniqueCarrier")


2. **Origin_Avrg_Delay** - the average delay at the origin airport.
**Replace** (old) Origin with (new) **Origin_Avrg_Delay**
Instead of the IATA code (string) -> will have the average delay for the airport


In [13]:
# create new attribute Origin_Avrg_Delay

# calculate the average delay for each origin airport
origin_avg_delay = spark_df.groupBy("Origin").agg({"ArrDelay": "mean"}) \
    .withColumnRenamed("avg(ArrDelay)", "Origin_Avrg_Delay")

# join the average delay back to the main DataFrame
spark_df = spark_df.join(origin_avg_delay, on="Origin", how="left")

# drop the original Origin column
spark_df = spark_df.drop("Origin")

3. **Dest_Avrg_Delay** - the average delay at the destination airport.

Replace (old) Dest with (new) Dest_Avrg_Delay

In [14]:
# create new attribute Dest_Avrg_Delay

# calculate the average delay for each destination airport
dest_avg_delay = spark_df.groupBy("Dest").agg({"ArrDelay": "mean"}) \
    .withColumnRenamed("avg(ArrDelay)", "Dest_Avrg_Delay")

# join the average delay back to the main DataFrame
spark_df = spark_df.join(dest_avg_delay, on="Dest", how="left")

# drop the original Dest column
spark_df = spark_df.drop("Dest")

Transform **all attributes to float** (all are numerical - either integer or double

In [15]:
# get all column names
all_columns = spark_df.columns

# cast all columns to FloatType
for column in all_columns:
    spark_df = spark_df.withColumn(column, F.col(column).cast(FloatType()))

# verify the changes
spark_df.printSchema()
spark_df.show()

root
 |-- Month: float (nullable = true)
 |-- DayofMonth: float (nullable = true)
 |-- DayOfWeek: float (nullable = true)
 |-- DepTime: float (nullable = true)
 |-- CRSDepTime: float (nullable = true)
 |-- CRSArrTime: float (nullable = true)
 |-- CRSElapsedTime: float (nullable = true)
 |-- ArrDelay: float (nullable = true)
 |-- DepDelay: float (nullable = true)
 |-- Distance: float (nullable = true)
 |-- Carrier_Avrg_Delay: float (nullable = true)
 |-- Origin_Avrg_Delay: float (nullable = true)
 |-- Dest_Avrg_Delay: float (nullable = true)



                                                                                

+-----+----------+---------+-------+----------+----------+--------------+--------+--------+--------+------------------+-----------------+---------------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|CRSElapsedTime|ArrDelay|DepDelay|Distance|Carrier_Avrg_Delay|Origin_Avrg_Delay|Dest_Avrg_Delay|
+-----+----------+---------+-------+----------+----------+--------------+--------+--------+--------+------------------+-----------------+---------------+
| 10.0|       1.0|      4.0|  936.0|     915.0|    1001.0|          46.0|    34.0|    21.0|   192.0|          25.15036|        27.897373|       22.10293|
| 10.0|       2.0|      5.0|  918.0|     915.0|    1001.0|          46.0|    16.0|     3.0|   192.0|          25.15036|        27.897373|       22.10293|
| 10.0|       3.0|      6.0|  928.0|     915.0|    1001.0|          46.0|    36.0|    13.0|   192.0|          25.15036|        27.897373|       22.10293|
| 10.0|       5.0|      1.0| 1042.0|     915.0|    1001.0|          46.0|   

Perfom **stratified undersampling**

In [16]:
# define delay thresholds for small, medium, and high delay

# small delay is under 30 min
# medium delay is between 30 and 60 min
# high delay is over 60 min
small_delay_threshold = 30
medium_delay_threshold = 60

# create a temporary new column 'DelayCategory' based on ArrDelay
spark_df = spark_df.withColumn(
    "DelayCategory",
    when(col("ArrDelay") < small_delay_threshold, lit("Small"))
    .when((col("ArrDelay") >= small_delay_threshold) & (col("ArrDelay") < medium_delay_threshold), lit("Medium"))
    .otherwise(lit("High"))
)

# calculate the fraction to sample for each category of delay
fractions = {
    "Small": 3500 / spark_df.filter(col("DelayCategory") == "Small").count(),
    "Medium": 3500 / spark_df.filter(col("DelayCategory") == "Medium").count(),
    "High": 3500 / spark_df.filter(col("DelayCategory") == "High").count()
}

# perform stratified sampling
stratified_sample = spark_df.sampleBy(
    "DelayCategory", fractions, seed=42
)

# show the distribution of DelayCategory in the stratified sample
stratified_sample.groupBy("DelayCategory").count().show()

[Stage 79:>                                                         (0 + 6) / 6]

+-------------+-----+
|DelayCategory|count|
+-------------+-----+
|         High| 3589|
|       Medium| 3541|
|        Small| 3508|
+-------------+-----+



                                                                                

Drop the previously made (only for the purspose of stratified undersampling) 'DelayCategory' column

In [17]:
# drop 'DelayCategory' column after stratified sampling
stratified_sample = stratified_sample.drop("DelayCategory")

# verify that the column has been removed
stratified_sample.show(5)

                                                                                

+-----+----------+---------+-------+----------+----------+--------------+--------+--------+--------+------------------+-----------------+---------------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|CRSElapsedTime|ArrDelay|DepDelay|Distance|Carrier_Avrg_Delay|Origin_Avrg_Delay|Dest_Avrg_Delay|
+-----+----------+---------+-------+----------+----------+--------------+--------+--------+--------+------------------+-----------------+---------------+
| 10.0|      24.0|      6.0|  939.0|     920.0|    1047.0|         147.0|    38.0|    19.0|   954.0|          25.79126|        21.582087|      19.259954|
| 10.0|      17.0|      6.0| 1054.0|     812.0|     902.0|         110.0|   160.0|   162.0|   651.0|          15.92184|        17.198046|      19.259954|
| 10.0|      15.0|      4.0| 1341.0|    1338.0|    1516.0|         158.0|     3.0|     3.0|   998.0|          15.92184|        18.436413|      19.259954|
| 10.0|      26.0|      1.0| 1229.0|    1220.0|    1326.0|          66.0|   

**Min-Max Scaling**

In [18]:
# Step 1: Define the columns to scale (all of them)
columns_to_scale = ['Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'CRSArrTime', 'CRSElapsedTime',
                    'DepDelay', 'Distance', 'Carrier_Avrg_Delay',
                    'Origin_Avrg_Delay', 'Dest_Avrg_Delay']

# Step 2: Assemble the feature columns into a single vector column
assembler = VectorAssembler(inputCols=columns_to_scale, outputCol="features")

# Step 3: Apply MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Step 4: Chain them into a pipeline
pipeline = Pipeline(stages=[assembler, scaler])

# Step 5: Fit and transform the stratified_sample DataFrame
scaled_stratified_sample = pipeline.fit(stratified_sample).transform(stratified_sample)

# Step 6: Define UDF to extract values from the DenseVector
def extract_vector_values(v):
    if isinstance(v, DenseVector):
        return v.toArray().tolist()  # Convert DenseVector to a list
    return []

# Register UDF
extract_vector_values_udf = udf(extract_vector_values, ArrayType(FloatType()))

# Step 7: Apply the UDF to extract scaled values
scaled_stratified_sample = scaled_stratified_sample.withColumn("scaled_values", extract_vector_values_udf(col("scaled_features")))

# Step 8: Extract scaled values into individual columns, replacing the original ones
scaled_stratified_sample = scaled_stratified_sample.select(
    *[col("scaled_values")[i].alias(columns_to_scale[i]) for i in range(len(columns_to_scale))],
    col("ArrDelay")  # Include ArrDelay as is, unscaled
)

# Step 9: Show the result
scaled_stratified_sample.show()

[Stage 130:>                                                        (0 + 1) / 1]

+-----+-----------+----------+----------+----------+----------+--------------+------------+-----------+------------------+-----------------+---------------+--------+
|Month| DayofMonth| DayOfWeek|   DepTime|CRSDepTime|CRSArrTime|CRSElapsedTime|    DepDelay|   Distance|Carrier_Avrg_Delay|Origin_Avrg_Delay|Dest_Avrg_Delay|ArrDelay|
+-----+-----------+----------+----------+----------+----------+--------------+------------+-----------+------------------+-----------------+---------------+--------+
|  0.0| 0.76666665| 0.8333333|0.39099625| 0.3887001|  0.436015|    0.23333333|  0.01439394| 0.18966211|         0.7476924|        0.2721508|     0.34190214|    38.0|
|  0.0| 0.53333336| 0.8333333| 0.4389329|0.34282073|0.37557316|    0.17460318| 0.122727275| 0.12872083|        0.13989556|       0.18591262|     0.34190214|   160.0|
|  0.0| 0.46666667|       0.5| 0.5585661| 0.5662702| 0.6315131|    0.25079367|0.0022727272| 0.19851166|        0.13989556|       0.21027248|     0.34190214|     3.0|
|  0

                                                                                

Save the preprocessed file to Drive

In [19]:
# Step 1: Convert Spark DataFrame to Pandas DataFrame
pandas_df = scaled_stratified_sample.toPandas()

# Step 2: Define the path in Google Drive for the desired csv path
drive_path = "./preprocessedData/preprocessed.csv"

# Step 3: Save the Pandas DataFrame as a single CSV file in Google Drive
pandas_df.to_csv(drive_path, index=False)


                                                                                

**Multivariate** feature selection using **Linear Regression** model

In [20]:
# 1. define features (all of them except target class)and target
features_col = [
    "Month", "DayofMonth", "DayOfWeek", "DepTime", "CRSDepTime", "CRSArrTime",
    "CRSElapsedTime", "DepDelay", "Distance", "Carrier_Avrg_Delay",
    "Origin_Avrg_Delay", "Dest_Avrg_Delay"
]
target_col = "ArrDelay"

# 2. Prepare the data
# 2.1 combine features into a single vector
assembler = VectorAssembler(inputCols=features_col, outputCol="features")
assembled_data = assembler.transform(stratified_sample)

# 2.2 drop rows with missing values to ensure clean data for feature selection
clean_data = assembled_data.select("features", target_col).na.drop()

# 3. Compute Correlation Matrix
# Use Correlation to check feature correlations with the target
correlation_matrix = Correlation.corr(clean_data, "features").head()[0].toArray()
correlations = {features_col[i]: correlation_matrix[-1, i] for i in range(len(features_col))}

# Print correlations with the target
print("Feature Correlations with Target:")
for feature, corr in correlations.items():
    print(f"{feature}: {corr:.4f}")

# 5. Feature Importance using Linear Regression
# Train a Linear Regression model to get feature importances
lr = LinearRegression(featuresCol="features", labelCol=target_col)
lr_model = lr.fit(clean_data)

# Extract feature importances from the linear regression coefficients
coefficients = lr_model.coefficients.toArray()
importance = {features_col[i]: abs(coefficients[i]) for i in range(len(features_col))}

# Sort features by importance
sorted_importance = sorted(importance.items(), key=lambda x: x[1], reverse=True)

print("Feature Importance:")
for feature, coeff in sorted_importance:
    print(f"{feature}: {coeff:.4f}")

# 6. Select Top Features
# Select top N features (e.g., top 7 features) based on importance or correlation
top_features = [feature for feature, _ in sorted_importance[:7]]

# Create a new DataFrame with only selected features and the target
selected_features_df = assembled_data.select(target_col, *top_features)
print("Selected Features:", top_features)


                                                                                

Feature Correlations with Target:
Month: 0.0182
DayofMonth: -0.0222
DayOfWeek: 0.0070
DepTime: 0.0017
CRSDepTime: -0.0173
CRSArrTime: -0.0063
CRSElapsedTime: 0.2754
DepDelay: 0.1594
Distance: 0.2612
Carrier_Avrg_Delay: 0.4289
Origin_Avrg_Delay: 0.2813
Dest_Avrg_Delay: 1.0000


25/01/13 19:35:23 WARN Instrumentation: [a6713447] regParam is zero, which might cause numerical instability and overfitting.

Feature Importance:
Month: 2.2900
DepDelay: 0.8601
Dest_Avrg_Delay: 0.6006
DayOfWeek: 0.1382
DayofMonth: 0.0404
CRSElapsedTime: 0.0303
Carrier_Avrg_Delay: 0.0118
Origin_Avrg_Delay: 0.0046
CRSDepTime: 0.0017
CRSArrTime: 0.0010
DepTime: 0.0005
Distance: 0.0004
Selected Features: ['Month', 'DepDelay', 'Dest_Avrg_Delay', 'DayOfWeek', 'DayofMonth', 'CRSElapsedTime', 'Carrier_Avrg_Delay']


                                                                                

**Min-Max Scaling** for the **selected features** on multivariate

In [21]:
# Step 1: Assemble the selected feature columns into a single vector column
assembler = VectorAssembler(inputCols=[feature for feature in top_features if feature != target_col], outputCol="features")

# Step 2: Apply MinMaxScaler to scale the feature vector
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Step 3: Chain the steps into a pipeline
pipeline = Pipeline(stages=[assembler, scaler])

# Step 4: Fit and transform the DataFrame
scaled_selected_features_df = pipeline.fit(selected_features_df).transform(selected_features_df)

# Step 5: Define a UDF to extract values from the DenseVector
def extract_vector_values(v):
    if isinstance(v, DenseVector):
        return v.toArray().tolist()  # Convert DenseVector to a list
    return []

# Register the UDF
extract_vector_values_udf = udf(extract_vector_values, ArrayType(FloatType()))

# Step 6: Apply the UDF to extract scaled values
scaled_selected_features_df = scaled_selected_features_df.withColumn("scaled_values", extract_vector_values_udf(col("scaled_features")))

# Step 7: Extract scaled values into individual columns, replacing the original feature columns
scaled_selected_features_df = scaled_selected_features_df.select(
    target_col,  # Include the target variable unscaled
    *[col("scaled_values")[i].alias(feature) for i, feature in enumerate(top_features) if feature != target_col]
)

# Step 8: Show the result
scaled_selected_features_df.show()


                                                                                

+--------+-----+------------+---------------+----------+-----------+--------------+------------------+
|ArrDelay|Month|    DepDelay|Dest_Avrg_Delay| DayOfWeek| DayofMonth|CRSElapsedTime|Carrier_Avrg_Delay|
+--------+-----+------------+---------------+----------+-----------+--------------+------------------+
|    38.0|  0.0|  0.01439394|     0.34190214| 0.8333333| 0.76666665|    0.23333333|         0.7476924|
|   160.0|  0.0| 0.122727275|     0.34190214| 0.8333333| 0.53333336|    0.17460318|        0.13989556|
|     3.0|  0.0|0.0022727272|     0.34190214|       0.5| 0.46666667|    0.25079367|        0.13989556|
|     9.0|  0.0| 0.006818182|     0.34190214|       0.0|  0.8333333|   0.104761906|        0.17544873|
|    34.0|  0.0| 0.012121212|     0.34190214|       1.0| 0.56666666|   0.104761906|        0.17544873|
|   105.0|  0.0|  0.06439394|     0.34190214|       0.0|  0.8333333|     0.0952381|       0.102999195|
|    63.0|  0.0| 0.049242426|     0.34190214|       0.5|        0.7|     

Save the preprocessed (multivariate feature selected) file to Drive

In [22]:
# Step 1: Convert Spark DataFrame to Pandas DataFrame
pandas_df = scaled_selected_features_df.toPandas()

# Step 2: Define the path in Google Drive
drive_path = "./preprocessedData/multivariate.csv"

# Step 3: Save the Pandas DataFrame as a single CSV file in Google Drive
pandas_df.to_csv(drive_path, index=False)


                                                                                

Starting with the training process without the variable Selection

In [23]:
from pyspark.ml.feature import VectorAssembler

# Create a VectorAssembler
feature = ["ArrDelay"]
inputCols = ["Month", "DayofMonth", "DayOfWeek", "DepTime","CRSDepTime","CRSArrTime","CRSElapsedTime","DepDelay",
    "Distance", "Carrier_Avrg_Delay", "Origin_Avrg_Delay", "Dest_Avrg_Delay"]



from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Renamed variables for more significance during training
allVariables_df = scaled_stratified_sample



#The data is randomply splited into a 70% for training and 30% for the final evaluation of the model
split = allVariables_df.randomSplit([0.7, 0.3])
training = split[0]
test = split[1]

# The columns that will be used to predict are specified
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")

In [24]:
# Create a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(lr.elasticNetParam, [0,0.2,0.8,1]).addGrid(lr.regParam, [ 0.05,0.1,0.2]).addGrid(lr.maxIter, [10,100,200]).build()

# An evaluator that will measure the performance of the model predicting the target variable is build
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")
# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)

# The best model hyperparameters are printed
print(cvModel.bestModel)
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")
print(cvModel.avgMetrics)

# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")

                                                                                

LinearRegressionModel: uid=LinearRegression_9081e0406dbc, numFeatures=12
aggregationDepth: 2
elasticNetParam: 1.0
epsilon: 1.35
featuresCol: features
fitIntercept: True
labelCol: ArrDelay
loss: squaredError
maxBlockSizeInMB: 0.0
maxIter: 100
predictionCol: prediction
regParam: 0.2
solver: auto
standardization: True
tol: 1e-06
[np.float64(21.08113541259292), np.float64(21.08113541259292), np.float64(21.08113541259292), np.float64(21.085976671123994), np.float64(21.085976671123994), np.float64(21.085976671123994), np.float64(21.095945253834135), np.float64(21.095945253834135), np.float64(21.095945253834135), np.float64(21.07573769081828), np.float64(21.077439335073848), np.float64(21.077439335073848), np.float64(21.07090372494816), np.float64(21.079120761721136), np.float64(21.079120761721136), np.float64(21.080652651684325), np.float64(21.084357034971113), np.float64(21.084357034971113), np.float64(21.064518545723164), np.float64(21.06890749729615), np.float64(21.06890749729615), np.flo

                                                                                

Root Mean Squared Error (RMSE) on test data = 17.3497


In [25]:
# Create a Decision Tree Regressor Model model
dtr = DecisionTreeRegressor(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(dtr.maxDepth, [5,10,20,30]).addGrid(dtr.minInfoGain, [0, 0.1]).addGrid(dtr.maxBins, [32,64]).build()

# An evaluator that will measure the performance of the model predicting the target variable is build
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")

# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=dtr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)
print(cvModel.bestModel)

# The best model hyperparameters are printed
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")
print(cvModel.avgMetrics)

# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")


25/01/13 19:38:27 WARN DAGScheduler: Broadcasting large task binary with size 1024.0 KiB
25/01/13 19:38:27 WARN DAGScheduler: Broadcasting large task binary with size 1089.2 KiB
25/01/13 19:38:28 WARN DAGScheduler: Broadcasting large task binary with size 1023.6 KiB
25/01/13 19:38:28 WARN DAGScheduler: Broadcasting large task binary with size 1089.5 KiB
25/01/13 19:38:28 WARN DAGScheduler: Broadcasting large task binary with size 1154.0 KiB
25/01/13 19:38:28 WARN DAGScheduler: Broadcasting large task binary with size 1218.8 KiB
25/01/13 19:38:28 WARN DAGScheduler: Broadcasting large task binary with size 1153.6 KiB
25/01/13 19:38:29 WARN DAGScheduler: Broadcasting large task binary with size 1219.2 KiB
25/01/13 19:38:29 WARN DAGScheduler: Broadcasting large task binary with size 1275.1 KiB
25/01/13 19:38:29 WARN DAGScheduler: Broadcasting large task binary with size 1329.4 KiB
25/01/13 19:38:29 WARN DAGScheduler: Broadcasting large task binary with size 1274.7 KiB
25/01/13 19:38:29 WAR

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_247ccbe45037, depth=5, numNodes=63, numFeatures=12
cacheNodeIds: False
checkpointInterval: 10
featuresCol: features
impurity: variance
labelCol: ArrDelay
leafCol: 
maxBins: 64
maxDepth: 5
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
seed: 9005695898014258404
[np.float64(20.765295686055914), np.float64(18.849811824469473), np.float64(20.765295686055914), np.float64(18.849811824469473), np.float64(25.417282860166768), np.float64(23.124372319109423), np.float64(25.417282860166768), np.float64(23.124372319109423), np.float64(28.40247402729917), np.float64(25.980228119378836), np.float64(28.40231516186602), np.float64(25.980085502009967), np.float64(28.432846428826867), np.float64(26.01940411628732), np.float64(28.43270167258087), np.float64(26.019302911859057)]


                                                                                

Root Mean Squared Error (RMSE) on test data = 18.3329


In [None]:
# Create a Random Forest Regression model
rfr = RandomForestRegressor(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(rfr.maxDepth, [5,10,20,30]).addGrid(rfr.bootstrap, [True, False]).addGrid(rfr.minInfoGain, [0, 0.1]).addGrid(rfr.maxBins, [32,64]).build()

# An evaluator that will measure the performance of the model predicting the target variable is build
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")

# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=rfr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)
print(cvModel.bestModel)

# The best model hyperparameters are printed
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")
print(cvModel.avgMetrics)

# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")

25/01/11 11:43:40 WARN DAGScheduler: Broadcasting large task binary with size 1282.3 KiB
25/01/11 11:43:40 WARN DAGScheduler: Broadcasting large task binary with size 1282.3 KiB
25/01/11 11:43:40 WARN DAGScheduler: Broadcasting large task binary with size 1229.2 KiB
25/01/11 11:43:40 WARN DAGScheduler: Broadcasting large task binary with size 1229.2 KiB
25/01/11 11:43:41 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/01/11 11:43:41 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/01/11 11:43:41 WARN DAGScheduler: Broadcasting large task binary with size 1374.2 KiB
25/01/11 11:43:41 WARN DAGScheduler: Broadcasting large task binary with size 1960.6 KiB
25/01/11 11:43:41 WARN DAGScheduler: Broadcasting large task binary with size 1960.6 KiB
25/01/11 11:43:42 WARN DAGScheduler: Broadcasting large task binary with size 1340.9 KiB
25/01/11 11:43:42 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/01/11 11:43:43 WARN DAGSche

RandomForestRegressionModel: uid=RandomForestRegressor_439dc33ce190, numTrees=20, numFeatures=12
bootstrap: False
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: auto
featuresCol: features
impurity: variance
labelCol: ArrDelay
leafCol: 
maxBins: 64
maxDepth: 10
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
numTrees: 20
predictionCol: prediction
seed: 5744811048762193986
subsamplingRate: 1.0
[np.float64(23.20031378978206), np.float64(22.611630917430222), np.float64(23.20031378978206), np.float64(22.611630917430222), np.float64(22.984514905586472), np.float64(22.628801399729156), np.float64(22.984514905586472), np.float64(22.628801399729156), np.float64(20.69843615245997), np.float64(19.67051731803977), np.float64(20.699490288167635), np.float64(19.674816622124855), np.float64(20.496954794137743), np.float64(19.60807058454845), np.float64(20.496949855069857), np.float64(19.637622754152055), np.float64(20.830288049740094), np.fl

In [None]:
# Create a  Gradient-Boosted Trees Regression model
gbtr = GBTRegressor(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(gbtr.maxDepth, [5,10,20,30]).addGrid(gbtr.minInfoGain, [0, 0.1]).addGrid(gbtr.maxBins, [32,64]).build()
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")

# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=gbtr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)
print(cvModel.bestModel)

# The best model hyperparameters are printed
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")

print(cvModel.avgMetrics)
# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")


25/01/11 12:01:11 WARN DAGScheduler: Broadcasting large task binary with size 1027.0 KiB
25/01/11 12:01:11 WARN DAGScheduler: Broadcasting large task binary with size 1019.5 KiB
25/01/11 12:01:11 WARN DAGScheduler: Broadcasting large task binary with size 1020.0 KiB
25/01/11 12:01:11 WARN DAGScheduler: Broadcasting large task binary with size 1020.6 KiB
25/01/11 12:01:11 WARN DAGScheduler: Broadcasting large task binary with size 1021.7 KiB
25/01/11 12:01:12 WARN DAGScheduler: Broadcasting large task binary with size 1023.9 KiB
25/01/11 12:01:12 WARN DAGScheduler: Broadcasting large task binary with size 1027.3 KiB
25/01/11 12:01:12 WARN DAGScheduler: Broadcasting large task binary with size 1033.8 KiB
25/01/11 12:01:12 WARN DAGScheduler: Broadcasting large task binary with size 1022.5 KiB
25/01/11 12:01:12 WARN DAGScheduler: Broadcasting large task binary with size 1045.5 KiB
25/01/11 12:01:12 WARN DAGScheduler: Broadcasting large task binary with size 1063.4 KiB
25/01/11 12:01:13 WAR

GBTRegressionModel: uid=GBTRegressor_da044fe2f4d8, numTrees=20, numFeatures=12
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: all
featuresCol: features
impurity: variance
labelCol: ArrDelay
leafCol: 
lossType: squared
maxBins: 64
maxDepth: 5
maxIter: 20
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
seed: -5222713660694064718
stepSize: 0.1
subsamplingRate: 1.0
validationTol: 0.01
[np.float64(20.94988600068423), np.float64(20.106767215621073), np.float64(20.949244611365085), np.float64(20.106767215621073), np.float64(25.83421022854271), np.float64(25.451463112229614), np.float64(25.793597227474358), np.float64(25.37217598854061), np.float64(28.494035479084395), np.float64(28.349658238174705), np.float64(28.479179160154615), np.float64(28.307774921268457), np.float64(28.50672326648356), np.float64(28.353959621572606), np.float64(28.50689265959926), np.float64(28.353827861793768)]
Root Mean Squared Erro

Starting with the training of the multivariate dataset

In [None]:
from pyspark.ml.feature import VectorAssembler

# Create a VectorAssembler
feature = ["ArrDelay"]
inputCols = [ "Month", "DepDelay", "Dest_Avrg_Delay", "Carrier_Avrg_Delay", "Origin_Avrg_Delay", "DayofMonth", "CRSElapsedTime"]



from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Rename to a meaningfull naming during training
multivariate_df = scaled_selected_features_df

#The data is randomply splited into a 70% for training and 30% for the final evaluation of the model
split = multivariate_df.randomSplit([0.7, 0.3])
training = split[0]
test = split[1]

# The columns that will be used to predict are specified
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")


In [None]:
# Create a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(lr.elasticNetParam, [0,0.2,0.8,1]).addGrid(lr.regParam, [0.05,0.1,0.2]).addGrid(lr.maxIter, [10,100,200]).build()

# An evaluator that will measure the performance of the model predicting the target variable is build
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")
# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)

# The best model hyperparameters are printed
print(cvModel.bestModel)
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")
print(cvModel.avgMetrics)

# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")

25/01/11 20:58:58 WARN BlockManager: Block rdd_79689_0 already exists on this machine; not re-adding it
25/01/11 20:58:58 WARN BlockManager: Block rdd_79689_0 already exists on this machine; not re-adding it
                                                                                

LinearRegressionModel: uid=LinearRegression_325e30c85f23, numFeatures=7
aggregationDepth: 2
elasticNetParam: 0.0
epsilon: 1.35
featuresCol: features
fitIntercept: True
labelCol: ArrDelay
loss: squaredError
maxBlockSizeInMB: 0.0
maxIter: 10
predictionCol: prediction
regParam: 0.05
solver: auto
standardization: True
tol: 1e-06
[np.float64(17.341915610492123), np.float64(17.341915610492123), np.float64(17.341915610492123), np.float64(17.342299930591558), np.float64(17.342299930591558), np.float64(17.342299930591558), np.float64(17.3434835781434), np.float64(17.3434835781434), np.float64(17.3434835781434), np.float64(17.34191647202321), np.float64(17.341920419270377), np.float64(17.341920419270377), np.float64(17.342351412403243), np.float64(17.342362788889783), np.float64(17.342362788889783), np.float64(17.343817790035466), np.float64(17.343821702362415), np.float64(17.343821702362415), np.float64(17.341991376720358), np.float64(17.341993954818122), np.float64(17.341993954818122), np.floa

                                                                                

In [None]:
# Create a Decision Tree Regressor Model model
dtr = DecisionTreeRegressor(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(dtr.maxDepth, [5,10,20,30]).addGrid(dtr.minInfoGain, [0, 0.1]).addGrid(dtr.maxBins, [32,64]).build()

# An evaluator that will measure the performance of the model predicting the target variable is build
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")

# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=dtr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)
print(cvModel.bestModel)

# The best model hyperparameters are printed
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")
print(cvModel.avgMetrics)

# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")


25/01/11 18:30:22 WARN DAGScheduler: Broadcasting large task binary with size 1024.9 KiB
25/01/11 18:30:22 WARN DAGScheduler: Broadcasting large task binary with size 1023.8 KiB
25/01/11 18:30:22 WARN DAGScheduler: Broadcasting large task binary with size 1138.6 KiB
25/01/11 18:30:22 WARN DAGScheduler: Broadcasting large task binary with size 1136.7 KiB
25/01/11 18:30:22 WARN DAGScheduler: Broadcasting large task binary with size 1016.6 KiB
25/01/11 18:30:22 WARN DAGScheduler: Broadcasting large task binary with size 1016.1 KiB
25/01/11 18:30:23 WARN DAGScheduler: Broadcasting large task binary with size 1235.0 KiB
25/01/11 18:30:23 WARN DAGScheduler: Broadcasting large task binary with size 1237.6 KiB
25/01/11 18:30:23 WARN DAGScheduler: Broadcasting large task binary with size 1134.9 KiB
25/01/11 18:30:23 WARN DAGScheduler: Broadcasting large task binary with size 1313.4 KiB
25/01/11 18:30:23 WARN DAGScheduler: Broadcasting large task binary with size 1316.2 KiB
25/01/11 18:30:23 WAR

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_7f6da3143aa7, depth=5, numNodes=63, numFeatures=7
cacheNodeIds: False
checkpointInterval: 10
featuresCol: features
impurity: variance
labelCol: ArrDelay
leafCol: 
maxBins: 64
maxDepth: 5
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
seed: 6210202149619856988
[np.float64(22.588919987343182), np.float64(20.141471048358078), np.float64(22.588919987343182), np.float64(20.141471048358078), np.float64(28.446553454295042), np.float64(26.38063431677987), np.float64(28.446553454295042), np.float64(26.38063431677987), np.float64(32.071353153724836), np.float64(29.047862244938575), np.float64(32.07117510386881), np.float64(29.04796217245851), np.float64(32.082560232573655), np.float64(29.09187715082532), np.float64(32.08233848451412), np.float64(29.092235708866877)]
Root Mean Squared Error (RMSE) on test data = 20.4194


In [None]:
# Create a Random Forest Regression model
rfr = RandomForestRegressor(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(rfr.maxDepth, [5,10,20,30]).addGrid(rfr.bootstrap, [True, False]).addGrid(rfr.minInfoGain, [0, 0.1]).addGrid(rfr.maxBins, [32,64]).build()

# An evaluator that will measure the performance of the model predicting the target variable is build
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")

# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=rfr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)
print(cvModel.bestModel)

# The best model hyperparameters are printed
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")
print(cvModel.avgMetrics)

# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")

25/01/11 18:31:10 WARN DAGScheduler: Broadcasting large task binary with size 1282.9 KiB
25/01/11 18:31:10 WARN DAGScheduler: Broadcasting large task binary with size 1282.6 KiB
25/01/11 18:31:10 WARN DAGScheduler: Broadcasting large task binary with size 1272.6 KiB
25/01/11 18:31:10 WARN DAGScheduler: Broadcasting large task binary with size 1272.6 KiB
25/01/11 18:31:11 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/01/11 18:31:11 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/01/11 18:31:11 WARN DAGScheduler: Broadcasting large task binary with size 2047.3 KiB
25/01/11 18:31:11 WARN DAGScheduler: Broadcasting large task binary with size 2046.7 KiB
25/01/11 18:31:11 WARN DAGScheduler: Broadcasting large task binary with size 1370.9 KiB
25/01/11 18:31:12 WARN DAGScheduler: Broadcasting large task binary with size 1351.7 KiB
25/01/11 18:31:12 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/01/11 18:31:13 WARN DAGSche

RandomForestRegressionModel: uid=RandomForestRegressor_b4313634f401, numTrees=20, numFeatures=7
bootstrap: True
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: auto
featuresCol: features
impurity: variance
labelCol: ArrDelay
leafCol: 
maxBins: 64
maxDepth: 10
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
numTrees: 20
predictionCol: prediction
seed: 9093253588901075681
subsamplingRate: 1.0
[np.float64(22.910908052124935), np.float64(21.614504638104673), np.float64(22.910908052124935), np.float64(21.614504638104673), np.float64(23.017194124156607), np.float64(21.78300008354642), np.float64(23.017194124156607), np.float64(21.78300008354642), np.float64(21.979196644221425), np.float64(20.30934626776801), np.float64(21.971876763630842), np.float64(20.35388471380108), np.float64(22.549878990094207), np.float64(20.48658772597701), np.float64(22.434002895844458), np.float64(20.505824765903753), np.float64(22.24750962382406), np.float

In [None]:
# Create a  Gradient-Boosted Trees Regression model
gbtr = GBTRegressor(featuresCol="features", labelCol="ArrDelay")

# The different values of the Hyperparameters that the model will be trained with
grid = ParamGridBuilder().addGrid(gbtr.maxDepth, [5,10,20,30]).addGrid(gbtr.minInfoGain, [0, 0.1]).addGrid(gbtr.maxBins, [32,64]).build()
evaluator = RegressionEvaluator(
    labelCol="ArrDelay", predictionCol="prediction", metricName="rmse")

# With 5 folds Cross-validation, all the models created by the combination of the different hyperparameters values will be compared
# using the previously build evaluator
cv = CrossValidator(estimator=gbtr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, parallelism=6)

# Introduced the features columns in the training and test data
transformed = assembler.transform(training)
testT = assembler.transform(test)

# Trains the model with the transformed training data
cvModel = cv.fit(transformed)
print(cvModel.bestModel)

# The best model hyperparameters are printed
paramMap = cvModel.bestModel.extractParamMap()
for param, value in paramMap.items():
    print(f"{param.name}: {value}")

print(cvModel.avgMetrics)
# The best model is evaluated with the test data. 
rmse = evaluator.evaluate(cvModel.transform(testT))

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# To save the model
# cvModel.save(path="./best_model")


25/01/11 18:47:31 WARN DAGScheduler: Broadcasting large task binary with size 1025.4 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1020.1 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1139.0 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1017.0 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1016.5 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1238.0 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1017.0 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1135.4 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1017.6 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1316.6 KiB
25/01/11 18:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1018.7 KiB
25/01/11 18:47:32 WAR

GBTRegressionModel: uid=GBTRegressor_8c6d6dc6bd27, numTrees=20, numFeatures=7
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: all
featuresCol: features
impurity: variance
labelCol: ArrDelay
leafCol: 
lossType: squared
maxBins: 64
maxDepth: 5
maxIter: 20
maxMemoryInMB: 256
minInfoGain: 0.1
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
seed: -160350197208826515
stepSize: 0.1
subsamplingRate: 1.0
validationTol: 0.01
[np.float64(22.642580950076322), np.float64(20.23766898150236), np.float64(22.642580950076322), np.float64(20.236765706091422), np.float64(28.56423609343758), np.float64(26.5290203490302), np.float64(28.494122322846515), np.float64(26.465814208661335), np.float64(32.06710915591502), np.float64(29.11315911989821), np.float64(32.07117510386881), np.float64(29.04796217245851), np.float64(32.082560232573655), np.float64(29.09187715082532), np.float64(32.08233848451412), np.float64(29.092235708866877)]
Root Mean Squared Error (RMSE