In [1]:
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, format_string
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from datetime import datetime
from math import sin, cos, pi

## Initialise spark session / context

In [2]:
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/09 13:19:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Remove columns that contain commas

NOTE: They cause issues when trying to construct the spark RDD / DF. May need to do the same in databricks

In [3]:
data = pd.read_csv('data/sample_dataset.csv')

text_cols = [
    'preciptype', 'timezone', 'conditions', 'day_agg_preciptype', 
    'day_agg_description', 'day_agg_source', 'source', 'stations',
    'icon', '_id', 'day_agg_conditions'
]
invalid = []

for column in text_cols:
    if any(',' in val for val in data[column].values):
        invalid.append(column)

data.drop(columns=invalid).to_csv('data/sample_dataset_processed.csv', index=False)
invalid

['conditions', 'day_agg_conditions']

## Load Data into spark df

NOTE: In databricks, this will be changed to load all data in from MongoDB

In [4]:
# ========= NEED TO CHANGE WHEN ADDED TO DATABRICKS =========

df = sc.textFile('data/sample_dataset_processed.csv')
header = df.first()
df = df.filter(lambda x: x != header) \
        .map(lambda x: x.split(','))
columns = header.split(',')
df = df.toDF()
for idx, column in enumerate(df.columns):
    df = df.withColumnRenamed(column, columns[idx])

# ==========================================================

                                                                                

## Define time / date embeddings functions

In [5]:
def create_time_embedding(time, embedding_num):
    """ Function to compute time embeddings """
    hour = datetime.strptime(time, '%H:%M:%S').hour
    if embedding_num == 0:
        return round(sin(2 * pi * hour / 24), 5)
    elif embedding_num == 1:
        return round(cos(2 * pi * hour / 24), 5)
    
def create_date_embedding(date, embedding_num):
    """ Function to compute date embeddings """
    day = (datetime.strptime(date, '%Y-%m-%d') - datetime(2022, 1, 1)).days
    if embedding_num == 0:
        return round(sin(2 * pi * day / 365), 5)
    elif embedding_num == 1:
        return round(cos(2 * pi * day / 365), 5)
    
def compute_sunset_sunrise_time(time, sunrise, sunset):
    """ Function to create feature that indicates how far 
        from sunset / sunrise the observation is """
    time = datetime.strptime(time, '%H:%M:%S')
    sunrise = datetime.strptime(sunrise, '%H:%M:%S')
    sunset = datetime.strptime(sunset, '%H:%M:%S')
    time_min = time.minute + time.hour * 60
    sunrise_min = sunrise.minute + sunrise.hour * 60
    sunset_min = sunset.minute + sunset.hour * 60
    if time_min < sunrise_min or sunset_min < time_min:
        return 0.
    else:
        min_val = min(time_min - sunrise_min, sunset_min - time_min)
        range_val = sunset_min - sunrise_min
        return round(2 * min_val / range_val, 5)

## Convert function to UDF functions

In [6]:
# Converting time embedding functions to UDF 
time_emb_0_func = udf(lambda z: create_time_embedding(z, 0),DoubleType())
time_emb_1_func = udf(lambda z: create_time_embedding(z, 1),DoubleType())

# Converting date embedding functions to UDF 
date_emb_0_func = udf(lambda z: create_date_embedding(z, 0),DoubleType())
date_emb_1_func = udf(lambda z: create_date_embedding(z, 1),DoubleType())

# Convert sunset / sunrise function to UDF
sunset_sunrise_func = udf(lambda t, sr, ss: compute_sunset_sunrise_time(t, sr, ss), DoubleType())

## Apply UDF functions to create columns

In [7]:
# Create time embedding columns in spark df
df = df.withColumn('time_emb_0', time_emb_0_func(col('time')))
df = df.withColumn('time_emb_1', time_emb_1_func(col('time')))

# Create date embedding coluns in spark df
df = df.withColumn('date_emb_0', date_emb_0_func(col('date')))
df = df.withColumn('date_emb_1', date_emb_1_func(col('date')))

# Create sunrise / sunset column in spark df
df = df.withColumn(
    'time_sunset_sunrise', 
    sunset_sunrise_func(
        col('time'),
        col('day_agg_sunrise'), 
        col('day_agg_sunset')
    )
)

## Join in lat / lon PCA values

In [8]:
# Load in lat / lon PCA values
df_lat_lon = ss.read.csv("data/PCA_lat_lon.csv", header=True, inferSchema=True)
df_lat_lon = df_lat_lon.withColumnRenamed('lat', 'lat_pca_df')
df_lat_lon = df_lat_lon.withColumnRenamed('lon', 'lon_pca_df')

# Join PCA values into main df
df = df.join(df_lat_lon, df.lat == df_lat_lon.lat_pca_df)

## Remove columns and convert to correct data types

