In [1]:
cd ../

/Users/hoangle/Projects/two-tower


In [8]:
import pandas as pd
import polars as pl
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
import lightning as L
from torch.utils.data import DataLoader, Dataset
from torch.optim import AdamW
from torch.nn import Module
from torch import Tensor
from loguru import logger
from lightning.pytorch.callbacks import TQDMProgressBar

from utils import Paths

# Read processed data

In [3]:
paths = Paths("steam", "loo")

In [9]:
raw_train = pl.read_parquet(paths.train())
raw_val = pl.read_parquet(paths.val())
raw_test = pl.read_parquet(paths.test())

In [10]:
raw_val.head()

user_id,product_id,timestamp,genres,price
i64,i64,i64,list[i32],f32
0,1,1514937600,"[1, 4]",19.99
1,9,1489190400,"[1, 8]",19.99
2,38,1515110400,"[1, 14]",59.990002
3,44,1515110400,"[1, 4, 6]",2.99
4,82,1514592000,"[4, 6]",19.99


## Add negative samples into training

In [34]:
inters_positive = pl.concat(
    [
        raw_train.select('user_id', 'product_id'),
        raw_val.select('user_id', 'product_id'),
        raw_test.select('user_id', 'product_id')
    ],
)

In [42]:
N = 100 + 10 + 10

users = raw_train.select('user_id').unique()
products = raw_train.select('product_id').unique()

inters_negative = (
    users
    .join(products, how='cross')
    .join(inters_positive, on=['user_id', 'product_id'], how='anti')

    # Get N negative interactions for each user
    .with_columns(
        pl.col('product_id').rank(method='ordinal').over('user_id').alias('rank')
    )
    .filter(pl.col('rank') <= N)
)

inters_negative.head()

user_id,product_id,rank
i64,i64,u32
9444,21,22
9444,3,4
9444,6,7
9444,12,13
9444,33,34


In [41]:
(
    inters_negative
    
).head()

user_id,product_id,rank
i64,i64,u32
23857,9,9
23857,27,27
23857,12,12
23857,15,15
23857,18,18


# Define data loader

In [6]:
MAX_N_GENRES = 9
DEVICE = torch.device("mps")
BSZ = 100
N_EPOCHS = 20

N_USERS = 25569
N_PRODS = 4008
N_GENRES = 22

In [7]:
class DatasetRS(Dataset):
    def __init__(self, data: pd.DataFrame) -> None:
        super().__init__()

        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        item = self.data.iloc[index]

        raw_genres = item['genres']
        raw_pad = np.zeros_like(raw_genres)

        genres = torch.tensor(np.pad(raw_genres, (0, MAX_N_GENRES - len(raw_genres))), device=DEVICE, dtype=torch.int32)
        pad = torch.tensor(np.pad(raw_pad, (0, MAX_N_GENRES - len(raw_pad))), device=DEVICE, dtype=torch.int32)

        user = torch.tensor(item['user_id'], device=DEVICE, dtype=torch.int32)
        product = torch.tensor(item['product_id'], device=DEVICE, dtype=torch.int32)
        price = torch.tensor(item['price'], device=DEVICE, dtype=torch.float32)

        return {'user': user, 'product': product, 'genres': genres, 'pad': pad, 'price': price}

# Define model

In [8]:



