# Eleven supercase

### Data preparation

In [1]:
import pandas as pd
import numpy as np

import dask
import dask.dataframe as dd
import dask.distributed as dist

In [2]:
import pathlib

In [3]:
cluster = dist.LocalCluster(n_workers=4, memory_limit='8G') # MBP config
client = dist.Client(cluster)

In [4]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:50626  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 32.00 GB


In [94]:
common_params = {
    'sep': ';',
    'index_col': 0
}

#### Clients

In [182]:
!head ../data/CLIENTS_new.csv

;CLIENT_NUMBER;CREATION_DATE;GENDER;BIRTH_YEAR;ZIPCODE
0;-1182315409949716431;2002-12-05;Mme;1955.0;68440
1;7142641182482142162;2002-12-05;Mme;;24100
2;-4786858050561650068;2002-12-05;Mme;1945.0;39240
3;-8463163259230597037;2002-12-05;Mme;;42290
4;-8047395753631590665;2003-10-06;Mme;1934.0;13006
5;-2369385976575044628;1995-03-01;Mme;1950.0;78770
6;159112630817189253;2002-12-04;Mme;;30132
7;-1608550333657870307;2002-12-04;Mme;;30840
8;-4343559112705444389;2002-12-04;Mme;;.


In [183]:
!wc -l ../data/CLIENTS_new.csv

 3125801 ../data/CLIENTS_new.csv


In [184]:
clients = pd.read_csv('../data/CLIENTS_new.csv', parse_dates=[2], **common_params, dtype={'CLIENT_NUMBER': object})
# Lowercase columns
clients.columns = clients.columns.map(str.lower)

  mask |= (ar1 == a)


In [185]:
clients.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3125800 entries, 0 to 3125799
Data columns (total 5 columns):
client_number    object
creation_date    datetime64[ns]
gender           object
birth_year       float64
zipcode          object
dtypes: datetime64[ns](1), float64(1), object(3)
memory usage: 143.1+ MB


In [186]:
# Normalize client numbers
#clients['client_number'] = np.abs(clients['client_number'])
# Categorise gender
clients['gender'] = clients['gender'].astype('category')
# Clean zipcode : we remove all foreign zipcodes
valid_zipcodes = clients['zipcode'].str.match(r'[A-Z0-9-]{5}', na=False)
clients.loc[clients['zipcode'].isnull() | ~valid_zipcodes, 'zipcode'] = np.nan

#### Commandes

In [187]:
!head ../data/CMD_new.csv

;CLIENT_NUMBER;ORDER_NUMBER;ORDER_DATE;ORDER_CHANNEL;PRE_TAX_AMOUNT;CVIC
0;7178742953965764755;2302390;2010-01-05;Telephone;84.28;True
1;-2373390172893352743;2302392;2010-01-17;Telephone;97.85;True
2;-240944575938990062;2302393;2010-01-05;Telephone;58.45;True
3;1720395480962300217;2302394;2010-01-05;Telephone;31.19;True
4;7161131950067982732;2302396;2010-01-14;Telephone;37.45;True
5;-792970086774522568;2302397;2010-01-20;Courier;53.59;True
6;-6274234340433084502;2302398;2010-01-05;Telephone;69.74;False
7;879133938684538799;2302399;2010-01-05;Telephone;95.64;False
8;3966491398253851080;2302400;2010-01-05;Telephone;59.78;True


In [188]:
!wc -l ../data/CMD_new.csv

 7323228 ../data/CMD_new.csv


In [189]:
orders = pd.read_csv('../data/CMD_new.csv', parse_dates=[3], **common_params, dtype={'CLIENT_NUMBER': object})
# Lowercase columns
orders.columns = orders.columns.map(str.lower)

  mask |= (ar1 == a)


In [190]:
orders.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 7323227 entries, 0 to 7323226
Data columns (total 6 columns):
client_number     object
order_number      int64
order_date        datetime64[ns]
order_channel     object
pre_tax_amount    float64
cvic              bool
dtypes: bool(1), datetime64[ns](1), float64(1), int64(1), object(2)
memory usage: 342.2+ MB


In [172]:
# Normalize client numbers
orders['client_number'] = np.abs(orders['client_number'])

