<a href="https://colab.research.google.com/github/mkuangdotcom/Music_Recommendation_System/blob/main/User_Centric_Approach_to__Music_Recommendations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Problem Statement

Music streaming platforms need to recommend relevant songs to users from millions of tracks.
This project implements and compares three recommendation approaches:

1. Baseline: Popularity-based recommendations (recommend top songs to everyone)
2. ALS (Alternating Least Squares): Advanced collaborative filtering
3. Factorization Machines: Machine learning with user features

In this project, we use the Last.fm 360K dataset, scaled to 10,000 users and 5,000 artists for computational efficiency.

Dataset: http://ocelma.net/MusicRecommendationDataset/lastfm-360K.html (depreated)

Download link: https://zenodo.org/record/6090214/files/lastfm-dataset-360K.tar.gz

## Data Loading

In [None]:
!curl -L -o lastfm-dataset-360K.tar.gz https://zenodo.org/records/6090214/files/lastfm-dataset-360K.tar.gz

# Extract
!tar -xzf lastfm-dataset-360K.tar.gz

# Remove tar file
!rm lastfm-dataset-360K.tar.gz

data_path = 'lastfm-dataset-360K/usersha1-artmbid-artname-plays.tsv'

In [None]:
import pandas as pd

# Load
df = pd.read_csv(data_path, sep='\t', header=None,
                 names=['user_id', 'artist_id', 'artist_name', 'play_count'])

print(f"Total interactions: {len(df):,}")
print(f"Unique users: {df['user_id'].nunique():,}")
print(f"Unique artists: {df['artist_id'].nunique():,}")
print(f"Play count range: {df['play_count'].min()} - {df['play_count'].max()}")

df.head()

## Visualize and Scale the Data

Visualizations of user play counts, artist popularity, and user activity distributions.

In [None]:
# User activity analysis
user_activity = df.groupby('user_id').agg({
    'artist_id': 'count',      # number of different artists
    'play_count': 'sum'        # total plays
}).rename(columns={'artist_id': 'num_artists', 'play_count': 'total_plays'})

print("User Activity Stats:")
print(f"Average artists per user: {user_activity['num_artists'].mean():.1f}")
print(f"Median artists per user: {user_activity['num_artists'].median()}")
print(f"Most active user: {user_activity['num_artists'].max()} artists")
print(f"Least active user: {user_activity['num_artists'].min()} artists")

# Artist popularity analysis
artist_popularity = df.groupby('artist_id').agg({
    'user_id': 'count',        # number of different users
    'play_count': 'sum'        # total plays
}).rename(columns={'user_id': 'num_listeners', 'play_count': 'total_plays'})

print("\nArtist Popularity Stats:")
print(f"Average listeners per artist: {artist_popularity['num_listeners'].mean():.1f}")
print(f"Median listeners per artist: {artist_popularity['num_listeners'].median()}")
print(f"Most popular artist: {artist_popularity['num_listeners'].max()} listeners")
print(f"Least popular artist: {artist_popularity['num_listeners'].min()} listeners")

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Calculate user activity (number of unique artists per user)
user_activity = df.groupby('user_id')['artist_id'].nunique().reset_index(name='num_artists')

# Calculate artist popularity (number of unique listeners per artist)
artist_popularity = df.groupby('artist_id')['user_id'].nunique().reset_index(name='num_listeners')

# Visualize play count distribution
plt.figure(figsize=(6, 6))
sns.histplot(df['play_count'], bins=50, kde=False, color='skyblue')
plt.title('Distribution of Play Counts')
plt.xlabel('Play Count')
plt.ylabel('Frequency')
plt.yscale('log')
plt.show()

# Visualize user activity distribution
plt.figure(figsize=(6, 6))
sns.histplot(user_activity['num_artists'], bins=50, kde=False, color='lightgreen')
plt.title('Distribution of Number of Artists per User')
plt.xlabel('Number of Artists')
plt.ylabel('Number of Users')
plt.yscale('log')
plt.show()

# Visualize artist popularity distribution
plt.figure(figsize=(6, 6))
sns.histplot(artist_popularity['num_listeners'], bins=50, kde=False, color='salmon')
plt.title('Distribution of Number of Listeners per Artist')
plt.xlabel('Number of Listeners')
plt.ylabel('Number of Artists')
plt.yscale('log')
plt.show()

## Filter and Scale

