### <u>DSA5208 Project 2: Using MLlib to predict air pressure</u>

Done by: S K Ruban A0253837W, Owen Li Dong Lin A0231088H

This notebook is organised in the following structure:<br>
    **1. Import libraries**<br>
    **2. Initialize Spark session**<br>
    **3. Data Cleaning & Preprocessing**<br>
        &emsp;3.1 Load and clean data from all extracted .csv files<br>
        &emsp;3.2 Prepare data by applying MinMax scaling<br>
        &emsp;3.3 Split data into train and test sets<br>
        &emsp;3.4 Assemble feature vector for model training<br>
    **4. Model Training**<br>
        &emsp;4.1 Cache training and test data for faster processing<br>
        &emsp;4.2 Define regression models<br>
        &emsp;4.3 Define model parameter grids for hyperparameter tuning<br>
        &emsp;4.4 Define RegressionEvaluator & CrossValidators for each model<br>
        &emsp;4.5 Create pipelines and fit each model<br>
        &emsp;4.6 Make predictions on the train and test sets<br>
    **5. Model Evaluation & Optimization**<br>
        &emsp;5.1 Calculate RMSE of model predictions on train and test sets<br>
        &emsp;5.2 Identify the optimized hyperparameters for each model<br>

# 1. Import libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, substring, when
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import sys
import os
from time import time

# 2. Initialize Spark session

In [2]:
spark = SparkSession.builder \
    .appName("Weather Pressure Prediction") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")



# 3. Data Cleaning & Preprocessing

### 3.1 Load and clean data from all extracted .csv files

In [4]:
df = spark.read.csv("gs://dsa5208-proj2/extracted/*.csv", header=True, inferSchema=True)

selected_columns = ["LATITUDE", "LONGITUDE", "ELEVATION", "WND", "CIG", "VIS", "TMP", "DEW", "SLP"]

df = df.select(selected_columns).cache()

df_parsed = df.select(
        # Latitude: +99999 is missing
        when(col("LATITUDE").contains("+99999"), None)
        .otherwise(col("LATITUDE").cast("double"))
        .alias("latitude"),

        # Longitude: +999999 is missing
        when(col("LONGITUDE").contains("+999999"), None)
        .otherwise(col("LONGITUDE").cast("double"))
        .alias("longitude"),

        # Elevation: +9999 is missing
        when(col("ELEVATION").contains("+9999"), None)
        .otherwise(col("ELEVATION").cast("double"))
        .alias("elevation"),

        # Wind direction: 999 is missing
        when(split(col("WND"), ",").getItem(0) == "999", None)
        .otherwise(split(col("WND"), ",").getItem(0).cast("double"))
        .alias("wind_direction"),

        # Wind speed: 9999 is missing in fourth position
        when(split(col("WND"), ",").getItem(3) == "9999", None)
        .otherwise(split(col("WND"), ",").getItem(3).cast("double"))
        .alias("wind_speed"),

        # Ceiling height: 99999 is missing
        when(split(col("CIG"), ",").getItem(0) == "99999", None)
        .otherwise(split(col("CIG"), ",").getItem(0).cast("double"))
        .alias("ceiling_height"),

        # Visibility: 999999 is missing
        when(split(col("VIS"), ",").getItem(0) == "999999", None)
        .otherwise(split(col("VIS"), ",").getItem(0).cast("double"))
        .alias("visibility"),

        # Air temperature: +9999 is missing
        when(col("TMP").contains("+9999"), None)
        .otherwise(split(col("TMP"), ",").getItem(0).cast("double"))
        .alias("air_temp"),

        # Dew point: +9999 is missing
        when(col("DEW").contains("+9999"), None)
        .otherwise(split(col("DEW"), ",").getItem(0).cast("double"))
        .alias("dew_point"),

        # Sea level pressure: 99999 is missing
        when(split(col("SLP"), ",").getItem(0) == "99999", None)
        .otherwise(split(col("SLP"), ",").getItem(0).cast("double"))
        .alias("sea_level_pressure")
    )

df.unpersist()
print("\nSample of parsed data:")
df_parsed.show(20)

print("\nNull counts after parsing:")
# Count the rows once instead of re-running the full count for every column
total = df_parsed.count()
for column in df_parsed.columns:
    null_count = df_parsed.filter(col(column).isNull()).count()
    print(f"{column}: {null_count} nulls out of {total} ({(null_count/total)*100:.2f}%)")

print("\nRemoving rows with null values...")
df_no_nulls = df_parsed.dropna().cache()

rows_after_null_removal = df_no_nulls.count()
print(f"Rows remaining after null removal: {rows_after_null_removal}")

print("\nApplying valid range filters...")
df_filtered = df_no_nulls.filter(
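    # These two bounds follow ISD's x1000 scaled-integer convention; the parsed sample
    # output shows decimal degrees, so both checks are very loose for this data
    (col("latitude").between(-90000, 90000)) &
    (col("longitude").between(-179999, 180000)) &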
    (col("elevation").between(-400, 8850)) &
    (col("wind_direction").between(1, 360)) &
    (col("wind_speed").between(0, 900)) &
    (col("ceiling_height").between(0, 22000)) &
    (col("visibility").between(0, 160000)) &
    (col("air_temp").between(-932, 618)) &
    (col("dew_point").between(-982, 368)) &
    (col("sea_level_pressure").between(8600, 10900))
    )

df_no_nulls.unpersist()
final_count = df_filtered.count()
print(f"Final row count after all filtering: {final_count}")


df_filtered.coalesce(1).write.csv("gs://dsa5208-proj2/cleaned_data", header=True, mode="overwrite")

                                                                                


Sample of parsed data:
+--------+---------+---------+--------------+----------+--------------+----------+--------+---------+------------------+
|latitude|longitude|elevation|wind_direction|wind_speed|ceiling_height|visibility|air_temp|dew_point|sea_level_pressure|
+--------+---------+---------+--------------+----------+--------------+----------+--------+---------+------------------+
| 71.3213| -156.611|      4.6|          NULL|       5.0|          NULL|      NULL|  -175.0|     NULL|              NULL|
| 71.3213| -156.611|      4.6|          NULL|      NULL|          NULL|      NULL|  -175.0|     NULL|              NULL|
| 71.3213| -156.611|      4.6|          NULL|      NULL|          NULL|      NULL|  -175.0|     NULL|              NULL|
| 71.3213| -156.611|      4.6|          NULL|      NULL|          NULL|      NULL|  -174.0|     NULL|              NULL|
| 71.3213| -156.611|      4.6|          NULL|      NULL|          NULL|      NULL|  -174.0|     NULL|              NULL|
| 71.321

                                                                                

latitude: 0 nulls out of 130531842 (0.00%)
longitude: 0 nulls out of 130531842 (0.00%)
elevation: 0 nulls out of 130531842 (0.00%)
wind_direction: 37143661 nulls out of 130531842 (28.46%)
wind_speed: 18615382 nulls out of 130531842 (14.26%)
ceiling_height: 68297483 nulls out of 130531842 (52.32%)
visibility: 44706860 nulls out of 130531842 (34.25%)
air_temp: 3440078 nulls out of 130531842 (2.64%)
dew_point: 22730107 nulls out of 130531842 (17.41%)
sea_level_pressure: 78046337 nulls out of 130531842 (59.79%)

Removing rows with null values...
Rows remaining after null removal: 18474434

Applying valid range filters...
Final row count after all filtering: 18471526
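
The parsing rule used in the cell above (split a comma-separated field, pick one position, map the sentinel to a missing value) can be sketched outside Spark in plain Python. The helper name `parse_isd_field` and the sample strings are hypothetical and only illustrate the pattern; they are not taken from the dataset.

```python
from typing import Optional


def parse_isd_field(raw: str, index: int, missing: str) -> Optional[float]:
    """Return the sub-field at `index` as a float, or None if it equals the sentinel."""
    parts = raw.split(",")
    value = parts[index]
    if value == missing:
        return None
    return float(value)


# Hypothetical WND-style strings; position 0 is the direction, position 3 the speed
print(parse_isd_field("050,1,N,0036,1", index=0, missing="999"))   # 50.0
print(parse_isd_field("999,9,C,0000,1", index=0, missing="999"))   # None
print(parse_isd_field("050,1,N,9999,9", index=3, missing="9999"))  # None
```

In the notebook the same logic is expressed with `split(...).getItem(i)` and `when(...).otherwise(...)`, so that Spark evaluates it column-wise rather than row by row in Python.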


                                                                                

### 3.2 Prepare data by applying MinMax scaling

In [8]:
cleaned_df = spark.read.csv("gs://dsa5208-proj2/cleaned_data/*.csv", header=True, inferSchema=True)


feature_columns = ['latitude', 'longitude', 'elevation', 'wind_direction', 'wind_speed', 'ceiling_height', 
                   'visibility', 'air_temp', 'dew_point']

min_values = [-90000, -179999, -400, 1, 0, 0, 0, -932, -982]
max_values = [90000, 180000, 8850, 360, 900, 22000, 160000, 618, 368]


for col_name, min_val, max_val in zip(feature_columns, min_values, max_values):
    cleaned_df = cleaned_df.withColumn(col_name, (col(col_name) - min_val) / (max_val - min_val))
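
The loop above applies min-max scaling with fixed bounds rather than bounds learned from the data. A minimal sketch of the same arithmetic in plain Python follows; the function name `min_max_scale` is hypothetical, the bounds are copied from `min_values` and `max_values`, and the two input values are taken from the parsed sample in section 3.1.

```python
def min_max_scale(value: float, lo: float, hi: float) -> float:
    """Map value from [lo, hi] onto [0, 1] using fixed bounds."""
    return (value - lo) / (hi - lo)


# air_temp bounds from the cell above; -175.0 appears in the parsed sample
air_temp_scaled = min_max_scale(-175.0, -932, 618)

# latitude bounds from the cell above; 71.3213 appears in the parsed sample
latitude_scaled = min_max_scale(71.3213, -90000, 90000)

print(f"air_temp: {air_temp_scaled:.4f}")
print(f"latitude: {latitude_scaled:.4f}")
```

If latitude and longitude are stored in decimal degrees, as the sample suggests, the wide bounds squeeze both features into a narrow band around 0.5. Tree-based models are largely insensitive to this, but it may affect the regularised linear model, so it is worth checking against the raw data.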

                                                                                

### 3.3 Split data into train and test sets

In [10]:
train_data, test_data = cleaned_df.randomSplit([0.7, 0.3], seed=42)

### 3.4 Assemble feature vector for model training

In [11]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

---

# 4. Model Training

### 4.1 Cache training & test data for faster processing

In [17]:
train_df = train_data.cache()
test_df = test_data.cache()



### 4.2 Define regression models

In [13]:
ridge_reg = LinearRegression(featuresCol="features", labelCol="sea_level_pressure", elasticNetParam=0.0)

gbt = GBTRegressor(featuresCol="features", labelCol="sea_level_pressure")

rf = RandomForestRegressor(featuresCol="features", labelCol="sea_level_pressure")

### 4.3 Define model parameter grids for hyperparameter tuning

In [14]:
ridge_param_grid = (ParamGridBuilder()
                        .addGrid(ridge_reg.regParam, [0.01, 0.1, 1.0])
                        .build())

gbt_param_grid = (ParamGridBuilder()
                      .addGrid(gbt.maxDepth, [5, 7, 9])
                      .addGrid(gbt.maxIter, [20, 30, 40])
                      .build())

rf_param_grid = (ParamGridBuilder()
                     .addGrid(rf.numTrees, [20, 30, 40])
                     .addGrid(rf.maxDepth, [5, 7, 9])
                     .build())

### 4.4 Define RegressionEvaluator & CrossValidators for each model

In [15]:
# Define evaluators
evaluator = RegressionEvaluator(labelCol="sea_level_pressure", predictionCol="prediction", metricName="rmse")

# Create CrossValidator for each model
ridge_cv = CrossValidator(estimator=ridge_reg, estimatorParamMaps=ridge_param_grid, evaluator=evaluator, numFolds=3)
gbt_cv = CrossValidator(estimator=gbt, estimatorParamMaps=gbt_param_grid, evaluator=evaluator, numFolds=3)
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_param_grid, evaluator=evaluator, numFolds=3)
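
To get a feel for the training cost these settings imply, the sketch below expands the grids from section 4.3 in plain Python and counts the fits per model. The dictionary layout and the `expand` helper are illustrative only; the count assumes that `CrossValidator` fits every combination once per fold and then refits the best combination on the full training set.

```python
from itertools import product

# Grids copied from section 4.3; the dict layout is only for illustration
grids = {
    "ridge": {"regParam": [0.01, 0.1, 1.0]},
    "gbt": {"maxDepth": [5, 7, 9], "maxIter": [20, 30, 40]},
    "rf": {"numTrees": [20, 30, 40], "maxDepth": [5, 7, 9]},
}
num_folds = 3


def expand(grid: dict) -> list:
    """Return every parameter combination as a list of dicts."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


for model, grid in grids.items():
    combos = expand(grid)
    # Each combination is fitted once per fold, plus one final refit of the best one
    fits = len(combos) * num_folds + 1
    print(f"{model}: {len(combos)} combinations, {fits} fits")
```

With three folds this gives 10 fits for ridge regression and 28 each for the two tree ensembles, which is consistent with the tree models dominating the training time.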

### 4.5 Create pipelines and fit each model

In [18]:
# Create pipelines for each model
ridge_pipeline = Pipeline(stages=[assembler, ridge_cv])
gbt_pipeline = Pipeline(stages=[assembler, gbt_cv])
rf_pipeline = Pipeline(stages=[assembler, rf_cv])

# Fit the models
ridge_model = ridge_pipeline.fit(train_df)
gbt_model = gbt_pipeline.fit(train_df)
rf_model = rf_pipeline.fit(train_df)


### 4.6 Make predictions on the train and test sets

In [20]:
ridge_train_predictions = ridge_model.transform(train_df)
gbt_train_predictions = gbt_model.transform(train_df)
rf_train_predictions = rf_model.transform(train_df)

ridge_test_predictions = ridge_model.transform(test_df)
gbt_test_predictions = gbt_model.transform(test_df)
rf_test_predictions = rf_model.transform(test_df)

# 5. Model Evaluation & Optimization

### 5.1 Calculate RMSE of model predictions on train and test sets

In [21]:
ridge_train_rmse = evaluator.evaluate(ridge_train_predictions)
gbt_train_rmse = evaluator.evaluate(gbt_train_predictions)
rf_train_rmse = evaluator.evaluate(rf_train_predictions)

ridge_test_rmse = evaluator.evaluate(ridge_test_predictions)
gbt_test_rmse = evaluator.evaluate(gbt_test_predictions)
rf_test_rmse = evaluator.evaluate(rf_test_predictions)

                                                                                

In [22]:
print(f"\nRidge Regression Train RMSE: {ridge_train_rmse}")
print(f"Gradient-Boosted Tree Train RMSE: {gbt_train_rmse}")
print(f"Random Forest Train RMSE: {rf_train_rmse}\n")

print(f"\nRidge Regression Test RMSE: {ridge_test_rmse}")
print(f"Gradient-Boosted Tree Test RMSE: {gbt_test_rmse}")
print(f"Random Forest Test RMSE: {rf_test_rmse}\n")


Ridge Regression Train RMSE: 86.09802416032562
Gradient-Boosted Tree Train RMSE: 72.59316352077796
Random Forest Train RMSE: 78.11254781302699


Ridge Regression Test RMSE: 86.09335444183954
Gradient-Boosted Tree Test RMSE: 72.69570278675752
Random Forest Test RMSE: 78.09106589460644
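
For reference, the metric reported above can be written out in a few lines of plain Python. The `rmse` helper and the three label/prediction pairs are hypothetical. The unit conversion at the end rests on an assumption: that `sea_level_pressure` is stored in tenths of a hectopascal, which the 8600 to 10900 range filter in section 3.1 suggests.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical labels and predictions in the same units as sea_level_pressure
labels = [10132.0, 10080.0, 10210.0]
predictions = [10100.0, 10120.0, 10200.0]
print(rmse(labels, predictions))

# Assumption: values are tenths of a hectopascal, so dividing by 10 gives hPa
gbt_test_rmse = 72.69570278675752  # GBT test RMSE reported above
print(f"GBT test RMSE: {gbt_test_rmse / 10:.2f} hPa")
```

Under that assumption the best model is off by roughly 7.3 hPa on average. The train and test RMSE are nearly identical for all three models, which suggests little overfitting at these settings.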



### 5.2 Identify the optimized hyperparameters for each model

In [25]:
ridge_best_model = ridge_model.stages[-1].bestModel
gbt_best_model = gbt_model.stages[-1].bestModel
rf_best_model = rf_model.stages[-1].bestModel

print("\nBest Hyperparameters:")

print(f"Ridge Regression: regParam = {ridge_best_model._java_obj.parent().getRegParam()}")

print(f"GBT: maxDepth = {gbt_best_model._java_obj.parent().getMaxDepth()},\
    maxIter = {gbt_best_model._java_obj.parent().getMaxIter()}")

print(f"Random Forest: numTrees = {rf_best_model._java_obj.parent().getNumTrees()},\
      maxDepth = {rf_best_model._java_obj.parent().getMaxDepth()}")


Best Hyperparameters:
Ridge Regression: regParam = 0.01
GBT: maxDepth = 9,    maxIter = 40
Random Forest: numTrees = 40,      maxDepth = 9
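
One way to read these results is to check whether each selected value lies at the edge of the grid that was searched. The sketch below does this in plain Python with the grids from section 4.3 and the best values printed above; the `on_grid_edge` helper and the dotted key names are hypothetical.

```python
# Grids from section 4.3 and the best values reported above
grids = {
    "ridge.regParam": [0.01, 0.1, 1.0],
    "gbt.maxDepth": [5, 7, 9],
    "gbt.maxIter": [20, 30, 40],
    "rf.numTrees": [20, 30, 40],
    "rf.maxDepth": [5, 7, 9],
}
best = {
    "ridge.regParam": 0.01,
    "gbt.maxDepth": 9,
    "gbt.maxIter": 40,
    "rf.numTrees": 40,
    "rf.maxDepth": 9,
}


def on_grid_edge(value, candidates) -> bool:
    """True when the selected value is the smallest or largest candidate tried."""
    return value in (min(candidates), max(candidates))


edge_params = [name for name, value in best.items() if on_grid_edge(value, grids[name])]
print(edge_params)
```

Every selected value sits on a grid boundary, so the search may not have bracketed the optimum. Widening the grids (deeper trees, more iterations, a smaller `regParam`) is one possible follow-up, at the cost of longer training.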


In [26]:
train_df.unpersist()
test_df.unpersist()

DataFrame[latitude: double, longitude: double, elevation: double, wind_direction: double, wind_speed: double, ceiling_height: double, visibility: double, air_temp: double, dew_point: double, sea_level_pressure: double]

In [27]:
# Save each trained model to GCS
ridge_model.save("gs://dsa5208-proj2/saved_models/ridge_model")
gbt_model.save("gs://dsa5208-proj2/saved_models/gbt_model")
rf_model.save("gs://dsa5208-proj2/saved_models/rf_model")

                                                                                