# 🎬 MovieLens Recommendation System - Google Colab Edition

This notebook contains all recommender methods with **separate cells for each experiment**.

## Features
- **BaselineRecommender**: Global mean baseline
- **SVDRecommender**: Truncated SVD with biases  
- **FunkSVDRecommender**: SGD-based matrix factorization (PyTorch GPU)
- **ALSRecommender**: Alternating Least Squares (PyTorch GPU)
- **HybridRecommender**: CF + Content features
- **CompetitionEnsemble**: Stacked ensemble with LightGBM

## Instructions
1. **Upload your data**: when the upload prompt in Section 2 appears, select `train.csv`, `movies.csv`, `tags.csv`, and `ratings_submission.csv`
2. **Run cells 1-10** to load all code and data
3. **Run cells 11-17** independently for each experiment
4. **Cell 18** shows final results summary

✅ **Resume-friendly**: If disconnected, just re-run the cell you were on!

## 1. Setup & Installation

In [None]:
# Install required packages. The leading "!" is notebook shell-escape syntax,
# so this line only runs inside IPython/Jupyter/Colab, not as plain Python.
!pip install -q pandas numpy scikit-learn scipy torch lightgbm

# Report the installed PyTorch version and whether it can see a CUDA GPU.
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only device index 0 is queried, and only when CUDA reports as available.
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")

In [None]:
# Core imports
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Tuple, Dict, Optional, Any
from datetime import datetime
from collections import defaultdict
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import svds
from sklearn.model_selection import KFold
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD as SklearnTruncatedSVD
import json
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)
torch.manual_seed(42)

# Device selection for GPU acceleration
def get_device() -> torch.device:
    """Return the preferred torch device: CUDA if available, else MPS, else CPU."""
    if torch.cuda.is_available():
        backend_name = "cuda"
    else:
        # Older torch builds have no `torch.backends.mps` attribute at all.
        mps_backend = getattr(torch.backends, 'mps', None)
        if mps_backend is not None and mps_backend.is_available():
            backend_name = "mps"
        else:
            backend_name = "cpu"
    return torch.device(backend_name)

# Resolve the compute device once at cell execution time and report it.
DEVICE = get_device()
print(f"✓ Using device: {DEVICE}")

# Check LightGBM: record its availability in LGBM_AVAILABLE so later code can
# branch on it; when the import fails, sklearn's GradientBoostingRegressor is
# imported instead as the fallback model class.
try:
    import lightgbm as lgb
    LGBM_AVAILABLE = True
    print("✓ LightGBM available")
except ImportError:
    LGBM_AVAILABLE = False
    from sklearn.ensemble import GradientBoostingRegressor
    print("⚠ LightGBM not available, using sklearn GradientBoosting")

## 2. Data Upload

In [None]:
# Upload data files (Colab-specific)
# Sets DATA_DIR for the rest of the notebook: "." after a Colab upload, or a
# hard-coded local folder when google.colab cannot be imported.
try:
    # google.colab is importable only inside a Colab runtime; anywhere else the
    # ImportError handler below takes over.
    from google.colab import files
    print("Upload your data files (train.csv, movies.csv, tags.csv, ratings_submission.csv)...")
    uploaded = files.upload()
    # presumably uploaded files land in the current working directory — confirm against Colab docs
    DATA_DIR = "."
    print(f"✓ Uploaded {len(uploaded)} files")
except ImportError:
    # Local fallback path; edit it if the CSV files live somewhere else.
    DATA_DIR = "recsys-runi-2026/"
    print(f"Running locally. Data directory: {DATA_DIR}")

## 3. Data Loader & Evaluation Functions

In [None]:
class DataLoader:
    """Reads the competition CSV files from one directory and caches each as a DataFrame."""

    def __init__(self, data_dir: str = "."):
        """Remember the data directory; nothing is read until a load_* method is called."""
        self.data_dir = Path(data_dir)
        # Caches, filled lazily by the matching load_* method.
        self.train_df = None
        self.movies_df = None
        self.tags_df = None
        self.submission_template_df = None

    def load_train_data(self, explicit_only: bool = True) -> pd.DataFrame:
        """
        Read train.csv and cache it on ``self.train_df``.

        Args:
            explicit_only: When True, rows whose ``rating`` is missing are dropped.

        Returns:
            The cached training DataFrame.
        """
        print("Loading training data...")
        column_types = {'user_id': 'int32', 'movie_id': 'int32', 'rating': 'float32'}
        interactions = pd.read_csv(self.data_dir / "train.csv", dtype=column_types)
        self.train_df = interactions
        print(f"Loaded {len(interactions):,} interactions")
        if explicit_only:
            interactions = interactions.dropna(subset=['rating'])
            self.train_df = interactions
            print(f"Filtered to {len(interactions):,} explicit ratings")
        return self.train_df

    def load_movies_metadata(self) -> pd.DataFrame:
        """Read movies.csv, cache it on ``self.movies_df`` and return it."""
        print("Loading movies metadata...")
        movies_path = self.data_dir / "movies.csv"
        self.movies_df = pd.read_csv(movies_path, dtype={'movie_id': 'int32'})
        print(f"Loaded {len(self.movies_df):,} movies")
        return self.movies_df

    def load_tags(self) -> pd.DataFrame:
        """Read tags.csv, cache it on ``self.tags_df`` and return it."""
        print("Loading tags...")
        id_types = {'user_id': 'int32', 'movie_id': 'int32'}
        self.tags_df = pd.read_csv(self.data_dir / "tags.csv", dtype=id_types)
        print(f"Loaded {len(self.tags_df):,} tags")
        return self.tags_df

    def load_submission_template(self) -> pd.DataFrame:
        """
        Read ratings_submission.csv and split its ``id`` column into ids.

        The ``id`` strings are split on ``_``; the first piece becomes
        ``user_id`` and the second ``movie_id`` (both int32).
        """
        print("Loading submission template...")
        template = pd.read_csv(self.data_dir / "ratings_submission.csv")
        self.submission_template_df = template
        id_parts = template['id'].str.split('_', expand=True)
        for position, column_name in enumerate(('user_id', 'movie_id')):
            template[column_name] = id_parts[position].astype('int32')
        print(f"Loaded {len(template):,} submission pairs")
        return template

    def get_genre_features(self) -> Dict[int, np.ndarray]:
        """
        Build a multi-hot genre vector for every movie.

        Loads the movie metadata first if it is not cached yet. Genres are
        taken from the pipe-separated ``genres`` column; vector positions
        follow the alphabetically sorted list of all genres seen.

        Returns:
            Mapping of movie_id to a float32 vector with 1.0 for each genre
            the movie has (all zeros when ``genres`` is missing).
        """
        if self.movies_df is None:
            self.load_movies_metadata()

        genre_column = self.movies_df['genres']
        vocabulary = sorted(
            {genre for genres_str in genre_column.dropna() for genre in genres_str.split('|')}
        )

        genre_features = {}
        for movie_id, genres_str in zip(self.movies_df['movie_id'], genre_column):
            movie_genres = genres_str.split('|') if pd.notna(genres_str) else []
            genre_features[movie_id] = np.array(
                [genre in movie_genres for genre in vocabulary], dtype=np.float32
            )

        print(f"Extracted genre features with {len(vocabulary)} unique genres")
        return genre_features

# Evaluation Functions
def compute_weighted_rmse(y_true, y_pred, movie_ids, train_df):
    """
    Compute RMSE where each sample is weighted by 1 / sqrt(movie popularity).

    Popularity is the number of rows the movie has in ``train_df``; movies
    that do not appear there are treated as having a count of 1.

    Args:
        y_true: True ratings (any array-like, compared positionally).
        y_pred: Predicted ratings (any array-like, same length as y_true).
        movie_ids: Movie id for each sample (array-like, same length).
        train_df: DataFrame with a ``movie_id`` column used for the counts.

    Returns:
        The weighted RMSE, or NaN when there are no samples.
    """
    # asarray lets plain lists work; float64 avoids float32 accumulation error.
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    if y_true.size == 0:
        return float('nan')

    movie_counts = train_df['movie_id'].value_counts()
    # Vectorised lookup; ids unseen in training map to NaN and fall back to 1.
    sample_counts = (
        pd.Series(np.asarray(movie_ids))
        .map(movie_counts)
        .fillna(1)
        .to_numpy(dtype=np.float64)
    )
    weights = 1.0 / np.sqrt(sample_counts)

    weighted_squared_errors = weights * (y_pred - y_true) ** 2
    return np.sqrt(weighted_squared_errors.sum() / weights.sum())

def compute_rmse(y_true, y_pred):
    """
    Compute the plain root-mean-squared error between two rating sequences.

    Args:
        y_true: True ratings (any array-like, compared positionally).
        y_pred: Predicted ratings (any array-like, same length as y_true).

    Returns:
        The RMSE, or NaN when there are no samples.
    """
    # asarray lets plain lists work; float64 avoids float32 accumulation error.
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    if y_true.size == 0:
        return float('nan')
    return np.sqrt(np.mean((y_true - y_pred) ** 2))

def evaluate_recommender(recommender, test_df, train_df):
    """
    Score a fitted recommender on ``test_df``.

    Predictions come from ``recommender.predict_batch`` and are clipped to
    the [0.5, 5.0] range before scoring.

    Returns:
        Dict with keys ``'weighted_rmse'`` and ``'rmse'``.
    """
    user_movie_pairs = list(zip(test_df['user_id'], test_df['movie_id']))
    raw_predictions = recommender.predict_batch(user_movie_pairs)
    y_pred = np.clip(raw_predictions, 0.5, 5.0)
    y_true = test_df['rating'].values
    test_movie_ids = test_df['movie_id'].values

    metrics = {}
    metrics['weighted_rmse'] = compute_weighted_rmse(y_true, y_pred, test_movie_ids, train_df)
    metrics['rmse'] = compute_rmse(y_true, y_pred)
    return metrics

def train_val_test_split(df, val_size=0.15, test_size=0.15, random_state=42):
    """
    Randomly split ``df`` row-wise into train, validation and test frames.

    Args:
        df: DataFrame to split.
        val_size: Fraction of rows for the validation set.
        test_size: Fraction of rows for the test set.
        random_state: Seed for the shuffle (None draws a fresh seed).

    Returns:
        Tuple ``(train_df, val_df, test_df)`` of independent copies.

    Raises:
        ValueError: If a fraction is negative or the two fractions exceed 1.
    """
    if val_size < 0 or test_size < 0 or val_size + test_size > 1:
        raise ValueError(
            f"val_size and test_size must be non-negative and sum to at most 1, "
            f"got val_size={val_size}, test_size={test_size}"
        )

    n = len(df)
    # A private RandomState yields the same permutation as seeding the global
    # generator would, without resetting global NumPy RNG state for callers.
    rng = np.random.RandomState(random_state)
    indices = rng.permutation(n)

    test_end = int(n * test_size)
    val_end = test_end + int(n * val_size)
    test_df = df.iloc[indices[:test_end]].copy()
    val_df = df.iloc[indices[test_end:val_end]].copy()
    train_df = df.iloc[indices[val_end:]].copy()
    print(f"Split: Train={len(train_df):,}, Val={len(val_df):,}, Test={len(test_df):,}")
    return train_df, val_df, test_df

# Last statement of the cell: prints a confirmation once the definitions above have run.
print("✓ Data utilities defined")

## 4. Base Recommender Classes

In [None]:
class BaseRecommender(ABC):
    """
    Common interface for every recommender in this notebook.

    Subclasses must implement ``fit`` and ``predict``; the batch helpers
    here fall back to calling ``predict`` once per pair.
    """

    def __init__(self, **kwargs):
        """Store arbitrary keyword options and mark the model as not yet fitted."""
        self.kwargs = kwargs
        self.is_fitted = False

    @abstractmethod
    def fit(self, train_df: pd.DataFrame) -> 'BaseRecommender':
        """Train on a ratings DataFrame and return self."""

    @abstractmethod
    def predict(self, user_id: int, movie_id: int) -> float:
        """Return the predicted rating for one (user, movie) pair."""

    def predict_batch(self, pairs: List[Tuple[int, int]]) -> np.ndarray:
        """Predict every (user_id, movie_id) pair in order and return the scores as an array."""
        scores = []
        for user_id, movie_id in pairs:
            scores.append(self.predict(user_id, movie_id))
        return np.array(scores)

    def predict_dataframe(self, df: pd.DataFrame) -> np.ndarray:
        """Predict for the ``user_id`` / ``movie_id`` columns of ``df``, row by row."""
        return self.predict_batch(list(zip(df['user_id'], df['movie_id'])))

class BaselineRecommender(BaseRecommender):
    """Simple baseline recommender that predicts the global mean rating for every pair."""

    def __init__(self):
        super().__init__()
        self.global_mean = None

    def fit(self, train_df: pd.DataFrame) -> 'BaselineRecommender':
        """
        Store the mean of the ``rating`` column.

        Args:
            train_df: DataFrame with a ``rating`` column.

        Returns:
            Self (for method chaining)
        """
        # float() turns the pandas/NumPy scalar into a plain Python float so
        # predict() really returns the type its annotation promises.
        self.global_mean = float(train_df['rating'].mean())
        self.is_fitted = True
        print(f"Baseline fitted with global mean = {self.global_mean:.4f}")
        return self

    def predict(self, user_id: int, movie_id: int) -> float:
        """
        Return the global mean regardless of the ids.

        Raises:
            RuntimeError: If called before fit (previously returned None).
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")
        return self.global_mean

    def predict_batch(self, pairs: List[Tuple[int, int]]) -> np.ndarray:
        """
        Return a float32 array filled with the global mean, one entry per pair.

        Raises:
            RuntimeError: If called before fit.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")
        return np.full(len(pairs), self.global_mean, dtype=np.float32)

# Last statement of the cell: prints a confirmation once the classes above are defined.
print("✓ Base recommender classes defined")

## 5. SVD Recommender

