In [1]:
import pandas as pd
import numpy as np
import torch
import pytorch_lightning as pl

from torch.utils.data import Dataset, DataLoader
from typing import Dict, Tuple, Optional

In [2]:
# Load the purchase log, discard the helper `count_` column, parse the order
# timestamps and order the rows chronologically within each user.
df = (
    pd.read_csv('data/filtered_data.csv', index_col='Unnamed: 0')
    .drop(columns='count_')
    .assign(order_ts=lambda frame: pd.to_datetime(frame['order_ts']))
    .sort_values(['user_id', 'order_ts'])
)

  mask |= (ar1 == a)


In [3]:
# 95th percentile of the number of (non-null) purchased items per user.
df.groupby('user_id')['item_id'].count().quantile(0.95)

62.0

In [4]:
PATH = 'data/filtered_data.csv'
TRUE = 1
FALSE = 0

class PurchaseHistory(Dataset):
    """
    Purchase-history dataset: one sample per user.

    Every user is represented by the chronologically ordered list of item
    indices they bought. Item indices start at 1; ``PAD`` (0) is used for
    left padding and ``MASK`` (number of items + 1) marks the position to
    predict.

    Samples by mode:
    - "Train": ``(tokens, labels)``. Histories longer than 4 items hide the
      third item from the end (the last two are left for validation/test);
      shorter histories hide their last item.
    - "Valid": ``(tokens, candidates, labels)`` with the second-to-last item
      as the target.
    - "Test": ``(tokens, candidates, labels)`` with the last item as the target.

    Args:
        mode: One of "Train", "Valid", "Test".
        max_len: Length every token sequence is truncated / left-padded to.
        data_dir: Path of the CSV file with the purchase log.
        neg_sample_size: Number of negative candidates per evaluation sample.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """
    _MODES = ("Train", "Valid", "Test")

    def __init__(self, mode="Train", max_len=70, data_dir=PATH, neg_sample_size=100):
        # Fail fast (before the CSV is read): an unknown mode would otherwise
        # make __getitem__ silently return None.
        if mode not in self._MODES:
            raise ValueError(f"mode must be one of {self._MODES}, got {mode!r}")
        self.mode = mode
        self.max_len = max_len
        self.data_dir = data_dir
        self.neg_sample_size = neg_sample_size
        self.user_seq, self.item_seq, self.user2idx, self.item2idx, self.item_size = self._preprocess()
        self.negative_sample = self._popular_sampler(self.item_seq)

        self.PAD = 0
        # Item indices occupy 1..N, so N + 1 is free for the mask token.
        self.MASK = len(self.item_seq) + 1

    def _preprocess(self) -> Tuple[pd.Series, pd.Series, Dict[int, int], Dict[int, int], int]:
        """
        Load the CSV and build the per-user item sequences.

        Returns:
            user_seq: Series indexed by user index whose values are the
                chronologically ordered lists of item indices. Users with a
                single purchase are dropped; outside "Train" mode users with
                four or fewer purchases are dropped as well.
            item_counts: Series indexed by item index with the number of
                purchase rows per item.
            user2idx: Mapping raw user id -> 0-based user index.
            item2idx: Mapping raw item id -> 1-based item index.
            item_size: Number of distinct items.
        """
        df = pd.read_csv(self.data_dir, index_col='Unnamed: 0')
        df = df.drop(columns='count_')
        df['order_ts'] = pd.to_datetime(df['order_ts'])
        df = df.sort_values(['user_id', 'order_ts'])

        user2idx = {v: k for k, v in enumerate(df['user_id'].unique())}
        # Item indices start at 1 because 0 is reserved for padding.
        item2idx = {v: k + 1 for k, v in enumerate(df['item_id'].unique())}
        item_size = len(item2idx)

        df['user_id'] = df['user_id'].map(user2idx)
        df['item_id'] = df['item_id'].map(item2idx)

        user_seq = df.groupby("user_id")["item_id"].apply(list)

        # Training needs at least one context item plus a target; evaluation
        # additionally needs the held-out validation and test items.
        min_len = 1 if self.mode == 'Train' else 4
        user_seq = user_seq[user_seq.map(len) > min_len]

        return user_seq, df.groupby(by="item_id").size(), user2idx, item2idx, item_size

    def _popular_sampler(self, item_seq: pd.Series) -> pd.Index:
        """
        Return the item indices ordered from most to least purchased.
        """
        return item_seq.sort_values(ascending=False).index

    def _pad_left(self, values: list) -> list:
        """Keep the last ``max_len`` values and left-pad with ``PAD`` up to ``max_len``."""
        values = values[-self.max_len:]
        return [self.PAD] * (self.max_len - len(values)) + values

    def _eval_dataset(self, tokens: list, labels: list) -> Tuple[torch.LongTensor, torch.LongTensor, torch.LongTensor]:
        """
        Build one validation/test sample (leave-one-out evaluation).

        The last element of ``tokens`` is the target: it is replaced by
        ``MASK`` in the returned sequence and becomes the first candidate.
        It is followed by up to ``neg_sample_size`` of the most popular items
        the user has not interacted with.

        Args:
            tokens: The user's history up to and including the target item.
            labels: Ignored; kept for interface compatibility.

        Returns:
            ``(tokens, candidates, labels)`` where ``labels`` is ``TRUE`` for
            the target followed by one ``FALSE`` per negative candidate.
        """
        seen = set(tokens)  # built once; includes the target itself
        candidates = [tokens[-1]]

        sample_count = 0
        for item in self.negative_sample:
            if sample_count == self.neg_sample_size:
                break
            if item not in seen:
                candidates.append(item)
                sample_count += 1

        tokens = self._pad_left(tokens[:-1] + [self.MASK])

        # Derive the labels from the candidates actually collected so both
        # stay aligned even when fewer unseen items exist than requested.
        labels = [TRUE] + [FALSE] * (len(candidates) - 1)

        return torch.LongTensor(tokens), torch.LongTensor(candidates), torch.LongTensor(labels)

    def __len__(self):
        return len(self.user_seq)

    def __getitem__(self, index):
        """
        Return the sample at position ``index`` (0 <= index < len(self)).

        Raises:
            ValueError: If ``self.mode`` is not a supported mode.
        """
        # user_seq is indexed by user index and has gaps after filtering, so
        # the lookup must be positional; label-based access raises KeyError
        # for dropped users and never reaches users with large indices.
        seq = self.user_seq.iloc[index]

        if self.mode == "Train":
            # Short histories train on their last item; longer ones hide the
            # third item from the end so the last two stay unseen.
            holdout = 1 if len(seq) <= 4 else 3
            tokens = self._pad_left(seq[:-holdout] + [self.MASK])
            labels = self._pad_left([self.PAD] * (len(seq) - holdout) + [seq[-holdout]])
            return torch.LongTensor(tokens), torch.LongTensor(labels)

        if self.mode == "Valid":
            # Drop the test item; the new last item is the validation target.
            return self._eval_dataset(seq[:-1], [])

        if self.mode == "Test":
            return self._eval_dataset(seq[:], [])

        raise ValueError(f"mode must be one of {self._MODES}, got {self.mode!r}")