How many orders have been made via Internet by the same customer during the same day ?

In [None]:
((orders.loc[lambda df: df['order_channel'] == 'Internet']
      .groupby(['client_number', 'order_date'])
      .count()
      .loc[lambda df: df.index.get_level_values(1) > '2017-08-01', 'order_number']) == 2).sum()

#### Join commandes / utilisateurs

In [191]:
!head ../data/TABLE_CONVERSION_new.csv

;VISITOR_ID;CLIENT_NUMBER
0;153926;2117916268519517296
1;153987;-3983308646720649306
2;153992;-2525192206264180159
3;153996;1543724696132514059
4;154002;-3753347333864614269
5;154003;-6763383487168091997
6;154004;-7429564109483778435
7;154005;403207568119538010
8;154006;2387852670736446380


In [192]:
!wc -l ../data/TABLE_CONVERSION_new.csv

  580405 ../data/TABLE_CONVERSION_new.csv


In [193]:
joins = pd.read_csv('../data/TABLE_CONVERSION_new.csv', **common_params, dtype={'CLIENT_NUMBER': object})
# Lowercase columns
joins.columns = joins.columns.map(str.lower)

In [194]:
joins.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 580404 entries, 0 to 580403
Data columns (total 2 columns):
visitor_id       580404 non-null int64
client_number    580404 non-null object
dtypes: int64(1), object(1)
memory usage: 13.3+ MB


In [177]:
# Normalize client numbers
joins['client_number'] = np.abs(joins['client_number'])

In [196]:
joins.head()

Unnamed: 0,visitor_id,client_number
0,153926,2117916268519517296
1,153987,-3983308646720649306
2,153992,-2525192206264180159
3,153996,1543724696132514059
4,154002,-3753347333864614269


#### Logs

In [None]:
!head ../data/LOG_WEB_201708.csv

In [None]:
!wc -l ../data/LOG_WEB_20*

Quick fix for the 201709 file (remove unneeded column)

In [None]:
# Drop first two columns
(pd.read_csv('../data/LOG_WEB_201709.csv', sep=';')
   .iloc[:, 2:]
   .to_csv('../data/LOG_WEB_201709_new.csv', sep=';'))

We do a quick preprocessing and save the result in parquet in order to improve loading times

In [None]:
logs = (dd.read_csv('../data/LOG_WEB_20*.csv', 
            sep=';', 
            parse_dates=[4,5], 
            blocksize=500*1e6, 
            dtype={'LOADINGS': 'float64'}))

In [None]:
import hashlib

def session_hash(row, selected=None):
    payload = '-'.join([str(row[c]) for c in selected])
    return hashlib.sha1(payload.encode('utf8')).hexdigest()[:12]

In [None]:
mapping = pd.read_csv('../data/funnel_w_label.csv', sep=';')

In [None]:
mapping = mapping.drop(['key', 'cat_4'], axis=1)
mapping = mapping.replace('none', np.nan)
mapping = mapping.applymap(lambda s: s.strip() if s is not np.nan else s)

In [None]:
# Lowercase columns
logs.columns = logs.columns.map(str.lower)
# Drop id and loadings
logs = logs.iloc[:, 1:-1]

## Feature engineering

# Recreate session id
logs['session_id'] = logs.apply(session_hash, axis=1, selected=['visitor_id', 'session_start_date', 'global_sources', 'device_type'], meta=str)

