# Recommendation Engine

In [1]:
# Importing the necessary libraries to conduct EDA
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
import plotly.express as px
from sklearn.manifold import TSNE
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from collections import defaultdict
from sklearn.metrics import euclidean_distances
from scipy.spatial.distance import cdist
import difflib
from spotipy.oauth2 import SpotifyOAuth

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Importing data from csv file and aliasing.
# `song_df` is the track-level table that the clustering cell and the
# recommendation helpers below read from; the path is relative to the
# notebook's working directory.
song_df = pd.read_csv('tracks_features.csv')

# K Means Clustering

Imputing all NaN values with the mean

In [None]:
# Defining the pipeline: standardise every feature, then group the songs into
# 20 clusters. random_state is pinned so that restarting the kernel and
# re-running the notebook reproduces the same cluster labels.
song_cluster_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('kmeans', KMeans(n_clusters=20, verbose=False, random_state=42))
])

# Selecting numerical columns. 'cluster_label' is written back into song_df at
# the end of this cell, so it is excluded here; otherwise re-running the cell
# would feed the previous run's labels back in as a feature.
# NOTE(review): the recommend_* functions later call this fitted scaler on
# spotify_data[number_cols] - confirm the columns selected here match that list.
X = song_df.select_dtypes(np.number).drop(columns=['cluster_label'], errors='ignore')

# Imputing missing values with the column mean
X_imputed = X.fillna(X.mean())

# Fitting the pipeline to the data
song_cluster_pipeline.fit(X_imputed)

# Predicting cluster labels
song_cluster_labels = song_cluster_pipeline.predict(X_imputed)

# Adding cluster labels to the data
song_df['cluster_label'] = song_cluster_labels


Clustering Songs with K Means

In [2]:
# Defining the pipeline (this cell repeats the clustering cell above, so it
# refits the same model). random_state is pinned so both fits agree and a
# kernel restart reproduces the same cluster labels.
song_cluster_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('kmeans', KMeans(n_clusters=20, verbose=False, random_state=42))
])

# Selecting numerical columns. song_df may already carry a 'cluster_label'
# column from an earlier run of the clustering code; it is dropped so the
# labels are never used as an input feature.
# NOTE(review): the recommend_* functions later call this fitted scaler on
# spotify_data[number_cols] - confirm the columns selected here match that list.
X = song_df.select_dtypes(np.number).drop(columns=['cluster_label'], errors='ignore')

# Imputing missing values with the column mean
X_imputed = X.fillna(X.mean())

# Fitting the pipeline to the data
song_cluster_pipeline.fit(X_imputed)

# Predicting cluster labels
song_cluster_labels = song_cluster_pipeline.predict(X_imputed)

# Adding cluster labels to the data
song_df['cluster_label'] = song_cluster_labels


NameError: name 'song_df' is not defined

Clustering genres with K means

In [None]:
# Defining the cluster pipeline: standardise the features, then group the
# genres into 10 clusters. random_state is pinned for reproducible labels.
cluster_pipeline = Pipeline([('scaler', StandardScaler()), ('kmeans', KMeans(n_clusters=10, random_state=42))])

# Selecting the numerical columns. 'cluster' is written back into genre_df at
# the end of this cell, so it is excluded here to keep a re-run from
# clustering on its own previous output.
# NOTE(review): genre_df is not created anywhere in the visible part of this
# notebook - confirm it is loaded before this cell runs.
X = genre_df.select_dtypes(np.number).drop(columns=['cluster'], errors='ignore')

# Fitting the pipeline to the data
cluster_pipeline.fit(X)

# Predicting cluster labels
genre_df['cluster'] = cluster_pipeline.predict(X)

In [None]:
# Visualising the Clusters with t-SNE.
# t-SNE is stochastic, so random_state is pinned to make the scatter plot
# reproducible between runs.
tsne_pipeline = Pipeline([('scaler', StandardScaler()), ('tsne', TSNE(n_components=2, verbose=1, random_state=42))])
genre_embedding = tsne_pipeline.fit_transform(X)

