In [1]:
import os
os.environ["OMP_NUM_THREADS"] = "4"
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "7"

import pandas as pd
import numpy as np
import torch
from functools import partial
import pytorch_lightning as pl
import warnings
warnings.filterwarnings("ignore")

from torch.utils.data import DataLoader

from ptls.data_load.datasets import MemoryMapDataset
from ptls.data_load.iterable_processing.iterable_seq_len_limit import ISeqLenLimit
from ptls.data_load.iterable_processing.to_torch_tensor import ToTorch
from ptls.data_load.iterable_processing.feature_filter import FeatureFilter
from ptls.nn import TrxEncoder, RnnSeqEncoder
from ptls.frames.coles import CoLESModule
from ptls.data_load.iterable_processing import SeqLenFilter
from ptls.frames.coles import ColesIterableDataset
from ptls.frames.coles.split_strategy import SampleSlices
from ptls.frames import PtlsDataModule
from ptls.preprocessing import PandasDataPreprocessor
from ptls.data_load.utils import collate_feature_dict
from ptls.data_load.iterable_processing_dataset import IterableProcessingDataset

from tqdm.auto import tqdm
import lightgbm as ltb

from datetime import datetime, timezone

pd.set_option('display.expand_frame_repr', False)

  from .autonotebook import tqdm as notebook_tqdm


# Data preprocessing

In [4]:
# Raw geo events for both splits.
# NOTE(review): `mon` is dropped from the train split only — confirm the test file has no such column.
transactions_train = pd.read_parquet("geo_train.parquet").drop(columns=['mon'])
transactions_test = pd.read_parquet("geo_test.parquet")

In [6]:
%%time

# Group events per client and encode the three geohash precisions as categories.
# The category dictionaries are fitted on the train split only and then reused to
# encode the test split (how unseen test categories are encoded is up to ptls — confirm).
preprocessor = PandasDataPreprocessor(
    col_id="client_id",
    col_event_time="event_time",
    event_time_transformation="dt_to_timestamp",
    cols_category=[f'geohash_{precision}' for precision in (4, 5, 6)],
    return_records=False,
)

processed_train = preprocessor.fit_transform(transactions_train)
processed_test = preprocessor.transform(transactions_test)

CPU times: user 8min 42s, sys: 44.8 s, total: 9min 27s
Wall time: 9min 25s


In [7]:
# Monthly targets: `mon` plays the role of the event time, so after preprocessing each
# target_k becomes a per-client sequence over months (assumed one row per client-month — confirm).
target_train = pd.read_parquet("train_target.parquet")

target_preprocessor = PandasDataPreprocessor(
    col_id="client_id",
    col_event_time="mon",
    event_time_transformation="dt_to_timestamp",
    cols_identity=[f"target_{k}" for k in range(1, 5)],
    return_records=False,
)
processed_target = target_preprocessor.fit_transform(target_train)

In [8]:
# Test client ids; only its `client_id` column is used later to align the test embeddings.
test_target_b = pd.read_parquet(path="test_target_ids.parquet")

**Обработка датасета:**

- Последовательности (клиенты), у которых длина < min_seq_len (здесь 32), отбрасываются
- Последовательности длиннее max_seq_len (здесь 4096) обрезаются; все признаки конвертируются в torch.tensor
- Ненужные для CoLES признаки (client_id, target_*) удаляются

In [9]:
def make_coles_filters():
    """Return a fresh filter pipeline for CoLES pre-training records.

    Drops the id/target fields, removes clients with fewer than 32 events,
    limits each client to at most 4096 events and converts arrays to torch tensors.
    """
    return [
        FeatureFilter(drop_feature_names=['client_id', 'target_1', 'target_2', 'target_3', 'target_4']),
        SeqLenFilter(min_seq_len=32),
        ISeqLenLimit(max_seq_len=4096),
        ToTorch(),
    ]


train = MemoryMapDataset(data=processed_train.to_dict("records"), i_filters=make_coles_filters())
test = MemoryMapDataset(data=processed_test.to_dict("records"), i_filters=make_coles_filters())

In [10]:
def make_coles_dataset(records):
    """Wrap a filtered dataset so each client yields 5 sampled sub-sequences of 32-180 events."""
    return ColesIterableDataset(
        data=records,
        splitter=SampleSlices(split_count=5, cnt_min=32, cnt_max=180),
    )


