# Spotify GCN Model Recommender in PyTorch

### Importing Packages + Device Declaration

In [21]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch_geometric.data import Data
from torch_geometric.utils import from_scipy_sparse_matrix
from torch_geometric.nn import GCNConv
import torch.nn.functional as F
from scipy.sparse import coo_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import random
import os

# Set device to mps if available
device = 'mps' if torch.backends.mps.is_available() else 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device:", device)

Device: mps


### Data Preprocessing

In [22]:
print("Loading interaction data...")
interaction_data = pd.read_csv('data/user/user_data.csv')
print("Data loaded.")

# Creating mappings for users and items
print("Creating mappings for users and items...")
user_mapping = {user: idx for idx, user in enumerate(interaction_data['user_id'].unique())}
item_mapping = {item: idx for idx, item in enumerate(interaction_data['song_id'].unique())}
interaction_data['user_id'] = interaction_data['user_id'].map(user_mapping)
interaction_data['song_id'] = interaction_data['song_id'].map(item_mapping)
print("Mappings created.")

# Creating the interaction matrix based on the entire dataset
print("Creating interaction matrix...")
rows = interaction_data['user_id'].values
cols = interaction_data['song_id'].values
data = interaction_data['play_count'].values
interaction_matrix = coo_matrix((data, (rows, cols)), shape=(len(user_mapping), len(item_mapping)))
edge_index, edge_attr = from_scipy_sparse_matrix(interaction_matrix)
edge_index = edge_index.to(device)
print("Interaction matrix created.")

Loading interaction data...
Data loaded.
Creating mappings for users and items...
Mappings created.
Creating interaction matrix...
Interaction matrix created.


In [23]:
# Splitting data into training and testing sets
print("Splitting data into training and testing sets...")
train_data, test_data = train_test_split(interaction_data, test_size=0.2, random_state=42)
print("Data split completed.")

# Variables for train function
train_rows = train_data['user_id'].values
train_cols = train_data['song_id'].values
train_data_values = train_data['play_count'].values

# Variables for test function
test_rows = test_data['user_id'].values
test_cols = test_data['song_id'].values
test_data_values = test_data['play_count'].values

Splitting data into training and testing sets...
Data split completed.


In [24]:
class InteractionDataset(Dataset):
    def __init__(self, user_ids, item_ids, labels):
        self.user_ids = user_ids
        self.item_ids = item_ids
        self.labels = labels

    def __len__(self):
        return len(self.user_ids)

    def __getitem__(self, idx):
        user_id = self.user_ids[idx]
        item_id = self.item_ids[idx]
        label = self.labels[idx]
        return user_id, item_id, label

# Creating the train and test datasets
print("Creating datasets...")
train_dataset = InteractionDataset(train_rows, train_cols, train_data_values)
test_dataset = InteractionDataset(test_rows, test_cols, test_data_values)
print("Datasets created.")

# Creating the DataLoaders
print("Creating DataLoaders...")
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=0)
print("DataLoaders created.")

Creating datasets...
Datasets created.
Creating DataLoaders...
DataLoaders created.


In [25]:
# Creating the model
class GCNRecommender(nn.Module):
    def __init__(self, num_users, num_items, latent_dim):
        super(GCNRecommender, self).__init__()
        self.user_embedding = nn.Embedding(num_users, latent_dim)
        self.item_embedding = nn.Embedding(num_items, latent_dim)
        self.conv1 = GCNConv(latent_dim, 128)                       
        self.conv2 = GCNConv(128, 64)                               
        self.fc = nn.Linear(64 * 2, 1)
        self.dropout = nn.Dropout(0.4)

    def forward(self, user, item, edge_index):
        user_embed = self.user_embedding(user)                      # (batch_size, latent_dim)
        item_embed = self.item_embedding(item)                      # (batch_size, latent_dim)
        x = torch.cat([user_embed, item_embed], dim=0)              # (batch_size * 2, latent_dim)
        x = F.relu(self.conv1(x, edge_index))                       # (batch_size * 2, 128)
        x = F.relu(self.conv2(x, edge_index))                       # (batch_size * 2, 64)
        x = self.dropout(x)
        user_gcn_embed = x[:user.size(0)]                           # (batch_size, 64)
        item_gcn_embed = x[user.size(0):]                           # (batch_size, 64)
        
        x = torch.cat([user_gcn_embed, item_gcn_embed], dim=1)      # (batch_size, 128)
        x = self.fc(x)                                              # (batch_size, 1)
        return x


