In [5]:
import os
import glob
import time
from copy import deepcopy

import numpy as np
import pandas as pd
import pyspark

In [6]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [12]:
SEED = 42
np.random.seed(SEED)

# Description

Этот ноутбук описывает подход к транзакционным данным, имеющим многоуровневую структуру. Такими данными могут быть, например, список транзакций (покупок) клиента. Можно рассматривать их как одну единую последовательность, которую нужно свернуть (e.g. транзакции -> клиент), а можно сделать некоторую иерархию сверток (транзакции -> чеки -> клиент). Здесь рассматривается второй подход и используется библиотека pytorch-lifestream, которая позволит нам получить эмбеддинги первой свертки (эмбеддинги чеков), а затем свернуть чеки еще раз в качестве downstream-task и предсказывать, например, дефолт клиента.

Последовательные действия, которые мы совершим:
- создадим искуственный датасет транзакций; присвоим каждой транзакции id чека и id клиента (транзакции из одного и того же чека могут быть только у одного клиента)
- зададимся рандомным искуственным таргетом для downstream-task (классификации)
- выполним препроцессинг данных и получим транзакционный pyspark.sql.DataFrame
- сохраним его в виде sparkpickle (еще на кластере)
- сериализуем имеющийся sparkpickle (уже на сервере)
- выполним upstream task (сформируем эмбеддинги чеков с помощью библиотеки pytorch-lightning)
- разделим чеки по клиентам, выполним еще 1 свертку и downstream-task (классификацию)

# Data

Абстрагируемся от данных и просто сгенерим несколько фичей для нашего условного датасета:

In [15]:
np.random.seed(SEED)
n_samples = 10000
data = pd.DataFrame(np.random.randn(n_samples, 5))

In [16]:
data.head()

Unnamed: 0,0,1,2,3,4
0,0.496714,-0.138264,0.647689,1.52303,-0.234153
1,-0.234137,1.579213,0.767435,-0.469474,0.54256
2,-0.463418,-0.46573,0.241962,-1.91328,-1.724918
3,-0.562288,-1.012831,0.314247,-0.908024,-1.412304
4,1.465649,-0.225776,0.067528,-1.424748,-0.544383


In [17]:
data.columns = [str(i) for i in data.columns]

Добавим категориальные фичи:

In [24]:
np.random.seed(SEED)
cat = ['one', 'two', 'three']
data['cat'] = np.random.choice(cat, n_samples)

In [25]:
data.head()

Unnamed: 0,0,1,2,3,4,cat
0,0.496714,-0.138264,0.647689,1.52303,-0.234153,three
1,-0.234137,1.579213,0.767435,-0.469474,0.54256,one
2,-0.463418,-0.46573,0.241962,-1.91328,-1.724918,three
3,-0.562288,-1.012831,0.314247,-0.908024,-1.412304,three
4,1.465649,-0.225776,0.067528,-1.424748,-0.544383,one


Добавим рандомное поле даты:

In [43]:
from random import randrange
from datetime import timedelta, datetime

def random_date(start, end):
    """
    This function will return a random datetime between two datetime
    objects.
    """
    delta = end - start
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    random_second = randrange(int_delta)
    return start + timedelta(seconds=random_second)

In [44]:
d1 = datetime.strptime('1/1/2008 1:30 PM', '%m/%d/%Y %I:%M %p')
d2 = datetime.strptime('1/1/2009 4:50 AM', '%m/%d/%Y %I:%M %p')

data['date'] = [random_date(d1, d2) for _ in range(n_samples)]

In [45]:
data.head()

Unnamed: 0,0,1,2,3,4,cat,date,receipt_id,target
0,0.496714,-0.138264,0.647689,1.52303,-0.234153,three,2008-08-19 17:39:42,861,1
1,-0.234137,1.579213,0.767435,-0.469474,0.54256,one,2008-05-17 21:36:41,1295,1
2,-0.463418,-0.46573,0.241962,-1.91328,-1.724918,three,2008-09-28 13:30:45,1131,1
3,-0.562288,-1.012831,0.314247,-0.908024,-1.412304,three,2008-07-16 03:40:22,1096,1
4,1.465649,-0.225776,0.067528,-1.424748,-0.544383,one,2008-04-12 14:33:01,1639,0


