## Lab 8 - PySpark ML

<p>In this lab, you will model NOAA weather data to make predictions.</p>

Use the NOAA weather data to complete a machine learning prediction task. The full documentation of the dataset is available [here](https://www.ncei.noaa.gov/pub/data/cdo/documentation/GHCND_documentation.pdf).

In [None]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [None]:
sc.install_pypi_package("matplotlib==3.2.1")

In [None]:
import pyspark.sql.functions as F
import matplotlib.pyplot as plt

In this lab, we use only the weather data from the CHICAGO_MIDWAY_AP_3SW station (file `USC00111577.csv`).

You can read the complete weather station list [here](https://data.giss.nasa.gov/gistemp/station_data_v4_globe/v4.temperature.inv.txt).

In [None]:
df = spark.read.csv('s3://noaa-ghcn-pds/csv/by_station/USC00111577.csv', header=True)

In [None]:
df.printSchema()

In [None]:
df.show(10)

Check the number of rows in the DataFrame:

In [None]:
df.count()

## Data Engineering

First, convert `DATE` from a string to a date type:

In [None]:
df = df.withColumn("DATE", F.to_date(df["DATE"], "yyyyMMdd"))
df.filter(df["DATE"] == '1928-02-29').show()
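
As a plain-Python illustration of the pattern used above (not part of the Spark pipeline), the Spark pattern `yyyyMMdd` corresponds to `%Y%m%d` in Python's `strptime`. The raw string below is a made-up example value. Note that case matters in Spark patterns: `MM` is the month, while `mm` would mean minutes.

```python
from datetime import date, datetime

# Hypothetical raw value in the compact yyyyMMdd layout of the DATE column.
raw_date = "19280229"

# Spark's "yyyyMMdd" plays the same role as "%Y%m%d" in Python.
parsed = datetime.strptime(raw_date, "%Y%m%d").date()

print(parsed.isoformat())  # 1928-02-29
```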

Convert `DATA_VALUE` to integer: 

In [None]:
df = df.withColumn("DATA_VALUE", F.col("DATA_VALUE").cast("int"))

Check the data types:

In [None]:
df.dtypes

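We want to reorganize the data so that each unique value in the `ELEMENT` column becomes its own column. We use `groupBy` and `pivot` in PySpark to do the transformation. Each (`DATE`, `ELEMENT`) pair is expected to appear only once, so `F.sum` simply carries that single value through: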

In [None]:
pivot_df = df.groupBy("DATE")\
    .pivot("ELEMENT")\
    .agg(F.sum("DATA_VALUE"))

In [None]:
pivot_df.show(5)
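
To make the long-to-wide reshaping concrete, here is a minimal plain-Python sketch of the same idea. It does not use Spark, and the rows are made-up values rather than real NOAA readings; it only illustrates what `groupBy("DATE").pivot("ELEMENT")` with a sum aggregation does conceptually.

```python
from collections import defaultdict

# Made-up long-format rows: (date, element, value). Not real NOAA readings.
long_rows = [
    ("2024-01-01", "TMAX", 25),
    ("2024-01-01", "TMIN", -30),
    ("2024-01-01", "PRCP", 0),
    ("2024-01-02", "TMAX", 40),
    ("2024-01-02", "TMIN", -10),
]

# One output row per date; each element becomes a key (a "column").
wide = defaultdict(dict)
for day, element, value in long_rows:
    # Summing mirrors a sum aggregation; with one reading per pair it changes nothing.
    wide[day][element] = wide[day].get(element, 0) + value

elements = sorted({element for _, element, _ in long_rows})

# Elements never observed on a date stay missing (None), like nulls in Spark.
wide_rows = [
    {"DATE": day, **{e: wide[day].get(e) for e in elements}}
    for day in sorted(wide)
]

for row in wide_rows:
    print(row)
```

The second date has no `PRCP` reading, so its `PRCP` entry is `None`. This is why the pivoted table can contain missing values even when the long-format data has none.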

In this exercise, we use only the five core elements for the ML task. Feel free to explore other elements.

- PRCP = Precipitation (tenths of mm)

- SNOW = Snowfall (mm)

- SNWD = Snow depth (mm)

- TMAX = Maximum temperature (tenths of degrees C)

- TMIN = Minimum temperature (tenths of degrees C)

In [None]:
core_df = pivot_df.select(
   "DATE", "PRCP", "SNOW", "SNWD", "TMAX", "TMIN"
).withColumnRenamed("DATE", "date")\
 .withColumnRenamed("PRCP", "precip")\
 .withColumnRenamed("SNOW", "snow")\
 .withColumnRenamed("SNWD", "snow_depth")\
 .withColumnRenamed("TMAX", "temp_max")\
 .withColumnRenamed("TMIN", "temp_min")

core_df.show(5)

In [None]:
core_df.filter(core_df["date"] == '1928-02-29').show()

In [None]:
core_df.dtypes

Check for missing values:

In [None]:
null_counts = core_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in core_df.columns])
null_counts.show()

Drop every row that contains a null value:

In [None]:
core_df = core_df.dropna()
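
The effect of the two steps above can be sketched in plain Python. The rows are made up, with `None` standing in for a Spark null. With its default settings, `dropna()` removes a row if any of its columns is null, so a day with a temperature reading but no snow observation is dropped entirely.

```python
# Made-up rows; None stands in for a Spark null.
rows = [
    {"date": "2024-01-01", "precip": 0, "snow": None, "temp_max": 25},
    {"date": "2024-01-02", "precip": 13, "snow": 0, "temp_max": 40},
    {"date": "2024-01-03", "precip": None, "snow": None, "temp_max": 10},
]
columns = ["date", "precip", "snow", "temp_max"]

# Count missing values per column (the null check).
null_counts = {c: sum(1 for r in rows if r[c] is None) for c in columns}

# Keep only rows where every column is present (the "drop any null" rule).
complete_rows = [r for r in rows if all(r[c] is not None for c in columns)]

print(null_counts)
print(len(rows), "rows before,", len(complete_rows), "rows after")
```

Comparing the row count before and after the drop is a quick way to see how much data the model will actually be trained on.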

Convert the temperatures from tenths of degrees C to degrees C:

In [None]:
# TODO
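
The arithmetic behind this step is a division by 10. The sketch below shows it in plain Python on made-up readings; the function name is hypothetical, and applying the same idea to the `temp_max` and `temp_min` columns in PySpark is left as the exercise.

```python
def tenths_to_degrees_c(value_in_tenths):
    """Convert a reading stored in tenths of degrees C to degrees C."""
    return value_in_tenths / 10


# Made-up readings in tenths of degrees C.
readings = [217, -56, 0]
converted = [tenths_to_degrees_c(v) for v in readings]

print(converted)
```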

In [None]:
core_df.show(5)

Plot the yearly average of the maximum and minimum temperatures to see the trend across years:

In [None]:
core_df = core_df.withColumn("year", F.year(F.col("date")))

yearly_avg_temp = core_df.groupBy("year").agg(
    F.avg(F.col("temp_max")).alias("avg_temp_max"),
    F.avg(F.col("temp_min")).alias("avg_temp_min")
)

# collect data back to driver
yearly_data = yearly_avg_temp.sort("year").collect()
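
The aggregation above reduces many daily rows to one row per year, which is why collecting the result to the driver is cheap. A minimal plain-Python sketch of the same grouping, on made-up daily values, might look like this:

```python
from collections import defaultdict
from datetime import date

# Made-up (date, temp_max in degrees C) pairs.
daily = [
    (date(2020, 1, 1), 1.0),
    (date(2020, 7, 1), 29.0),
    (date(2021, 1, 1), -3.0),
    (date(2021, 7, 1), 31.0),
]

# Group the daily values by calendar year.
by_year = defaultdict(list)
for day, temp_max in daily:
    by_year[day.year].append(temp_max)

# One (year, average) pair per year, sorted by year for plotting.
yearly_avg = sorted((year, sum(vals) / len(vals)) for year, vals in by_year.items())

print(yearly_avg)
```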

In [None]:
# Plot yearly trends of temperatures
years = [row['year'] for row in yearly_data]
avg_temp_max = [row['avg_temp_max'] for row in yearly_data]
avg_temp_min = [row['avg_temp_min'] for row in yearly_data]

plt.plot(years, avg_temp_max, label='max temperature')
plt.plot(years, avg_temp_min, label='min temperature')
plt.legend()
plt.show()
%matplot plt

## Regression Modeling 

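Use four of the five core values (`precip`, `snow`, `snow_depth`, and `temp_min`) as features in a regression model that predicts the fifth, the maximum temperature (`temp_max`).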

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

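Split the data by date: rows up to 2020-12-31 form the training set, and rows from 2021-01-01 through 2024-05-03 form the test set: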

In [None]:
train_df = core_df.filter(F.col('date') <= F.to_date(F.lit('2020-12-31')))
test_df = core_df.filter((F.col('date') > F.to_date(F.lit('2020-12-31'))) & (F.col('date') <= F.to_date(F.lit('2024-05-03'))))
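
A split by date, rather than a random split, keeps every test observation strictly later than every training observation, so the model is never evaluated on days that precede its training data. The boundary logic can be sketched in plain Python on a few made-up dates:

```python
from datetime import date

TRAIN_END = date(2020, 12, 31)
TEST_END = date(2024, 5, 3)

# Made-up dates chosen to sit on and around the two boundaries.
days = [
    date(2019, 6, 1),
    date(2020, 12, 31),
    date(2021, 1, 1),
    date(2024, 5, 3),
    date(2024, 5, 4),
]

train = [d for d in days if d <= TRAIN_END]
test = [d for d in days if TRAIN_END < d <= TEST_END]

print(len(train), "training dates,", len(test), "test dates")
```

The last date falls after the test cutoff, so it lands in neither set.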

In [None]:
# TODO: Define the assembler that transforms `precip`, `snow`, `snow_depth`,
# and `temp_min` into the vector form that pyspark.ml can handle.
# Set `outputCol` to `features`, then apply the assembler so that
# `train_df` and `test_df` each gain a `features` column.
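
Conceptually, assembling features means packing several numeric columns of a row into one ordered vector, while the target stays separate. The plain-Python sketch below shows that idea on a single made-up row; it is an illustration only and does not use `VectorAssembler` itself.

```python
# Column order defines the position of each value in the feature vector.
feature_cols = ["precip", "snow", "snow_depth", "temp_min"]

# A made-up row after unit conversion.
row = {"precip": 13, "snow": 0, "snow_depth": 25, "temp_min": -5.6, "temp_max": 1.1}

# Pack the feature columns into one list of floats; keep the target apart.
features = [float(row[c]) for c in feature_cols]
label = row["temp_max"]

print(features, label)
```

Because `temp_max` is the prediction target, it must not appear among the feature columns.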

Define the final training and test sets by selecting the `features` column and renaming `temp_max` to `label`:

In [None]:
final_train_df = train_df.select(F.col('features'), F.col('temp_max').alias('label'))
final_test_df = test_df.select(F.col('features'), F.col('temp_max').alias('label'))

Build a ridge regression model. When `elasticNetParam` is set to 0, the model uses only L2 regularization.
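
As a sketch of what the regularization setting means, the elastic-net objective for squared-error loss, in the form given in the Spark ML documentation, can be written with $\lambda$ standing for `regParam` and $\alpha$ for `elasticNetParam` (the symbols $w$, $b$, $x_i$, $y_i$, and $n$ for weights, intercept, features, labels, and row count are our notation):

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2 + \lambda\left[\frac{1-\alpha}{2}\lVert w\rVert_2^2 + \alpha\lVert w\rVert_1\right]
$$

Setting $\alpha = 0$ removes the L1 term and leaves the ridge penalty:

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2 + \frac{\lambda}{2}\lVert w\rVert_2^2
$$

A larger $\lambda$ shrinks the weights more strongly, which is why `regParam` is worth tuning.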

In [None]:
ridge = LinearRegression(elasticNetParam=0, solver="auto")

# Define the parameter grid. Feel free to add more parameters
paramGrid = ParamGridBuilder() \
    .addGrid(ridge.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(ridge.maxIter, [100, 500]) \
    .build()

# TODO: Create the evaluator with RMSE as the evaluation metric
evaluator = ...

# Setup 5-fold cross-validation
crossval = CrossValidator(estimator=ridge,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# TODO: Fit the model on training data 
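
It helps to know how much work the cross-validator will do before fitting. The plain-Python sketch below counts the model fits implied by the grid above and shows how k folds partition the rows. The round-robin fold assignment is a simplification chosen for illustration; Spark assigns rows to folds randomly.

```python
from itertools import product

reg_params = [0.1, 0.01, 0.001]
max_iters = [100, 500]
num_folds = 5

# Every combination of the two grids is one candidate parameter map.
param_maps = [
    {"regParam": r, "maxIter": m} for r, m in product(reg_params, max_iters)
]


def kfold_indices(n_rows, k):
    """Yield (train_idx, val_idx) pairs using a simple round-robin assignment."""
    for fold in range(k):
        val_idx = [i for i in range(n_rows) if i % k == fold]
        train_idx = [i for i in range(n_rows) if i % k != fold]
        yield train_idx, val_idx


folds = list(kfold_indices(n_rows=10, k=num_folds))

# Each candidate is fitted once per fold, then the best one is refit on all rows.
cv_fits = len(param_maps) * num_folds
total_fits = cv_fits + 1

print(len(param_maps), cv_fits, total_fits)  # 6 30 31
```

Adding more values to the grid multiplies the number of fits, so it is worth growing the grid gradually.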


In [None]:
# TODO: Make predictions on the testing data


# TODO: Evaluate the model
rmse = ...
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


# TODO: Display 5 examples of predicted values and labels for comparison
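
For reference, RMSE is the square root of the mean squared difference between labels and predictions, so it is expressed in the same unit as the label (degrees C here, once the temperatures have been converted). A minimal plain-Python sketch on made-up numbers follows; the function name is hypothetical and this is not the Spark evaluator.

```python
import math


def root_mean_squared_error(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up labels and predictions in degrees C.
labels = [10.0, 12.0, 14.0]
predictions = [11.0, 12.0, 11.0]

example_rmse = root_mean_squared_error(labels, predictions)
print("RMSE = %g" % example_rmse)
```

Because the errors are squared before averaging, the single miss of 3 degrees dominates the result, which is a useful property to keep in mind when reading the test RMSE.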