In [None]:
class SVDRecommender(BaseRecommender):
    """
    Matrix factorization using truncated SVD with user and item biases.

    Biases are the per-user and per-movie mean ratings minus the global
    mean. The sparse matrix of residuals (rating minus global mean and both
    biases) is factorized with ``scipy.sparse.linalg.svds``. A prediction is
    global mean + biases, plus the low-rank reconstruction when both the
    user and the movie were seen during training.
    """

    def __init__(self, n_factors: int = 50, random_state: int = 42):
        """
        Args:
            n_factors: Requested number of singular values/vectors.
            random_state: Seed forwarded to ``svds``.
        """
        super().__init__()
        self.n_factors = n_factors
        self.random_state = random_state
        self.global_mean = None
        self.user_biases = {}
        self.item_biases = {}
        self.user_factors = None
        self.item_factors = None
        self.singular_values = None
        self.user_id_to_idx = {}
        self.item_id_to_idx = {}

    def _require_fitted(self) -> None:
        """Raise RuntimeError when the model has not been fitted yet."""
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")

    def fit(self, train_df: pd.DataFrame) -> 'SVDRecommender':
        """
        Fit biases and the truncated SVD of the residual rating matrix.

        Args:
            train_df: DataFrame with columns [user_id, movie_id, rating]

        Returns:
            Self (for method chaining)

        Raises:
            ValueError: If there are fewer than 2 distinct users or movies,
                in which case no rank >= 1 truncated SVD can be computed.
        """
        print(f"Training SVD with {self.n_factors} factors...")
        self.global_mean = train_df['rating'].mean()

        unique_users = train_df['user_id'].unique()
        unique_items = train_df['movie_id'].unique()
        self.user_id_to_idx = {uid: idx for idx, uid in enumerate(unique_users)}
        self.item_id_to_idx = {iid: idx for idx, iid in enumerate(unique_items)}
        n_users, n_items = len(unique_users), len(unique_items)
        print(f"Users: {n_users:,}, Items: {n_items:,}")

        if min(n_users, n_items) < 2:
            raise ValueError(
                "SVDRecommender needs at least 2 distinct users and 2 distinct movies"
            )

        user_bias_by_id = train_df.groupby('user_id')['rating'].mean() - self.global_mean
        item_bias_by_id = train_df.groupby('movie_id')['rating'].mean() - self.global_mean
        self.user_biases = user_bias_by_id.to_dict()
        self.item_biases = item_bias_by_id.to_dict()

        user_indices = train_df['user_id'].map(self.user_id_to_idx).values
        item_indices = train_df['movie_id'].map(self.item_id_to_idx).values

        # Vectorised residuals: one pandas lookup per column instead of a
        # Python-level loop over every rating.
        ratings = train_df['rating'].values
        baseline = (
            self.global_mean
            + train_df['user_id'].map(user_bias_by_id).values
            + train_df['movie_id'].map(item_bias_by_id).values
        )
        # Keep the input float precision; integer ratings are promoted so the
        # residuals are not truncated.
        if np.issubdtype(ratings.dtype, np.floating):
            value_dtype = ratings.dtype
        else:
            value_dtype = np.float64
        residuals = (ratings - baseline).astype(value_dtype)

        rating_matrix = csr_matrix(
            (residuals, (user_indices, item_indices)), shape=(n_users, n_items)
        )
        # svds requires k to be strictly smaller than both matrix dimensions.
        k = min(self.n_factors, min(n_users, n_items) - 1)
        U, s, Vt = svds(rating_matrix, k=k, random_state=self.random_state)

        self.user_factors = U          # shape (n_users, k)
        self.singular_values = s       # shape (k,)
        self.item_factors = Vt         # shape (k, n_items)
        self.is_fitted = True
        print(f"✓ SVD training complete!")
        return self

    def predict(self, user_id: int, movie_id: int) -> float:
        """
        Predict the rating for one user-movie pair.

        Unknown users/movies contribute a zero bias; the latent term is added
        only when both ids were seen in training.

        Raises:
            RuntimeError: If called before fit.
        """
        self._require_fitted()
        prediction = self.global_mean + self.user_biases.get(user_id, 0) + self.item_biases.get(movie_id, 0)
        user_idx = self.user_id_to_idx.get(user_id)
        item_idx = self.item_id_to_idx.get(movie_id)
        if user_idx is not None and item_idx is not None:
            prediction += np.dot(self.user_factors[user_idx] * self.singular_values, self.item_factors[:, item_idx])
        return prediction

    def predict_batch(self, pairs) -> np.ndarray:
        """
        Predict ratings for a sequence of (user_id, movie_id) pairs.

        Returns:
            float32 array of predictions, in the order of ``pairs``.

        Raises:
            RuntimeError: If called before fit.
        """
        self._require_fitted()
        predictions = np.full(len(pairs), self.global_mean, dtype=np.float32)
        # Scaling by the singular values does not depend on the pair, so do
        # it once for all users instead of once per prediction.
        scaled_user_factors = self.user_factors * self.singular_values
        for i, (user_id, movie_id) in enumerate(pairs):
            predictions[i] += self.user_biases.get(user_id, 0) + self.item_biases.get(movie_id, 0)
            user_idx = self.user_id_to_idx.get(user_id)
            item_idx = self.item_id_to_idx.get(movie_id)
            if user_idx is not None and item_idx is not None:
                predictions[i] += np.dot(scaled_user_factors[user_idx], self.item_factors[:, item_idx])
        return predictions

# Last statement of the cell: prints a confirmation once SVDRecommender is defined.
print("✓ SVD recommender defined")

## 6. FunkSVD Recommender (GPU Accelerated)

In [None]:
class FunkSVDModel(nn.Module):
    """Biased matrix-factorization model: global mean + user/item biases + factor dot product."""

    def __init__(self, n_users: int, n_items: int, n_factors: int, global_mean: float):
        super().__init__()
        # Fixed (non-learned) offset added to every prediction.
        self.global_mean = global_mean

        # Embedding tables hold the learnable parameters, one row per user/item.
        self.user_biases = nn.Embedding(n_users, 1)
        self.item_biases = nn.Embedding(n_items, 1)
        self.user_factors = nn.Embedding(n_users, n_factors)
        self.item_factors = nn.Embedding(n_items, n_factors)

        # Biases start at zero; factors start as N(0, 1/sqrt(n_factors)) noise.
        for bias_table in (self.user_biases, self.item_biases):
            nn.init.zeros_(bias_table.weight)
        init_std = 1.0 / np.sqrt(n_factors)
        for factor_table in (self.user_factors, self.item_factors):
            nn.init.normal_(factor_table.weight, 0, init_std)

    def forward(self, user_indices: torch.Tensor, item_indices: torch.Tensor) -> torch.Tensor:
        """Return one predicted rating per (user index, item index) pair."""
        user_offset = self.user_biases(user_indices).squeeze(-1)
        item_offset = self.item_biases(item_indices).squeeze(-1)

        # Row-wise dot product between the paired latent vectors.
        latent_product = self.user_factors(user_indices) * self.item_factors(item_indices)
        interaction = latent_product.sum(dim=1)

        return self.global_mean + user_offset + item_offset + interaction

    def l2_regularization(self) -> torch.Tensor:
        """Return the sum of squared entries over all four parameter tables."""
        tables = (self.user_biases, self.item_biases, self.user_factors, self.item_factors)
        squared_norms = [table.weight.pow(2).sum() for table in tables]
        total = squared_norms[0]
        for term in squared_norms[1:]:
            total = total + term
        return total


class FunkSVDRecommender(BaseRecommender):
    """
    Matrix factorization trained by mini-batch gradient descent in PyTorch.

    Jointly optimizes:
    r_ui = mu + b_u + b_i + p_u^T * q_i

    where:
    - mu: global mean rating
    - b_u: user bias (learned)
    - b_i: item bias (learned)
    - p_u: user latent factor vector
    - q_i: item latent factor vector

    With use_gpu=True the model trains on whatever get_device() returns
    (CUDA first, then Apple MPS, then CPU); otherwise it trains on the CPU.
    After training, the learned parameters are copied to NumPy arrays and
    all predictions are computed on the CPU from those copies.
    """

    def __init__(
        self,
        n_factors: int = 50,
        lr: float = 0.005,
        reg: float = 0.02,
        n_epochs: int = 30,
        batch_size: int = 1024,
        early_stop_patience: int = 5,
        lr_decay: float = 0.95,
        min_lr: float = 0.0001,
        val_fraction: float = 0.1,
        random_state: int = 42,
        verbose: bool = True,
        use_gpu: bool = True
    ):
        """
        Initialize FunkSVD recommender.

        Args:
            n_factors: Number of latent factors
            lr: Initial learning rate
            reg: L2 regularization strength (passed to Adam as weight_decay)
            n_epochs: Maximum number of training epochs
            batch_size: Mini-batch size
            early_stop_patience: Stop if validation RMSE has not improved for
                this many consecutive epochs
            lr_decay: Multiply learning rate by this factor each epoch
            min_lr: Stop decaying once the learning rate is at or below this
            val_fraction: Fraction of training data held out for validation;
                values <= 0 disable validation and early stopping
            random_state: Random seed for reproducibility
            verbose: Print training progress
            use_gpu: Use the accelerator chosen by get_device() (CUDA, MPS,
                or CPU fallback) instead of forcing the CPU
        """
        super().__init__()
        self.n_factors = n_factors
        self.lr = lr
        self.reg = reg
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.early_stop_patience = early_stop_patience
        self.lr_decay = lr_decay
        self.min_lr = min_lr
        self.val_fraction = val_fraction
        self.random_state = random_state
        self.verbose = verbose
        self.use_gpu = use_gpu

        # Set device
        if use_gpu:
            self.device = get_device()
        else:
            self.device = torch.device("cpu")

        # Model (initialized in fit)
        self.model = None
        self.global_mean = None

        # Numpy copies for prediction (CPU)
        self.user_biases = None
        self.item_biases = None
        self.user_factors = None
        self.item_factors = None

        # Mappings from raw ids to embedding row indices
        self.user_id_to_idx = {}
        self.item_id_to_idx = {}

        # Per-epoch RMSE history of the most recent fit() call
        self.train_losses = []
        self.val_losses = []

    def _compute_rmse(self, model: nn.Module, user_indices: torch.Tensor,
                      item_indices: torch.Tensor, ratings: torch.Tensor) -> float:
        """Compute RMSE of ``model`` on the given tensors without tracking gradients."""
        model.eval()
        with torch.no_grad():
            predictions = model(user_indices, item_indices)
            mse = torch.mean((predictions - ratings) ** 2)
            return torch.sqrt(mse).item()

    def fit(self, train_df: pd.DataFrame) -> 'FunkSVDRecommender':
        """
        Train the FunkSVD model with mini-batch Adam updates.

        Args:
            train_df: DataFrame with columns [user_id, movie_id, rating]

        Returns:
            Self (for method chaining)

        Raises:
            ValueError: If train_df is empty or val_fraction >= 1 (either
                would leave no training samples).
        """
        n_samples = len(train_df)
        if n_samples == 0:
            raise ValueError("train_df must contain at least one rating")
        if self.val_fraction >= 1:
            raise ValueError(
                f"val_fraction must be smaller than 1, got {self.val_fraction}"
            )

        if self.verbose:
            print(f"Training FunkSVD with {self.n_factors} factors on {self.device}...")
            print(f"Hyperparameters: lr={self.lr}, reg={self.reg}, epochs={self.n_epochs}, batch_size={self.batch_size}")

        torch.manual_seed(self.random_state)
        np.random.seed(self.random_state)

        # Start a fresh history so repeated fit() calls do not concatenate runs.
        self.train_losses = []
        self.val_losses = []

        # Compute global mean
        self.global_mean = float(train_df['rating'].mean())

        # Create mappings
        unique_users = train_df['user_id'].unique()
        unique_items = train_df['movie_id'].unique()

        self.user_id_to_idx = {uid: idx for idx, uid in enumerate(unique_users)}
        self.item_id_to_idx = {iid: idx for idx, iid in enumerate(unique_items)}

        n_users = len(unique_users)
        n_items = len(unique_items)

        if self.verbose:
            print(f"Users: {n_users:,}, Items: {n_items:,}")

        # Convert to torch tensors (kept on the CPU until after the split)
        user_indices = torch.from_numpy(
            train_df['user_id'].map(self.user_id_to_idx).values.astype(np.int64)
        )
        item_indices = torch.from_numpy(
            train_df['movie_id'].map(self.item_id_to_idx).values.astype(np.int64)
        )
        ratings = torch.from_numpy(train_df['rating'].values.astype(np.float32))

        # Split into train and validation. The permutation is drawn even when
        # validation is disabled so the seeded random stream is the same in
        # both configurations.
        indices = torch.randperm(n_samples)

        val_size = int(n_samples * self.val_fraction) if self.val_fraction > 0 else 0
        # A tiny fraction can round down to zero rows; validating on an empty
        # set would produce NaN and defeat early stopping, so treat it as off.
        use_validation = val_size > 0

        if use_validation:
            val_idx = indices[:val_size]
            train_idx = indices[val_size:]

            train_users = user_indices[train_idx].to(self.device)
            train_items = item_indices[train_idx].to(self.device)
            train_ratings = ratings[train_idx].to(self.device)

            val_users = user_indices[val_idx].to(self.device)
            val_items = item_indices[val_idx].to(self.device)
            val_ratings = ratings[val_idx].to(self.device)

            if self.verbose:
                print(f"Training samples: {len(train_idx):,}, Validation: {len(val_idx):,}")
        else:
            train_users = user_indices.to(self.device)
            train_items = item_indices.to(self.device)
            train_ratings = ratings.to(self.device)
            val_users = val_items = val_ratings = None

            if self.verbose:
                print(f"Training samples: {n_samples:,} (no validation split)")

        # Create model
        self.model = FunkSVDModel(n_users, n_items, self.n_factors, self.global_mean).to(self.device)

        # Optimizer with L2 regularization via weight_decay.
        # NOTE(review): Adam's weight_decay is applied to every embedding row
        # on every step, not only to the rows present in the batch.
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=self.lr_decay)

        # Training loop
        best_val_loss = float('inf')
        patience_counter = 0
        best_state = None

        n_train = len(train_users)
        n_batches = (n_train + self.batch_size - 1) // self.batch_size

        for epoch in range(self.n_epochs):
            self.model.train()

            # Shuffle training data
            perm = torch.randperm(n_train, device=self.device)
            train_users_shuf = train_users[perm]
            train_items_shuf = train_items[perm]
            train_ratings_shuf = train_ratings[perm]

            epoch_loss = 0.0

            for batch_idx in range(n_batches):
                start = batch_idx * self.batch_size
                end = min(start + self.batch_size, n_train)

                batch_users = train_users_shuf[start:end]
                batch_items = train_items_shuf[start:end]
                batch_ratings = train_ratings_shuf[start:end]

                # Forward pass
                predictions = self.model(batch_users, batch_items)
                loss = torch.mean((predictions - batch_ratings) ** 2)

                # Backward pass
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Weight by batch length so the short last batch counts fairly.
                epoch_loss += loss.item() * (end - start)

            # Compute epoch metrics
            train_rmse = np.sqrt(epoch_loss / n_train)
            self.train_losses.append(train_rmse)

            if use_validation:
                val_rmse = self._compute_rmse(self.model, val_users, val_items, val_ratings)
                self.val_losses.append(val_rmse)

                current_lr = optimizer.param_groups[0]['lr']
                if self.verbose:
                    print(f"Epoch {epoch+1}/{self.n_epochs}: Train RMSE={train_rmse:.4f}, Val RMSE={val_rmse:.4f}, LR={current_lr:.6f}")

                # Early stopping: snapshot the best weights, stop after
                # `early_stop_patience` epochs without improvement.
                if val_rmse < best_val_loss:
                    best_val_loss = val_rmse
                    patience_counter = 0
                    best_state = {k: v.clone() for k, v in self.model.state_dict().items()}
                else:
                    patience_counter += 1
                    if patience_counter >= self.early_stop_patience:
                        if self.verbose:
                            print(f"Early stopping at epoch {epoch+1}")
                        break
            else:
                current_lr = optimizer.param_groups[0]['lr']
                if self.verbose:
                    print(f"Epoch {epoch+1}/{self.n_epochs}: Train RMSE={train_rmse:.4f}, LR={current_lr:.6f}")

            # Learning rate decay (frozen once it reaches min_lr)
            if current_lr > self.min_lr:
                scheduler.step()

        # Restore best model
        if best_state is not None:
            self.model.load_state_dict(best_state)

        # Copy parameters to numpy for CPU prediction. detach() makes the
        # conversion independent of the surrounding grad mode.
        self.model.eval()
        with torch.no_grad():
            self.user_biases = self.model.user_biases.weight.detach().cpu().numpy().flatten()
            self.item_biases = self.model.item_biases.weight.detach().cpu().numpy().flatten()
            self.user_factors = self.model.user_factors.weight.detach().cpu().numpy()
            self.item_factors = self.model.item_factors.weight.detach().cpu().numpy()

        self.is_fitted = True

        if self.verbose:
            print(f"✓ FunkSVD training complete on {self.device}!")
            if use_validation:
                print(f"Best validation RMSE: {best_val_loss:.4f}")
            print(f"Global mean: {self.global_mean:.4f}")

        return self

    def predict(self, user_id: int, movie_id: int) -> float:
        """
        Predict rating for a user-movie pair.

        Unknown users/movies contribute no bias; the latent term is added
        only when both ids were seen in training.

        Args:
            user_id: User identifier
            movie_id: Movie identifier

        Returns:
            Predicted rating

        Raises:
            RuntimeError: If called before fit.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")

        prediction = self.global_mean

        user_idx = self.user_id_to_idx.get(user_id)
        item_idx = self.item_id_to_idx.get(movie_id)

        if user_idx is not None:
            prediction += self.user_biases[user_idx]

        if item_idx is not None:
            prediction += self.item_biases[item_idx]

        if user_idx is not None and item_idx is not None:
            prediction += np.dot(self.user_factors[user_idx], self.item_factors[item_idx])

        # Return a plain Python float, as the annotation promises.
        return float(prediction)

    def predict_batch(self, pairs: List[Tuple[int, int]]) -> np.ndarray:
        """
        Vectorized batch prediction.

        Args:
            pairs: List of (user_id, movie_id) tuples

        Returns:
            float32 array of predicted ratings, in the order of ``pairs``

        Raises:
            RuntimeError: If called before fit.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")

        n_pairs = len(pairs)
        predictions = np.full(n_pairs, self.global_mean, dtype=np.float32)
        if n_pairs == 0:
            return predictions

        # Translate raw ids to embedding rows; -1 marks ids unseen in training.
        user_lookup = self.user_id_to_idx.get
        item_lookup = self.item_id_to_idx.get
        user_idx = np.array([user_lookup(u, -1) for u, _ in pairs], dtype=np.int64)
        item_idx = np.array([item_lookup(m, -1) for _, m in pairs], dtype=np.int64)
        known_user = user_idx >= 0
        known_item = item_idx >= 0

        predictions[known_user] += self.user_biases[user_idx[known_user]]
        predictions[known_item] += self.item_biases[item_idx[known_item]]

        # Latent term only where both ids are known. Processed in chunks so
        # the gathered factor rows stay small for very large batches.
        both_rows = np.nonzero(known_user & known_item)[0]
        chunk_size = 65536
        for start in range(0, len(both_rows), chunk_size):
            rows = both_rows[start:start + chunk_size]
            user_vecs = self.user_factors[user_idx[rows]]
            item_vecs = self.item_factors[item_idx[rows]]
            predictions[rows] += (user_vecs * item_vecs).sum(axis=1)

        return predictions