# Two-column frame of embedding coordinates, annotated with genre and cluster.
# The assignments below align on index, so they rely on genre_df having the
# same default 0..n-1 index as `projection`.
projection = pd.DataFrame(columns=['x', 'y'], data=genre_embedding)
projection['genres'] = genre_df['genres']
projection['cluster'] = genre_df['cluster']

fig = px.scatter(
    projection, x='x', y='y', color='cluster', hover_data=['x', 'y', 'genres'])
fig.show()

Building the recommendation Engine

In [None]:
def find_song(name, year):
    """
    Look a song up through the Spotify API and return its details as a
    one-row pandas DataFrame.

    Credentials are read by spotipy from the SPOTIPY_CLIENT_ID and
    SPOTIPY_CLIENT_SECRET environment variables, so no secret is stored in
    the notebook.

    Parameters:
    name (str): Track name to search for.
    year (int): Release year used to narrow the search.

    Returns:
    pd.DataFrame or None: One row holding the given name and year, the
    track's explicit flag, duration, popularity and artist names, plus the
    audio features. None when the search finds no track or Spotify has no
    audio features for it.
    """
    # Client id and secret are resolved from the environment by spotipy
    sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
    song_data = defaultdict(list)
    results = sp.search(q='track: {} year: {}'.format(name, year), limit=1)
    if not results['tracks']['items']:
        return None

    results = results['tracks']['items'][0]
    track_id = results['id']

    # audio_features returns one entry per requested track; the entry is None
    # when Spotify has no analysis for that track.
    features = sp.audio_features(track_id)
    audio_features = features[0] if features else None
    if audio_features is None:
        return None

    # Basic song information (name and year are recorded as given by the caller)
    song_data['name'].append(name)
    song_data['year'].append(year)
    song_data['explicit'].append(int(results['explicit']))
    song_data['duration_ms'].append(results['duration_ms'])
    song_data['popularity'].append(results['popularity'])

    # Extracting artists information
    artists = [artist['name'] for artist in results['artists']]
    song_data['artists'].append(artists)

    # Audio features, setting default values if keys are missing
    keys = [
        'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
        'type', 'id', 'uri', 'track_href', 'analysis_url', 'time_signature'
    ]

    for key in keys:
        song_data[key].append(audio_features.get(key, None))

    return pd.DataFrame(song_data)

In [None]:
# Feature columns that make up a song vector. get_mean_vector selects these
# from the Spotify lookup result and the recommend_* functions select them
# from the dataset, so every name must exist in both.
number_cols = ['valence', 'year', 'acousticness', 'danceability', 'duration_ms', 'energy', 'explicit',
 'instrumentalness', 'key', 'liveness', 'loudness', 'mode', 'speechiness', 'tempo']

def get_song_data(song, spotify_data):
    """
    Fetch the details of a single song through find_song.

    Parameters:
    song (dict): Mapping with 'name' and 'year' entries.
    spotify_data: Accepted for interface compatibility; not used here.

    Returns:
    Whatever find_song returns for that name and year.
    """
    title = song['name']
    release_year = song['year']
    return find_song(title, release_year)


def get_mean_vector(song_list, spotify_data):
    """
    Average the feature vectors of the given songs.

    Parameters:
    song_list (list of dict): Songs to look up, each with 'name' and 'year'.
    spotify_data: Passed through to get_song_data.

    Returns:
    np.ndarray: Element-wise mean of the number_cols values of every song
    that could be found.

    Raises:
    ValueError: If none of the songs could be found (including an empty
    song_list), because no mean can be computed.
    """
    song_vectors = []

    for song in song_list:
        song_data = get_song_data(song, spotify_data)
        if song_data is None:
            print('Warning: {} does not exist in Spotify or in database'.format(song['name']))
            continue
        song_vectors.append(song_data[number_cols].values)

    # Averaging an empty list would yield NaN and only fail later, inside the
    # scaler, with an unrelated-looking error; fail here with a clear message.
    if not song_vectors:
        raise ValueError('None of the input songs could be found, so no mean vector can be computed')

    song_matrix = np.array(song_vectors)
    return np.mean(song_matrix, axis=0)


def flatten_dict_list(dict_list):
    """
    Turn a list of dictionaries into one dictionary of lists.

    Parameters:
    dict_list (dict or list of dict): A single dictionary, or a list of them.

    Returns:
    defaultdict(list): Maps every key seen to the list of its values, in
    input order. Empty when dict_list is empty or None.
    """
    if isinstance(dict_list, dict):  # Checks if input is a single dictionary
        dict_list = [dict_list]  # Converts it to a list containing that dictionary

    flattened_dict = defaultdict(list)

    # Nothing to flatten; return the empty mapping rather than indexing into
    # an empty list.
    if not dict_list:
        return flattened_dict

    for dictionary in dict_list:
        for key, value in dictionary.items():
            flattened_dict[key].append(value)

    return flattened_dict


def recommend_songs_1(song_list, spotify_data, n_songs=10):
    """
    Recommend the dataset songs closest to the average of the input songs.

    Parameters:
    song_list (list of dict): Input songs, each with 'name' and 'year'.
    spotify_data (pd.DataFrame): Dataset to recommend from; must contain the
        number_cols feature columns and 'name', 'year', 'artists', 'id'.
    n_songs (int): Number of recommendations wanted.

    Returns:
    list of dict: Up to n_songs records with 'name', 'year', 'artists' and
    'id', nearest first, never including a song whose name is in song_list.
    """
    metadata_cols = ['name', 'year', 'artists', 'id']
    song_dict = flatten_dict_list(song_list)

    # Calculate the mean vector and bring it onto the scale of the dataset.
    # NOTE(review): the scaler was fitted in the clustering cell; this assumes
    # it was fitted on the same columns as number_cols - confirm.
    song_center = get_mean_vector(song_list, spotify_data)
    scaler = song_cluster_pipeline.steps[0][1]
    scaled_data = scaler.transform(spotify_data[number_cols])
    scaled_song_center = scaler.transform(song_center.reshape(1, -1))

    # Rank every dataset row by cosine distance to the centre, nearest first
    distances = cdist(scaled_song_center, scaled_data, 'cosine')
    ranked = np.argsort(distances[0])

    # Drop the input songs from the ranking *before* taking the top n_songs,
    # so the result is not shortened (or emptied) when input songs rank highest.
    is_input = spotify_data['name'].isin(song_dict['name']).to_numpy()
    index = ranked[~is_input[ranked]][:n_songs]

    rec_songs = spotify_data.iloc[index]
    return rec_songs[metadata_cols].to_dict(orient='records')

# Amending the model to create playlists

Let's firstly create a new_playlist function, with the aim being to initiate a new playlist - creating a blank dataframe.

In [None]:
def new_playlist():
    """
    Start a fresh playlist: bind the global playlist_df to an empty
    DataFrame that has the playlist columns.
    """
    global playlist_df
    playlist_columns = ['name', 'year', 'artists', 'id']
    playlist_df = pd.DataFrame(columns=playlist_columns)

Now let's create a function to initialise the playlist. This involves concatenating the initial suggested song to our playlist_df and then creating a new df called holding_df to 'hold' our suggested song, giving the user the option to skip it or add it to our playlist_df:

In [None]:
def recommend_songs_initialise(song_list, spotify_data, n_songs=1):
    """
    Start a playlist from the input songs and hold the first recommendation.

    Sets two globals: holding_df receives the recommended song(s) and
    playlist_df receives the input songs that could be found on Spotify.

    Parameters:
    song_list (list of dict): Starting songs, each with 'name' and 'year'.
    spotify_data (pd.DataFrame): Dataset to recommend from; must contain the
        number_cols feature columns and 'name', 'year', 'artists', 'id'.
    n_songs (int): Number of recommendations to hold.
    """
    global holding_df, playlist_df

    metadata_cols = ['name', 'year', 'artists', 'id']
    song_dict = flatten_dict_list(song_list)

    # NOTE(review): the scaler was fitted in the clustering cell; this assumes
    # it was fitted on the same columns as number_cols - confirm.
    song_center = get_mean_vector(song_list, spotify_data)
    scaler = song_cluster_pipeline.steps[0][1]
    scaled_data = scaler.transform(spotify_data[number_cols])
    scaled_song_center = scaler.transform(song_center.reshape(1, -1))

    # Rank every dataset row by cosine distance to the centre, nearest first
    distances = cdist(scaled_song_center, scaled_data, 'cosine')
    ranked = np.argsort(distances[0])

    # Exclude input songs by name before taking the top n_songs, so the
    # recommendation cannot come back empty just because the nearest rows are
    # the input songs themselves.
    is_input = spotify_data['name'].isin(song_dict['name']).to_numpy()
    index = ranked[~is_input[ranked]][:n_songs]
    rec_songs = spotify_data.iloc[index]

    # holding_df is replaced on every call; naming the columns keeps them
    # present even when there is nothing to recommend.
    holding_df = pd.DataFrame(rec_songs[metadata_cols].to_dict(orient='records'),
                              columns=metadata_cols)

    # Build the playlist from the input songs that Spotify could find
    playlist_songs = []
    for song in song_list:
        song_data = get_song_data(song, spotify_data)
        if song_data is not None:
            playlist_songs.append({
                'name': song_data['name'].values[0],
                'year': song_data['year'].values[0],
                'artists': song_data['artists'].values[0],
                'id': song_data['id'].values[0]
            })

    # Naming the columns keeps 'id' etc. available to later steps even when
    # none of the input songs was found.
    playlist_df = pd.DataFrame(playlist_songs, columns=metadata_cols)

