<a href="https://colab.research.google.com/github/migub/recommender-systems/blob/main/Notebooks/Content_based_recommender.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Mount Google Drive (Colab only). MusicData's default CSV path points under
# /content/drive/MyDrive, so this has to run before the data is loaded.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
!pip install scikit-surprise

Collecting scikit-surprise
  Downloading scikit_surprise-1.1.4.tar.gz (154 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m153.6/154.4 kB[0m [31m4.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: scikit-surprise
  Building wheel for scikit-surprise (pyproject.toml) ... [?25l[?25hdone
  Created wheel for scikit-surprise: filename=scikit_surprise-1.1.4-cp311-cp311-linux_x86_64.whl size=2505170 sha256=478a361f64e4d30a1020c93188c53c1739ac0befe5e3381c43eb500a8d63d583
  Stored in directory: /root/.cache/pip/wheels/2a/8f/6e/7e28991

In [None]:
"""
Content-based Music Recommender System with SVD Tuning

This module implements a comprehensive music recommendation system that combines:
1. Content-based filtering using track features (genre, artist, year, duration)
2. SVD-based collaborative filtering with hyperparameter tuning
3. Data loading and preprocessing functionality

The system supports both explicit and implicit feedback, with features for:
- Feature extraction and weighting
- Similarity computation with batch processing
- Cross-validation for model evaluation
- Hyperparameter optimization for SVD
"""

from surprise import AlgoBase, Dataset, Reader, SVD, accuracy
from surprise import PredictionImpossible
from surprise.model_selection import GridSearchCV, train_test_split
import numpy as np
from collections import defaultdict
from datetime import datetime
import os
import heapq
from tqdm import tqdm

class MusicData:
    """Loads a listening-log CSV and exposes per-track content features.

    The CSV must have a header containing the columns ``user_id``,
    ``media_id``, ``is_listened``, ``genre_id``, ``artist_id``,
    ``release_date`` and ``media_duration``. Rows are split on plain commas
    (quoted fields are not supported).

    After ``load_music_data()`` the getters return:
      * ``get_genres()``    -> media_id -> multi-hot list over the sorted genre ids
      * ``get_artists()``   -> media_id -> artist id (str)
      * ``get_years()``     -> media_id -> release year (int, first 4 chars of the date)
      * ``get_durations()`` -> media_id -> duration (float)

    Before loading, every getter returns ``None``.
    """

    def __init__(self, filepath='/content/drive/MyDrive/Recommender_Systems/train.csv', sample_size=None):
        """
        Args:
            filepath: Path of the interactions CSV.
            sample_size: If set and smaller than the number of interactions,
                that many interactions are sampled (fixed seed 42) for the
                Surprise dataset. Content features are still collected from
                every row of the file.
        """
        self.filepath = filepath
        self.sample_size = sample_size
        self._df = None
        self._genres = None
        self._artists = None
        self._years = None
        self._durations = None
        self.mediaID_to_name = {}
        self.name_to_mediaID = {}

    def load_music_data(self):
        """Read the CSV, build the feature maps and return a Surprise dataset.

        Returns:
            A Surprise ``Dataset`` with (user, item, rating) triples on a 0-1
            rating scale, or ``None`` if anything went wrong (the error is
            printed).
        """
        try:
            data, genres, unique_genres, artists, years, durations = self._read_interactions()

            print(f"Loaded {len(data)} interactions")
            print(f"Found {len(unique_genres)} unique genres")

            # Sample if needed. np.random.seed is kept (rather than a local
            # generator) so later np.random calls stay as reproducible as before.
            if self.sample_size and len(data) > self.sample_size:
                np.random.seed(42)
                indices = np.random.choice(len(data), size=self.sample_size, replace=False)
                data = [data[i] for i in indices]
                print(f"Sampled {len(data)} interactions")

            self._genres = self._build_genre_vectors(genres, unique_genres)
            self._artists = artists
            self._years = years
            self._durations = durations

            return self._build_surprise_dataset(data)

        except Exception as e:
            print(f"Error loading data: {e}")
            return None

    def _read_interactions(self):
        """Parse the CSV once, collecting interactions and raw per-track features."""
        data = []
        genres = defaultdict(set)       # a set: the same (track, genre) pair repeats on every row
        artists = defaultdict(str)
        years = defaultdict(int)
        durations = defaultdict(float)
        unique_genres = set()

        with open(self.filepath, 'r') as f:
            header = f.readline().strip().split(',')

            # header.index raises ValueError for a missing column; the caller reports it.
            user_idx = header.index('user_id')
            media_idx = header.index('media_id')
            listened_idx = header.index('is_listened')
            genre_idx = header.index('genre_id')
            artist_idx = header.index('artist_id')
            year_idx = header.index('release_date')
            duration_idx = header.index('media_duration')

            print("Reading data file...")
            for line in tqdm(f):
                row = line.strip().split(',')
                if len(row) < len(header):  # Skip malformed rows
                    continue

                try:
                    rating = float(row[listened_idx])
                except ValueError as e:
                    print(f"Error processing row: {e}")
                    continue

                media_id = row[media_idx]
                data.append((row[user_idx], media_id, rating))

                # Create name mappings (placeholder names; the CSV has no titles)
                name = f"Media {media_id}"
                self.mediaID_to_name[media_id] = name
                self.name_to_mediaID[name] = media_id

                genre = row[genre_idx]
                if genre:
                    unique_genres.add(genre)
                    genres[media_id].add(genre)

                artist = row[artist_idx]
                if artist:
                    artists[media_id] = artist

                if row[year_idx]:
                    try:
                        years[media_id] = int(row[year_idx][:4])  # Extract year from date
                    except ValueError:
                        pass  # non-numeric date: leave the year unset for this row

                if row[duration_idx]:
                    try:
                        durations[media_id] = float(row[duration_idx])
                    except ValueError:
                        pass  # non-numeric duration: leave it unset for this row

        return data, genres, unique_genres, artists, years, durations

    @staticmethod
    def _build_genre_vectors(genres, unique_genres):
        """Turn media_id -> genre ids into media_id -> multi-hot list over sorted genres.

        NOTE(review): one dense Python list per track; with thousands of genres
        and many tracks this is likely the dominant memory cost - confirm
        before running on the full file.
        """
        genre_to_idx = {g: i for i, g in enumerate(sorted(unique_genres))}
        vectors = defaultdict(list)
        for media_id, media_genres in genres.items():
            vec = [0] * len(genre_to_idx)
            for genre in media_genres:
                if genre in genre_to_idx:
                    vec[genre_to_idx[genre]] = 1
            vectors[media_id] = vec
        return vectors

    @staticmethod
    def _build_surprise_dataset(data):
        """Write the triples to a private temp file and load them with Surprise.

        The temp file is always removed, even when Surprise fails to parse it.
        """
        import tempfile

        fd, temp_path = tempfile.mkstemp(prefix='ratings_', suffix='.txt')
        try:
            with os.fdopen(fd, 'w') as tmp:
                tmp.writelines(f"{user}\t{item}\t{rating}\n" for user, item, rating in data)

            reader = Reader(line_format='user item rating', sep='\t', rating_scale=(0, 1))
            return Dataset.load_from_file(temp_path, reader=reader)
        finally:
            os.remove(temp_path)

    def get_genres(self):
        """Return media_id -> multi-hot genre list (None before loading)."""
        return self._genres

    def get_artists(self):
        """Return media_id -> artist id (None before loading)."""
        return self._artists

    def get_years(self):
        """Return media_id -> release year (None before loading)."""
        return self._years

    def get_durations(self):
        """Return media_id -> duration (None before loading)."""
        return self._durations

    def get_popularity_ranks(self):
        """Rank tracks by how often they were listened to.

        Re-reads the CSV and counts rows whose ``is_listened`` equals 1.
        Rows that are too short or whose ``is_listened`` is not numeric are
        skipped, so one bad row does not discard the whole ranking.

        Returns:
            ``defaultdict(int)`` mapping media_id -> 1-based rank (1 = most
            listened). Tracks with no listens are absent. Empty if the file
            cannot be read or lacks the required columns.
        """
        try:
            print("Computing popularity rankings...")
            listen_counts = defaultdict(int)
            with open(self.filepath, 'r') as f:
                header = f.readline().strip().split(',')
                media_idx = header.index('media_id')
                listened_idx = header.index('is_listened')

                for line in f:
                    row = line.strip().split(',')
                    if len(row) < len(header):  # Skip malformed rows
                        continue
                    try:
                        # float() so "1" and "1.0" are both accepted, as in load_music_data
                        listened = float(row[listened_idx]) == 1
                    except ValueError:
                        continue
                    if listened:  # Only count actual listens
                        listen_counts[row[media_idx]] += 1

            # Sort items by listen count (stable: ties keep first-seen order)
            sorted_items = sorted(listen_counts.items(), key=lambda x: x[1], reverse=True)

            # Create rankings (1-based)
            rankings = defaultdict(int)
            for rank, (item_id, _count) in enumerate(sorted_items, 1):
                rankings[item_id] = rank

            print(f"Generated rankings for {len(rankings)} items")
            if sorted_items:
                print(f"Most popular item has {sorted_items[0][1]} listens")
                print(f"Top item has rank {min(rankings.values())}")
            return rankings

        except (OSError, ValueError) as e:
            print(f"Error calculating rankings: {e}")
            # Return empty rankings instead of exiting
            return defaultdict(int)

class ContentBasedRecommender(AlgoBase):
    """Item-item content-based recommender built on Surprise's AlgoBase.

    Each track is described by a weighted feature vector (genre, release year,
    artist, duration, in-train popularity). Item similarity is the cosine of
    those vectors passed through a sigmoid; a rating is predicted as the
    similarity-weighted mean of the user's own ratings on the k most similar
    items, again squashed by a sigmoid.

    Args:
        music_data: Object exposing ``get_genres()``, ``get_years()``,
            ``get_artists()`` and ``get_durations()`` (raw media id -> feature).
        k: Maximum number of neighbours used per prediction.
    """

    def __init__(self, music_data, k=100):
        AlgoBase.__init__(self)
        self.music_data = music_data
        self.k = k
        self.sim = None            # (n_items, n_items) similarity matrix, set by fit()
        self.item_features = None  # L2-normalised feature matrix, set by fit()
        self.trainset = None
        self.weights = {
            'genre': 5.0,
            'artist': 4.0,
            'year': 2.0,
            'duration': 1.0,
            'popularity': 3.0
        }
        # Ratings/predictions at or above this value count as "listened"/relevant.
        self.min_rating_threshold = 0.45
        self.popularity_scores = None

    @staticmethod
    def _positive_range(values):
        """Return (min, span) over the strictly positive values; (0, 1) if none.

        ``span`` falls back to 1 when all values are equal so callers can
        divide by it safely.
        """
        valid = [v for v in values if v > 0]
        if not valid:
            return 0, 1
        low, high = min(valid), max(valid)
        return low, (high - low if high > low else 1)

    def fit(self, trainset):
        """Build the item feature matrix and the item-item similarity matrix.

        Raises:
            ValueError: if the trainset contains no items.
            Any other exception raised while building features is printed and
            re-raised.

        NOTE(review): ``self.sim`` is a dense n_items x n_items float array, so
        memory grows quadratically with the number of training items.
        """
        AlgoBase.fit(self, trainset)
        self.trainset = trainset

        print("Computing similarities...")

        try:
            # Feature maps keyed by raw media id; tolerate a not-yet-loaded provider.
            genres = self.music_data.get_genres() or {}
            years = self.music_data.get_years() or {}
            artists = self.music_data.get_artists() or {}
            durations = self.music_data.get_durations() or {}

            # Popularity = share of "listened" train ratings relative to the top item
            listen_counts = defaultdict(int)
            for _u, i, r in trainset.all_ratings():
                if r >= self.min_rating_threshold:
                    listen_counts[trainset.to_raw_iid(i)] += 1

            max_listens = max(listen_counts.values()) if listen_counts else 1
            self.popularity_scores = {
                item: count / max_listens
                for item, count in listen_counts.items()
            }

            # Row r of every matrix below describes inner item id r
            all_media_ids = [trainset.to_raw_iid(iid) for iid in range(trainset.n_items)]
            n_items = len(all_media_ids)
            if n_items == 0:
                raise ValueError("No valid features available")

            min_year, year_range = self._positive_range(years.values())
            min_duration, duration_range = self._positive_range(durations.values())

            # One-hot columns only for artists that occur among the training
            # items. Artists of other tracks would be all-zero columns, which
            # change neither the row norms nor the dot products.
            train_artists = sorted({artists[m] for m in all_media_ids if m in artists})
            artist_to_idx = {artist: idx for idx, artist in enumerate(train_artists)}

            n_genres = len(next(iter(genres.values()))) if genres else 0

            # Pre-allocate float32 blocks instead of growing Python lists of lists
            genre_matrix = np.zeros((n_items, n_genres), dtype=np.float32)
            year_matrix = np.zeros((n_items, 1), dtype=np.float32)
            artist_matrix = np.zeros((n_items, len(artist_to_idx)), dtype=np.float32)
            duration_matrix = np.zeros((n_items, 1), dtype=np.float32)
            popularity_matrix = np.zeros((n_items, 1), dtype=np.float32)

            for row, media_id in enumerate(all_media_ids):
                # Genres: multi-hot vector scaled to sum to the genre weight
                genre_vec = genres.get(media_id)
                if genre_vec is not None:
                    vec = np.asarray(genre_vec, dtype=np.float64)
                    total = vec.sum()
                    if total > 0:
                        vec = vec / total * self.weights['genre']
                    genre_matrix[row] = vec

                # Year: min-max normalised, then exp((x - 1) * 2) so newer
                # tracks get values closer to the full year weight
                year = years.get(media_id, None)
                if year is not None and year > 0:
                    normalized_year = (year - min_year) / year_range
                    year_matrix[row, 0] = np.exp((normalized_year - 1) * 2) * self.weights['year']

                # Artist: weighted one-hot
                artist = artists.get(media_id, '')
                if artist in artist_to_idx:
                    artist_matrix[row, artist_to_idx[artist]] = 1 * self.weights['artist']

                # Duration: min-max normalised
                duration = durations.get(media_id, None)
                if duration is not None and duration > 0:
                    normalized_duration = (duration - min_duration) / duration_range
                    duration_matrix[row, 0] = normalized_duration * self.weights['duration']

                # Popularity within the training set
                pop_score = self.popularity_scores.get(media_id, 0)
                popularity_matrix[row, 0] = pop_score * self.weights['popularity']

            # Column order: genre | year | artist | duration | popularity
            self.item_features = np.hstack([
                genre_matrix, year_matrix, artist_matrix, duration_matrix, popularity_matrix
            ])

            # L2-normalise rows so the dot product below is cosine similarity;
            # all-zero rows are left as zeros
            norms = np.linalg.norm(self.item_features, axis=1)
            norms[norms == 0] = 1
            self.item_features = self.item_features / norms[:, np.newaxis]

            print("Computing similarity matrix...")
            self.sim = np.dot(self.item_features, self.item_features.T)

            # Sharpen with a sigmoid centred at 0.3 (slope 8)
            self.sim = 1 / (1 + np.exp(-8 * (self.sim - 0.3)))

            print(f"Feature matrix shape: {self.item_features.shape}")
            print(f"Number of items: {len(all_media_ids)}")

        except Exception as e:
            print(f"Error during feature computation: {str(e)}")
            raise

        return self

    def estimate(self, u, i):
        """Predict the rating of inner user ``u`` for inner item ``i``.

        Returns the global mean when the user has no usable neighbours.

        Raises:
            PredictionImpossible: if the user or item is unknown to the
                trainset, or if the computation fails.
        """
        if not (self.trainset.knows_user(u) and self.trainset.knows_item(i)):
            raise PredictionImpossible('User and/or item unknown.')

        try:
            # (similarity, rating) for every other item the user rated
            neighbors = []
            for other_item, rating in self.trainset.ur[u]:
                if other_item == i:
                    continue
                sim = float(self.sim[i, other_item])
                if not np.isnan(sim) and sim > 0:
                    neighbors.append((sim, rating))

            # Top-k by similarity; same result as a full descending sort + slice
            k_neighbors = heapq.nlargest(self.k, neighbors, key=lambda x: x[0])

            if not k_neighbors:
                return self.trainset.global_mean

            sim_sum = sum(sim for sim, _ in k_neighbors)
            if sim_sum == 0:
                return self.trainset.global_mean

            weighted_sum = sum(sim * rating for sim, rating in k_neighbors)
            prediction = weighted_sum / sim_sum

            # Push the weighted mean towards 0 or 1 (sigmoid centred at 0.5, slope 5)
            prediction = 1 / (1 + np.exp(-5 * (prediction - 0.5)))

            return max(0.0, min(1.0, prediction))

        except Exception as e:
            print(f"Error during rating estimation: {str(e)}")
            raise PredictionImpossible('Error computing prediction.') from e

    def get_top_n_recommendations(self, user_id, n=10):
        """Return up to ``n`` (raw item id, predicted rating) pairs for a raw user id.

        Only items the user has not rated in the trainset and whose prediction
        reaches ``min_rating_threshold`` are considered. Returns ``[]`` for a
        user unknown to the trainset; any other failure is printed and also
        yields ``[]``.
        """
        try:
            user_inner_id = self.trainset.to_inner_uid(user_id)
        except ValueError:
            return []  # user not in the trainset
        except Exception as e:
            print(f"Error generating recommendations for user {user_id}: {e}")
            return []

        try:
            seen_items = set(item for item, _ in self.trainset.ur[user_inner_id])

            predictions = []
            for item_id in range(self.trainset.n_items):
                if item_id in seen_items:
                    continue
                try:
                    pred = self.estimate(user_inner_id, item_id)
                except PredictionImpossible:
                    continue
                if pred >= self.min_rating_threshold:  # Use threshold for filtering
                    predictions.append((self.trainset.to_raw_iid(item_id), pred))

            # Highest predicted rating first
            return heapq.nlargest(n, predictions, key=lambda x: x[1])

        except Exception as e:
            print(f"Error generating recommendations for user {user_id}: {e}")
            return []

    def evaluate_recommendations(self, testset, rankings, n=10):
        """Evaluate top-N recommendations against a test set.

        A hit is a recommended item that the user has in the test set with a
        true rating of at least ``min_rating_threshold``; each distinct item
        counts once per user.

        Args:
            testset: Iterable of (raw user id, raw item id, true rating).
            rankings: Mapping raw item id -> popularity rank; unranked items
                contribute 0 to the average.
            n: Number of recommendations per user.

        Returns:
            dict with precision, recall, f1_score, coverage, avg_popularity and
            the raw counts behind them. ``total_test_items`` counts every test
            row; ``total_relevant_items`` is the recall denominator.
        """
        hits = 0
        total_recs = 0
        total_pops = 0
        coverage = set()
        skipped_users = 0

        # Group test set by user
        user_test_items = defaultdict(list)
        for uid, iid, true_r in testset:
            user_test_items[uid].append((iid, true_r))

        # Distinct relevant test items per user (also for users who get no recommendations)
        user_relevant = {
            uid: {iid for iid, true_r in items if true_r >= self.min_rating_threshold}
            for uid, items in user_test_items.items()
        }

        print("\nEvaluating recommendations...")
        for uid in tqdm(user_test_items):
            try:
                # Users missing from the trainset simply get no recommendations
                user_recs = self.get_top_n_recommendations(uid, n=n)
                if not user_recs:
                    continue

                rec_ids = {iid for iid, _ in user_recs}
                user_pops = sum(rankings.get(item_id, 0) for item_id in rec_ids)

                total_recs += len(user_recs)
                coverage.update(rec_ids)
                hits += len(user_relevant[uid] & rec_ids)
                total_pops += user_pops

            except Exception as e:
                # Best effort: keep evaluating, but do not hide that it happened
                skipped_users += 1
                continue

        if skipped_users:
            print(f"Warning: skipped {skipped_users} users because of errors")

        # Calculate final metrics
        total_test_items = sum(len(items) for items in user_test_items.values())
        total_relevant_items = sum(len(items) for items in user_relevant.values())
        precision = hits / total_recs if total_recs > 0 else 0
        recall = hits / total_relevant_items if total_relevant_items > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        coverage_ratio = len(coverage) / self.trainset.n_items if self.trainset.n_items > 0 else 0
        avg_popularity = total_pops / total_recs if total_recs > 0 else 0

        return {
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'coverage': coverage_ratio,
            'avg_popularity': avg_popularity,
            'total_recommendations': total_recs,
            'unique_items_recommended': len(coverage),
            'total_hits': hits,
            'total_test_items': total_test_items,
            'total_relevant_items': total_relevant_items
        }

class SVDTuning:
    """Grid-search tuner for Surprise's SVD.

    Args:
        param_grid: Optional dict of SVD parameter name -> list of candidate
            values. Defaults to the built-in grid below (108 combinations).
        cv: Cross-validation setting passed to ``GridSearchCV`` (default 5 folds).
        n_jobs: Parallelism passed to ``GridSearchCV`` (default -1, all cores).

    Attributes:
        param_grid: The grid that will be searched.
        cv_results: ``GridSearchCV.cv_results`` of the last successful
            ``tune()`` call, otherwise ``None``.
    """

    def __init__(self, param_grid=None, cv=5, n_jobs=-1):
        if param_grid is None:
            param_grid = {
                'n_factors': [50, 100, 150, 200],
                'n_epochs': [20, 30, 40],
                'lr_all': [0.002, 0.005, 0.01],
                'reg_all': [0.02, 0.04, 0.06],
                'biased': [True],
                'init_mean': [0],
                'init_std': [0.1]
            }
        # Copy so later edits to the tuner's grid never leak into the caller's dict
        self.param_grid = dict(param_grid)
        self.cv = cv
        self.n_jobs = n_jobs
        self.cv_results = None

    def tune(self, data):
        """Run the grid search on a Surprise dataset.

        Returns:
            The parameter dict with the best RMSE, or ``None`` if the search
            failed (the error is printed).
        """
        try:
            print(f"Starting grid search with {self.cv}-fold cross-validation...")
            gs = GridSearchCV(SVD, self.param_grid, measures=['rmse', 'mae'],
                              cv=self.cv, n_jobs=self.n_jobs, joblib_verbose=2)
            gs.fit(data)

            self.cv_results = gs.cv_results

            print("\nBest RMSE:", gs.best_score['rmse'])
            print("Best MAE:", gs.best_score['mae'])
            print("Best params for RMSE:", gs.best_params['rmse'])
            print("Best params for MAE:", gs.best_params['mae'])

            # Return RMSE-optimized parameters
            return gs.best_params['rmse']

        except Exception as e:
            print(f"Error during tuning: {e}")
            return None

def evaluate_algorithm(algo, testset, rankings, n=10, threshold=0.55):
    """Evaluate a fitted Surprise algorithm on a test set.

    Computes RMSE/MAE on the predictions, then treats the task as binary
    classification: a prediction (and a true rating) counts as positive when it
    is at least ``threshold``.

    For a ``ContentBasedRecommender`` the top-N metrics from
    ``evaluate_recommendations`` are merged into the result. They share the
    names ``precision``, ``recall`` and ``f1_score`` with the classification
    metrics and take over those top-level keys, so the classification values
    are always also available under ``classification_metrics``.

    Args:
        algo: Fitted algorithm exposing ``test(testset)``.
        testset: List of (raw user id, raw item id, true rating).
        rankings: Item popularity ranks, forwarded to the top-N evaluation.
        n: Number of recommendations per user for the top-N evaluation.
        threshold: Cut-off for the binary classification metrics. Note that
            this default (0.55) is not the recommender's own
            ``min_rating_threshold``; pass that value to align the two.

    Returns:
        dict of metric name -> value (``confusion_matrix`` and
        ``classification_metrics`` are nested dicts).
    """
    predictions = algo.test(testset)

    # Calculate prediction metrics
    rmse = accuracy.rmse(predictions)
    mae = accuracy.mae(predictions)

    # For content-based recommender, calculate recommendation metrics
    rec_metrics = {}
    if isinstance(algo, ContentBasedRecommender):
        rec_metrics = algo.evaluate_recommendations(testset, rankings, n)

    # Confusion matrix over thresholded predictions
    true_positives = 0
    false_positives = 0
    false_negatives = 0
    true_negatives = 0

    for pred in predictions:
        predicted_positive = pred.est >= threshold
        actual_positive = pred.r_ui >= threshold

        if predicted_positive and actual_positive:
            true_positives += 1
        elif predicted_positive:
            false_positives += 1
        elif actual_positive:
            false_negatives += 1
        else:
            true_negatives += 1

    # Calculate precision, recall, and F1
    predicted_total = true_positives + false_positives
    actual_total = true_positives + false_negatives
    precision = true_positives / predicted_total if predicted_total > 0 else 0
    recall = true_positives / actual_total if actual_total > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return {
        'rmse': rmse,
        'mae': mae,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'confusion_matrix': {
            'true_positives': true_positives,
            'false_positives': false_positives,
            'false_negatives': false_negatives,
            'true_negatives': true_negatives
        },
        # Kept separately because **rec_metrics below reuses the names
        # precision / recall / f1_score for the top-N variants.
        'classification_metrics': {
            'precision': precision,
            'recall': recall,
            'f1_score': f1
        },
        **rec_metrics
    }

def _write_metrics(out, metrics):
    """Write a metrics dict to an open text file, one metric per line.

    Nested dicts get their own indented sub-section; floats use 4 decimals.
    """
    for metric, value in metrics.items():
        if isinstance(value, dict):
            out.write(f"\n{metric}:\n")
            for k, v in value.items():
                out.write(f"  {k}: {v}\n")
        elif isinstance(value, float):
            out.write(f"{metric}: {value:.4f}\n")
        else:
            out.write(f"{metric}: {value}\n")


def main():
    """Run the full demo: load data, train/evaluate both models, save a report.

    Steps: load a 500k-interaction sample, compute popularity ranks, split
    80/20, fit and evaluate the content-based recommender, grid-search and
    evaluate SVD, write ``results/evaluation_results.txt`` and print sample
    recommendations for one random user with at least 10 training ratings.
    Any exception is printed with its traceback instead of propagating.
    """
    try:
        print("Loading music data...")
        music_data = MusicData(sample_size=500000)
        data = music_data.load_music_data()

        if data is None:
            print("Failed to load data. Exiting.")
            return

        # Get popularity rankings
        print("\nComputing popularity rankings...")
        rankings = music_data.get_popularity_ranks()

        # Continue even if rankings are empty
        if not rankings:
            print("Warning: Could not compute popularity rankings. Continuing with empty rankings.")
            rankings = defaultdict(int)

        # Plain random 80/20 split (not stratified)
        print("\nSplitting data into train and test sets...")
        trainset, testset = train_test_split(data, test_size=0.2, random_state=42)

        sparsity = 1 - (trainset.n_ratings / (trainset.n_users * trainset.n_items))

        print("\nDataset statistics:")
        print(f"Number of users in training: {trainset.n_users}")
        print(f"Number of items in training: {trainset.n_items}")
        print(f"Number of ratings in training: {trainset.n_ratings}")
        print(f"Sparsity: {sparsity:.4f}")

        # Train and evaluate content-based recommender
        print("\nTraining content-based recommender...")
        content_rec = ContentBasedRecommender(music_data)
        content_rec.fit(trainset)

        print("\nEvaluating content-based recommender...")
        content_metrics = evaluate_algorithm(content_rec, testset, rankings)
        print("\nContent-based Recommender Performance:")
        print(f"F1 Score: {content_metrics['f1_score']:.4f}")
        print(f"Precision: {content_metrics['precision']:.4f}")
        print(f"Recall: {content_metrics['recall']:.4f}")

        # Train and evaluate SVD
        # NOTE(review): tuning runs on the full dataset, which includes the
        # rows later used as the held-out test set, so the SVD test scores
        # are optimistic.
        print("\nTuning SVD parameters...")
        svd_tuner = SVDTuning()
        best_params = svd_tuner.tune(data)

        svd = None
        svd_metrics = None
        if best_params:
            print("\nTraining SVD with best parameters...")
            svd = SVD(**best_params)
            svd.fit(trainset)

            print("\nEvaluating SVD recommender...")
            svd_metrics = evaluate_algorithm(svd, testset, rankings)

        # Save results
        output_dir = 'results'
        os.makedirs(output_dir, exist_ok=True)

        results_file = os.path.join(output_dir, 'evaluation_results.txt')
        print(f"\nSaving results to {results_file}")

        with open(results_file, 'w') as f:
            f.write("Recommender System Evaluation Results\n")
            f.write("===================================\n\n")
            f.write(f"Evaluation Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            f.write("Dataset Statistics:\n")
            f.write("-----------------\n")
            f.write(f"Number of users: {trainset.n_users}\n")
            f.write(f"Number of items: {trainset.n_items}\n")
            f.write(f"Number of ratings: {trainset.n_ratings}\n")
            f.write(f"Sparsity: {sparsity:.4f}\n\n")

            f.write("Content-based Recommender Results:\n")
            f.write("--------------------------------\n")
            _write_metrics(f, content_metrics)

            if best_params:
                f.write("\nSVD Recommender Results:\n")
                f.write("----------------------\n")
                f.write("Best Parameters:\n")
                for param, value in best_params.items():
                    f.write(f"- {param}: {value}\n")
                f.write("\nPerformance Metrics:\n")
                _write_metrics(f, svd_metrics)

        # Generate sample recommendations
        print("\nGenerating sample recommendations...")

        # Get a random user with at least 10 ratings
        eligible_users = [uid for uid in range(trainset.n_users)
                          if len(trainset.ur[uid]) >= 10]
        if eligible_users:
            sample_inner_uid = int(np.random.choice(eligible_users))
            sample_user = trainset.to_raw_uid(sample_inner_uid)
            user_ratings = trainset.ur[sample_inner_uid]

            print(f"\nGenerating recommendations for user {sample_user}")
            print("User's training ratings:")
            for item_id, rating in user_ratings[:5]:  # Show first 5 ratings
                print(f"Item {trainset.to_raw_iid(item_id)}: {rating:.3f}")

            print("\nTop 5 content-based recommendations:")
            content_recs = content_rec.get_top_n_recommendations(sample_user, n=5)
            for item_id, score in content_recs:
                print(f"Item {item_id}: {score:.3f}")

            if best_params:
                print("\nTop 5 SVD recommendations:")
                # Score every item the user has not rated yet. Built once (not
                # per item) and kept out of `testset`, which still holds the
                # held-out split.
                rated_items = set(item_id for item_id, _ in user_ratings)
                candidates = [(sample_user, trainset.to_raw_iid(i), 0)
                              for i in range(trainset.n_items)
                              if i not in rated_items]
                predictions = svd.test(candidates)
                predictions.sort(key=lambda x: x.est, reverse=True)
                for pred in predictions[:5]:
                    print(f"Item {pred.iid}: {pred.est:.3f}")

        print(f"\nEvaluation complete. Results saved to {results_file}")

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        import traceback
        traceback.print_exc()

# Run the whole pipeline when this cell (or the file as a script) is executed
# directly; importing it as a module only defines the classes and functions.
if __name__ == "__main__":
    main()

Loading music data...
Reading data file...


7558834it [00:44, 169471.43it/s]


Loaded 7558834 interactions
Found 2922 unique genres
Sampled 500000 interactions