In [None]:
# Filter users with at least 5 unique artists
user_counts = df.groupby('user_id').size()
active_users = user_counts[user_counts >= 5].index
df_filtered = df[df['user_id'].isin(active_users)]

# Filter artists with at least 5 unique listeners
artist_counts = df_filtered.groupby('artist_id').size()
popular_artists = artist_counts[artist_counts >= 5].index
df_filtered = df_filtered[df_filtered['artist_id'].isin(popular_artists)]

# Scale down to the top 20,000 users
top_users = df_filtered['user_id'].value_counts().head(20000).index
df_scaled = df_filtered[df_filtered['user_id'].isin(top_users)]


print(f"Final scaled data: {len(df_scaled):,} interactions")
print(f"Users: {df_scaled['user_id'].nunique():,}")
print(f"Artists: {df_scaled['artist_id'].nunique():,}")

## Preprocessing

In [None]:
from scipy.sparse import csr_matrix

# Create user and artist mappings
user_mapping = {user: idx for idx, user in enumerate(df_scaled['user_id'].unique())}
artist_mapping = {artist: idx for idx, artist in enumerate(df_scaled['artist_id'].unique())}

# Map IDs to indices
user_mapping = {user: idx for idx, user in enumerate(df_scaled['user_id'].unique())}
artist_mapping = {artist: idx for idx, artist in enumerate(df_scaled['artist_id'].unique())}

df_scaled['user_idx'] = df_scaled['user_id'].map(user_mapping)
df_scaled['artist_idx'] = df_scaled['artist_id'].map(artist_mapping)

# Create sparse matrix
rows = df_scaled['user_idx'].values
cols = df_scaled['artist_idx'].values
data = df_scaled['play_count'].values
sparse_matrix = csr_matrix((data, (rows, cols)),
                           shape=(len(user_mapping), len(artist_mapping)))

# Sparse Matrix Statistics
print(f"\nSparse Matrix Statistics:")
print(f"Shape: {sparse_matrix.shape}")
print(f"Total possible entries: {sparse_matrix.shape[0] * sparse_matrix.shape[1]:,}")
print(f"Non-zero entries: {sparse_matrix.nnz:,}")
sparsity_percent = (1 - sparse_matrix.nnz / (sparse_matrix.shape[0] * sparse_matrix.shape[1])) * 100
print(f"Sparsity: {sparsity_percent:.2f}%")
print(f"Density: {100 - sparsity_percent:.2f}%")

## Train/Test Split

In [None]:
import numpy as np

# Split into train and test sets
train_matrix = sparse_matrix.copy()
test_interactions = []

test_ratio = 0.2 # 20% for testing
min_interactions = 5  # only consider users with at least 5 interactions

for user_idx in range(sparse_matrix.shape[0]):
    user_interactions = sparse_matrix.getrow(user_idx)
    nonzero_items = user_interactions.nonzero()[1]

    if len(nonzero_items) >= min_interactions:
        n_test = max(1, int(len(nonzero_items) * test_ratio))
        test_items = np.random.choice(nonzero_items, n_test, replace=False)

        for item_idx in test_items:
            test_interactions.append({
                'user_idx': user_idx,
                'item_idx': item_idx,
                'rating': sparse_matrix[user_idx, item_idx]
            })
            train_matrix[user_idx, item_idx] = 0

train_matrix.eliminate_zeros()

print(f"\nTrain Matrix Statistics:")
print(f"Shape: {train_matrix.shape}")
print(f"Total possible entries: {train_matrix.shape[0] * train_matrix.shape[1]:,}")
print(f"Non-zero entries: {train_matrix.nnz:,}")
train_sparsity = (1 - train_matrix.nnz / (train_matrix.shape[0] * train_matrix.shape[1])) * 100
print(f"Train set sparsity: {train_sparsity:.2f}%")

In [None]:
# Check for data leakage
assert train_matrix.nnz + len(test_interactions) == sparse_matrix.nnz, "Data leakage detected"

# Check sparsity
train_sparsity = (1 - train_matrix.nnz / (train_matrix.shape[0] * train_matrix.shape[1])) * 100
print(f"Train set sparsity: {train_sparsity:.2f}%")

# Check for cold-start users
train_user_counts = np.array(train_matrix.sum(axis=1)).flatten()
cold_start_users = np.sum(train_user_counts == 0)
print(f"Cold-start users: {cold_start_users}")

## Model Implementation and Training

