## Lab 8 - PySpark ML

In this lab, you will train a machine learning model to predict the maximum temperature at Midway Airport (Chicago) for any given date. We'll use [NOAA weather data](https://www.ncei.noaa.gov/pub/data/cdo/documentation/GHCND_documentation.pdf) hosted on Amazon S3 for this task.

In [None]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv"
    }
}

This lab uses only the weather data from the CHICAGO_MIDWAY_AP_3SW station (station ID `USC00111577`).

You can read the complete Weather Station List [here](https://data.giss.nasa.gov/gistemp/station_data_v4_globe/v4.temperature.inv.txt).

In [3]:
df = spark.read.csv('s3://noaa-ghcn-pds/csv/by_station/USC00111577.csv', header=True)

In [4]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- DATA_VALUE: string (nullable = true)
 |-- M_FLAG: string (nullable = true)
 |-- Q_FLAG: string (nullable = true)
 |-- S_FLAG: string (nullable = true)
 |-- OBS_TIME: string (nullable = true)

In [5]:
df.show(10)

+-----------+--------+-------+----------+------+------+------+--------+
|         ID|    DATE|ELEMENT|DATA_VALUE|M_FLAG|Q_FLAG|S_FLAG|OBS_TIME|
+-----------+--------+-------+----------+------+------+------+--------+
|USC00111577|19280229|   TMAX|        39|  null|  null|     0|    null|
|USC00111577|19280229|   TMIN|       -28|  null|  null|     0|    null|
|USC00111577|19280229|   PRCP|         0|     T|  null|     0|    null|
|USC00111577|19280229|   SNOW|         0|     T|  null|     0|    null|
|USC00111577|19280229|   SNWD|         0|  null|  null|     0|    null|
|USC00111577|19280301|   TMAX|        22|  null|  null|     0|    null|
|USC00111577|19280302|   TMAX|        56|  null|  null|     0|    null|
|USC00111577|19280303|   TMAX|        28|  null|  null|     0|    null|
|USC00111577|19280304|   TMAX|        50|  null|  null|     0|    null|
|USC00111577|19280305|   TMAX|       -22|  null|  null|     0|    null|
+-----------+--------+-------+----------+------+------+------+--------+

Check the number of rows in the DataFrame:

In [6]:
df.count()

277672

## Feature Engineering

First, convert `DATE` from a string to a date type:

In [None]:
import pyspark.sql.functions as F

In [7]:
df = df.withColumn("DATE", F.to_date(df["DATE"], "yyyyMMdd"))
df.filter(df["DATE"] == '1928-02-29').show()

+-----------+----------+-------+----------+------+------+------+--------+
|         ID|      DATE|ELEMENT|DATA_VALUE|M_FLAG|Q_FLAG|S_FLAG|OBS_TIME|
+-----------+----------+-------+----------+------+------+------+--------+
|USC00111577|1928-02-29|   TMAX|        39|  null|  null|     0|    null|
|USC00111577|1928-02-29|   TMIN|       -28|  null|  null|     0|    null|
|USC00111577|1928-02-29|   PRCP|         0|     T|  null|     0|    null|
|USC00111577|1928-02-29|   SNOW|         0|     T|  null|     0|    null|
|USC00111577|1928-02-29|   SNWD|         0|  null|  null|     0|    null|
+-----------+----------+-------+----------+------+------+------+--------+

Cast `DATA_VALUE` from string to integer:

In [8]:
df = df.withColumn("DATA_VALUE", F.col("DATA_VALUE").cast("int"))

Check the data types:

In [9]:
df.dtypes

[('ID', 'string'), ('DATE', 'date'), ('ELEMENT', 'string'), ('DATA_VALUE', 'int'), ('M_FLAG', 'string'), ('Q_FLAG', 'string'), ('S_FLAG', 'string'), ('OBS_TIME', 'string')]

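We want to reorganize the data so that each date becomes one row and each unique value in the `ELEMENT` column becomes its own column. We use `groupBy` and `pivot` in PySpark for this transformation. Assuming each date has at most one observation per element, the `sum` aggregate simply carries that value through: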

In [10]:
pivot_df = df.groupBy("DATE")\
             .pivot("ELEMENT")\
             .agg(F.sum("DATA_VALUE"))
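
To make the reshaping concrete, here is a minimal plain-Python sketch (not Spark code) of what a group-by-date pivot with a sum aggregate computes. The helper `pivot_long_to_wide` is hypothetical, and the sample rows are the first observations shown in the `df.show(10)` output above.

```python
from collections import defaultdict


def pivot_long_to_wide(rows):
    """Turn (date, element, value) rows into {date: {element: summed value}}."""
    wide = defaultdict(dict)
    for date, element, value in rows:
        # Sum values that share a (date, element) pair, mirroring the sum aggregate.
        wide[date][element] = wide[date].get(element, 0) + value
    return dict(wide)


# Long-format rows copied from the station data shown earlier.
long_rows = [
    ("1928-02-29", "TMAX", 39),
    ("1928-02-29", "TMIN", -28),
    ("1928-02-29", "PRCP", 0),
    ("1928-02-29", "SNOW", 0),
    ("1928-02-29", "SNWD", 0),
    ("1928-03-01", "TMAX", 22),
]

wide_rows = pivot_long_to_wide(long_rows)
print(wide_rows["1928-02-29"])
print(wide_rows["1928-03-01"])
```

In this sketch an element that was never recorded on a date is simply absent from that date's dictionary. In the Spark pivot the corresponding cell is `null`, which is why the pivoted table contains so many nulls.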

In [11]:
pivot_df.show(5)

+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|      DATE|ACMH|ACSH|DAPR|DASF|MDPR|MDSF|PGTM|PRCP|PSUN|SNOW|SNWD|TMAX|TMIN|TOBS|TSUN|WDFG|WDFM|WESD|WSFG|WSFM|WT01|WT02|WT03|WT04|WT05|WT06|WT07|WT08|WT09|WT11|WT14|WT16|WT18|
+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|1999-02-01|null|null|null|null|null|null|null|  13|null|   0|   0|  78|  17|  56|null|null|null|null|null|null|   1|null|null|null|null|null|null|null|null|null|null|null|null|
|1997-12-11|null|null|null|null|null|null|null|   5|null|   3|  25|  22|   0|   0|null|null|null|null|null|null|   1|null|null|null|null|null|null|null|null|null|null|null|null|
|1975-10-24|  90|  80|null|null|null|null|1500| 307|  42|   0|   0| 256|  72|null| 276| 225| 225|null| 190| 11

In this exercise, we use only a few of these elements for the ML task. Feel free to explore the others.

- DATE = YYYY-MM-DD

- PRCP = Precipitation (tenths of mm)

- SNOW = Snowfall (mm)

- SNWD = Snow depth (mm)

- TMAX = Maximum temperature (tenths of degrees C)

- TMIN = Minimum temperature (tenths of degrees C)

In [12]:
core_df = pivot_df.select("DATE", "PRCP", "SNOW", "SNWD", "TMAX", "TMIN") \
                  .withColumnRenamed("DATE", "date")\
                  .withColumnRenamed("PRCP", "precip")\
                  .withColumnRenamed("SNOW", "snow")\
                  .withColumnRenamed("SNWD", "snow_depth")\
                  .withColumnRenamed("TMAX", "temp_max")\
                  .withColumnRenamed("TMIN", "temp_min")

core_df.show(5)

+----------+------+----+----------+--------+--------+
|      date|precip|snow|snow_depth|temp_max|temp_min|
+----------+------+----+----------+--------+--------+
|2000-07-03|   249|   0|         0|     244|     189|
|1973-09-04|    76|   0|         0|     283|     211|
|1976-06-24|    58|   0|         0|     233|     178|
|1972-12-24|     3|   0|        25|      33|      -6|
|1981-03-25|     0|   0|         0|     144|     -44|
+----------+------+----+----------+--------+--------+
only showing top 5 rows

In [13]:
core_df.filter(core_df["date"] == '1928-02-29').show()

+----------+------+----+----------+--------+--------+
|      date|precip|snow|snow_depth|temp_max|temp_min|
+----------+------+----+----------+--------+--------+
|1928-02-29|     0|   0|         0|      39|     -28|
+----------+------+----+----------+--------+--------+

In [14]:
core_df.dtypes

[('date', 'date'), ('precip', 'bigint'), ('snow', 'bigint'), ('snow_depth', 'bigint'), ('temp_max', 'bigint'), ('temp_min', 'bigint')]

Check for missing values:

In [15]:
null_counts = core_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in core_df.columns])
null_counts.show()

+----+------+----+----------+--------+--------+
|date|precip|snow|snow_depth|temp_max|temp_min|
+----+------+----+----------+--------+--------+
|   0|    34| 134|       135|      21|      28|
+----+------+----+----------+--------+--------+

Drop null values:

In [16]:
core_df = core_df.dropna()
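
The two steps above (counting nulls per column, then dropping incomplete rows) can be sketched in plain Python as follows. This is an illustration only: the helpers `count_nulls` and `drop_incomplete` are hypothetical, the sample rows are made up, and `None` stands in for Spark's `null`.

```python
def count_nulls(rows, columns):
    """Count None values per column across a list of dict rows."""
    return {col: sum(1 for row in rows if row.get(col) is None) for col in columns}


def drop_incomplete(rows, columns):
    """Keep only rows that have a value in every listed column."""
    return [row for row in rows if all(row.get(col) is not None for col in columns)]


# Made-up rows for illustration; None marks a missing observation.
columns = ["precip", "snow", "temp_max"]
rows = [
    {"precip": 0, "snow": 0, "temp_max": 39},
    {"precip": None, "snow": 0, "temp_max": 22},
    {"precip": 13, "snow": None, "temp_max": None},
]

null_totals = count_nulls(rows, columns)
complete_rows = drop_incomplete(rows, columns)
print(null_totals)
print(len(complete_rows))
```

As in this sketch, `dropna()` with its default arguments drops a row if any of its columns is null, so a date missing only `snow_depth` is removed along with its otherwise valid temperatures.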

Convert temperatures from tenths of degrees C to degrees C:

In [17]:
# TODO
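
The conversion itself is a division by 10. Here is a minimal plain-Python sketch of the arithmetic, using a hypothetical helper `tenths_to_degrees_c` and the 1928-02-29 readings shown earlier. It illustrates the arithmetic only and is not the Spark solution.

```python
def tenths_to_degrees_c(tenths):
    """Convert a temperature in tenths of degrees C to degrees C."""
    return tenths / 10


temp_max_c = tenths_to_degrees_c(39)   # TMAX recorded on 1928-02-29
temp_min_c = tenths_to_degrees_c(-28)  # TMIN recorded on 1928-02-29
print(temp_max_c, temp_min_c)
```

In Spark, the same division can be applied to a whole column inside `withColumn`, for example with an expression such as `F.col("temp_max") / 10`.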


Let's also add a column that identifies the month in which each observation was recorded. The month captures seasonality, which may help predict the day's maximum temperature.

In [25]:
core_df = core_df.withColumn('month', F.month('date'))
core_df.show()

+----------+------+----+----------+--------+--------+-----+
|      date|precip|snow|snow_depth|temp_max|temp_min|month|
+----------+------+----+----------+--------+--------+-----+
|1968-06-15|    13|   0|         0|    23.3|    13.3|    6|
|1958-08-15|   137|   0|         0|    27.8|    19.4|    8|
|1946-02-25|     0|   0|         0|     3.9|    -4.4|    2|
|1972-12-24|     3|   0|        25|     3.3|    -0.6|   12|
|1976-06-24|    58|   0|         0|    23.3|    17.8|    6|
|1970-05-29|     0|   0|         0|    30.6|    19.4|    5|
|1974-05-11|    28|   0|         0|    19.4|    11.1|    5|
|1971-05-12|     0|   0|         0|    12.8|     0.6|    5|
|1995-12-01|     0|   0|         0|    10.6|     0.0|   12|
|1998-05-16|     0|   0|         0|    28.3|    18.3|    5|
|1941-06-05|     0|   0|         0|    28.3|    12.2|    6|
|1971-06-25|    18|   0|         0|    28.3|    15.0|    6|
|1952-06-15|     0|   0|         0|    35.0|    21.1|    6|
|1967-05-10|   147|   0|         0|    1

We can then use PySpark's `OneHotEncoder` class to turn the month number in `month` into a one-hot encoded `month_dummy` feature that we can include in our model.

In [26]:
from pyspark.ml.feature import OneHotEncoder

In [40]:
# TODO: Use OneHotEncoder to transform `month` in `core_df` into a
# one-hot encoded column called `month_dummy`
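
To show what a one-hot encoded column looks like, here is a minimal plain-Python sketch with a hypothetical `one_hot` helper. It mimics the documented behavior of Spark's `OneHotEncoder`, which by default (`dropLast=True`) omits the last category so that it is represented by an all-zero vector. The 13-category setup for months is an assumption: it supposes the encoder treats the month numbers 1 to 12 directly as category indices and infers the category count from the largest index.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a dense list of 0.0/1.0 values."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:  # the dropped last category stays all zeros
        vector[index] = 1.0
    return vector


# Five categories, matching the example in the Spark documentation.
print(one_hot(2, 5))
print(one_hot(4, 5))

# Assumption: month numbers 1..12 used as indices imply 13 categories (0..12).
january = one_hot(1, 13)
december = one_hot(12, 13)
print(len(january), january.index(1.0), sum(december))
```

Under that assumption, each encoded month has 12 slots, slot 0 is never set, and December is the all-zero reference category. Spark stores these as sparse vectors rather than Python lists.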


## Regression Modeling 

Let's use the features above (i.e. `precip`, `snow`, `snow_depth`, `temp_min`, and `month_dummy`) to predict the maximum temperature for a given day.

In [33]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

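Split the data chronologically into training and test sets. Observations through 2020-12-31 are used for training, and observations from 2021-01-01 through 2024-05-03 are used for testing: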

In [34]:
train_df = core_df.filter(
    F.col('date') <= F.to_date(F.lit('2020-12-31'))
)
test_df = core_df.filter(
      (F.col('date') > F.to_date(F.lit('2020-12-31')))
    & (F.col('date') <= F.to_date(F.lit('2024-05-03')))
)
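
The boundaries of this chronological split can be checked with a small plain-Python sketch. The helper `assign_split` is hypothetical; the two cutoff dates are the ones used in the filters above.

```python
from datetime import date

TRAIN_END = date(2020, 12, 31)  # last day included in the training set
TEST_END = date(2024, 5, 3)     # last day included in the test set


def assign_split(day):
    """Return which set a given observation date falls into."""
    if day <= TRAIN_END:
        return "train"
    if day <= TEST_END:
        return "test"
    return "unused"


for day in [date(2020, 12, 31), date(2021, 1, 1), date(2024, 5, 3), date(2024, 5, 4)]:
    print(day, assign_split(day))
```

Splitting by date rather than at random keeps every test observation later than every training observation, so the model is evaluated on days it could not have seen. Any observation after 2024-05-03 ends up in neither set.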

In [41]:
# TODO: Use the VectorAssembler class to convert `precip`, `snow`,
# `snow_depth`, `temp_min`, and `month_dummy` into a form that can
# be used to train an ML model (you should transform both test and train data)
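
Conceptually, assembling features means concatenating scalar columns and vector columns into one flat feature vector per row. Here is a minimal plain-Python sketch of that idea with a hypothetical `assemble` helper. The scalar values come from the 1976-06-24 row shown earlier, and the 12-slot `month_dummy` layout for June is an assumption about how the one-hot step encodes months.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued fields of a row into one flat list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: append every entry
        else:
            features.append(float(value))             # scalar column: append one entry
    return features


# Assumed layout: 12 slots with a 1.0 at index 6 standing for June.
row = {
    "precip": 58,
    "snow": 0,
    "snow_depth": 0,
    "temp_min": 17.8,
    "month_dummy": [0.0] * 6 + [1.0] + [0.0] * 5,
}
input_cols = ["precip", "snow", "snow_depth", "temp_min", "month_dummy"]
features = assemble(row, input_cols)
print(len(features), features[:4])
```

Spark's `VectorAssembler` does the equivalent on DataFrame columns, taking the column names through `inputCols` and writing a single vector column named by `outputCol`.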


In [42]:
# TODO: Train a Linear Regression model to predict `temp_max` based on
# features assembled above

# TODO: Summarize trained model by printing out coefficients
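
A fitted linear regression predicts with an intercept plus a weighted sum of the features, which is why printing the coefficients summarizes the model. The plain-Python sketch below shows that formula with a hypothetical `predict` helper. The intercept and coefficient values are made up for illustration and are not fitted on this data.

```python
def predict(intercept, coefficients, features):
    """Linear model prediction: intercept + sum of coefficient * feature."""
    if len(coefficients) != len(features):
        raise ValueError("coefficients and features must have the same length")
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Hypothetical two-feature model over [precip, temp_min]; values are made up.
intercept = 10.0
coefficients = [-0.01, 1.0]
prediction = predict(intercept, coefficients, [50.0, 12.0])
print(round(prediction, 2))
```

On a fitted PySpark `LinearRegressionModel`, the corresponding values are available as the `coefficients` and `intercept` attributes, with one coefficient per slot of the assembled feature vector.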


In [43]:
# TODO: Evaluate model performance on test data using RMSE:
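
RMSE (root mean squared error) is the square root of the average squared difference between actual and predicted values, so it is expressed in the same unit as `temp_max`. Here is a minimal plain-Python sketch of the formula, using a hypothetical `rmse` helper and made-up temperatures:

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up temperatures in degrees C.
actual = [3.0, 5.0, 7.0]
predicted = [2.0, 5.0, 9.0]
error = rmse(actual, predicted)
print(round(error, 4))
```

In PySpark, `RegressionEvaluator` from `pyspark.ml.evaluation` with `metricName="rmse"` computes this metric on a DataFrame that holds both the label and the prediction columns.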

