# 5. Implementation of Scalable Demand Forecasting with PySpark in Google Colab
Like Prophet, PySpark can be difficult to install on a local machine. In Google Colaboratory, however, both installations are straightforward.

First, go to <a href = "https://research.google.com/colaboratory">Google Colab</a> and click "File" -> "New notebook" to create a new notebook.

### 5.1. Preparation
#### 5.1.1. Mount to Google Drive
For easy access to files, connect the notebook to your Google Drive.

In [18]:
# Import library
from google.colab import drive

# Connect to your google drive
drive.mount('/content/drive')

Mounted at /content/drive


#### 5.1.2. Install PySpark and Prophet
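Prophet installs with a single pip command. PySpark takes a few more steps: install Java, download and extract Spark, and use findspark to make Spark importable from Python.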

In [19]:
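# Install Java 8 (Spark runs on the JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark 3.0.3 (older releases are kept in the Apache archive)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

# Extract the archive into /content
!tar xf spark-3.0.3-bin-hadoop2.7.tgz

# Install findspark, which makes the extracted Spark importable from Python
!pip install -q findspark

# Environment variables pointing to the Java and Spark installations
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

# Add Spark's Python libraries (pyspark, py4j) to sys.path
import findspark
findspark.init()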
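findspark.init() is what lets `import pyspark` work here even though PySpark was never pip-installed. Roughly, it sets SPARK_HOME and prepends Spark's bundled Python directory and py4j zip to sys.path. Below is a minimal sketch of that idea. It assumes the standard layout of a Spark binary distribution. The helper add_spark_to_path is hypothetical and not part of findspark's API.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home):
    """Hypothetical helper: make a Spark binary distribution importable.

    Assumes the usual layout: <spark_home>/python and
    <spark_home>/python/lib/py4j-*-src.zip.
    """
    spark_python = os.path.join(spark_home, "python")
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip")))
    if not py4j_zips:
        raise FileNotFoundError(f"No py4j zip found under {spark_python}/lib")
    os.environ["SPARK_HOME"] = spark_home
    # Prepend both paths so they take precedence; spark_python ends up first
    for path in (py4j_zips[-1], spark_python):
        if path not in sys.path:
            sys.path.insert(0, path)
    return spark_python, py4j_zips[-1]
```

In the notebook, the equivalent call would be add_spark_to_path("/content/spark-3.0.3-bin-hadoop2.7"). findspark itself also searches common install locations when SPARK_HOME is not set.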

In [3]:
# Install Prophet
!pip install prophet


#### 5.1.3. Load necessary packages

In [4]:
# Import library
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from prophet import Prophet


#### 5.1.4. Upload the CSV file to Google Drive
- Click the folder icon in the left menu as shown in the image below.
- You can save the file anywhere, but I like to keep it in the Colab Notebooks folder on Google Drive. To do so, go to "content" -> "drive" -> "MyDrive" -> "Colab Notebooks" and create a "data" folder.
- Click the three dots next to "data" and select "Upload" to upload the CSV file we saved earlier.

<img src ="https://github.com/youngdataspace/Time-Series-Forecasting-in-Spark/blob/main/Google%20Colab1.JPG?raw=true">
<img src = "https://github.com/youngdataspace/Time-Series-Forecasting-in-Spark/blob/main/Google%20Colab2.JPG?raw=true">
<img src = "https://github.com/youngdataspace/Time-Series-Forecasting-in-Spark/blob/main/Google%20Colab3.JPG?raw=true">

#### 5.1.5. Import the CSV file and explore it
Import the CSV file we just uploaded to Google Drive.


In [5]:
# Import the csv file and explore it
sales_pd = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/data/store_sales.csv')

# Convert ds to datetime
sales_pd['ds'] = pd.to_datetime(sales_pd['ds'])

# Display info
sales_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 913000 entries, 0 to 912999
Data columns (total 4 columns):
 #   Column  Non-Null Count   Dtype         
---  ------  --------------   -----         
 0   ds      913000 non-null  datetime64[ns]
 1   store   913000 non-null  int64         
 2   item    913000 non-null  int64         
 3   y       913000 non-null  float64       
dtypes: datetime64[ns](1), float64(1), int64(2)
memory usage: 27.9 MB


In [6]:
# Descriptive statistics
sales_pd.describe()

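|       | store    | item      | y         |
|-------|----------|-----------|-----------|
| count | 913000.0 | 913000.0  | 913000.0  |
| mean  | 5.5      | 25.5      | 52.250287 |
| std   | 2.872283 | 14.430878 | 28.801144 |
| min   | 1.0      | 1.0       | 0.0       |
| 25%   | 3.0      | 13.0      | 30.0      |
| 50%   | 5.5      | 25.5      | 47.0      |
| 75%   | 8.0      | 38.0      | 70.0      |
| max   | 10.0     | 50.0      | 231.0     |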
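Prophet's make_future_dataframe extends each series from that series' own last observed date. Before forecasting, it is therefore worth checking that every store-item series is complete and ends on the same day. Below is a minimal pandas sketch. The helper group_date_coverage is hypothetical and not part of the original notebook.

```python
import pandas as pd


def group_date_coverage(df, keys=("store", "item")):
    """Summarise each store-item series: first/last date, row count, and gaps."""
    summary = df.groupby(list(keys))["ds"].agg(first="min", last="max", rows="count")
    # Number of rows a complete daily series between first and last would have
    expected = (summary["last"] - summary["first"]).dt.days + 1
    summary["missing_days"] = expected - summary["rows"]
    return summary.reset_index()
```

Calling group_date_coverage(sales_pd) returns one row per store-item pair. A non-zero missing_days, or a last date that differs between groups, flags a series that needs attention before modeling.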


#### 5.1.6. Increase the sample size by 10X
Check the number of unique groups.

In [7]:
# Unique store-items
sales_pd[['item', 'store']].nunique()

item     50
store    10
dtype: int64
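nunique() counts distinct values column by column, so 50 × 10 = 500 is the number of possible store-item pairs. That figure is an upper bound, and it equals the number of actual groups only if every item appears in every store. Counting the observed combinations directly removes that assumption. The sketch below uses two hypothetical helpers.

```python
import pandas as pd


def count_groups(df, keys=("store", "item")):
    """Number of distinct key combinations that actually occur in df."""
    return df.groupby(list(keys)).ngroups


def max_possible_groups(df, keys=("store", "item")):
    """Upper bound from per-column distinct counts (every combination assumed present)."""
    total = 1
    for n in df[list(keys)].nunique():
        total *= int(n)
    return total
```

Comparing count_groups(sales_pd) with max_possible_groups(sales_pd) shows whether the 500 figure holds for this data set.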

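We have 50 items and 10 stores, which gives 500 unique store-item groups. This is consistent with the row count: 913,000 rows = 500 groups × 1,826 daily observations from 2013 through 2017. Let's enlarge the data set tenfold by concatenating ten copies of it and shifting the store IDs in each copy, so that the number of groups reaches 5,000.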

In [8]:
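# Create a larger data frame: 10 copies, with store IDs shifted by 10 * i in copy i
frames = []
for i in range(10):
    temp_pd = sales_pd.copy()
    temp_pd['store'] = temp_pd['store'] + (10 * i)
    frames.append(temp_pd)
    print('added data frame', i + 1)

# Concatenate once at the end instead of growing the data frame inside the loop
sales_pd_10k = pd.concat(frames, ignore_index=True)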

added data frame 1
added data frame 2
added data frame 3
added data frame 4
added data frame 5
added data frame 6
added data frame 7
added data frame 8
added data frame 9
added data frame 10


In [9]:
# Check the number of unique groups
sales_pd_10k[['item', 'store']].nunique()

item      50
store    100
dtype: int64

We now have 100 stores x 50 items = 5,000 store-item groups.
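The store offset in the loop above matters. Each copy shifts the store IDs by 10 * i, and the original IDs run from 1 to 10, so the copies never overlap. The sketch below expresses the same replication as a reusable function; replicate_stores and n_groups are hypothetical helpers. It also checks that an offset smaller than the largest store ID would make copies collide, leaving fewer groups than expected.

```python
import pandas as pd


def replicate_stores(df, copies, store_offset):
    """Stack `copies` copies of df, adding store_offset * i to the store IDs of copy i."""
    frames = [df.assign(store=df["store"] + store_offset * i) for i in range(copies)]
    return pd.concat(frames, ignore_index=True)


def n_groups(df):
    """Number of distinct store-item pairs."""
    return df.groupby(["store", "item"]).ngroups
```

With the notebook's values, replicate_stores(sales_pd, 10, 10) builds the same data as the loop. The group count multiplies by the number of copies only when store_offset is at least the largest original store ID.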

### 5.2. Prophet x PySpark
#### 5.2.1. Create a Spark session
A SparkSession is the entry point to Spark's DataFrame and SQL functionality. It is created in the driver program, which coordinates the work of the application.

Spark uses a master-worker architecture: worker nodes execute the tasks assigned by the master node.

In [10]:
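# Create a SparkSession in local mode, which is enough for practice
# master(): cluster manager URL, e.g. "yarn", "spark://host:port" (standalone cluster),
#           or "local[N]" to run locally with N threads ("local" = 1 thread, "local[*]" = all cores)
# appName(): name of the application shown in the Spark UI
# config(): sets a Spark property; here, the port of the Spark UI
# getOrCreate(): returns the existing SparkSession if there is one; otherwise, creates a new one
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4051')\
        .getOrCreate()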

#### 5.2.2. Parallelization and output schema
Convert the pandas DataFrame into a Spark DataFrame.

In [11]:
# Convert the pandas DataFrame to a Spark DataFrame
sales_df = spark.createDataFrame(sales_pd_10k)

# Display the schema
sales_df.printSchema()

root
 |-- ds: timestamp (nullable = true)
 |-- store: long (nullable = true)
 |-- item: long (nullable = true)
 |-- y: double (nullable = true)



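To process the store-item groups in parallel, we repartition the data by store and item so that all rows of a group land in the same partition. Because the session was created with master("local"), defaultParallelism is 1. That is why the plan below shows a single hash partition (hashpartitioning(store, item, 1)).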

In [12]:
# Register a temporary view, then repartition by store and item and cache the result
sales_df.createOrReplaceTempView("item_sales")
sql = "select * from item_sales"
sales_part = (spark.sql(sql)
              .repartition(spark.sparkContext.defaultParallelism, ['store', 'item'])
              .cache())
sales_part.explain()

== Physical Plan ==
InMemoryTableScan [ds#0, store#1L, item#2L, y#3]
   +- InMemoryRelation [ds#0, store#1L, item#2L, y#3], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Exchange hashpartitioning(store#1L, item#2L, 1), false, [id=#11]
            +- *(1) Scan ExistingRDD[ds#0,store#1L,item#2L,y#3]
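The Exchange hashpartitioning step in the plan is what keeps each group together. Spark hashes every row's (store, item) key, and the hash modulo the number of partitions picks the partition. Below is a simplified pure-Python sketch of that rule. Spark actually applies a Murmur3 hash to the key columns; Python's built-in hash() stands in for it here, and hash_partition is a hypothetical helper.

```python
from collections import defaultdict


def hash_partition(rows, num_partitions, key=lambda row: (row["store"], row["item"])):
    """Group rows into partitions by hashing their (store, item) key.

    Simplified stand-in for Spark's hash partitioning: Spark hashes the key
    columns with Murmur3, while this sketch uses Python's built-in hash().
    """
    partitions = defaultdict(list)
    for row in rows:
        # Python's % returns a non-negative result for a positive divisor
        partitions[hash(key(row)) % num_partitions].append(row)
    return dict(partitions)
```

Rows with the same key always hash to the same partition, so a group is never split. With one partition, as in this local session, every group lands in partition 0 and the groups are processed one after another. More partitions (for example with local[*]) let Spark work on several groups at once.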




Next, we define the schema of the DataFrame that the UDF will return. See <a href = "https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html">here</a> for the available field types.


In [13]:
# Define a schema
schema = StructType([
                     StructField('store', IntegerType()),
                     StructField('item', IntegerType()),
                     StructField('ds', TimestampType()),
                     StructField('y', FloatType()),
                     StructField('yhat', DoubleType()),
                     StructField('yhat_upper', DoubleType()),
                     StructField('yhat_lower', DoubleType()),
                     ])  

#### 5.2.3. Utilize Pandas UDF and PySpark to train multiple models in parallel
The next step is to set the parameters, fit the model, and predict sales, just as we did for the single forecast model. We wrap these steps in a function and apply it to every store-item group. The only difference from the single-model forecast is that a Pandas UDF and PySpark parallelize the work across groups.

In [14]:
# Define the grouped-map Pandas UDF: it receives one store-item group as a pandas DataFrame
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(store_pd):

  # Instantiate the model and set parameters
  model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
  )

  # Fit the model to the historical data (Prophet uses the ds and y columns)
  model.fit(store_pd)

  # Create a data frame with the historical dates plus 90 future days
  # (the future dates start on Jan 1 2018, the day after the last observation)
  future = model.make_future_dataframe(
      periods=90,
      freq='d',
      include_history=True)

  # In-sample fit for the history and out-of-sample forecast for the future dates
  forecast_pd = model.predict(future)

  # Combine the forecast with the observed values (y is missing for future dates)
  f_pd = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']]
  st_pd = store_pd[['ds', 'store', 'item', 'y']]
  result_pd = f_pd.join(st_pd.set_index('ds'), on='ds', how='left')

  # Fill store and item, which the left join leaves empty for future dates
  result_pd['store'] = store_pd['store'].iloc[0]
  result_pd['item'] = store_pd['item'].iloc[0]

  # Return the columns in the order defined by the schema
  return result_pd[['store', 'item', 'ds', 'y', 'yhat',
                    'yhat_upper', 'yhat_lower']]
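The grouped-map UDF is a distributed version of pandas' split-apply-combine. Spark hands apply_model one store-item group at a time and stacks the returned frames. The sketch below runs the same pattern locally. A deliberately naive forecaster (the mean of the last 7 days) stands in for Prophet, so it needs neither Spark nor Prophet. The names naive_model, forecast_all, and HORIZON are hypothetical. The sketch shows the contract the UDF must satisfy: one DataFrame in per group, and one DataFrame out with the schema's columns in the schema's order.

```python
import pandas as pd

HORIZON = 3  # days to forecast in this toy example; the notebook uses 90

COLUMNS = ["store", "item", "ds", "y", "yhat", "yhat_upper", "yhat_lower"]


def naive_model(group_pd, horizon=HORIZON):
    """Hypothetical stand-in for apply_model: forecast the mean of the last 7 days."""
    group_pd = group_pd.sort_values("ds")
    last = group_pd["ds"].max()
    future_ds = pd.date_range(last + pd.Timedelta(days=1), periods=horizon, freq="D")
    # History dates followed by future dates, like include_history=True
    all_ds = pd.concat([group_pd["ds"], pd.Series(future_ds)], ignore_index=True)
    level = group_pd["y"].tail(7).mean()
    result = pd.DataFrame({"ds": all_ds, "yhat": level})
    # Left join keeps every date; y is NaN for the future rows
    result = result.merge(group_pd[["ds", "y"]], on="ds", how="left")
    result["store"] = group_pd["store"].iloc[0]
    result["item"] = group_pd["item"].iloc[0]
    result["yhat_upper"] = result["yhat"]  # this toy model has no uncertainty interval
    result["yhat_lower"] = result["yhat"]
    return result[COLUMNS]


def forecast_all(df, keys=("store", "item")):
    """Local split-apply-combine: run naive_model on each group, then stack the results."""
    return pd.concat(
        [naive_model(group) for _, group in df.groupby(list(keys))],
        ignore_index=True,
    )
```

Spark's contribution is to distribute the loop inside forecast_all across partitions. The per-group function itself stays plain pandas.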

In [15]:
# Apply the function to all store-items (lazy: nothing is computed yet)
results = sales_part.groupby(['store', 'item']).apply(apply_model)

# Print the results and time the action that triggers the computation
import timeit
start = timeit.default_timer()
results.show()
stop = timeit.default_timer()



+-----+----+-------------------+----+------------------+------------------+--------------------+
|store|item|                 ds|   y|              yhat|        yhat_upper|          yhat_lower|
+-----+----+-------------------+----+------------------+------------------+--------------------+
|    1|   1|2013-01-01 00:00:00|13.0|10.051272869689301|18.517132295245975|   1.647939275969288|
|    1|   1|2013-01-02 00:00:00|11.0|10.528625323821489|18.945047285164023|  2.0535882242744057|
|    1|   1|2013-01-03 00:00:00|14.0|11.053264561305632|19.166267433371335|  1.9700010043874727|
|    1|   1|2013-01-04 00:00:00|13.0|12.244392640789227|20.201176449280286|   4.116689111099245|
|    1|   1|2013-01-05 00:00:00|10.0| 13.78033453999933| 22.59021233927294|    5.38647052961445|
|    1|   1|2013-01-06 00:00:00|12.0|14.378950515739104|22.840947052264926|       5.45097483763|
|    1|   1|2013-01-07 00:00:00|10.0| 7.872892467507367| 16.46605719095353|-0.18726325030012617|
|    1|   1|2013-01-08 00:00:0

In [16]:
# Print the time it took to run results.show()
print('Time: ', stop - start)

Time:  69.43607786799998


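The call took about 69 seconds. Keep in mind that Spark evaluates lazily, and show() only computes enough rows to display the first 20. This timing therefore does not necessarily cover training all 5,000 models. To time the full workload, trigger an action that consumes every row, such as results.count().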

In [17]:
# Query the forecast for store 1, item 1
results.createOrReplaceTempView('forecasted')
spark.sql("SELECT * FROM forecasted WHERE item = 1 AND store = 1").show()

+-----+----+-------------------+----+------------------+------------------+--------------------+
|store|item|                 ds|   y|              yhat|        yhat_upper|          yhat_lower|
+-----+----+-------------------+----+------------------+------------------+--------------------+
|    1|   1|2013-01-01 00:00:00|13.0|10.051272869689301|18.517132295245975|   1.647939275969288|
|    1|   1|2013-01-02 00:00:00|11.0|10.528625323821489|18.945047285164023|  2.0535882242744057|
|    1|   1|2013-01-03 00:00:00|14.0|11.053264561305632|19.166267433371335|  1.9700010043874727|
|    1|   1|2013-01-04 00:00:00|13.0|12.244392640789227|20.201176449280286|   4.116689111099245|
|    1|   1|2013-01-05 00:00:00|10.0| 13.78033453999933| 22.59021233927294|    5.38647052961445|
|    1|   1|2013-01-06 00:00:00|12.0|14.378950515739104|22.840947052264926|       5.45097483763|
|    1|   1|2013-01-07 00:00:00|10.0| 7.872892467507367| 16.46605719095353|-0.18726325030012617|
|    1|   1|2013-01-08 00:00:0

# 6. Conclusion
In this long post, we covered several topics. We started by identifying trends and seasonality, moved on to building a Prophet model, and then used PySpark to scale the process to a separate model for each of 5,000 store-item groups. We didn't get to cover CNN, LSTM, and Seasonal ARIMA, but I am planning to add them in a few days.