In [1]:
# Move one directory up, presumably to the project root, so the relative 'data/...' paths
# and local packages (dltranz, vtb_code) used below resolve — confirm for your checkout.
%cd ../

/mnt/kireev/pycharm-deploy/vtb


In [2]:
import pickle
import random

In [3]:
from glob import glob

In [4]:
import numpy as np
import pandas as pd

In [5]:
import torch
import pytorch_lightning as pl

In [6]:
from pyhocon import ConfigFactory

In [7]:
import matplotlib.pyplot as plt

In [8]:
from dltranz.data_load.iterable_processing.category_size_clip import CategorySizeClip
from dltranz.data_load import augmentation_chain
from dltranz.data_load.augmentations.seq_len_limit import SeqLenLimit
from dltranz.data_load.augmentations.random_slice import RandomSlice

from dltranz.seq_encoder import create_encoder

from dltranz.metric_learn.sampling_strategies import get_sampling_strategy
from dltranz.metric_learn.losses import get_loss

from dltranz.tb_interface import get_scalars

In [9]:
from vtb_code.data import PairedDataset, paired_collate_fn, DropDuplicate
from vtb_code.metrics import PrecisionK, MeanReciprocalRankK, ValidationCallback

In [10]:
# Index of the matching fold held out as the test set; it also names the feature cache file
# (data/features_f{FOLD_ID}.pickle) and determines the validation fold below.
FOLD_ID = 0

In [11]:
# The test fold is exactly the configured FOLD_ID.
fold_id_test = FOLD_ID

In [12]:
# Number of folds = number of data/train_matching_*.csv files. The cells below read folds by
# index via range(folds_count), so fold files are expected to be numbered 0..folds_count-1.
folds_count = len(glob('data/train_matching_*.csv'))
folds_count

6

In [13]:
# Validation fold: the fold right after the test fold (wrapping around). This is deterministic
# for a given FOLD_ID and differs from the test fold as long as there are at least 2 folds.
# Fail loudly instead of raising ZeroDivisionError (no fold files found) or silently
# validating on the test fold (only one fold file found).
assert folds_count >= 2, f'need at least 2 train_matching folds in data/, found {folds_count}'
fold_id_valid = (fold_id_test + 1) % folds_count
fold_id_valid

1

In [14]:
# Matching tables: every fold except the test and validation folds goes into train.
df_matching_train = pd.concat(
    [
        pd.read_csv(f'data/train_matching_{fold}.csv')
        for fold in range(folds_count)
        if fold != fold_id_test and fold != fold_id_valid
    ]
)
df_matching_valid, df_matching_test = (
    pd.read_csv(f'data/train_matching_{fold}.csv')
    for fold in (fold_id_valid, fold_id_test)
)

In [15]:
# Sanity check: row counts of the train / valid / test matching tables.
[len(df) for df in [df_matching_train, df_matching_valid, df_matching_test]]

[11720, 2930, 2931]

In [16]:
def trx_types(df):
    """Normalise a raw transactions frame to the columns used by the preprocessor.

    Casts the category columns ``mcc_code`` and ``currency_rk`` to ``str`` and derives
    ``event_time`` (float seconds since the Unix epoch) from ``transaction_dttm``.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw transactions with at least ``user_id``, ``mcc_code``, ``currency_rk``,
        ``transaction_dttm`` and ``transaction_amt`` columns. It is not modified.

    Returns
    -------
    pandas.DataFrame
        New frame with columns
        ``['user_id', 'event_time', 'mcc_code', 'currency_rk', 'transaction_amt']``.
    """
    # Convert through numpy datetime64[ns] -> int64 explicitly: ``astype(int)`` depends on the
    # platform's default int size, and dividing by 1e9 is only correct for ns resolution.
    event_ns = pd.to_datetime(df['transaction_dttm']).values.astype('datetime64[ns]').astype('int64')
    out = df.assign(
        mcc_code=df['mcc_code'].astype(str),
        currency_rk=df['currency_rk'].astype(str),
        event_time=event_ns / 1e9,
    )
    return out[['user_id', 'event_time', 'mcc_code', 'currency_rk', 'transaction_amt']]