Представим, что это какие-то данные по транзакциям из чеков, дадим 2 id этим чекам: id чека и id пользователя (так как данные искусственные, дадим id пользователя уже после свертки по чекам, в реальности нужно будет иметь mapping [id чека -> id юзера], чтобы потом сматчить

In [46]:
np.random.seed(SEED)
USER_ID = 'user_id'
RECEIPT_ID = 'receipt_id'
data[RECEIPT_ID] = np.random.randint(1, 3000, n_samples)

In [47]:
data.head()

Unnamed: 0,0,1,2,3,4,cat,date,receipt_id,target
0,0.496714,-0.138264,0.647689,1.52303,-0.234153,three,2008-08-19 17:39:42,861,1
1,-0.234137,1.579213,0.767435,-0.469474,0.54256,one,2008-05-17 21:36:41,1295,1
2,-0.463418,-0.46573,0.241962,-1.91328,-1.724918,three,2008-09-28 13:30:45,1131,1
3,-0.562288,-1.012831,0.314247,-0.908024,-1.412304,three,2008-07-16 03:40:22,1096,1
4,1.465649,-0.225776,0.067528,-1.424748,-0.544383,one,2008-04-12 14:33:01,1639,0


Добавим еще таргет:

In [48]:
np.random.seed(SEED)
mapping = {i: np.random.randint(0, 2) for i in range(1, 3001)}

In [49]:
TARGET = 'target'
data[TARGET] = data[RECEIPT_ID].map(mapping)

In [50]:
data.head()

Unnamed: 0,0,1,2,3,4,cat,date,receipt_id,target
0,0.496714,-0.138264,0.647689,1.52303,-0.234153,three,2008-08-19 17:39:42,861,1
1,-0.234137,1.579213,0.767435,-0.469474,0.54256,one,2008-05-17 21:36:41,1295,1
2,-0.463418,-0.46573,0.241962,-1.91328,-1.724918,three,2008-09-28 13:30:45,1131,1
3,-0.562288,-1.012831,0.314247,-0.908024,-1.412304,three,2008-07-16 03:40:22,1096,1
4,1.465649,-0.225776,0.067528,-1.424748,-0.544383,one,2008-04-12 14:33:01,1639,0


# Preprocessing

### Эта часть - на кластере

In [51]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 9 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   0           10000 non-null  float64       
 1   1           10000 non-null  float64       
 2   2           10000 non-null  float64       
 3   3           10000 non-null  float64       
 4   4           10000 non-null  float64       
 5   cat         10000 non-null  object        
 6   date        10000 non-null  datetime64[ns]
 7   receipt_id  10000 non-null  int64         
 8   target      10000 non-null  int64         
dtypes: datetime64[ns](1), float64(5), int64(2), object(1)
memory usage: 703.2+ KB


В классе ниже можно переопределять его поведение так, как тебе нужно:

In [52]:
from dltranz.data_preprocessing.receipts_pyspark_preprocessor import PysparkDataPreprocessor

In [53]:
cols_log_norm = [str(i) for i in range(5)]
preprocessor = PysparkDataPreprocessor(col_id=RECEIPT_ID, cols_event_time='date', cols_category=['cat'], 
                                       cols_log_norm=cols_log_norm, target_col=TARGET)

In [None]:
sc = pyspark.SparkContext()
sqlCtx = pyspark.SQLContext(sc)

In [55]:
data_spark = sqlCtx.createDataFrame(data)

In [56]:
data_spark.printSchema()

root
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- cat: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- receipt_id: long (nullable = true)
 |-- target: long (nullable = true)



In [57]:
data_trans = preprocessor.fit_transform(data_spark)

In [58]:
data_trans.printSchema()

root
 |-- receipt_id: long (nullable = true)
 |-- transactions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- 0: double (nullable = true)
 |    |    |-- 1: double (nullable = true)
 |    |    |-- 2: double (nullable = true)
 |    |    |-- 3: double (nullable = true)
 |    |    |-- 4: double (nullable = true)
 |    |    |-- event_time: double (nullable = true)
 |    |    |-- cat: integer (nullable = true)
 |    |    |-- target_target: long (nullable = true)



In [59]:
#получаем размерности для категориальных эмбеддингов
embs_size = preprocessor.get_category_sizes()

In [60]:
embs_size

{'cat': 4}

In [61]:
#сделать репартишн, если нужно
#data_trans = data_trans.repartition(10)

Три функции ниже выполняют схожий функционал, как твои из ноутбуков 04, 05, только абстрагируют свое поведение в зависимости от данных. Можно также прочитать доку для каждой функции и каждого написанного класса

In [62]:
from alpha_data_load.utilities import preprocess_data_template, read_pickle_template, serialize_pickled

Это определение типа данных для каждой фичи, обязательно нужно переопределять для своей задачи

In [63]:
feature_arrays = {
    '0': np.float32,
    '1': np.float32,
    '2': np.float32,
    '3': np.float32,
    '4': np.float32,
    'event_time': np.float32,
    'cat': np.int32,
    'target_target': np.int64
}

In [64]:
#функция для пиклирования
preprocess_data = lambda row: preprocess_data_template(row=row, id=RECEIPT_ID,
                                                       feature_arrays=feature_arrays, array_col='transactions', 
                                                       sort_by='event_time', one_level=True)

In [65]:
pickle_file_path = 'data/receipts_pickled'

In [66]:
data_trans\
    .filter(f'{RECEIPT_ID} is not null')\
    .rdd.map(preprocess_data)\
    .saveAsPickleFile(pickle_file_path)

Дальше нужно перенести данные на сервер и продолжать работать там

### На сервере

#### Сериализация

In [67]:
#обертка для read_pickle_template
#поменять внутренности на те, которые нужны
def read_pickle(file_path):
    col_date = []
    array_col = ''
    date_col = ''
    id_col = RECEIPT_ID
    some_feature = '0'
    one_layer = True
    return read_pickle_template(file_path=file_path,
                             col_date=col_date,
                             array_col=array_col,
                             date_col=date_col,
                             id_col=id_col,
                             some_feature=some_feature,
                             one_layer=one_layer
                            )

In [68]:
file_paths = sorted(glob.glob(f'{pickle_file_path}/part*'))

In [69]:
GLOBAL_PATH = 'data/receipts_serialized'

In [72]:
!mkdir data/receipts_serialized

In [73]:
serialize_pickled(target_folder=GLOBAL_PATH, file_paths=file_paths, read_func=read_pickle)

1it [00:00,  6.68it/s]


#### Работа с моделями

In [74]:
from alpha_data_load.diskfile import DiskFile
from alpha_data_load.dataset import Dataset
from alpha_data_load.dataloader import DataLoader, get_easy_batch_transaction

Твой DiskFile:

In [75]:
data = DiskFile(os.path.join(GLOBAL_PATH, 'data.txt'), os.path.join(GLOBAL_PATH, 'index.txt'))

Dataset это просто абстракция над Diskfile, которая позволяет таскать данные не по id, а просто по индексу

In [76]:
dataset = Dataset(data, app_id=RECEIPT_ID, app_date='', features='', some_feature='0')

In [77]:
dataset[1]

{'0': array([0.06999812, 0.18924706, 0.21722993, 0.5149987 , 0.21089943],
       dtype=float32),
 '1': array([ 0.15990822, -0.40554327,  0.02196037,  0.43131843,  0.36119235],
       dtype=float32),
 '2': array([ 0.4008634 , -0.43726364,  0.07333303,  0.35812753,  0.27695101],
       dtype=float32),
 '3': array([-0.48013213,  0.12490778,  0.2812665 ,  0.2580878 , -0.59865505],
       dtype=float32),
 '4': array([-0.67617327,  0.32774323, -0.44579032,  0.5126856 ,  0.43708882],
       dtype=float32),
 'event_time': array([ 883.05475, 3296.688  , 6320.827  , 7251.8076 , 8140.143  ],
       dtype=float32),
 'cat': array([3, 2, 2, 1, 2], dtype=int32),
 'target_target': array([1, 1, 1, 1, 1]),
 'receipt_id': 2}

Dataloader просто позволяет итерироваться по dataset с произвольным размером батча:

In [78]:
# batch_size нужно задавать настолько большим, насколько это возможно, чтобы за один fit обрабатывалось как можно больше данных
batch_size = 5000

dataloader = DataLoader(dataset=dataset, batch_size=batch_size, get_batch_func=get_easy_batch_transaction)

Как итерироваться по dataloder-у:

In [79]:
%%time

last_batch = None
for batch in dataloader:
    last_batch = batch

CPU times: user 79.4 ms, sys: 24.9 ms, total: 104 ms
Wall time: 108 ms


In [80]:
len(last_batch)

2889

#### Часть 1: составление эмбеддингов

Эта часть обычная для lifestream, можно посмотреть их ноутбук в demo/

In [81]:
from dltranz.seq_encoder import SequenceEncoder
from dltranz.models import Head
from dltranz.lightning_modules.emb_module import EmbModule
from dltranz.inference import get_embeddings
from dltranz.data_load.data_module.emb_data_module import EmbeddingTrainDataModule

import torch
import pytorch_lightning as pl
from pytorch_lightning.loggers import TensorBoardLogger

In [82]:
seq_encoder = SequenceEncoder(
    category_features=preprocessor.get_category_sizes(),
    numeric_features=[str(i) for i in range(5)],
    trx_embedding_noize=0.003, rnn_hidden_size=32
)
head = Head(input_size=seq_encoder.embedding_size, use_norm_encoder=True)

model = EmbModule(seq_encoder=seq_encoder, head=head)

In [83]:
%load_ext tensorboard

Тренировочный цикл:

In [84]:
#%tensorboard --logdir tb_logs --host 10.110.147.198 --port NNNN - на сервере
%tensorboard --logdir tb_logs

Reusing TensorBoard on port 6007 (pid 9791), started 20:57:32 ago. (Use '!kill 9791' to kill it.)

In [85]:
logger = TensorBoardLogger('tb_logs', name='logs_embs_0')

Параметры этого модуля можно и нужно менять:

In [86]:
%%time
start_time = time.time()
for idx, train_set in enumerate(dataloader):
    end_time = time.time()
    print(f'Batch loaded in: {end_time - start_time}')
    dm = EmbeddingTrainDataModule(
        dataset=train_set,
        pl_module=model,
        min_seq_len=0,
        seq_split_strategy='SampleSlices',
        category_names=model.seq_encoder.category_names,
        category_max_size=model.seq_encoder.category_max_size,
        split_count=2,
        split_cnt_min=1,
        split_cnt_max=20,
        train_num_workers=8,
        train_batch_size=256,
        valid_num_workers=8,
        valid_batch_size=256,
    )
    trainer = pl.Trainer(
        progress_bar_refresh_rate=50,
        log_every_n_steps=1,
        max_epochs=5,
        gpus = 0 if torch.cuda.is_available() else None,
        logger=logger
    )
    trainer.fit(model, dm)
    print(f'Batch {idx} processed in {time.time() - end_time}')
    start_time = time.time()

  f"Setting `Trainer(progress_bar_refresh_rate={progress_bar_refresh_rate})` is deprecated in v1.5 and"
GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


Batch loaded in: 0.09631514549255371


0it [00:00, ?it/s]

0it [00:00, ?it/s]


  | Name               | Type             | Params
--------------------------------------------------------
0 | _seq_encoder       | SequenceEncoder  | 5.4 K 
1 | _validation_metric | BatchRecallTopPL | 0     
2 | _head              | Head             | 0     
--------------------------------------------------------
5.4 K     Trainable params
0         Non-trainable params
5.4 K     Total params
0.021     Total estimated model params size (MB)


Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Batch 0 processed in 5.591724157333374
CPU times: user 7.94 s, sys: 1.84 s, total: 9.78 s
Wall time: 5.69 s


Получение эмбеддингов:

In [87]:
%%time

receipts_embeddings = []
start = time.time()
for idx, train_set in enumerate(dataloader):
    train_part = get_embeddings(
        data=train_set,
        model=model,
        category_names=model.seq_encoder.category_names,
        category_max_size=model.seq_encoder.category_max_size
    )
    receipts_embeddings.append(train_part)
    end = time.time()
    print(f'Time taken: {end - start}')
    start = end
receipts_embeddings = np.vstack(receipts_embeddings)

2889it [00:00, 26288.73it/s]
                                             

Time taken: 0.3164360523223877
CPU times: user 292 ms, sys: 34.4 ms, total: 326 ms
Wall time: 317 ms




In [88]:
#[количество чеков, размерность эмбеддинга]
receipts_embeddings.shape

(2889, 32)

#### Часть 2: downstream_task

Сначала нужно распределить чеки по user_id. Так как чики хранились отсортированными по RECEIPTS_ID, то и их эмбеддинги тоже отсортированны, нам нужно просто сделать маппинг (который нужно на реальных данных подготовить еще на кластере)

Предположим, что было 150 клиентов:

In [97]:
#заявки -> соответствующие им промежутки id чеков
app_ids_to_receipts_ids = {i: (20*(i - 1) + 1, 20 * i) for i in range(1, 146)}

In [98]:
#так как только 2882 чека
#таких проблем с реальным маппингом не будет
app_ids_to_receipts_ids[145] = (2881, receipts_embeddings.shape[0] - 1)

Здесь на искуственных данных нужно отследить, что id ровно столько, сколько сгенерилось чеков

In [99]:
receipt_ids_to_app_ids = {
    receipt_id: app_id
    for app_id, tup in app_ids_to_receipts_ids.items()
    for receipt_id in range(tup[0], tup[1] + 1)
}

In [100]:
receipt_ids_to_app_ids[21]

2

Про класс ниже можно почитать доку

In [101]:
from alpha_data_load.utilities import DatasetsGenerator

In [102]:
dataset_generator = DatasetsGenerator(dataset, receipt_ids_to_app_ids, id_col=RECEIPT_ID, target_col='target_target')

In [103]:
res = dataset_generator(receipts_embeddings)

100%|██████████| 145/145 [00:00<00:00, 1560.97it/s]


Как выглядит один семпл: Tuple[Dict, int]

In [104]:
res[0]

({'receipt_id': 1.0,
  '0': array([ 0.27910092,  0.5252086 ,  0.11165908, -0.02218523,  0.24247044,
          0.310381  ,  0.04120712,  0.51737857,  0.45540878,  0.4699042 ,
          0.14008899,  0.01874733,  0.4467544 ,  0.36327326,  0.03977561,
          0.4734821 ,  0.30410612,  0.23712412,  0.16762829,  0.17979477],
        dtype=float32),
  '1': array([-0.08909094, -0.457062  , -0.19610643, -0.09879395, -0.08763146,
         -0.4171199 , -0.08267222, -0.5367806 , -0.35846817, -0.289579  ,
         -0.12352007, -0.28352857, -0.31207332, -0.3169745 ,  0.28155258,
         -0.16134907, -0.36622337, -0.26354057, -0.28993714, -0.44213387],
        dtype=float32),
  '2': array([ 0.01987745, -0.39330494, -0.64115   , -0.30135262, -0.2698008 ,
         -0.33657405, -0.1455698 , -0.12504806, -0.26581216, -0.54488486,
         -0.7667012 , -0.338543  , -0.14303362, -0.4619649 , -0.28063688,
         -0.33899334, -0.5601277 ,  0.09214871, -0.20367323, -0.5832383 ],
        dtype=float32),
 

Мои модули, можешь зайти туда и сам менять какие-то вещи, либо (пере)просто писать нужный класс для lightning на основе тех, что есть в lifestream

In [108]:
from dltranz.lightning_modules.classification_module import ClassificationModule
from dltranz.data_load.data_module.receipts_data_module import ReceiptsTrainDataModule

In [109]:
rnn_hid = 16
new_seq_encoder = SequenceEncoder(category_features={},
                                  numeric_features=dataset_generator.features,
                                  rnn_hidden_size=rnn_hid
                                 )
new_head = Head(rnn_hid, True, objective='classification')

In [116]:
class_model = ClassificationModule(new_seq_encoder, new_head)

Если данных будет много, нужно будет делать следующие шаги с подгрузкой данных в оперативку или сразу после получения части эмбеддингов, сейчас просто за 1 fit

In [111]:
logger_classification = TensorBoardLogger('tb_logs', name='logs_class_0')

In [118]:
test_size = 10

data_module = ReceiptsTrainDataModule(
    dataset=res[:-test_size],
    test_dataset=deepcopy(res[-test_size:]),
    train_num_workers=8,
    train_batch_size=64,
    valid_num_workers=8,
    valid_batch_size=64,
    val_size=0.3,
    weighted=True
)

In [119]:
classification_trainer = pl.Trainer(
    progress_bar_refresh_rate=50,
    max_epochs=10,
    gpus=0 if torch.cuda.is_available() else None,
    log_every_n_steps=1,
    logger=logger_classification
)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


In [120]:
%%time

classification_trainer.fit(class_model, data_module)


  | Name               | Type            | Params
-------------------------------------------------------
0 | _loss              | BCELoss         | 0     
1 | _seq_encoder       | SequenceEncoder | 2.5 K 
2 | _validation_metric | AUROC           | 0     
3 | _head              | Head            | 17    
-------------------------------------------------------
2.5 K     Trainable params
0         Non-trainable params
2.5 K     Total params
0.010     Total estimated model params size (MB)


Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

CPU times: user 2.1 s, sys: 1.77 s, total: 3.87 s
Wall time: 4.09 s