class Block(Module):
    def __init__(self, n: int, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        self.seq1 = nn.Sequential(
            nn.Linear(n, n),
            nn.LayerNorm(n),
            nn.GELU(),
        )
        self.seq2 = nn.Sequential(
            nn.LayerNorm(n),
            nn.GELU(),
        )

    def forward(self, X: Tensor) -> Tensor:
        out = self.seq2(self.seq1(X) + X)

        return out

class TwoTower(Module):
    def __init__(
        self,
        d_hid: int = 128,
        n_blocks: int = 2,
    ) -> None:
        super().__init__()

        self.embd_users = nn.Embedding(N_USERS, d_hid)
        self.embd_prods = nn.Embedding(N_PRODS, d_hid)
        self.embd_genres = nn.Embedding(N_GENRES, d_hid)

        self.block_user = nn.ModuleList([Block(d_hid)]*n_blocks)
        self.block_prod = nn.ModuleList([Block(2 * d_hid + 1)]*n_blocks)
        self.lin1 = nn.Linear(2 * d_hid + 1, d_hid)

        self.ff1 = nn.Sequential(
            nn.Linear(4 * d_hid, 4 * d_hid),
            nn.LayerNorm(4 * d_hid),
            nn.GELU(),
        )
        self.lin2 = nn.Linear(4 * d_hid, 1)

    def forward(self, users: Tensor, products: Tensor, genres: Tensor, pads: Tensor, prices: Tensor) -> Tensor:
        # users, products, prices: [bz]
        # genres, pads: [bz, MAX_N_GENRES]

        ##################################
        # Step 1: Embed
        ##################################
        users = self.embd_users(users)
        # [bz, d_hid]

        prods = self.embd_prods(products)
        # [bz, d_hid]
        genres = self.embd_genres(genres)
        # [bz, MAX_N_GENRES, d_hid]
        genres *= pads.unsqueeze(-1)
        # [bz, MAX_N_GENRES, d_hid]
        genres = torch.sum(genres, dim=1)
        # [bz, d_hid]

        prods = torch.concat([prods, genres, prices.unsqueeze(-1)], dim=-1)
        # [bz, 2 * d_hid + 1]

        ##################################
        # Step 2: Block
        ##################################
        for block in self.block_user:
            users = block(users)
            # [bz, d_hid]
        for block in self.block_prod:
            prods = block(prods)
            # [bz, d_hid]
        # [bz, 2 * d_hid + 1]
        prods = self.lin1(prods)
        # [bz, d_hid]

        ##################################
        # Step 3: Alignment
        ##################################
        # aligned = torch.concat([users, prods, users - prods, users * prods], dim=-1)
        # [bz, 4 * d_hid]
        # aligned = self.ff1(aligned)
        # logit = F.sigmoid(self.lin2(aligned))

        users = users.unsqueeze(1)
        prods = prods.unsqueeze(-1)
        # logger.debug(f"users: {users.shape}")
        # logger.debug(f"prods: {prods.shape}")

        aligned = users @ prods


        return aligned

class LitTwoTower(L.LightningModule):
    def __init__(
        self,
        d_hid: int = 128,
        n_blocks: int = 2,
        lr: float = 1e-4
    ) -> None:
        super().__init__()

        self.lr = lr

        self.model = TwoTower(d_hid=d_hid, n_blocks=n_blocks)
        self.criterion = nn.BCEWithLogitsLoss()

    def training_step(self, batch, batch_idx) -> Tensor:
        out = self.model(batch['user'], batch['product'], batch['genres'], batch['pad'], batch['price'])
        tgt = torch.ones_like(out, device=DEVICE, dtype=torch.float32)

        loss = self.criterion(out, tgt)

        self.log("train_loss", loss, on_epoch=True, prog_bar=True)

        return loss
    
    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=self.lr)

        return optimizer

# for x in loader_train:
#     break
# model = TwoTower().to(device=DEVICE)
# model(x['user'], x['product'], x['genres'], x['pad'], x['price'])

In [9]:
loader_train = DataLoader(DatasetRS(raw_train), batch_size=BSZ, shuffle=True,)
# loader_val = DataLoader(DatasetRS(raw_val), batch_size=BSZ)
# model = TwoTower().to(device=DEVICE)

# criterion = nn.BCEWithLogitsLoss()
# optimizer = AdamW(model.parameters())
litmodel = LitTwoTower()

trainer = L.Trainer(
    accelerator="mps",
    devices=1,
    max_epochs=N_EPOCHS,
    callbacks=[TQDMProgressBar(leave=True)]
)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


# Start training

In [10]:
# model.train() 

# for epch in range(1, N_EPOCHS + 1):
#     for x in loader_train:
#         optimizer.zero_grad()

#         out = model(x['user'], x['product'], x['genres'], x['pad'], x['price'])
#         tgt = torch.ones_like(out, device=DEVICE, dtype=torch.float32)

#         loss = criterion(out, tgt)

#         loss.backward()
#         optimizer.step()
#         logger.info(f"loss: {loss.item():.4f}")

trainer.fit(litmodel, train_dataloaders=loader_train)


  | Name      | Type              | Params | Mode 
--------------------------------------------------------
0 | model     | TwoTower          | 4.2 M  | train
1 | criterion | BCEWithLogitsLoss | 0      | train
--------------------------------------------------------
4.2 M     Trainable params
0         Non-trainable params
4.2 M     Total params
16.681    Total estimated model params size (MB)
29        Modules in train mode
0         Modules in eval mode
/Users/hoangle/miniforge3/envs/py/lib/python3.11/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:424: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=9` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]


Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined