# PySpark on SageMaker Studio with EMR Cluster

This notebook shows how to run PySpark code within a SageMaker Studio notebook, using an EMR cluster to execute the jobs. For this example we use the **SparkMagic - PySpark** image and kernel.

In [None]:
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --cluster-id j-3CPACJST7EJ2M --auth-type None

## Notebook Scoped Dependencies
Notebook-scoped libraries provide the following benefits:

* Runtime installation – You can import your favorite Python libraries from PyPI repositories and install them on your remote cluster on the fly when you need them. These libraries are instantly available to your Spark runtime environment. There is no need to restart the notebook session or recreate your cluster.
* Dependency isolation – The libraries you install using EMR Notebooks are isolated to your notebook session and don’t interfere with bootstrapped cluster libraries or libraries installed from other notebook sessions. These notebook-scoped libraries take precedence over bootstrapped libraries. Multiple notebook users can import their preferred version of the library and use it without dependency clashes on the same cluster.
* Portable library environment – The library package installation happens from your notebook file. This allows you to recreate the library environment when you switch the notebook to a different cluster by re-executing the notebook code. At the end of the notebook session, the libraries you install through EMR Notebooks are automatically removed from the hosting EMR cluster.

In [None]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

***

### Upgrade pip

These cells upgrade pip by uninstalling and reinstalling it, so that the `pyarrow` module can be installed.

In [None]:
sc.uninstall_package("pip")

In [None]:
sc.install_pypi_package("pip")

***

In [None]:
sc.install_pypi_package("pyarrow") # install pyarrow to run vectorized UDFs

List the Python modules installed on the cluster to check the result.

In [None]:
sc.list_packages()

***

In [None]:
%%local
import sagemaker
from sagemaker import get_execution_role


role = get_execution_role()
sess = sagemaker.Session()

s3_bucket = "" # set this to the name of your S3 bucket
s3_processed_data_location = f"s3a://{s3_bucket}/data/output/" # location where spark will write the processed data for training

object_name = "LD2011_2014.csv"

s3_input_data_location = "s3://{}/data/input/{}".format(s3_bucket, object_name)
schema = "date TIMESTAMP, client STRING, value FLOAT" # source data schema

Now we have everything we need to preprocess the data with Spark. We'll send the Spark cluster the location of the input data, the S3 location where we'd like the output to go, and the schema information.

In [None]:
%%send_to_spark -i s3_input_data_location -t str -n s3_input_data_location

In [None]:
%%send_to_spark -i s3_processed_data_location -t str -n s3_processed_data_location

In [None]:
%%send_to_spark -i schema -t str -n schema

# Data preprocessing with Apache Spark
The input dataset comes in the following format:

|    | date                | client   |   value |
|---:|:--------------------|:---------|--------:|
|  0 | 2011-01-01 00:15:00 | MT_001   |       0 |
|  1 | 2011-01-01 00:30:00 | MT_001   |       0 |
|  2 | 2011-01-01 00:45:00 | MT_001   |       0 |
|  3 | 2011-01-01 01:00:00 | MT_001   |       0 |
|  4 | 2011-01-01 01:15:00 | MT_001   |       0 |

The first column contains the timestamp of the observation in 15-minute increments. The `client` column uniquely identifies each time series (i.e. the customer), and the `value` column provides the electricity demand at that time.

For DeepAR we'll need to transform the time series data into JSON Lines format, where each line contains a JSON object representing one client, with the following schema: <br>
`{"start": ..., "target": [0, 0, 0, 0], "dynamic_feat": [[0, 1, 1, 0]], "cat": [0, 0]}` <br>
We'll only use the `start` attribute, which contains the start date of the time series, the `target` attribute, which contains the observations, and the `cat` attribute, which encodes each client as a category. DeepAR also supports additional categorical and continuous features to improve the quality of the forecast.
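
To make the target format concrete, here is a minimal sketch that serializes two records as JSON Lines using only the standard library. The record values are hypothetical and chosen purely for illustration; only the `start`, `target`, and `cat` keys come from the schema above.

```python
import json

# Hypothetical records for two clients; the values are illustrative only.
records = [
    {"start": "2011-01-01 00:00:00", "target": [0.0, 1.5, 2.0], "cat": [0]},
    {"start": "2011-01-01 00:00:00", "target": [3.0, 2.5, 4.0], "cat": [1]},
]

# JSON Lines: one JSON object per line, with no enclosing array.
json_lines = "\n".join(json.dumps(rec) for rec in records)
print(json_lines)

# Each line can be parsed back independently of the others.
parsed = [json.loads(line) for line in json_lines.splitlines()]
```

Because every line is a complete JSON object, a reader can process one time series at a time without loading the whole file.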

Here we will read the data from S3, and then use a combination of PySpark and pandas UDFs to get the data into the right format.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType, IntegerType

In [None]:
schema = "date TIMESTAMP, client STRING, value FLOAT"

In [None]:
df = spark \
    .read \
    .schema(schema) \
    .options(sep =',', header=True, mode="FAILFAST", timestampFormat="yyyy-MM-dd HH:mm:ss") \
    .csv(s3_input_data_location)

In [None]:
df.show()

In [None]:
# resample from 15min intervals to one hour to speed up training
df = df \
    .groupBy(fn.date_trunc("HOUR", fn.col("date")).alias("date"), fn.col("client")) \
    .agg(fn.mean("value").alias("value"))
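
As a rough illustration of what this resampling does, the following plain-Python sketch truncates each timestamp to the hour and averages the values in each bucket. The `rows` list is hypothetical sample data, not taken from the dataset, and the sketch only mirrors the logic of `date_trunc` followed by `mean`.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# Hypothetical 15-minute readings for one client: (timestamp, client, value).
rows = [
    (datetime(2011, 1, 1, 0, 15), "MT_001", 1.0),
    (datetime(2011, 1, 1, 0, 30), "MT_001", 3.0),
    (datetime(2011, 1, 1, 0, 45), "MT_001", 5.0),
    (datetime(2011, 1, 1, 1, 0), "MT_001", 7.0),
]

# Truncate each timestamp to the hour, then collect the values
# that fall into the same (hour, client) bucket.
buckets = defaultdict(list)
for ts, client, value in rows:
    hour = ts.replace(minute=0, second=0, microsecond=0)
    buckets[(hour, client)].append(value)

# Average each bucket to get one value per client per hour.
hourly = {key: mean(values) for key, values in buckets.items()}
```

Note that the 01:00 reading lands in the 01:00 bucket, so in this sample the 00:00 bucket averages only the three readings from 00:15 to 00:45.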

In [None]:
# create a dictionary that maps each client to an integer code
client_list = df.select("client").distinct().collect()
client_list = [rec["client"] for rec in client_list]
client_encoder = dict(zip(client_list, range(len(client_list)))) 
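
Spark does not guarantee the order of rows returned by `distinct().collect()`, so the integer assigned to a given client may differ between runs. If a reproducible mapping matters, one option is to sort the client IDs before encoding. The sketch below assumes hypothetical client IDs, and the names `example_clients` and `example_encoder` are illustrative only.

```python
# Hypothetical client IDs in the arbitrary order a distributed collect might return.
example_clients = ["MT_003", "MT_001", "MT_002"]

# Sorting first makes the client -> integer mapping reproducible across runs.
example_encoder = {client: idx for idx, client in enumerate(sorted(example_clients))}
print(example_encoder)
```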

In [None]:
random_client_list = random.sample(client_list, 6)

random_clients_pandas_df = df \
                            .where(fn.col("client").isin(random_client_list)) \
                            .groupBy("date") \
                            .pivot("client").max().toPandas()

random_clients_pandas_df.set_index("date", inplace=True)

In [None]:
random_clients_pandas_df

Next we check the data for gaps. For example, if the data only arrived Monday to Friday (e.g. stock trading activity), we'd have to insert NaN data points to account for Saturdays and Sundays. A quick way to check whether our data has any gaps is to aggregate by day of the week. Running the commands below, we can see that the difference between the highest count and the lowest count is 24 (one day of hourly observations), which is fine: it just means that the last data point falls midweek. The counts also match across all customers, so this dataset does not appear to have any gaps.

In [None]:
weekday_counts = df \
                .withColumn("dayofweek", fn.dayofweek("date")) \
                .groupBy("client") \
                .pivot("dayofweek") \
                .count()

In [None]:
weekday_counts.show(5) # show aggregates for several clients
weekday_counts.agg(*[fn.min(col) for col in weekday_counts.columns[1:]]).show() # show minimum counts of observations across all clients
weekday_counts.agg(*[fn.max(col) for col in weekday_counts.columns[1:]]).show() # show maximum counts of observations across all clients
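
To see what this check is looking for, here is a minimal plain-Python sketch on a hypothetical hourly series. The dates and the `count_by_weekday` helper are illustrative and not part of the notebook's data. Note that Python's `isoweekday()` numbers Monday as 1, whereas Spark's `dayofweek` numbers Sunday as 1.

```python
from collections import Counter
from datetime import datetime, timedelta

def count_by_weekday(timestamps):
    """Count observations per ISO weekday (1 = Monday ... 7 = Sunday)."""
    return Counter(ts.isoweekday() for ts in timestamps)

# Hypothetical hourly series: two full weeks plus three extra days, starting on a Monday.
start = datetime(2014, 1, 6)
complete = [start + timedelta(hours=h) for h in range((14 + 3) * 24)]

# The same series with weekends removed, to mimic a dataset with gaps.
weekdays_only = [ts for ts in complete if ts.isoweekday() <= 5]

full = count_by_weekday(complete)
gappy = count_by_weekday(weekdays_only)

# A gap-free series that ends midweek differs by exactly one day of observations.
print(max(full.values()) - min(full.values()))
# Missing weekends show up as zero counts for Saturday and Sunday.
print(gappy[6], gappy[7])
```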

In [None]:
train_start_date = df.select(fn.min("date").alias("date")).collect()[0]["date"]
test_start_date = "2014-01-01"
end_date = df.select(fn.max("date").alias("date")).collect()[0]["date"]

In [None]:
# split the data into train and test set
train_data_tmp = df.where(fn.col("date") < test_start_date)
test_data_tmp = df.where(fn.col("date") >= test_start_date)

In [None]:
# pandasUDFs require an output schema. This one matches the format required for DeepAR
dataset_schema = StructType([StructField("target", ArrayType(DoubleType())),
                             StructField("cat", ArrayType(IntegerType())),
                             StructField("start", StringType())
                            ])

In [None]:
@pandas_udf(dataset_schema, PandasUDFType.GROUPED_MAP)
def prep_deep_ar(df):
    
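    # sort chronologically and reset the index so that label 0 is the earliest observation
    df = df.sort_values(by="date").reset_index(drop=True)
    client_name = df.loc[0, "client"]
    targets = df["value"].values.tolist()
    cat = [client_encoder[client_name]]  # integer code for this client
    start = str(df.loc[0, "date"])  # timestamp of the first observation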
    
    return pd.DataFrame([[targets, cat, start]], columns=["target", "cat", "start"])

In [None]:
# Set flag so that _SUCCESS meta files are not written to S3
# DeepAR actually skips these files anyway, but it's a good practice when using directories as inputs to algorithms
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
spark.conf.set("spark.hadoop.orc.overwrite.output.file", "true")

In [None]:
train_data = train_data_tmp.groupBy("client").apply(prep_deep_ar)
test_data = test_data_tmp.groupBy("client").apply(prep_deep_ar)
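
Conceptually, the grouped map collects all rows for one client, orders them by date, and emits a single record per client. The following plain-Python sketch mirrors that logic under stated assumptions: `rows` is hypothetical sample data and `encoder` is a stand-in for `client_encoder`. It is not the Spark implementation.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical hourly rows (date, client, value), deliberately out of order.
rows = [
    (datetime(2011, 1, 1, 1), "MT_001", 2.0),
    (datetime(2011, 1, 1, 0), "MT_001", 1.0),
    (datetime(2011, 1, 1, 0), "MT_002", 5.0),
    (datetime(2011, 1, 1, 1), "MT_002", 6.0),
]
encoder = {"MT_001": 0, "MT_002": 1}  # stand-in for client_encoder

# Group the rows by client, as groupBy("client") does.
groups = defaultdict(list)
for date, client, value in rows:
    groups[client].append((date, value))

# Per group: sort by date, then emit one record with target, cat and start.
records = []
for client, series in groups.items():
    series.sort(key=lambda pair: pair[0])
    records.append({
        "target": [value for _, value in series],
        "cat": [encoder[client]],
        "start": str(series[0][0]),
    })
```

Sorting before taking the first element is what ensures `start` is the earliest timestamp in each group, regardless of the order in which the rows arrive.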