In [5]:
# Stretch the notebook container to the full browser width.
# `display` and `HTML` are imported from the public `IPython.display` module;
# importing them from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [6]:
%load_ext autoreload
%autoreload 2

import sys
sys.path.append('..')

import logging
import pytorch_lightning as pl
import warnings

warnings.filterwarnings('ignore')
logging.getLogger("pytorch_lightning").setLevel(logging.ERROR)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Data load

In [7]:
# -p: do not fail when ../../data already exists (keeps the cell re-runnable).
! mkdir -p ../../data
# -f: fail on an HTTP error instead of saving the error page as the archive.
! curl -fOL https://storage.yandexcloud.net/di-datasets/age-prediction-nti-sbebank-2019.zip
# -j drops the archive's directory structure, -o overwrites existing files.
! unzip -j -o age-prediction-nti-sbebank-2019.zip 'data/*.csv' -d ../../data
! mv age-prediction-nti-sbebank-2019.zip ../../data/

mkdir: cannot create directory ‘../../data’: File exists
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  239M  100  239M    0     0  49.4M      0  0:00:04  0:00:04 --:--:-- 54.9M02k      0  0:05:49 --:--:--  0:05:49  701k
Archive:  age-prediction-nti-sbebank-2019.zip
  inflating: ../../data/test.csv     
  inflating: ../../data/small_group_description.csv  
  inflating: ../../data/train_target.csv  
  inflating: ../../data/transactions_train.csv  
  inflating: ../../data/transactions_test.csv  


## Data Preprocessing

In [12]:
import os
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types as T


# Directory the download cell unpacks the CSV files into.
data_path = '../../data/'

# Local Spark session on 8 threads; `getOrCreate` reuses an existing session.
spark = (
    SparkSession.builder
    .master("local[8]")
    .appName("PysparkDataPreprocessor")
    .getOrCreate()
)

In [93]:
# Read the training transactions with a header row and inferred column types.
source_data = spark.read.csv(
    os.path.join(data_path, 'transactions_train.csv'),
    header=True,
    inferSchema=True,
)
source_data.show(n=2)

                                                                                

+---------+----------+-----------+----------+
|client_id|trans_date|small_group|amount_rur|
+---------+----------+-----------+----------+
|    33172|         6|          4|    71.463|
|    33172|         6|         35|    45.017|
+---------+----------+-----------+----------+
only showing top 2 rows



In [134]:
# Work on an aliased handle of the source frame and preview its first 20 rows.
df = source_data.alias('df')
df.show(n=20)

+---------+----------+-----------+----------+
|client_id|trans_date|small_group|amount_rur|
+---------+----------+-----------+----------+
|    33172|         6|          4|    71.463|
|    33172|         6|         35|    45.017|
|    33172|         8|         11|    13.887|
|    33172|         9|         11|    15.983|
|    33172|        10|         11|    21.341|
|    33172|        11|         11|    17.941|
|    33172|        12|         11|    17.726|
|    33172|        13|         18|    47.397|
|    33172|        13|          1|   220.009|
|    33172|        13|         11|     9.067|
|    33172|        16|          3|    18.319|
|    33172|        16|          1|     9.846|
|    33172|        16|         11|    19.666|
|    33172|        17|         82|     2.544|
|    33172|        19|          3|    16.388|
|    33172|        19|         32|    45.795|
|    33172|        19|          1|     4.184|
|    33172|        19|         11|    15.479|
|    33172|        20|          1|

In [None]:
import datetime
import logging
from typing import List, Optional

from pyspark.sql.window import Window

from .base import DataPreprocessor


logger = logging.getLogger(__name__)


