# Notebook #2

In this notebook, we take the result dataset from Notebook #1 and compute rolling statistics (mean, difference, std, max, min) for a list of features over several time windows.  
This was the most time-consuming and computationally expensive part of the entire tutorial. We ran into some roadblocks and found workarounds for them. Please see below for more details.

## Outline

- [Define Rolling Features and Window Sizes](#Define-list-of-features-for-rolling-compute,-window-sizes)
- [Issues and Solutions](#What-issues-we-encountered-using-Pyspark-and-how-we-solved-them?)
- [Rolling Compute](#Rolling-Compute)
  - [Rolling Mean](#Rolling-Mean)
  - [Rolling Difference](#Rolling-Difference)
  - [Rolling Std](#Rolling-Std)
  - [Rolling Max](#Rolling-Max)
  - [Rolling Min](#Rolling-Min)
- [Join Results](#Join-result-dataset-from-the-five-rolling-compute-cells:)
  


In [None]:
import subprocess
import sys
import os
import re
import time
import atexit
import seaborn as sns
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import concat, col, udf, lag, date_add, explode, lit, unix_timestamp
from pyspark.sql.functions import month, weekofyear, dayofmonth
from pyspark.sql.types import *
from pyspark.sql.types import DateType
from pyspark.sql.dataframe import *
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.ml.classification import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml.feature import StandardScaler, PCA, RFormula
from pyspark.ml import Pipeline, PipelineModel

start_time = time.time()
base_path = "/home/jovyan/data/"
suffix = "_1000"

spark = ( SparkSession
  .builder
  .master("local[30]")
  .appName("pm")
  .getOrCreate()
        )

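# NOTE: "Xg" is a placeholder; replace it with a real size before running.
# Memory settings generally need to be in place before the session starts
# (for example via .config(...) on the builder). Depending on the Spark version,
# setting them here may have no effect or may be rejected.
spark.conf.set("spark.executor.memory", "Xg")
spark.conf.set("spark.driver.memory", "Xg")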

spark

## Define list of features for rolling compute, window sizes

In [None]:
rolling_features = [
    'warn_type1_total', 'warn_type2_total', 
    'pca_1_warn','pca_2_warn', 'pca_3_warn', 'pca_4_warn', 'pca_5_warn',
    'pca_6_warn','pca_7_warn', 'pca_8_warn', 'pca_9_warn', 'pca_10_warn',
    'pca_11_warn','pca_12_warn', 'pca_13_warn', 'pca_14_warn', 'pca_15_warn',
    'pca_16_warn','pca_17_warn', 'pca_18_warn', 'pca_19_warn', 'pca_20_warn',
    'problem_type_1', 'problem_type_2', 'problem_type_3','problem_type_4',
    'problem_type_1_per_usage1','problem_type_2_per_usage1',
    'problem_type_3_per_usage1','problem_type_4_per_usage1',
    'problem_type_1_per_usage2','problem_type_2_per_usage2',
    'problem_type_3_per_usage2','problem_type_4_per_usage2',                
    'fault_code_type_1_count', 'fault_code_type_2_count', 'fault_code_type_3_count', 'fault_code_type_4_count',                          
    'fault_code_type_1_count_per_usage1','fault_code_type_2_count_per_usage1',
    'fault_code_type_3_count_per_usage1', 'fault_code_type_4_count_per_usage1',
    'fault_code_type_1_count_per_usage2','fault_code_type_2_count_per_usage2',
    'fault_code_type_3_count_per_usage2', 'fault_code_type_4_count_per_usage2']
               
# Rolling window sizes: 3, 7, 14, 30 and 90 rows (days, assuming one row per device per day)
lags = [3, 7, 14, 30, 90]

print(len(rolling_features))


## What issues we encountered using Pyspark and how we solved them?

-  When the entire list of **46 features** and **5 time windows** was computed for all **5 types of rolling statistics** (mean, difference, std, max, min) in one go, we always ran into a `StackOverflowError`.
-  The cause was that the lineage (the chain of transformations Spark tracks for the DataFrame) became too long for Spark to handle.
-  One fix is to checkpoint the DataFrame and materialize it at intervals during the process.
-  The other is to break the workload into chunks and save the result of each chunk as a parquet file.
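
The chunking workaround can be sketched as follows. This is a minimal illustration, not the exact code used in this notebook: `chunked` and `rolling_mean_chunk` are hypothetical helper names, the chunk size of 10 is arbitrary, and the `deviceid`, `date` and `key` columns are taken from the cells further down. PySpark is imported inside the function so that the pure-Python bookkeeping can be tried without a Spark installation.

```python
def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def rolling_mean_chunk(spark, in_path, out_path, features, lags):
    """Compute rolling means for one chunk of features and save them.

    Every call starts again from the parquet file on disk, so the lineage
    only covers len(features) * len(lags) new columns.
    """
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window

    df = spark.read.parquet(in_path)
    new_cols = []
    for lag_n in lags:
        w = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1 - lag_n, 0)
        for name in features:
            out_name = '%s_rollingmean_%d' % (name, lag_n)
            df = df.withColumn(out_name, F.avg(F.col(name)).over(w))
            new_cols.append(out_name)
    # Keep only the join key and the new columns for this chunk
    df.select(['key'] + new_cols).write.mode('overwrite').parquet(out_path)


# Made-up feature names, only to show the bookkeeping
features = ['feature_%d' % i for i in range(46)]
lags = [3, 7, 14, 30, 90]
chunks = chunked(features, 10)
total_columns = len(features) * len(lags) * 5  # five rolling statistics

print(len(chunks), total_columns)
```

With 46 features, 5 windows and 5 statistics, a single pass would chain 1150 `withColumn` calls, which gives a sense of how long the lineage becomes.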

## A few things we found helpful:
-  Before the rolling compute, save the upstream work as a parquet file in Notebook_1 ("Notebook_1_DataCleansing_FeatureEngineering"). This speeds up the whole process because we do not need to repeat the previous steps, and it also shortens the lineage.
-  Print out the lag and feature name to track progress.
-  Use the "htop" command from the terminal to see how many CPUs are busy for a particular task. For the rolling compute, we considered two approaches: 1) use a Spark cluster on HDInsight to perform the rolling compute in parallel; 2) use single-node Spark on a powerful VM. In htop, we saw that all 32 cores were busy at the same time for a single task (for example, computing the rolling mean). So if we split the workload across multiple nodes, with each node running one type of rolling compute, the total time would be comparable to running everything sequentially with single-node Spark on a powerful machine.
-  Use "%%time" in each cell to measure its run time. The per-cell timings give a better idea of where to optimize the process.
-  Materialize the intermediate results by either caching them in memory or writing them as parquet files. We chose parquet files because we did not want to repeat the compute if cache() failed or any part of the rolling compute failed.
-  Why parquet? There are many reasons, to name a few: parquet stores the schema along with the data, it is Spark's preferred file format, and it lets you read only the data you need (for example, a subset of columns).
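
`%%time` is only available inside IPython/Jupyter. If the same steps are run as a plain script, a small context manager can collect comparable per-step timings. The sketch below uses hypothetical names (`timed`, `timings`) and a stand-in workload in place of the Spark job:

```python
import time
from contextlib import contextmanager

timings = {}


@contextmanager
def timed(label):
    """Record the wall-clock time of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start
        print("%s took %.3f s" % (label, timings[label]))


# Stand-in for a rolling-compute step
with timed("rolling mean (stand-in)"):
    total = sum(i * i for i in range(100000))

# The slowest step is the first candidate for optimization
slowest = max(timings, key=timings.get)
```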

<br>


## Rolling Compute
### Rolling Mean

In [None]:
%%time

# Load result dataset from Notebook #1
df = spark.read.parquet(base_path + 'tmp/notebook1_result' + suffix + '.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

# Save the intermediate result for downstream work
df.write.mode('overwrite').parquet(base_path + 'tmp/data_rollingmean' + suffix + '.parquet')
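
The window specification `rowsBetween(1-lag_n, 0)` used in the cell above covers the current row plus the `lag_n - 1` rows before it within each device, ordered by date. The frame counts rows, not calendar days, so a lag of 3 spans three days only when a device has exactly one row per day. A plain-Python sketch of the same arithmetic for a single device (the function name and sample values are made up for illustration):

```python
def rolling_mean(values, lag_n):
    """Mean over the current row and up to lag_n - 1 preceding rows."""
    result = []
    for i in range(len(values)):
        # Windows at the start of the series are shorter than lag_n
        window = values[max(0, i - lag_n + 1): i + 1]
        result.append(sum(window) / len(window))
    return result


daily_counts = [1, 2, 3, 4, 5]  # one device, already ordered by date
print(rolling_mean(daily_counts, 3))  # [1.0, 1.5, 2.0, 3.0, 4.0]
```

The first `lag_n - 1` rows of each device are averaged over fewer rows, which is why they never come out as missing for the mean.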


### Rolling Difference

In [None]:
%%time

# Load result dataset from Notebook #1
df = spark.read.parquet(base_path + 'tmp/notebook1_result' + suffix + '.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingdiff_'+str(lag_n), col(col_name)-F.avg(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingdiff = df.select(['key'] + list(s for s in df.columns if "rollingdiff" in s))

# Save the intermediate result for downstream work
rollingdiff.write.mode('overwrite').parquet(base_path + 'tmp/rollingdiff' + suffix + '.parquet')
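
Note that the "rolling difference" computed above is the current value minus the rolling mean over the window, not the difference from a lagged value. A plain-Python sketch for a single device, with a made-up function name and sample values:

```python
def rolling_diff(values, lag_n):
    """Current value minus the mean of the current and preceding rows."""
    result = []
    for i, value in enumerate(values):
        window = values[max(0, i - lag_n + 1): i + 1]
        result.append(value - sum(window) / len(window))
    return result


# The jump to 10 on the last day stands out against its 3-row mean of 5.0
print(rolling_diff([1, 2, 3, 10], 3))  # [0.0, 0.5, 1.0, 5.0]
```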


### Rolling Std

In [None]:
%%time

# Load result dataset from Notebook #1
df = spark.read.parquet(base_path + 'tmp/notebook1_result' + suffix + '.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

# rollingstd features have missing values, e.g. the sample std is undefined
# for a one-row window at the start of each device's history; fill them with 0
rollingstd_features = [s for s in df.columns if "rollingstd" in s]
df = df.fillna(0, subset=rollingstd_features)
rollingstd = df.select(['key'] + rollingstd_features)

# Save the intermediate result for downstream work
rollingstd.write.mode('overwrite').parquet(base_path + 'tmp/rollingstd' + suffix + '.parquet')
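
`F.stddev` is the sample standard deviation, which divides by `n - 1` and is therefore undefined when the window holds a single row. That is the most likely source of the missing values filled with 0 above. A plain-Python sketch of the effect for a single device (function name and sample values are made up):

```python
import statistics


def rolling_std(values, lag_n):
    """Sample std over the current and preceding rows; None if undefined."""
    result = []
    for i in range(len(values)):
        window = values[max(0, i - lag_n + 1): i + 1]
        # A sample std needs at least two observations
        result.append(statistics.stdev(window) if len(window) > 1 else None)
    return result


raw = rolling_std([2, 4, 4, 6], 3)
filled = [0 if s is None else s for s in raw]  # same idea as fillna(0)
print(raw)
print(filled)
```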


### Rolling Max

In [None]:
%%time

# Load result dataset from Notebook #1
df = spark.read.parquet(base_path + 'tmp/notebook1_result' + suffix + '.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingmax_'+str(lag_n), F.max(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingmax = df.select(['key'] + list(s for s in df.columns if "rollingmax" in s))

# Save the intermediate result for downstream work
rollingmax.write.mode('overwrite').parquet(base_path + 'tmp/rollingmax' + suffix + '.parquet')


### Rolling Min

In [None]:
%%time

# Load result dataset from Notebook #1
df = spark.read.parquet(base_path + 'tmp/notebook1_result' + suffix + '.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingmin_'+str(lag_n), F.min(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingmin = df.select(['key'] + list(s for s in df.columns if "rollingmin" in s))

# Save the intermediate result for downstream work
rollingmin.write.mode('overwrite').parquet(base_path + 'tmp/rollingmin' + suffix + '.parquet')


## Join result dataset from the five rolling compute cells:
-  Joins in Spark are usually slow; reducing the number of partitions before the join helps.
-  Check the number of partitions of each PySpark DataFrame first.
-  **repartition vs coalesce**. If we only want to reduce the number of partitions, coalesce is the better choice because repartition performs a full shuffle, which is computationally more expensive and takes more time.
<br>
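
Since `coalesce` can only lower the partition count, a small guard avoids calling it when there is nothing to reduce. The sketch below is an illustration under assumptions: `target_partitions` and `shrink_partitions` are hypothetical helpers, and the only DataFrame methods relied on are `rdd.getNumPartitions()` and `coalesce()`, both used elsewhere in this notebook.

```python
def target_partitions(current, desired):
    """Partition count to coalesce to: never above `current`, never below 1."""
    return max(1, min(current, desired))


def shrink_partitions(df, desired):
    """Coalesce `df` to at most `desired` partitions, if that is a reduction."""
    current = df.rdd.getNumPartitions()
    target = target_partitions(current, desired)
    # coalesce merges existing partitions without a full shuffle
    return df.coalesce(target) if target < current else df


print(target_partitions(200, 8))  # 8
print(target_partitions(4, 8))    # 4
```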


In [None]:
# Import result dataset 
rollingmean = spark.read.parquet(base_path + 'tmp/data_rollingmean' + suffix + '.parquet')
rollingdiff = spark.read.parquet(base_path + 'tmp/rollingdiff' + suffix + '.parquet')
rollingstd = spark.read.parquet(base_path + 'tmp/rollingstd' + suffix + '.parquet')
rollingmax = spark.read.parquet(base_path + 'tmp/rollingmax' + suffix + '.parquet')
rollingmin = spark.read.parquet(base_path + 'tmp/rollingmin' + suffix + '.parquet')

# Check the number of partitions for each dataset
print(rollingmean.rdd.getNumPartitions())
print(rollingdiff.rdd.getNumPartitions())
print(rollingstd.rdd.getNumPartitions())
print(rollingmax.rdd.getNumPartitions())
print(rollingmin.rdd.getNumPartitions())


In [None]:
%%time

# To make join faster, reduce the number of partitions (not necessarily to "1")
rollingmean = rollingmean.coalesce(1)
rollingdiff = rollingdiff.coalesce(1)
rollingstd = rollingstd.coalesce(1)
rollingmax = rollingmax.coalesce(1)
rollingmin = rollingmin.coalesce(1)

rolling_result = rollingmean.join(rollingdiff, 'key', 'inner')\
                 .join(rollingstd, 'key', 'inner')\
                 .join(rollingmax, 'key', 'inner')\
                 .join(rollingmin, 'key', 'inner')
            

# Write the final result as a parquet file for downstream work in Notebook_3
rolling_result.write.mode('overwrite').parquet(base_path + 'tmp/notebook2_result' + suffix + '.parquet')
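
The chain of four joins can also be written as a fold over a list of DataFrames, so the code stays the same if another rolling statistic is added later. A minimal sketch: `join_all` is a hypothetical helper, and it only assumes that each object has a `join(other, on, how)` method, as PySpark DataFrames do.

```python
from functools import reduce


def join_all(frames, key='key', how='inner'):
    """Join a list of DataFrames left to right on a shared key column."""
    return reduce(lambda left, right: left.join(right, key, how), frames)


# Usage with the DataFrames loaded earlier in this notebook:
# rolling_result = join_all([rollingmean, rollingdiff, rollingstd,
#                            rollingmax, rollingmin])
```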