In [26]:
# Definine model metrics (loss, optimization) and hyperparameters (num_users, num_items, latent_dim)
num_users = len(user_mapping)
num_items = len(item_mapping)
latent_dim = 16
criterion = nn.MSELoss()

Double checking that the model is outputting the correct shape 

In [27]:
model = GCNRecommender(num_users, num_items, latent_dim=16).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = nn.MSELoss()

for user_id, item_id, label in train_loader:
    user_id, item_id, label = user_id.to(device), item_id.to(device), label.to(device)
    outputs = model(user_id, item_id, edge_index)
    print(outputs.shape)  # Should be (batch_size, 1)
    break

torch.Size([64, 1])


In [28]:
def model_training(model, epochs):
    print("Starting training...")
    loss_dict = {}
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        for user, item, label in train_loader:
            user = user.to(device)
            item = item.to(device)
            label = label.float().to(device)
            optimizer.zero_grad()
            outputs = model(user, item, edge_index).squeeze()
            loss = criterion(outputs, label)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        avg_epoch_loss = epoch_loss / len(train_loader)
        print(f'Epoch {epoch}, Loss: {avg_epoch_loss}')
        loss_dict[epoch] = avg_epoch_loss
        if device == 'mps':
            torch.mps.empty_cache()
        else:
            torch.cuda.empty_cache()
    torch.save(model.state_dict(), 'models/gcn_recommender_model.pth')
    return model, loss_dict

In [29]:
def model_evaluation(model, test_loader):
    model.eval()  # Set model to evaluation mode
    criterion = nn.MSELoss()
    total_loss = 0
    with torch.no_grad():
        for user, item, label in test_loader:
            user = user.to(device)
            item = item.to(device)
            label = label.float().to(device)  # Ensure labels are of float type
            outputs = model(user, item, edge_index).squeeze()
            loss = criterion(outputs, label)
            total_loss += loss.item()
    avg_loss = total_loss / len(test_loader)
    rmse = np.sqrt(avg_loss)
    print(f'Standard Evaluation RMSE: {rmse}')
    return rmse

In [30]:
# Defining model evaluation function with Monte Carlo Dropout
def model_evaluation_mc_dropout(model, n_samples=15):
    """
    evaluating the model using Monte-Carlo Dropout to estimate prediction uncertainty.

    Takes in:
    - n_samples: Number of forward passes to perform for uncertainty estimation. This is the amount of estimations per sample point.

    Returns:
    - mean_predictions: Mean of the predictions from multiple forward passes.
    - std_predictions: Standard deviation of the predictions from multiple forward passes.
    """
    model.train()  # Enable dropout during inference
    all_predictions = []
    for user, item, _ in test_loader:
        user = user.to(device)
        item = item.to(device)
        predictions = []
        for _ in range(n_samples):
            with torch.no_grad():
                output = model(user, item, edge_index).squeeze()
                predictions.append(output.cpu().numpy())
        all_predictions.append(np.stack(predictions, axis=1))
        
    all_predictions = np.concatenate(all_predictions, axis=0)
    mean_predictions = np.mean(all_predictions, axis=1)
    std_predictions = np.std(all_predictions, axis=1)
    return mean_predictions, std_predictions

In [31]:
# Select most uncertain samples for labeling
def select_most_uncertain_samples(std_predictions, top_k=500):
    uncertainties = std_predictions
    uncertain_indices = np.argsort(-uncertainties)[:top_k]
    return [(test_rows[i], test_cols[i]) for i in uncertain_indices]


In [32]:
def update_model_with_active_learning(model, uncertain_samples):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()
    model.train()  # Set the model to training mode

    for user, item in uncertain_samples:
        user = torch.tensor([user], dtype=torch.long).to(device)
        item = torch.tensor([item], dtype=torch.long).to(device)
        # Simulating user feedback as actual play_count
        actual_play_count = interaction_data[(interaction_data['user_id'] == user.item()) & 
                                             (interaction_data['song_id'] == item.item())]['play_count'].values[0]
        optimizer.zero_grad()
        outputs = model(user, item, edge_index).squeeze()
        loss = criterion(outputs, torch.tensor([actual_play_count], dtype=torch.float).to(device))
        loss.backward()
        optimizer.step()

    # Clear GPU cache
    if device == 'mps':
        torch.mps.empty_cache()
    else:
        torch.cuda.empty_cache()
    torch.save(model.state_dict(), 'gcn_recommender_model_AL.pth')

    return model

In [35]:
print("Script started.")
print("Initialising Model...")
init_model = GCNRecommender(num_users, num_items, latent_dim=16).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = nn.MSELoss()

if not os.path.exists("models/gcn_recommender_model.pth"):
    trained_model, loss_dict = model_training(model = init_model, epochs=3)
else:
    print("Loading existing model...")
    init_model.load_state_dict(torch.load("models/gcn_recommender_model.pth"))
    trained_model = init_model

print("Evaluating Model...")
model_evaluation(model = trained_model, test_loader=test_loader)

print("Performing Monte Carlo Dropout...")
mean_predictions, std_predictions = model_evaluation_mc_dropout(model = trained_model)

for i in range(1, 11):  # Print the first 10 predictions with uncertainties
    print(f'Prediction {i}: {mean_predictions[i]}, Uncertainty: {std_predictions[i]}')

print("Selecting most uncertain points...")
uncertain_samples = select_most_uncertain_samples(std_predictions=std_predictions, top_k=1000)

print("Updating model with most uncertain points...")
AL_model = update_model_with_active_learning(model=trained_model, uncertain_samples=uncertain_samples)

# Remove uncertain samples from the test set
remaining_test_indices = [i for i in range(len(test_rows)) if (test_rows[i], test_cols[i], test_data_values[i]) not in uncertain_samples]
new_test_rows = test_rows[remaining_test_indices]

new_test_cols = test_cols[remaining_test_indices]
new_test_data_values = test_data_values[remaining_test_indices]

# Create a new test dataset and test loader without the uncertain samples
new_test_dataset = InteractionDataset(new_test_rows, new_test_cols, new_test_data_values)
new_test_loader = DataLoader(new_test_dataset, batch_size=64, shuffle=False, num_workers=0)

print("Evaluating Final Model...")
model_evaluation(model = AL_model, test_loader=new_test_loader)
print("Script completed.")

Script started.
Initialising Model...
Loading existing model...
Evaluating Model...
Standard Evaluation RMSE: 18.633604857569196
Performing Monte Carlo Dropout...
Prediction 1: 0.084018774330616, Uncertainty: 0.000191554514458403
Prediction 2: 0.08428670465946198, Uncertainty: 0.000645314808934927
Prediction 3: 0.0841139480471611, Uncertainty: 0.0003035986446775496
Prediction 4: 0.08388087898492813, Uncertainty: 0.00019261296256445348
Prediction 5: 0.08411452174186707, Uncertainty: 0.0002500434929970652
Prediction 6: 0.08413247764110565, Uncertainty: 0.0002607597562018782
Prediction 7: 0.08424779772758484, Uncertainty: 0.00023870817676652223
Prediction 8: 0.0840759351849556, Uncertainty: 0.00024242023937404156
Prediction 9: 0.0841313824057579, Uncertainty: 0.00019001538748852909
Prediction 10: 0.08457323908805847, Uncertainty: 0.0004915619501844049
Selecting most uncertain points...
Updating model with most uncertain points...


  return F.mse_loss(input, target, reduction=self.reduction)


Evaluating Final Model...
Standard Evaluation RMSE: 56.11326021284721
Script completed.


### Setting up Spotify API

In [None]:
import dotenv
import spotipy
import os
from spotipy import SpotifyClientCredentials
dotenv.load_dotenv()
# Load Spotify API credentials
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
    client_id=os.getenv('SPOTIPY_CLIENT_ID'),
    client_secret=os.getenv('SPOTIFY_CLIENT_SECRET')
))

In [None]:
playlists = {
    "Today's Top Hits": "37i9dQZF1DXcBWIGoYBM5M",
    "Global Top 50": "37i9dQZEVXbMDoHDwVN2tF",
    "Global Viral 50": "37i9dQZEVXbLiRSasKsNU9",
    "New Music Friday": "37i9dQZF1DX4JAvHpjipBk",
    "Hot Country": "37i9dQZF1DX1lVhptIYRda",
    "Beast Mode": "37i9dQZF1DX76Wlfdnj7AP",
    "Chill Hits": "37i9dQZF1DX4WYpdgoIcn6",
    "Soft Pop Hits": "37i9dQZF1DX3YSRoSdA634",
    "Good Vibes": "37i9dQZF1DX6GwdWRQMQpq",
    "Evening Acoustic": "37i9dQZF1DXbJmiEZs5p2t",
    "All Out 80s": "37i9dQZF1DX4UtSsGT1Sbe",
    "All Out 90s": "37i9dQZF1DXbTxeAdrVG2l",
    "Your Favorite Coffeehouse": "37i9dQZF1DX6ziVCJnEm59",
    "Acoustic Hits": "37i9dQZF1DX4E3UdUs7fUx",
    "Deep Focus": "37i9dQZF1DWZeKCadgRdKQ",
    "Throwback Thursday": "37i9dQZF1DX4UtSsGT1Sbe",
    "Peaceful Guitar": "37i9dQZF1DX0jgyAiPl8Af",
    "Classic Road Trip Songs": "37i9dQZF1DWSThc8QnxalT",
    "Relax & Unwind": "37i9dQZF1DX6MOzVr6s0AO",
    "Top 50 USA": "37i9dQZEVXbLRQDuF5jeBp",
    "Viral 50 USA": "37i9dQZEVXbKuaTI1Z1Afx",
    "Top 50 UK": "37i9dQZEVXbLnolsZ8PSNw",
    "Viral 50 UK": "37i9dQZEVXbL3DLHfQeDmV",
    "Top 50 Brazil": "37i9dQZEVXbMXbN3EUUhlg",
    "Viral 50 Brazil": "37i9dQZEVXbMMy2roB9myp",
    "Top 50 France": "37i9dQZEVXbIPWwFssbupI",
    "Viral 50 France": "37i9dQZEVXbIZM8SIgu6df",
    "Top 50 Japan": "37i9dQZEVXbKXQ4mDTEBXq",
    "Viral 50 Japan": "37i9dQZEVXbKqiTGXuCOsB",
    "Top 50 India": "37i9dQZEVXbLZ52XmnySJg",
    "Viral 50 India": "37i9dQZEVXbMWDif5SCBJq",
    "Top 50 Italy": "37i9dQZEVXbIQnj7RRhdSX",
    "Viral 50 Italy": "37i9dQZEVXbKbvcwe5owJ1",
    "Top 50 South Korea": "37i9dQZEVXbJZyENOWUFo7",
    "Viral 50 South Korea": "37i9dQZEVXbNxXF4SkHj9F",
    "Top 50 Australia": "37i9dQZEVXbJPcfkRz0wJ0",
    "Viral 50 Australia": "37i9dQZEVXbK4fwx2r07XW",
    "Top 50 Germany": "37i9dQZEVXbJiZcmkrIHGU",
    "Viral 50 Germany": "37i9dQZEVXbKglSdDwFtE9"
}


### Spotify API Functions

In [None]:
def get_tracks(playlist_id):
    try:
        results = sp.playlist_tracks(playlist_id)
        tracks = results['items']
        
        while results['next']:
            results = sp.next(results)
            tracks.extend(results['items'])
        return tracks
    except spotipy.exceptions.SpotifyException as e:
        # print(f"Error fetching playlist {playlist_id}: {e}") # Commented out for easy debugging as auto error message showed anyways
        return []

def generate_playlist_data():
    """getting """
    song_data = []
    playlist_data = []
    for playlist_name, playlist_id in playlists.items():
        tracks = get_tracks(playlist_id)
        for song in tracks:
            track = song.get('track')
            if track is not None:  # Check if track is not None
                song_data.append({
                    'song_id': track.get('id'),
                    'title': track.get('name'),
                    'artist': track['artists'][0]['name'],
                    'album': track['album']['name'],
                    'duration': track['duration_ms'] // 1000,  # convert ms to seconds
                    'popularity': track['popularity'],
                    'release_date': track['album']['release_date']
                })

                playlist_data.append({
                    'playlist_name': playlist_name,
                    'song_id': track.get('id'),
                })
        
        # Respect API limit
        time.sleep(2)
    
    song_df = pd.DataFrame(song_data)
    playlist_df = pd.DataFrame(playlist_data)
    return song_df, playlist_df