class PysparkDataPreprocessor(DataPreprocessor):
    """Data preprocessor based on pyspark.sql.DataFrame (work in progress).

    The intended preprocessing
        * transforms `cols_event_time` column with date and time;
        * encodes category columns `cols_category` into ints;
        * applies logarithm transformation to `cols_log_norm` columns;
        * groups flat data by `col_id`;
        * arranges data into list of dicts with features.

    Current state: `fit` and the event-time step of `transform` operate on Spark
    DataFrames. The remaining steps exist only as pandas code in `TRANSFORM`.

    Parameters
    ----------
    col_id : str
        name of column with ids
    cols_event_time : str,
        name of column with time and date
    cols_category : list[str],
        list of category columns
    cols_log_norm : list[str],
        list of columns to be logarithmed
    cols_identity : list[str],
        list of columns to be passed as is without any transformation
    cols_target: List[str], optional
        list of columns with target. `None` (default) means no target columns.
    time_transformation: str. Default: 'default'.
        type of transformation to be applied to time column; one of
        'none', 'default', 'float', 'gender', 'hours_from_min'
    print_dataset_info : bool. Default: False.
        If True, print dataset stats during preprocessor fitting and data transformation
    """
    def __init__(self,
                 col_id: str,
                 cols_event_time: str,
                 cols_category: List[str],
                 cols_log_norm: List[str],
                 cols_identity: List[str],
                 cols_target: Optional[List[str]] = None,
                 time_transformation: str = 'default',
                 print_dataset_info: bool = False):

        # A fresh list per instance: a `[]` default would be shared between all instances.
        cols_target = [] if cols_target is None else cols_target
        super().__init__(col_id, cols_event_time, cols_category, cols_log_norm, cols_identity, cols_target)
        self.print_dataset_info = print_dataset_info
        self.time_transformation = time_transformation
        # Minimal event time as epoch seconds; set by `fit` for 'hours_from_min'.
        self.time_min = None

    def fit(self, df, **params):
        """Collect the statistics needed by the transformation.

        Parameters
        ----------
        df : pyspark.sql.DataFrame with flat data

        Returns
        -------
        self : object
            Fitted preprocessor.
        """
        # Reset internal state before fitting
        self._reset()

        for col in self.cols_category:
            # NOTE(review): `distinct()` gives no ordering guarantee, so the codes
            # assigned here may differ between runs - confirm this is acceptable.
            distinct_rows = df.select(col).distinct().collect()
            self.cols_category_mapping[col] = {row[col]: i + 1 for i, row in enumerate(distinct_rows)}

        for col in self.cols_log_norm:
            # Signed log1p, computed on the frame passed in (not on a notebook global).
            sign = F.when(F.col(col) >= 0, 1).otherwise(-1)
            log1p_signed = (F.log1p(F.abs(F.col(col))) * sign).alias('log1p_signed')
            self.cols_log_norm_maxes[col] = df.select(log1p_signed) \
                .agg({"log1p_signed": "max"}).collect()[0]['max(log1p_signed)']

        if self.time_transformation == 'hours_from_min':
            # Take the minimum as epoch seconds inside Spark, with the same casts
            # `_td_hours` applies, so both sides agree regardless of the driver's
            # local time zone.
            epoch_seconds = F.col(self.cols_event_time).cast(T.TimestampType()).cast('double')
            self.time_min = df.select(epoch_seconds.alias('ts')) \
                .agg({'ts': 'min'}).collect()[0]['min(ts)']

        return self

    def transform(self, df, copy=True):
        """Perform the Spark part of preprocessing (event-time mapping only).

        Category encoding, log-normalisation and grouping by `col_id` are not
        ported to Spark yet; see `TRANSFORM` for the pandas code of those steps.

        Parameters
        ----------
        df : pyspark.sql.DataFrame with flat data
        copy : bool, default=True
            If True, work on an aliased handle of `df` instead of `df` itself.

        Returns
        -------
        df_data : pyspark.sql.DataFrame
            Input data; for every `time_transformation` except 'none' an
            `event_time` column is added.

        Raises
        ------
        NotImplementedError
            If `time_transformation` is not one of the known values.
        """
        self.check_is_fitted()
        df_data = df.alias('df_data') if copy else df

        # event_time mapping
        if self.time_transformation == 'none':
            pass
        elif self.time_transformation == 'default':
            df_data = self._td_default(df_data, self.cols_event_time)
        elif self.time_transformation == 'float':
            df_data = self._td_float(df_data, self.cols_event_time)
        elif self.time_transformation == 'gender':
            df_data = self._td_gender(df_data, self.cols_event_time)
        elif self.time_transformation == 'hours_from_min':
            df_data = self._td_hours(df_data, self.cols_event_time)
        else:
            raise NotImplementedError(f'Unknown type of data transformation: "{self.time_transformation}"')

        # Hand the result back; previously it was computed and silently dropped.
        return df_data

    def TRANSFORM(self, df, copy=True):
        """Perform preprocessing (pandas code, not yet ported to Spark).

        NOTE(review): this body uses pandas-only calls (`df.copy()`, `.nunique()`,
        `.astype`, `.map`, `.assign`, `.set_index`) and the names `np`, `reduce`,
        `iadd` and `pd_hist`, none of which are imported in this cell. The
        `_td_*` helpers it calls are Spark implementations. Presumably kept as a
        reference for the port - confirm before calling it.

        Parameters
        ----------
        df : pandas.DataFrame with flat data
        copy : bool, default=True
            Copy the input X or not.

        Returns
        -------
        features : List of dicts grouped by col_id.
        """
        self.check_is_fitted()
        df_data = df.copy() if copy else df

        if self.print_dataset_info:
            logger.info(f'Found {df_data[self.col_id].nunique()} unique ids')

        # event_time mapping
        if self.time_transformation == 'none':
            pass
        elif self.time_transformation == 'default':
            df_data = self._td_default(df_data, self.cols_event_time)
        elif self.time_transformation == 'float':
            df_data = self._td_float(df_data, self.cols_event_time)
        elif self.time_transformation == 'gender':
            df_data = self._td_gender(df_data, self.cols_event_time)
        elif self.time_transformation == 'hours_from_min':
            df_data = self._td_hours(df_data, self.cols_event_time)
        else:
            raise NotImplementedError(f'Unknown type of data transformation: "{self.time_transformation}"')

        for col in self.cols_category:
            if col not in self.cols_category_mapping:
                raise KeyError(f"column {col} isn't in fitted category columns")
            pd_col = df_data[col].astype(str)
            # Values missing from the fitted mapping get the largest fitted code.
            df_data[col] = pd_col.map(self.cols_category_mapping[col]) \
                .fillna(max(self.cols_category_mapping[col].values()))
            if self.print_dataset_info:
                logger.info(f'Encoder stat for "{col}":\ncodes | trx_count\n{pd_hist(df_data[col], col)}')

        for col in self.cols_log_norm:
            # Signed log1p, scaled by the maximum found during `fit`.
            df_data[col] = np.log1p(abs(df_data[col])) * np.sign(df_data[col])
            df_data[col] /= self.cols_log_norm_maxes[col]
            if self.print_dataset_info:
                logger.info(f'Encoder stat for "{col}":\ncodes | trx_count\n{pd_hist(df_data[col], col)}')

        if self.print_dataset_info:
            df = df_data.groupby(self.col_id)['event_time'].count()
            logger.info(f'Trx count per clients:\nlen(trx_list) | client_count\n{pd_hist(df, "trx_count")}')

        # column filter
        columns_for_filter = reduce(iadd, [
            self.cols_category,
            self.cols_log_norm,
            self.cols_identity,
            ['event_time', self.col_id],
            self.cols_target,
        ], [])
        used_columns = [col for col in df_data.columns if col in columns_for_filter]

        logger.info('Feature collection in progress ...')
        # Sort by (col_id, event_time), then turn each id's rows into a dict of
        # arrays; target columns keep only their first value.
        features = df_data[used_columns] \
            .assign(et_index=lambda x: x['event_time']) \
            .set_index([self.col_id, 'et_index']).sort_index() \
            .groupby(self.col_id).apply(lambda x: {k: v[0] if k in self.cols_target else np.array(v)
                                                   for k, v in x.to_dict(orient='list').items()}) \
            .rename('feature_arrays').reset_index().to_dict(orient='records')

        def squeeze(rec):
            return {self.col_id: rec[self.col_id], **rec['feature_arrays']}
        features = [squeeze(r) for r in features]

        if self.print_dataset_info:
            feature_names = list(features[0].keys())
            logger.info(f'Feature names: {feature_names}')

        logger.info(f'Prepared features for {len(features)} clients')
        return features

    @staticmethod
    def _td_default(df, cols_event_time):
        """Add `event_time` as the 0-based rank of each distinct event-time value.

        :param df: pyspark.sql.DataFrame with flat data
        :param cols_event_time: name of the column with event time
        :return: `df` inner-joined with the rank table on `cols_event_time`
        """
        # The window has no partitioning, but it runs over distinct values only.
        w = Window().orderBy(cols_event_time)
        tmp_df = df.select(cols_event_time).distinct()
        tmp_df = tmp_df.withColumn('event_time', F.row_number().over(w) - 1)
        df = df.join(tmp_df, on=cols_event_time)
        return df

    @staticmethod
    def _td_float(df, col_event_time):
        """Add `event_time` as the event-time column cast to float.

        :param df: pyspark.sql.DataFrame with flat data
        :param col_event_time: name of the column with event time
        :return: `df` with an `event_time` column
        """
        logger.info('To-float time transformation begins...')
        df = df.withColumn('event_time', F.col(col_event_time).astype('float'))
        logger.info('To-float time transformation ends')
        return df

    @staticmethod
    def _td_gender(df, col_event_time):
        """Gender-dataset-like transformation
        'd hh:mm:ss' -> float where integer part is day number and fractional part is seconds from day begin
        '1 00:00:00' -> 1.0
        '1 12:00:00' -> 1.5
        '1 01:00:00' -> 1 + 1 / 24
        '2 23:59:59' -> 2.99
        '432 12:00:00' -> 432.5   '000432 12:00:00'

        NOTE(review): the day fraction comes from `unix_timestamp(..., 'HH:mm:ss')`,
        which presumably depends on the Spark session time zone - confirm it is UTC.

        :param df: pyspark.sql.DataFrame with flat data
        :param col_event_time: name of the column with 'd hh:mm:ss' strings
        :return: `df` with an `event_time` column
        """
        logger.info('Gender-dataset-like time transformation begins...')
        # Left-pad to 15 characters so the day number always sits in characters 1-6.
        df = df.withColumn('_et_day', F.substring(F.lpad(F.col(col_event_time), 15, '0'), 1, 6).cast('float'))

        df = df.withColumn('_et_time', F.substring(F.lpad(F.col(col_event_time), 15, '0'), 8, 8))
        # A seconds field of 60 is replaced by 59 before parsing.
        df = df.withColumn('_et_time', F.regexp_replace('_et_time', r'\:60$', ':59'))
        df = df.withColumn('_et_time', F.unix_timestamp('_et_time', 'HH:mm:ss') / (24 * 60 * 60))

        df = df.withColumn('event_time', F.col('_et_day') + F.col('_et_time'))
        df = df.drop('_et_day', '_et_time')
        logger.info('Gender-dataset-like time transformation ends')
        return df

    def _td_hours(self, df, col_event_time):
        """Add `event_time` as hours elapsed since `self.time_min`.

        :param df: pyspark.sql.DataFrame with flat data
        :param col_event_time: name of the column castable to a timestamp
        :return: `df` with an `event_time` column
        """
        logger.info('To hours time transformation begins...')
        df = df.withColumn('_dt', (F.col(col_event_time)).cast(dataType=T.TimestampType()))
        # 'double' rather than 'float': 32-bit floats cannot represent present-day
        # epoch seconds exactly.
        df = df.withColumn('event_time', ((F.col('_dt')).cast('double') - self.time_min) / 3600)
        df = df.drop('_dt')
        logger.info('To hours time transformation ends')
        return df

    def _reset(self):
        """Reset internal data-dependent state of the preprocessor, if necessary.
        __init__ parameters are not touched.
        """
        self.time_min = None
        super()._reset()