# Last statement of the cell: prints a confirmation once the FunkSVD classes are defined.
print('✓ FunkSVD recommender defined')

## 7. ALS Recommender (GPU Accelerated)

In [None]:
class ALSRecommender(BaseRecommender):
    """
    Matrix Factorization using Alternating Least Squares with GPU acceleration.
    
    Prediction formula:
    r_ui = mu + b_u + b_i + p_u^T * q_i
    
    Algorithm:
    1. Fix Q (item factors), solve for P (user factors) in closed form
    2. Fix P (user factors), solve for Q (item factors) in closed form
    3. Repeat until convergence
    
    The closed-form solution with regularization:
    p_u = (Q^T * Q + lambda * I)^-1 * Q^T * r_u
    
    Uses PyTorch MPS backend for GPU acceleration on Apple Silicon.
    """
    
    def __init__(
        self,
        n_factors: int = 50,
        reg: float = 0.1,
        n_iterations: int = 15,
        random_state: int = 42,
        verbose: bool = True,
        use_gpu: bool = True
    ):
        """
        Initialize ALS recommender.
        
        Args:
            n_factors: Number of latent factors
            reg: L2 regularization strength (higher = more regularization)
            n_iterations: Number of alternating iterations
            random_state: Random seed for reproducibility
            verbose: Print training progress
            use_gpu: Whether to use GPU acceleration (MPS on Apple Silicon)
        """
        super().__init__()
        self.n_factors = n_factors
        self.reg = reg
        self.n_iterations = n_iterations
        self.random_state = random_state
        self.verbose = verbose
        self.use_gpu = use_gpu
        
        # Set device
        if use_gpu:
            self.device = get_device()
        else:
            self.device = torch.device("cpu")
        
        # Model parameters (stored as numpy for compatibility)
        self.global_mean = None
        self.user_biases = None
        self.item_biases = None
        self.user_factors = None
        self.item_factors = None
        
        # Mappings
        self.user_id_to_idx = {}
        self.item_id_to_idx = {}
        
        # Training history
        self.losses = []
    
    def _compute_rmse_gpu(self, user_indices: torch.Tensor, item_indices: torch.Tensor, 
                          ratings: torch.Tensor, user_factors: torch.Tensor, 
                          item_factors: torch.Tensor, user_biases: torch.Tensor,
                          item_biases: torch.Tensor) -> float:
        """Compute RMSE on GPU."""
        with torch.no_grad():
            predictions = (
                self.global_mean +
                user_biases[user_indices] +
                item_biases[item_indices] +
                torch.sum(user_factors[user_indices] * item_factors[item_indices], dim=1)
            )
            mse = torch.mean((ratings - predictions) ** 2)
            return torch.sqrt(mse).item()
    
    def fit(self, train_df: pd.DataFrame) -> 'ALSRecommender':
        """
        Train ALS model with GPU acceleration.
        
        Args:
            train_df: DataFrame with columns [user_id, movie_id, rating]
        
        Returns:
            Self (for method chaining)
        """
        if self.verbose:
            print(f"Training ALS with {self.n_factors} factors on {self.device}...")
            print(f"Hyperparameters: reg={self.reg}, iterations={self.n_iterations}")
        
        torch.manual_seed(self.random_state)
        np.random.seed(self.random_state)
        
        # Compute global mean
        self.global_mean = float(train_df['rating'].mean())
        
        # Create mappings
        unique_users = train_df['user_id'].unique()
        unique_items = train_df['movie_id'].unique()
        
        self.user_id_to_idx = {uid: idx for idx, uid in enumerate(unique_users)}
        self.item_id_to_idx = {iid: idx for idx, iid in enumerate(unique_items)}
        
        n_users = len(unique_users)
        n_items = len(unique_items)
        
        if self.verbose:
            print(f"Users: {n_users:,}, Items: {n_items:,}")
        
        # Compute biases
        user_means = train_df.groupby('user_id')['rating'].mean()
        user_biases_np = np.zeros(n_users, dtype=np.float32)
        for uid, mean in user_means.items():
            user_biases_np[self.user_id_to_idx[uid]] = mean - self.global_mean
        
        item_means = train_df.groupby('movie_id')['rating'].mean()
        item_biases_np = np.zeros(n_items, dtype=np.float32)
        for iid, mean in item_means.items():
            item_biases_np[self.item_id_to_idx[iid]] = mean - self.global_mean
        
        # Create index arrays
        user_indices_np = train_df['user_id'].map(self.user_id_to_idx).values.astype(np.int64)
        item_indices_np = train_df['movie_id'].map(self.item_id_to_idx).values.astype(np.int64)
        ratings_np = train_df['rating'].values.astype(np.float32)
        
        # Move to GPU
        user_indices = torch.from_numpy(user_indices_np).to(self.device)
        item_indices = torch.from_numpy(item_indices_np).to(self.device)
        ratings = torch.from_numpy(ratings_np).to(self.device)
        user_biases = torch.from_numpy(user_biases_np).to(self.device)
        item_biases = torch.from_numpy(item_biases_np).to(self.device)
        
        # Normalized ratings (subtract global mean and biases)
        ratings_normalized = (
            ratings - 
            self.global_mean - 
            user_biases[user_indices] - 
            item_biases[item_indices]
        )
        
        # Build user->items and item->users mappings for efficient access
        user_to_items = [[] for _ in range(n_users)]
        user_to_ratings = [[] for _ in range(n_users)]
        item_to_users = [[] for _ in range(n_items)]
        item_to_ratings = [[] for _ in range(n_items)]
        
        for idx in range(len(train_df)):
            u = user_indices_np[idx]
            i = item_indices_np[idx]
            r = float(ratings_normalized[idx].cpu())
            user_to_items[u].append(i)
            user_to_ratings[u].append(r)
            item_to_users[i].append(u)
            item_to_ratings[i].append(r)
        
        if self.verbose:
            sparsity = 1 - len(train_df) / (n_users * n_items)
            print(f"Matrix sparsity: {sparsity:.4f}")
        
        # Initialize factors randomly on GPU
        scale = 1.0 / np.sqrt(self.n_factors)
        user_factors = torch.randn(n_users, self.n_factors, device=self.device, dtype=torch.float32) * scale
        item_factors = torch.randn(n_items, self.n_factors, device=self.device, dtype=torch.float32) * scale
        
        # Regularization matrix
        reg_matrix = self.reg * torch.eye(self.n_factors, device=self.device, dtype=torch.float32)
        
        # Alternating optimization
        for iteration in range(self.n_iterations):
            # Step 1: Fix item factors, update user factors
            for u in range(n_users):
                item_idxs = user_to_items[u]
                if len(item_idxs) == 0:
                    continue
                
                item_idxs_t = torch.tensor(item_idxs, dtype=torch.long, device=self.device)
                user_ratings_t = torch.tensor(user_to_ratings[u], dtype=torch.float32, device=self.device)
                
                Q_u = item_factors[item_idxs_t]  # (n_rated, n_factors)
                A = Q_u.T @ Q_u + reg_matrix     # (n_factors, n_factors)
                b = Q_u.T @ user_ratings_t       # (n_factors,)
                
                user_factors[u] = torch.linalg.solve(A, b)
            
            # Step 2: Fix user factors, update item factors
            for i in range(n_items):
                user_idxs = item_to_users[i]
                if len(user_idxs) == 0:
                    continue
                
                user_idxs_t = torch.tensor(user_idxs, dtype=torch.long, device=self.device)
                item_ratings_t = torch.tensor(item_to_ratings[i], dtype=torch.float32, device=self.device)
                
                P_i = user_factors[user_idxs_t]  # (n_rated, n_factors)
                A = P_i.T @ P_i + reg_matrix     # (n_factors, n_factors)
                b = P_i.T @ item_ratings_t       # (n_factors,)
                
                item_factors[i] = torch.linalg.solve(A, b)
            
            # Compute loss
            rmse = self._compute_rmse_gpu(user_indices, item_indices, ratings,
                                          user_factors, item_factors, user_biases, item_biases)
            self.losses.append(rmse)
            
            if self.verbose:
                print(f"Iteration {iteration+1}/{self.n_iterations}: RMSE={rmse:.4f}")
        
        # Move final parameters back to CPU/numpy for prediction compatibility
        self.user_factors = user_factors.cpu().numpy()
        self.item_factors = item_factors.cpu().numpy()
        self.user_biases = user_biases.cpu().numpy()
        self.item_biases = item_biases.cpu().numpy()
        
        self.is_fitted = True
        
        if self.verbose:
            print(f"✓ ALS training complete on {self.device}!")
            print(f"Final RMSE: {self.losses[-1]:.4f}")
            print(f"Global mean: {self.global_mean:.4f}")
        
        return self
    
    def predict(self, user_id: int, movie_id: int) -> float:
        """
        Score a single (user, movie) pair.

        The score starts at the global mean. A bias term is added for each
        side that has an entry in the fitted id-to-index maps, and the latent
        factor dot product is added only when both sides are known.

        Args:
            user_id: User identifier
            movie_id: Movie identifier

        Returns:
            Predicted rating

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")

        u = self.user_id_to_idx.get(user_id)
        m = self.item_id_to_idx.get(movie_id)
        has_user = u is not None
        has_item = m is not None

        # Terms are accumulated in a fixed order: mean, user bias, item bias,
        # then the factor interaction.
        score = self.global_mean
        if has_user:
            score += self.user_biases[u]
        if has_item:
            score += self.item_biases[m]
        if has_user and has_item:
            score += np.dot(self.user_factors[u], self.item_factors[m])

        return score
    
    def predict_batch(self, pairs: List[Tuple[int, int]]) -> np.ndarray:
        """
        Score many (user, movie) pairs.

        Every output slot starts at the global mean and is then updated in
        place with the same terms `predict` uses, one pair at a time.

        Args:
            pairs: List of (user_id, movie_id) tuples

        Returns:
            Array of predicted ratings (float32, one entry per pair)

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")

        scores = np.full(len(pairs), self.global_mean, dtype=np.float32)
        user_lookup = self.user_id_to_idx
        item_lookup = self.item_id_to_idx

        for slot, (user_id, movie_id) in enumerate(pairs):
            u = user_lookup.get(user_id)
            m = item_lookup.get(movie_id)

            if u is not None:
                scores[slot] += self.user_biases[u]

            # Nothing item-related can be added for an unseen movie.
            if m is None:
                continue

            scores[slot] += self.item_biases[m]
            if u is not None:
                scores[slot] += np.dot(self.user_factors[u], self.item_factors[m])

        return scores

# Cell-level confirmation printed once the definitions above have executed.
print('✓ ALS recommender defined')

## 8. Hybrid Recommender (CF + Content)

In [None]:
class HybridRecommender(BaseRecommender):
    """
    Hybrid recommender combining collaborative filtering with content features.

    Prediction formula:
    r_ui = mu + b_u + b_i + p_u^T * q_i + w^T * genre_features_i

    where:
    - mu: global mean rating
    - b_u: user bias
    - b_i: item bias
    - p_u, q_i: latent factors (collaborative)
    - w: genre weight vector
    - genre_features_i: binary genre vector for item i

    Notes:
    - All parameters are learned jointly with per-sample SGD in NumPy.
    - The genre term is applied to every item that appeared in the training
      data, including when the user is unknown (user cold-start). Items that
      were never seen during training fall back to `mu (+ b_u)`; their genre
      vectors are not consulted at prediction time.
    - Content features act as additional regularization.
    """

    def __init__(
        self,
        n_factors: int = 50,
        lr: float = 0.005,
        reg: float = 0.02,
        content_reg: float = 0.01,
        n_epochs: int = 30,
        early_stop_patience: int = 5,
        lr_decay: float = 0.95,
        min_lr: float = 0.0001,
        val_fraction: float = 0.1,
        genre_features: Optional[Dict[int, np.ndarray]] = None,
        random_state: int = 42,
        verbose: bool = True
    ):
        """
        Initialize Hybrid recommender.

        Args:
            n_factors: Number of latent factors
            lr: Initial learning rate
            reg: L2 regularization for latent factors and biases
            content_reg: L2 regularization for genre weights
            n_epochs: Maximum number of training epochs
            early_stop_patience: Stop if no improvement for this many epochs
            lr_decay: Multiply learning rate by this factor each epoch
            min_lr: Minimum learning rate
            val_fraction: Fraction of training data for validation
                (0 disables the internal validation split)
            genre_features: Dict mapping movie_id to binary genre vector;
                None or an empty dict disables the content term
            random_state: Random seed
            verbose: Print training progress
        """
        super().__init__()
        self.n_factors = n_factors
        self.lr = lr
        self.reg = reg
        self.content_reg = content_reg
        self.n_epochs = n_epochs
        self.early_stop_patience = early_stop_patience
        self.lr_decay = lr_decay
        self.min_lr = min_lr
        self.val_fraction = val_fraction
        self.genre_features = genre_features
        self.random_state = random_state
        self.verbose = verbose

        # Model parameters
        self.global_mean = None
        self.user_biases = None
        self.item_biases = None
        self.user_factors = None
        self.item_factors = None
        self.genre_weights = None

        # Genre feature info
        self.n_genres = None
        self.internal_genre_features = None  # Numpy array for fast access

        # Mappings
        self.user_id_to_idx = {}
        self.item_id_to_idx = {}

        # Training history (reset at the start of every fit)
        self.train_losses = []
        self.val_losses = []

    def _extract_genres_from_df(self, train_df: pd.DataFrame) -> Dict[int, np.ndarray]:
        """
        Fallback that yields an empty genre vector for every training movie.

        The ratings frame carries no genre information, so this cannot build
        real features; it is not called anywhere inside this class.
        """
        unique_items = train_df['movie_id'].unique()
        return {iid: np.array([], dtype=np.float32) for iid in unique_items}

    def _init_parameters(self, n_users: int, n_items: int):
        """
        Initialize model parameters.

        Seeds NumPy's global RNG with `random_state`; the shuffles drawn later
        in `fit` continue from that same stream.
        """
        np.random.seed(self.random_state)

        # Biases
        self.user_biases = np.zeros(n_users, dtype=np.float32)
        self.item_biases = np.zeros(n_items, dtype=np.float32)

        # Latent factors
        scale = 1.0 / np.sqrt(self.n_factors)
        self.user_factors = np.random.normal(0, scale, (n_users, self.n_factors)).astype(np.float32)
        self.item_factors = np.random.normal(0, scale, (n_items, self.n_factors)).astype(np.float32)

        # Genre weights; cleared when there are no genres so that a refit
        # without content features does not keep weights from an earlier fit.
        if self.n_genres > 0:
            self.genre_weights = np.zeros(self.n_genres, dtype=np.float32)
        else:
            self.genre_weights = None

    def _get_genre_contribution(self, item_idx: int) -> float:
        """Get genre-based contribution to prediction for an internal item index."""
        if self.n_genres == 0 or self.genre_weights is None:
            return 0.0

        genre_vec = self.internal_genre_features[item_idx]
        return float(np.dot(self.genre_weights, genre_vec))

    def _compute_loss(self, user_indices: np.ndarray, item_indices: np.ndarray,
                      ratings: np.ndarray) -> float:
        """Compute RMSE of the current parameters on the given index/rating arrays."""
        predictions = (
            self.global_mean +
            self.user_biases[user_indices] +
            self.item_biases[item_indices] +
            np.sum(self.user_factors[user_indices] * self.item_factors[item_indices], axis=1)
        )

        # Add genre contribution
        if self.n_genres > 0 and self.genre_weights is not None:
            genre_contrib = np.sum(
                self.internal_genre_features[item_indices] * self.genre_weights,
                axis=1
            )
            predictions += genre_contrib

        mse = np.mean((predictions - ratings) ** 2)
        return np.sqrt(mse)

    def _setup_genre_features(self, n_items: int) -> None:
        """Build the (n_items, n_genres) genre matrix aligned with `item_id_to_idx`."""
        # `not` covers both None and an empty dict; the latter used to crash
        # on next(iter(...)) with StopIteration.
        if not self.genre_features:
            if self.verbose:
                print("No genre features provided, running without content features")
            self.n_genres = 0
            self.internal_genre_features = np.zeros((n_items, 0), dtype=np.float32)
            return

        # The vector length of an arbitrary entry defines the genre dimension.
        sample_vec = next(iter(self.genre_features.values()))
        self.n_genres = len(sample_vec)
        self.internal_genre_features = np.zeros((n_items, self.n_genres), dtype=np.float32)

        # Movies without an entry keep an all-zero genre row.
        for iid, idx in self.item_id_to_idx.items():
            if iid in self.genre_features:
                self.internal_genre_features[idx] = self.genre_features[iid]

        if self.verbose:
            print(f"Using {self.n_genres} genre features")

    def _sgd_epoch(self, users: np.ndarray, items: np.ndarray,
                   ratings: np.ndarray, lr: float) -> None:
        """Run one pass of per-sample SGD updates over the given (already shuffled) samples."""
        use_genres = self.n_genres > 0

        for u, i, r in zip(users, items, ratings):
            # Compute prediction
            pred = (
                self.global_mean +
                self.user_biases[u] +
                self.item_biases[i] +
                np.dot(self.user_factors[u], self.item_factors[i])
            )
            if use_genres:
                pred += np.dot(self.genre_weights, self.internal_genre_features[i])

            error = r - pred

            # Update biases
            self.user_biases[u] += lr * (error - self.reg * self.user_biases[u])
            self.item_biases[i] += lr * (error - self.reg * self.item_biases[i])

            # Update latent factors; the item update must see the user
            # factors from before this step, hence the copy.
            user_factor_old = self.user_factors[u].copy()
            self.user_factors[u] += lr * (error * self.item_factors[i] - self.reg * self.user_factors[u])
            self.item_factors[i] += lr * (error * user_factor_old - self.reg * self.item_factors[i])

            # Update genre weights
            if use_genres:
                genre_vec = self.internal_genre_features[i]
                self.genre_weights += lr * (error * genre_vec - self.content_reg * self.genre_weights)

    def _snapshot_parameters(self) -> Dict[str, Optional[np.ndarray]]:
        """Return copies of all learned arrays (used to remember the best epoch)."""
        return {
            'user_biases': self.user_biases.copy(),
            'item_biases': self.item_biases.copy(),
            'user_factors': self.user_factors.copy(),
            'item_factors': self.item_factors.copy(),
            'genre_weights': self.genre_weights.copy() if self.genre_weights is not None else None
        }

    def fit(self, train_df: pd.DataFrame) -> 'HybridRecommender':
        """
        Train hybrid model using SGD.

        When `val_fraction` yields at least one validation row, that many
        randomly chosen rows are held out, early stopping monitors their RMSE,
        and the parameters of the best epoch are restored at the end.
        Otherwise all rows are used and training runs for `n_epochs`.

        Args:
            train_df: DataFrame with columns [user_id, movie_id, rating]

        Returns:
            Self (for method chaining)
        """
        if self.verbose:
            print(f"Training Hybrid (CF + Content) with {self.n_factors} factors...")
            print(f"Hyperparameters: lr={self.lr}, reg={self.reg}, content_reg={self.content_reg}")

        # Each fit starts with a clean history instead of appending to the
        # curves of a previous run.
        self.train_losses = []
        self.val_losses = []

        # Compute global mean
        self.global_mean = train_df['rating'].mean()

        # Create mappings
        unique_users = train_df['user_id'].unique()
        unique_items = train_df['movie_id'].unique()

        self.user_id_to_idx = {uid: idx for idx, uid in enumerate(unique_users)}
        self.item_id_to_idx = {iid: idx for idx, iid in enumerate(unique_items)}

        n_users = len(unique_users)
        n_items = len(unique_items)

        if self.verbose:
            print(f"Users: {n_users:,}, Items: {n_items:,}")

        # Genre matrix must exist before parameters are initialized
        # (_init_parameters reads n_genres).
        self._setup_genre_features(n_items)
        self._init_parameters(n_users, n_items)

        # Convert to numpy arrays
        user_indices = train_df['user_id'].map(self.user_id_to_idx).values.astype(np.int32)
        item_indices = train_df['movie_id'].map(self.item_id_to_idx).values.astype(np.int32)
        ratings = train_df['rating'].values.astype(np.float32)

        # Train/validation split. The permutation is drawn unconditionally so
        # the random stream consumed before the first epoch is the same with
        # and without a validation split.
        n_samples = len(ratings)
        indices = np.random.permutation(n_samples)

        # A fraction that rounds down to zero rows would give an empty
        # validation set (NaN loss, spurious early stop), so it is treated
        # the same as val_fraction=0.
        val_size = int(n_samples * self.val_fraction) if self.val_fraction > 0 else 0
        use_validation = val_size > 0

        if use_validation:
            val_indices = indices[:val_size]
            train_indices = indices[val_size:]

            train_users = user_indices[train_indices]
            train_items = item_indices[train_indices]
            train_ratings = ratings[train_indices]

            val_users = user_indices[val_indices]
            val_items = item_indices[val_indices]
            val_ratings = ratings[val_indices]

            if self.verbose:
                print(f"Training samples: {len(train_indices):,}, Validation: {len(val_indices):,}")
        else:
            train_users = user_indices
            train_items = item_indices
            train_ratings = ratings
            val_users = None
            val_items = None
            val_ratings = None

            if self.verbose:
                print(f"Training samples: {n_samples:,} (no validation split)")

        # Training loop
        current_lr = self.lr
        best_val_loss = float('inf')
        patience_counter = 0
        best_params = None

        for epoch in range(self.n_epochs):
            # Visit the training samples in a fresh random order each epoch
            perm = np.random.permutation(len(train_ratings))
            self._sgd_epoch(train_users[perm], train_items[perm], train_ratings[perm], current_lr)

            # Compute losses
            train_loss = self._compute_loss(train_users, train_items, train_ratings)
            self.train_losses.append(train_loss)

            if use_validation:
                val_loss = self._compute_loss(val_users, val_items, val_ratings)
                self.val_losses.append(val_loss)

                if self.verbose:
                    print(f"Epoch {epoch+1}/{self.n_epochs}: Train RMSE={train_loss:.4f}, Val RMSE={val_loss:.4f}, LR={current_lr:.6f}")

                # Early stopping
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                    best_params = self._snapshot_parameters()
                else:
                    patience_counter += 1
                    if patience_counter >= self.early_stop_patience:
                        if self.verbose:
                            print(f"Early stopping at epoch {epoch+1}")
                        break
            else:
                # No validation - just train for all epochs
                if self.verbose:
                    print(f"Epoch {epoch+1}/{self.n_epochs}: Train RMSE={train_loss:.4f}, LR={current_lr:.6f}")

            # Learning rate decay
            current_lr = max(current_lr * self.lr_decay, self.min_lr)

        # Restore best parameters (only if we used validation)
        if use_validation and best_params is not None:
            self.user_biases = best_params['user_biases']
            self.item_biases = best_params['item_biases']
            self.user_factors = best_params['user_factors']
            self.item_factors = best_params['item_factors']
            self.genre_weights = best_params['genre_weights']

        self.is_fitted = True

        if self.verbose:
            print("✓ Hybrid training complete!")
            if use_validation:
                print(f"Best validation RMSE: {best_val_loss:.4f}")

        return self

    def predict(self, user_id: int, movie_id: int) -> float:
        """
        Predict rating for a user-movie pair.

        Unknown users contribute no user bias; movies unseen in training
        contribute neither item bias nor genre term; the latent interaction
        is used only when both are known.

        Args:
            user_id: User identifier
            movie_id: Movie identifier

        Returns:
            Predicted rating as a plain Python float

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")

        prediction = self.global_mean

        user_idx = self.user_id_to_idx.get(user_id)
        item_idx = self.item_id_to_idx.get(movie_id)

        if user_idx is not None:
            prediction += self.user_biases[user_idx]

        if item_idx is not None:
            prediction += self.item_biases[item_idx]
            # Add genre contribution even for unknown users (cold-start)
            if self.n_genres > 0 and self.genre_weights is not None:
                prediction += np.dot(self.genre_weights, self.internal_genre_features[item_idx])

        if user_idx is not None and item_idx is not None:
            prediction += np.dot(self.user_factors[user_idx], self.item_factors[item_idx])

        # The accumulated value is a NumPy scalar; honour the annotated type.
        return float(prediction)

    def predict_batch(self, pairs: List[Tuple[int, int]]) -> np.ndarray:
        """
        Predict ratings for many pairs.

        Applies the same rules as `predict`, one pair at a time, accumulating
        into a float32 output array.

        Args:
            pairs: List of (user_id, movie_id) tuples

        Returns:
            Array of predicted ratings

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before prediction")

        predictions = np.full(len(pairs), self.global_mean, dtype=np.float32)
        use_genres = self.n_genres > 0 and self.genre_weights is not None

        for i, (user_id, movie_id) in enumerate(pairs):
            user_idx = self.user_id_to_idx.get(user_id)
            item_idx = self.item_id_to_idx.get(movie_id)

            if user_idx is not None:
                predictions[i] += self.user_biases[user_idx]

            if item_idx is not None:
                predictions[i] += self.item_biases[item_idx]
                if use_genres:
                    predictions[i] += np.dot(self.genre_weights, self.internal_genre_features[item_idx])

            if user_idx is not None and item_idx is not None:
                predictions[i] += np.dot(self.user_factors[user_idx], self.item_factors[item_idx])

        return predictions

