# LAMA into the Wild Part 1: Data Preparation

# Overview

In this section we preprocess the data used throughout the project. The results are stored in `../data/pre`.

In [2]:
import os
import gc
import typing as t
import logging

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from data import DATA_DIR
from lama.util import to_tuple, identity
from lama.util.decorators import enable_logging
from lama.util.StreamerBuilder import StreamerBuilder
from lama.preprocessing.DataProcessor import nans, change_object_col, reformat_dataframe, split_with_index, standarize_col

ModuleNotFoundError: No module named 'data'

# 1.1 Define Global Constants

To make reading and writing easier, we define some global constants below; change them if you would like to write to another location.

In [None]:
# Every preprocessed output of this notebook is written under OUT_DIR.
OUT_DIR = os.path.join(DATA_DIR, "pre")
# Create the directory up front; otherwise the to_csv calls below fail with
# FileNotFoundError on a fresh checkout.
os.makedirs(OUT_DIR, exist_ok=True)
logger = logging.getLogger('root')

Each dataset goes through a similar process, so we handle them one by one:

- filter out NaN values
- check that key columns are unique
- convert object columns to numerical values
- categorize features into discrete and continuous ones
- scale columns with a standardizer or normalizer.

# Train and Test

We will first handle train and test dataset.

In [None]:
# Load the raw competition splits (first line of each file is the header).
df_test, df_train = (
    pd.read_csv(os.path.join(DATA_DIR, file_name), header=0)
    for file_name in ('test.csv', 'train.csv')
)

* check nans

In [None]:
# check nans: missing-value report for both raw splits
test_nan_report = nans(df_test)
train_nan_report = nans(df_train)
print(f'df_test nans: \n{test_nan_report}\n')
print(f'df_train nans: \n{train_nan_report}\n')

# Work on a copy so the raw test frame stays untouched.
df_test_copy = df_test.copy()

Before moving on to the next section, let's check whether the train target has an abnormal distribution.

In [None]:
# Count rows with a suspiciously low target, then plot the raw target distribution.
unexpected = df_train['target'].lt(-30).sum()
sns.histplot(df_train['target'], kde=True).set(title="HistPlot of Train Target")
print(f'unexpected value: {unexpected}')

Clearly there are some unexpected values with target < -30.


One more thing to notice: the distribution is roughly symmetric around 0.

In [None]:
# Drop the outlying targets found above. Rows with target <= threshold are removed.
TARGET_OUTLIER_THRESHOLD = -30
# .copy() gives an independent frame rather than a slice of df_train, so later column
# assignments on it cannot raise SettingWithCopyWarning or write through to df_train.
df_train_copy = df_train[df_train['target'] > TARGET_OUTLIER_THRESHOLD].copy()
# Trailing ';' suppresses the "[Text(...)]" repr returned by .set().
sns.histplot(df_train_copy['target'], kde=True).set(title="HistPlot of Train Target");

Then we check whether the id is unique. This is important because we may need to join other datasets on the id.

In [None]:
# Row counts are reused below to normalise value counts.
train_count, test_count = len(df_train_copy), len(df_test_copy)
# card_id is the join key for the other tables, so each split must hold it once per row.
print(train_count == df_train_copy['card_id'].nunique())
print(test_count == df_test_copy['card_id'].nunique())

In [None]:
# Column dtypes and non-null counts of the filtered training frame.
df_train_copy.info()

Now we have to convert object columns to numeric values. As seen above, there are two object columns; since `card_id` is the key used to match features from other datasets, we leave it intact.

In [None]:

# Encode the object column on test and train stacked together, then split them again.
# Presumably this is done so both splits share one encoding. The split assumes the
# first `test_count` rows are the test rows, matching the concat order.
features = ['first_active_month']
# `DataFrame.append` was deprecated in pandas 1.4 and removed in 2.0. `pd.concat` is the
# equivalent: it keeps the original indexes and takes the union of columns.
df_temps = reformat_dataframe(pd.concat([df_test_copy, df_train_copy]), features, change_object_col)
df_test_copy, df_train_copy = to_tuple(split_with_index(df_temps, test_count))
del df_temps

