In [9]:
import torch.nn as nn
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset

from torch.utils.data import Dataset, DataLoader

import pandas as pd
import numpy as np

from datetime import datetime
from tqdm import tqdm

import random
from pathlib import Path

from sklearn.model_selection import train_test_split

In [10]:
import os
# CUDA_LAUNCH_BLOCKING=1 is the CUDA runtime switch for synchronous kernel launches.
# NOTE(review): presumably enabled as a debugging aid so device-side errors are reported
# at the failing call; it slows GPU execution, so confirm it should stay on for real runs.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

In [14]:
# BASE_DIR is the parent of the current working directory; the parquet inputs live in
# its "data" sub-directory.
BASE_DIR = Path.cwd().parent
DATA_DIR = BASE_DIR / "data"

users_path = DATA_DIR / 'user_features_clean.parquet'
movies_path = DATA_DIR / 'Movies_clean_Vec_v4_25keywords.parquet'
ratings_path = DATA_DIR / 'ratings_groupped_ids.parquet'

df_users = pd.read_parquet(users_path)
df_movies = pd.read_parquet(movies_path)
df_ratings = pd.read_parquet(ratings_path)

# Przygotowanie movieId dla datasetów

In [15]:
# DataFrame.info() prints its report itself and returns None, so it is called directly;
# wrapping it in print() would append a stray "None" line after every report.
df_users.info()
df_ratings.info()
df_movies.info()

# Count users whose positive / negative rating list is empty.
empty_pos_ratings = df_ratings['pos'].apply(lambda x: len(x) == 0).sum()
empty_neg_ratings = df_ratings['neg'].apply(lambda x: len(x) == 0).sum()

# Stop early if any user lacks a positive or a negative rating.
if empty_pos_ratings != 0 or empty_neg_ratings != 0:
    print(f'Empty ratings: pos: {empty_pos_ratings}, neg: {empty_neg_ratings}')
    raise Exception("Users without a single pos/neg rating exist in the ratings_groupped_ids dataset")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 198832 entries, 0 to 198831
Data columns (total 29 columns):
 #   Column                   Non-Null Count   Dtype  
---  ------                   --------------   -----  
 0   userId                   198832 non-null  int64  
 1   num_rating               198832 non-null  float64
 2   avg_rating               198832 non-null  float64
 3   weekend_watcher          198832 non-null  float64
 4   genre_Action             198832 non-null  float64
 5   genre_Adventure          198832 non-null  float64
 6   genre_Animation          198832 non-null  float64
 7   genre_Comedy             198832 non-null  float64
 8   genre_Crime              198832 non-null  float64
 9   genre_Documentary        198832 non-null  float64
 10  genre_Drama              198832 non-null  float64
 11  genre_Family             198832 non-null  float64
 12  genre_Fantasy            198832 non-null  float64
 13  genre_History            198832 non-null  float64
 14  genr

In [16]:
# Gather every movieId referenced by a user's history or by the pos/neg rating lists.
unique_ids = set(df_users['movies_seq'].explode().tolist())
for rating_col in ('pos', 'neg'):
    unique_ids.update(df_ratings[rating_col].explode().tolist())

print('Unique movieIds:', len(unique_ids))
unique_ids = sorted(unique_ids)

# Dense zero-based index for each movieId, assigned in ascending movieId order.
movieId_to_idx = dict(zip(unique_ids, range(len(unique_ids))))
n_items = len(unique_ids)

lowest_idx = min(movieId_to_idx.values())
highest_idx = max(movieId_to_idx.values())
print('min idx:', lowest_idx)
print('max idx:', highest_idx)

# The indices must span exactly 0 .. n_items - 1.
assert lowest_idx == 0
assert highest_idx == n_items - 1

Unique movieIds: 82932
min idx: 0
max idx: 82931


In [17]:
# Zmapuj movieId do indeksów
df_users['movies_seq'] = df_users['movies_seq'].apply(lambda lst: [movieId_to_idx[m] for m in lst])
df_ratings['pos'] = df_ratings['pos'].apply(lambda lst: [movieId_to_idx[m] for m in lst])
df_ratings['neg'] = df_ratings['neg'].apply(lambda lst: [movieId_to_idx[m] for m in lst])