# Cell-level confirmation printed once the class definition above has executed.
print('✓ Hybrid recommender defined')

## 9. Feature Engineering

In [None]:
class FeatureEngineer:
    """
    Comprehensive feature extraction for recommendation.
    
    Feature groups:
    - User statistics (mean, std, count, percentiles)
    - Movie statistics (mean, std, count, popularity tier)
    - Genre features (user preferences, movie genres)
    - Tag features (TF-IDF embeddings)
    - Cross features (user-movie interactions)
    """
    
    def __init__(
        self,
        n_tag_components: int = 50,
        n_genre_components: int = 20,
        verbose: bool = True
    ):
        """
        Initialize feature engineer.
        
        Args:
            n_tag_components: Dimensionality of tag embeddings
            n_genre_components: Number of genre features
            verbose: Print progress
        """
        self.n_tag_components = n_tag_components
        self.n_genre_components = n_genre_components
        self.verbose = verbose
        
        # Will be populated during fit
        self.user_stats = {}
        self.movie_stats = {}
        self.user_genre_prefs = {}  # user -> genre preference vector
        self.movie_genres = {}  # movie -> genre vector
        self.movie_tag_embeddings = {}  # movie -> tag embedding
        self.user_tag_prefs = {}  # user -> tag preference vector
        
        self.global_mean = 3.5
        self.genre_list = []
        self.tag_to_idx = {}
        
        self.is_fitted = False
    
    def fit(
        self,
        train_df: pd.DataFrame,
        movies_df: pd.DataFrame,
        tags_df: Optional[pd.DataFrame] = None
    ) -> 'FeatureEngineer':
        """
        Fit feature engineer on training data.
        
        Args:
            train_df: Training ratings [user_id, movie_id, rating]
            movies_df: Movie metadata [movie_id, title, genres]
            tags_df: Optional tags [user_id, movie_id, tag]
        
        Returns:
            Self
        """
        if self.verbose:
            print("Fitting FeatureEngineer...")
        
        self.global_mean = train_df['rating'].mean()
        
        # 1. User statistics
        if self.verbose:
            print("  Computing user statistics...")
        self._compute_user_stats(train_df)
        
        # 2. Movie statistics
        if self.verbose:
            print("  Computing movie statistics...")
        self._compute_movie_stats(train_df)
        
        # 3. Genre features
        if self.verbose:
            print("  Processing genre features...")
        self._process_genres(movies_df, train_df)
        
        # 4. Tag features
        if tags_df is not None:
            if self.verbose:
                print("  Processing tag features...")
            self._process_tags(tags_df, train_df)
        
        self.is_fitted = True
        if self.verbose:
            print("✓ FeatureEngineer fitted!")
        
        return self
    
    def _compute_user_stats(self, train_df: pd.DataFrame):
        """Compute user-level statistics."""
        user_groups = train_df.groupby('user_id')
        
        for user_id, group in user_groups:
            ratings = group['rating'].values
            self.user_stats[user_id] = {
                'mean': np.mean(ratings),
                'std': np.std(ratings) if len(ratings) > 1 else 0.0,
                'count': len(ratings),
                'min': np.min(ratings),
                'max': np.max(ratings),
                'median': np.median(ratings),
                'q25': np.percentile(ratings, 25),
                'q75': np.percentile(ratings, 75),
                'range': np.max(ratings) - np.min(ratings),
                'skew': self._safe_skew(ratings),
            }
    
    def _compute_movie_stats(self, train_df: pd.DataFrame):
        """Compute movie-level statistics."""
        movie_groups = train_df.groupby('movie_id')
        
        # Global movie stats for popularity tiers
        movie_counts = train_df.groupby('movie_id').size()
        count_percentiles = [
            np.percentile(movie_counts, p) for p in [25, 50, 75, 90, 95, 99]
        ]
        
        for movie_id, group in movie_groups:
            ratings = group['rating'].values
            count = len(ratings)
            
            # Popularity tier (0-5)
            tier = sum(count > p for p in count_percentiles)
            
            self.movie_stats[movie_id] = {
                'mean': np.mean(ratings),
                'std': np.std(ratings) if len(ratings) > 1 else 0.0,
                'count': count,
                'log_count': np.log1p(count),
                'min': np.min(ratings),
                'max': np.max(ratings),
                'median': np.median(ratings),
                'q25': np.percentile(ratings, 25),
                'q75': np.percentile(ratings, 75),
                'range': np.max(ratings) - np.min(ratings),
                'skew': self._safe_skew(ratings),
                'popularity_tier': tier,
                'is_rare': int(count < 50),
                'is_popular': int(count > count_percentiles[4]),  # > 95th percentile
            }
    
    def _safe_skew(self, arr: np.ndarray) -> float:
        """Compute skewness safely."""
        if len(arr) < 3:
            return 0.0
        std = np.std(arr)
        if std == 0:
            return 0.0
        return float(np.mean(((arr - np.mean(arr)) / std) ** 3))
    
    def _process_genres(self, movies_df: pd.DataFrame, train_df: pd.DataFrame):
        """Process genre features for movies and users."""
        # Extract all genres
        all_genres = set()
        for genres_str in movies_df['genres'].dropna():
            if isinstance(genres_str, str):
                all_genres.update(genres_str.split('|'))
        
        self.genre_list = sorted(list(all_genres - {'(no genres listed)'}))
        n_genres = len(self.genre_list)
        genre_to_idx = {g: i for i, g in enumerate(self.genre_list)}
        
        if self.verbose:
            print(f"    Found {n_genres} genres")
        
        # Movie genre vectors (one-hot)
        for _, row in movies_df.iterrows():
            movie_id = row['movie_id']
            genre_vec = np.zeros(n_genres, dtype=np.float32)
            
            if pd.notna(row['genres']) and isinstance(row['genres'], str):
                for genre in row['genres'].split('|'):
                    if genre in genre_to_idx:
                        genre_vec[genre_to_idx[genre]] = 1.0
            
            self.movie_genres[movie_id] = genre_vec
        
        # User genre preferences (weighted average of movie genres by rating)
        user_genre_sums = defaultdict(lambda: np.zeros(n_genres, dtype=np.float32))
        user_genre_counts = defaultdict(lambda: np.zeros(n_genres, dtype=np.float32))
        
        for _, row in train_df.iterrows():
            user_id = row['user_id']
            movie_id = row['movie_id']
            rating = row['rating']
            
            if movie_id in self.movie_genres:
                genre_vec = self.movie_genres[movie_id]
                # Weight by rating deviation from mean
                weight = rating - self.global_mean
                user_genre_sums[user_id] += genre_vec * weight
                user_genre_counts[user_id] += genre_vec
        
        for user_id in user_genre_sums:
            counts = user_genre_counts[user_id]
            counts[counts == 0] = 1  # Avoid division by zero
            self.user_genre_prefs[user_id] = user_genre_sums[user_id] / counts
    
    def _process_tags(self, tags_df: pd.DataFrame, train_df: pd.DataFrame):
        """Process tag features using TF-IDF + SVD."""
        # Normalize tags
        tags_df = tags_df.copy()
        tags_df['tag'] = tags_df['tag'].astype(str).str.lower().str.strip()
        
        # Filter to frequent tags
        tag_counts = tags_df['tag'].value_counts()
        min_count = 5
        valid_tags = tag_counts[tag_counts >= min_count].head(500).index
        self.tag_to_idx = {tag: idx for idx, tag in enumerate(valid_tags)}
        n_tags = len(self.tag_to_idx)
        
        if self.verbose:
            print(f"    Using {n_tags} tags")
        
        # Filter tags
        tags_df = tags_df[tags_df['tag'].isin(self.tag_to_idx)]
        
        # Build movie-tag matrix
        unique_movies = tags_df['movie_id'].unique()
        movie_to_row = {mid: i for i, mid in enumerate(unique_movies)}
        n_movies = len(unique_movies)
        
        rows, cols, data = [], [], []
        movie_tag_counts = tags_df.groupby(['movie_id', 'tag']).size()
        
        for (movie_id, tag), count in movie_tag_counts.items():
            if movie_id in movie_to_row and tag in self.tag_to_idx:
                rows.append(movie_to_row[movie_id])
                cols.append(self.tag_to_idx[tag])
                data.append(count)
        
        count_matrix = csr_matrix((data, (rows, cols)), shape=(n_movies, n_tags))
        
        # TF-IDF
        row_sums = np.array(count_matrix.sum(axis=1)).flatten()
        row_sums[row_sums == 0] = 1
        tf = count_matrix.multiply(1.0 / row_sums[:, np.newaxis])
        
        doc_freq = np.array((count_matrix > 0).sum(axis=0)).flatten()
        doc_freq[doc_freq == 0] = 1
        idf = np.log(n_movies / doc_freq)
        
        tfidf = tf.multiply(idf)
        
        # SVD
        k = min(self.n_tag_components, min(n_movies, n_tags) - 1)
        if k > 0:
            U, s, Vt = svds(tfidf.tocsr(), k=k)
            embeddings = U * s
            
            for movie_id, row_idx in movie_to_row.items():
                self.movie_tag_embeddings[movie_id] = embeddings[row_idx].astype(np.float32)
        
        # Mean embedding for movies without tags
        if self.movie_tag_embeddings:
            self.mean_tag_embedding = np.mean(
                list(self.movie_tag_embeddings.values()), axis=0
            ).astype(np.float32)
        else:
            self.mean_tag_embedding = np.zeros(self.n_tag_components, dtype=np.float32)
        
        # User tag preferences (weighted average of movie tag embeddings)
        user_tag_sums = defaultdict(lambda: np.zeros(self.n_tag_components, dtype=np.float32))
        user_tag_counts = defaultdict(float)
        
        for _, row in train_df.iterrows():
            user_id = row['user_id']
            movie_id = row['movie_id']
            rating = row['rating']
            
            if movie_id in self.movie_tag_embeddings and rating >= 4.0:
                user_tag_sums[user_id] += self.movie_tag_embeddings[movie_id]
                user_tag_counts[user_id] += 1
        
        for user_id, tag_sum in user_tag_sums.items():
            count = user_tag_counts[user_id]
            if count > 0:
                self.user_tag_prefs[user_id] = tag_sum / count
    
    def get_user_features(self, user_id: int) -> np.ndarray:
        """
        Build the per-user feature vector.

        Layout: 10 rating statistics, then one preference value per genre in
        ``self.genre_list``, then the user's tag-preference vector. A user
        missing from a lookup gets fixed placeholder statistics and zero
        vectors for the genre / tag parts.

        Args:
            user_id: User identifier used as key into the fitted lookups.

        Returns:
            1-D float32 array.
        """
        # Rating statistics (10 values)
        if user_id not in self.user_stats:
            stat_part = [
                self.global_mean, 0.5, 0, 1, 5,
                self.global_mean, 2.5, 4.5, 4, 0,
            ]
        else:
            entry = self.user_stats[user_id]
            stat_part = [
                entry['mean'],
                entry['std'],
                np.log1p(entry['count']),  # raw count is log-compressed here
                entry['min'],
                entry['max'],
                entry['median'],
                entry['q25'],
                entry['q75'],
                entry['range'],
                entry['skew'],
            ]

        # Genre preferences (one value per known genre)
        if user_id not in self.user_genre_prefs:
            genre_part = [0.0] * len(self.genre_list)
        else:
            genre_part = self.user_genre_prefs[user_id].tolist()

        # Tag preferences (n_tag_components values for the fallback)
        if user_id not in self.user_tag_prefs:
            tag_part = [0.0] * self.n_tag_components
        else:
            tag_part = self.user_tag_prefs[user_id].tolist()

        return np.array(stat_part + genre_part + tag_part, dtype=np.float32)
    
    def get_movie_features(self, movie_id: int) -> np.ndarray:
        """
        Build the per-movie feature vector.

        Layout: 13 rating / popularity statistics, then the movie's genre
        vector (one value per entry of ``self.genre_list``), then its tag
        embedding. A movie missing from a lookup gets fixed placeholder
        statistics, an all-zero genre vector and the mean tag embedding.

        Args:
            movie_id: Movie identifier used as key into the fitted lookups.

        Returns:
            1-D float32 array.
        """
        # Rating / popularity statistics (13 values)
        if movie_id not in self.movie_stats:
            stat_part = [
                self.global_mean, 0.5, 0, 1, 5, self.global_mean,
                2.5, 4.5, 4, 0, 0, 1, 0,
            ]
        else:
            entry = self.movie_stats[movie_id]
            stat_part = [
                entry['mean'],
                entry['std'],
                entry['log_count'],
                entry['min'],
                entry['max'],
                entry['median'],
                entry['q25'],
                entry['q75'],
                entry['range'],
                entry['skew'],
                entry['popularity_tier'],
                entry['is_rare'],
                entry['is_popular'],
            ]

        # Genre indicator vector
        if movie_id not in self.movie_genres:
            genre_part = [0.0] * len(self.genre_list)
        else:
            genre_part = self.movie_genres[movie_id].tolist()

        # Tag embedding; untagged movies share the mean embedding
        if movie_id not in self.movie_tag_embeddings:
            tag_part = self.mean_tag_embedding.tolist()
        else:
            tag_part = self.movie_tag_embeddings[movie_id].tolist()

        return np.array(stat_part + genre_part + tag_part, dtype=np.float32)
    
    def get_cross_features(self, user_id: int, movie_id: int) -> np.ndarray:
        """
        Build the interaction features for one user-movie pair.

        Produces 14 values in this order: genre dot product and cosine, tag
        dot product and cosine, four mean-rating deviations, three log-count
        terms and three standard-deviation terms. Ids missing from the fitted
        lookups fall back to zero vectors (or the mean tag embedding for the
        movie), the global mean, a count of 0 and a std of 0.5.

        Args:
            user_id: User identifier.
            movie_id: Movie identifier.

        Returns:
            1-D float32 array of length 14.
        """
        def _dot_and_cosine(left, right):
            # The 1e-8 term keeps the cosine finite when a vector is all zeros.
            dot = np.dot(left, right)
            cosine = dot / (np.linalg.norm(left) * np.linalg.norm(right) + 1e-8)
            return dot, cosine

        # Genre affinity between the user's preferences and the movie's genres
        n_genres = len(self.genre_list)
        genre_dot, genre_cos = _dot_and_cosine(
            self.user_genre_prefs.get(user_id, np.zeros(n_genres)),
            self.movie_genres.get(movie_id, np.zeros(n_genres)),
        )

        # Tag affinity in the tag-embedding space
        tag_dot, tag_cos = _dot_and_cosine(
            self.user_tag_prefs.get(user_id, np.zeros(self.n_tag_components)),
            self.movie_tag_embeddings.get(movie_id, self.mean_tag_embedding),
        )

        # Per-entity statistics; an empty dict triggers every default below
        user_entry = self.user_stats.get(user_id, {})
        movie_entry = self.movie_stats.get(movie_id, {})

        user_mean = user_entry.get('mean', self.global_mean)
        movie_mean = movie_entry.get('mean', self.global_mean)

        log_user_count = np.log1p(user_entry.get('count', 0))
        log_movie_count = np.log1p(movie_entry.get('count', 0))

        user_std = user_entry.get('std', 0.5)
        movie_std = movie_entry.get('std', 0.5)

        values = [
            genre_dot, genre_cos,
            tag_dot, tag_cos,
            user_mean - self.global_mean,      # user bias
            movie_mean - self.global_mean,     # movie bias
            user_mean - movie_mean,            # signed bias difference
            abs(user_mean - movie_mean),       # absolute bias difference
            log_user_count,
            log_movie_count,
            log_user_count - log_movie_count,  # activity vs popularity
            user_std,
            movie_std,
            user_std * movie_std,
        ]
        return np.array(values, dtype=np.float32)
    
    def get_all_features(self, user_id: int, movie_id: int) -> np.ndarray:
        """
        Assemble the full feature vector for one user-movie pair.

        The result is the user block, followed by the movie block, followed
        by the cross (interaction) block, joined into a single 1-D array.
        """
        blocks = (
            self.get_user_features(user_id),
            self.get_movie_features(movie_id),
            self.get_cross_features(user_id, movie_id),
        )
        return np.concatenate(blocks)
    
    def get_features_batch(
        self,
        user_ids: np.ndarray,
        movie_ids: np.ndarray
    ) -> np.ndarray:
        """
        Get features for multiple user-movie pairs.

        Args:
            user_ids: Sequence of user ids.
            movie_ids: Sequence of movie ids, aligned with ``user_ids``.

        Returns:
            float32 array of shape ``(len(user_ids), n_features)`` whose row
            ``i`` is ``get_all_features(user_ids[i], movie_ids[i])``. For
            empty input a ``(0, n_features)`` array is returned, where
            ``n_features`` is taken from ``get_feature_names()``.
        """
        n = len(user_ids)

        if n == 0:
            # No pair is available to probe the row width, so use the number
            # of declared feature names instead of indexing into an empty
            # array (which used to raise IndexError).
            # NOTE(review): assumes get_feature_names() matches the real row
            # width — confirm for fits that yield shorter tag embeddings.
            return np.zeros((0, len(self.get_feature_names())), dtype=np.float32)

        # The first row doubles as the probe for the feature dimension, so it
        # is computed once and stored rather than recomputed in the loop.
        first_row = self.get_all_features(user_ids[0], movie_ids[0])

        features = np.zeros((n, len(first_row)), dtype=np.float32)
        features[0] = first_row

        for i in range(1, n):
            features[i] = self.get_all_features(user_ids[i], movie_ids[i])

        return features
    
    def get_feature_names(self) -> List[str]:
        """
        Get names of all features, in the order produced by get_all_features.

        The order is: user statistics, user genre preferences, user tag
        preferences, movie statistics, movie genres, movie tag embedding,
        cross features.

        Every name is unique. The cross block repeats the log-count and std
        quantities that also appear in the user / movie statistic blocks, so
        those four cross entries carry a ``cross_`` prefix; otherwise a
        feature-importance table keyed by name would contain ambiguous
        duplicates.

        Returns:
            List of feature names.
        """
        names = []

        # User stats
        names.extend([
            'user_mean', 'user_std', 'user_log_count', 'user_min', 'user_max',
            'user_median', 'user_q25', 'user_q75', 'user_range', 'user_skew'
        ])

        # User genre prefs
        names.extend([f'user_genre_{g}' for g in self.genre_list])

        # User tag prefs
        names.extend([f'user_tag_{i}' for i in range(self.n_tag_components)])

        # Movie stats
        names.extend([
            'movie_mean', 'movie_std', 'movie_log_count', 'movie_min', 'movie_max',
            'movie_median', 'movie_q25', 'movie_q75', 'movie_range', 'movie_skew',
            'movie_popularity_tier', 'movie_is_rare', 'movie_is_popular'
        ])

        # Movie genres
        names.extend([f'movie_genre_{g}' for g in self.genre_list])

        # Movie tag embedding
        names.extend([f'movie_tag_{i}' for i in range(self.n_tag_components)])

        # Cross features (prefixed where the bare name is already taken above)
        names.extend([
            'cross_genre_dot', 'cross_genre_cos',
            'cross_tag_dot', 'cross_tag_cos',
            'user_bias', 'movie_bias', 'bias_diff', 'bias_diff_abs',
            'cross_user_log_count', 'cross_movie_log_count', 'count_diff',
            'cross_user_std', 'cross_movie_std', 'std_product'
        ])

        return names