In [17]:
# Transactions per split: train = all folds except test/valid; plus the unmatched "puzzle" set.
df_trx_train = pd.concat(
    [
        trx_types(pd.read_csv(f'data/transactions_{fold}.csv'))
        for fold in range(folds_count)
        if fold != fold_id_test and fold != fold_id_valid
    ]
)
df_trx_valid, df_trx_test = (
    trx_types(pd.read_csv(f'data/transactions_{fold}.csv'))
    for fold in (fold_id_valid, fold_id_test)
)
df_trx_puzzle = trx_types(pd.read_csv('data/transactions_unmatched.csv'))

In [18]:
def click_types(df, df_categories=None):
    """Normalise a raw clickstream frame and attach the category hierarchy.

    Derives ``event_time`` (float seconds since the Unix epoch) from ``timestamp``, joins the
    category dictionary on ``cat_id`` and casts ``cat_id`` to ``str``.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw clicks with at least ``user_id``, ``timestamp`` and ``cat_id``. It is not modified.
    df_categories : pandas.DataFrame, optional
        Category dictionary with ``cat_id``, ``level_0``, ``level_1`` and ``level_2``.
        When omitted it is read from ``data/click_categories.csv``. Pass it explicitly
        to avoid re-reading the file on every call.

    Returns
    -------
    pandas.DataFrame
        New frame with columns
        ``['user_id', 'event_time', 'cat_id', 'level_0', 'level_1', 'level_2']``.
        The join is an inner merge, so clicks whose ``cat_id`` is absent from the
        dictionary are dropped.
    """
    if df_categories is None:
        df_categories = pd.read_csv('data/click_categories.csv')
    # Explicit datetime64[ns] -> int64: platform-independent and resolution-agnostic.
    event_ns = pd.to_datetime(df['timestamp']).values.astype('datetime64[ns]').astype('int64')
    out = pd.merge(df.assign(event_time=event_ns / 1e9), df_categories, on='cat_id')
    out['cat_id'] = out['cat_id'].astype(str)
    return out[['user_id', 'event_time', 'cat_id', 'level_0', 'level_1', 'level_2']]

In [19]:
# Clickstream per split: train = all folds except test/valid; plus the unmatched "puzzle" set.
df_click_train = pd.concat(
    [
        click_types(pd.read_csv(f'data/clickstream_{fold}.csv'))
        for fold in range(folds_count)
        if fold != fold_id_test and fold != fold_id_valid
    ]
)
df_click_valid, df_click_test = (
    click_types(pd.read_csv(f'data/clickstream_{fold}.csv'))
    for fold in (fold_id_valid, fold_id_test)
)
df_click_puzzle = click_types(pd.read_csv('data/clickstream_unmatched.csv'))

In [20]:
import logging
import numpy as np
import pandas as pd

from typing import List

from dltranz.data_preprocessing.base import DataPreprocessor
from dltranz.data_preprocessing.util import pd_hist

# Logger used by PandasDataPreprocessor below.
# NOTE(review): inside a notebook __name__ is '__main__', and no handler/level is configured in
# the visible cells, so logger.info(...) messages may not be displayed — confirm.
logger = logging.getLogger(__name__)


