In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
# findspark.init() runs before the pyspark import below; presumably it is what makes
# pyspark importable in this environment, so keep this ordering.
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Get the active SparkSession or create one under this application name.
spark = SparkSession.builder.appName("SalesPrediction1026").getOrCreate()

In [2]:
from pyspark.sql.functions import lit
from pyspark.sql import functions

In [4]:
# All source tables are read the same way: first row as header, column types inferred.
csv_options = {"header": True, "inferSchema": True}

holidays_events = spark.read.csv("../holidays_events.csv", **csv_options)
items = spark.read.csv("../items.csv", **csv_options)
#oil = spark.read.csv("../oil.csv", **csv_options)
stores = spark.read.csv("../stores.csv", **csv_options)
test = spark.read.csv("../test.csv", **csv_options)
train = spark.read.csv("../train.csv", **csv_options)
#train_sample = train.sample(False, 0.05, 1)
transactions = spark.read.csv("../transactions.csv", **csv_options)

In [5]:
#deal with oil dataframe in pandas with backfill, then transform it to spark dataframe
#oil_pandas = pd.read_csv("../oil.csv")

In [6]:
#spark.sql("""SET spark.sql.autoBroadcastJoinThreshold = -1""")
# Session-level setting of the auto-broadcast join threshold; per the Spark SQL
# configuration docs a value of -1 disables automatic broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [11]:
# Stack the train and test rows into one frame so both go through identical cleaning.
# A literal `source` column tags each row so the two sets can be separated again later.
train = train.withColumn('source', lit('train'))
test = test.withColumn('source', lit('test'))

# DataFrame.union() matches columns by POSITION, not by name. Selecting the same explicit
# column list on both sides guarantees the columns line up, and fails loudly (instead of
# silently misaligning data) if `test` is missing one of them. The target column
# (`unit_sales`) is left out here because the test set does not have it.
shared_columns = ['id', 'date', 'store_nbr', 'item_nbr', 'onpromotion', 'source']
train_without_target = train.select(*shared_columns)
train_test_set = train_without_target.union(test.select(*shared_columns))
train_test_set.show()

+---+-------------------+---------+--------+-----------+------+
| id|               date|store_nbr|item_nbr|onpromotion|source|
+---+-------------------+---------+--------+-----------+------+
|  0|2013-01-01 00:00:00|       25|  103665|       null| train|
|  1|2013-01-01 00:00:00|       25|  105574|       null| train|
|  2|2013-01-01 00:00:00|       25|  105575|       null| train|
|  3|2013-01-01 00:00:00|       25|  108079|       null| train|
|  4|2013-01-01 00:00:00|       25|  108701|       null| train|
|  5|2013-01-01 00:00:00|       25|  108786|       null| train|
|  6|2013-01-01 00:00:00|       25|  108797|       null| train|
|  7|2013-01-01 00:00:00|       25|  108952|       null| train|
|  8|2013-01-01 00:00:00|       25|  111397|       null| train|
|  9|2013-01-01 00:00:00|       25|  114790|       null| train|
| 10|2013-01-01 00:00:00|       25|  114800|       null| train|
| 11|2013-01-01 00:00:00|       25|  115267|       null| train|
| 12|2013-01-01 00:00:00|       25|  115

## Join the combined train/test set with the holidays_events dataframe

In [12]:
# Left-join holiday information onto every train/test row by calendar date.
#
# The join key is `date` only, so if holidays_events holds more than one row for a date,
# a plain left join emits one output row per matching holiday and duplicates the
# train/test rows (and their `id`s). Reducing the holiday table to one row per date
# first keeps the row count of train_test_set unchanged.
# NOTE(review): dropDuplicates keeps an arbitrary row among the holidays sharing a date;
# if the specific holiday matters (e.g. per locale), replace this with an explicit rule.
holidays_by_date = holidays_events.dropDuplicates(['date'])
train_holiday = train_test_set.join(holidays_by_date, 'date', 'left_outer')
train_holiday.show(2)

+-------------------+-------+---------+--------+-----------+------+----+------+-----------+-----------+-----------+
|               date|     id|store_nbr|item_nbr|onpromotion|source|type|locale|locale_name|description|transferred|
+-------------------+-------+---------+--------+-----------+------+----+------+-----------+-----------+-----------+
|2013-05-06 00:00:00|5202345|        1|  103520|       null| train|null|  null|       null|       null|       null|
|2013-05-06 00:00:00|5202346|        1|  105574|       null| train|null|  null|       null|       null|       null|
+-------------------+-------+---------+--------+-----------+------+----+------+-----------+-----------+-----------+
only showing top 2 rows