In [98]:
import os
import pandas as pd
import numpy as np


# Same data directory as the Spark cells; here the CSV is loaded with pandas.
data_path = '../../data/'

source_data_pd = pd.read_csv(
    os.path.join(data_path, 'transactions_train.csv'),
)
source_data_pd.head(n=2)

Unnamed: 0,client_id,trans_date,small_group,amount_rur
0,33172,6,4,71.463
1,33172,6,35,45.017


In [107]:
# pandas reference for the 'default' time transformation:
# event_time is the 0-based rank of each distinct trans_date value.
cols_event_time = 'trans_date'
df = source_data_pd.copy()

df_event_time = df[[cols_event_time]].drop_duplicates()
df_event_time = df_event_time.sort_values(by=cols_event_time)
df_event_time['event_time'] = np.arange(len(df_event_time))
df = df.merge(df_event_time, on=cols_event_time)

In [118]:
# Inspect the merged frame, which now carries the `event_time` column.
df

Unnamed: 0,client_id,trans_date,small_group,amount_rur,event_time
0,33172,6,4,71.463,6
1,33172,6,35,45.017,6
2,44477,6,4,17.879,6
3,44477,6,1,122.482,6
4,1049,6,36,19.418,6
...,...,...,...,...,...
26450572,22890,22,166,29.774,22
26450573,22890,22,1,23.763,22
26450574,9281,22,1,12.636,22
26450575,49473,22,25,0.938,22