# df_movies musi być ograniczone tylko do używanych filmów
df_movies = df_movies[df_movies['movieId'].isin(movieId_to_idx)]
df_movies['movieId'] = df_movies['movieId'].map(movieId_to_idx)

# Final sanity check
assert df_users['movies_seq'].explode().max() < n_items
assert df_ratings['pos'].explode().max() < n_items
assert df_ratings['neg'].explode().max() < n_items
assert df_movies['movieId'].max() < n_items
assert df_movies['movieId'].notna().all(), "Some movieIds weren't mapped!"

In [18]:
# Largest movie index found in any user's history; it has to stay below n_items.
max_movie_idx = df_users['movies_seq'].explode().max()
print(f"max_movie_idx = {max_movie_idx}")
print(f"n_items = {n_items}")

assert max_movie_idx < n_items, "Indeks filmu przekracza rozmiar embeddingu"

max_movie_idx = 82931
n_items = 82932


In [19]:
def has_invalid_entries(seq_col):
    """
    Tell whether a column of sequences holds at least one invalid element.

    The sequences are flattened with ``explode()`` and every element is compared
    against the invalid markers -1, NaN and None.
    """
    flattened = seq_col.explode()
    invalid_mask = flattened.isin([-1, np.nan, None])
    return invalid_mask.any()

# Report whether any user's movie sequence contains an invalid entry (-1, NaN or None).
print("Zawiera niepoprawne wartości:", has_invalid_entries(df_users['movies_seq']))

Zawiera niepoprawne wartości: False


In [None]:
# info() already reports the schema and the row count; a short head() is enough to
# eyeball the values (head(83000) would ask to render practically the whole table).
df_movies.info()
df_movies.head()

In [None]:
# Quick-test mode: set DEBUG to True to run the rest of the notebook on a small,
# reproducible random subset of users.
DEBUG = False

if DEBUG:
    # Fixed random_state keeps the sampled users identical between runs.
    df_users = df_users.sample(n=1028, random_state=42).copy()
    sampled_user_ids = df_users['userId']
    # Keep only the rating rows that belong to the sampled users.
    df_ratings = df_ratings[df_ratings['userId'].isin(sampled_user_ids)].copy()


# Przygotowanie danych do uczenia -> do gotowych batchy

In [None]:
# Global maximum list length per id column; used as the padding target length.
max_len_a, max_len_d, max_len_g = (
    int(df_movies[column].str.len().max())
    for column in ('actor_ids', 'director_ids', 'genre_ids')
)

In [None]:
def collect_user_features(u):
    """
    Convert one user row into four tensors.

    Returns, in order: ``movies_seq`` (long), ``ratings_seq`` (float32),
    ``ts_seq`` (float32) and ``user_stats`` (float32). ``user_stats`` is built from
    every field of ``u`` whose name starts with one of the statistic prefixes below.
    """
    stat_prefixes = ('num_rating', 'avg_rating', 'weekend_watcher', 'genre_', 'type_of_viewer_')
    stat_columns = [name for name in u.index if name.startswith(stat_prefixes)]
    stat_values = u[stat_columns].astype('float32').values

    return (
        torch.tensor(u['movies_seq'], dtype=torch.long),
        torch.tensor(u['ratings_seq'], dtype=torch.float32),
        torch.tensor(u['ts_seq'], dtype=torch.float32),
        torch.tensor(stat_values, dtype=torch.float32),
    )