train_ds = make_coles_dataset(train)
valid_ds = make_coles_dataset(test)

In [11]:
# NB: despite its name, `train_dl` is a PtlsDataModule that holds both the train and the
# validation data; the name is kept because `trainer.fit` uses it.
COLES_BATCH_SIZE = 256
COLES_NUM_WORKERS = 16

train_dl = PtlsDataModule(
    train_data=train_ds,
    valid_data=valid_ds,
    train_batch_size=COLES_BATCH_SIZE,
    valid_batch_size=COLES_BATCH_SIZE,
    train_num_workers=COLES_NUM_WORKERS,
    valid_num_workers=COLES_NUM_WORKERS,
)

# Model

- numeric_values обрабатываются как BatchNorm+Linear (в этой модели числовых признаков нет)
- embeddings — через nn.Embedding

In [12]:
# One 24-dim embedding table per geohash precision; the table sizes come from the
# category dictionaries fitted by `preprocessor`.
geo_dictionary_sizes = preprocessor.get_category_dictionary_sizes()

trx_encoder_params = dict(
    embeddings_noise=0.003,
    embeddings={
        col: {'in': geo_dictionary_sizes[col], 'out': 24}
        for col in ('geohash_4', 'geohash_5', 'geohash_6')
    },
)

- **TrxEncoder** — обрабатывает каждую транзакцию (строит для неё эмбеддинг)
- **SeqEncoder** — обрабатывает последовательность эмбеддингов транзакций (здесь GRU, `RnnSeqEncoder`)

In [13]:
# Seed Python, NumPy and torch RNGs (and dataloader workers) before any weights are
# created, so weight initialisation and the randomness used during training are
# repeatable across kernel restarts (up to non-deterministic CUDA kernels).
SEED = 42
pl.seed_everything(SEED, workers=True)

# TrxEncoder embeds each event; a GRU with hidden size 256 encodes the event sequence.
seq_encoder = RnnSeqEncoder(
    trx_encoder=TrxEncoder(**trx_encoder_params),
    hidden_size=256,
    type='gru',
)

In [14]:
# Adam (lr=1e-3); StepLR multiplies the learning rate by 0.9025 every 3 scheduler steps.
adam = partial(torch.optim.Adam, lr=0.001)
step_decay = partial(torch.optim.lr_scheduler.StepLR, step_size=3, gamma=0.9025)

model = CoLESModule(
    seq_encoder=seq_encoder,
    optimizer_partial=adam,
    lr_scheduler_partial=step_decay,
)

# Train

In [15]:
tb_logger = pl.loggers.TensorBoardLogger(save_dir='./logdir', name='baseline_result')

training_callbacks = [
    pl.callbacks.LearningRateMonitor(logging_interval='step'),
    # Keep every periodic checkpoint (save_top_k=-1), one every 5000 training steps.
    pl.callbacks.ModelCheckpoint(every_n_train_steps=5000, save_top_k=-1),
    # Stop once the logged validation metric `valid/recall_top_k` stops increasing.
    pl.callbacks.EarlyStopping(monitor='valid/recall_top_k', mode="max"),
]