After changing the columns, we check whether the train and test features have similar distributions.

In [None]:
def _plot_value_ratio(ax, frame, column, row_count):
    """Plot, on `ax`, the share of `frame`'s rows taking each value of `column`."""
    (frame[column].value_counts().sort_index() / row_count).plot(ax=ax)


# One figure per feature, overlaying the train and test value distributions.
features = ['first_active_month', 'feature_1', 'feature_2', 'feature_3']
for feature in features:
    fig, ax = plt.subplots()
    _plot_value_ratio(ax, df_train_copy, feature, train_count)
    _plot_value_ratio(ax, df_test_copy, feature, test_count)
    ax.legend(['train', 'test'])
    ax.set_xlabel(feature)
    ax.set_ylabel('ratio')
    plt.show()

Next we standardize the features. Since not every model needs standardized data, we only set up the builder here and leave it uncollected, so the files written below are not standardized.

In [None]:
# Standardization is only set up here and not collected below. Presumably
# StreamerBuilder.map is lazy, so the frames written to disk next stay unstandardized.
# The column list is bound at definition time (default argument) instead of reading the
# global `features` lazily. That name is deleted a few cells later and then reassigned
# to unrelated columns, so a deferred collect would otherwise fail or use the wrong list.
builder = StreamerBuilder.build([df_test_copy, df_train_copy]) \
    .map(lambda df, standardize_cols=list(features): reformat_dataframe(df, standardize_cols, standarize_col))

The last step is to write the results to disk so we can load them when needed.

In [None]:
# Persist the preprocessed splits; section 1.2 reads them back from this directory.
for frame, file_name in ((df_test_copy, 'test_pre.csv'), (df_train_copy, 'train_pre.csv')):
    frame.to_csv(os.path.join(OUT_DIR, file_name), index=False)

# Drop the loop variables too; otherwise `frame` would keep df_train_copy alive.
del frame, file_name
del df_test, df_test_copy, df_train, df_train_copy, features, test_count, train_count, unexpected


# Merchant Data purge

Now we will have a look at merchants.csv.

In [None]:
# Merchant-level attributes, later joined onto the transactions by merchant_id.
merchants_path = os.path.join(DATA_DIR, 'merchants.csv')
df_merchant = pd.read_csv(merchants_path, header=0)
df_merchant.info()

In [None]:
# Rows vs. distinct merchant ids: a mismatch means some merchant_id is repeated.
merchant_rows, merchant_ids = len(df_merchant), df_merchant['merchant_id'].nunique()
print(merchant_rows, merchant_ids)
nans(df_merchant)


Some merchant ids clearly appear multiple times in this dataset, and category_2 is missing a significant amount of data. We check the unique values of category_2 to see whether its NaNs can be replaced with a sentinel value (the code below uses -1).

We will change the object columns to numerical ones.

In [None]:


# Categorical (id / code) columns of merchants.csv.
merchants_category_cols = [
    'merchant_id', 'merchant_group_id', 'merchant_category_id', 'subsector_id',
    'category_1', 'most_recent_sales_range', 'most_recent_purchases_range',
    'city_id', 'state_id', 'category_4', 'category_2',
]

# Continuous columns of merchants.csv.
merchants_numeric_cols = [
    'numerical_1', 'numerical_2',
    'avg_sales_lag3', 'avg_purchases_lag3', 'active_months_lag3',
    'avg_sales_lag6', 'avg_purchases_lag6', 'active_months_lag6',
    'avg_sales_lag12', 'avg_purchases_lag12', 'active_months_lag12',
]

# The two role lists together should account for every column (count check only).
assert df_merchant.shape[1] == len(merchants_category_cols) + len(merchants_numeric_cols)

Check the unique values of the category columns.

In [None]:
# Inspect the categorical columns. Only a cell's last expression is rendered, so these
# checks are displayed explicitly; before, their results were silently discarded.
display(df_merchant[merchants_category_cols].nunique())
display(df_merchant[merchants_category_cols].dtypes)
display(df_merchant['category_2'].unique())