In [None]:
def collect_movie_features(m, max_len_a, max_len_d, max_len_g):
    """
    Convert one movie row into five tensors.

    Parameters
    ----------
    m : row of the movie table (e.g. a pandas Series) exposing the scalar fields read
        below, every ``decade_*`` field, ``text_embedded`` and the id sequences
        ``actor_ids``, ``director_ids`` and ``genre_ids``.
    max_len_a, max_len_d, max_len_g : int
        Fixed output length of the actor, director and genre id tensors.

    Returns
    -------
    dense_feats : float32 tensor, numeric + binary + decade flags, in that order
    text_emb : float32 tensor built from ``m.text_embedded``
    actor_ids, director_ids, genre_ids : long tensors, truncated or zero-padded to the
        requested length
    """
    # Order matters: consumers index into dense_feats by position.
    numeric = [
        m.runtime,
        m.engagement_score,
        m.cast_importance,
        m.director_score,
    ]
    binary = [
        m.if_blockbuster,
        m.highly_watched,
        m.highly_rated,
        m.has_keywords,
        m.has_cast,
        m.has_director,
    ]
    decades = (m[[c for c in m.index if c.startswith('decade_')]]
               .astype(int)
               .tolist())

    dense_feats = torch.tensor(numeric + binary + decades, dtype=torch.float32)
    text_emb = torch.tensor(m.text_embedded, dtype=torch.float32)

    def pad(seq, L):
        # Materialise as a plain list first: id sequences may arrive as numpy arrays
        # (typical for list columns read from parquet), and for an array
        # `seq + [0] * k` is element-wise addition, not concatenation.
        ids = [int(i) for i in seq][:L]
        ids.extend([0] * (L - len(ids)))
        return torch.tensor(ids, dtype=torch.long)

    actor_ids    = pad(m.actor_ids,    max_len_a)
    director_ids = pad(m.director_ids, max_len_d)
    genre_ids    = pad(m.genre_ids,    max_len_g)

    return dense_feats, text_emb, actor_ids, director_ids, genre_ids

In [None]:
import faiss

# Build the movie embedding matrix for FAISS nearest-neighbour search.
#
# Rows are looked up by movieId (not by the frame's own index labels, which are
# unrelated to movieId) and laid out in ascending movieId order. find_negative()
# indexes movie_matrix_np by movie index and treats FAISS result ids as movie indices,
# so row i must describe movie i.
movies_by_id = df_movies.set_index('movieId').sort_index()
unique_ids = movies_by_id.index.tolist()
assert unique_ids == list(range(len(unique_ids))), (
    "df_movies must contain every movie index 0..N-1 exactly once, "
    "otherwise FAISS row ids do not line up with movie indices"
)

movie_vecs = []
for m_id, movie_row in movies_by_id.iterrows():
    dense_feats, text_emb, *_ = collect_movie_features(
        movie_row,
        max_len_a, max_len_d, max_len_g
    )
    combined = torch.cat([dense_feats, text_emb], dim=0)
    # L2-normalise each vector so the inner product used below equals cosine similarity
    movie_vecs.append(F.normalize(combined, dim=0))

movie_matrix = torch.stack(movie_vecs)  # matrix [n_movies, D]
movie_matrix_np = movie_matrix.cpu().numpy().astype('float32')
# FAISS inner product on L2-normalised vectors = cosine similarity
faiss_index = faiss.IndexFlatIP(movie_matrix_np.shape[1])
faiss_index.add(movie_matrix_np)

In [None]:
# TODO(review): this negative-sampling strategy is still to be evaluated and may change
def find_negative(pos_id, user_negs, top_k=25):
    """
    Pick a negative movie for the positive ``pos_id``.

    Searches the ``top_k`` FAISS neighbours of ``pos_id`` (most similar first) and
    returns the first one that is in ``user_negs``. If none of them is, falls back to a
    random element of ``user_negs``.

    Assumes row ``i`` of ``movie_matrix_np`` / ``faiss_index`` describes movie index
    ``i``: ``pos_id`` is used as a row number and FAISS result ids are compared with
    ``user_negs`` directly.

    Always returns a plain Python ``int``.
    """
    # Assumes pos_id is a valid row of movie_matrix_np and user_negs is non-empty
    query = movie_matrix_np[pos_id].reshape(1, -1)
    _, neighbour_ids = faiss_index.search(query, top_k)

    for candidate in neighbour_ids[0]:
        if candidate in user_negs:
            # FAISS returns numpy integers; convert so both return paths yield an int
            return int(candidate)

    return random.choice(list(user_negs))  # fallback