trainer = pl.Trainer(
    max_epochs=30,
    devices='auto',
    limit_val_batches=5000,
    gradient_clip_val=0.5,
    enable_progress_bar=True,
    logger=tb_logger,
    callbacks=training_callbacks,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [16]:
trainer.fit(model, datamodule=train_dl)

2024-07-03 17:31:17.335302: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-03 17:31:17.335352: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-03 17:31:17.335369: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-03 17:31:17.341531: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [7]

  | Nam

Epoch 0: 100%|██████████| 389/389 [00:23<00:00, 16.21it/s, v_num=3, seq_len=101.0]
Validation: |          | 0/? [00:00<?, ?it/s][A
Validation:   0%|          | 0/588 [00:00<?, ?it/s][A
Validation DataLoader 0:   0%|          | 0/588 [00:00<?, ?it/s][A
Validation DataLoader 0:   0%|          | 1/588 [00:00<00:21, 27.26it/s][A
Validation DataLoader 0:   0%|          | 2/588 [00:00<00:20, 28.80it/s][A
Validation DataLoader 0:   1%|          | 3/588 [00:00<00:20, 28.37it/s][A
Validation DataLoader 0:   1%|          | 4/588 [00:00<00:21, 27.80it/s][A
Validation DataLoader 0:   1%|          | 5/588 [00:00<00:20, 28.13it/s][A
Validation DataLoader 0:   1%|          | 6/588 [00:00<00:20, 28.22it/s][A
Validation DataLoader 0:   1%|          | 7/588 [00:00<00:20, 28.31it/s][A
Validation DataLoader 0:   1%|▏         | 8/588 [00:00<00:20, 28.41it/s][A
Validation DataLoader 0:   2%|▏         | 9/588 [00:00<00:20, 28.33it/s][A
Validation DataLoader 0:   2%|▏         | 10/588 [00:00<00:20

In [17]:
# Save the weights currently held by `model` (its state when `trainer.fit` returned,
# not a particular checkpoint file).
weights_path = './model_geo.pt'
torch.save(model.state_dict(), weights_path)

# Inference

Для каждого пользователя известно 12 таргетов (по одному на месяц), инференс происходит следующим образом.

Чтобы не было утечки данных (leak), для каждого клиента нужно делать срез до конца текущего месяца:

Берутся все транзакции за первый месяц, им соответствует 1-й таргет из 12,
потом берутся транзакции за первый и второй месяцы пользователя, и им соответствует 2-й таргет, и так далее.
То есть для пользователя, имеющего транзакции за год, мы получаем 12 эмбеддингов, каждому из которых соответствует 1 таргет.

In [18]:
class GetSplit(IterableProcessingDataset):
    """Expand each client record into cumulative month-prefix records for leak-free inference.

    For every month ``m`` in ``[start_month, end_month]`` one record is yielded that holds:

    * every non-id, non-target feature restricted to events whose ``col_time`` is strictly
      before 00:00 UTC on the first day of month ``m + 1`` of ``year``;
    * for every key starting with ``target``, its ``(m - 1)``-th element as a Python value;
    * ``col_id`` suffixed with ``'_month=<m>'``.

    Args:
        start_month: first month (1-based) to emit.
        end_month: last month (1-based, inclusive) to emit.
        year: calendar year of the months; December's cutoff is 1 January of ``year + 1``.
        col_id: name of the client-id field (string-concatenated with the month suffix).
        col_time: name of the event-time field compared against the month cutoff.
    """

    def __init__(
        self,
        start_month,
        end_month,
        year=2022,
        col_id='client_id',
        col_time='event_time'
    ):
        super().__init__()
        self.start_month = start_month
        self.end_month = end_month
        self._year = year
        self._col_id = col_id
        self._col_time = col_time

    def _month_cutoff(self, month):
        """Return the epoch-seconds timestamp of 00:00 UTC on the first day after ``month``."""
        if month == 12:
            boundary = datetime(self._year + 1, 1, 1, tzinfo=timezone.utc)
        else:
            boundary = datetime(self._year, month + 1, 1, tzinfo=timezone.utc)
        # An explicit UTC tzinfo is required: `.timestamp()` on a naive datetime uses the
        # machine's local timezone and would shift every cutoff by the local UTC offset.
        # NOTE(review): assumes event_time holds epoch seconds computed by pandas from naive
        # datetimes (wall clock interpreted as UTC) — confirm against the preprocessor.
        return boundary.timestamp()

    def __iter__(self):
        # Cutoffs depend only on the month, so compute them once per pass, not per record.
        month_cutoffs = [
            (month, self._month_cutoff(month))
            for month in range(self.start_month, self.end_month + 1)
        ]
        for rec in self._src:
            source = rec[0] if type(rec) is tuple else rec
            for month, cutoff in month_cutoffs:
                mask = source[self._col_time] < cutoff
                features = source.copy()
                for key, value in source.items():
                    if key.startswith('target'):
                        # NOTE(review): assumes each target sequence has exactly one entry per
                        # month starting in January; missing months would misalign targets.
                        features[key] = value[month - 1].tolist()
                    elif key != self._col_id:
                        features[key] = value[mask]
                features[self._col_id] += '_month=' + str(month)
                yield features

def collate_feature_dict_with_target(batch, col_id='client_id', targets=False):
    """Collate inference samples, separating ids (and optionally targets) from features.

    The input samples are not modified: map-style datasets may hand out their stored
    records by reference, and deleting keys from them would make any later pass over the
    same dataset fail with ``KeyError``.

    Args:
        batch: list of per-client feature dicts.
        col_id: key holding the client id; it is excluded from the padded features.
        targets: when True, ``target_1`` … ``target_4`` are also excluded and returned.

    Returns:
        ``(padded_batch, batch_ids)``, or ``(padded_batch, batch_ids, target_cols)`` when
        ``targets`` is True, where ``target_cols[i] == [target_1, ..., target_4]`` of sample ``i``.
    """
    target_names = [f'target_{i}' for i in range(1, 5)]
    excluded = {col_id, *target_names} if targets else {col_id}

    batch_ids = [sample[col_id] for sample in batch]
    target_cols = [[sample[name] for name in target_names] for sample in batch] if targets else []
    # Build new dicts instead of deleting keys in place (see docstring).
    features = [{key: value for key, value in sample.items() if key not in excluded} for sample in batch]

    padded_batch = collate_feature_dict(features)
    if targets:
        return padded_batch, batch_ids, target_cols
    return padded_batch, batch_ids


class InferenceModuleMultimodal(pl.LightningModule):
    """Prediction wrapper that runs ``model`` and returns client ids, targets and outputs.

    Batches are either ``(padded_batch, batch_ids)`` or
    ``(padded_batch, batch_ids, target_cols)`` with four target values per sample.

    Args:
        model: module mapping a padded batch to its output (assumed ``(batch, dim)`` — confirm).
        pandas_output: if True, ``forward`` returns a ``pd.DataFrame``; otherwise a dict.
        drop_seq_features: stored on the instance but not used by this class.
        model_out_name: dict key / column prefix for the model output.
    """

    def __init__(self, model, pandas_output=True, drop_seq_features=True, model_out_name='out'):
        super().__init__()

        self.model = model
        self.pandas_output = pandas_output
        self.drop_seq_features = drop_seq_features  # kept for interface compatibility; unused
        self.model_out_name = model_out_name

    def forward(self, x):
        """Run the model on one collated batch.

        Returns a dict (or DataFrame when ``pandas_output``) with ``client_id``,
        ``target_1`` … ``target_4`` (only for 3-element batches) and ``model_out_name``.
        """
        with_targets = len(x) == 3
        if with_targets:
            x, batch_ids, target_cols = x
        else:
            x, batch_ids = x

        out = self.model(x)

        x_out = {'client_id': batch_ids}
        if with_targets:
            target_tensor = torch.tensor(target_cols)
            for i in range(4):
                x_out[f'target_{i + 1}'] = target_tensor[:, i]
        x_out[self.model_out_name] = out
        # Release cached GPU memory between prediction batches.
        torch.cuda.empty_cache()

        if self.pandas_output:
            return self.to_pandas(x_out)
        return x_out

    @staticmethod
    def to_pandas(x):
        """Flatten a dict of per-sample outputs into one DataFrame.

        Lists and 1-D arrays/tensors become single columns, 2-D arrays/tensors are expanded
        into ``<key>_0000``, ``<key>_0001``, … columns (placed after the single columns),
        and values of higher rank become an all-None column.
        """
        scalar_features = {}
        expanded_frames = []

        for key, value in x.items():
            if isinstance(value, torch.Tensor):
                value = value.cpu().numpy()

            if isinstance(value, list) or len(value.shape) == 1:
                scalar_features[key] = value
            elif len(value.shape) == 2:
                # Reuse the already-converted array so plain numpy inputs work as well.
                columns = [f'{key}_{i:04d}' for i in range(value.shape[1])]
                expanded_frames.append(pd.DataFrame(value, columns=columns))
            else:
                scalar_features[key] = None

        return pd.concat([pd.DataFrame(scalar_features), *expanded_frames], axis=1)

In [19]:
%%time

def make_inference_filters(*extra_filters):
    """Return the filter pipeline used for embedding extraction (no min-length filter).

    Limits each client to at most 4096 events, asks FeatureFilter to keep `client_id`
    and `target_*`, applies ``extra_filters`` and finally converts to torch tensors.
    """
    # NOTE(review): the length limit runs before any month split passed via
    # `extra_filters`; if it keeps the most recent events, early-month prefixes of clients
    # with more than 4096 events lose their oldest events — confirm this is intended.
    return [
        ISeqLenLimit(max_seq_len=4096),
        FeatureFilter(keep_feature_names=['client_id', 'target_1', 'target_2', 'target_3', 'target_4']),
        *extra_filters,
        ToTorch(),
    ]


# Train: clients with targets only, expanded to one record per month 1..12.
train = MemoryMapDataset(
    data=processed_train.merge(
        processed_target.drop("event_time", axis=1), on="client_id", how="inner"
    ).to_dict("records"),
    i_filters=make_inference_filters(GetSplit(start_month=1, end_month=12)),
)

# Test: one record per client with its full (length-limited) history.
test = MemoryMapDataset(
    data=processed_test.to_dict("records"),
    i_filters=make_inference_filters(),
)

CPU times: user 3min 47s, sys: 10.4 s, total: 3min 57s
Wall time: 1min 58s


In [20]:
# Inference loaders: keep dataset order and collate in the main process.
inference_loader_kwargs = dict(shuffle=False, num_workers=0, batch_size=256)

# Train batches also carry the four monthly targets.
inference_train_dl = DataLoader(
    dataset=train,
    collate_fn=partial(collate_feature_dict_with_target, targets=True),
    **inference_loader_kwargs,
)
inference_test_dl = DataLoader(
    dataset=test,
    collate_fn=collate_feature_dict_with_target,
    **inference_loader_kwargs,
)

In [21]:
# Wrap the trained model; each predicted batch becomes a DataFrame with `client_id`,
# optional `target_*` columns and `emb_0000`, `emb_0001`, ... embedding columns.
inf_module = InferenceModuleMultimodal(
    model=model,
    model_out_name='emb',
    pandas_output=True,
    drop_seq_features=True,
)

In [22]:
# Separate Trainer for prediction; `max_epochs` only matters for `fit`, not `predict`.
trainer = pl.Trainer(
    max_epochs=-1,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [23]:
# Test embeddings: one row per client, computed from the client's whole history.
inf_test_embeddings = pd.concat(trainer.predict(inf_module, dataloaders=inference_test_dl))
inf_test_embeddings.to_parquet(
    "test_geo.parquet", engine="pyarrow", compression="snappy", index=False
)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [7]


Predicting DataLoader 0: 100%|██████████| 652/652 [01:13<00:00,  8.82it/s]


In [24]:
# Train embeddings: one row per client and month (ids suffixed with `_month=<m>`),
# each paired with that month's four targets.
inf_train_embeddings = pd.concat(trainer.predict(inf_module, dataloaders=inference_train_dl))
inf_train_embeddings.to_parquet(
    "train_geo.parquet", engine="pyarrow", compression="snappy", index=False
)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [7]


Predicting DataLoader 0: 100%|██████████| 5145/5145 [08:36<00:00,  9.96it/s]


In [26]:
# Align embeddings to every requested test client; clients absent from
# `inf_test_embeddings` get all-zero embedding vectors.
requested_ids = pd.DataFrame({"client_id": test_target_b["client_id"].unique()})
not_only_trx = requested_ids.merge(inf_test_embeddings, on="client_id", how="left").fillna(0)
not_only_trx

Unnamed: 0,client_id,emb_0000,emb_0001,emb_0002,emb_0003,emb_0004,emb_0005,emb_0006,emb_0007,emb_0008,...,emb_0246,emb_0247,emb_0248,emb_0249,emb_0250,emb_0251,emb_0252,emb_0253,emb_0254,emb_0255
0,03478d5f75a2b651bfd3ae66836b0a54313d1cea05d75e...,0.993686,-0.706407,-0.99847,0.927595,-0.064002,-0.966055,-0.981383,0.905016,-0.356564,...,0.079912,-0.932962,0.877867,-0.310506,-0.459784,0.202829,-0.028776,0.688743,0.059694,0.849863
1,4bf106c9764392df0850cd907daa93e97dad7df8b35cb9...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
2,4c9f58011f50bef4ea99b4f22f5a3264ed1cfb60d23b9f...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
3,51d17a1af833d5640f5402d450bdf16dea81329a73648d...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
4,52c6fd670cfd93f9075fbdd580d3d4819afa2661a39253...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
140483,c6041ce381f3df521d1dae3350ccf9b7a5c295270aaa65...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
140484,c4995db29ee447c347d7b92619350762b26c93500b90ce...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
140485,c45249a15c44bde22eec62b6881983769cd86bc6958cac...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000
140486,cc92973ca2f42eab12d0af7bc64a5489691af4a135f42f...,0.000000,0.000000,0.00000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000


In [27]:
not_only_trx.to_parquet(
    "geo_not_only_trx.parquet",
    engine="pyarrow",
    compression="snappy",
    index=False,
)