# Use -1 as an explicit "missing" category. Assign the result instead of calling
# fillna(inplace=True) on a column selection: under pandas Copy-on-Write that chained
# in-place call does not update df_merchant.
df_merchant['category_2'] = df_merchant['category_2'].fillna(-1)
cols = ['category_1', 'most_recent_sales_range', 'most_recent_purchases_range', 'category_4']
df_merchant = reformat_dataframe(df_merchant, cols, change_object_col)
print(df_merchant[df_merchant['category_2'] == -1])

In [None]:
# Only the last expression of a cell is rendered, so the dtype and NaN checks are
# displayed explicitly (previously they were computed and discarded).
display(df_merchant[merchants_numeric_cols].dtypes)
display(nans(df_merchant))
df_merchant.describe()

In [None]:
# Columns whose +inf values are replaced below.
inf_cols = ['avg_purchases_lag3', 'avg_purchases_lag6', 'avg_purchases_lag12']

# replace +infinity with the largest finite value (the max over entries that are not +inf)
# NOTE(review): -inf is not handled. Whether `df` in the lambda is a single column or the
# whole inf_cols sub-frame depends on reformat_dataframe; confirm the max is taken per column.
df_merchant = reformat_dataframe(df_merchant, inf_cols, lambda df: df.replace(np.inf, df[df != np.inf].max()))
df_merchant.describe()

Fill NaNs with the column means.

In [None]:
def _fill_with_mean(frame):
    """Replace NaNs in `frame` with its mean (as computed by `frame.mean()`)."""
    return frame.fillna(frame.mean())


df_merchant = reformat_dataframe(df_merchant, merchants_numeric_cols, _fill_with_mean)


# History Transaction and New Merchant Transaction Data Purge

These two datasets are processed together because they are closely related. To process them efficiently, we use a StreamerBuilder to handle the data chunk by chunk.

In [None]:

chunksize = 1_000_000  # rows per chunk when streaming historical_transactions.csv

def histories_builder() -> StreamerBuilder[pd.DataFrame]:
    """Return a StreamerBuilder over historical_transactions.csv, read `chunksize` rows at a time.

    A new chunked reader is opened on every call, so each pipeline built from the
    returned builder starts from the top of the file.
    """
    history_path = os.path.join(DATA_DIR, "historical_transactions.csv")
    chunk_reader = pd.read_csv(history_path, chunksize=chunksize)
    return StreamerBuilder.build(chunk_reader)

def append_df(df1, df2):
    """Stack ``df2`` below ``df1`` row-wise, keeping both original indexes.

    Columns are the union of both inputs (missing cells become NaN), as with the
    former ``DataFrame.append``. That method was removed in pandas 2.0, so
    ``pd.concat`` is used instead.
    """
    return pd.concat([df1, df2])

def sum_df(df1, df2):
    """Element-wise sum of two pandas objects, e.g. NaN counts of two chunks.

    ``fill_value=0`` treats a label present in only one operand as 0 rather than
    turning the whole entry into NaN. Without it, a reduction over chunks would lose
    counts whenever per-chunk results have different labels.
    """
    return df1.add(df2, fill_value=0)

# The new-merchant transactions are small enough to load in one go.
new_merchants_path = os.path.join(DATA_DIR, "new_merchant_transactions.csv")
df_new_merchants = pd.read_csv(new_merchants_path)
df_new_merchants.info()


First, we check which columns appear in both the transaction data and merchants.csv.

In [None]:
# Columns present in both the transaction table and merchants.csv.
shared_cols = [col for col in df_new_merchants.columns if col in df_merchant.columns]
# Put the join key first so that duplicate_cols[1:] is exactly the redundant merchant
# attributes. Before, the computed list was discarded and replaced by a hardcoded one.
duplicate_cols = ['merchant_id'] + [col for col in shared_cols if col != 'merchant_id']

# Drop the attributes the transactions already carry, and keep the first row per
# merchant_id so the later left-merges onto transactions cannot duplicate rows.
df_merchant = df_merchant.drop(columns=duplicate_cols[1:])
df_merchant = df_merchant.drop_duplicates(subset='merchant_id')
df_merchant.to_csv(os.path.join(OUT_DIR, 'merchants_pre.csv'), index=False)