In [108]:
# Displays the same `df` as the previous cell.
df

Unnamed: 0,client_id,trans_date,small_group,amount_rur,event_time
0,33172,6,4,71.463,6
1,33172,6,35,45.017,6
2,44477,6,4,17.879,6
3,44477,6,1,122.482,6
4,1049,6,36,19.418,6
...,...,...,...,...,...
26450572,22890,22,166,29.774,22
26450573,22890,22,1,23.763,22
26450574,9281,22,1,12.636,22
26450575,49473,22,25,0.938,22


In [None]:
# A bare `trans_date` is not defined anywhere in this notebook and raises
# NameError on Run All; look at the column through the loaded frame instead.
source_data_pd['trans_date'].head()

In [46]:
# Check which type the minimum of the parsed `trans_date` column has.
type(pd.to_datetime(source_data_pd['trans_date']).min())

pandas._libs.tslibs.timestamps.Timestamp

In [49]:
df_copy = source_data_pd.copy()

# Number of distinct trans_date values.
len(df_copy['trans_date'].unique())

730

In [48]:
# Total number of transaction rows in the copied frame.
len(df_copy)

26450577

In [9]:
# Without a `unit`, the integer `trans_date` values are read as nanoseconds
# since 1970-01-01 (see the 00:00:00.00000000N timestamps in the output).
pd.to_datetime(source_data_pd['trans_date'])