In [13]:
# No .cache()/.persist() call on train_test_set is visible in this notebook, so this is
# not expected to release anything; it just returns the DataFrame.
train_test_set.unpersist()

DataFrame[id: int, date: timestamp, store_nbr: int, item_nbr: int, onpromotion: boolean, source: string]

## Clean Oil Dataframe

In [7]:
# oil_pandas = oil_pandas.fillna(method='bfill')
# oil_pandas = oil_pandas.fillna(method='ffill')
# oil_pandas.head()

Unnamed: 0,date,dcoilwtico
0,2013-01-01,93.14
1,2013-01-02,93.14
2,2013-01-03,92.97
3,2013-01-04,93.12
4,2013-01-07,93.2


In [8]:
# oil_pandas.isnull().sum()

date          0
dcoilwtico    0
dtype: int64

## Transform pandas dataframe back to spark dataframe

In [9]:
# oil_spark = spark.createDataFrame(oil_pandas)
# oil_spark.show()

+----------+----------+
|      date|dcoilwtico|
+----------+----------+
|2013-01-01|     93.14|
|2013-01-02|     93.14|
|2013-01-03|     92.97|
|2013-01-04|     93.12|
|2013-01-07|      93.2|
|2013-01-08|     93.21|
|2013-01-09|     93.08|
|2013-01-10|     93.81|
|2013-01-11|      93.6|
|2013-01-14|     94.27|
|2013-01-15|     93.26|
|2013-01-16|     94.28|
|2013-01-17|     95.49|
|2013-01-18|     95.61|
|2013-01-21|     96.09|
|2013-01-22|     96.09|
|2013-01-23|     95.06|
|2013-01-24|     95.35|
|2013-01-25|     95.15|
|2013-01-28|     95.95|
+----------+----------+
only showing top 20 rows



In [10]:
# oil_spark.dtypes

[('date', 'string'), ('dcoilwtico', 'double')]

In [12]:
# train_holiday_oil = train_holiday.join(oil_spark, 'date', 'left_outer')
# train_holiday_oil.show()

+-------------------+-------+---------+--------+-----------+------+----+------+-----------+-----------+-----------+----------+
|               date|     id|store_nbr|item_nbr|onpromotion|source|type|locale|locale_name|description|transferred|dcoilwtico|
+-------------------+-------+---------+--------+-----------+------+----+------+-----------+-----------+-----------+----------+
|2013-05-06 00:00:00|5202345|        1|  103520|       null| train|null|  null|       null|       null|       null|      95.8|
|2013-05-06 00:00:00|5202346|        1|  105574|       null| train|null|  null|       null|       null|       null|      95.8|
|2013-05-06 00:00:00|5202347|        1|  105575|       null| train|null|  null|       null|       null|       null|      95.8|
|2013-05-06 00:00:00|5202348|        1|  105577|       null| train|null|  null|       null|       null|       null|      95.8|
|2013-05-06 00:00:00|5202349|        1|  105693|       null| train|null|  null|       null|       null|       n

In [13]:
#train_holiday.unpersist()

DataFrame[date: timestamp, id: int, store_nbr: int, item_nbr: int, onpromotion: boolean, source: string, type: string, locale: string, locale_name: string, description: string, transferred: boolean]

## Join train_holiday with stores (the oil join above is currently commented out)

### Rename stores.type to store_type so it does not clash with the holiday `type` column

In [None]:
# The holiday table already contributes a `type` column, so the store-level `type`
# is renamed before joining to keep the two apart.
stores = stores.withColumnRenamed("type", "store_type")

# Attach store attributes to every row, keyed by store number.
train_holiday_oil_store = train_holiday.join(
    stores, on='store_nbr', how='left_outer')
train_holiday_oil_store.show()

+---------+-------------------+--------+--------+-----------+------+----+------+-----------+-----------+-----------+--------+--------+----------+-------+
|store_nbr|               date|      id|item_nbr|onpromotion|source|type|locale|locale_name|description|transferred|    city|   state|store_type|cluster|
+---------+-------------------+--------+--------+-----------+------+----+------+-----------+-----------+-----------+--------+--------+----------+-------+
|       31|2013-05-06 00:00:00| 5226034|  103501|       null| train|null|  null|       null|       null|       null|Babahoyo|Los Rios|         B|     10|
|       31|2014-01-30 00:00:00|18164509| 1464035|       null| train|null|  null|       null|       null|       null|Babahoyo|Los Rios|         B|     10|
|       31|2013-05-06 00:00:00| 5226035|  103520|       null| train|null|  null|       null|       null|       null|Babahoyo|Los Rios|         B|     10|
|       31|2014-01-30 00:00:00|18164510| 1464066|       null| train|null|  n