In [9]:
schema = StructType([
    StructField("temp", DoubleType() ,True),
    StructField("humidity", DoubleType(), True),
    StructField("dew", DoubleType(), True),
    StructField("precip", DoubleType(), True),
    StructField("windgust", DoubleType(), True),
    StructField("windspeed", DoubleType(), True),
    StructField("pressure", DoubleType(), True),
    StructField("visibility", DoubleType(), True),
    StructField("cloud_cover_perc", DoubleType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("time_emb_0", DoubleType(), True),
    StructField("time_emb_1", DoubleType(), True),
    StructField("date_emb_0", DoubleType(), True),
    StructField("date_emb_1", DoubleType(), True),
    StructField("time_sunset_sunrise", DoubleType(), True),
    StructField("PC1", DoubleType(), True),
    StructField("PC2", DoubleType(), True),
    StructField("solar_radiation", DoubleType(), True)
])

# Only include columns listed in the schema and convert all cols to double
df = df.select(*[value.name for value in schema])
for column in df.columns:
    df = df.withColumn(column, col(column).cast('double'))

## Process dataframe ready for modelling

In [10]:
# Rename solar_radiation column as target
df = df.withColumnRenamed("solar_radiation", "label")

# Vectorise the features into single column
va = VectorAssembler(outputCol="features", inputCols=df.columns[0:-1])
df = va.transform(df).select("features", "label")

# Split the full df into train / test
df_sets = df.randomSplit([0.6, 0.4], 1)
df_train = df_sets[0].cache()
df_test = df_sets[1].cache()

23/03/09 13:20:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [11]:
df_train.show(10)

[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(18,[0,1,2,5,9,10...|  0.0|
|[-10.7,59.81,-17....|  0.0|
|[-10.5,59.86,-16....| 67.8|
|[-7.0,43.31,-17.4...| 55.0|
|[-6.6,76.95,-10.0...|  1.0|
|[-6.6,84.07,-8.9,...|  1.0|
|[-6.1,47.0,-15.6,...|  0.0|
|[-6.1,74.06,-10.0...|  1.0|
|[-5.7,70.23,-10.3...|  5.0|
|[-5.2,52.58,-13.4...|  6.0|
+--------------------+-----+
only showing top 10 rows



                                                                                

## Train random forest regressor model

In [12]:
rf = RandomForestRegressor(maxDepth=20)
rf.fit(df_train)

                                                                                

23/03/09 13:21:15 WARN DAGScheduler: Broadcasting large task binary with size 1722.2 KiB


                                                                                

23/03/09 13:21:17 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB


                                                                                

23/03/09 13:21:19 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB


[Stage 30:>                                                         (0 + 2) / 2]

23/03/09 13:21:20 WARN DAGScheduler: Broadcasting large task binary with size 1543.9 KiB


                                                                                

23/03/09 13:21:22 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB


[Stage 32:>                                                         (0 + 2) / 2]

23/03/09 13:21:23 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

23/03/09 13:21:26 WARN DAGScheduler: Broadcasting large task binary with size 17.3 MiB




23/03/09 13:21:28 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


                                                                                

23/03/09 13:21:31 WARN DAGScheduler: Broadcasting large task binary with size 27.5 MiB




23/03/09 13:21:34 WARN DAGScheduler: Broadcasting large task binary with size 5.0 MiB


[Stage 38:>                                                         (0 + 0) / 2]

23/03/09 13:21:39 WARN DAGScheduler: Broadcasting large task binary with size 39.0 MiB


Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ContextCleaner$$Lambda$774/0x000000080071e840.get$Lambda(Unknown Source)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
	at org.apache.spark.ContextCleaner$$Lambda$721/0x00000008006ec840.apply$mcV$sp(Unknown Source)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
	at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)


23/03/09 13:21:43 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ContextCleaner$$Lambda$774/0x000000080071e840.get$Lambda(Unknown Source)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
	at org.apache.spark.ContextCleaner$$Lambda$721/0x00000008006ec840.apply$mcV$sp(Unknown Source)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
	at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
23/03/09 13:21:43 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: Java heap space
	at org.apa

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 63694)
Traceback (most recent call last):
  File "/opt/anaconda3/envs/DistDataSys/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/envs/DistDataSys/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/envs/DistDataSys/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/envs/DistDataSys/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/anaconda3/envs/DistDataSys/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/opt/anaconda3/envs/DistDataSys/lib/python3.10/site-packages/pyspark/accumulators.py", line 253, in poll
    if func()

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
# split into train/test
# train_df, test_df = df.randomSplit([0.8, 0.2])

In [None]:
# fit model
# from pyspark.ml.regression import RandomForestRegressor
# rf = RandomForestRegressor(maxDepth=20)
# rfmodel = rf.fit(train_df)

In [None]:
# get parameters
# numTrees = rfmodel.getNumTrees
# maxDepth = rfmodel.getOrDefault('maxDepth')
# rfmodel.featureImportances

# # print feature importance values in descending order
# feat_imp_dict = dict(zip(train_df.columns, feat_imp))
# for name, importance in sorted(feat_imp_dict.items(), key=lambda x: x[1], reverse=True):
#     print(f"{name}: {importance}")

In [None]:
# evaluate
# predictions = rfModel.transform(test_df)

# # compute RMSE -- can change to mae if we want MAE instead
# rmse_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
# rmse = rmse_evaluator.evaluate(predictions)

# # compute R-squared
# r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")
# r2 = r2_evaluator.evaluate(predictions)