We now need a recommend_songs_continue function. This is for when we have initialised our playlist and now want to recommend the next song - it will therefore act similarly to the above; however, it will not add the initial suggested songs to the playlist_df:

In [None]:
def recommend_songs_continue(song_list, spotify_data, n_songs=1):
    """
    Hold the next recommendation for an already-started playlist.

    Sets the global holding_df to the recommended song(s); the playlist
    itself is not touched.

    Parameters:
    song_list (list of dict): Songs currently in the playlist, each with
        'name' and 'year'.
    spotify_data (pd.DataFrame): Dataset to recommend from; must contain the
        number_cols feature columns and 'name', 'year', 'artists', 'id'.
    n_songs (int): Number of recommendations to hold.
    """
    global holding_df

    metadata_cols = ['name', 'year', 'artists', 'id']
    song_dict = flatten_dict_list(song_list)

    # NOTE(review): the scaler was fitted in the clustering cell; this assumes
    # it was fitted on the same columns as number_cols - confirm.
    song_center = get_mean_vector(song_list, spotify_data)
    scaler = song_cluster_pipeline.steps[0][1]
    scaled_data = scaler.transform(spotify_data[number_cols])
    scaled_song_center = scaler.transform(song_center.reshape(1, -1))

    # Rank every dataset row by cosine distance to the centre, nearest first
    distances = cdist(scaled_song_center, scaled_data, 'cosine')
    ranked = np.argsort(distances[0])

    # Exclude playlist songs by name before taking the top n_songs. With a
    # playlist of several songs the nearest rows are often all playlist
    # songs, which would otherwise leave nothing to recommend.
    is_input = spotify_data['name'].isin(song_dict['name']).to_numpy()
    index = ranked[~is_input[ranked]][:n_songs]
    rec_songs = spotify_data.iloc[index]

    # holding_df is replaced on every call; naming the columns keeps them
    # present even when there is nothing to recommend.
    holding_df = pd.DataFrame(rec_songs[metadata_cols].to_dict(orient='records'),
                              columns=metadata_cols)


Let's now simplify so we can use the same command to call both functions above. They have both been wrapped up into the recommend_songs function, which will use recommend_songs_initialise when the playlist_df is empty and recommend_songs_continue when it isn't:

In [None]:
def recommend_songs(song_list, spotify_data, n_songs=1):
    """
    Produce the next recommendation, starting the playlist first if needed.

    An empty playlist_df routes the call to recommend_songs_initialise;
    otherwise recommend_songs_continue handles it. Both receive the
    arguments unchanged.
    """
    global playlist_df, holding_df

    handler = recommend_songs_initialise if playlist_df.empty else recommend_songs_continue
    handler(song_list, spotify_data, n_songs)

In [None]:
# 'df_to_list' converts playlist_df into the list-of-dicts format that the recommend_songs function accepts
def df_to_list(df):
    """
    Return one {'name': ..., 'year': ...} dictionary per row of df, in row order.
    """
    return [
        {'name': record['name'], 'year': record['year']}
        for _, record in df.iterrows()
    ]

