# Collaborative Filtering with Last.fm (PyTorch)

This notebook reproduces a lightweight matrix factorization recommender for the Last.fm 360k listening history dataset. The sections below mirror Colab cells, so you can copy and paste them directly into Google Colab or run them locally.

## Colab Cell 1: Setup and Imports

Run the following command if you are using Google Colab (skip it if your environment already has the dependencies):

```python
!pip install torch pandas tqdm requests
```

Then execute the import cell.

In [None]:
import inspect
import os
import tarfile
from pathlib import Path

import pandas as pd
import requests
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
print(f'Using device: {DEVICE}')

## Colab Cell 2: Last.fm Dataset Loader

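The dataset class downloads the official Last.fm 360k archive (~543 MB), extracts the TSV with user–artist play counts, and maps the string user and artist identifiers to integer indices. Each sample is a `(user_index, item_index, rating)` tensor triple in which the rating is always the implicit-feedback target 1.0; play counts are used only for the optional `min_plays` filter and for display. `max_rows` limits how many rows are read from the TSV (the default `None` reads everything), so set it to an integer if you want a quicker dry run.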

In [None]:
class LastFmDataset(Dataset):
    """PyTorch Dataset for the Last.fm 360k implicit feedback data.

    On construction the dataset is downloaded and extracted under
    ``root_dir`` if necessary, then the user/artist play-count TSV is loaded
    and the raw string identifiers are mapped to contiguous integer indices.

    Each sample is a ``(user_index, item_index, rating)`` triple of tensors,
    where ``rating`` is always ``1.0`` (implicit positive feedback).

    Args:
        root_dir: Directory holding (or receiving) the archive and the
            extracted data. Defaults to the current working directory.
        max_rows: Maximum number of TSV rows to read; ``None`` reads all.
        min_plays: When greater than 1, interactions with fewer plays are
            dropped.

    Attributes set after loading:
        user_ids, item_ids, ratings: 1-D tensors, one entry per interaction.
        num_users, num_items: Number of distinct users / artists.
        user_index_to_raw, item_index_to_raw: index -> raw identifier.
        item_index_to_name: index -> artist name.
        user_interactions: user index -> set of item indices listened to.
        interactions_df: DataFrame with mapped ids, plays and artist names.
    """

    URL = 'http://mtg.upf.edu/static/datasets/last.fm/lastfm-dataset-360K.tar.gz'
    ARCHIVE_NAME = 'lastfm-dataset-360K.tar.gz'
    EXTRACTED_DIR = 'lastfm-dataset-360K'
    DATA_FILE = 'usersha1-artmbid-artname-plays.tsv'
    # (connect, read) timeouts in seconds, so a stalled server cannot hang forever.
    DOWNLOAD_TIMEOUT = (10, 60)
    # 1 MiB chunks: far fewer write/progress calls than 1 KiB for a ~543 MB file.
    DOWNLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self, root_dir=None, max_rows=None, min_plays=1):
        super().__init__()
        self.root_dir = Path(root_dir or os.getcwd())
        self.max_rows = max_rows
        self.min_plays = min_plays

        self._ensure_data_ready()
        self.user_ids, self.item_ids, self.ratings = self._load_tensor_data()

        # factorize() yields codes 0..len(uniques)-1, so the number of unique
        # values equals max(code) + 1 while also being defined (0) when every
        # row was filtered out and the tensors are empty.
        self.num_users = len(self.user_index_to_raw)
        self.num_items = len(self.item_index_to_raw)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _ensure_data_ready(self):
        """Download and extract the archive unless it is already extracted."""
        archive_path = self.root_dir / self.ARCHIVE_NAME
        extracted_path = self.root_dir / self.EXTRACTED_DIR

        if extracted_path.exists():
            print('Dataset already extracted.')
            return

        if not archive_path.exists():
            self._download_archive(archive_path)

        print('Extracting dataset...')
        with tarfile.open(archive_path, 'r:gz') as tar_ref:
            # Use the safer 'data' extraction filter when this Python's
            # tarfile supports it.
            signature = inspect.signature(tar_ref.extractall)
            extract_kwargs = {'path': self.root_dir}
            if 'filter' in signature.parameters:
                extract_kwargs['filter'] = 'data'
            tar_ref.extractall(**extract_kwargs)
        print('Extraction complete.')

    def _download_archive(self, archive_path):
        """Stream the archive from ``URL`` to ``archive_path``.

        The payload is written to a sibling ``.part`` file and renamed only
        after the download finishes, so an interrupted transfer never leaves
        a truncated file at ``archive_path`` that a later run would mistake
        for a complete archive.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        archive_path.parent.mkdir(parents=True, exist_ok=True)
        print('Downloading Last.fm 360k dataset (approx 543MB)...')
        partial_path = archive_path.with_name(archive_path.name + '.part')

        try:
            with requests.get(self.URL, stream=True, timeout=self.DOWNLOAD_TIMEOUT) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                # total=None lets tqdm show a plain counter when the server
                # does not report a content length.
                with partial_path.open('wb') as f, tqdm(
                    total=total_size or None,
                    unit='B',
                    unit_scale=True,
                    desc=self.ARCHIVE_NAME,
                ) as progress:
                    for chunk in response.iter_content(self.DOWNLOAD_CHUNK_SIZE):
                        f.write(chunk)
                        progress.update(len(chunk))
        except BaseException:
            # Also covers KeyboardInterrupt: remove the partial file, re-raise.
            if partial_path.exists():
                partial_path.unlink()
            raise

        partial_path.replace(archive_path)
        print('Download complete.')

    def _locate_data_file(self):
        """Return the path of the interactions TSV inside the extracted data.

        Both ``<root>/<EXTRACTED_DIR>/<EXTRACTED_DIR>/<DATA_FILE>`` and
        ``<root>/<EXTRACTED_DIR>/<DATA_FILE>`` are accepted, because the
        location depends on how the archive was unpacked.

        Raises:
            FileNotFoundError: If the file is in neither location.
        """
        extracted_path = self.root_dir / self.EXTRACTED_DIR
        candidates = (
            extracted_path / self.EXTRACTED_DIR / self.DATA_FILE,
            extracted_path / self.DATA_FILE,
        )
        for candidate in candidates:
            if candidate.exists():
                return candidate
        searched = ', '.join(str(candidate) for candidate in candidates)
        raise FileNotFoundError(f'Missing data file; looked in: {searched}')

    def _load_tensor_data(self):
        """Load the TSV, build the index mappings and return the tensors.

        Returns:
            Tuple ``(user_ids, item_ids, ratings)`` of 1-D tensors (long,
            long, float32) with one entry per retained interaction.

        Raises:
            FileNotFoundError: If the interactions TSV cannot be found.
        """
        data_path = self._locate_data_file()

        print('Loading user-artist interaction data...')
        df = pd.read_csv(
            data_path,
            sep='\t',
            header=None,
            names=['user_id_raw', 'artist_id_raw', 'artist_name', 'plays'],
            on_bad_lines='skip',
            encoding='utf-8',
            nrows=self.max_rows
        )

        # Rows without a user or artist identifier cannot be indexed: a
        # missing value would factorize to code -1, an invalid embedding index.
        keep = (
            df['user_id_raw'].notna()
            & df['artist_id_raw'].notna()
            & (df['artist_id_raw'] != '')
        )
        if self.min_plays > 1:
            keep &= df['plays'] >= self.min_plays
        # Explicit copy so the column assignments below modify an independent
        # frame instead of a slice of the raw data.
        df = df[keep].copy()

        user_codes, user_uniques = pd.factorize(df['user_id_raw'], sort=True)
        item_codes, item_uniques = pd.factorize(df['artist_id_raw'], sort=True)

        df['user_id_mapped'] = user_codes
        df['item_id_mapped'] = item_codes
        df['rating_binary'] = 1.0

        self.user_index_to_raw = dict(enumerate(user_uniques))
        self.item_index_to_raw = dict(enumerate(item_uniques))
        self.item_index_to_name = (
            df.drop_duplicates('item_id_mapped')
              .set_index('item_id_mapped')['artist_name']
              .to_dict()
        )
        self.user_interactions = (
            df.groupby('user_id_mapped')['item_id_mapped'].apply(set).to_dict()
        )
        self.interactions_df = df[['user_id_mapped', 'item_id_mapped', 'plays', 'artist_name']].copy()

        user_ids = torch.tensor(df['user_id_mapped'].values, dtype=torch.long)
        item_ids = torch.tensor(df['item_id_mapped'].values, dtype=torch.long)
        ratings = torch.tensor(df['rating_binary'].values, dtype=torch.float32)

        print(f"Dataset loaded: {len(user_uniques)} users, {len(item_uniques)} artists, {len(df)} interactions.")
        return user_ids, item_ids, ratings

    # ------------------------------------------------------------------
    # Dataset protocol
    # ------------------------------------------------------------------
    def __len__(self):
        """Return the number of retained interactions."""
        return len(self.user_ids)

    def __getitem__(self, idx):
        """Return the ``(user_index, item_index, rating)`` tensors at ``idx``."""
        return self.user_ids[idx], self.item_ids[idx], self.ratings[idx]


## Colab Cell 3: Matrix Factorization Model

We model the user–item interaction with two embedding tables. The predicted preference is the dot product of user and artist embeddings.

In [None]:
class CollaborativeFiltering(nn.Module):
    """Matrix factorization scorer: one embedding per user and per item.

    The predicted preference for a (user, item) pair is the dot product of
    the two embedding vectors.

    Args:
        num_users: Number of rows in the user embedding table.
        num_items: Number of rows in the item embedding table.
        embedding_dim: Size of every embedding vector.
    """

    def __init__(self, num_users, num_items, embedding_dim=50):
        super().__init__()
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)

        # Overwrite the default initialisation with small uniform values,
        # user table first and item table second.
        for table in (self.user_embedding, self.item_embedding):
            nn.init.uniform_(table.weight, -0.01, 0.01)

    def forward(self, user_indices, item_indices):
        """Return one dot-product score per (user, item) index pair."""
        users = self.user_embedding(user_indices)
        items = self.item_embedding(item_indices)
        elementwise = users * items
        return elementwise.sum(dim=1)


## Colab Cell 4: Training Loop

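We minimise mean squared error against the implicit 1.0 target. Because only observed (positive) interactions are used and every target is 1.0, the model is never shown a non-interaction, so treat the resulting scores as illustrative. For a production system you would typically switch to a pairwise ranking loss such as Bayesian Personalized Ranking (BPR) or to weighted alternating least squares (WALS), but this keeps the demo concise.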

In [None]:
def train_model(dataloader, num_users, num_items, embedding_dim=50, epochs=3, learning_rate=0.005, device=DEVICE):
    """Fit a ``CollaborativeFiltering`` model with Adam and an MSE loss.

    Args:
        dataloader: Iterable yielding ``(user_idx, item_idx, ratings)`` batches.
        num_users: Number of rows for the user embedding table.
        num_items: Number of rows for the item embedding table.
        embedding_dim: Size of the embedding vectors.
        epochs: Number of passes over ``dataloader``.
        learning_rate: Learning rate handed to Adam.
        device: Device the model and every batch are moved to.

    Returns:
        The trained model, located on ``device``.
    """
    model = CollaborativeFiltering(num_users, num_items, embedding_dim=embedding_dim).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(1, epochs + 1):
        model.train()
        running_loss = 0.0
        batches = tqdm(dataloader, desc=f'Epoch {epoch}/{epochs}', leave=False)

        for batch in batches:
            users, items, targets = (part.to(device) for part in batch)

            optimizer.zero_grad()
            loss = criterion(model(users, items), targets)
            loss.backward()
            optimizer.step()

            # Read the scalar once and reuse it for the running total and
            # the progress bar.
            batch_loss = loss.item()
            running_loss += batch_loss
            batches.set_postfix({'loss': batch_loss})

        # max(..., 1) avoids dividing by zero when the dataloader is empty.
        num_batches = max(len(dataloader), 1)
        avg_loss = running_loss / num_batches
        print(f'Epoch {epoch} finished. Average loss: {avg_loss:.4f}')

    print('Training finished.')
    return model


## Colab Cell 5: Train and Generate Recommendations

This final cell wires everything together. Adjust `MAX_ROWS` if you want to prototype on a subset (e.g., `MAX_ROWS = 500_000`). After training, we pick a listener at random (with a fixed seed, so the choice is reproducible) and show that listener’s top plays alongside new artist recommendations.

In [None]:
# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------
MAX_ROWS = None  # Set to a smaller integer for quicker experiments, e.g. 500_000
BATCH_SIZE = 64
EPOCHS = 3
EMBEDDING_DIM = 50
TOP_K = 10

# ----------------------------------------------------------------------
# Data and training
# ----------------------------------------------------------------------
lastfm_dataset = LastFmDataset(max_rows=MAX_ROWS)
dataloader = DataLoader(lastfm_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=False)

model = train_model(
    dataloader,
    lastfm_dataset.num_users,
    lastfm_dataset.num_items,
    embedding_dim=EMBEDDING_DIM,
    epochs=EPOCHS,
    device=DEVICE
)

# ----------------------------------------------------------------------
# Recommendations for one sampled user
# ----------------------------------------------------------------------
model.eval()

with torch.no_grad():
    # A dedicated, seeded generator makes the sampled user reproducible
    # without touching the global RNG state.
    sample_user_id = torch.randint(0, lastfm_dataset.num_users, (1,), generator=torch.Generator().manual_seed(42)).item()
    user_tensor = torch.tensor([sample_user_id], dtype=torch.long, device=DEVICE)
    user_embedding = model.user_embedding(user_tensor).squeeze(0)

    # Score every item for this user with a single matrix-vector product.
    all_item_ids = torch.arange(lastfm_dataset.num_items, dtype=torch.long, device=DEVICE)
    item_embeddings = model.item_embedding(all_item_ids)
    scores = torch.matmul(item_embeddings, user_embedding)

    # Mask artists the user already listened to so only new ones are returned.
    known_items = lastfm_dataset.user_interactions.get(sample_user_id, set())
    if known_items:
        known_idx = torch.tensor(list(known_items), dtype=torch.long, device=DEVICE)
        scores[known_idx] = float('-inf')

    k = min(TOP_K, lastfm_dataset.num_items - len(known_items))
    if k <= 0:
        raise ValueError('No unseen items available for recommendation. Try a different user.')

    top_scores, top_indices = torch.topk(scores, k)

# The blank lines around the headings are written as "\n" escapes: a raw line
# break inside a single-line string literal is a syntax error.
print(
    f"\nSample recommendations for user index {sample_user_id} "
    f"(raw ID: {lastfm_dataset.user_index_to_raw.get(sample_user_id)})\n"
)

user_history = (
    lastfm_dataset.interactions_df[lastfm_dataset.interactions_df['user_id_mapped'] == sample_user_id]
    .sort_values('plays', ascending=False)
)
print('Top listened artists:')
print(user_history[['artist_name', 'plays']].head(5).to_string(index=False))

print(f"\nTop {k} recommended artists:")
for rank, (score, idx) in enumerate(zip(top_scores.tolist(), top_indices.tolist()), start=1):
    artist_name = lastfm_dataset.item_index_to_name.get(idx, f'Artist {idx}')
    print(f"{rank:2d}. {artist_name} (score: {score:.4f})")