In [None]:
class TwoTowerDataset(Dataset):
    """
    One sample per user: the user's features plus one positive and one negative movie.

    Parameters
    ----------
    df_users : user table; one row per user, must contain ``userId``.
    df_ratings : table with ``userId`` and the per-user ``pos`` / ``neg`` movie lists.
    df_movies : movie table with a ``movieId`` column; rows are looked up by that id.
    """

    def __init__(self, df_users, df_ratings, df_movies):
        self.df_users = df_users.reset_index(drop=True)
        # Indexed by id so __getitem__ can look rows up by label
        self.df_ratings = df_ratings.set_index('userId')
        self.df_movies = df_movies.set_index('movieId')

    def __len__(self):
        return len(self.df_users)

    def __getitem__(self, idx):
        # User features
        u_row = self.df_users.iloc[idx]
        movies_seq, ratings_seq, ts_seq, user_stats = collect_user_features(u_row)
        user_id = u_row['userId']

        pos_list = self.df_ratings.at[user_id, 'pos']
        neg_list = self.df_ratings.at[user_id, 'neg']

        # BPR: a random positive, paired with a negative chosen by find_negative
        pos_id = random.choice(pos_list)
        neg_id = find_negative(pos_id, set(neg_list))

        # Movie features: collect_movie_features expects the movie ROW, not the bare id,
        # so fetch the row by movieId first.
        pos_row = self.df_movies.loc[pos_id]
        neg_row = self.df_movies.loc[neg_id]
        pos_feats, pos_text, pos_actors, pos_directors, pos_genres = collect_movie_features(pos_row, max_len_a, max_len_d, max_len_g)
        neg_feats, neg_text, neg_actors, neg_directors, neg_genres = collect_movie_features(neg_row, max_len_a, max_len_d, max_len_g)

        return {
            'user': {
                'user_statistics': user_stats,
                'movies': movies_seq,
                'ratings': ratings_seq,
                'times': ts_seq,
            },
            'pos_item': {
                'dense_features': pos_feats,
                'text_embedding': pos_text,
                'actor_ids': pos_actors,
                'director_ids': pos_directors,
                'genre_ids': pos_genres,
            },
            'neg_item': {
                'dense_features': neg_feats,
                'text_embedding': neg_text,
                'actor_ids': neg_actors,
                'director_ids': neg_directors,
                'genre_ids': neg_genres,
            }
        }

In [None]:
dataset = TwoTowerDataset(df_users, df_ratings, df_movies)

# Smoke test: fetch the first sample and print the type and shape of every field.
sample0 = dataset[0]
print("Keys:", sample0.keys())

print("\n--- USER ---")
for name, value in sample0['user'].items():
    # Lists are previewed by their first five elements; anything else is shown as-is
    fallback = value[:5] if isinstance(value, list) else value
    print(f" user[{name}]:", type(value), getattr(value, "shape", fallback))

for section, title in (('pos_item', 'POS ITEM'), ('neg_item', 'NEG ITEM')):
    print(f"\n--- {title} ---")
    for name, value in sample0[section].items():
        print(f" {section}[{name}]:", type(value), value.shape if hasattr(value, 'shape') else value[:5])

In [None]:
def collate_TT(batch):
    """
    Collate a list of dataset samples into one batch.

    Each field of every sample group ('user', 'pos_item', 'neg_item') is stacked along a
    new leading batch dimension, so the result mirrors the nested layout of one sample.
    """
    rows = list(batch)

    user_fields = ('user_statistics', 'movies', 'ratings', 'times')
    item_fields = ('dense_features', 'text_embedding', 'actor_ids', 'director_ids', 'genre_ids')

    def stack_group(group, fields):
        # One stacked tensor per field: [B, ...]
        return {
            field: torch.stack([row[group][field] for row in rows])
            for field in fields
        }

    return {
        'user': stack_group('user', user_fields),
        'pos_item': stack_group('pos_item', item_fields),
        'neg_item': stack_group('neg_item', item_fields),
    }

# Przygotowanie zbiorów do treningu

In [None]:
BATCH_SIZE = 4096

# Split at user level: 20% of the users are held out for validation (fixed seed).
train_users, val_users = train_test_split(df_users, test_size=0.2, random_state=213)