0          1970-01-01 00:00:00.000000006
1          1970-01-01 00:00:00.000000006
2          1970-01-01 00:00:00.000000008
3          1970-01-01 00:00:00.000000009
4          1970-01-01 00:00:00.000000010
                        ...             
26450572   1970-01-01 00:00:00.000000727
26450573   1970-01-01 00:00:00.000000727
26450574   1970-01-01 00:00:00.000000727
26450575   1970-01-01 00:00:00.000000727
26450576   1970-01-01 00:00:00.000000729
Name: trans_date, Length: 26450577, dtype: datetime64[ns]

In [36]:
dt = source_data_pd
# `col` was never defined in this notebook (hidden kernel state); set it
# explicitly. 'amount_rur' is the column configured as `cols_log_norm` below.
col = 'amount_rur'

# Maximum of the signed log1p transform (pandas reference for the Spark `fit`).
(np.log1p(abs(dt[col])) * np.sign(dt[col])).max()

12.899439277009005

In [5]:
from dltranz.data_preprocessing import PandasDataPreprocessor

# Configure the pandas preprocessor for the transactions table.
preprocessor = PandasDataPreprocessor(
    col_id='client_id',
    cols_event_time='trans_date',
    time_transformation='float',
    # NOTE(review): `trans_date` is both the event-time column and a category
    # column here - confirm this is intended.
    cols_category=["trans_date", "small_group"],
    cols_log_norm=["amount_rur"],
    cols_identity=[],
    print_dataset_info=False,
)

In [6]:
%%time

dataset = preprocessor.fit_transform(source_data)

CPU times: user 50.4 s, sys: 3.2 s, total: 53.6 s
Wall time: 53.5 s


In [7]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the records; the fixed seed keeps the split reproducible.
train, test = train_test_split(
    dataset,
    test_size=0.2,
    random_state=42,
)

print(f'{len(train)} {len(test)}')

24000 6000


## Embedding training

Model training in our framework is organised via the pytorch-lightning (pl) framework.
The key parts of neural network training in pl are:

    * model (pl.LightningModule)
    * data_module (pl.LightningDataModule)
    * trainer (pl.Trainer)
    
For further details check https://www.pytorchlightning.ai/

### model 

In [8]:
from dltranz.seq_encoder import SequenceEncoder
from dltranz.models import Head
from dltranz.lightning_modules.emb_module import EmbModule

# Sequence encoder over the fitted categorical features plus one numeric feature.
seq_encoder = SequenceEncoder(
    category_features=preprocessor.get_category_sizes(),
    numeric_features=["amount_rur"],
    trx_embedding_noize=0.003,
)

