# Init

In [11]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("feature_engineering") \
    .getOrCreate()

# Modules

In [12]:
import pandas as pd
import numpy as np
import datetime
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


# Constante

In [13]:
date_var='DATE_ISO'

# Fonctions

In [14]:
def derive_spark_week(spark_data, date_var="DATE_ISO"):
  """This function takes the raw sales data and adds a week variable,
  later to be used in aggregation manipulations.
 
  Arguments:
      spark_data {pyspark.sql.DataFrame} -- A spark dataframe to use
      date_var {string} -- Name of date column to use in week derivation
 
  Returns:
      pyspark.sql.DataFrame -- A spark dataframe that contains the raw sales data and one additional
          variable called week_index representing week numbers from earliest observed sale.
  """
  
  # 1) first collect all unique data
  # 2) then put them in a python list
  dataset_dates = \
    spark_data. \
    groupBy(date_var). \
    agg({date_var: "count"}). \
    select(date_var).sort(F.col(date_var).asc()). \
    collect()
 
  unique_dates = [x[date_var] for x in dataset_dates]
 
  start_date = datetime.datetime.strptime(str(min(unique_dates)), '%Y%m%d')
  end_date = datetime.datetime.strptime(str(max(unique_dates)), '%Y%m%d')
 
  print("The start date of the data is {} and it will be considered as week index 1, the end date is {} and will be considered the last week index".
        format(start_date.strftime('%Y/%m/%d'), end_date.strftime('%Y/%m/%d')))
 
  date_range = [start_date + datetime.timedelta(days=x) for x in range((end_date - start_date).days+1)]
 
  res = []
  week_index = 0
 
  # 3) derive week index
  # 4) append it to original spark data
  for each_possible_date in date_range:
 
    if each_possible_date.weekday() == 0:
        week_index = week_index + 1
 
    temp_row = Row(
      week_index=week_index, 
      DATE_ISO=each_possible_date.year*10000 + each_possible_date.month*100 + each_possible_date.day, 
      seasonal_week_index=each_possible_date.strftime("%V")  # ISO 8601 week
    ) 
 
    temp_row.__fields__ = ["week_index", date_var, "seasonal_week_index"]
    res.append(temp_row)  
    
  return spark_data.join(spark.createDataFrame(res), on="DATE_ISO", how="left")

In [15]:
def create_features(df):
    """
    Creates time series features from datetime index
    """
    df['dayofweek'] = df['transaction_date'].dt.dayofweek
    df['quarter'] = df['transaction_date'].dt.quarter
    df['month'] = df['transaction_date'].dt.month
    df['year'] = df['transaction_date'].dt.year
    df['dayofyear'] = df['transaction_date'].dt.dayofyear
    df['dayofmonth'] = df['transaction_date'].dt.day
    df['weekofyear'] = df['transaction_date'].dt.weekofyear
    

    return df

# Features engineering with dates

In [16]:
# read sales data with rfm features
sales=pd.read_pickle('intermed/sales.sav').\
        merge(
            right=pd.read_pickle('intermed/rfm.sav')[['customer_Id','RFM_Score','Segment','Score']],
            on ='customer_Id',
            how='left'
             )

sales['DATE_ISO']=sales.transaction_date.astype(str).str.replace('-','')

# Weekend_FLG 
sales['Weekend_FLG']=sales['transaction_date'].dt.day_name().apply(lambda day: 1 if day in ['Sunday', 'Saturday'] else 0)

# end or start of week
sales['isMonthStart'] = sales['transaction_date'].dt.is_month_start.astype(int)
sales['isMonthEnd'] = sales['transaction_date'].dt.is_month_end.astype(int)

# Big black fridy
sales['blackFriday2011']=(pd.to_datetime('2011-11-25') - sales['transaction_date']).dt.days.apply(lambda x: x if x >=-5  and x < 5 else np.NaN)
sales['blackFriday2012']=(pd.to_datetime('2012-11-23') - sales['transaction_date']).dt.days.apply(lambda x: x if x >=-5  and x < 5 else np.NaN)
sales['blackFriday2013']=(pd.to_datetime('2013-11-29') - sales['transaction_date']).dt.days.apply(lambda x: x if x >=-5  and x < 5 else np.NaN)


# big events
christmas= ['2011-12-25','2012-12-25','2013-12-25','2014-12-25']
love=['2011-02-14','2012-02-14','2013-02-14','2014-02-14']
hallow=['2011-10-31','2012-10-31','2013-10-31','2014-10-31']

christmas.extend(love)
christmas.extend(hallow)
for col in christmas:
    sales[col]=(pd.to_datetime(col) - sales['transaction_date']).dt.days.apply(lambda x: x if x >=-5  and x < 0 else  np.NaN)
    
 # add days of week , days of year, days of month, week of month, ...   
sales=create_features(sales)

  df['weekofyear'] = df['transaction_date'].dt.weekofyear