# Drop unknown pages
logs = logs.dropna(subset=['pages'], how='all')
# Extract "fil d'ariane" page components 
meta = pd.DataFrame(columns=['page_top', 'page_sub1', 'page_sub2'], dtype='object')
logs[['page_top', 'page_sub1', 'page_sub2']] = logs['pages'].map_partitions(lambda df: df.str.extract(
    '(?P<page_top>[\w ]+)(?:\:\:)*(?P<page_sub1>[\w ]+)*(?:\:\:)*(?P<page_sub2>[\w ]+)*', 
    expand=True
), meta=meta)
logs[['page_top', 'page_sub1', 'page_sub2']] = logs[['page_top', 'page_sub1', 'page_sub2']].applymap(lambda s: s.strip() if isinstance(s, str) else 'none')
# Then, merge rare categories mapping
logs = logs.merge(mapping, how='left', left_on=['page_top', 'page_sub1', 'page_sub2'], right_on=['cat_1', 'cat_2', 'cat_3'])
# Then, apply rules
logs['label'] = logs['label'].where(~logs['page_top'].str.contains('accueil'), 'accueil')
logs['label'] = logs['label'].where(~logs['page_top'].str.contains('category'), 'category')
logs['label'] = logs['label'].where(~(
    (logs['page_top'].str.contains('category')) & 
    (logs['page_sub1'].str.contains('sous_category')) 
), 'category')
logs['label'] = logs['label'].where(~(
    (logs['page_top'].str.contains('category')) & 
    ((logs['page_sub1'].str.contains('produit')) | (logs['page_sub2'].str.contains('produit')))
), 'product')
logs['label'] = logs['label'].fillna('page') # Default value
# Finally, replace 'none' by np.nan
logs[['page_top', 'page_sub1', 'page_sub2']] = logs[['page_top', 'page_sub1', 'page_sub2']].applymap(lambda s: np.nan if s == 'none' else s)

## Encoding
# Parse connected as boolean
logs['connected_session'] = logs['connected_session'].map({'OUI': True, 'NON': False})
# Categorize variables with low cardinality
for col in ['page_top', 'device_type', 'device_model']:
    logs[col] = logs[col].astype('category')

# Drop useless columns
logs = logs.drop(labels=['pages', 'id_session', 'cat_1', 'cat_2', 'cat_3'], axis=1)

In [None]:
logs.to_parquet('../data/logs')

Then, we create the sessions dataset for future clustering use

In [56]:
sessions = dd.read_parquet('../data/sorted_logs')

In [57]:
# Create features
elements = []
elements.append(sessions.groupby(sessions.index)['visitor_id'].first())
elements.append(sessions.groupby(sessions.index)['event_date'].count().rename('nb_pages'))
elements.append(sessions.groupby(sessions.index)['session_start_date'].first())
elements.append(sessions.groupby(sessions.index)['event_date'].max().rename('session_end_date'))
elements.append(sessions.groupby(sessions.index)['global_sources'].first())
elements.append(sessions.groupby(sessions.index)['device_type'].first())
elements.append(sessions.groupby(sessions.index)['device_model'].first())
elements.append(sessions.groupby(sessions.index).apply(lambda df: df['label'].values.tolist()).rename('funnel'))

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  # Remove the CWD from sys.path while we load stuff.


In [58]:
# Merge everything into a DataFrame
elements = [e.to_frame() for e in elements]
sessions = elements[0]
for e in elements[1:]:
    sessions = sessions.merge(e, left_index=True, right_index=True)

In [59]:
# Some more features
sessions['duration'] = sessions['session_end_date'] - sessions['session_start_date']
sessions['session_date'] = sessions['session_start_date'].dt.date

In [None]:
sessions = sessions.reset_index()
sessions = sessions.compute()

Then we reconcile orders data with sessions data

In [246]:
# First, we filter internet orders 
cleaned_orders = joins.merge(orders, how='inner', on='client_number')
cleaned_orders = cleaned_orders.loc[cleaned_orders.order_channel == 'Internet', ['visitor_id', 'client_number', 'order_date', 'order_channel', 'pre_tax_amount', 'cvic']]

In [247]:
# We make sure merge keys are dates
sessions['session_date'] = pd.to_datetime(sessions['session_date'])
cleaned_orders['order_date'] = pd.to_datetime(cleaned_orders['order_date'])

In [248]:
# We sort merge keys
sessions = sessions.sort_values(by='session_date')
cleaned_orders = cleaned_orders.sort_values(by='order_date')

In [249]:
sessions_with_orders = pd.merge_asof(sessions, cleaned_orders, left_on='session_date', right_on='order_date', by='visitor_id', tolerance=pd.Timedelta('3d'), direction='forward')

In [250]:
sessions_with_orders.to_csv('../data/sessions_with_orders.csv.gz', compression='gzip')

Consistency check

In [252]:
sessions_with_orders.cvic.value_counts()

False    127486
True      29463
Name: cvic, dtype: int64