# Head sized to the encoder's embedding output.
head = Head(
    input_size=seq_encoder.embedding_size,
    use_norm_encoder=True,
)

model = EmbModule(seq_encoder=seq_encoder, head=head)

### Data module

In [9]:
from dltranz.data_load.data_module.emb_data_module import EmbeddingTrainDataModule

dm = EmbeddingTrainDataModule(
    dataset=train,
    pl_module=model,
    min_seq_len=25,
    # sub-sequence sampling settings
    seq_split_strategy='SampleSlices',
    category_names=model.seq_encoder.category_names,
    category_max_size=model.seq_encoder.category_max_size,
    split_count=5,
    split_cnt_min=25,
    split_cnt_max=200,
    # loader settings
    train_num_workers=16,
    train_batch_size=256,
    valid_num_workers=16,
    valid_batch_size=256,
)

### Trainer

In [10]:
import torch
import pytorch_lightning as pl

import logging
# logging.getLogger("lightning").addHandler(logging.NullHandler())
# logging.getLogger("lightning").propagate = False

# Use one GPU when CUDA is available, otherwise run on CPU.
trainer = pl.Trainer(
#     progress_bar_refresh_rate=0,
    max_epochs=150,
    gpus=int(torch.cuda.is_available()),
)

### Training 

In [None]:
%%time

# Train `model` on the data module `dm` with the trainer configured above.
trainer.fit(model, dm)

## Inference 

In [12]:
# embedding inference

from dltranz.inference import get_embeddings

# Run the same inference call over both splits, train first and then test.
train_embeds, test_embeds = (
    get_embeddings(
        data=split,
        model=model,
        category_names=model.seq_encoder.category_names,
        category_max_size=model.seq_encoder.category_max_size,
    )
    for split in (train, test)
)

train_embeds.shape, test_embeds.shape

24000it [00:01, 21719.33it/s]
6000it [00:00, 31278.41it/s]                                                                                                            
                                                                                                                                        

((24000, 512), (6000, 512))

In [13]:
# join target and embeddings

# Target table indexed by client id, with the `bins` column renamed to `target`.
df_target = (
    pd.read_csv(os.path.join(data_path, 'train_target.csv'))
    .set_index('client_id')
    .rename(columns={"bins": "target"})
)


def _embeds_with_target(embeds, records):
    """Build a frame of embedding columns plus `client_id`, left-joined with the target."""
    frame = pd.DataFrame(data=embeds, columns=[f'embed_{i}' for i in range(embeds.shape[1])])
    frame['client_id'] = [rec['client_id'] for rec in records]
    return frame.merge(df_target, how='left', on='client_id')


train_df = _embeds_with_target(train_embeds, train)
test_df = _embeds_with_target(test_embeds, test)

print(train_df.shape, test_df.shape)
train_df.head(2)

(24000, 514) (6000, 514)


Unnamed: 0,embed_0,embed_1,embed_2,embed_3,embed_4,embed_5,embed_6,embed_7,embed_8,embed_9,...,embed_504,embed_505,embed_506,embed_507,embed_508,embed_509,embed_510,embed_511,client_id,target
0,0.344823,0.340152,0.231864,-0.789117,-0.013289,-0.056129,-0.988241,-0.010464,-0.064898,-0.029179,...,0.342959,0.040367,0.253053,0.712581,-0.148498,0.016645,-0.124844,-0.07812,36253,1
1,0.276711,0.492666,0.781279,-0.824952,0.02034,-0.014695,-0.891105,-0.047485,0.063514,0.170822,...,0.372329,0.049816,0.346733,0.46575,-0.08058,0.011563,-0.040877,-0.028394,396,2


Obtained embeddings can be used as features for model training

For example:

In [14]:
from sklearn.linear_model import LogisticRegression

# Use every `embed*` column as a feature and `target` as the label.
embed_columns = [name for name in train_df.columns if name.startswith('embed')]
x_train, x_test = train_df[embed_columns], test_df[embed_columns]
y_train, y_test = train_df['target'], test_df['target']

# Fit a default logistic regression and report accuracy on the held-out split.
clf = LogisticRegression().fit(x_train, y_train)
clf.score(x_test, y_test)

0.6296666666666667