In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset

In [2]:
!python -c "import numpy; print(numpy.__version__)"

1.22.4


In [2]:
import sys
project_root = '../../src/'
sys.path.insert(0, project_root)

In [2]:
user_data = pd.read_csv('../../datasets/user_month_datasets/user1_1month_listening_history.csv')

In [3]:
import numpy as np
import pandas as pd


def preprocess_data(df):
    """
    Preprocess the listening history to normalize features and prepare input for the RNN.
    """
    # Define feature columns
    feature_columns = [
        'duration (ms)', 'danceability', 'energy', 'loudness', 
        'speechiness', 'acousticness', 'instrumentalness', 
        'liveness', 'valence', 'tempo', 'spec_rate'
    ]
    
    # Normalize features
    scaler = MinMaxScaler()
    df[feature_columns] = scaler.fit_transform(df[feature_columns])
    
    # Convert the dataset into sequences for the RNN
    sequences = df[feature_columns].values
    
    return sequences, scaler

In [4]:

import torch
import torch.nn as nn

class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=3, dropout=0.2):
        super(RNNModel, self).__init__()
        
        # Define RNN with multiple layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers, 
                           batch_first=True, dropout=dropout)
        
        # Layer normalization for stability
        #self.layer_norm = nn.LayerNorm(hidden_size)
        
        # Fully connected layers for projection
        self.fc1 = nn.Linear(hidden_size, hidden_size // 2)
        self.fc2 = nn.Linear(hidden_size // 2, output_size)
        
        # Activation functions
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        
        # Dropout for regularization
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        # Pass input through RNN layers
        _, hidden = self.rnn(x)
        
        # Take the last hidden state of the last RNN layer
        hidden = hidden[-1]
        
        # Normalize the hidden state
        #hidden = self.layer_norm(hidden)
        
        # Pass through fully connected layers with activation
        out = self.fc1(hidden)
        out = self.relu(out)
        out = self.dropout(out)
        
        # Final projection to taste vector
        taste_vector = self.fc2(out)
        taste_vector = self.tanh(taste_vector)  # Optional, for bounded output
        
        return taste_vector

In [13]:
def train_rnn_model(model, train_loader, epochs=1000, learning_rate=0.001):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        for sequences in train_loader:
            sequences = sequences.float()
            
            # Forward pass
            outputs = model(sequences)
            weights = torch.arange(1, sequences.shape[1] + 1, device=sequences.device).float()
            weights /= weights.sum()  # Normalize weights

            # Compute weighted sum across the sequence
            target_vector = (sequences * weights.unsqueeze(0).unsqueeze(-1)).sum(dim=1)
            loss = criterion(outputs, target_vector)  # Predict the last song's vector
            epoch_loss += loss.item()
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        #print(f"Epoch {epoch + 1}, Loss: {epoch_loss / len(train_loader):.4f}")

In [6]:
from annoy import AnnoyIndex

def build_annoy_index(song_vectors, num_trees=10):
    """
    Build an Annoy index for nearest neighbor search.
    """
    num_features = song_vectors.shape[1]
    annoy_index = AnnoyIndex(num_features, 'euclidean')
    
    for i, vector in enumerate(song_vectors):
        annoy_index.add_item(i, vector)
    
    annoy_index.build(num_trees)
    return annoy_index

In [7]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

# Set up Spotify API credentials
client_id = '75d0ab19dcdc4db7821a27bf07df72a0'  # Replace with your Spotify client ID
client_secret = 'f64897e446834d7cb83b1c90916242df'  # Replace with your Spotify client secret

# Authenticate with Spotify
client_credentials_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

# Function to extract song name from Spotify URL
def get_song_names_from_url(song_urls):
    song_names = []
    for i in range(len(song_urls)):
        track_id = song_urls[i].split("/")[-1].split("?")[0]  # Extract the track ID from the URL
        track_info = sp.track(track_id)  # Get track information
        song_name = track_info['name']  # Extract song name
        artist_name = track_info['artists'][0]['name']  # Extract artist name
        song_names.append(f"{song_name} by {artist_name}")
    return song_names


In [8]:
def generate_recommendations(taste_vector, annoy_index, song_metadata, k):
    """
    Generate song recommendations by querying the Annoy index.
    """
    # Get nearest song indices
    nearest_indices = annoy_index.get_nns_by_vector(taste_vector, k, include_distances=False)
    print(nearest_indices)
    # Index into the song_metadata list directly
    recommended_songs = [song_metadata[i] for i in nearest_indices]
    return recommended_songs

In [9]:
import json
def save_playlists_to_json(playlists, filename="rnn_user_playlists.json"):
    with open(filename, "w") as f:
        json.dump({str(user): playlist for user, playlist in playlists.items()}, f, indent=4)
    print(f"Playlists saved to {filename}")

In [16]:
import os

USER_DATASETS_FOLDER = "../../datasets/user_month_datasets/"
MODELS_FOLDER = "models"
os.makedirs(MODELS_FOLDER, exist_ok=True)

In [None]:
def train_models_for_all_users():
    playlists = {}
    for file in os.listdir(USER_DATASETS_FOLDER):
        if file.endswith(".csv"):
            user_id = file.split("_")[0].replace("user", "")
            print(f"Processing User {user_id}...")
            
            # Load and preprocess data
            user_data = pd.read_csv(os.path.join(USER_DATASETS_FOLDER, file))
            user_data = user_data.sort_values(by="day").drop(columns=["labels", "user_id", "group_no"])
            sequences, _ = preprocess_data(user_data)
            train_loader = DataLoader(sequences, batch_size=1, shuffle=True)
            
            # Train RNN model
            input_size = sequences.shape[1]
            hidden_size = 128
            output_size = sequences.shape[1]
            model = RNNModel(input_size, hidden_size, output_size)
            train_rnn_model(model, train_loader, epochs=15, learning_rate=0.01)

            model_path = os.path.join(MODELS_FOLDER, f"user_{user_id}_model.pth")
            torch.save(model.state_dict(), model_path)
            print(f"Model saved for User {user_id} at {model_path}")
            
def generate_playlists_for_all_users(num_songs=10):
    playlists = {}
    for file in os.listdir(USER_DATASETS_FOLDER):
        if file.endswith(".csv"):
            user_id = file.split("_")[0].replace("user", "")
            print(f"Generating playlist for User {user_id}...")
            
            # Load the trained model
            model_path = os.path.join(MODELS_FOLDER, f"user_{user_id}_model.pth")
            if not os.path.exists(model_path):
                print(f"No trained model found for User {user_id}. Skipping...")
                continue
            
            # Load user data
            user_data = pd.read_csv(os.path.join(USER_DATASETS_FOLDER, file))
            user_data = user_data.sort_values(by="day").drop(columns=["labels", "user_id", "group_no"])
            sequences, _ = preprocess_data(user_data)

            input_size = sequences.shape[1]
            hidden_size = 128
            output_size = sequences.shape[1]
            model = RNNModel(input_size, hidden_size, output_size)
            model.load_state_dict(torch.load(model_path))
            model.eval()
            
            # Generate taste vector
            with torch.no_grad():
                sequence_tensor = torch.tensor(sequences[0:1]).float()
                taste_vector = model(sequence_tensor).squeeze(0).numpy()
            
            # Build Annoy index and generate recommendations
            song_vectors = sequences
            normalized_vectors = song_vectors / np.linalg.norm(song_vectors, axis=1, keepdims=True)
            unique_vectors = np.array(list(set(map(tuple, normalized_vectors))))
            annoy_index = build_annoy_index(unique_vectors)

            song_metadata = user_data['uri'].tolist()
            recommended_uris = generate_recommendations(taste_vector, annoy_index, song_metadata, k=num_songs)
            recommended_songs = get_song_names_from_url(recommended_uris)
            
            playlists[user_id] = recommended_songs
    
    return playlists

train_models_for_all_users()
playlists = generate_playlists_for_all_users(num_songs=10)

In [30]:
save_playlists_to_json(playlists)

Playlists saved to rnn_user_playlists.json


In [None]:
df =  pd.read_csv('../../datasets/user1_1month_listening_history.csv')
# Drop irrelevant columns
df = df.drop(columns=["labels", "user_id", "group_no"])
df = df.sort_values(by="day")

# Preprocess data
sequences, scaler = preprocess_data(df)

# Prepare DataLoader
train_loader = torch.utils.data.DataLoader(sequences, batch_size=1, shuffle=True)

# Define and train the RNN model
input_size = sequences.shape[1]  # Number of features
hidden_size = 128  # Size of the hidden layer
output_size = sequences.shape[1]  # Output is the same size as input
model = RNNModel(input_size, hidden_size, output_size)
train_rnn_model(model, train_loader, epochs=15, learning_rate=0.001)

In [55]:
model.eval()

# sequence_tensor = torch.tensor(sequences).unsqueeze(0)  # Add batch dimension
# sequence_tensor = sequence_tensor / torch.norm(sequence_tensor, dim=-1, keepdim=True)  # Normalize features

with torch.no_grad():
  sequence_tensor = torch.tensor(sequences[0:1]).float()
  taste_vector = model(sequence_tensor.float()).squeeze(0).numpy()

song_vectors = sequences
normalized_vectors = song_vectors / np.linalg.norm(song_vectors, axis=1, keepdims=True)
unique_vectors = np.array(list(set(map(tuple, normalized_vectors))))

# Build the Annoy index
annoy_index = build_annoy_index(unique_vectors)

# Generate recommendations
song_metadata = df['uri'].tolist()  # Convert 'uri' column to a list
recommended_songs_uris = generate_recommendations(taste_vector, annoy_index, song_metadata, k=10)

# Fetch song names using the Spotify API
recommended_songs = get_song_names_from_url(recommended_songs_uris)

# Display the recommended songs
print("\nRecommended Songs:")
for i, song in enumerate(recommended_songs, start=1):
    print(f"{i}. {song}")

[520, 394, 560, 5, 417, 545, 508, 240, 335, 190]

Recommended Songs:
1. Black Water - Single Version by The Doobie Brothers
2. The Paris of Nowhere by The Wonder Years
3. Education by Private Productions
4. Trains by Blippi
5. Remembrance, Remembrance - Score by James Horner
6. Tell Pencil to hmu let's collab by Deejay Chainwallet
7. Beautiful People (feat. Carolina Liar) by Cher Lloyd
8. Burden by Aminé
9. Lemonade by Marco Nobel
10. Cleanse Me (Search Me, O God) by Hymns on Piano


In [61]:
model.eval()

# sequence_tensor = torch.tensor(sequences).unsqueeze(0)  # Add batch dimension
# sequence_tensor = sequence_tensor / torch.norm(sequence_tensor, dim=-1, keepdim=True)  # Normalize features

with torch.no_grad():
  sequence_tensor = torch.tensor(sequences[0:1]).float()
  taste_vector = model(sequence_tensor.float()).squeeze(0).numpy()

song_vectors = sequences
normalized_vectors = song_vectors / np.linalg.norm(song_vectors, axis=1, keepdims=True)
unique_vectors = np.array(list(set(map(tuple, normalized_vectors))))

# Build the Annoy index
annoy_index = build_annoy_index(unique_vectors)

# Generate recommendations
song_metadata = df['uri'].tolist()  # Convert 'uri' column to a list
recommended_songs_uris = generate_recommendations(taste_vector, annoy_index, song_metadata, k=10)

# Fetch song names using the Spotify API
recommended_songs = get_song_names_from_url(recommended_songs_uris)

# Display the recommended songs
print("\nRecommended Songs:")
for i, song in enumerate(recommended_songs, start=1):
    print(f"{i}. {song}")

[479, 183, 713, 290, 126, 36, 85, 107, 203, 398]

Recommended Songs:
1. Quevedo: Bzrp Music Sessions, Vol. 52 by Sergio Rodríguez
2. Dear Stranger by STRFKR
3. Úton by Slow Village
4. A Mí Me Esta Doliendo by Banda MS de Sergio Lizárraga
5. Thrones of Blood by Sullivan King
6. Let Live by Of Mice & Men
7. Adagio by Secret Garden
8. Después de Todo - Remasterizado by Juan Formell
9. Education by Private Productions
10. Forever Xe3 (Vibe Mashup) by Vibe
