## ETL

In this notebook we transform the raw data explored in the previous notebook. For each reseller and each day, the target is the amount of the next bill that the reseller will pay.

Because the model needs to run every day to keep its predictions relevant, the ETL we build here will be deployed as an AWS Glue job that runs daily, orchestrated by AWS Step Functions.

The Glue job will read the raw data from the Data Lake and use the last 4 months of history to build features that help predict how much each reseller will spend on their next purchase, based on the reseller's characteristics and past shopping patterns.


In [1]:
import pandas as pd
from datetime import date
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder
import boto3
import pickle
import io
import awswrangler

In [2]:
bucket = 'zoomagri-maxi-bucket-sagemaker'

In [3]:
%store bucket

Stored 'bucket' (str)


In [4]:
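# awswrangler < 1.0 API; in awswrangler 1.x and later the equivalent call is
# awswrangler.athena.read_sql_query(sql=..., database=...)
session = awswrangler.Session()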
df = session.pandas.read_sql_athena(
    sql="select * from resellers_sample",
    database="implementationdb"
)
df_r = session.pandas.read_sql_athena(
    sql="select * from reseller",
    database="implementationdb"
)

In [21]:
df

| | date | id_reseller | bill | mean-last-30 | mean-last-7 | std-last-30 | days_without_purchase | weekday | next_bill | last_bill | zone | cluster |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 3 | 2019-01-31 | 499921276 | 10515.048 | 9249.553500 | 9249.55350 | 1789.679485 | 2 | Thursday | 10515.048 | 7984.059 | 1050 | B |
| 4 | 2019-02-01 | 499921276 | 0.000 | 9249.553500 | 9249.55350 | 1789.679485 | 0 | Friday | 10717.103 | 10515.048 | 1050 | B |
| 5 | 2019-02-02 | 499921276 | 0.000 | 9249.553500 | 9249.55350 | 1789.679485 | 1 | Saturday | 10717.103 | 10515.048 | 1050 | B |
| 6 | 2019-02-03 | 499921276 | 0.000 | 9249.553500 | 9249.55350 | 1789.679485 | 2 | Sunday | 10717.103 | 10515.048 | 1050 | B |
| 7 | 2019-02-04 | 499921276 | 0.000 | 9249.553500 | 10515.04800 | 1789.679485 | 3 | Monday | 10717.103 | 10515.048 | 1050 | B |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 139880 | 2019-05-13 | 500800672 | 9741.581 | 11722.998800 | 12917.17375 | 7280.688377 | 2 | Monday | 9741.581 | 1196.989 | 1050 | A |
| 139881 | 2019-05-14 | 500800672 | 0.000 | 11722.998800 | 12795.63600 | 7280.688377 | 0 | Tuesday | 11711.968 | 9741.581 | 1050 | A |
| 139882 | 2019-05-15 | 500800672 | 0.000 | 11348.496444 | 12795.63600 | 7619.490861 | 1 | Wednesday | 11711.968 | 9741.581 | 1050 | A |
| 139883 | 2019-05-16 | 500800672 | 0.000 | 11348.496444 | 5469.28500 | 7619.490861 | 2 | Thursday | 11711.968 | 9741.581 | 1050 | A |


In [5]:
%store df

Stored 'df' (DataFrame)


In [6]:
%store df_r

Stored 'df_r' (DataFrame)


In [7]:
df['date'] = pd.to_datetime(df['date'])

### Filter the last 4 months of data

In [8]:
max_date = df['date'].max()

In [9]:
min_date = max_date - pd.to_timedelta(120, unit='d')

In [10]:
df = df[df['date'] > min_date]

### Feature engineering based on past history

For each of the around 12K resellers, we compute features based on their shopping history: the mean bill over the last week and over the last month, the standard deviation over the last month, the amount of the last purchase, and how many days the reseller has gone without shopping up to that date.
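A minimal sketch on a hypothetical three-purchase history illustrates what these rolling features mean once the history is expanded to one row per calendar day, as `completeItem` does. Because non-purchase days are still NaN when the rolling windows are computed, the means and standard deviations are taken over the purchases inside the window, not over calendar days; only afterwards are the missing bills set to 0.

```python
import pandas as pd

# Hypothetical purchase history for one reseller: only purchase days are present
purchases = pd.DataFrame({
    'date': pd.to_datetime(['2019-01-01', '2019-01-03', '2019-01-06']),
    'bill': [100.0, 200.0, 600.0],
})

# Expand to one row per calendar day; days without a purchase get bill = NaN
days = pd.date_range(purchases['date'].min(), purchases['date'].max())
daily = purchases.set_index('date').reindex(days).rename_axis('date').reset_index()

# Rolling windows skip NaN, so these statistics are over purchases, not over days
daily['mean-last-7'] = daily['bill'].rolling(7, min_periods=1).mean()
daily['std-last-30'] = daily['bill'].rolling(30, min_periods=1).std()
# Only afterwards are the non-purchase days set to 0
daily['bill'] = daily['bill'].fillna(0)
print(daily)
```

Note that the sample standard deviation of a single purchase is NaN, so the first rows of each reseller have no `std-last-30` value until a second purchase appears.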

In [11]:
def completeItem(dfItem):
    min_date = dfItem['date'].min()
    max_date = dfItem['date'].max()
    if min_date == max_date:
        #only one data point
        return
    r = pd.date_range(start=min_date, end=max_date)
    dfItemNew = dfItem.set_index('date').reindex(r).rename_axis('date').reset_index()
    
    # Days without a purchase are still NaN here, so the rolling statistics
    # are computed over the purchases inside each window
    dfItemNew['mean-last-30'] = dfItemNew['bill'].rolling(30, min_periods=1).mean()
    dfItemNew['mean-last-7'] = dfItemNew['bill'].rolling(7, min_periods=1).mean()
    dfItemNew['std-last-30'] = dfItemNew['bill'].rolling(30, min_periods=1).std()
    dfItemNew['bill'] = dfItemNew['bill'].fillna(0)
    dfItemNew['id_reseller'] = dfItem['id_reseller'].max()
    # Assign the result instead of calling fillna(..., inplace=True) on a column,
    # which does not update the DataFrame under pandas Copy-on-Write
    dfItemNew['std-last-30'] = dfItemNew['std-last-30'].ffill()
    dfItemNew['mean-last-7'] = dfItemNew['mean-last-7'].ffill()
    resp = []
    counter = 0
    for index,row in dfItemNew.iterrows(): 
        resp.append(counter)
        if row['bill'] == 0: 
            counter += 1 
        else:
            counter = 0
    dfItemNew['days_without_purchase'] = pd.Series(resp)
    return dfItemNew
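The `iterrows` loop at the end of `completeItem` is the slowest part of the function. A vectorized equivalent can be sketched as follows; `days_without_purchase` is a hypothetical helper name, and it reproduces the loop's convention of recording the counter before updating it, so a purchase day reports the number of idle days that preceded it.

```python
import pandas as pd


def days_without_purchase(bill: pd.Series) -> pd.Series:
    """Vectorized version of the counter loop in completeItem (hypothetical helper).

    For each row, returns how many consecutive zero-bill days came right before it.
    """
    is_zero = bill.eq(0).astype(int)
    # Every purchase starts a new run; the running count of purchases labels the runs
    run_id = (1 - is_zero).cumsum()
    # Number of zero-bill days so far within the run, including the current row
    zeros_so_far = is_zero.groupby(run_id).cumsum()
    # The loop appends the counter before updating it, so shift everything down one row
    return zeros_so_far.shift(1, fill_value=0).astype(int)


bills = pd.Series([10515.0, 0.0, 0.0, 0.0, 0.0, 500.0, 0.0])
print(days_without_purchase(bills).tolist())
```

Inside `completeItem`, the loop could then be replaced by `dfItemNew['days_without_purchase'] = days_without_purchase(dfItemNew['bill'])`, applied after `bill` has been filled with 0.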

In [12]:
i = 0
dfCompletedList = []
for nid,item in df.groupby('id_reseller'):
    i = i+1
    if i%200 == 0:
        print ('processed {} resellers'.format(str(i)))
    dfCompletedList.append(completeItem(item))

processed 200 resellers
processed 400 resellers
processed 600 resellers
processed 800 resellers
processed 1000 resellers


In [13]:
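# ignore_index=True gives every row a unique label (each reseller's frame starts at 0)
df = pd.concat(dfCompletedList, ignore_index=True)
del dfCompletedList
# Series.dt.weekday_name was removed in pandas 1.0; day_name() returns the same names
df['weekday'] = df['date'].dt.day_name()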


### Compute next bill

In [14]:
# Treat 0 as "no purchase" and back-fill the next non-zero bill within each reseller
df['next_bill'] = df['bill'].replace(0, np.nan).groupby(df['id_reseller']).bfill()
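A toy example on hypothetical data shows what the back-fill computes: within each reseller, a day's `next_bill` is the next non-zero bill on or after that day. On a purchase day this is that same day's bill, matching the sample output above where 2019-01-31 has `bill` and `next_bill` both equal to 10515.048. Days after a reseller's last purchase stay NaN and are removed by the later `dropna()`.

```python
import numpy as np
import pandas as pd

# Two hypothetical resellers; bill == 0 means no purchase that day
toy = pd.DataFrame({
    'id_reseller': [1, 1, 1, 1, 2, 2],
    'bill': [50.0, 0.0, 0.0, 80.0, 30.0, 0.0],
})

# Treat 0 as "no purchase", then back-fill within each reseller only
bills = toy['bill'].replace(0, np.nan)
toy['next_bill'] = bills.groupby(toy['id_reseller']).bfill()
print(toy)
```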

### Compute last bill

In [15]:
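# Forward-fill the most recent non-zero bill within each reseller
bills = df['bill'].replace(0, np.nan)
df['last_bill'] = bills.groupby(df['id_reseller']).ffill()
# On purchase days use the previous day's value, so a purchase is not its own last bill;
# shifting per reseller keeps one reseller's value from leaking into the next one's first row
previous = df.groupby('id_reseller')['last_bill'].shift(1)
df['last_bill'] = df['last_bill'].where(df['bill'] == 0, previous)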
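The same idea, run forward, gives `last_bill`. On a purchase day the value should be the previous purchase rather than the current one, so it is taken from the reseller's previous day. The sketch below uses hypothetical data and shifts within each reseller; a plain, ungrouped `shift(1)` would carry reseller 1's last value into reseller 2's first row.

```python
import numpy as np
import pandas as pd

# Two hypothetical resellers; bill == 0 means no purchase that day
toy = pd.DataFrame({
    'id_reseller': [1, 1, 1, 1, 2, 2, 2],
    'bill': [50.0, 0.0, 80.0, 0.0, 30.0, 0.0, 40.0],
})

# Most recent purchase up to and including today, per reseller
bills = toy['bill'].replace(0, np.nan)
filled = bills.groupby(toy['id_reseller']).ffill()
# Yesterday's value for the same reseller (NaN on each reseller's first row)
previous = filled.groupby(toy['id_reseller']).shift(1)
# Keep today's filled value on idle days, use yesterday's on purchase days
toy['last_bill'] = filled.where(toy['bill'] == 0, previous)
print(toy)
```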

### Join the sales with the rest of the reseller info

In [16]:
df = df.merge(df_r,how='inner',on='id_reseller')

In [17]:
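# Drop incomplete rows, then rebuild a 0..n-1 index: the one-hot DataFrames built below
# get a fresh RangeIndex, and pd.concat(axis=1) aligns rows by index label
df = df.dropna().reset_index(drop=True)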
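`pd.concat(..., axis=1)` pairs rows by index label, not by position. The one-hot DataFrames built below from encoder output get a fresh 0..n-1 index, while `dropna()` leaves gaps in the index of the frame it filters, so the two must share an index before they are concatenated. A toy example on hypothetical data shows the difference.

```python
import pandas as pd

# A frame whose index has a gap, as left behind by dropna() (row 1 removed)
kept = pd.DataFrame({'x': [10, 20, 30]}).drop(index=1)
# Encoder output wrapped in a new DataFrame always gets a fresh 0..n-1 index
dummies = pd.DataFrame({'d': [1, 0]})

# axis=1 aligns rows by index label, so the gap produces an extra row and NaN
misaligned = pd.concat([kept, dummies], axis=1)
# Resetting the index first pairs the rows by position, as intended
aligned = pd.concat([kept.reset_index(drop=True), dummies], axis=1)
print(misaligned)
print(aligned)
```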

## Deal with categorical variables

To handle the categorical variables (the reseller's cluster and zone, plus the day of the week), we combine two sklearn preprocessing classes: `LabelEncoder`, which maps strings to integer labels, and `OneHotEncoder`, which turns those integer labels into dummy (0/1) variables.

These encoders are Python objects whose fitted internal state holds the information needed to transform new data in exactly the same way, so the Glue ETL stores them in pickle (.pkl) format.
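The encode-then-persist idea can be sketched on hypothetical cluster values. The block chains `LabelEncoder` and `OneHotEncoder` the same way the cells below do, checks that a pickle round trip preserves the fitted state, and shows one caveat: `LabelEncoder.transform` raises `ValueError` for a label it never saw, so `handle_unknown='ignore'` on the `OneHotEncoder` does not by itself make the chain tolerant of new categories.

```python
import pickle

import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Hypothetical cluster values seen during training
train_clusters = np.array(['A', 'B', 'C', 'B'])

le = LabelEncoder()
ohe = OneHotEncoder(handle_unknown='ignore')
# LabelEncoder maps strings to 0..k-1; OneHotEncoder expects a 2-D column
codes = le.fit_transform(train_clusters).reshape(-1, 1)
dummies = ohe.fit_transform(codes).toarray()

# The fitted state survives a pickle round trip
le2, ohe2 = pickle.loads(pickle.dumps((le, ohe)))
new = ohe2.transform(le2.transform(np.array(['C'])).reshape(-1, 1)).toarray()
print(dummies)
print(new)

# LabelEncoder rejects unseen labels before OneHotEncoder's handle_unknown applies
try:
    le2.transform(np.array(['Z']))
    unseen_rejected = False
except ValueError:
    unseen_rejected = True
print('unseen label rejected:', unseen_rejected)
```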


In [18]:
df['zone'].value_counts().head(8)

1019    25569
1050    13174
1031    10392
1033     7007
1067     5590
1051     4818
1034     4402
1015     4070
Name: zone, dtype: int64

In [19]:
# Keep the six most frequent zones and group all the others under 0
df['zone'] = df['zone'].where(df['zone'].isin([1019, 1050, 1031, 1033, 1051, 1067]), 0)
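The cell above hard-codes the six most frequent zones from the `value_counts()` output. A hypothetical helper, `keep_top_categories`, sketches how the same list could be derived from the data instead. If you go this way, compute the list once on the training data and persist it alongside the encoders; recomputing it on each day's data in Glue could change the set of zones and therefore the one-hot columns.

```python
import pandas as pd


def keep_top_categories(values: pd.Series, n: int, other=0) -> pd.Series:
    """Keep the n most frequent values and map every other value to `other`."""
    top = values.value_counts().head(n).index
    return values.where(values.isin(top), other)


# Hypothetical zone codes
zones = pd.Series([1019, 1019, 1019, 1050, 1050, 1031, 1015])
print(keep_top_categories(zones, n=2).tolist())
```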

In [20]:
le_cluster = LabelEncoder()
ohe_cluster = OneHotEncoder(handle_unknown='ignore')
df_cluster = pd.DataFrame(ohe_cluster.fit_transform(le_cluster.fit_transform(df['cluster'].fillna('')).reshape(-1, 1)).todense())
df_cluster = df_cluster.add_prefix('cluster_')

In [22]:
le_zone = LabelEncoder()
ohe_zone = OneHotEncoder(handle_unknown='ignore')
df_zone = pd.DataFrame(ohe_zone.fit_transform(le_zone.fit_transform(df['zone'].fillna('')).reshape(-1, 1)).todense())
df_zone = df_zone.add_prefix('zone_')

In [23]:
le_weekday = LabelEncoder()
ohe_weekday = OneHotEncoder(handle_unknown='ignore')
df_weekday = pd.DataFrame(ohe_weekday.fit_transform(le_weekday.fit_transform(df['weekday']).reshape(-1, 1)).todense())
df_weekday = df_weekday.add_prefix('weekday_')

In [24]:
client = boto3.client('s3')
# Upload each fitted encoder to preprocessing/<name>.pkl in the bucket
encoders = {
    'le_cluster': le_cluster, 'ohe_cluster': ohe_cluster,
    'le_zone': le_zone, 'ohe_zone': ohe_zone,
    'le_weekday': le_weekday, 'ohe_weekday': ohe_weekday,
}
for name, encoder in encoders.items():
    client.put_object(Body=pickle.dumps(encoder), Bucket=bucket, Key=f'preprocessing/{name}.pkl')

## Write the resulting ETL output to S3

Now we write all the relevant columns to S3. We split the data into train and validation sets by reseller rather than by row, so we can train on one group of resellers and measure performance on resellers the model has never seen.

The <b>label</b> we have to predict in this problem is the amount of the next bill.
SageMaker's built-in algorithms expect CSV training input to have <b>no header and no index, with the label in the first column.</b>
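A small sketch with a hypothetical two-row frame shows what `to_csv(index=False, header=False)` writes, and how a consumer recovers the label from column 0 when reading it back.

```python
import io

import pandas as pd

# Hypothetical training frame: label first, then features
train = pd.DataFrame({
    'next_bill': [120.5, 80.0],
    'mean-last-30': [100.0, 90.0],
    'days_without_purchase': [2, 0],
})

buffer = io.StringIO()
# No header row and no index column
train.to_csv(buffer, index=False, header=False)
csv_text = buffer.getvalue()
print(csv_text)

# Reading it back: column 0 is the label, the remaining columns are the features
back = pd.read_csv(io.StringIO(csv_text), header=None)
labels, features = back.iloc[:, 0], back.iloc[:, 1:]
```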


In [25]:
df = df[['next_bill', 'bill', 'date', 'id_reseller', 'mean-last-30', 'mean-last-7',
       'std-last-30', 'days_without_purchase', 'weekday', 
       'last_bill', 'zone', 'cluster']]

In [26]:
df = pd.concat([df,df_cluster,df_zone,df_weekday],axis=1)

In [27]:
# Hold out a random 10% of the resellers for validation (rows are shuffled in the next cells)
val_resellers = list(pd.Series(df['id_reseller'].unique()).sample(frac=0.1))

In [28]:
df_train = df[~df['id_reseller'].isin(val_resellers)].sample(frac=1)

In [29]:
df_validation = df[df['id_reseller'].isin(val_resellers)].sample(frac=1)
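The cells above hold out 10% of the resellers, not 10% of the rows, so no reseller contributes rows to both sets. The same group-level split can be expressed with scikit-learn's `GroupShuffleSplit`. This is an alternative sketch on hypothetical data, with a fixed `random_state` so the split is reproducible, which the `sample` calls above do not set. Unlike `sample(frac=1)`, the returned row indices are not shuffled.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

# Hypothetical daily rows for 10 resellers, 3 days each
toy = pd.DataFrame({
    'id_reseller': np.repeat(np.arange(10), 3),
    'next_bill': np.arange(30, dtype=float),
})

# test_size is the fraction of groups (resellers) held out, not of rows
splitter = GroupShuffleSplit(n_splits=1, test_size=0.1, random_state=42)
train_idx, val_idx = next(splitter.split(toy, groups=toy['id_reseller']))
train, val = toy.iloc[train_idx], toy.iloc[val_idx]

# No reseller appears on both sides of the split
overlap = set(train['id_reseller']) & set(val['id_reseller'])
print(len(train), len(val), overlap)
```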

In [30]:
df_train.columns

Index(['next_bill', 'bill', 'date', 'id_reseller', 'mean-last-30',
       'mean-last-7', 'std-last-30', 'days_without_purchase', 'weekday',
       'last_bill', 'zone', 'cluster', 'cluster_0', 'cluster_1', 'cluster_2',
       'cluster_3', 'cluster_4', 'zone_0', 'zone_1', 'zone_2', 'zone_3',
       'zone_4', 'zone_5', 'zone_6', 'weekday_0', 'weekday_1', 'weekday_2',
       'weekday_3', 'weekday_4', 'weekday_5', 'weekday_6'],
      dtype='object')

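For the model to be useful, we drop the columns that are either too specific (such as `date` and `id_reseller`) or not going to be available at prediction time, as well as the raw `zone`, `cluster` and `weekday` columns, which are already represented by their one-hot encoded versions.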

In [31]:
df_train.drop(['date','id_reseller','bill','zone','cluster','weekday'],axis=1,inplace=True)

In [32]:
df_validation.drop(['date','id_reseller','bill','zone','cluster','weekday'],axis=1,inplace=True)

In [33]:
df_train.columns

Index(['next_bill', 'mean-last-30', 'mean-last-7', 'std-last-30',
       'days_without_purchase', 'last_bill', 'cluster_0', 'cluster_1',
       'cluster_2', 'cluster_3', 'cluster_4', 'zone_0', 'zone_1', 'zone_2',
       'zone_3', 'zone_4', 'zone_5', 'zone_6', 'weekday_0', 'weekday_1',
       'weekday_2', 'weekday_3', 'weekday_4', 'weekday_5', 'weekday_6'],
      dtype='object')

In [34]:
pred_columns = list(df_train.columns)[1:]
%store pred_columns

Stored 'pred_columns' (list)


In [35]:
df_train.to_csv('train.csv',index=False,header=False)

In [36]:
df_validation.to_csv('validation.csv',index=False,header=False)

## Pickle the preprocessing objects locally

Now we pickle the fitted sklearn preprocessing objects. Later we will use them as part of the pipeline to transform new data.

In [37]:
preprocessing_list = [le_cluster,ohe_cluster,le_zone,ohe_zone,le_weekday,ohe_weekday]


In [38]:
with open('preprocessing.pkl', 'wb') as handle:
    pickle.dump(preprocessing_list, handle)
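At inference time the pickled list has to be unpacked in the same order it was written, and the encoded outputs concatenated in the same column order as `pred_columns` (cluster, then zone, then weekday). The sketch below fits small encoder pairs on hypothetical categories, writes them to a temporary file standing in for `preprocessing.pkl`, and encodes one new row; `fit_pair` and `encode` are illustrative helpers, not part of the original pipeline. New zone values would also need the same top-zone bucketing applied before encoding.

```python
import os
import pickle
import tempfile

import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder


def fit_pair(values):
    """Fit a LabelEncoder + OneHotEncoder pair on a 1-D array of categories."""
    le = LabelEncoder()
    ohe = OneHotEncoder(handle_unknown='ignore')
    ohe.fit(le.fit_transform(values).reshape(-1, 1))
    return le, ohe


def encode(le, ohe, values):
    """Apply a fitted pair to new values and return a dense 0/1 array."""
    return ohe.transform(le.transform(values).reshape(-1, 1)).toarray()


# Hypothetical training categories, stored in the same order as preprocessing_list
le_c, ohe_c = fit_pair(np.array(['A', 'B']))
le_z, ohe_z = fit_pair(np.array([0, 1019, 1050]))
le_w, ohe_w = fit_pair(np.array(['Monday', 'Tuesday']))

path = os.path.join(tempfile.mkdtemp(), 'preprocessing.pkl')
with open(path, 'wb') as handle:
    pickle.dump([le_c, ohe_c, le_z, ohe_z, le_w, ohe_w], handle)

# Unpack in the order the list was written
with open(path, 'rb') as handle:
    le_cl, ohe_cl, le_zn, ohe_zn, le_wd, ohe_wd = pickle.load(handle)

row = np.hstack([
    encode(le_cl, ohe_cl, np.array(['B'])),
    encode(le_zn, ohe_zn, np.array([1050])),
    encode(le_wd, ohe_wd, np.array(['Monday'])),
])
print(row)
```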