class PandasDataPreprocessor(DataPreprocessor):
    """Data preprocessor based on pandas.DataFrame

    During preprocessing it
        * transforms the `cols_event_time` column with date and time;
        * encodes category columns `cols_category` into ints;
        * applies a signed logarithm, sign(x) * log(1 + |x|), to `cols_log_norm` columns and
          divides them by the maximum absolute value seen during `fit`;
        * groups flat data by `col_id`;
        * arranges data into a list of dicts with features.

    Parameters
    ----------
    col_id : str
        name of column with ids
    cols_event_time : str or list[str]
        name(s) of column(s) with time and date
    cols_category : list[str]
        list of category columns
    cols_log_norm : list[str]
        list of columns to be log-transformed and scaled
    time_transformation : str. Default: 'default'.
        type of transformation applied to the time column:
        'none', 'default', 'float' or 'gender'
    print_dataset_info : bool. Default: False.
        If True, log dataset stats during preprocessor fitting and data transformation
    """

    def __init__(self,
                 col_id: str,
                 cols_event_time: str,
                 cols_category: List[str],
                 cols_log_norm: List[str],
                 time_transformation: str = 'default',
                 print_dataset_info: bool = False):

        super().__init__(col_id, cols_event_time, cols_category, cols_log_norm)
        self.print_dataset_info = print_dataset_info
        self.time_transformation = time_transformation

    @staticmethod
    def _signed_log1p(values):
        """Element-wise sign(x) * log(1 + |x|): keeps the sign, compresses the magnitude."""
        return np.log1p(abs(values)) * np.sign(values)

    def fit(self, dt, **params):
        """Learn category code mappings and log-norm scales from flat data.

        Parameters
        ----------
        dt : pandas.DataFrame with flat data
        **params :
            Ignored; accepted for interface compatibility.

        Returns
        -------
        self : object
            Fitted preprocessor.
        """
        # Reset internal state before fitting
        self._reset()

        for col in self.cols_category:
            pd_col = dt[col].astype(str)
            # Codes start at 1 and follow descending frequency (most frequent value -> 1).
            mapping = {k: i + 1 for i, k in enumerate(pd_col.value_counts().index)}
            self.cols_category_mapping[col] = mapping

            if self.print_dataset_info:
                logger.info(f'Encoder stat for "{col}":\ncodes | trx_count\n{pd_hist(dt[col], col)}')

        # Remember each log-normalised column's scale on the fit data so that every later
        # `transform` call (train, valid, test, unseen data) divides by the same constant.
        # Scaling each transformed frame by its own maximum instead would map the same raw
        # value to different feature values in different splits.
        self.cols_log_norm_maxes = {
            col: abs(self._signed_log1p(dt[col])).max() for col in self.cols_log_norm
        }

        return self

    def transform(self, df, copy=True):
        """Perform preprocessing.

        Parameters
        ----------
        df : pandas.DataFrame with flat data
        copy : bool, default=True
            Work on a copy of `df`. When False, columns of `df` may be overwritten in place.

        Returns
        -------
        features : List of dicts grouped by col_id.
            One dict per id: `{col_id: id, <feature>: np.ndarray, ...}` with arrays ordered
            by `event_time`.

        Raises
        ------
        KeyError
            If a column in `cols_category` has no mapping from `fit`.
        NotImplementedError
            If `time_transformation` is not one of the supported values.
        """
        self.check_is_fitted()
        df_data = df.copy() if copy else df

        if self.print_dataset_info:
            logger.info(f'Found {df_data[self.col_id].nunique()} unique ids')

        # event_time mapping
        if self.time_transformation == 'none':
            pass
        elif self.time_transformation == 'default':
            df_data = self._td_default(df_data, self.cols_event_time)
        elif self.time_transformation == 'float':
            df_data = self._td_float(df_data, self.cols_event_time)
        elif self.time_transformation == 'gender':
            df_data = self._td_gender(df_data, self.cols_event_time)
        else:
            raise NotImplementedError(f'Unknown type of data transformation: "{self.time_transformation}"')

        for col in self.cols_category:
            if col not in self.cols_category_mapping:
                raise KeyError(f"column {col} isn't in fitted category columns")
            pd_col = df_data[col].astype(str)
            # Values unseen during `fit` get the largest known code, i.e. they share the code
            # of the least frequent fitted category.
            df_data[col] = pd_col.map(self.cols_category_mapping[col]) \
                .fillna(max(self.cols_category_mapping[col].values()))
            if self.print_dataset_info:
                logger.info(f'Encoder stat for "{col}":\ncodes | trx_count\n{pd_hist(df_data[col], col)}')

        log_norm_maxes = getattr(self, 'cols_log_norm_maxes', {})
        for col in self.cols_log_norm:
            df_data[col] = self._signed_log1p(df_data[col])
            scale = log_norm_maxes.get(col)
            if scale is None:
                # No scale stored at fit time (e.g. an instance fitted by an older version of
                # this class): fall back to this frame's own maximum.
                scale = abs(df_data[col]).max()
            # Skip division for a 0 (all-zero column) or NaN scale, which would only yield NaNs.
            # Values of data other than the fit data may fall outside [-1, 1].
            if scale > 0:
                df_data[col] /= scale
            if self.print_dataset_info:
                logger.info(f'Encoder stat for "{col}":\ncodes | trx_count\n{pd_hist(df_data[col], col)}')

        if self.print_dataset_info:
            trx_per_client = df_data.groupby(self.col_id)['event_time'].count()
            logger.info(f'Trx count per clients:\nlen(trx_list) | client_count\n{pd_hist(trx_per_client, "trx_count")}')

        # column filter
        used_columns = [col for col in df_data.columns
                        if col in self.cols_category + self.cols_log_norm + ['event_time', self.col_id]]

        logger.info('Feature collection in progress ...')
        # Sort by (id, event_time) so each per-id array is in chronological order.
        features = df_data[used_columns] \
            .assign(et_index=lambda x: x['event_time']) \
            .set_index([self.col_id, 'et_index']).sort_index() \
            .groupby(self.col_id).apply(lambda x: {k: np.array(v) for k, v in x.to_dict(orient='list').items()}) \
            .rename('feature_arrays').reset_index().to_dict(orient='records')

        def squeeze(rec):
            # Flatten {col_id, feature_arrays: {...}} into a single dict per id.
            return {self.col_id: rec[self.col_id], **rec['feature_arrays']}
        features = [squeeze(r) for r in features]

        if self.print_dataset_info and features:
            feature_names = list(features[0].keys())
            logger.info(f'Feature names: {feature_names}')

        logger.info(f'Prepared features for {len(features)} clients')
        return features

    @staticmethod
    def _td_default(df, cols_event_time):
        """Replace time with the rank (0, 1, 2, ...) of its distinct sorted values.

        `cols_event_time` may be a single column name or a list of names; the rank is
        written to the `event_time` column.
        """
        cols = [cols_event_time] if isinstance(cols_event_time, str) else list(cols_event_time)
        df_event_time = df[cols].drop_duplicates().sort_values(cols)
        # Use a temporary name so the rank never collides with an existing 'event_time' column.
        df_event_time['_et_rank'] = np.arange(len(df_event_time))
        df = pd.merge(df, df_event_time, on=cols)
        df['event_time'] = df.pop('_et_rank')
        logger.info('Default time transformation')
        return df

    @staticmethod
    def _td_float(df, col_event_time):
        """Cast the time column to float and store it as `event_time`."""
        df['event_time'] = df[col_event_time].astype(float)
        logger.info('To-float time transformation')
        return df

    @staticmethod
    def _td_gender(df, col_event_time):
        """Gender-dataset-like transformation

        'd hh:mm:ss' -> float where integer part is day number and fractional part is seconds from day begin
        '1 00:00:00' -> 1.0
        '1 12:00:00' -> 1.5
        '1 01:00:00' -> 1 + 1 / 24
        '2 23:59:59' -> 2 + 86399 / 86400 (~2.99999)
        '432 12:00:00' -> 432.5

        :param df: frame with a 'd hh:mm:ss' string column
        :param col_event_time: name of that column
        :return: `df` with the float `event_time` column added
        """
        # Left-pad to 'dddddd hh:mm:ss' so the day and time parts sit at fixed offsets.
        padded_time = df[col_event_time].str.pad(15, 'left', '0')
        day_part = padded_time.str[:6].astype(float)
        time_part = pd.to_datetime(padded_time.str[7:], format='%H:%M:%S').values.astype('int64') // 1e9
        time_part = time_part % (24 * 60 * 60) / (24 * 60 * 60)
        df['event_time'] = day_part + time_part
        logger.info('Gender-dataset-like time transformation')
        return df