In [None]:
# Number of distinct combinations of the shared merchant columns in the transactions.
df_new_merchants.drop_duplicates(subset=duplicate_cols)[duplicate_cols].shape

In [None]:
# Only the last expression of a cell is rendered, so show this count explicitly.
display(df_new_merchants['merchant_id'].nunique())
cols = ['most_recent_sales_range', 'most_recent_purchases_range', 'merchant_id']
# Attach the merchant-level sales/purchase ranges to each transaction. validate raises
# if merchant_id is not unique on the right, which would silently duplicate transactions.
df_new_merchants = df_new_merchants.merge(df_merchant[cols], how='left', on='merchant_id',
                                          validate='many_to_one')

In [None]:
# Column roles in the transaction tables.
transactions_numeric = ['month_lag', 'installments', 'purchase_amount']
transactions_category = [
    'card_id', 'authorized_flag', 'category_3', 'category_1', 'merchant_category_id',
    'subsector_id', 'merchant_id', 'city_id', 'state_id', 'category_2',
    'most_recent_sales_range', 'most_recent_purchases_range',
]
# reserved for time series model
transactions_time_cols = ['purchase_date']

# The three role lists together should account for every column (count check only).
role_lists = (transactions_category, transactions_numeric, transactions_time_cols)
assert df_new_merchants.shape[1] == sum(len(role) for role in role_lists)

In [None]:
# dtypes of the categorical transaction columns, in transactions_category order
df_new_merchants.dtypes[transactions_category]

In [None]:
# Missing-value summary of the merged new-merchant transactions (helper from lama.preprocessing.DataProcessor).
nans(df_new_merchants)

In [None]:
features = ['authorized_flag', 'category_1', 'category_3']
df_new_merchants = reformat_dataframe(df_new_merchants, features, change_object_col)


def _checked_int_cast(series: pd.Series, dtype) -> pd.Series:
    """Cast ``series`` to the integer ``dtype``, raising instead of wrapping on overflow.

    ``astype`` between integer types wraps out-of-range values silently (e.g. 200
    becomes -56 as int8), which would corrupt ids without any error.

    Raises:
        OverflowError: if any value lies outside the range of ``dtype``.
    """
    bounds = np.iinfo(dtype)
    low, high = series.min(), series.max()
    if low < bounds.min or high > bounds.max:
        raise OverflowError(
            f"column {series.name!r} spans [{low}, {high}], outside the {np.dtype(dtype).name} range")
    return series.astype(dtype)


# convert dtype to small int
# Assign instead of chained fillna(inplace=True), which does not write back under
# pandas Copy-on-Write.
df_new_merchants['category_2'] = df_new_merchants['category_2'].fillna(-1)
# int8 only holds -128..127. city_id and installments may exceed that range, so they
# get int16. NOTE(review): widths chosen conservatively; confirm against the data.
# These dtypes are captured later (transactions_dtype) and reused to parse the CSV.
small_int_dtypes = {
    'category_2': np.int8,
    'category_1': np.int8,
    'authorized_flag': np.int8,
    'installments': np.int16,
    'state_id': np.int8,
    'city_id': np.int16,
    'subsector_id': np.int8,
}
for col_name, int_dtype in small_int_dtypes.items():
    df_new_merchants[col_name] = _checked_int_cast(df_new_merchants[col_name], int_dtype)

In [None]:
# convert purchase date to timeseries and derive calendar columns from it
datetime_index = pd.DatetimeIndex(df_new_merchants['purchase_date'])
df_new_merchants.info()
# Snapshot the dtypes *before* the derived columns are added; they are reused later
# to parse transactions_pre.csv.
transactions_dtype = df_new_merchants.dtypes

# NOTE(review): 'purchase_hour_section' stores the full time of day (datetime.time),
# not an hour or a section of the day; confirm that is intended.
derived_columns = (
    ('purchase_day', 'day'),
    ('purchase_month', 'month'),
    ('purchase_year', 'year'),
    ('purchase_hour_section', 'time'),
)
for new_col, index_attr in derived_columns:
    df_new_merchants[new_col] = getattr(datetime_index, index_attr)