# Confirmation printed once this cell has finished defining FeatureEngineer.
print('✓ FeatureEngineer defined')

## 10. Competition Ensemble (Stacked Model)

In [None]:
class CompetitionEnsemble:
    """
    Two-level stacked ensemble optimized for Weighted RMSE.

    Level 1: Base models (SVD, Implicit ALS, Neural CF)
    Level 2: LightGBM meta-learner with W-RMSE sample weights

    The meta-learner input is ``[base predictions | engineered features]``.
    The set and order of base-prediction columns is decided once per ``fit``
    call (``self.base_model_names``) and reused for out-of-fold generation,
    inference and feature-importance reporting. A base model that is
    configured but could not be trained contributes a constant column equal
    to the global mean, so training and prediction always share one layout.
    """

    def __init__(
        self,
        use_implicit: bool = True,
        use_neural: bool = True,
        n_folds: int = 5,
        lgbm_params: Optional[Dict] = None,
        random_state: int = 42,
        verbose: bool = True
    ):
        """
        Initialize competition ensemble.

        Args:
            use_implicit: Include Implicit ALS model
            use_neural: Include Neural CF model
            n_folds: Number of folds for OOF predictions
            lgbm_params: LightGBM parameters (or use defaults)
            random_state: Random seed
            verbose: Print progress

        Raises:
            ImportError: If neither LightGBM nor the sklearn fallback is
                flagged as available.
        """
        if not LGBM_AVAILABLE and not SKLEARN_GB_AVAILABLE:
            raise ImportError("LightGBM or sklearn required")

        self.use_implicit = use_implicit
        self.use_neural = use_neural
        self.n_folds = n_folds
        self.random_state = random_state
        self.verbose = verbose

        # Default LightGBM parameters optimized for RMSE
        self.lgbm_params = lgbm_params or {
            'objective': 'regression',
            'metric': 'rmse',
            'boosting_type': 'gbdt',
            'num_leaves': 63,
            'learning_rate': 0.05,
            'feature_fraction': 0.8,
            'bagging_fraction': 0.8,
            'bagging_freq': 5,
            'min_child_samples': 20,
            'lambda_l1': 0.1,
            'lambda_l2': 0.1,
            'verbose': -1,
            'seed': random_state,
            'n_jobs': -1,
        }

        # Models and components
        self.base_models = {}
        # Ordered names of the base-prediction columns fed to the meta-learner.
        self.base_model_names = []
        self.meta_model = None
        self.feature_engineer = None
        # Set by fit(): True when the meta-learner is a LightGBM booster.
        self._use_lgbm = False

        # Stored data
        self.train_df = None
        self.movie_counts = None
        self.global_mean = 3.5

        self.is_fitted = False

    def _resolve_base_model_names(
        self,
        implicit_df: Optional[pd.DataFrame]
    ) -> List[str]:
        """
        Return the ordered base-model column names for the current settings.

        Implicit ALS is only included when it is enabled AND implicit data
        was supplied, because it is only ever trained under that condition.
        """
        names = ['svd_100', 'svd_50']
        if self.use_implicit and implicit_df is not None:
            names.append('implicit_als')
        if self.use_neural:
            names.append('neural_cf')
        return names

    def _base_columns(self) -> List[str]:
        """
        Return the base-prediction column order used at inference time.

        Falls back to the keys of ``self.base_models`` when no explicit order
        has been recorded (e.g. the models were assigned by hand).
        """
        names = getattr(self, 'base_model_names', None)
        return list(names) if names else list(self.base_models)

    def _compute_sample_weights(self, movie_ids: np.ndarray) -> np.ndarray:
        """
        Compute W-RMSE sample weights: w_i = 1 / sqrt(movie_count).

        Movies absent from ``self.movie_counts`` are treated as having a
        count of 1. The weights are rescaled so that they sum to the number
        of samples.
        """
        weights = np.array([
            1.0 / np.sqrt(self.movie_counts.get(mid, 1))
            for mid in movie_ids
        ])
        # Normalize
        weights = weights / weights.sum() * len(weights)
        return weights

    def _train_base_models(
        self,
        train_df: pd.DataFrame,
        movies_df: pd.DataFrame,
        tags_df: Optional[pd.DataFrame],
        implicit_df: Optional[pd.DataFrame],
        fit_features: bool = True
    ):
        """
        Train all base models on ``train_df`` and store them in
        ``self.base_models``.

        Args:
            train_df: Training ratings.
            movies_df: Movie metadata (used by the feature engineer).
            tags_df: Optional tags (used by the feature engineer).
            implicit_df: Optional implicit interactions for Implicit ALS.
            fit_features: When True (default) also (re)fit
                ``self.feature_engineer``. Pass False to keep an already
                fitted feature engineer untouched.

        Optional models (Implicit ALS, Neural CF) are best-effort: any
        failure is reported as a warning and the model is left out.
        """
        # Start clean so a refit never keeps models from an earlier call.
        self.base_models = {}

        # 1. SVD models with different factors
        if self.verbose:
            print("\n[1/4] Training SVD models...")

        self.base_models['svd_100'] = SVDRecommender(n_factors=100, random_state=self.random_state)
        self.base_models['svd_100'].fit(train_df)

        self.base_models['svd_50'] = SVDRecommender(n_factors=50, random_state=self.random_state)
        self.base_models['svd_50'].fit(train_df)

        # 2. Implicit ALS
        if self.use_implicit and implicit_df is not None:
            if self.verbose:
                print("\n[2/4] Training Implicit ALS...")
            try:
                # ImplicitALS may not be defined in this notebook; the
                # resulting NameError is caught below like any other failure.
                implicit_model = ImplicitALS(
                    n_factors=100,
                    alpha=40.0,
                    n_iterations=15,
                    random_state=self.random_state,
                    verbose=self.verbose
                )
                implicit_model.fit(train_df, implicit_df)
                # Register only after a successful fit so an unfitted model
                # is never used for prediction.
                self.base_models['implicit_als'] = implicit_model
            except Exception as e:
                warnings.warn(f"Implicit ALS failed: {e}")

        # 3. Neural CF
        if self.use_neural:
            if self.verbose:
                print("\n[3/4] Training Neural CF...")
            try:
                # NeuralCF may not be defined in this notebook; see above.
                if TORCH_AVAILABLE:
                    neural_model = NeuralCF(
                        gmf_dim=32,
                        mlp_dims=[64, 32, 16],
                        n_epochs=15,
                        batch_size=2048,
                        random_state=self.random_state,
                        verbose=self.verbose
                    )
                    neural_model.fit(train_df)
                    self.base_models['neural_cf'] = neural_model
            except Exception as e:
                warnings.warn(f"Neural CF failed: {e}")

        # 4. Feature Engineer
        if fit_features:
            if self.verbose:
                print("\n[4/4] Fitting feature engineer...")
            self.feature_engineer = FeatureEngineer(
                n_tag_components=50,
                verbose=self.verbose
            )
            self.feature_engineer.fit(train_df, movies_df, tags_df)

    def _get_base_predictions(
        self,
        user_ids: np.ndarray,
        movie_ids: np.ndarray
    ) -> np.ndarray:
        """
        Get predictions from all base models.

        Returns an array with one row per pair and one column per entry of
        the recorded base-model order. A model that is missing, or whose
        ``predict_batch`` raises, yields a column filled with
        ``self.global_mean``.
        """
        n = len(user_ids)
        pairs = list(zip(user_ids, movie_ids))

        preds = []
        for name in self._base_columns():
            model = self.base_models.get(name)
            if model is None:
                # Configured but not trained: keep the column, use the prior.
                preds.append(np.full(n, self.global_mean))
                continue
            try:
                preds.append(model.predict_batch(pairs))
            except Exception as e:
                warnings.warn(f"Model {name} prediction failed: {e}")
                preds.append(np.full(n, self.global_mean))

        return np.column_stack(preds)

    def _generate_oof_predictions(
        self,
        train_df: pd.DataFrame,
        movies_df: pd.DataFrame,
        tags_df: Optional[pd.DataFrame],
        implicit_df: Optional[pd.DataFrame]
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Generate out-of-fold predictions for stacking.

        Each fold trains fresh base models on the other folds and predicts
        the held-out rows, so every training row gets base predictions from
        models that never saw it. The feature engineer is then fitted on the
        full training data and stored in ``self.feature_engineer``.

        Returns:
            (features, base_predictions, targets)
        """
        n = len(train_df)
        user_ids = train_df['user_id'].values
        movie_ids = train_df['movie_id'].values
        targets = train_df['rating'].values

        # One OOF column per base model, in the same order used at inference.
        column_of = {
            name: idx
            for idx, name in enumerate(self._resolve_base_model_names(implicit_df))
        }
        oof_base_preds = np.zeros((n, len(column_of)), dtype=np.float32)

        # Cross-validation for OOF
        kfold = KFold(n_splits=self.n_folds, shuffle=True, random_state=self.random_state)

        if self.verbose:
            print(f"\nGenerating OOF predictions ({self.n_folds}-fold)...")

        for fold, (train_idx, val_idx) in enumerate(kfold.split(train_df)):
            if self.verbose:
                print(f"\n--- Fold {fold + 1}/{self.n_folds} ---")

            fold_train = train_df.iloc[train_idx]
            fold_val = train_df.iloc[val_idx]

            val_pairs = list(zip(fold_val['user_id'].values, fold_val['movie_id'].values))

            # SVD 100
            svd_100 = SVDRecommender(n_factors=100, random_state=self.random_state)
            svd_100.fit(fold_train)
            oof_base_preds[val_idx, column_of['svd_100']] = svd_100.predict_batch(val_pairs)

            # SVD 50
            svd_50 = SVDRecommender(n_factors=50, random_state=self.random_state)
            svd_50.fit(fold_train)
            oof_base_preds[val_idx, column_of['svd_50']] = svd_50.predict_batch(val_pairs)

            # Implicit ALS (only when enabled and implicit data was supplied)
            if 'implicit_als' in column_of:
                col = column_of['implicit_als']
                try:
                    # ImplicitALS may not be defined in this notebook.
                    impl_als = ImplicitALS(
                        n_factors=100, alpha=40.0, n_iterations=10,
                        random_state=self.random_state, verbose=False
                    )
                    impl_als.fit(fold_train, implicit_df)
                    oof_base_preds[val_idx, col] = impl_als.predict_batch(val_pairs)
                except Exception as e:
                    warnings.warn(f"Implicit ALS failed on fold {fold + 1}: {e}")
                    oof_base_preds[val_idx, col] = self.global_mean

            # Neural CF (only when enabled)
            if 'neural_cf' in column_of:
                col = column_of['neural_cf']
                try:
                    # NeuralCF may not be defined in this notebook.
                    if TORCH_AVAILABLE:
                        ncf = NeuralCF(
                            gmf_dim=32, mlp_dims=[64, 32, 16],
                            n_epochs=10, batch_size=2048,
                            random_state=self.random_state, verbose=False
                        )
                        ncf.fit(fold_train)
                        oof_base_preds[val_idx, col] = ncf.predict_batch(val_pairs)
                    else:
                        oof_base_preds[val_idx, col] = self.global_mean
                except Exception as e:
                    warnings.warn(f"Neural CF failed on fold {fold + 1}: {e}")
                    oof_base_preds[val_idx, col] = self.global_mean

        # Now fit feature engineer on FULL training data
        if self.verbose:
            print("\nFitting feature engineer on full data...")
        self.feature_engineer = FeatureEngineer(n_tag_components=50, verbose=False)
        self.feature_engineer.fit(train_df, movies_df, tags_df)

        # Extract features for all training samples
        if self.verbose:
            print("Extracting features...")
        features = self.feature_engineer.get_features_batch(user_ids, movie_ids)

        return features, oof_base_preds, targets

    def fit(
        self,
        train_df: pd.DataFrame,
        movies_df: pd.DataFrame,
        tags_df: Optional[pd.DataFrame] = None,
        implicit_df: Optional[pd.DataFrame] = None,
        use_oof: bool = True
    ) -> 'CompetitionEnsemble':
        """
        Fit the competition ensemble.

        Args:
            train_df: Training ratings [user_id, movie_id, rating]
            movies_df: Movie metadata
            tags_df: Optional tags
            implicit_df: Optional implicit interactions
            use_oof: Use out-of-fold predictions for stacking (recommended)

        Returns:
            Self
        """
        np.random.seed(self.random_state)

        self.train_df = train_df.copy()
        self.global_mean = train_df['rating'].mean()
        self.movie_counts = train_df.groupby('movie_id').size().to_dict()
        # Freeze the base-prediction column layout for this fit.
        self.base_model_names = self._resolve_base_model_names(implicit_df)

        if self.verbose:
            print("="*60)
            print("COMPETITION ENSEMBLE TRAINING")
            print("="*60)
            print(f"Training samples: {len(train_df):,}")
            print(f"Users: {train_df['user_id'].nunique():,}")
            print(f"Movies: {train_df['movie_id'].nunique():,}")

        # Get training data arrays
        user_ids = train_df['user_id'].values
        movie_ids = train_df['movie_id'].values
        targets = train_df['rating'].values

        if use_oof:
            # Generate OOF predictions and features
            features, base_preds, targets = self._generate_oof_predictions(
                train_df, movies_df, tags_df, implicit_df
            )
        else:
            # Direct training (may overfit)
            self._train_base_models(train_df, movies_df, tags_df, implicit_df)
            base_preds = self._get_base_predictions(user_ids, movie_ids)
            features = self.feature_engineer.get_features_batch(user_ids, movie_ids)

        # Combine features and base predictions
        X = np.hstack([base_preds, features])
        y = targets

        # Compute sample weights for W-RMSE
        sample_weights = self._compute_sample_weights(movie_ids)

        if self.verbose:
            print(f"\nMeta-learner input shape: {X.shape}")
            print(f"Feature breakdown: {base_preds.shape[1]} base preds + {features.shape[1]} features")

        # Train meta-learner
        if LGBM_AVAILABLE:
            if self.verbose:
                print("\nTraining LightGBM meta-learner...")

            train_data = lgb.Dataset(X, label=y, weight=sample_weights)

            # NOTE(review): the only evaluation set is the training set
            # itself, so the early-stopping callback has no held-out signal
            # here — confirm whether a hold-out split is wanted.
            self.meta_model = lgb.train(
                self.lgbm_params,
                train_data,
                num_boost_round=500,
                valid_sets=[train_data],
                callbacks=[
                    lgb.early_stopping(stopping_rounds=50, verbose=self.verbose),
                    lgb.log_evaluation(period=50 if self.verbose else 0)
                ]
            )
            self._use_lgbm = True
        else:
            if self.verbose:
                print("\nTraining sklearn GradientBoosting meta-learner...")

            from sklearn.ensemble import GradientBoostingRegressor
            self.meta_model = GradientBoostingRegressor(
                n_estimators=200,
                learning_rate=0.05,
                max_depth=5,
                min_samples_split=20,
                min_samples_leaf=10,
                subsample=0.8,
                random_state=self.random_state,
                verbose=1 if self.verbose else 0
            )
            self.meta_model.fit(X, y, sample_weight=sample_weights)
            self._use_lgbm = False

        # Re-train base models on full data for inference
        if use_oof:
            if self.verbose:
                print("\nRe-training base models on full data...")
            # Keep the feature engineer that produced the meta-learner's
            # training features: refitting would repeat the work and, if the
            # SVD behind the tag embeddings is not run-to-run deterministic
            # (not verified here), could shift the features between training
            # and inference.
            self._train_base_models(
                train_df, movies_df, tags_df, implicit_df, fit_features=False
            )

        self.is_fitted = True

        if self.verbose:
            print("\n" + "="*60)
            print("✓ COMPETITION ENSEMBLE TRAINING COMPLETE!")
            print("="*60)

        return self

    def predict(self, user_id: int, movie_id: int) -> float:
        """
        Predict rating for a user-movie pair.

        Returns:
            Meta-learner prediction clipped to [0.5, 5.0].

        Raises:
            RuntimeError: If the ensemble has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted first")

        # Base predictions, in the same column order used during training.
        base_preds = []
        for name in self._base_columns():
            model = self.base_models.get(name)
            if model is None:
                base_preds.append(self.global_mean)
                continue
            try:
                base_preds.append(model.predict(user_id, movie_id))
            except Exception:
                # Best-effort: a failing base model falls back to the prior.
                base_preds.append(self.global_mean)

        # Get features
        features = self.feature_engineer.get_all_features(user_id, movie_id)

        # Combine
        X = np.concatenate([base_preds, features]).reshape(1, -1)

        # Both meta-learner back-ends expose the same predict() call.
        pred = self.meta_model.predict(X)[0]

        return float(np.clip(pred, 0.5, 5.0))

    def predict_batch(self, pairs: List[Tuple[int, int]]) -> np.ndarray:
        """
        Predict ratings for multiple pairs.

        Args:
            pairs: Sequence of (user_id, movie_id) tuples.

        Returns:
            Array of predictions clipped to [0.5, 5.0]; empty for empty input.

        Raises:
            RuntimeError: If the ensemble has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted first")

        if len(pairs) == 0:
            # Nothing to score; avoids building zero-row inputs downstream.
            return np.empty(0, dtype=np.float64)

        user_ids = np.array([p[0] for p in pairs])
        movie_ids = np.array([p[1] for p in pairs])

        # Get base predictions
        base_preds = self._get_base_predictions(user_ids, movie_ids)

        # Get features
        features = self.feature_engineer.get_features_batch(user_ids, movie_ids)

        # Combine
        X = np.hstack([base_preds, features])

        # Meta-learner predictions
        predictions = self.meta_model.predict(X)

        return np.clip(predictions, 0.5, 5.0)

    def get_feature_importance(self) -> pd.DataFrame:
        """
        Get feature importance from meta-learner.

        Returns:
            DataFrame with columns ``feature`` and ``importance``, sorted by
            descending importance. Names are the base-model column names
            followed by the feature engineer's feature names.

        Raises:
            RuntimeError: If the ensemble has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted first")

        # Build feature names in the meta-learner's column order
        feature_names = self._base_columns() + self.feature_engineer.get_feature_names()

        if self._use_lgbm:
            importance = self.meta_model.feature_importance(importance_type='gain')
        else:
            importance = self.meta_model.feature_importances_

        # Guard against a length mismatch so the DataFrame can always be
        # built: surplus names are dropped, missing ones get generic labels.
        if len(feature_names) < len(importance):
            feature_names = feature_names + [
                f'feature_{i}' for i in range(len(feature_names), len(importance))
            ]

        df = pd.DataFrame({
            'feature': feature_names[:len(importance)],
            'importance': importance
        }).sort_values('importance', ascending=False)

        return df

# Confirmation printed once this cell has finished defining CompetitionEnsemble.
print('✓ CompetitionEnsemble defined')

## 11. Load Data (Run Once)

In [None]:
# Load and split data - RUN THIS ONCE
loader = DataLoader(DATA_DIR)
full_df = loader.load_train_data(explicit_only=True)

# Movie metadata and tags are optional inputs. A failure to load them is
# reported and the corresponding variables are set to None; the Hybrid and
# Ensemble experiment cells check for None and skip themselves.
# `except Exception` (rather than a bare `except`) lets KeyboardInterrupt
# and SystemExit propagate so the cell can still be interrupted.
try:
    genre_features = loader.get_genre_features()
    movies_df = loader.movies_df
except Exception as exc:
    print(f"⚠ Movie metadata not available ({type(exc).__name__}: {exc})")
    genre_features = None
    movies_df = None

try:
    tags_df = loader.load_tags()
except Exception as exc:
    print(f"⚠ Tags not available ({type(exc).__name__}: {exc})")
    tags_df = None

train_df, val_df, test_df = train_val_test_split(full_df)

print(f"\n✓ Data loaded and ready for experiments!")

---

# 🧪 EXPERIMENTS (Run Each Cell Independently)

**Each experiment below is in its own cell. You can run them in any order, skip them, or re-run if needed.**

---

## Experiment 1: Baseline

In [None]:
# BASELINE EXPERIMENT
# Banner identifying the experiment in the cell output.
print("\n" + "="*60, "EXPERIMENT: BaselineRecommender", "="*60 + "\n", sep="\n")

# Stage 1: fit on the training split and score on the validation split.
baseline = BaselineRecommender()
baseline.fit(train_df)
val_metrics = evaluate_recommender(baseline, val_df, train_df)
print(f"Validation: W-RMSE={val_metrics['weighted_rmse']:.6f}, RMSE={val_metrics['rmse']:.6f}")

# Stage 2: refit on train + validation and score on the test split.
train_val_df = pd.concat([train_df, val_df], ignore_index=True)
baseline_final = BaselineRecommender()
baseline_final.fit(train_val_df)
test_metrics = evaluate_recommender(baseline_final, test_df, train_val_df)
print(f"Test: W-RMSE={test_metrics['weighted_rmse']:.6f}, RMSE={test_metrics['rmse']:.6f}")

# Result record picked up by the results-summary cell.
baseline_result = dict(
    model='BaselineRecommender',
    val_wrmse=val_metrics['weighted_rmse'],
    test_wrmse=test_metrics['weighted_rmse'],
    model_instance=baseline_final,
)

print("\n✓ Baseline experiment complete!")

## Experiment 2: SVD (Currently Best!)

In [None]:
# SVD EXPERIMENT
# Banner identifying the experiment in the cell output.
print("\n" + "="*60, "EXPERIMENT: SVDRecommender", "="*60 + "\n", sep="\n")

# Stage 1: fit on the training split and score on the validation split.
svd = SVDRecommender(n_factors=50, random_state=42)
svd.fit(train_df)
val_metrics = evaluate_recommender(svd, val_df, train_df)
print(f"Validation: W-RMSE={val_metrics['weighted_rmse']:.6f}, RMSE={val_metrics['rmse']:.6f}")

# Stage 2: refit with the same settings on train + validation, score on test.
train_val_df = pd.concat([train_df, val_df], ignore_index=True)
svd_final = SVDRecommender(n_factors=50, random_state=42)
svd_final.fit(train_val_df)
test_metrics = evaluate_recommender(svd_final, test_df, train_val_df)
print(f"Test: W-RMSE={test_metrics['weighted_rmse']:.6f}, RMSE={test_metrics['rmse']:.6f}")

# Result record picked up by the results-summary cell.
svd_result = dict(
    model='SVDRecommender',
    val_wrmse=val_metrics['weighted_rmse'],
    test_wrmse=test_metrics['weighted_rmse'],
    model_instance=svd_final,
)

print("\n✓ SVD experiment complete!")

## Experiment 3: FunkSVD (FIXED - Better Parameters)

In [None]:
# FUNKSVD EXPERIMENT (FIXED VERSION)
# Banner identifying the experiment in the cell output.
print("\n" + "="*60, "EXPERIMENT: FunkSVDRecommender (FIXED)", "="*60 + "\n", sep="\n")

# Stage 1: fit on the training split and score on the validation split.
# Tuned settings: more factors, higher lr, more epochs.
funksvd = FunkSVDRecommender(
    n_factors=100,           # Increased from 50
    lr=0.01,                 # Increased from 0.005
    reg=0.05,                # Increased regularization
    n_epochs=50,             # Increased from 20
    batch_size=2048,         # Larger batches
    early_stop_patience=10,  # More patience
    random_state=42,
    verbose=True,
)
funksvd.fit(train_df)
val_metrics = evaluate_recommender(funksvd, val_df, train_df)
print(f"\nValidation: W-RMSE={val_metrics['weighted_rmse']:.6f}, RMSE={val_metrics['rmse']:.6f}")

# Stage 2: refit on train + validation and score on the test split.
train_val_df = pd.concat([train_df, val_df], ignore_index=True)
funksvd_final = FunkSVDRecommender(
    n_factors=100,
    lr=0.01,
    reg=0.05,
    n_epochs=50,
    batch_size=2048,
    val_fraction=0.0,  # No internal validation
    random_state=42,
    verbose=True,
)
funksvd_final.fit(train_val_df)
test_metrics = evaluate_recommender(funksvd_final, test_df, train_val_df)
print(f"Test: W-RMSE={test_metrics['weighted_rmse']:.6f}, RMSE={test_metrics['rmse']:.6f}")

# Result record picked up by the results-summary cell.
funksvd_result = dict(
    model='FunkSVDRecommender',
    val_wrmse=val_metrics['weighted_rmse'],
    test_wrmse=test_metrics['weighted_rmse'],
    model_instance=funksvd_final,
)

print("\n✓ FunkSVD experiment complete!")

## Experiment 4: ALS (FIXED - Higher Regularization)

In [None]:
# ALS EXPERIMENT (FIXED VERSION - was severely overfitting)
# Banner identifying the experiment in the cell output.
print("\n" + "="*60, "EXPERIMENT: ALSRecommender (FIXED)", "="*60 + "\n", sep="\n")

# Stage 1: fit on the training split and score on the validation split.
# Much stronger regularization than before, to counter the overfitting.
als = ALSRecommender(
    n_factors=50,
    reg=0.5,              # INCREASED from 0.1 to 0.5!
    n_iterations=10,      # Reduced from 15
    random_state=42,
    verbose=True,
)
als.fit(train_df)
val_metrics = evaluate_recommender(als, val_df, train_df)
print(f"\nValidation: W-RMSE={val_metrics['weighted_rmse']:.6f}, RMSE={val_metrics['rmse']:.6f}")

# Stage 2: refit with the same settings on train + validation, score on test.
train_val_df = pd.concat([train_df, val_df], ignore_index=True)
als_final = ALSRecommender(
    n_factors=50,
    reg=0.5,
    n_iterations=10,
    random_state=42,
    verbose=True,
)
als_final.fit(train_val_df)
test_metrics = evaluate_recommender(als_final, test_df, train_val_df)
print(f"Test: W-RMSE={test_metrics['weighted_rmse']:.6f}, RMSE={test_metrics['rmse']:.6f}")

# Result record picked up by the results-summary cell.
als_result = dict(
    model='ALSRecommender',
    val_wrmse=val_metrics['weighted_rmse'],
    test_wrmse=test_metrics['weighted_rmse'],
    model_instance=als_final,
)

print("\n✓ ALS experiment complete!")

## Experiment 5: Hybrid (CF + Content)

In [None]:
# HYBRID EXPERIMENT
# Trains the hybrid (CF + genre content) model, or records a placeholder
# result with infinite error when genre features could not be loaded.
print("\n" + "="*60)
print("EXPERIMENT: HybridRecommender")
print("="*60 + "\n")

if genre_features is None:
    print("⚠ Genre features not available, skipping Hybrid")
    # Placeholder so the summary cell still lists the model (sorted last).
    hybrid_result = {'model': 'HybridRecommender', 'val_wrmse': float('inf'), 'test_wrmse': float('inf')}
else:
    # Stage 1: fit on the training split and score on the validation split.
    hybrid = HybridRecommender(
        n_factors=50,
        lr=0.005,
        reg=0.02,
        n_epochs=30,
        genre_features=genre_features,
        random_state=42,
        verbose=True
    )
    hybrid.fit(train_df)

    val_metrics = evaluate_recommender(hybrid, val_df, train_df)
    print(f"\nValidation: W-RMSE={val_metrics['weighted_rmse']:.6f}, RMSE={val_metrics['rmse']:.6f}")

    # Stage 2: refit on train + validation and score on the test split.
    train_val_df = pd.concat([train_df, val_df], ignore_index=True)
    hybrid_final = HybridRecommender(
        n_factors=50, lr=0.005, reg=0.02, n_epochs=30,
        val_fraction=0.0,  # No internal validation
        genre_features=genre_features, random_state=42, verbose=True
    )
    hybrid_final.fit(train_val_df)

    test_metrics = evaluate_recommender(hybrid_final, test_df, train_val_df)
    print(f"Test: W-RMSE={test_metrics['weighted_rmse']:.6f}, RMSE={test_metrics['rmse']:.6f}")

    hybrid_result = {
        'model': 'HybridRecommender',
        'val_wrmse': val_metrics['weighted_rmse'],
        'test_wrmse': test_metrics['weighted_rmse'],
        'model_instance': hybrid_final
    }

    # Only report completion when the experiment actually ran; the skip
    # path above already printed its own message.
    print("\n✓ Hybrid experiment complete!")

## Experiment 6: Competition Ensemble (Best Shot!)

In [None]:
# COMPETITION ENSEMBLE EXPERIMENT
# Trains the stacked ensemble on train + validation and scores it on the
# test split, or records a placeholder result with infinite error when the
# movie metadata could not be loaded.
print("\n" + "="*60)
print("EXPERIMENT: CompetitionEnsemble")
print("="*60 + "\n")

if movies_df is None:
    print("⚠ Movies data not available, skipping Ensemble")
    # Placeholder so the summary cell still lists the model (sorted last).
    ensemble_result = {'model': 'CompetitionEnsemble', 'val_wrmse': float('inf'), 'test_wrmse': float('inf')}
else:
    train_val_df = pd.concat([train_df, val_df], ignore_index=True)

    ensemble = CompetitionEnsemble(
        n_folds=3,  # Start with 3 for speed
        use_implicit=False,  # Disable for simplicity
        use_neural=False,    # Disable for simplicity  
        verbose=True,
        random_state=42
    )
    ensemble.fit(train_val_df, movies_df, tags_df, use_oof=True)

    # Evaluate on test
    pairs = list(zip(test_df['user_id'], test_df['movie_id']))
    preds = ensemble.predict_batch(pairs)

    test_wrmse = compute_weighted_rmse(
        test_df['rating'].values, preds, 
        test_df['movie_id'].values, train_val_df
    )
    test_rmse = compute_rmse(test_df['rating'].values, preds)

    print(f"\nTest: W-RMSE={test_wrmse:.6f}, RMSE={test_rmse:.6f}")

    # NOTE: no separate validation run is done for the ensemble, so the
    # 'val_wrmse' entry repeats the test score and is not a true
    # validation metric.
    ensemble_result = {
        'model': 'CompetitionEnsemble',
        'val_wrmse': test_wrmse,  # Using test as proxy
        'test_wrmse': test_wrmse,
        'model_instance': ensemble
    }

    # Show feature importance
    print("\n" + "="*60)
    print("TOP 20 MOST IMPORTANT FEATURES")
    print("="*60)
    print(ensemble.get_feature_importance().head(20).to_string(index=False))

    # Only report completion when the experiment actually ran; the skip
    # path above already printed its own message.
    print("\n✓ Ensemble experiment complete!")

---

## 📊 FINAL RESULTS SUMMARY

**Run this cell after completing your experiments to see the final comparison.**

---

In [None]:
# RESULTS SUMMARY
# Builds a ranked comparison table from whichever experiment cells were run.
# Each experiment cell leaves a dict with 'model', 'val_wrmse' and
# 'test_wrmse' keys in the notebook globals under one of these names.
RESULT_VARIABLES = (
    'baseline_result',
    'svd_result',
    'funksvd_result',
    'als_result',
    'hybrid_result',
    'ensemble_result',
)

# Collect results only from experiments that were actually run
all_results = [globals()[name] for name in RESULT_VARIABLES if name in globals()]

if not all_results:
    print("⚠ No experiments completed yet. Run experiment cells above first.")
else:
    # Sort by test W-RMSE (ascending: lower error ranks first; skipped
    # experiments store float('inf') and therefore sort last)
    sorted_results = sorted(all_results, key=lambda x: x['test_wrmse'])
    best_result = sorted_results[0]

    print("\n" + "="*80)
    print(f"{'FINAL RESULTS SUMMARY':^80}")
    print("="*80 + "\n")

    # Each data row starts with a 2-wide marker, a space and a 4-wide rank
    # (7 columns in total), so the "Rank" header must also be 7 wide for the
    # remaining columns to line up.
    print(f"{'Rank':<7}{'Model':<30}{'Val W-RMSE':<15}{'Test W-RMSE':<15}")
    print("-" * 65)

    for rank, r in enumerate(sorted_results, 1):
        marker = "🏆" if rank == 1 else "  "
        print(f"{marker} {rank:<4}{r['model']:<30}{r['val_wrmse']:<15.6f}{r['test_wrmse']:<15.6f}")

    print("\n" + "="*80)
    print(f"🏆 BEST MODEL: {best_result['model']}")
    print(f"   Test W-RMSE: {best_result['test_wrmse']:.6f}")

    # Improvement over baseline (only when the baseline experiment was run)
    baseline_score = next(
        (r['test_wrmse'] for r in all_results if r['model'] == 'BaselineRecommender'),
        None,
    )
    # Explicit None check instead of float truthiness; a non-positive baseline
    # is skipped so the percentage below can never divide by zero.
    if baseline_score is not None and baseline_score > 0:
        best_score = best_result['test_wrmse']
        improvement = baseline_score - best_score
        pct_improvement = (improvement / baseline_score) * 100
        print(f"   Improvement over baseline: {improvement:.6f} ({pct_improvement:.2f}%)")

    print("\n" + "="*80)
    print("✓ All experiments complete!")
    print("="*80)

## 🎯 Generate Submission

**Use your best model to generate the submission file.**

In [None]:
# Generate submission with best model
# Set BEST_MODEL_NAME to the name of the variable holding your best fitted
# model (e.g. 'ensemble', 'hybrid_final', etc.). This is the only place that
# needs to change: the availability check and the model used for prediction
# are both derived from it, so they cannot get out of sync.
BEST_MODEL_NAME = 'svd_final'

# None when the corresponding experiment cell has not been run yet
best_model = globals().get(BEST_MODEL_NAME)

if best_model is not None:
    submission_df = loader.load_submission_template()
    pairs = list(zip(submission_df['user_id'], submission_df['movie_id']))
    # Clamp predictions into the 0.5-5.0 range before writing them out
    predictions = np.clip(best_model.predict_batch(pairs), 0.5, 5.0)

    submission = pd.DataFrame({
        'id': submission_df['id'],
        'prediction': predictions
    })

    # File name carries the model class and a timestamp so reruns do not
    # overwrite earlier submissions
    output_path = f"submission_{best_model.__class__.__name__}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    submission.to_csv(output_path, index=False)

    print(f"✓ Submission saved to: {output_path}")
    print(f"  Mean: {predictions.mean():.4f}, Std: {predictions.std():.4f}")

    # Download in Colab (best effort). Outside Colab the import fails and the
    # file simply stays on disk. `except Exception` keeps that fallback for
    # import or download errors without swallowing KeyboardInterrupt or
    # SystemExit the way a bare `except:` would.
    try:
        from google.colab import files
        files.download(output_path)
    except Exception:
        print(f"  File saved locally: {output_path}")
else:
    print("⚠ No model available yet. Run experiments first!")