In [None]:
def next():
    """
    Recommend the next song for the current playlist.

    Note: this name shadows Python's built-in next() within the notebook.
    """
    current_songs = df_to_list(playlist_df)
    return recommend_songs(current_songs, song_df)

In [None]:
# Global variable to track how many times the current recommendation has been skipped
skip_count = 0

def recommend_songs_skip(song_list, spotify_data, n_songs=1):
    """
    Skip the held recommendation and hold the next-nearest song instead.

    Increments the global skip_count, replaces the global holding_df with
    the new recommendation, then prints the playlist, the recommendation
    and its Spotify link.

    Parameters:
    song_list (list of dict): Songs currently in the playlist, each with
        'name' and 'year'.
    spotify_data (pd.DataFrame): Dataset to recommend from; must contain the
        number_cols feature columns and 'name', 'year', 'artists', 'id'.
    n_songs (int): Number of recommendations to hold.
    """
    global holding_df, skip_count

    metadata_cols = ['name', 'year', 'artists', 'id']
    song_dict = flatten_dict_list(song_list)

    # NOTE(review): the scaler was fitted in the clustering cell; this assumes
    # it was fitted on the same columns as number_cols - confirm.
    song_center = get_mean_vector(song_list, spotify_data)
    scaler = song_cluster_pipeline.steps[0][1]
    scaled_data = scaler.transform(spotify_data[number_cols])
    scaled_song_center = scaler.transform(song_center.reshape(1, -1))
    distances = cdist(scaled_song_center, scaled_data, 'cosine')
    # Increment skip_count by 1 for each function call
    skip_count += 1

    # Rank the dataset nearest first and remove playlist songs *before*
    # applying the skip offset. Offsetting the unfiltered ranking can land on
    # a playlist song (leaving nothing to show) or on the very song that was
    # just skipped.
    ranked = np.argsort(distances[0])
    is_input = spotify_data['name'].isin(song_dict['name']).to_numpy()
    eligible = ranked[~is_input[ranked]]
    index = eligible[skip_count:(skip_count + n_songs)]

    rec_songs = spotify_data.iloc[index]

    # holding_df is replaced on every call; naming the columns keeps them
    # present even when the ranking has been exhausted.
    holding_df = pd.DataFrame(rec_songs[metadata_cols].to_dict(orient='records'),
                              columns=metadata_cols)

    # Print playlist_df with text 'current playlist'
    print("Current Playlist:")
    print(playlist_df)

    # Printing a blank line to present better
    print("    ")

    # Print holding_df with text 'Next recommended song'
    print("Next Recommended Song:")
    print(holding_df)
    print()

    get_spotify_link_holding()

In [None]:
def skip():
    """
    Reject the held recommendation and show the next one for the current playlist.
    """
    current_songs = df_to_list(playlist_df)
    return recommend_songs_skip(current_songs, song_df)

In [None]:
def add():
    """
    Accept the held recommendation: move holding_df into playlist_df without
    duplicates, reset the skip counter, fetch the next recommendation and
    print the updated state.
    """
    global playlist_df, holding_df, skip_count

    # Append the held songs, then keep only the first row for each track id
    combined = pd.concat([playlist_df, holding_df], ignore_index=True)
    playlist_df = combined.drop_duplicates(subset=['id'], ignore_index=True)

    # An accepted song starts a fresh run of skips
    skip_count = 0

    # Refresh holding_df with the next recommendation
    next()

    # Playlist, a spacer line, then the new recommendation
    for item in ("Current Playlist:", playlist_df, "    ", "Recommended Song:", holding_df):
        print(item)
    print()

    get_spotify_link_holding()

