<a href="https://colab.research.google.com/github/nvakiet/SparkML-Regression_And_Classification/blob/main/SparkML_Regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install and initialize Spark on Google Colab

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

!tar xf spark-3.2.1-bin-hadoop3.2.tgz

!pip install -q findspark

In [2]:
# Initialize Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
import findspark
findspark.init()

# Download the "Predict Future Sales" dataset from Kaggle

In [3]:
# Install Kaggle python package and create directory for kaggle credential
!pip install kaggle
!mkdir -p ~/.kaggle
# Copy the Kaggle credential to the runtime machine (remember to mount the Google Drive containing the credential file first)
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/kaggle.json
# Restrict the credential file to the current user
!chmod 600 ~/.kaggle/kaggle.json

# Download the dataset and unzip
!kaggle competitions download -c competitive-data-science-predict-future-sales
!unzip competitive-data-science-predict-future-sales.zip


Downloading competitive-data-science-predict-future-sales.zip to /content
  0% 0.00/15.1M [00:00<?, ?B/s]
100% 15.1M/15.1M [00:00<00:00, 169MB/s]
Archive:  competitive-data-science-predict-future-sales.zip
  inflating: item_categories.csv     
  inflating: items.csv               
  inflating: sales_train.csv         
  inflating: sample_submission.csv   
  inflating: shops.csv               
  inflating: test.csv                


# Import packages and initialize a Spark session

In [None]:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.evaluation import *

In [5]:
spark = (SparkSession
        .builder
        .appName('Lab 3 - SparkML: Regression')
        .getOrCreate()
)

# Load train and test datasets into Pyspark

In [14]:
data = spark.read.csv("sales_train.csv", header = True, inferSchema = True)
test = spark.read.csv("test.csv", header=True, inferSchema=True)

# Raw data overview
Firstly, let's see what the raw data looks like.

In [15]:
# Print the dataframe schema
print("Training data schema")
data.printSchema()
print("Test data schema")
test.printSchema()

Training data schema
root
 |-- date: string (nullable = true)
 |-- date_block_num: integer (nullable = true)
 |-- shop_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- item_price: double (nullable = true)
 |-- item_cnt_day: double (nullable = true)

Test data schema
root
 |-- ID: integer (nullable = true)
 |-- shop_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)



In [16]:
# See the first 10 samples
data.show(10)

+----------+--------------+-------+-------+----------+------------+
|      date|date_block_num|shop_id|item_id|item_price|item_cnt_day|
+----------+--------------+-------+-------+----------+------------+
|02.01.2013|             0|     59|  22154|     999.0|         1.0|
|03.01.2013|             0|     25|   2552|     899.0|         1.0|
|05.01.2013|             0|     25|   2552|     899.0|        -1.0|
|06.01.2013|             0|     25|   2554|   1709.05|         1.0|
|15.01.2013|             0|     25|   2555|    1099.0|         1.0|
|10.01.2013|             0|     25|   2564|     349.0|         1.0|
|02.01.2013|             0|     25|   2565|     549.0|         1.0|
|04.01.2013|             0|     25|   2572|     239.0|         1.0|
|11.01.2013|             0|     25|   2572|     299.0|         1.0|
|03.01.2013|             0|     25|   2573|     299.0|         3.0|
+----------+--------------+-------+-------+----------+------------+
only showing top 10 rows
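
The `date` column is read as a string in `dd.mm.yyyy` form, while `date_block_num` looks like a running month counter (every January 2013 row above has block 0). As a rough illustration in plain Python, the sketch below derives such a counter from a date string. The helper name `month_index` and the assumption that counting starts in January 2013 are ours, not something the dataset schema states.

```python
from datetime import datetime


def month_index(date_str: str, start_year: int = 2013) -> int:
    """Map a 'dd.mm.yyyy' string to a 0-based month counter.

    Assumption: month 0 is January of start_year.
    """
    d = datetime.strptime(date_str, "%d.%m.%Y")
    return (d.year - start_year) * 12 + (d.month - 1)


# A date taken from the sample rows above, plus a later hypothetical one
print(month_index("02.01.2013"))
print(month_index("15.02.2014"))
```

In Spark itself, the same string could be parsed with `f.to_date("date", "dd.MM.yyyy")` if real date values are needed.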



Let's generate a summary of the whole dataframe to get a better understanding of the problem.

In [17]:
print("Column number:", len(data.columns))
print("Raw data summary:")
data.summary().show()

Column number: 6
Raw data summary:
+-------+----------+-----------------+------------------+------------------+------------------+-----------------+
|summary|      date|   date_block_num|           shop_id|           item_id|        item_price|     item_cnt_day|
+-------+----------+-----------------+------------------+------------------+------------------+-----------------+
|  count|   2935849|          2935849|           2935849|           2935849|           2935849|          2935849|
|   mean|      null|14.56991146343017|33.001728290521754|10197.227056977385| 890.8532326980396|1.242640885140891|
| stddev|      null|9.422987708755963| 16.22697304833343|6324.2973538912065|1729.7996307126489|2.618834430895435|
|    min|01.01.2013|                0|                 0|                 0|              -1.0|            -22.0|
|    25%|      null|                7|                22|              4476|             249.0|              1.0|
|    50%|      null|               14|               

From the summary, we can see the dataset has a shape of (2935849, 6). This is a time series in which the item price and sales count vary by date. The problem also asks us to predict the sales count per month, while this dataset is indexed by day, so the dataframe must be aggregated.  
Judging from the summary statistics of item_price and item_cnt_day (for example, minimums of -1.0 and -22.0), there are clearly outliers in the data, and possibly some missing values.  
Because the basic Spark MLlib library doesn't have any class for time series modeling, we must perform some feature engineering to convert this dataset from a time series forecasting problem into a regression problem, so we can fit it with SparkML's regressors.
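
One common way to turn a forecasting problem into a regression problem is to use lag features: the values from the previous months become the inputs, and the current month becomes the target. The plain-Python sketch below illustrates the idea on a single series. The function name `make_lag_rows` and the sample values are hypothetical, and this is not necessarily the exact feature set this notebook ends up using.

```python
from typing import List, Tuple


def make_lag_rows(series: List[float], n_lags: int) -> List[Tuple[List[float], float]]:
    """Turn one monthly series into (lag features, target) rows."""
    rows = []
    for t in range(n_lags, len(series)):
        # Features are the values at t-1, t-2, ..., t-n_lags
        lags = [series[t - k] for k in range(1, n_lags + 1)]
        rows.append((lags, series[t]))
    return rows


# Hypothetical monthly sales counts for one (shop, item) pair
monthly_sales = [5.0, 0.0, 2.0, 0.0, 2.0]
rows = make_lag_rows(monthly_sales, n_lags=2)
for lags, target in rows:
    print(lags, "->", target)
```

In PySpark, the same shift is usually expressed with `f.lag` over a `Window` partitioned by shop and item and ordered by month.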

# Data preprocessing
To avoid a data leakage problem, we remove every (shop, item) pair from the training set that does not appear in the test set. In Spark, this can be done with a semi join.

In [48]:
# Perform a left semi join: keep only training rows whose (shop_id, item_id) pair appears in the test set
train = (data.join(test, ["shop_id", "item_id"], "semi")
              .select(data.columns))
# See if the number of rows decreases
print("Rows after joining:", train.count())
print("Columns after joining:", train.columns)

Rows after joining: 1224439
Columns after joining: ['date', 'date_block_num', 'shop_id', 'item_id', 'item_price', 'item_cnt_day']
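
To make the semi join semantics concrete, here is a small plain-Python sketch (no Spark needed). It keeps a left row only if its key exists on the right, never duplicates left rows, and never adds right-side columns. The helper `left_semi_join` and the test pairs are hypothetical; the training rows reuse values from the sample shown earlier.

```python
def left_semi_join(left, right, keys):
    """Keep rows of `left` whose key tuple occurs in `right`."""
    right_keys = {tuple(r[k] for k in keys) for r in right}
    return [row for row in left if tuple(row[k] for k in keys) in right_keys]


train_rows = [
    {"shop_id": 59, "item_id": 22154, "item_cnt_day": 1.0},
    {"shop_id": 25, "item_id": 2552, "item_cnt_day": 1.0},
    {"shop_id": 25, "item_id": 2552, "item_cnt_day": -1.0},
    {"shop_id": 25, "item_id": 2554, "item_cnt_day": 1.0},
]
# Hypothetical test pairs; the key (25, 2552) is repeated on purpose
test_rows = [
    {"ID": 0, "shop_id": 25, "item_id": 2552},
    {"ID": 1, "shop_id": 25, "item_id": 2552},
    {"ID": 2, "shop_id": 5, "item_id": 5037},
]

kept = left_semi_join(train_rows, test_rows, ["shop_id", "item_id"])
for row in kept:
    print(row)
```

Unlike an inner join, the repeated key on the right side does not multiply the matching training rows.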


Remove any row where the item price is not positive (i.e. outliers), so that the monthly mean price of each item can be calculated more reliably.

In [49]:
train = train.where("item_price > 0")

Group the data by month block index, shop id and item id.  
Then perform aggregation to create new features based on each (month, shop, item) tuple.

In [50]:
train = (
    train.groupBy(["date_block_num", "shop_id", "item_id"]).agg(
        f.sum("item_price").alias("all_itemPrice_month"), # Sum of the item's recorded prices in the month
        f.mean("item_price").alias("mean_itemPrice_month"), # Mean price of the item in the month
        f.sum("item_cnt_day").alias("item_cnt_month"), # Total items sold in the month
        f.mean("item_cnt_day").alias("mean_itemCnt_month"), # Mean items sold per sales record in the month
        f.count("item_cnt_day").alias("days_sold_month") # Number of sales records (days sold) in the month
    ).withColumnRenamed("date_block_num", "month_index")
)

In [51]:
train.columns

['month_index',
 'shop_id',
 'item_id',
 'all_itemPrice_month',
 'mean_itemPrice_month',
 'item_cnt_month',
 'mean_itemCnt_month',
 'days_sold_month']
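
As a sanity check on what these aggregates mean, the plain-Python sketch below computes the same five features for four of the sample rows shown earlier (shop 25, items 2552 and 2572, month 0). It is only an illustration of the group-by logic, not a replacement for the Spark code.

```python
from collections import defaultdict
from statistics import mean

# (date_block_num, shop_id, item_id, item_price, item_cnt_day)
daily = [
    (0, 25, 2552, 899.0, 1.0),
    (0, 25, 2552, 899.0, -1.0),
    (0, 25, 2572, 239.0, 1.0),
    (0, 25, 2572, 299.0, 1.0),
]

# Collect the daily records of each (month, shop, item) tuple
groups = defaultdict(list)
for block, shop, item, price, cnt in daily:
    groups[(block, shop, item)].append((price, cnt))

monthly = {}
for key, recs in groups.items():
    prices = [p for p, _ in recs]
    counts = [c for _, c in recs]
    monthly[key] = {
        "all_itemPrice_month": sum(prices),
        "mean_itemPrice_month": mean(prices),
        "item_cnt_month": sum(counts),
        "mean_itemCnt_month": mean(counts),
        "days_sold_month": len(counts),
    }

for key, feats in monthly.items():
    print(key, feats)
```

Note how the return recorded as `-1.0` cancels the sale for item 2552, giving a monthly count of 0 even though two records exist.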

Generate all possible combinations of month indices, shop ids and item ids. These are needed later, when we create lag features from the time series: an item may be sold in a given month but not in every month before it, which would make it impossible to build lag features for that item. The combinations let us fill in the missing records.

In [53]:
combinations = (
    (train.select("month_index").distinct()).crossJoin(
        train.select("shop_id").distinct())
    ).crossJoin(
        train.select("item_id").distinct()
    )

To fill in the missing records, do a right outer join of the aggregated data against these combinations: every combination is kept, and those with no sales record get null feature values. Then replace the nulls with 0.

In [54]:
train_monthly = train.join(combinations, ["month_index", "shop_id", "item_id"], "rightouter")
train_monthly = train_monthly.fillna(0)
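
The effect of the cross join followed by the right outer join and zero fill can be illustrated in plain Python with `itertools.product`. In this sketch the first two entries reuse values from this dataset (shop 31, item 1829), while the third entry (item 30) is hypothetical and only there to make the grid larger than the data.

```python
from itertools import product

# (month_index, shop_id, item_id) -> item_cnt_month
monthly = {
    (1, 31, 1829): 5.0,
    (4, 31, 1829): 2.0,
    (4, 31, 30): 1.0,  # hypothetical record
}

# Distinct values of each key, as in the three .distinct() selections
months = sorted({k[0] for k in monthly})
shops = sorted({k[1] for k in monthly})
items = sorted({k[2] for k in monthly})

# Every (month, shop, item) combination; missing ones are filled with 0
grid = list(product(months, shops, items))
filled = {key: monthly.get(key, 0.0) for key in grid}

for key in grid:
    print(key, filled[key])
```

The grid has `len(months) * len(shops) * len(items)` rows, so on the full dataset the filled table can be far larger than the aggregated one. Months that have no record at all still do not appear, because the month values are taken from the data itself.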

Now let's see what our new dataset looks like.

In [55]:
print("10 first samples:")
train_monthly.show(10)
print("Summary:")
train_monthly.summary().show()

10 first samples:
+-----------+-------+-------+-------------------+--------------------+--------------+------------------+---------------+
|month_index|shop_id|item_id|all_itemPrice_month|mean_itemPrice_month|item_cnt_month|mean_itemCnt_month|days_sold_month|
+-----------+-------+-------+-------------------+--------------------+--------------+------------------+---------------+
|          1|     31|   1829|             7495.0|              1499.0|           5.0|               1.0|              5|
|          4|     31|   1829|             2398.0|              1199.0|           2.0|               1.0|              2|
|          8|     31|   1829|             2398.0|              1199.0|           2.0|               1.0|              2|
|         10|     31|   1829|                0.0|                 0.0|           0.0|               0.0|              0|
|         13|     31|   1829|           2878.045|   479.6741666666667|          11.0|1.8333333333333333|              6|
|          3| 

# Exploratory Data Analysis (EDA)

# References
1. https://www.kaggle.com/code/dimitreoliveira/model-stacking-feature-engineering-and-eda/notebook