We'll implement two advanced recommendation approaches:
1. **Baseline**: Popularity-based recommendations for comparison
2. **ALS (Alternating Least Squares)**: Matrix factorization optimized for implicit feedback

### Baseline Model: Popularity-Based Recommendations

First, let's establish our baseline by identifying the most popular artists based on total play counts across all training users.

In [None]:
# Calculate artist popularity from training data
artist_popularity = np.array(train_matrix.sum(axis=0)).flatten()

# Get top artists for baseline recommendations
n_recommendations = 10
top_artists_baseline = np.argsort(artist_popularity)[::-1][:n_recommendations]

print("Baseline Model - Most Popular Artists:")
for i, artist_idx in enumerate(top_artists_baseline):
    plays = artist_popularity[artist_idx]
    print(f"  {i+1}. Artist {artist_idx}: {plays:,.0f} total plays")

### ALS Model: Collaborative Filtering

ALS (Alternating Least Squares) is a matrix factorization technique that learns latent user and item factors by alternately optimizing user and item representations. It's particularly well-suited for implicit feedback data like play counts.

Key advantages:
- Handles sparse matrices efficiently
- Optimized for implicit feedback (play counts vs explicit ratings)
- Scalable to large datasets

In [None]:
from implicit.als import AlternatingLeastSquares
from tqdm import tqdm

def evaluate(model, test_sample, k=10):
    """
      Quick evaluation on sample of test data for hyperparameter search.

      Args:
          model: Trained ALS model
          test_sample (list): Test interactions
          k (int): Number of recommendations

      Returns:
          float: Average precision@k across sampled users
    """
    precisions = []

    # Sample 1000 users for fast evaluation
    user_test_items = {}
    for interaction in test_sample:
        user_idx = interaction['user_idx']
        if user_idx not in user_test_items:
            user_test_items[user_idx] = []
        user_test_items[user_idx].append(interaction['item_idx'])
        if len(user_test_items) >= 1000:
            break

    for user_idx, true_items in user_test_items.items():
        try:
            user_items = train_matrix[user_idx].tocsr()
            recs, _ = model.recommend(userid=user_idx, user_items=user_items, N=k)
            hits = len(set(recs) & set(true_items))
            precisions.append(hits / k)
        except:
            precisions.append(0)

    return np.mean(precisions)

# Parameter search space
param_space = {
    'factors': [30, 50, 70, 100],
    'regularization': [0.001, 0.01, 0.05, 0.1],
    'iterations': [10, 15, 20],
    'alpha': [0.5, 1.0, 2.0, 5.0]
}

# Random search configuration
n_trials = 12
results = []

print(f"Random Hyperparameter Search: {n_trials} trials")
print("-" * 60)

for trial in tqdm(range(n_trials), desc="Searching hyperparameters", ncols=80):
    # Sample random parameters
    params = {
        'factors': np.random.choice(param_space['factors']),
        'regularization': np.random.choice(param_space['regularization']),
        'iterations': np.random.choice(param_space['iterations']),
        'alpha': np.random.choice(param_space['alpha']),
        'random_state': 42
    }

    # Train and evaluate
    model = AlternatingLeastSquares(**params)
    model.fit(train_matrix.tocsr(), show_progress=False)
    precision = evaluate(model, test_interactions, k=10)

    tqdm.write(f"Trial {trial+1:2d}: P@10={precision:.4f} | "
               f"f={params['factors']:3d} r={params['regularization']:.3f} "
               f"i={params['iterations']:2d} a={params['alpha']:.1f}")

    results.append({
        **params,
        'precision': precision,
    })

# Results summary
print("\n" + "=" * 60)
results_df = pd.DataFrame(results).sort_values('precision', ascending=False)
print("Top 3 Configurations:")
print(results_df[['factors', 'regularization', 'iterations', 'alpha', 'precision']].head(3).to_string(index=False))

# Best parameters
best = results_df.iloc[0]
print("\n" + "=" * 60)
print("Best Parameters:")
print(f"  factors={int(best['factors'])}, regularization={best['regularization']}, "
      f"iterations={int(best['iterations'])}, alpha={best['alpha']}")
print(f"  Validation Precision@10: {best['precision']:.4f}")
print("=" * 60)

# Train final model with best parameters
als_model = AlternatingLeastSquares(
    factors=int(best['factors']),
    regularization=best['regularization'],
    iterations=int(best['iterations']),
    alpha=best['alpha'],
    random_state=42
)
als_model.fit(train_matrix.tocsr(), show_progress=True)
print("Training complete.")