In [21]:
# Both sources already carry float epoch seconds in `event_time` (see trx_types / click_types),
# so no time transformation is applied.
preprocessor_trx = PandasDataPreprocessor(
    col_id='user_id',
    cols_event_time='event_time',
    cols_category=["mcc_code", "currency_rk"],
    cols_log_norm=["transaction_amt"],
    time_transformation='none',
    print_dataset_info=False,
)

# Clickstream has only categorical features besides time.
preprocessor_click = PandasDataPreprocessor(
    col_id='user_id',
    cols_event_time='event_time',
    cols_category=['cat_id', 'level_0', 'level_1', 'level_2'],
    cols_log_norm=[],
    time_transformation='none',
    print_dataset_info=False,
)

In [22]:
# Per-column size limits passed to CategorySizeClip in the *_to_torch converters below;
# presumably the maximum category code kept per column — confirm against CategorySizeClip.
category_max_size_trx = dict(mcc_code=350, currency_rk=5)
category_max_size_click = dict.fromkeys(('cat_id', 'level_0', 'level_1', 'level_2'), 400)

In [23]:
def trx_to_torch(seq):
    """Yield ``(user_id, tensors)`` pairs from preprocessed transaction records.

    Records are first passed through ``CategorySizeClip(category_max_size_trx)``.
    ``event_time`` and ``transaction_amt`` become float32 tensors; ``mcc_code`` and
    ``currency_rk`` become int32 tensors.
    """
    # (feature name, convert to float?) in output-dict order
    column_kinds = (
        ('event_time', True),
        ('mcc_code', False),
        ('currency_rk', False),
        ('transaction_amt', True),
    )
    clipped = CategorySizeClip(category_max_size_trx)(seq)
    for record in clipped:
        user_id = record['user_id']
        tensors = {}
        for name, is_float in column_kinds:
            raw = torch.from_numpy(record[name])
            tensors[name] = raw.float() if is_float else raw.int()
        yield user_id, tensors