In [17]:
big_event=sales.columns[sales.columns.str.startswith('20')]
sales['event']=sales[big_event].idxmin(axis=1)
sales['event_value']=sales[big_event].min(axis=1)
sales=sales.drop(big_event, axis=1)

In [18]:
calendar_data=sales[['DATE_ISO','transaction_date','blackFriday2013','blackFriday2012', 'blackFriday2011','event_value','isMonthEnd','isMonthStart','Weekend_FLG',
                      'quarter','month'	,'year','dayofweek', 'dayofyear','dayofmonth', 'weekofyear']].\
    sort_values(by='transaction_date').\
    drop_duplicates()

In [19]:
calendar_data

Unnamed: 0,DATE_ISO,transaction_date,blackFriday2013,blackFriday2012,blackFriday2011,event_value,isMonthEnd,isMonthStart,Weekend_FLG,quarter,month,year,dayofweek,dayofyear,dayofmonth,weekofyear
17482,20110125,2011-01-25,,,,,0,0,0,1,1,2011,1,25,25,4
4558,20110126,2011-01-26,,,,,0,0,0,1,1,2011,2,26,26,4
18246,20110127,2011-01-27,,,,,0,0,0,1,1,2011,3,27,27,4
10638,20110128,2011-01-28,,,,,0,0,0,1,1,2011,4,28,28,4
11724,20110129,2011-01-29,,,,,0,0,1,1,1,2011,5,29,29,4
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
804,20140216,2014-02-16,,,,-2.0,0,0,1,1,2,2014,6,47,16,7
18361,20140217,2014-02-17,,,,-3.0,0,0,0,1,2,2014,0,48,17,8
83,20140218,2014-02-18,,,,-4.0,0,0,0,1,2,2014,1,49,18,8
7572,20140219,2014-02-19,,,,-5.0,0,0,0,1,2,2014,2,50,19,8


# SPARK Dataframe

In [21]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Create a Spark DataFrame from a pandas DataFrame using Arrow
sales_sdf = spark.createDataFrame(sales)
calendar_sdf = derive_spark_week(spark.createDataFrame(calendar_data))


The start date of the data is 2011/01/25 and it will be considered as week index 1, the end date is 2014/02/20 and will be considered the last week index


In [22]:
sales_daily_total=sales.\
        groupby('DATE_ISO').agg({'total_amt':'sum'}).\
        reset_index()
sales_daily_total.head()

Unnamed: 0,DATE_ISO,total_amt
0,20110125,48957.025
1,20110126,62045.75
2,20110127,30886.96
3,20110128,39730.275
4,20110129,42027.57


In [23]:
sales_level_1=sales_daily_total.merge(calendar_sdf.toPandas(), on='DATE_ISO')
sales_level_1['seasonal_week_index']=sales_level_1['seasonal_week_index'].astype(int)
sales_level_1.to_pickle('intermed/sales_level_1.sav')
sales_level_1.head()

Unnamed: 0,DATE_ISO,total_amt,transaction_date,blackFriday2013,blackFriday2012,blackFriday2011,event_value,isMonthEnd,isMonthStart,Weekend_FLG,quarter,month,year,dayofweek,dayofyear,dayofmonth,weekofyear,week_index,seasonal_week_index
0,20110125,48957.025,2011-01-25,,,,,0,0,0,1,1,2011,1,25,25,4,0,4
1,20110126,62045.75,2011-01-26,,,,,0,0,0,1,1,2011,2,26,26,4,0,4
2,20110127,30886.96,2011-01-27,,,,,0,0,0,1,1,2011,3,27,27,4,0,4
3,20110128,39730.275,2011-01-28,,,,,0,0,0,1,1,2011,4,28,28,4,0,4
4,20110129,42027.57,2011-01-29,,,,,0,0,1,1,1,2011,5,29,29,4,0,4


# Features engineering with week and seasonalities

In [24]:
# total sales by week + season markers
features_eng=derive_spark_week(sales_sdf)
features_eng.toPandas()

The start date of the data is 2011/01/25 and it will be considered as week index 1, the end date is 2014/02/20 and will be considered the last week index