## Recommendation Generation Functions

We'll implement functions to generate recommendations from both our baseline and ALS models, then evaluate their performance using standard recommendation metrics.

In [None]:
def get_baseline_recommendations(k=10):
    """Return top K popular artists."""
    return top_artists_baseline[:k]

def get_als_recommendations(user_idx, k=10):
    """
    Get personalized ALS recommendations for a user.

    Args:
        user_idx: user index (0 to n_users-1)
        k: number of recommendations

    Returns:
        artist_indices: array of recommended artist IDs
        scores: prediction scores
    """
    try:
        # Get user's listening history (for filtering)
        user_items = train_matrix[user_idx].tocsr()

        recs, scores = als_model.recommend(
            userid=user_idx,
            user_items=user_items,
            N=k,
            filter_already_liked_items=True
        )
        return recs, scores

    except Exception as e:
        print(f"Error getting recommendations for user {user_idx}: {e}")
        # Fallback to baseline
        return np.array([]), np.array([])

# Test the functions
print("Testing recommendation functions...")
test_user = 0
baseline_recs = get_baseline_recommendations(5)
als_recs, als_scores = get_als_recommendations(test_user, 5)

print(f"Baseline recommendations: {baseline_recs}")
print(f"ALS recommendations for user {test_user}: {als_recs}")
print(f"ALS scores: {als_scores}")

## Model Evaluation and Comparison

Now we'll evaluate both models on our test set and compare their performance across multiple metrics.

In [None]:
def evaluate_recommendations(model_name, get_recs_func, test_interactions, k=10):
    """
    Evaluate recommendation quality using multiple metrics.

    Args:
        model_name (str): Name of the model being evaluated
        get_recs_func (function): Function that generates recommendations
                                   For baseline: takes only k
                                   For other models: takes (user_idx, k)
        test_interactions (list): List of test interaction dictionaries with keys:
                                  'user_idx', 'item_idx', 'rating'
        k (int): Number of recommendations to evaluate (default: 10)

    Returns:
        dict: Dictionary containing evaluation metrics:
              - precision: Average precision@k across all users
              - recall: Average recall@k across all users
              - ndcg: Average NDCG@k across all users
              - coverage: Fraction of items recommended at least once
              - name: Model name string
    """
    print(f"\nEvaluating {model_name}...")

    # Group test items by user
    user_test_items = {}
    for interaction in test_interactions:
        user_idx = interaction['user_idx']
        artist_idx = interaction['item_idx']
        if user_idx not in user_test_items:
            user_test_items[user_idx] = []
        user_test_items[user_idx].append(artist_idx)

    print(f"  Evaluating on {len(user_test_items)} users")

    # Initialize metric lists
    precisions = []
    recalls = []
    ndcgs = []
    all_recommended = set()

    for user_idx, true_items in user_test_items.items():
        # Get recommendations based on model type
        if model_name == "Baseline":
            recs = get_recs_func(k)
        else:
            recs, _ = get_recs_func(user_idx, k)

        # Ensure recommendations are valid indices
        recs = np.array(recs)
        valid_recs = recs[recs < train_matrix.shape[1]]

        # Calculate hits
        rec_set = set(valid_recs)
        true_set = set(true_items)
        hits = len(rec_set & true_set)

        # Precision@k: fraction of recommendations that are relevant
        precision = hits / k if k > 0 else 0
        precisions.append(precision)

        # Recall@k: fraction of relevant items that are recommended
        recall = hits / len(true_set) if len(true_set) > 0 else 0
        recalls.append(recall)

        # NDCG@k: ranking quality metric (relevant items at top positions score higher)
        dcg = 0.0
        for i, item in enumerate(valid_recs[:k]):
            if item in true_set:
                dcg += 1.0 / np.log2(i + 2)

        idcg = 0.0
        for i in range(min(len(true_set), k)):
            idcg += 1.0 / np.log2(i + 2)

        ndcg = dcg / idcg if idcg > 0 else 0.0
        ndcgs.append(ndcg)

        # Track all recommended items for coverage
        all_recommended.update(valid_recs)

    # Calculate average metrics
    avg_precision = np.mean(precisions)
    avg_recall = np.mean(recalls)
    avg_ndcg = np.mean(ndcgs)
    coverage = len(all_recommended) / train_matrix.shape[1]

    print(f"  Precision@{k}: {avg_precision:.4f}")
    print(f"  Recall@{k}: {avg_recall:.4f}")
    print(f"  NDCG@{k}: {avg_ndcg:.4f}")
    print(f"  Coverage: {coverage:.4f}")

    return {
        'precision': avg_precision,
        'recall': avg_recall,
        'ndcg': avg_ndcg,
        'coverage': coverage,
        'name': model_name
    }

