## MSCA 31013 - Big Data Platforms - Course Project

### B2B: Iowa Liquor Sales: Clean, Transform, and Feature Engineering
### Submitted by:

#saurabhs
#dmcdonough
#dtallarico90

@uchicago.edu

## 1. Reading and Cleaning Data

### 1.1 Import the necessary packages

In [1]:
# Python packages
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd 
import seaborn as sns
import datetime

#PySpark SQL packages
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import upper, col
from pyspark.sql import SQLContext

# PySpark pipeline and feature packages
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.evaluation import RegressionEvaluator

# PySpark ML Algorithms for regression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor

# Suppress pandas and other warnings
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline

### 1.2 Download the data, clean special characters, and upload to HDFS
#### The code below was run on the Hadoop cluster to download the files from the Iowa state website and clean them.
#### NOTE: The commands are commented out because the files have already been downloaded to HDFS.

Get the file

In [2]:
# curl https://data.iowa.gov/api/views/m3tr-qhgy/rows.csv?accessType=DOWNLOAD -o Iowa_Liquor_Sales.csv

In [3]:
# curl https://data.iowa.gov/api/views/ykb6-ywnd/rows.csv?accessType=DOWNLOAD -o Iowa_Liquor_Stores.csv

In [8]:
#! ls /home/saurabhs/BigData/Project

BigDataProject.ipynb
catmap.csv
Forecast_NoCat.csv
Forecast_WithCat.csv
Iowa_Liquor_Sales-clean.csv
Iowa_Liquor_Sales.csv
Iowa_Liquor_Stores.csv
Iowa_Population.csv
ProjectLiquor_DataCleanTransform_Regression_v2.ipynb
ProjectLiquor_DataCleanTransform_v2.ipynb
ProjectLiquor.ipynb


Reformat dates and remove `$` characters from the sales file

In [5]:
#! sed -E "s#([0-9]{2})/([0-9]{2})/([0-9]{4})#\3-\1-\2#" < Iowa_Liquor_Sales.csv | tr -d '$' > Iowa_Liquor_Sales-clean.csv
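As a hedged illustration, the `sed`/`tr` pipeline above can be reproduced in plain Python to check the cleaning on a few lines before running it on the 4 GB file. `clean_line` and the sample row are hypothetical. Like `sed` without the `g` flag, the sketch rewrites only the first MM/DD/YYYY date on each line, while `tr -d '$'` deletes every `$`.

```python
import re

# Same pattern as the sed expression: MM/DD/YYYY captured as three groups
DATE_PATTERN = re.compile(r"([0-9]{2})/([0-9]{2})/([0-9]{4})")


def clean_line(line: str) -> str:
    """Rewrite the first MM/DD/YYYY date as YYYY-MM-DD and delete every '$'."""
    # count=1 mirrors sed's default of one substitution per line
    line = DATE_PATTERN.sub(r"\3-\1-\2", line, count=1)
    # tr -d '$' removes all dollar signs
    return line.replace("$", "")
```

For example, `clean_line('A,03/07/2019,$12.50')` gives `'A,2019-03-07,12.50'`.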

In [6]:
#! ls /home/saurabhs/BigData/Project

In [9]:
# Check file size
#! ls -lah /home/saurabhs/BigData/Project

total 8.0G
drwxrwxr-x 3 saurabhs saurabhs 4.0K Mar 12 10:36 .
drwxrwxr-x 7 saurabhs saurabhs 4.0K Feb 27 09:13 ..
-rw-r--r-- 1 saurabhs saurabhs  64K Mar  7 16:16 BigDataProject.ipynb
-rw-r--r-- 1 saurabhs saurabhs 2.9K Mar 12 10:29 catmap.csv
-rw-r--r-- 1 saurabhs saurabhs 834K Mar 11 15:13 Forecast_NoCat.csv
-rw-r--r-- 1 saurabhs saurabhs 4.9M Mar 11 15:13 Forecast_WithCat.csv
-rw-r--r-- 1 saurabhs saurabhs 4.0G Mar  7 09:42 Iowa_Liquor_Sales-clean.csv
-rw-rw-r-- 1 saurabhs saurabhs 4.0G Mar  7 09:36 Iowa_Liquor_Sales.csv
-rw-r--r-- 1 saurabhs saurabhs 266K Mar  7 09:05 Iowa_Liquor_Stores.csv
-rw-r--r-- 1 saurabhs saurabhs  15K Mar 12 10:29 Iowa_Population.csv
drwxr-xr-x 2 saurabhs saurabhs 4.0K Mar 12 10:28 .ipynb_checkpoints
-rw-r--r-- 1 saurabhs saurabhs 238K Mar 11 17:34 ProjectLiquor_DataCleanTransform_Regression_v2.ipynb
-rw-r--r-- 1 saurabhs saurabhs 176K Mar 12 10:36 ProjectLiquor_DataCleanTransform_v2.ipynb
-rw-r--r-- 1 saurabhs saurabhs 102K Mar  8 14:02 Proje

Move the clean files to HDFS

In [8]:
#! hdfs dfs -put -f Iowa_Liquor_Sales-clean.csv /user/saurabhs/BigData/Project

In [9]:
#! hdfs dfs -put -f Iowa_Liquor_Stores.csv /user/saurabhs/BigData/Project

In [10]:
#! hdfs dfs -ls /user/saurabhs/BigData/Project

Found 4 items
-rw-r--r--   3 saurabhs saurabhs     853875 2019-03-11 16:43 /user/saurabhs/BigData/Project/Forecast_NoCat.csv
-rw-r--r--   3 saurabhs saurabhs    5090776 2019-03-11 16:37 /user/saurabhs/BigData/Project/Forecast_WithCat.csv
-rw-r--r--   3 saurabhs saurabhs 4269077843 2019-03-07 09:49 /user/saurabhs/BigData/Project/Iowa_Liquor_Sales-clean.csv
-rw-r--r--   3 saurabhs saurabhs     271756 2019-03-07 09:50 /user/saurabhs/BigData/Project/Iowa_Liquor_Stores.csv


### 1.3 Instantiate the SparkSession (checking whether this notebook runs locally or on RCC), load the Spark DataFrame, and check the schema

In [2]:
from notebook import notebookapp
servers = list(notebookapp.list_running_servers())

# Treat the session as RCC unless the notebook server is running on localhost
IsRCC = servers[0]['hostname'] != "localhost"

Create the Spark session and context objects

In [3]:
if IsRCC:
    # On RCC, use the cluster's Spark configuration
    spark = SparkSession.builder.appName('ProjectLiquor').getOrCreate()
else:
    # Create the Spark session locally. Resource settings are passed to the builder so they
    # apply when the session starts; driver memory in particular cannot be changed once the
    # driver JVM is running.
    spark = (SparkSession.builder
             .appName('ProjectLiquor_local')
             .config('spark.executor.memory', '5g')
             .config('spark.executor.cores', '4')
             .config('spark.cores.max', '4')
             .config('spark.driver.memory', '8g')
             .getOrCreate())

sc = spark.sparkContext      # Spark context (RCC or local)
sqlContext = SQLContext(sc)  # used later for createDataFrame

In [4]:
spark

Confirm the Spark driver configuration

In [5]:
#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '59132'),
 ('spark.executor.id', 'driver'),
 ('spark.executor.memory', '5g'),
 ('spark.executor.cores', '4'),
 ('spark.cores.max', '4'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.driver.host', 'INFERNO-PC'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'ProjectLiquor_local'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.id', 'local-1552427533135')]

Set the input file path for RCC vs. local runs

In [6]:
if (IsRCC): 
    # Update to RCC file names
    BD_File = '/user/saurabhs/BigData/Project/Iowa_Liquor_Sales-clean.csv'
else:
    #Update to local file names
    BD_File = 'Iowa_Liquor_Sales-clean.csv'

#### Load the file into PySpark dataframe for downstream processing

In [7]:
%%time
  
df = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("multiLine", "true")
.option("delimiter", ",")
.format("csv")
.load(BD_File))

Wall time: 39.5 s


In [8]:
# Repartition based on number of years of data
df = df.repartition(8)

In [9]:
df.rdd.getNumPartitions()

8

#### Check data types and schema of the loaded dataset

In [10]:
# Data types that match expectations suggest there are no corrupted records
df.dtypes

[('Invoice/Item Number', 'string'),
 ('Date', 'string'),
 ('Store Number', 'int'),
 ('Store Name', 'string'),
 ('Address', 'string'),
 ('City', 'string'),
 ('Zip Code', 'string'),
 ('Store Location', 'string'),
 ('County Number', 'int'),
 ('County', 'string'),
 ('Category', 'int'),
 ('Category Name', 'string'),
 ('Vendor Number', 'int'),
 ('Vendor Name', 'string'),
 ('Item Number', 'string'),
 ('Item Description', 'string'),
 ('Pack', 'int'),
 ('Bottle Volume (ml)', 'int'),
 ('State Bottle Cost', 'double'),
 ('State Bottle Retail', 'double'),
 ('Bottles Sold', 'int'),
 ('Sale (Dollars)', 'double'),
 ('Volume Sold (Liters)', 'double'),
 ('Volume Sold (Gallons)', 'double')]

In [11]:
df.printSchema()

root
 |-- Invoice/Item Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Store Number: integer (nullable = true)
 |-- Store Name: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- Store Location: string (nullable = true)
 |-- County Number: integer (nullable = true)
 |-- County: string (nullable = true)
 |-- Category: integer (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Vendor Number: integer (nullable = true)
 |-- Vendor Name: string (nullable = true)
 |-- Item Number: string (nullable = true)
 |-- Item Description: string (nullable = true)
 |-- Pack: integer (nullable = true)
 |-- Bottle Volume (ml): integer (nullable = true)
 |-- State Bottle Cost: double (nullable = true)
 |-- State Bottle Retail: double (nullable = true)
 |-- Bottles Sold: integer (nullable = true)
 |-- Sale (Dollars): double (nullable = true)
 |-- Volume Sold (Liters): doub

### 1.4 Clean and transform

Check null values and NaN values

In [12]:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------------------+----+------------+----------+-------+-----+--------+--------------+-------------+------+--------+-------------+-------------+-----------+-----------+----------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|Invoice/Item Number|Date|Store Number|Store Name|Address| City|Zip Code|Store Location|County Number|County|Category|Category Name|Vendor Number|Vendor Name|Item Number|Item Description|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|Volume Sold (Gallons)|
+-------------------+----+------------+----------+-------+-----+--------+--------------+-------------+------+--------+-------------+-------------+-----------+-----------+----------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|                  0|   0|           

We have close to 15.7M records. Records with NaN values could be dropped, but we keep them for now.
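The Spark expression above counts, for each column, the rows where the value is null or NaN. A minimal plain-Python sketch of the same rule, on hypothetical rows stored as dictionaries (a missing key is treated as null); `count_null_or_nan` is an illustrative name, not part of the notebook:

```python
import math
from typing import Any, Dict, List


def count_null_or_nan(rows: List[Dict[str, Any]], columns: List[str]) -> Dict[str, int]:
    """Count, per column, values that are missing (None) or float NaN."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)  # a missing key counts as null
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts
```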

In [13]:
#df=df.dropna(how='any')

In [14]:
# Drop rows with zero or negative sale amounts
df=df.filter(df["Sale (Dollars)"]>0)

In [15]:
# Split the first column into invoice number and item number. The invoice part has a variable
# length, so reverse the string, take the first 5 characters (item) and the rest (invoice),
# and reverse each piece back to restore its original character order.
df = df.withColumn('InvoiceItemNumClean', F.reverse(df["Invoice/Item Number"]))
df = df.withColumn('ItemNum', F.reverse(df["InvoiceItemNumClean"].substr(1, 5)))
df = df.withColumn('InvoiceNum', F.reverse(F.substring("InvoiceItemNumClean", 6, 20))) # Large length to take all values (max length is 16)

# Add date columns
df = df.withColumn('saledate', F.to_date(F.from_unixtime(F.unix_timestamp('Date', 'MM/dd/yyyy'))))
df = df.withColumn("salemonth", F.month("saledate"))
df = df.withColumn("saleyear", F.year("saledate"))
df = df.withColumn('dow', F.date_format("saledate", 'EEEE'))      # day name, e.g. Monday
df = df.withColumn('dow_number', F.date_format("saledate", 'u'))  # day number, Monday = 1
df = df.withColumn("saleq", F.quarter("saledate"))

# Normalize text columns to upper case to remove case inconsistencies
df = df.withColumn('City', F.upper(F.col('City')))
df = df.withColumn('Store Name', F.upper(F.col('Store Name')))
df = df.withColumn('Category Name', F.upper(F.col('Category Name')))
df = df.withColumn('Vendor Name', F.upper(F.col('Vendor Name')))
df = df.withColumn('Item Description', F.upper(F.col('Item Description')))
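The reverse-and-split idea can be sketched in plain Python as below, assuming (as the `substr(1, 5)` call implies) that the item number is always the last 5 characters. Each piece must be reversed back to restore its character order; in Python the result is simply `s[:-5]` and `s[-5:]`. `split_invoice_item` and the sample values are hypothetical.

```python
from typing import Tuple


def split_invoice_item(invoice_item: str, item_len: int = 5) -> Tuple[str, str]:
    """Split an 'Invoice/Item Number' value into (invoice, item)."""
    reversed_value = invoice_item[::-1]
    # First item_len characters of the reversed string = item number, reversed back
    item = reversed_value[:item_len][::-1]
    # Remaining characters = variable-length invoice number, reversed back
    invoice = reversed_value[item_len:][::-1]
    return invoice, item
```

Unlike the Spark `substring(..., 6, 20)` call, Python slicing has no 20-character cap, which makes no difference for values up to the stated maximum length.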

In [16]:
# Days between a fixed reference date (2019-03-09) and each sale date; used below to estimate store age
df = df.withColumn('date_diff', F.datediff(F.to_date(F.lit('2019-03-09')), df.saledate))


In [17]:
# Total state cost of the bottles sold (Bottles Sold x State Bottle Cost), used for markup
df = df.withColumn("sumcost", col("Bottles Sold")*col("State Bottle Cost"))

In [18]:
# Keep only full years of data, 2013 through 2018 (each year's features are calculated from 3 years of rolling data)
df = df.filter(df["saleyear"]<2019).filter(df["saleyear"]>=2013)

In [19]:
#Create temp view for creating subtables
df.createOrReplaceTempView("tempMain")

## 2. Feature Engineering for modeling
### 2.1 Create the sales forecasting table used to fit the regression models
We identify every category each store sold in each year and pivot those categories into columns, giving one row per store and year combination.

In [20]:
df_cat_sales = spark.sql('''
SELECT `Store Number` as storenumber, saleyear, `Category Name` as categoryname, sum(`Sale (Dollars)`) as categorysales
FROM tempMain
GROUP BY `Store Number`,saleyear, `Category Name`
''')

In [21]:
df_cat_sales.show(3)

+-----------+--------+---------------+-----------------+
|storenumber|saleyear|   categoryname|    categorysales|
+-----------+--------+---------------+-----------------+
|       2613|    2015| VODKA FLAVORED|7670.559999999999|
|       3692|    2013| CREAM LIQUEURS|9122.109999999999|
|       5258|    2017|IMPORTED VODKAS|           447.24|
+-----------+--------+---------------+-----------------+
only showing top 3 rows



In [22]:
# Total annual sales per store, used to compute each category's share of sales
df_ann_sales = spark.sql('''
SELECT `Store Number` as storenumber, saleyear, sum(`Sale (Dollars)`) as totalsales
FROM tempMain
GROUP BY `Store Number`,saleyear
''')

In [23]:
# Load the category mapping file to merge into the dataset (on RCC the file must be in the home directory, not on HDFS, because pandas reads it directly)
if(IsRCC):
    catmap = pd.read_csv('/home/saurabhs/BigData/Project/catmap.csv')
else:
    catmap = pd.read_csv('catmap.csv')

catmap['catname'] = catmap['catname'].str.upper()

In [24]:
# Load data to pandas to transform rows to columns
pdf_cat_sales = df_cat_sales.toPandas()
pdf_cat_sales = pdf_cat_sales.merge(catmap,left_on='categoryname',right_on='catname',how='left')

pdf_ann_sales = df_ann_sales.toPandas()
# Add each store-year's total sales, joining on the shared keys
pdf_cat_sales = pdf_cat_sales.merge(pdf_ann_sales, on=['storenumber', 'saleyear'])

Transforming rows into columns

In [25]:
# Transform to columns
pdf_cat_sales['catratio']=pdf_cat_sales['categorysales']/pdf_cat_sales['totalsales']

pdf_cat_sales_pv = pd.pivot_table(pdf_cat_sales, values='catratio', index=['storenumber', 'saleyear']\
                               ,columns=['type'], aggfunc=np.sum, fill_value=0)
pdf_cat_sales_rows = pd.DataFrame(pdf_cat_sales_pv.to_records())
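The share computation can be sketched in plain Python, assuming rows of `(store, year, category name, sales)` and a hypothetical category-to-type mapping standing in for `catmap.csv`. As in the notebook, the denominator is the store-year total over all categories, and categories without a mapped type get no column (NaN `type` values are excluded when `pivot_table` groups the data). `category_shares` is an illustrative name, and every mapped type is assumed to appear in `types`.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int, str, float]  # (store, year, category name, category sales)


def category_shares(rows: List[Row], type_map: Dict[str, str],
                    types: List[str]) -> Dict[Tuple[int, int], Dict[str, float]]:
    """Share of each store-year's total sales per category type (0.0 if absent)."""
    # Total sales per store-year over ALL categories, like df_ann_sales
    totals: Dict[Tuple[int, int], float] = defaultdict(float)
    for store, year, _, sales in rows:
        totals[(store, year)] += sales

    shares: Dict[Tuple[int, int], Dict[str, float]] = {}
    for store, year, name, sales in rows:
        cat_type = type_map.get(name)
        if cat_type is None:
            continue  # unmapped category: counts toward the total but gets no column
        key = (store, year)
        # fill_value=0: every type starts at 0.0 for a store-year
        row_shares = shares.setdefault(key, {t: 0.0 for t in types})
        row_shares[cat_type] += sales / totals[key]  # aggfunc=np.sum
    return shares
```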

In [26]:
pdf_cat_sales_rows.head(12)

| | storenumber | saleyear | Gin | Other | Rum | Tequila | Vodka | Whiskey |
|---|---|---|---|---|---|---|---|---|
| 0 | 2106 | 2013 | 0.028176 | 0.221125 | 0.157904 | 0.058302 | 0.201065 | 0.333428 |
| 1 | 2106 | 2014 | 0.02759 | 0.268574 | 0.117284 | 0.060731 | 0.228773 | 0.297048 |
| 2 | 2106 | 2015 | 0.022929 | 0.217254 | 0.191169 | 0.069177 | 0.19799 | 0.301482 |
| 3 | 2106 | 2016 | 0.027675 | 0.265186 | 0.135684 | 0.057878 | 0.193643 | 0.317652 |
| 4 | 2106 | 2017 | 0.023629 | 0.213437 | 0.208394 | 0.079311 | 0.200236 | 0.265879 |
| 5 | 2106 | 2018 | 0.022272 | 0.249634 | 0.153142 | 0.064041 | 0.224176 | 0.286581 |
| 6 | 2113 | 2013 | 0.011302 | 0.23127 | 0.132035 | 0.021374 | 0.241686 | 0.362332 |
| 7 | 2113 | 2014 | 0.033136 | 0.152988 | 0.254053 | 0.039879 | 0.161301 | 0.358642 |
| 8 | 2113 | 2015 | 0.014375 | 0.114436 | 0.222948 | 0.026078 | 0.211577 | 0.410586 |
| 9 | 2113 | 2016 | 0.02186 | 0.131488 | 0.173465 | 0.014741 | 0.199067 | 0.459379 |


In [27]:
# Workaround for PySpark schema errors: on RCC create the Spark DataFrame directly; locally, save to CSV and reload it with Spark

if(IsRCC):
    df_cat_sales_cols = sqlContext.createDataFrame(pdf_cat_sales_rows)
else:
    pdf_cat_sales_rows.to_csv("catsales.csv", index=False)

    df_cat_sales_cols = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("delimiter", ",")
    .format("csv")
    .load("catsales.csv"))

In [None]:
df_cat_sales_cols.take(1)

[Row(storenumber=2106, saleyear=2013, Gin=0.028176117185159277, Other=0.2211248486779781, Rum=0.15790388475453365, Tequila=0.0583024570020315, Vodka=0.20106516848484013, Whiskey=0.3334275238954576)]

In [None]:
# Markup per store and year: (sales - cost) / cost
df_mkup_sales = spark.sql('''
SELECT `Store Number`,saleyear, (sum(`Sale (Dollars)`)-sum(`sumcost`)) /sum(`sumcost`) as markup
FROM tempMain
WHERE saleyear between 2015 and 2018
GROUP BY  `Store Number`,saleyear
Order By `Store Number`,`saleyear`''')

In [None]:
df_mkup_sales.show(3)

+------------+--------+-------------------+
|Store Number|saleyear|             markup|
+------------+--------+-------------------+
|        2106|    2015| 0.5008903458289726|
|        2106|    2016|0.49699790570960534|
|        2106|    2017| 0.4960744197023383|
+------------+--------+-------------------+
only showing top 3 rows
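The markup formula is total sales minus total cost, divided by total cost, where cost comes from the `sumcost` column (Bottles Sold times State Bottle Cost). A small plain-Python sketch with hypothetical numbers; returning `None` for a zero cost mirrors Spark SQL, which returns NULL for division by zero unless ANSI mode is enabled. The function name `markup` is illustrative.

```python
from typing import Optional, Sequence


def markup(sales_dollars: Sequence[float], bottles_sold: Sequence[int],
           bottle_cost: Sequence[float]) -> Optional[float]:
    """(sum of sales - sum of cost) / sum of cost for one store and year."""
    total_sales = sum(sales_dollars)
    # Equivalent of sum(`sumcost`) = sum(Bottles Sold * State Bottle Cost)
    total_cost = sum(b * c for b, c in zip(bottles_sold, bottle_cost))
    if total_cost == 0:
        return None  # NULL in Spark SQL with ANSI mode off
    return (total_sales - total_cost) / total_cost
```

For instance, $45 of sales on bottles that cost the store $30 gives a markup of 0.5, close to the values shown for store 2106.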



In [None]:
# Store tenure: the largest date_diff per store, i.e. days from the store's earliest sale to 2019-03-09
df_tenure = spark.sql('''
SELECT `Store Number`, max(date_diff) as days_in_business
FROM tempMain
Group By `Store Number`''')

In [None]:
df_tenure.show(3)

+------------+----------------+
|Store Number|days_in_business|
+------------+----------------+
|        3918|            2256|
|        4900|            2097|
|        4818|            2252|
+------------+----------------+
only showing top 3 rows
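Because `date_diff` counts days back from a fixed date, the largest value per store is the number of days since that store's earliest sale. A plain-Python sketch (the function name is hypothetical). One consequence worth keeping in mind: the data were filtered to 2013 onward, so a store that was already open in January 2013 shows roughly 2,258 days regardless of how long it had actually been in business.

```python
from datetime import date
from typing import Iterable

REFERENCE_DATE = date(2019, 3, 9)  # same fixed date used for the date_diff column


def tenure_days(sale_dates: Iterable[date]) -> int:
    """max(days between REFERENCE_DATE and each sale) = days since the earliest sale."""
    return max((REFERENCE_DATE - d).days for d in sale_dates)
```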



In [None]:
#Store,Year, Annual sales (current and previous year)
df_year_sales = spark.sql('''
SELECT Year1.`Store Number`, Year1.saleyear, Year1.Yearly_Total_Sales, 
case when Year2.Yearly_Total_Sales is null then 0 else Year2.Yearly_Total_Sales end as Last_Year_Sales
from 
(SELECT `Store Number`,saleyear, sum(`Sale (Dollars)`) as Yearly_Total_Sales
FROM tempMain
WHERE saleyear between 2015 and 2018
GROUP BY  `Store Number`, saleyear
) as Year1
LEFT JOIN  
(SELECT `Store Number`,saleyear, sum(`Sale (Dollars)`) as Yearly_Total_Sales
FROM tempMain
WHERE saleyear between 2014 and 2017
GROUP BY  `Store Number`, saleyear
) as Year2
ON Year2.`Store Number` = Year1.`Store Number` AND Year1.saleyear = (Year2.saleyear+1) 
Order By Year1.`Store Number`, Year1.saleyear
''')


In [None]:
df_year_sales.show(12)

+------------+--------+------------------+------------------+
|Store Number|saleyear|Yearly_Total_Sales|   Last_Year_Sales|
+------------+--------+------------------+------------------+
|        2106|    2015|         172518.49|155672.21999999997|
|        2106|    2016|175869.29000000004|172518.49000000002|
|        2106|    2017|209448.04000000004|175869.29000000004|
|        2106|    2018|216470.00000000006|         209448.04|
|        2113|    2015|12261.899999999998|          13876.64|
|        2113|    2016|          12990.68|           12261.9|
|        2113|    2017|12860.230000000003|          12990.68|
|        2113|    2018|11746.960000000001|          12860.23|
|        2130|    2015|192385.42999999996|         120890.46|
|        2130|    2016|         159179.85|         192385.43|
|        2130|    2017|167556.19999999998|         159179.85|
|        2130|    2018|         199922.26|167556.19999999998|
+------------+--------+------------------+------------------+
only sho
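The self-join above pairs each store-year with the same store's previous year and falls back to 0 when there is no previous year. The same lookup, sketched in plain Python on a hypothetical dictionary of yearly totals (`add_last_year_sales` is an illustrative name):

```python
from typing import Dict, Iterable, Tuple

Key = Tuple[int, int]  # (store number, sale year)


def add_last_year_sales(yearly_sales: Dict[Key, float],
                        years: Iterable[int]) -> Dict[Key, Tuple[float, float]]:
    """Map each (store, year) with year in `years` to (total, last year's total or 0)."""
    wanted = set(years)
    result: Dict[Key, Tuple[float, float]] = {}
    for (store, year), total in yearly_sales.items():
        if year in wanted:
            # LEFT JOIN on Year1.saleyear = Year2.saleyear + 1; a missing match becomes 0
            result[(store, year)] = (total, yearly_sales.get((store, year - 1), 0))
    return result
```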

In [None]:
# Quarterly sales per store and year, with sales from the same quarter of the previous year
df_growth_sales = spark.sql('''
SELECT Year1.`Store Number`, Year1.saleyear, Year1.saleq, Year1.Qtr_Sales, 
case when Year2.Qtr_Sales is null then 0 else Year2.Qtr_Sales end as Last_Year_Qtr_Sales
from 
(SELECT `Store Number`,saleyear, saleq, sum(`Sale (Dollars)`) as Qtr_Sales
FROM tempMain
WHERE saleyear between 2014 and 2017
GROUP BY  `Store Number`, saleyear, saleq
) as Year1
LEFT JOIN  
(SELECT `Store Number`,saleyear, saleq, sum(`Sale (Dollars)`) as Qtr_Sales
FROM tempMain
WHERE saleyear between 2013 and 2016
GROUP BY  `Store Number`, saleyear, saleq
) as Year2
ON Year2.`Store Number` = Year1.`Store Number` AND Year1.saleyear = (Year2.saleyear+1) AND Year1.saleq = Year2.saleq
Order By Year1.`Store Number`, Year1.saleyear, Year1.saleq
''')


In [None]:
df_growth_sales.show(12)

+------------+--------+-----+------------------+-------------------+
|Store Number|saleyear|saleq|         Qtr_Sales|Last_Year_Qtr_Sales|
+------------+--------+-----+------------------+-------------------+
|        2106|    2014|    1|32166.630000000005| 34423.240000000005|
|        2106|    2014|    2| 39602.67999999999|           35657.38|
|        2106|    2014|    3|44595.170000000006| 35792.649999999994|
|        2106|    2014|    4|          39307.74|           38479.48|
|        2106|    2015|    1|43174.310000000005|           32166.63|
|        2106|    2015|    2|          43450.72|           39602.68|
|        2106|    2015|    3|          44052.02|           44595.17|
|        2106|    2015|    4|41841.439999999995| 39307.740000000005|
|        2106|    2016|    1|44759.880000000005| 43174.310000000005|
|        2106|    2016|    2|          43903.03|           43450.72|
|        2106|    2016|    3|          39201.35|           44052.02|
|        2106|    2016|    4|     

In [None]:
#Store,Year, City
df_city = spark.sql('''
SELECT `Store Number`,saleyear, City
FROM tempMain
WHERE saleyear between 2015 and 2018
GROUP BY  `Store Number`,saleyear,City
Order By `Store Number`,`saleyear`,City asc''')

In [None]:
df_city.show(5)

+------------+--------+-----------+
|Store Number|saleyear|       City|
+------------+--------+-----------+
|        2106|    2015|CEDAR FALLS|
|        2106|    2016|CEDAR FALLS|
|        2106|    2017|CEDAR FALLS|
|        2106|    2018|CEDAR FALLS|
|        2113|    2015|     GOWRIE|
+------------+--------+-----------+
only showing top 5 rows



Merge city population data into the store/city DataFrame

In [None]:
# Function to create a Spark DataFrame from the file name supplied; used to merge smaller datasets with the primary big dataset
def create_spark_dataframe(file_name):
    """
    Load a small CSV file and return it as a Spark DataFrame.

    On RCC the file is read with Spark (e.g. from HDFS) and converted to pandas;
    locally it is read directly with pandas. Missing values in columns that are
    not int64 or float64 are replaced with empty strings before conversion.
    """
    if IsRCC:
        sdf = (spark.read
               .option("header", "true")
               .option("inferSchema", "true")
               .option("multiLine", "true")
               .option("delimiter", ",")
               .format("csv")
               .load(file_name))
        pandas_data_frame = sdf.toPandas()
    else:
        pandas_data_frame = pd.read_csv(file_name)

    # Fill missing values in non-numeric (not int64/float64) columns with ''
    for column in pandas_data_frame.columns:
        if pandas_data_frame[column].dtype not in (np.int64, np.float64):
            pandas_data_frame[column] = pandas_data_frame[column].fillna('')

    # Workaround for PySpark schema errors: on RCC create the Spark DataFrame directly;
    # locally, save to CSV and reload it with Spark
    if IsRCC:
        spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
    else:
        pandas_data_frame.to_csv("temppop.csv", index=False)
        spark_data_frame = (spark.read
                            .option("header", "true")
                            .option("inferSchema", "true")
                            .option("multiLine", "true")
                            .option("delimiter", ",")
                            .format("csv")
                            .load("temppop.csv"))

    return spark_data_frame

In [None]:
# Load the Iowa population file to merge into the dataset (on RCC it is read from HDFS with Spark)
if(IsRCC):
    IowaPop = create_spark_dataframe('/user/saurabhs/BigData/Project/Iowa_Population.csv')
else:
    IowaPop = create_spark_dataframe('Iowa_Population.csv')

In [None]:
IowaPop.show()

+---------------+----------+
|           City|Population|
+---------------+----------+
|     DES MOINES|   214,778|
|   CEDAR RAPIDS|   130,330|
|      DAVENPORT|   102,268|
|     SIOUX CITY|    82,568|
|      IOWA CITY|    73,415|
|       WATERLOO|    68,146|
|           AMES|    65,005|
|WEST DES MOINES|    62,999|
| COUNCIL BLUFFS|    62,317|
|        DUBUQUE|    58,410|
|         ANKENY|    56,237|
|      URBANDALE|    42,222|
|    CEDAR FALLS|    41,167|
|         MARION|    38,014|
|     BETTENDORF|    35,293|
|     MASON CITY|    27,453|
|   MARSHALLTOWN|    27,440|
|        CLINTON|    25,892|
|     BURLINGTON|    25,330|
|        OTTUMWA|    24,705|
+---------------+----------+
only showing top 20 rows
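`Population` is read as a string with thousands separators (for example `214,778`), so it stays a string through the joins below. A regression model needs it as a number; in Spark one option is `F.regexp_replace('Population', ',', '').cast('int')`. The parsing rule, sketched in plain Python and treating the empty strings used for missing values as missing (`parse_population` is a hypothetical helper):

```python
from typing import Optional


def parse_population(value: Optional[str]) -> Optional[int]:
    """Convert a population string such as '214,778' to an int; '' or None -> None."""
    if value is None:
        return None
    cleaned = value.replace(",", "").strip()
    return int(cleaned) if cleaned else None
```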



In [None]:
# Left-join city population onto each store/year/city row
df_city_pop = df_city.join(IowaPop, on=['City'], how='left')

In [None]:
df_city_pop.show()

+-----------+------------+--------+----------+
|       City|Store Number|saleyear|Population|
+-----------+------------+--------+----------+
|CEDAR FALLS|        2106|    2015|    41,167|
|CEDAR FALLS|        2106|    2016|    41,167|
|CEDAR FALLS|        2106|    2017|    41,167|
|CEDAR FALLS|        2106|    2018|    41,167|
|     GOWRIE|        2113|    2015|     1,009|
|     GOWRIE|        2113|    2016|     1,009|
|     GOWRIE|        2113|    2017|     1,009|
|     GOWRIE|        2113|    2018|     1,009|
|   WATERLOO|        2130|    2015|    68,146|
|   WATERLOO|        2130|    2016|    68,146|
|   WATERLOO|        2130|    2017|    68,146|
|   WATERLOO|        2130|    2018|    68,146|
|   ROCKWELL|        2152|    2015|       970|
|   ROCKWELL|        2152|    2016|       970|
|     WAUKON|        2178|    2015|     3,744|
|     WAUKON|        2178|    2016|     3,744|
|     WAUKON|        2178|    2017|     3,744|
|     WAUKON|        2178|    2018|     3,744|
| DES MOINES|

#### Register temp views of the feature tables for the forecasting merge in Spark SQL

In [None]:
#Create temp view for creating subtables
#Master table
df_year_sales.createOrReplaceTempView("df_year_sales")

#Tables to join
df_cat_sales_cols.createOrReplaceTempView("df_cat_sales_cols")
df_mkup_sales.createOrReplaceTempView("df_mkup_sales")
df_tenure.createOrReplaceTempView("df_tenure")
df_growth_sales.createOrReplaceTempView("df_growth_sales")
df_city_pop.createOrReplaceTempView("df_city_pop")

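Create quarterly growth columns in pandas: each quarter's sales are compared with the same quarter of the previous year, then the quarters are pivoted from rows into columns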

In [None]:
pdf_growth_sales = df_growth_sales.toPandas()

In [None]:
pdf_growth_sales.head()

| | Store Number | saleyear | saleq | Qtr_Sales | Last_Year_Qtr_Sales |
|---|---|---|---|---|---|
| 0 | 2106 | 2014 | 1 | 32166.63 | 34423.24 |
| 1 | 2106 | 2014 | 2 | 39602.68 | 35657.38 |
| 2 | 2106 | 2014 | 3 | 44595.17 | 35792.65 |
| 3 | 2106 | 2014 | 4 | 39307.74 | 38479.48 |
| 4 | 2106 | 2015 | 1 | 43174.31 | 32166.63 |


In [None]:
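# Growth vs. the same quarter of the previous year; a zero denominator yields inf/NaN,
# which are replaced with 0 below
pdf_growth_sales['Growth'] = ((pdf_growth_sales['Qtr_Sales'] - pdf_growth_sales['Last_Year_Qtr_Sales'])
                              / pdf_growth_sales['Last_Year_Qtr_Sales'])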

In [None]:
pdf_growth_sales.head()

| | Store Number | saleyear | saleq | Qtr_Sales | Last_Year_Qtr_Sales | Growth |
|---|---|---|---|---|---|---|
| 0 | 2106 | 2014 | 1 | 32166.63 | 34423.24 | -0.065555 |
| 1 | 2106 | 2014 | 2 | 39602.68 | 35657.38 | 0.110645 |
| 2 | 2106 | 2014 | 3 | 44595.17 | 35792.65 | 0.245931 |
| 3 | 2106 | 2014 | 4 | 39307.74 | 38479.48 | 0.021525 |
| 4 | 2106 | 2015 | 1 | 43174.31 | 32166.63 | 0.342208 |


In [None]:
# Pivot quarters into columns: one row per store and year
pdf_growth_sales_pv = pd.pivot_table(pdf_growth_sales, values='Growth',
                                     index=['Store Number', 'saleyear'], columns=['saleq'], fill_value=0)

In [None]:
pdf_growth_sales_rows = pd.DataFrame(pdf_growth_sales_pv.to_records())

pdf_growth_sales_rows = pdf_growth_sales_rows.replace([np.inf, -np.inf], 0)
pdf_growth_sales_rows = pdf_growth_sales_rows.fillna(0)

pdf_growth_sales_rows.head()

| | Store Number | saleyear | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|---|
| 0 | 2106 | 2014 | -0.065555 | 0.110645 | 0.245931 | 0.021525 |
| 1 | 2106 | 2015 | 0.342208 | 0.097166 | -0.01218 | 0.064458 |
| 2 | 2106 | 2016 | 0.036725 | 0.01041 | -0.110112 | 0.147308 |
| 3 | 2106 | 2017 | 0.143686 | 0.246036 | 0.005673 | 0.335867 |
| 4 | 2113 | 2014 | 0.725774 | 0.189934 | 0.344151 | -0.272514 |
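Although the columns are later named `QoQ`, each value compares a quarter with the same quarter of the previous year. When the previous year's quarter has no sales (stored as 0 by the `CASE WHEN` in the SQL), the pandas division yields `inf` or `NaN`, which the notebook replaces with 0. A plain-Python sketch of that rule (`yoy_quarter_growth` is a hypothetical name), checked in the test against store 2106, Q1 2014 shown above:

```python
def yoy_quarter_growth(qtr_sales: float, last_year_qtr_sales: float) -> float:
    """(current - last year) / last year for the same quarter; 0.0 if last year is 0."""
    if last_year_qtr_sales == 0:
        # pandas would give inf (or NaN for 0/0); the notebook replaces both with 0
        return 0.0
    return (qtr_sales - last_year_qtr_sales) / last_year_qtr_sales
```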


In [None]:
# Workaround for PySpark schema errors: on RCC create the Spark DataFrame directly; locally, save to CSV and reload it with Spark

if(IsRCC):
    df_growth_sales_QoQ = sqlContext.createDataFrame(pdf_growth_sales_rows)
else:
    pdf_growth_sales_rows.to_csv("growthsales.csv", index=False)

    df_growth_sales_QoQ = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("delimiter", ",")
    .format("csv")
    .load("growthsales.csv"))

In [None]:
df_growth_sales_QoQ.show(8)

+------------+--------+--------------------+--------------------+--------------------+--------------------+
|Store Number|saleyear|                   1|                   2|                   3|                   4|
+------------+--------+--------------------+--------------------+--------------------+--------------------+
|        2106|    2014|-0.06555484027651087| 0.11064469683414754| 0.24593093833510546| 0.02152471914901123|
|        2106|    2015| 0.34220805847550717| 0.09716615138167418|-0.01217957011936...| 0.06445804312331337|
|        2106|    2016|0.036724848642630294|0.010409723935529669|-0.11011231721042528| 0.14730826663709462|
|        2106|    2017| 0.14368581863937066| 0.24603586586165047|0.005672763820633847| 0.33586667897093286|
|        2113|    2014|  0.7257740165067664| 0.18993371152974028| 0.34415121255349496|-0.27251363339490564|
|        2113|    2015|  0.5058849709696343| -0.2052626774384956| -0.3481736977093835|-0.21607241703133132|
|        2113|    2016|0.041

In [None]:
df_growth_sales_QoQ.createOrReplaceTempView("df_growth_sales_QoQ")

### 2.2 Merge dataframes into main dataframe with all features for processing in models
Print column names and types to help join the tables

In [None]:
print("df_year_sales",df_year_sales.dtypes)
print("df_mkup_sales",df_mkup_sales.dtypes)
print("df_tenure",df_tenure.dtypes)
print("df_growth_sales",df_growth_sales.dtypes)
print("df_city_pop",df_city_pop.dtypes)
print("df_cat_sales_cols",df_cat_sales_cols.dtypes)
print("df_growth_sales_QoQ",df_growth_sales_QoQ.dtypes)

df_year_sales [('Store Number', 'int'), ('saleyear', 'int'), ('Yearly_Total_Sales', 'double'), ('Last_Year_Sales', 'double')]
df_mkup_sales [('Store Number', 'int'), ('saleyear', 'int'), ('markup', 'double')]
df_tenure [('Store Number', 'int'), ('days_in_business', 'int')]
df_growth_sales [('Store Number', 'int'), ('saleyear', 'int'), ('saleq', 'int'), ('Qtr_Sales', 'double'), ('Last_Year_Qtr_Sales', 'double')]
df_city_pop [('City', 'string'), ('Store Number', 'int'), ('saleyear', 'int'), ('Population', 'string')]
df_cat_sales_cols [('storenumber', 'int'), ('saleyear', 'int'), ('Gin', 'double'), ('Other', 'double'), ('Rum', 'double'), ('Tequila', 'double'), ('Vodka', 'double'), ('Whiskey', 'double')]
df_growth_sales_QoQ [('Store Number', 'int'), ('saleyear', 'int'), ('1', 'double'), ('2', 'double'), ('3', 'double'), ('4', 'double')]


In [None]:
df_city_pop.show()

+-----------+------------+--------+----------+
|       City|Store Number|saleyear|Population|
+-----------+------------+--------+----------+
|CEDAR FALLS|        2106|    2015|    41,167|
|CEDAR FALLS|        2106|    2016|    41,167|
|CEDAR FALLS|        2106|    2017|    41,167|
|CEDAR FALLS|        2106|    2018|    41,167|
|     GOWRIE|        2113|    2015|     1,009|
|     GOWRIE|        2113|    2016|     1,009|
|     GOWRIE|        2113|    2017|     1,009|
|     GOWRIE|        2113|    2018|     1,009|
|   WATERLOO|        2130|    2015|    68,146|
|   WATERLOO|        2130|    2016|    68,146|
|   WATERLOO|        2130|    2017|    68,146|
|   WATERLOO|        2130|    2018|    68,146|
|   ROCKWELL|        2152|    2015|       970|
|   ROCKWELL|        2152|    2016|       970|
|     WAUKON|        2178|    2015|     3,744|
|     WAUKON|        2178|    2016|     3,744|
|     WAUKON|        2178|    2017|     3,744|
|     WAUKON|        2178|    2018|     3,744|
| DES MOINES|

#### Create forecasting table

In [None]:
df_forecasting = spark.sql('''
SELECT Year1.`Store Number`, Year1.saleyear, Year1.Yearly_Total_Sales, Year1.Last_Year_Sales, 
case when MkUp.markup is null then 0 else MkUp.markup end as Markup,
case when Tenure.days_in_business is null then 0 else Tenure.days_in_business  end as Tenure_days,
case when QoQ.`1` is null then 0 else QoQ.`1`  end as Q1_QoQ_Sales,
case when QoQ.`2` is null then 0 else QoQ.`2`  end as Q2_QoQ_Sales,
case when QoQ.`3` is null then 0 else QoQ.`3`  end as Q3_QoQ_Sales,
case when QoQ.`4` is null then 0 else QoQ.`4`  end as Q4_QoQ_Sales, 
case when df_city_pop.City is null then '' else df_city_pop.City end as City,
case when df_city_pop.Population is null then '' else df_city_pop.Population end as CityPop
from 
df_year_sales as Year1
LEFT JOIN df_mkup_sales as MkUp
ON MkUp.`Store Number` = Year1.`Store Number` AND MkUp.saleyear = Year1.saleyear
LEFT JOIN df_tenure as Tenure
ON Tenure.`Store Number` = Year1.`Store Number`
LEFT JOIN df_growth_sales_QoQ as QoQ
ON QoQ.`Store Number` = Year1.`Store Number` AND (QoQ.saleyear+1) = Year1.saleyear
LEFT JOIN df_city_pop 
ON df_city_pop.`Store Number` = Year1.`Store Number` AND df_city_pop.saleyear = Year1.saleyear
Order By Year1.`Store Number`, Year1.saleyear
''')


In [None]:
df_forecasting.show(10)

+------------+--------+------------------+------------------+-------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-------+
|Store Number|saleyear|Yearly_Total_Sales|   Last_Year_Sales|             Markup|Tenure_days|        Q1_QoQ_Sales|        Q2_QoQ_Sales|        Q3_QoQ_Sales|        Q4_QoQ_Sales|       City|CityPop|
+------------+--------+------------------+------------------+-------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-------+
|        2106|    2015|         172518.49|155672.21999999997| 0.5008903458289726|       2256|-0.06555484027651087| 0.11064469683414754| 0.24593093833510546| 0.02152471914901123|CEDAR FALLS| 41,167|
|        2106|    2016|175869.29000000004|172518.49000000002|0.49699790570960534|       2256| 0.34220805847550717| 0.09716615138167418|-0.01217957011936...| 0.06445804312331337|CEDAR FALLS| 41,167|
|        2

In [None]:
df_forecasting.createOrReplaceTempView("df_forecasting")

Merge with the category dataframe. We do this join with the DataFrame API rather than in SQL because the category table has many columns, and we want to keep all of them except the ones used as join keys.

In [None]:
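# Inner join (Spark's default) on store number and sale year, then drop the
# right-hand copies of the join keys so only the category columns are added
df_final = df_forecasting.join(
    df_cat_sales_cols,
    (df_forecasting["Store Number"] == df_cat_sales_cols.storenumber)
    & (df_forecasting.saleyear == df_cat_sales_cols.saleyear),
    how="inner",
).drop(df_cat_sales_cols.saleyear).drop(df_cat_sales_cols.storenumber)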
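Because this is an inner join, a store-year with no row in `df_cat_sales_cols` silently disappears, and a duplicated key on either side multiplies rows; the row counts compared further down guard against both. A minimal pure-Python sketch of those semantics follows (the `inner_join` helper and the sample rows are hypothetical): only matching keys survive, and the right-hand key columns are dropped from the result.

```python
def inner_join(left, right, left_keys, right_keys):
    """Inner-join two lists of dicts, dropping the right-hand key columns."""
    # Index the right side by its join key; keep a list per key so duplicates fan out
    index = {}
    for rrow in right:
        index.setdefault(tuple(rrow[k] for k in right_keys), []).append(rrow)
    joined = []
    for lrow in left:
        for rrow in index.get(tuple(lrow[k] for k in left_keys), []):
            extra = {k: v for k, v in rrow.items() if k not in right_keys}
            joined.append({**lrow, **extra})
    return joined


forecast = [
    {"Store Number": 1, "saleyear": 2016, "Yearly_Total_Sales": 100.0},
    {"Store Number": 2, "saleyear": 2016, "Yearly_Total_Sales": 50.0},
]
categories = [
    {"storenumber": 1, "saleyear": 2016, "Gin": 0.2, "Vodka": 0.8},
]  # store 2 has no category row

final = inner_join(forecast, categories,
                   ["Store Number", "saleyear"], ["storenumber", "saleyear"])
print(len(forecast), len(final))  # 2 1 -> one row was dropped by the inner join
```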

In [None]:
df_final.show(5)

+------------+--------+------------------+------------------+-------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-------+--------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|Store Number|saleyear|Yearly_Total_Sales|   Last_Year_Sales|             Markup|Tenure_days|        Q1_QoQ_Sales|        Q2_QoQ_Sales|        Q3_QoQ_Sales|        Q4_QoQ_Sales|       City|CityPop|                 Gin|              Other|                Rum|             Tequila|              Vodka|            Whiskey|
+------------+--------+------------------+------------------+-------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-------+--------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|        2106|    2015|         172518.4

#### Check shapes of both dataframes before writing to disk

In [None]:
print(("Forecast only:", df_forecasting.count(), len(df_forecasting.columns)))

('Forecast only:', 6088, 12)


In [None]:
print(("Forecast with categories:", df_final.count(), len(df_final.columns)))

('Forecast with categories:', 6088, 18)


#### Correct column datatypes

In [None]:
df_final.dtypes

[('Store Number', 'int'),
 ('saleyear', 'int'),
 ('Yearly_Total_Sales', 'double'),
 ('Last_Year_Sales', 'double'),
 ('Markup', 'double'),
 ('Tenure_days', 'int'),
 ('Q1_QoQ_Sales', 'double'),
 ('Q2_QoQ_Sales', 'double'),
 ('Q3_QoQ_Sales', 'double'),
 ('Q4_QoQ_Sales', 'double'),
 ('City', 'string'),
 ('CityPop', 'string'),
 ('Gin', 'double'),
 ('Other', 'double'),
 ('Rum', 'double'),
 ('Tequila', 'double'),
 ('Vodka', 'double'),
 ('Whiskey', 'double')]

In [None]:
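# CityPop arrives as a string such as "41,167": strip $, # and comma characters,
# then cast to double (empty strings become null after the cast)
df_final = df_final.withColumn("CityPop", F.regexp_replace(F.col("CityPop"), r"[\$#,]", ""))
df_final = df_final.withColumn("CityPop", F.col("CityPop").cast("double"))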
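These two steps strip `$`, `#` and `,` from the population string and cast the result to `double`. Rows where the forecasting query substituted `''` for a missing population end up as null, because Spark's (non-ANSI) cast returns null for a string it cannot parse. The plain-Python sketch below approximates the same rule; `clean_population` is a hypothetical helper, not a Spark function.

```python
import re


def clean_population(value):
    """Approximate regexp_replace(col, "[$#,]", "") followed by cast('double')."""
    if value is None:
        return None
    stripped = re.sub(r"[$#,]", "", value)  # inside [...] the $ is a literal
    try:
        return float(stripped)
    except ValueError:
        # Spark's non-ANSI cast yields null for unparseable strings such as ''
        return None


print(clean_population("41,167"))  # 41167.0
print(clean_population(""))        # None
```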

In [None]:
df_final.dtypes

[('Store Number', 'int'),
 ('saleyear', 'int'),
 ('Yearly_Total_Sales', 'double'),
 ('Last_Year_Sales', 'double'),
 ('Markup', 'double'),
 ('Tenure_days', 'int'),
 ('Q1_QoQ_Sales', 'double'),
 ('Q2_QoQ_Sales', 'double'),
 ('Q3_QoQ_Sales', 'double'),
 ('Q4_QoQ_Sales', 'double'),
 ('City', 'string'),
 ('CityPop', 'double'),
 ('Gin', 'double'),
 ('Other', 'double'),
 ('Rum', 'double'),
 ('Tequila', 'double'),
 ('Vodka', 'double'),
 ('Whiskey', 'double')]

In [None]:
df_final.printSchema()

root
 |-- Store Number: integer (nullable = true)
 |-- saleyear: integer (nullable = true)
 |-- Yearly_Total_Sales: double (nullable = true)
 |-- Last_Year_Sales: double (nullable = true)
 |-- Markup: double (nullable = true)
 |-- Tenure_days: integer (nullable = true)
 |-- Q1_QoQ_Sales: double (nullable = true)
 |-- Q2_QoQ_Sales: double (nullable = true)
 |-- Q3_QoQ_Sales: double (nullable = true)
 |-- Q4_QoQ_Sales: double (nullable = true)
 |-- City: string (nullable = true)
 |-- CityPop: double (nullable = true)
 |-- Gin: double (nullable = true)
 |-- Other: double (nullable = true)
 |-- Rum: double (nullable = true)
 |-- Tequila: double (nullable = true)
 |-- Vodka: double (nullable = true)
 |-- Whiskey: double (nullable = true)



In [None]:
df_final.show(5)

+------------+--------+------------------+------------------+-------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-------+--------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|Store Number|saleyear|Yearly_Total_Sales|   Last_Year_Sales|             Markup|Tenure_days|        Q1_QoQ_Sales|        Q2_QoQ_Sales|        Q3_QoQ_Sales|        Q4_QoQ_Sales|       City|CityPop|                 Gin|              Other|                Rum|             Tequila|              Vodka|            Whiskey|
+------------+--------+------------------+------------------+-------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-------+--------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|        2106|    2015|         172518.4

In [None]:
# In case we decide to query or analyze the final dataframe further with SQL
df_final.createOrReplaceTempView("df_final")

### 2.3 Save final dataframe to disk 
#### All columns except City are now numeric; City remains a string and will be encoded as a feature vector during modelling. We save this dataframe for the modelling notebook.
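City will most likely be encoded with the `StringIndexer` and `OneHotEncoderEstimator` stages imported in section 1.1. As a rough plain-Python illustration of their default behaviour (labels indexed by descending frequency, then one-hot encoded with `dropLast=True`, so the last index maps to an all-zero vector), consider the sketch below. The helper names are hypothetical, vectors are shown dense for readability where Spark produces sparse ones, and breaking frequency ties alphabetically is an assumption that matches newer Spark releases rather than every version.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, n_labels, drop_last=True):
    """Dense one-hot vector; with drop_last the final category maps to all zeros."""
    size = n_labels - 1 if drop_last else n_labels
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


cities = ["DES MOINES", "WAUKON", "DES MOINES", "CEDAR FALLS", "CEDAR FALLS", "DES MOINES"]
labels = string_index(cities)
print(labels)  # {'DES MOINES': 0, 'CEDAR FALLS': 1, 'WAUKON': 2}
for city in sorted(labels, key=labels.get):
    print(city, one_hot(labels[city], len(labels)))
```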

In [None]:
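# Save a dataframe as a single CSV file with a header row.
# Spark creates a folder named <fileName>, resolved against its default filesystem
# (the notebook's working directory when running locally, typically the user's HDFS
# home directory on a Hadoop cluster), containing part-00000-*.csv plus a _SUCCESS marker.
# Rename the part file manually to a readable name.
# Nothing is written unless mode=True, which guards against accidental writes.
def saveMyDf(df, fileName, mode=False):
    if mode:
        # coalesce(1) merges all partitions so the output is a single part file
        df.coalesce(1).write.option("header", "true").csv(fileName)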
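Because `coalesce(1)` writes a single partition, the output folder holds one `part-00000...` file plus marker files such as `_SUCCESS`, and that part file is renamed by hand. On a local filesystem the rename could be scripted as below; `rename_single_part` is a hypothetical helper, and for output written to HDFS the equivalent would be `hdfs dfs -mv` or the Hadoop FileSystem API instead.

```python
import glob
import os
import shutil
import tempfile


def rename_single_part(output_dir, target_path):
    """Move the single part-* file Spark wrote into output_dir to target_path."""
    parts = glob.glob(os.path.join(output_dir, "part-*"))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file in {output_dir}, found {len(parts)}")
    shutil.move(parts[0], target_path)
    return target_path


# Demo on a fake output folder that mimics what coalesce(1).write produces
with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "LiqourSales_Forecast.csv")
    os.makedirs(out_dir)
    for name in ("part-00000-demo.csv", "_SUCCESS"):
        with open(os.path.join(out_dir, name), "w") as fh:
            fh.write("Store Number,saleyear\n" if name.startswith("part") else "")
    final_path = rename_single_part(out_dir, os.path.join(tmp, "forecast_final.csv"))
    with open(final_path) as fh:
        header = fh.readline().strip()
    print(header)  # Store Number,saleyear
```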

In [None]:
saveMyDf(df_final, "LiqourSales_Forecast.csv", True)

## Continued in Notebook 2 for Modelling