Unnamed: 0,DATE_ISO,customer_Id,DOB,Gender,city_code,prod_cat_code,prod_cat,prod_sub_cat_code,prod_subcat,transaction_id,...,quarter,month,year,dayofyear,dayofmonth,weekofyear,event,event_value,week_index,seasonal_week_index
0,20110318,270181,1970-01-10,F,2.0,3,Electronics,8,Personal Appliances,47260895038,...,1,3,2011,77,18,11,,,7,11
1,20110318,269345,1970-06-26,F,10.0,4,Bags,4,Women,36820359169,...,1,3,2011,77,18,11,,,7,11
2,20110318,266794,1971-02-28,F,9.0,1,Clothing,3,Kids,21355537809,...,1,3,2011,77,18,11,,,7,11
3,20110318,274369,1973-03-14,F,9.0,3,Electronics,9,Cameras,6878976004,...,1,3,2011,77,18,11,,,7,11
4,20110318,270672,1973-05-03,M,2.0,3,Electronics,5,Computers,39847743831,...,1,3,2011,77,18,11,,,7,11
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
20871,20140129,270240,1988-12-07,M,1.0,1,Clothing,1,Women,64939234724,...,1,1,2014,29,29,5,,,157,05
20872,20140129,271972,1989-09-04,M,1.0,3,Electronics,4,Mobiles,8172983400,...,1,1,2014,29,29,5,,,157,05
20873,20140129,271304,1989-12-15,F,1.0,3,Electronics,9,Cameras,37626170750,...,1,1,2014,29,29,5,,,157,05
20874,20140129,267243,1990-03-19,F,6.0,1,Clothing,1,Women,65582112405,...,1,1,2014,29,29,5,,,157,05


In [26]:
sales_total = \
  derive_spark_week(sales_sdf). \
  withColumn("year", F.col("DATE_ISO").cast(StringType()).substr(1, 4)). \
  groupby(["Customer_Id", "year", "seasonal_week_index", "week_index"]). \
  agg(*([F.sum(x).alias(x) for x in ["total_amt"]])). \
  withColumn("summer_weeks", F.when((F.col("seasonal_week_index") >= 14) & (F.col("seasonal_week_index") <= 39), F.lit(1)).otherwise(F.lit(0)))
 
sales_total.show()

The start date of the data is 2011/01/25 and it will be considered as week index 1, the end date is 2014/02/20 and will be considered the last week index
+-----------+----+-------------------+----------+---------+------------+
|Customer_Id|year|seasonal_week_index|week_index|total_amt|summer_weeks|
+-----------+----+-------------------+----------+---------+------------+
|     275024|2012|                 10|        58| 4823.325|           0|
|     267152|2011|                 49|        45|  2625.48|           0|
|     271611|2011|                 05|         1|  1600.04|           0|
|     267940|2011|                 28|        24|  4875.26|           1|
|     267943|2012|                 43|        91| 6005.675|           0|
|     270321|2011|                 23|        19|   2607.8|           1|
|     275186|2013|                 48|       148|  6059.82|           0|
|     273794|2011|                 39|        35| 3795.675|           1|
|     270839|2012|                 50|     

In [27]:
# form seasonal volume share: by year
 
# grouping on summer_weeks alone would get full period vals
# grouping on summer_weeks + year would get year vals
time_groupers = ["summer_weeks", "year"]
seasonal_share = sales_total. \
  groupby(["Customer_Id"] + time_groupers). \
  agg(F.sum(F.col("total_amt")).alias("sales")). \
  groupBy(["Customer_Id", "year"]). \
  pivot("summer_weeks"). \
  agg(F.max(F.col("sales"))). \
  withColumnRenamed("0", "winter_weeks"). \
  withColumnRenamed("1", "summer_weeks"). \
  fillna(0).\
  withColumn("winter_weeks_share", F.round(F.col("winter_weeks")/(F.col("winter_weeks") + F.col("summer_weeks")),1)). \
  withColumn("summer_weeks_share", F.round(F.col("summer_weeks")/(F.col("winter_weeks") + F.col("summer_weeks")),1)).\
  fillna(0)
seasonal_share.show()

+-----------+----+------------+-----------------+------------------+------------------+
|Customer_Id|year|winter_weeks|     summer_weeks|winter_weeks_share|summer_weeks_share|
+-----------+----+------------+-----------------+------------------+------------------+
|     275120|2013|     2095.08|              0.0|               1.0|               0.0|
|     271189|2014|    4524.975|              0.0|               1.0|               0.0|
|     272487|2012|      2983.5|          3228.81|               0.5|               0.5|
|     267103|2012|    9185.865|         3347.045|               0.7|               0.3|
|     267698|2013|      112.71|           183.43|               0.4|               0.6|
|     269610|2011|         0.0|          3301.74|               0.0|               1.0|
|     274114|2013|      565.76|         4757.025|               0.1|               0.9|
|     268486|2011|         0.0|         3983.525|               0.0|               1.0|
|     269228|2012|    1868.555| 

In [28]:
seasonal_share.\
    groupBy("summer_weeks_share"). \
    agg({"Customer_Id": "count"}).\
    show()

+------------------+------------------+
|summer_weeks_share|count(Customer_Id)|
+------------------+------------------+
|               0.0|              5077|
|               0.2|               350|
|               0.7|               353|
|               0.1|               271|
|               1.0|              4288|
|               0.6|               416|
|               0.8|               353|
|               0.5|               429|
|               0.4|               375|
|               0.9|               266|
|               0.3|               392|
+------------------+------------------+