In [15]:
#train_holiday_oil.unpersist()

DataFrame[date: timestamp, id: int, store_nbr: int, item_nbr: int, onpromotion: boolean, source: string, type: string, locale: string, locale_name: string, description: string, transferred: boolean, dcoilwtico: double]

## Join train_holiday_oil_store with transactions

In [None]:
# Passing the join keys as a list of column names (rather than an expression) is used
# here to avoid ending up with duplicated `date` / `store_nbr` columns in the result.
transaction_join_keys = ['date', 'store_nbr']
train_holiday_oil_store_transaction = train_holiday_oil_store.join(
    transactions, on=transaction_join_keys, how='left_outer')
train_holiday_oil_store_transaction.show()

## Join items table

In [None]:
# Attach the columns of `items` to every row, keyed by item number.
train_holiday_oil_store_transaction_item = train_holiday_oil_store_transaction.join(
    items, on='item_nbr', how='left_outer')
#train_holiday_oil_store_transaction_item.show()

In [None]:
# No .cache()/.persist() call on this DataFrame is visible in the notebook, so this is
# not expected to release anything.
train_holiday_oil_store_transaction.unpersist()

## Begin data cleaning

In [None]:
# Work on a new DataFrame variable so the joined frame above stays untouched.
# Rows with no promotion flag are treated as "not on promotion".
train_holiday_oil_store_transaction_item_test = (
    train_holiday_oil_store_transaction_item.na.fill({"onpromotion": False})
)
#train_holiday_oil_store_transaction_item_test.show()

In [None]:
# No .cache()/.persist() call on this DataFrame is visible in the notebook, so this is
# not expected to release anything.
train_holiday_oil_store_transaction_item.unpersist()

In [None]:
#train_holiday_oil_store_transaction_item_test_003[train_holiday_oil_store_transaction_item_test_003['source'] == 'test'].count()

### Create an encoded table for onpromotion (currently disabled)

In [69]:
# from pyspark.sql import functions as F
# onpromotions = train_holiday_oil_store_transaction_item_test.select("onpromotion").distinct().rdd.flatMap(lambda x: x).collect()

# exprs = [F.when(F.col("onpromotion") == onpromotion, 1).otherwise(0).alias('onpromotion_' + onpromotion)
#          for onpromotion in onpromotions]

#train_holiday_oil_store_transaction_item_test.select("item_nbr", *exprs).show()

In [None]:
#item table with onpromotion encoded to 1 and 0
# item_onpromotion = train_holiday_oil_store_transaction_item_test.select("item_nbr", *exprs)

### Encode item_family and create a new table

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Encode `family` in two stages: string -> numeric index (`familyIndex`),
# then index -> one-hot vector (`familyIndexVec`).
stringIndexer = StringIndexer().setInputCol("family").setOutputCol("familyIndex")
encoder = OneHotEncoder().setInputCol("familyIndex").setOutputCol("familyIndexVec")

model = stringIndexer.fit(train_holiday_oil_store_transaction_item_test)
indexed = model.transform(train_holiday_oil_store_transaction_item_test)

# The raw `family` text column is dropped once its encoded columns exist.
train_holiday_oil_store_transaction_item_test = encoder.transform(indexed).drop('family')
#family_encoded.show()

In [None]:
# Encode `city`: string -> numeric index (`cityIndex`) -> one-hot vector (`cityIndexVec`).
stringIndexer = StringIndexer().setInputCol("city").setOutputCol("cityIndex")
encoder = OneHotEncoder().setInputCol("cityIndex").setOutputCol("cityIndexVec")

model = stringIndexer.fit(train_holiday_oil_store_transaction_item_test)
indexed = model.transform(train_holiday_oil_store_transaction_item_test)
train_holiday_oil_store_transaction_item_test = encoder.transform(indexed)

train_holiday_oil_store_transaction_item_test.show()

In [None]:
# The raw `city` column is no longer needed now that cityIndex / cityIndexVec exist.
train_holiday_oil_store_transaction_item_test = train_holiday_oil_store_transaction_item_test.drop('city')

In [None]:
# Encode `cluster`: value -> numeric index (`clusterIndex`) -> one-hot vector
# (`clusterIndexVec`), then drop the original column.
stringIndexer = StringIndexer().setInputCol("cluster").setOutputCol("clusterIndex")
encoder = OneHotEncoder().setInputCol("clusterIndex").setOutputCol("clusterIndexVec")

model = stringIndexer.fit(train_holiday_oil_store_transaction_item_test)
indexed = model.transform(train_holiday_oil_store_transaction_item_test)