def click_to_torch(seq):
    """Yield ``(user_id, tensors)`` pairs from preprocessed clickstream records.

    Records are first passed through ``CategorySizeClip(category_max_size_click)``.
    ``event_time`` becomes a float32 tensor; ``cat_id`` and ``level_0..2`` become int32 tensors.
    """
    # (feature name, convert to float?) in output-dict order
    column_kinds = (
        ('event_time', True),
        ('cat_id', False),
        ('level_0', False),
        ('level_1', False),
        ('level_2', False),
    )
    clipped = CategorySizeClip(category_max_size_click)(seq)
    for record in clipped:
        user_id = record['user_id']
        tensors = {}
        for name, is_float in column_kinds:
            raw = torch.from_numpy(record[name])
            tensors[name] = raw.float() if is_float else raw.int()
        yield user_id, tensors

In [24]:
# Fit the transaction preprocessor on the training split only; the other splits reuse the
# fitted state through transform().
features_trx_train = dict(trx_to_torch(preprocessor_trx.fit_transform(df_trx_train)))
features_trx_valid, features_trx_test, features_trx_puzzle = (
    dict(trx_to_torch(preprocessor_trx.transform(split)))
    for split in (df_trx_valid, df_trx_test, df_trx_puzzle)
)

In [25]:
# Fit the clickstream preprocessor on the training split only; the other splits reuse the
# fitted state through transform().
features_click_train = dict(click_to_torch(preprocessor_click.fit_transform(df_click_train)))
features_click_valid, features_click_test, features_click_puzzle = (
    dict(click_to_torch(preprocessor_click.transform(split)))
    for split in (df_click_valid, df_click_test, df_click_puzzle)
)

In [26]:
# Cache all per-split feature dicts for this fold, in a fixed order that the load cell relies on.
with open(f'data/features_f{FOLD_ID}.pickle', 'wb') as cache_file:
    pickle.dump(
        (
            features_trx_train,
            features_trx_valid,
            features_trx_test,
            features_trx_puzzle,
            features_click_train,
            features_click_valid,
            features_click_test,
            features_click_puzzle,
        ),
        cache_file,
    )

In [27]:
%%time
# Reload the cached per-fold features; the tuple order must match the dump cell above.
# NOTE(review): pickle.load can execute arbitrary code from the file — only load caches
# produced by this notebook from a trusted location.
with open(f'data/features_f{FOLD_ID}.pickle', 'rb') as f:
    (
        features_trx_train,
        features_trx_valid,
        features_trx_test,
        features_trx_puzzle,
        features_click_train,
        features_click_valid,
        features_click_test,
        features_click_puzzle,
    ) = pickle.load(f)

CPU times: user 13.6 s, sys: 4.18 s, total: 17.8 s
Wall time: 17.8 s