transactions_features = df_new_merchants.columns
transactions_path = os.path.join(OUT_DIR, 'transactions_pre.csv')
# Start from a fresh file: this write emits the header, and history chunks are appended below.
if os.path.exists(transactions_path):
    os.remove(transactions_path)
df_new_merchants.to_csv(transactions_path, mode='a', index=False)


There is no need to fill the numerical columns, since they contain no NaNs.

In [None]:
# reserved for large memory write-ins
# Release the in-memory new-merchant data before streaming the history file.
del datetime_index, df_new_merchants
gc.collect()

In [None]:
# Compute the NaN summary per chunk, then fold the chunk summaries together with sum_df.
per_chunk_nans = histories_builder().map(nans)
histories_nans = per_chunk_nans.reduce(sum_df).collect(identity)
print(histories_nans)

The history columns and the new-merchant columns are the same, so we can reuse the methods above.

In [None]:
def add_datetime_index(df: pd.DataFrame):
    """Add calendar columns derived from ``df['purchase_date']``.

    Adds ``purchase_day``, ``purchase_month``, ``purchase_year`` and
    ``purchase_hour_section`` (the time of day as ``datetime.time``). ``df`` is
    modified in place and returned, so the function can be chained in ``.map``.
    """
    stamps = pd.DatetimeIndex(df['purchase_date'])
    derived = {
        'purchase_day': stamps.day,
        'purchase_month': stamps.month,
        'purchase_year': stamps.year,
        'purchase_hour_section': stamps.time,
    }
    for column, values in derived.items():
        df[column] = values
    return df

def convert_columns(df: pd.DataFrame):
    """Fill missing ``category_2`` with -1 and store it as int8; returns ``df``.

    The result is assigned back instead of calling ``fillna(inplace=True)`` on the
    column selection, which does not update ``df`` under pandas Copy-on-Write. The
    int8 cast matches the treatment of ``df_new_merchants['category_2']``. It also
    writes the column to CSV as "1" rather than "1.0", consistent with the int8 dtype
    reused when transactions_pre.csv is read back.
    """
    df['category_2'] = df['category_2'].fillna(-1).astype(np.int8)
    return df

# Run each history chunk through the same steps as df_new_merchants and append it,
# without a header, to transactions_pre.csv (the header was written with the new merchants).
# The merchant slice is taken once here instead of being re-sliced for every chunk.
# NOTE(review): change_object_col runs per chunk. If it derives codes from the values it
# sees, codes may differ between chunks and from df_new_merchants; confirm it uses a fixed mapping.
merchant_ranges = df_merchant[cols]
to_csv_builder = histories_builder() \
                    .map(lambda df: reformat_dataframe(df, features, change_object_col)) \
                    .map(lambda df: df.merge(merchant_ranges, how='left', on='merchant_id',
                                             validate='many_to_one')) \
                    .map(convert_columns) \
                    .map(add_datetime_index)

# write to csv
to_csv_builder.consume(lambda df: df.to_csv(transactions_path, mode='a', index=False, header=False))

del to_csv_builder, merchant_ranges


# 1.2 Features Engineering

In [None]:
# Directory holding the *_pre.csv files written above (the same path as OUT_DIR).
PRE_DIR = os.path.join(DATA_DIR, 'pre')



We aggregate each column per `card_id` (several statistics for numeric columns, unique counts for categorical ones) and append the results as new columns.

In [None]:
def create_aggs_cols(numeric_cols: t.Optional[t.Sequence[str]] = None,
                     category_cols: t.Optional[t.Sequence[str]] = None
                     ) -> t.Tuple[t.Dict[str, t.List[str]], t.List[str]]:
    """Build the per-card aggregation spec and the matching flat column names.

    Args:
        numeric_cols: columns that get summary statistics; defaults to the
            notebook-level ``transactions_numeric``.
        category_cols: columns that get a unique count; defaults to the
            notebook-level ``transactions_category``.

    Returns:
        ``(aggs, cols)``: ``aggs`` maps column -> list of statistic names (for
        ``DataFrame.agg``); ``cols`` lists ``"<column>_<statistic>"`` in the same order.
    """
    if numeric_cols is None:
        numeric_cols = transactions_numeric
    if category_cols is None:
        category_cols = transactions_category

    aggs: t.Dict[str, t.List[str]] = {}
    for col in numeric_cols:
        aggs[col] = ['nunique', 'mean', 'min', 'max', 'var', 'skew', 'sum']
    for col in category_cols:
        aggs[col] = ['nunique']
    # Aggregation is grouped by card_id, so its nunique is always 1; count rows instead.
    aggs['card_id'] = ['size', 'count']
    cols = [f"{key}_{stat}" for key, stats in aggs.items() for stat in stats]
    return aggs, cols