train_holiday_oil_store_transaction_item_test = encoder.transform(indexed).drop('cluster')
train_holiday_oil_store_transaction_item_test.show()

### Deal with holiday_event dataframe:

## Note how fillna is used with a boolean value here

In [None]:
# Rows without a holiday have nulls in the holiday columns after the left join.
# The string columns get the placeholder text 'False'; the boolean `transferred`
# column gets an actual boolean False.
holiday_text_columns = ['type', 'locale', 'locale_name', 'description']
train_holiday_oil_store_transaction_item_test_002 = (
    train_holiday_oil_store_transaction_item_test.na.fill('False', subset=holiday_text_columns)
)
train_holiday_oil_store_transaction_item_test_003 = (
    train_holiday_oil_store_transaction_item_test_002.na.fill({"transferred": False})
)

In [None]:
# Sanity check: number of rows that originate from the test set.
train_holiday_oil_store_transaction_item_test_003.filter(
    train_holiday_oil_store_transaction_item_test_003['source'] == 'test'
).count()

In [None]:
# Encode the holiday `type`: string -> numeric index (`typeIndex`) -> one-hot vector
# (`typeIndexVec`).
stringIndexer = StringIndexer().setInputCol("type").setOutputCol("typeIndex")
encoder = OneHotEncoder().setInputCol("typeIndex").setOutputCol("typeIndexVec")

model = stringIndexer.fit(train_holiday_oil_store_transaction_item_test_003)
indexed = model.transform(train_holiday_oil_store_transaction_item_test_003)
train_holiday_oil_store_transaction_item_test_003 = encoder.transform(indexed)

train_holiday_oil_store_transaction_item_test_003.show()

In [None]:
# Encode `locale_name`: string -> numeric index (`locale_nameIndex`) -> one-hot vector
# (`locale_nameIndexVec`).
stringIndexer = StringIndexer().setInputCol("locale_name").setOutputCol("locale_nameIndex")
encoder = OneHotEncoder().setInputCol("locale_nameIndex").setOutputCol("locale_nameIndexVec")

model = stringIndexer.fit(train_holiday_oil_store_transaction_item_test_003)
indexed = model.transform(train_holiday_oil_store_transaction_item_test_003)
train_holiday_oil_store_transaction_item_test_003 = encoder.transform(indexed)

train_holiday_oil_store_transaction_item_test_003.show()

## Combine all dataframes together, then drop original columns

In [None]:
# Remove the raw columns that are either already encoded or not used further.
columns_to_drop = ['type', 'description', 'transferred', 'state',
                   'locale', 'locale_name', 'family', 'class']
train_holiday_oil_store_transaction_item_test_003 = (
    train_holiday_oil_store_transaction_item_test_003.drop(*columns_to_drop)
)

In [None]:
# Cast the boolean `onpromotion` flag to an integer column.
train_holiday_oil_store_transaction_item_test_003 = (
    train_holiday_oil_store_transaction_item_test_003.withColumn(
        "onpromotion", functions.col("onpromotion").cast("integer"))
)

In [None]:
# stringIndexer = StringIndexer(inputCol="onpromotion", outputCol="onpromotionIndex")
# model = stringIndexer.fit(train_holiday_oil_store_transaction_item_test_003)
# indexed = model.transform(train_holiday_oil_store_transaction_item_test_003)

# encoder = OneHotEncoder(inputCol="onpromotionIndex", outputCol="onpromotionIndexVec")
# train_holiday_oil_store_transaction_item_test_004 = encoder.transform(indexed)

# train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.drop('onpromotion')
# train_holiday_oil_store_transaction_item_test_004.show()
# Preview the first 5 rows of the cleaned frame.
train_holiday_oil_store_transaction_item_test_003.show(5)

In [None]:
# Replace any remaining nulls in `onpromotion` with 0.
train_holiday_oil_store_transaction_item_test_003 = (
    train_holiday_oil_store_transaction_item_test_003.na.fill(0, subset=['onpromotion'])
)

In [None]:
# Sanity check: count the rows where `onpromotion` is still null.
train_holiday_oil_store_transaction_item_test_003.filter(
    train_holiday_oil_store_transaction_item_test_003.onpromotion.isNull()
).count()

In [None]:
# Neither intermediate frame is cached or persisted anywhere visible in this notebook,
# so these calls are not expected to release anything.
train_holiday_oil_store_transaction_item_test.unpersist()
train_holiday_oil_store_transaction_item_test_002.unpersist()

## Deal with store_type

