# Build Base Feature/Future Store
The output is a feature store (when built from train) or a future store (when built from test) at item-loc-day level, containing all item, store and event information.

These stores are known as the *BASE FEATURE/FUTURE STORE*. Further information will be added to them downstream, but the base will remain relatively static.
For example, a pipeline that tries to better model sales during events will engineer its own features on top of the base feature store, rather than having them built into the base feature store here.


In [None]:
from config import proj
import pyspark.sql.functions as sf
from src.utils.validation_utils import assert_col_has_no_null
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

build_on = "train" # train builds the feature store, test builds the future store.

## Add provided data

### Pull in train or test

In [2]:
if build_on == "train":
    base = spark.read.parquet(str(proj.Config.paths.get("data_proc").joinpath("train.parquet")))
    base = base.filter("date >= '2015-08-14'") ## Filter to two most recent years
elif build_on == "test":
    base = spark.read.parquet(str(proj.Config.paths.get("data_proc").joinpath("test.parquet")))
else:
    raise NotImplementedError("Can only build feature or future store")
orig_row_count = base.count()

### Add item

In [3]:
items = spark.read.parquet(str(proj.Config.paths.get("data_proc").joinpath("items.parquet")))
base = base.join(items, base.item_nbr == items.item_nbr, "left").drop(items.item_nbr)
# base.show(5)

### Add store

In [4]:
stores = spark.read.parquet(str(proj.Config.paths.get("data_proc").joinpath("stores.parquet")))
base = base.join(stores, base.store_nbr == stores.store_nbr, "left").drop(stores.store_nbr)
# base.show(5)

## Feature engineering

### New and cleared item flag
These flags tell us how to treat particular items. Cleared items (in train but not in test) won't need predictions, so they can possibly be filtered out. New items (in test but not in train) need a different treatment, since the model won't have seen them before.

In [5]:
train = spark.read.parquet(str(proj.Config.paths.get("data_proc").joinpath("train.parquet")))
test = spark.read.parquet(str(proj.Config.paths.get("data_proc").joinpath("test.parquet")))

train_items = train.select("item_nbr", sf.lit(1).alias("train_fl")).distinct()
test_items = test.select("item_nbr", sf.lit(1).alias("test_fl")).distinct()

item_coverage = train_items.join(test_items, train_items.item_nbr == test_items.item_nbr, "full")

new_items = item_coverage.filter("train_fl is null")\
    .drop(train_items.item_nbr)\
    .select(test_items.item_nbr)\
    .withColumn("new_item", sf.lit(True)) # items not in train

cleared_items = item_coverage.filter("test_fl is null")\
    .drop(test_items.item_nbr)\
    .select(train_items.item_nbr)\
    .withColumn("cleared_item", sf.lit(True)) # items not in test

In [6]:
base = base\
    .join(new_items, ["item_nbr"], "left")\
    .join(cleared_items, ["item_nbr"], "left")\
    .na.fill(value = False, subset=["new_item", "cleared_item"])
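The flag logic above amounts to two set differences over the item numbers in train and test. A minimal pure-Python sketch, using made-up item numbers and a hypothetical `item_flags` helper that is not part of the notebook:

```python
# Hypothetical item numbers, for illustration only.
train_item_nbrs = {96995, 99197, 103501}
test_item_nbrs = {99197, 103501, 2000001}

# Items in test that never appear in train.
new_items = test_item_nbrs - train_item_nbrs
# Items in train that no longer appear in test.
cleared_items = train_item_nbrs - test_item_nbrs


def item_flags(item_nbr):
    """Return the (new_item, cleared_item) flags for one item number."""
    return item_nbr in new_items, item_nbr in cleared_items


print(item_flags(2000001))
print(item_flags(96995))
print(item_flags(99197))
```

An item can never carry both flags, which is a cheap sanity check to run on the real store.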

### Add events
Engineer flags for events.
This adds the following columns to the base set:
- event_nat - flag for national event
- event_reg - flag for regional event
- event_loc - flag for local event
- event_type - type of event (first non-null of national, regional, local)
- event - flag for an event at any level

In [7]:
holidays = spark.read.parquet(str(proj.Config.paths.get("data_proc").joinpath("holidays_events.parquet")))

In [8]:
# deal with dupes and filter transferred and work days
holidays = holidays\
    .filter("transferred == false")\
    .filter("type != 'Work Day'")\
    .groupby(["date", "locale", "locale_name"])\
    .agg(sf.concat_ws("|", sf.collect_list("type")).alias("type")) # combines multiple types into a single row

In [9]:
holidays_nat = holidays\
    .filter("locale == 'National'")\
    .select(["date", "type"])\
    .withColumnRenamed("type", "type_nat")\
    .withColumn("event_nat", sf.lit(True))

holidays_reg = holidays\
    .filter("locale == 'Regional'")\
    .select(["date", "type", "locale_name"])\
    .withColumnRenamed("locale_name", "state")\
    .withColumnRenamed("type", "type_reg")\
    .withColumn("event_reg", sf.lit(True))

holidays_loc = holidays\
    .filter("locale == 'Local'")\
    .select(["date", "type", "locale_name"])\
    .withColumnRenamed("locale_name", "city")\
    .withColumnRenamed("type", "type_loc")\
    .withColumn("event_loc", sf.lit(True))

In [10]:
# join all event flags to base
base = base\
    .join(holidays_nat, ["date"], "left")\
    .join(holidays_reg, ["date", "state"], "left")\
    .join(holidays_loc, ["date", "city"], "left")\
    .na.fill(value = False, subset = ["event_nat", "event_reg", "event_loc"])

In [11]:
# take first non-null event type and create event flag
base = base\
    .withColumn("event_type", sf.coalesce(base["type_nat"], base["type_reg"], base["type_loc"]))\
    .drop("type_nat", "type_reg", "type_loc")\
    .withColumn("event", sf.greatest("event_nat", "event_reg", "event_loc"))
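The precedence used for `event_type` can be illustrated outside Spark. The sketch below is a plain-Python stand-in for the coalesce step, with a hypothetical `summarise_events` helper. It assumes a level's flag is true exactly when that level has a type, which holds here because each flag is set on the same joined row as its type.

```python
def summarise_events(type_nat=None, type_reg=None, type_loc=None):
    """Mimic the coalesce step: national beats regional beats local."""
    types = (type_nat, type_reg, type_loc)
    # First non-None type, or None when no event applies to the row.
    event_type = next((t for t in types if t is not None), None)
    # Overall flag: true if an event exists at any of the three levels.
    event = any(t is not None for t in types)
    return event_type, event


print(summarise_events(type_reg="Holiday", type_loc="Additional"))
print(summarise_events())
```

Note that when events overlap across levels only the highest-level type is kept. If lower-level types matter for modelling, they would need to be kept as separate columns.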

### Time series

- day of week
- etc
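This section is not implemented yet. As a rough sketch of the calendar features it could hold, the example below derives them from a single date with the standard library. The feature names and the `calendar_features` helper are assumptions, not part of the notebook. In the store itself the equivalent columns would more likely come from PySpark functions such as `sf.dayofweek` and `sf.month`; note that `sf.dayofweek` numbers days from 1 = Sunday, unlike the ISO numbering used here.

```python
from datetime import date


def calendar_features(d):
    """Derive simple calendar features from a date (ISO conventions)."""
    return {
        "day_of_week": d.isoweekday(),      # 1 = Monday ... 7 = Sunday
        "is_weekend": d.isoweekday() >= 6,  # Saturday or Sunday
        "day_of_month": d.day,
        "month": d.month,
        "week_of_year": d.isocalendar()[1],
    }


features = calendar_features(date(2017, 8, 16))
print(features)
```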

# Validation
- Count rows
- Check for nulls, etc

Check row counts

In [12]:
assert base.count() == orig_row_count, "Feature/Future store rows have changed due to bad join. Should be the same as original row count."
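The row-count check guards against one specific failure: a left join keeps every row on the left, but a duplicated key on the right side produces one output row per match. A small pure-Python illustration, with a naive hypothetical `left_join` over lists of dicts and made-up rows:

```python
def left_join(left_rows, right_rows, key):
    """Naive left join over lists of dicts, keeping every match."""
    joined = []
    for left in left_rows:
        matches = [r for r in right_rows if r[key] == left[key]]
        if not matches:
            # Unmatched left rows are kept as they are.
            joined.append(dict(left))
        for right in matches:
            # One output row per matching right row.
            joined.append({**left, **right})
    return joined


base_rows = [{"item_nbr": 1}, {"item_nbr": 2}]
unique_items = [
    {"item_nbr": 1, "family": "GROCERY I"},
    {"item_nbr": 2, "family": "CLEANING"},
]
# Same lookup table with item 2 accidentally duplicated.
duplicated_items = unique_items + [{"item_nbr": 2, "family": "CLEANING"}]

clean = left_join(base_rows, unique_items, "item_nbr")
inflated = left_join(base_rows, duplicated_items, "item_nbr")
print(len(clean), len(inflated))
```

If the assertion fails, checking each lookup table for duplicate join keys is a reasonable first step.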

Check for missing values

In [13]:
null_allowed = ["event_type"]
null_not_allowed = [col_name for col_name in base.columns if col_name not in null_allowed]

for col_name in null_not_allowed:
    assert_col_has_no_null(base, col_name)

In [14]:
base.show(5)

+----------+-----+---------+--------+---------+---------+-----------+------------+-----+----------+----+-------+--------+------------+---------+---------+---------+----------+
|      date| city|    state|item_nbr|       id|store_nbr|onpromotion|      family|class|perishable|type|cluster|new_item|cleared_item|event_nat|event_reg|event_loc|event_type|
+----------+-----+---------+--------+---------+---------+-----------+------------+-----+----------+----+-------+--------+------------+---------+---------+---------+----------+
|2017-08-16|Quito|Pichincha|   96995|125497040|        1|      false|   GROCERY I| 1093|         0|   D|     13|   false|       false|    false|    false|    false|      null|
|2017-08-16|Quito|Pichincha|   99197|125497041|        1|      false|   GROCERY I| 1067|         0|   D|     13|   false|       false|    false|    false|    false|      null|
|2017-08-16|Quito|Pichincha|  103501|125497042|        1|      false|    CLEANING| 3008|         0|   D|     13|   false

# Write feature store

The initial plan is to model at family-cluster level, which produces 33*5=165 smaller models whose outputs then feed into models that apportion out the sales. This depends on how the promos work, since they will have a significant impact on sales.

In [None]:
if build_on == "train":
    file_name = "feature_store.parquet"
elif build_on == "test":
    file_name = "future_store.parquet"
else:
    raise NotImplementedError("Can only build feature or future store")

write_path = proj.Config.paths.get("data_proc").joinpath(file_name)
base.write.partitionBy("family").parquet(str(write_path), mode='overwrite')