In [None]:

@enable_logging("append_new_columns.log")
def append_new_columns(transaction, aggs, cols):
    """Aggregate ``transaction`` rows into one feature row per ``card_id``.

    The same aggregation spec is applied three times:

    * rows with ``month_lag < 0``  -> columns ``<name>_hist``
    * rows with ``month_lag >= 0`` -> columns ``<name>_new``
    * all rows                     -> columns named exactly as in ``cols``

    Args:
        transaction: transaction table with ``card_id``, ``month_lag`` and every
            column named in ``aggs``.
        aggs: ``{column: [statistic, ...]}`` passed to ``DataFrame.agg``.
        cols: flat output names, one per (column, statistic) pair in ``aggs`` order,
            as produced by ``create_aggs_cols``.

    Returns:
        DataFrame with ``card_id`` plus the ``_hist``, ``_new`` and all-rows statistics.
        Every grouped card_id gets a row; a period's columns are NaN for cards with no
        rows in that period.
    """
    def _aggregate(rows, suffix):
        # Aggregate `rows` per card_id and flatten the (column, stat) header using `cols`.
        grouped = rows.groupby('card_id').agg(aggs)
        grouped.columns = [name + suffix for name in cols]
        return grouped.reset_index()

    hist = _aggregate(transaction[transaction['month_lag'] < 0], '_hist')
    # Distinct suffix: with '_hist' here too, the merge below produced ambiguous
    # '_hist_x' / '_hist_y' columns.
    new = _aggregate(transaction[transaction['month_lag'] >= 0], '_new')
    total = _aggregate(transaction, '')

    # The outer join keeps cards that only have month_lag >= 0 rows. The right join onto
    # the all-rows aggregate gives every card a row and keeps the column layout fixed,
    # even when one period has no rows at all.
    result = hist.merge(new, how='outer', on='card_id').merge(total, how='right', on='card_id')
    gc.collect()
    return result

@enable_logging("write_to_csv.log")
def write_to_csv(df):
    """Left-join the per-card features ``df`` onto the preprocessed train/test splits.

    Reads ``train_pre.csv`` / ``test_pre.csv`` from ``PRE_DIR``, merges ``df`` on
    ``card_id`` and writes ``train_groupby.csv`` / ``test_groupby.csv`` in append mode
    (with a header each call). Returns None.
    """
    train = pd.read_csv(os.path.join(PRE_DIR, "train_pre.csv")).merge(df, how='left', on='card_id')
    logging.debug('Training df after first merge')
    test = pd.read_csv(os.path.join(PRE_DIR, "test_pre.csv")).merge(df, how='left', on='card_id')

    outputs = ((train, "train_groupby.csv"), (test, "test_groupby.csv"))
    for joined, file_name in outputs:
        joined.to_csv(os.path.join(PRE_DIR, file_name), index=False, mode='a')
    # Only drops this function's reference; the caller's frame stays alive.
    del df
    gc.collect()

The following block is extremely time-consuming; run it with caution.

In [None]:
# write_to_csv appends, so start from clean output files.
train_groupby = os.path.join(PRE_DIR, "train_groupby.csv")
test_groupby = os.path.join(PRE_DIR, "test_groupby.csv")
for stale_path in (train_groupby, test_groupby):
    if os.path.exists(stale_path):
        os.remove(stale_path)

aggs, cols = create_aggs_cols()

# we will use dtypes collected from previous to save memory
dtype = transactions_dtype.to_dict()
transactions = pd.read_csv(os.path.join(PRE_DIR, "transactions_pre.csv"), dtype=dtype)
transactions = append_new_columns(transactions, aggs, cols)
write_to_csv(transactions)