In [None]:
# Encode `store_type`: string -> numeric index (`store_type_Index`) -> one-hot vector
# (`store_type_Index_Vec`).
stringIndexer = StringIndexer().setInputCol("store_type").setOutputCol("store_type_Index")
encoder = OneHotEncoder().setInputCol("store_type_Index").setOutputCol("store_type_Index_Vec")

model = stringIndexer.fit(train_holiday_oil_store_transaction_item_test_003)
indexed = model.transform(train_holiday_oil_store_transaction_item_test_003)
train_holiday_oil_store_transaction_item_test_003 = encoder.transform(indexed)

In [None]:
# The raw `store_type` column is dropped now that its encoded columns exist.
train_holiday_oil_store_transaction_item_test_003 = (
    train_holiday_oil_store_transaction_item_test_003.drop('store_type')
)
train_holiday_oil_store_transaction_item_test_003.show()

In [None]:
# Display the column names remaining after encoding and dropping.
train_holiday_oil_store_transaction_item_test_003.columns

In [None]:
# Sanity check: number of rows tagged as coming from the test set.
train_holiday_oil_store_transaction_item_test_003.filter(
    train_holiday_oil_store_transaction_item_test_003['source'] == 'test'
).count()

## Drop the oil price column (temporarily)

In [None]:
# Build the frame without the oil price column. The oil join earlier in the notebook is
# commented out, so `dcoilwtico` may not be present here at all.
train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_003.drop('dcoilwtico')

## Separate Train set and Test set

In [None]:
# Separate the combined frame back into its two origins using the `source` tag.
is_train_row = train_holiday_oil_store_transaction_item_test_004.source == 'train'
is_test_row = train_holiday_oil_store_transaction_item_test_004.source == 'test'

train_001 = train_holiday_oil_store_transaction_item_test_004.filter(is_train_row)
test_001 = train_holiday_oil_store_transaction_item_test_004.filter(is_test_row)

In [None]:
# Pull the target (`unit_sales`) together with `id` from the original train frame so it
# can be joined back onto the cleaned training rows.
train_target = train.select('id', 'unit_sales')

In [None]:
# Re-attach the target column to the cleaned training rows, matched on `id`.
train_001 = train_001.join(train_target, on='id', how='left_outer')

In [None]:
# Remove the `transactions` column from the training frame; it is not among the
# features assembled below.
train_001 = train_001.drop('transactions')

In [None]:
# Row count of the test frame as loaded (plus the `source` tag), for comparison with test_001.
test.count()

In [None]:
# Row count of the test rows after all joins and cleaning; a mismatch with test.count()
# would indicate that a join added or removed rows.
test_001.count()

In [None]:
# Preview the first 5 cleaned test rows.
test_001.show(5)

In [None]:
# Remove the `transactions` column from the test frame, mirroring the training frame.
test_001 = test_001.drop('transactions')

In [None]:
# Final column list of the training frame.
print(train_001.columns)

In [None]:
# Final column list of the test frame.
print(test_001.columns)

# Begin training and predicting

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
# Columns fed to the regression model.
#
# The categorical inputs use the one-hot encoded vector columns (`*Vec`) produced by the
# OneHotEncoder cells above, not the raw StringIndexer indices. An index is an arbitrary
# label number; passing it to a linear model would treat the categories as an ordered
# numeric scale, which is why the one-hot vectors were built in the first place.
# NOTE(review): `store_nbr` is still passed as a plain number, as in the original
# feature list; consider whether it should be encoded as well.
features = ['store_nbr', 'perishable', 'familyIndexVec',
            'cityIndexVec', 'clusterIndexVec', 'typeIndexVec',
            'locale_nameIndexVec',
            'store_type_Index_Vec']

# Concatenate the selected columns into a single `features` vector column.
assembler = VectorAssembler(
    inputCols=features,
    outputCol='features')

In [None]:
# Add the assembled `features` vector column to the training frame and peek at 5 rows.
assembled_train = assembler.transform(train_001)
assembled_train.take(5)

In [None]:
# Random 60/40 split of the assembled training rows with a fixed seed.
# Note: test_df is a hold-out slice of the training data, not the test set (test_001).
train_df, test_df = assembled_train.randomSplit([0.6, 0.4], seed=0)

In [None]:
# Linear regression on the assembled `features` vector, predicting `unit_sales`,
# capped at 10 iterations.
lr = LinearRegression(maxIter=10, labelCol="unit_sales", featuresCol="features")
model = lr.fit(train_df)

In [None]:
# Evaluate the fitted model on the hold-out split.
testing_summary = model.evaluate(test_df)

In [None]:
# Root mean squared error on the hold-out split.
testing_summary.rootMeanSquaredError