In [5]:
# Build the dataset with its default arguments (mode="Train"); the cell's
# value is the number of user sequences it holds.
ph_train = PurchaseHistory()
len(ph_train)

  mask |= (ar1 == a)


937747

In [7]:
# Build the validation dataset; outside "Train" mode only users with more
# than 4 purchases are kept, so it holds fewer sequences than the train set.
ph_val = PurchaseHistory(mode='Valid')
len(ph_val)

  mask |= (ar1 == a)


728575

In [15]:
class DataModule(pl.LightningDataModule):
    """
    LightningDataModule that builds the train/valid/test ``PurchaseHistory``
    datasets and wraps them in ``DataLoader`` objects.

    Args:
        max_len: Sequence length passed to every dataset.
        data_dir: Path of the CSV file passed to every dataset.
        neg_sample_size: Number of negative candidates passed to every dataset.
        pin_memory: Forwarded to every ``DataLoader``.
        num_workers: Forwarded to every ``DataLoader``.
        batch_size: Forwarded to every ``DataLoader``.
    """
    def __init__(self, max_len=70, data_dir=PATH, neg_sample_size=100,
                pin_memory=True, num_workers=4, batch_size=256):
        super().__init__()
        # Dataset related settings
        self.max_len = max_len
        self.neg_sample_size = neg_sample_size
        self.data_dir = data_dir
        # DataLoader related settings
        self.pin_memory = pin_memory
        self.num_workers = num_workers
        self.batch_size = batch_size
        # The train dataset is built eagerly so that the vocabulary size
        # (item_size) is available right after construction.
        self.train_data = self._make_dataset("Train")
        self.item_size = self.train_data.item_size

    def _make_dataset(self, mode: str) -> "PurchaseHistory":
        """Create a ``PurchaseHistory`` for ``mode`` with this module's dataset settings."""
        return PurchaseHistory(
            mode=mode, max_len=self.max_len, data_dir=self.data_dir,
            neg_sample_size=self.neg_sample_size)

    def _make_loader(self, dataset, shuffle: bool) -> DataLoader:
        """Wrap ``dataset`` in a ``DataLoader`` with this module's loader settings."""
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=shuffle,
                          pin_memory=self.pin_memory, num_workers=self.num_workers)

    def setup(self, stage: Optional[str] = None) -> None:
        """
        Create the validation and/or test dataset for the given stage.

        ``valid_data`` is built for "fit", "validate" or ``None``;
        ``test_data`` is built for "test" or ``None``.
        """
        # PurchaseHistory takes no mask_prob argument and this module never
        # defines one, so it must not be forwarded here.
        if stage in ("fit", "validate", None):
            self.valid_data = self._make_dataset("Valid")

        if stage in ("test", None):
            self.test_data = self._make_dataset("Test")

    def train_dataloader(self) -> DataLoader:
        """Shuffled loader over the training dataset."""
        return self._make_loader(self.train_data, shuffle=True)

    def val_dataloader(self) -> DataLoader:
        """Sequential loader over the validation dataset (requires ``setup``)."""
        return self._make_loader(self.valid_data, shuffle=False)

    def test_dataloader(self) -> DataLoader:
        """Sequential loader over the test dataset (requires ``setup``)."""
        return self._make_loader(self.test_data, shuffle=False)


In [16]:
# Keyword arguments for the data module, gathered in one place so they are
# easy to tweak.
args = dict(
    max_len=70,
    neg_sample_size=100,
    data_dir=PATH,
    pin_memory=True,
    num_workers=4,
    batch_size=256,
)
dl = DataModule(**args)

  mask |= (ar1 == a)