# Both datasets share the same ratings and movie tables; only the user subset differs.
train_dataset = TwoTowerDataset(train_users, df_ratings, df_movies)
val_dataset = TwoTowerDataset(val_users, df_ratings, df_movies)

In [None]:
# Settings shared by both loaders; only the dataset and the shuffling differ.
loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=4,
    pin_memory=True,
    collate_fn=collate_TT,
    drop_last=False,
)

# Training batches are reshuffled every epoch; validation keeps a fixed order.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, **loader_kwargs)

In [None]:
# Pull one batch and print the collated shapes. The keys must match what collate_TT
# emits ('user_statistics', 'dense_features'); the shorter names raised KeyError.
batch = next(iter(train_loader))
print("user[movies]",   batch['user']['movies'].shape)             # [BATCH_SIZE, L_u]
print("user[stats]",    batch['user']['user_statistics'].shape)    # [BATCH_SIZE, d_stats]
print("pos_item[dense]",batch['pos_item']['dense_features'].shape) # [BATCH_SIZE, F]
print("pos_item[actor_ids]", batch['pos_item']['actor_ids'].shape)  # [BATCH_SIZE, max_len_a]

# ARCHITEKTURA TWO TOWER

In [None]:
EMB_DIM = 64

class UserTower(nn.Module):
    """
    User side of the two-tower model.

    Pools the embeddings of the movies in the user's history with learned weights,
    concatenates the pooled vector with the user statistics and maps the result to an
    L2-normalised embedding of size ``embedding_dim``.
    """

    def __init__(self, input_dim, embedding_dim=EMB_DIM):
        '''
        input_dim - the number of columns in user features, without sequence columns
        embedding_dim - size of the item embeddings and of the output user embedding
        '''
        super().__init__()

        # The table size comes from the module-level n_items
        self.item_emb = nn.Embedding(n_items, embedding_dim)

        # A layer to project rating and timestamp into a scalar weight
        self.rating_proj = nn.Linear(2, 1)

        self.mlp = nn.Sequential(
            nn.Linear(input_dim + embedding_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 384),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(384, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, embedding_dim)
        )

    def forward(self, batch):
        """
        batch - collated batch dict; reads ``batch['user']`` with the keys
                'movies', 'ratings', 'times' and 'user_statistics'.
        Returns the L2-normalised user embeddings, expected shape [B, embedding_dim].
        """
        user = batch['user']

        # Embed the movieIds from the user's history
        m = self.item_emb(user['movies'])                               # [B, L_u, D]

        # Per-position weight in (0, 1) derived from rating and timestamp
        x = torch.stack([user['ratings'], user['times']], dim=-1)       # [B, L_u, 2]
        w = torch.sigmoid(self.rating_proj(x))                          # [B, L_u, 1]

        # Weighted mean-pool; the clamp guards against division by a vanishing weight sum
        pooled = (m * w).sum(1) / w.sum(1).clamp_min(1e-6)              # [B, D]

        # 'user_statistics' is the key produced by the dataset / collate_TT
        mlp_input = torch.cat([user['user_statistics'], pooled], dim=-1)  # [B, stats+D]
        output = self.mlp(mlp_input)                                    # [B, D]
        return F.normalize(output, dim=1)