In [None]:
def finalise():
    """
    Create a public Spotify playlist from playlist_df and return its link.

    Prompts for the Spotify username and a playlist name, creates the
    playlist, adds every track id from playlist_df, then resets playlist_df
    to an empty frame.

    The client id and secret are read by spotipy from the SPOTIPY_CLIENT_ID
    and SPOTIPY_CLIENT_SECRET environment variables, so no secret is stored
    in the notebook.

    Returns:
    str or None: The playlist's public URL, or None when there is nothing to
    add, an input is blank, or an error occurs (a message is printed).
    """
    global playlist_df

    # Ensure playlist_df is defined and has the necessary columns
    if playlist_df is None or playlist_df.empty:
        print("The playlist_df is empty or not defined. No songs to add to the playlist.")
        return None

    try:
        # Extract track IDs first, so that an empty playlist is never created
        # on Spotify when there is nothing to put in it. Rows without an id
        # cannot be added and are left out.
        track_ids = playlist_df['id'].dropna().tolist()
        if not track_ids:
            print("No tracks available in playlist_df to add to the playlist.")
            return None

        # Authenticate with Spotify API (client id/secret come from the environment)
        sp = spotipy.Spotify(auth_manager=SpotifyOAuth(redirect_uri='http://localhost:8885/callback',
                                                       scope='playlist-modify-public'))

        # Prompt the user to input the username
        username = input("Enter your Spotify username: ").strip()

        # Check if username is valid
        if not username:
            print("Invalid username. Please try again.")
            return None

        # Prompt the user to input the playlist name
        playlist_name = input("Enter the name of the playlist: ").strip()

        # Check if playlist name is valid
        if not playlist_name:
            print("Invalid playlist name. Please try again.")
            return None

        # Create a new playlist
        playlist = sp.user_playlist_create(user=username, name=playlist_name, public=True)

        # Spotify accepts at most 100 items per add request, so send the
        # tracks in batches.
        batch_size = 100
        for start in range(0, len(track_ids), batch_size):
            sp.playlist_add_items(playlist_id=playlist['id'],
                                  items=track_ids[start:start + batch_size])

        # Obtain the public link to the playlist
        playlist_link = playlist['external_urls']['spotify']

        # Empty the playlist_df
        playlist_df = pd.DataFrame(columns=['name', 'year', 'artists', 'id'])

        print(f"Playlist '{playlist_name}' created successfully. Link: {playlist_link}")

        return playlist_link
    except spotipy.SpotifyException as e:
        print(f"Spotify API error: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None

In [None]:
def initialise():
    """
    Ask for a starting song, produce the first recommendation and print the
    playlist, the recommendation and its Spotify link.

    Returns None; when the year entered is not a whole number, a message is
    printed and nothing else happens.
    """
    # Prompt the user to input the name of the starting song
    song_name = input("Enter the name of your starting song: ")

    # Prompt the user to input the year of the starting song
    song_year = input("Enter the year of your starting song: ")

    # Reject a non-numeric year with a readable message instead of a traceback
    try:
        year = int(song_year)
    except ValueError:
        print("Invalid year. Please enter the year as a whole number, e.g. 1999.")
        return None

    # Call the recommend_songs function with the input
    recommend_songs([{'name': song_name, 'year': year}], song_df)

    # Print playlist_df with text 'current playlist'
    print("Current Playlist:")
    print(playlist_df)

    # Printing a blank line to present better
    print("    ")

    # Print holding_df with text 'Recommended song'
    print("Recommended Song:")
    print(holding_df)
    print()

    get_spotify_link_holding()

In [None]:
def get_spotify_link_holding():
    """
    Print a Spotify link for each song in the global holding_df.

    Takes no arguments: it reads the global holding_df and uses its 'id' and
    'name' columns. A song whose link cannot be fetched gets an error
    message instead, and the loop carries on with the next song.

    The client id and secret are read by spotipy from the SPOTIPY_CLIENT_ID
    and SPOTIPY_CLIENT_SECRET environment variables, so no secret is stored
    in the notebook.
    """
    sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials())
    for idx, row in holding_df.iterrows():
        # Read both fields before the try block so the error messages below
        # always have values to report, even when a column is missing.
        track_id = row.get('id')
        track_name = row.get('name')
        try:
            # Fetches the song details using the track ID
            track_info = sp.track(track_id)

            # Extracts the Spotify link
            spotify_link = track_info['external_urls']['spotify']

            # Prints the song name and link in the desired format
            print(f"Have a listen, what do you think of '{track_name}'? {spotify_link}")

        except Exception as e:
            print(f"An error occurred for track ID {track_id}: {e}")
            print(f"Unable to retrieve link for '{track_name}'.")