In [None]:
# Evaluate baseline
baseline_results = evaluate_recommendations(
    "Baseline",
    get_baseline_recommendations,
    test_interactions,
    k=10
)

# Evaluate ALS
als_results = evaluate_recommendations(
    "ALS",
    get_als_recommendations,
    test_interactions,
    k=10
)

# Compare results
print(f"\n{'Metric':<15} {'Baseline':<12} {'ALS':<12} {'Improvement'}")
print("-"*60)
for metric in ['precision', 'recall', 'ndcg', 'coverage']:
    base = baseline_results[metric]
    als = als_results[metric]
    improvement = ((als - base) / base * 100) if base > 0 else 0
    print(f"{metric.capitalize():<15} {base:<12.4f} {als:<12.4f} {improvement:>+7.1f}%")

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Performance comparison
metrics = ['Precision@10', 'Recall@10', 'NDCG@10', 'Coverage']
baseline_vals = [baseline_results['precision'], baseline_results['recall'], baseline_results['ndcg'], baseline_results['coverage']]
als_vals = [als_results['precision'], als_results['recall'], als_results['ndcg'], als_results['coverage']]


x = np.arange(len(metrics))
width = 0.35

axes[0].bar(x - width/2, baseline_vals, width, label='Baseline', alpha=0.8, color='skyblue')
axes[0].bar(x + width/2, als_vals, width, label='ALS Model', alpha=0.8, color='lightcoral')
axes[0].set_ylabel('Score')
axes[0].set_title('Model Performance Comparison')
axes[0].set_xticks(x)
axes[0].set_xticklabels(metrics)
axes[0].legend()
axes[0].grid(axis='y', alpha=0.3)

# Improvement percentages
improvements = []
for base, als in zip(baseline_vals, als_vals):
    imp = ((als - base) / base * 100) if base > 0 else 0
    improvements.append(imp)

colors = ['green' if x > 0 else 'red' for x in improvements]
bars = axes[1].bar(metrics, improvements, color=colors, alpha=0.7)
axes[1].set_ylabel('Improvement (%)')
axes[1].set_title('ALS vs Baseline Improvement')
axes[1].axhline(y=0, color='black', linestyle='-', linewidth=0.8)
axes[1].grid(axis='y', alpha=0.3)

# Add value labels
for bar, val in zip(bars, improvements):
    height = bar.get_height()
    axes[1].text(bar.get_x() + bar.get_width()/2., height + (2 if height > 0 else -5),
                f'{val:.1f}%', ha='center', va='bottom' if height > 0 else 'top')

plt.tight_layout()
plt.show()

## Sample Recommendations Analysis

Let's examine actual recommendations for a sample user to understand the difference between baseline and collaborative filtering approaches.

In [None]:
sample_user = 0

# User's profile
user_artists = train_matrix[sample_user].nonzero()[1]
total_plays = train_matrix[sample_user].sum()

print(f"Sample User {sample_user} Profile:")
print(f"  Artists listened to: {len(user_artists)}")
print(f"  Total plays: {total_plays}")

# Get recommendations
baseline_recs = get_baseline_recommendations(5)
als_recs, als_scores = get_als_recommendations(sample_user, 5)

print(f"\nBaseline Recommendations (Same for Everyone):")
for i, artist_idx in enumerate(baseline_recs):
    plays = int(artist_popularity[artist_idx])
    print(f"  {i+1}. Artist {artist_idx} ({plays:,} total plays)")

print(f"\nALS Recommendations (Personalized for User {sample_user}):")
for i, (artist_idx, score) in enumerate(zip(als_recs, als_scores)):
    print(f"  {i+1}. Artist {artist_idx} (score: {score:.3f})")

# Check against test set
user_test_items = [item['item_idx'] for item in test_interactions if item['user_idx'] == sample_user]
if user_test_items:
    baseline_hits = len(set(baseline_recs) & set(user_test_items))
    als_hits = len(set(als_recs) & set(user_test_items))
    print(f"\nActual Relevance (Test Set Hits):")
    print(f"  Baseline: {baseline_hits}/5")
    print(f"  ALS: {als_hits}/5")