class ItemTower(nn.Module):
    """
    Item side of the two-tower model.

    Combines mean-pooled actor / director / genre embeddings with MLP encodings of the
    dense features and of the text embedding, and maps them to an L2-normalised
    embedding of size ``embedding_dim``.
    """

    def __init__(self,dense_feat_dim,text_emb_dim,vocab_sizes,embedding_dim=EMB_DIM):
        '''
        dense_feat_dim - width of the dense feature vector (numeric + binary + decades)
        text_emb_dim - width of the vector describing the movie's text data
        vocab_sizes - tuple of n_actors, n_directors, n_genres, in that order
        embedding_dim - size of every internal embedding and of the output embedding
        '''
        super().__init__()

        self.actor_emb = nn.Embedding(vocab_sizes[0], embedding_dim)
        self.director_emb = nn.Embedding(vocab_sizes[1], embedding_dim)
        self.genre_emb = nn.Embedding(vocab_sizes[2], embedding_dim)

        self.meta_mlp = nn.Sequential(
            nn.Linear(dense_feat_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, embedding_dim),
            nn.ReLU()
        )

        # TODO: the 512 -> embedding_dim step may be too steep; consider a 256 layer in between
        self.text_mlp = nn.Sequential(
            nn.Linear(text_emb_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, embedding_dim),
            nn.ReLU()
        )

        # Three pooled id embeddings + meta_mlp output + text_mlp output
        MLP_INPUT_DIM = embedding_dim*5
        self.final_mlp = nn.Sequential(
            nn.Linear(MLP_INPUT_DIM, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512,256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256,embedding_dim)
        )

    def forward(self, batch, key: str = "pos_item"):
        """
        batch - collated batch dict
        key - which item group of the batch to encode, e.g. 'pos_item' or 'neg_item'
        Returns the L2-normalised item embeddings, expected shape [B, embedding_dim].
        """
        item = batch[key]

        # 'dense_features' is the key produced by the dataset / collate_TT
        dense_feats = item['dense_features']        # [B, dense_feat_dim]
        text_emb = item['text_embedding']           # [B, text_emb_dim]

        actor_ids = item['actor_ids']               # [B, max_len_a]
        director_ids = item['director_ids']
        genre_ids = item['genre_ids']

        dense_vec = self.meta_mlp(dense_feats)      # [B, D]
        text_vec = self.text_mlp(text_emb)          # [B, D]

        # Columns 2 and 3 of the dense vector are used as importance weights
        cast_imp = dense_feats[:, 2:3]              # [B, 1]
        director_score = dense_feats[:, 3:4]        # [B, 1]

        # NOTE(review): the mean runs over the full id length, so any padding ids in
        # the batch contribute to the average - confirm this is intended.
        a = self.actor_emb   (actor_ids).mean(dim=1)    # [B, D]
        d = self.director_emb(director_ids).mean(dim=1) # [B, D]
        g = self.genre_emb   (genre_ids).mean(dim=1)    # [B, D]

        # Scale actor / director vectors by their importance scores
        # TODO: consider max pooling or attention pooling instead
        a = a * cast_imp
        d = d * director_score

        mlp_input = torch.cat([a, d, g, dense_vec, text_vec], dim=-1)   # [B, 5D]
        output = self.final_mlp(mlp_input)                              # [B, D]
        return F.normalize(output, dim=1)


In [None]:
class TwoTowerModel(nn.Module):
    """
    Wraps the user tower and the item tower and encodes a (user, positive item,
    negative item) batch in one call.
    """

    def __init__(self, stats_dim, n_items, vocab_sizes,
                 dense_feat_dim, text_emb_dim, embedding_dim=64):
        '''
        stats_dim - number of user statistic columns (UserTower input_dim)
        n_items - number of rows of the user tower's item-embedding table
        vocab_sizes - tuple of n_actors, n_directors, n_genres
        dense_feat_dim - width of the dense item feature vector
        text_emb_dim - width of the item text embedding
        embedding_dim - size of the user and item output embeddings
        '''
        super().__init__()
        # UserTower's constructor takes (input_dim, embedding_dim) only
        self.user_tower = UserTower(stats_dim, embedding_dim)
        # UserTower sizes its item table on its own and has no n_items argument, so
        # rebuild the table here when the caller asked for a different size.
        if self.user_tower.item_emb.num_embeddings != n_items:
            self.user_tower.item_emb = nn.Embedding(n_items, embedding_dim)
        self.item_tower = ItemTower(dense_feat_dim, text_emb_dim, vocab_sizes, embedding_dim)

    def forward(self, batch):
        """
        batch - collated batch dict with the groups 'user', 'pos_item' and 'neg_item'.
        Returns (u, i_pos, i_neg): user, positive-item and negative-item embeddings.
        """
        # Both towers take the whole batch dict; the item tower picks its group by key
        u = self.user_tower(batch)
        i_pos = self.item_tower(batch, key='pos_item')
        i_neg = self.item_tower(batch, key='neg_item')
        return u, i_pos, i_neg
