<a href="https://colab.research.google.com/github/ryxen2/CSE_303/blob/main/Spotify_Music_Recommendation_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!pip install spotipy

# **Import Dependencies**

In [None]:
!pip install python-dotenv

In [None]:
import ast
import os
import random
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import seaborn as sns
from dotenv import load_dotenv
from scipy.sparse import csr_matrix
from scipy.stats import pearsonr
from sklearn.cluster import DBSCAN
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import silhouette_score
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

import warnings
warnings.filterwarnings("ignore")

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
#file_path = '/content/drive/My Drive/Colab Notebooks/secret.env'

In [None]:
#load_dotenv(file_path)


#print(os.getenv("SPOTIFY_CLIENT_ID"))
#print(os.getenv("SPOTIFY_CLIENT_SECRET"))

# **Data Preprocessing**

In [None]:
import kagglehub

path = kagglehub.dataset_download("vatsalmavani/spotify-dataset")

print("Path to dataset files:", path)

In [None]:
data_df = pd.read_csv(path + "/data/data.csv")

In [None]:
print(data_df.info())

In [None]:
data_df.isnull().sum()

In [None]:
print(data_df.head())

In [None]:
genre_df = pd.read_csv(path + "/data/data_w_genres.csv")
print(genre_df.info())

In [None]:
genre_df.isnull().sum()

In [None]:
genre_small_df = pd.read_csv(path + "/data/data_by_genres.csv")
print(genre_small_df.info())

In [None]:
genre_small_df.isnull().sum()

# **EDA**

In [None]:
# Audio features plotted against release year, one line each on a shared axis.
features = ['danceability', 'energy', 'valence' , 'acousticness']

plt.figure(figsize=(12, 6))

for feature in features:
    sns.lineplot(x=data_df['year'], y=data_df[feature], label = feature)

plt.title('Features Over Year')
plt.xlabel('Year')
# The axis carries every feature in `features`, so it gets a generic label
# instead of the name of whichever feature the loop happened to visit last.
plt.ylabel('Feature value')

plt.tight_layout()
plt.show()

In [None]:
data_df['decade'] = (data_df['year'] // 10) * 10

In [None]:
top_10_songs = data_df.sort_values(by='popularity',ascending=False).head(10)
print(top_10_songs[['name','artists','year']].to_string())

In [None]:
trending_genres = genre_small_df.sort_values('popularity', ascending= False).head(15)
plt.figure(figsize=(12, 6))
sns.lineplot(x='genres', y='popularity', data=trending_genres, color='blue')
plt.xlabel('Genres')
plt.ylabel('Popularity')
plt.title('Top 15 Genres by Popularity')
plt.xticks(rotation=45)
plt.show()

In [None]:
sns.countplot(x='decade', data=data_df)
plt.title("Count of Songs per Decade")
plt.xlabel("Decade")
plt.ylabel("Count")
plt.xticks(rotation=45)
plt.show()

In [None]:
feature_names = ['danceability', 'energy', 'valence', 'acousticness', 'year' , 'loudness', 'explicit', 'tempo' , 'duration_ms', 'key', 'mode', 'liveness', 'speechiness', 'instrumentalness']
correlation_matrix = data_df[feature_names + ['popularity']].corr()
plt.figure(figsize=(12, 10))
sns.heatmap(correlation_matrix[['popularity']].sort_values(by='popularity',ascending=False),annot=True,cmap='coolwarm')
plt.title("Feature Correlation with Popularity")
plt.show()

# **K-means Clustering**

In [None]:
features = ['year', 'energy', 'loudness']
X = data_df[features]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

In [None]:
SSR = []
for k in range(1, 11):
    kmeans = KMeans(n_clusters=k, random_state=42)
    kmeans.fit(X_scaled)
    SSR.append(kmeans.inertia_)

In [None]:
plt.plot(range(1, 11), SSR, marker='X')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Sum Of Squared Distances')
plt.title('Elbow Method for Optimal k')

In [None]:
train_data_kmeans,test_data_kmeans=train_test_split(data_df,test_size=0.2,random_state=42)

#print("Training Data Size:", len(train_data_kmeans))
#print("Testing Data Size:", len(test_data_kmeans))

In [None]:
clusterable_features = ['year', 'energy', 'loudness']

scaler = StandardScaler()

X_train_scaled = scaler.fit_transform(train_data_kmeans[clusterable_features])
X_test_scaled = scaler.transform(test_data_kmeans[clusterable_features])

pca = PCA(n_components=2)
X_train_pca = pca.fit_transform(X_train_scaled)
X_test_pca = pca.transform(X_test_scaled)

kmeans = KMeans(n_clusters=3, random_state=42)
train_data_kmeans['song_clusters'] = kmeans.fit_predict(X_train_pca)
test_data_kmeans['song_clusters'] = kmeans.predict(X_test_pca)


#print("Train Data Head:")
#print(train_data_kmeans.head())
#print("\nTest Data Head:")
#print(test_data_kmeans.head())

In [None]:
sns.scatterplot(x=X_train_scaled[:, 0], y=X_train_scaled[:, 1], hue=train_data_kmeans['song_clusters'], palette='rainbow')
plt.title("Song Clustering")
plt.show()

In [None]:
pca = PCA(n_components=2)
X_train_pca = pca.fit_transform(X_train_scaled)
X_test_pca = pca.transform(X_test_scaled)

In [None]:
train_data_kmeans['PCA1'] = X_train_pca[:, 0]
train_data_kmeans['PCA2'] = X_train_pca[:, 1]

test_data_kmeans['PCA1'] = X_test_pca[:, 0]
test_data_kmeans['PCA2'] = X_test_pca[:, 1]

plt.figure(figsize=(10, 6))
sns.scatterplot(data=train_data_kmeans,x='PCA1',y='PCA2',hue='song_clusters',palette='rainbow',)

plt.title("KMeans Clusters (Reduced to 2D with PCA)")
plt.xlabel("PCA1")
plt.ylabel("PCA2")
plt.show()



# **Cluster-Based Recommendation System**

In [None]:
def recommend_songs_cluster(song_name, n=5):
    """Return up to ``n`` random song names from the cluster of ``song_name``.

    The song is looked up by exact name, first in ``train_data_kmeans`` and
    then in ``test_data_kmeans``; both frames must carry the ``name`` and
    ``song_clusters`` columns. When several rows share the name, the first
    match decides the cluster.

    Args:
        song_name: Exact track name to look up.
        n: Maximum number of recommendations to draw.

    Returns:
        list: Randomly sampled names from the same cluster, never including
        ``song_name`` itself. Empty when the song is unknown or nothing can
        be drawn.
    """
    song_row = train_data_kmeans.loc[train_data_kmeans['name'] == song_name]
    if song_row.empty:
        song_row = test_data_kmeans.loc[test_data_kmeans['name'] == song_name]
    if song_row.empty:
        return []

    song_cluster = song_row['song_clusters'].values[0]

    all_songs = pd.concat([train_data_kmeans, test_data_kmeans])
    same_cluster = all_songs['song_clusters'] == song_cluster
    # Never hand the queried song back as its own recommendation.
    candidates = all_songs.loc[same_cluster & (all_songs['name'] != song_name), 'name']

    # Series.sample raises ValueError when asked for more rows than exist.
    sample_size = min(n, len(candidates))
    if sample_size <= 0:
        return []
    return candidates.sample(sample_size).tolist()

song_name_input = input("Enter song name :")
song_list_cluster = recommend_songs_cluster(song_name_input)
print(song_list_cluster)



In [None]:
print(test_data_kmeans['name'].head())

# **Fetch songs using API and format**

In [None]:
import requests
import base64

# Credentials come from the environment (optionally populated from a .env
# file) so that no secret is stored in the notebook. The client secret that
# was previously committed here should be rotated.
load_dotenv()
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
if not CLIENT_ID or not CLIENT_SECRET:
    raise RuntimeError(
        "Set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET in the environment "
        "(or in a .env file) before running this cell."
    )

def get_token():
    """Request a Spotify access token with the client-credentials grant.

    Uses the module-level ``CLIENT_ID`` and ``CLIENT_SECRET``.

    Returns:
        The ``access_token`` value from the token endpoint's JSON reply, or
        ``None`` when the request fails or the status code is not 200.
    """
    url = 'https://accounts.spotify.com/api/token'
    auth = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode('utf-8')
    headers = {'Authorization': f'Basic {auth}'}
    data = {'grant_type': 'client_credentials'}
    try:
        # requests applies no timeout by default; without one a stalled
        # connection would block the notebook indefinitely.
        response = requests.post(url, headers=headers, data=data, timeout=10)
    except requests.RequestException:
        # Report transport failures the same way as a non-200 reply.
        return None
    if response.status_code != 200:
        return None
    return response.json().get('access_token')

def _spotify_search(query, type, limit):
    """Call Spotify's ``/v1/search`` endpoint and return the parsed JSON.

    Returns ``None`` when no token can be obtained, the request fails, or the
    status code is not 200.
    """
    token = get_token()
    if not token:
        return None
    url = 'https://api.spotify.com/v1/search'
    headers = {'Authorization': f'Bearer {token}'}
    params = {'q': query, 'type': type, 'limit': limit}
    try:
        # Explicit timeout: requests would otherwise wait forever on a stall.
        response = requests.get(url, headers=headers, params=params, timeout=10)
    except requests.RequestException:
        return None
    return response.json() if response.status_code == 200 else None


def search_song(query, type, limit=1):
    """Search Spotify and return the raw JSON response (default: one hit).

    Args:
        query: Free-text search string sent as ``q``.
        type: Spotify item type to search for, e.g. ``'track'``.
        limit: Maximum number of items requested.

    Returns:
        dict | None: Parsed response, or ``None`` on any failure.
    """
    return _spotify_search(query, type, limit)


def search_playlist(query, type, limit=10):
    """Same search as ``search_song`` with a default of ten hits.

    Despite the name, the item type is whatever ``type`` says; the callers
    in this notebook pass ``'track'``.

    Returns:
        dict | None: Parsed response, or ``None`` on any failure.
    """
    return _spotify_search(query, type, limit)


def format_data1(data):
    """Flatten a Spotify track-search response into a detailed DataFrame.

    Args:
        data: Parsed JSON of a search response; tracks are read from
            ``data['tracks']['items']``. ``None`` or a response without
            tracks yields an empty frame.

    Returns:
        pandas.DataFrame: One row per track with the columns assembled in
        ``track_data`` below. Fields missing from the response are reported
        as ``'Unknown'``.
    """
    items = ((data or {}).get('tracks') or {}).get('items') or []
    # Null entries carry no track data and are skipped.
    items = [item for item in items if item]
    num_items = len(items)  # Get the number of items in the list
    print(f"Number of items: {num_items}")  # Print the number of items for debugging
    if items:
        print("Keys in item:", items[0].keys())

    result = []
    for item in items:
        # `or` fallbacks cover keys that are present but null or empty, which
        # the default argument of dict.get does not.
        artists = item.get('artists') or [{}]
        album_info = item.get('album') or {}
        is_playable = item.get('is_playable')

        track_data = {
            'Track': item.get('name', 'Unknown'),
            'Artist': artists[0].get('name', 'Unknown'),
            'Album': album_info.get('name', 'Unknown'),
            'Album URL': (album_info.get('external_urls') or {}).get('spotify', 'Unknown'),
            'Release': album_info.get('release_date', 'Unknown'),
            'Track URL': (item.get('external_urls') or {}).get('spotify', 'Unknown'),
            'Popularity': item.get('popularity', 'Unknown'),
            'Disc Number': item.get('disc_number', 'Unknown'),
            'Duration (ms)': item.get('duration_ms', 'Unknown'),
            'Explicit': int(bool(item.get('explicit', False))),  # Boolean to 0/1
            'External IDs': item.get('external_ids', {}),
            'External URLs': item.get('external_urls', {}),
            'Spotify Href': item.get('href', 'Unknown'),
            'Track ID': item.get('id', 'Unknown'),
            'Is Local': int(bool(item.get('is_local', False))),  # Boolean to 0/1
            'Is Playable': int(is_playable) if is_playable is not None else 'Unknown',
            'Preview URL': item.get('preview_url', 'Unknown'),
            'Track Number': item.get('track_number', 'Unknown'),
            'URI': item.get('uri', 'Unknown'),
            # List of market codes -> comma-separated string
            'Available Markets': ", ".join(item.get('available_markets') or []),
        }
        result.append(track_data)

    return pd.DataFrame(result)


def format_data2(data):
    """Flatten a Spotify track-search response into a compact DataFrame.

    Args:
        data: Parsed JSON of a search response; tracks are read from
            ``data['tracks']['items']``. ``None`` or a response without
            tracks yields an empty frame.

    Returns:
        pandas.DataFrame: One row per track with columns Track, Artist,
        Album, Release, Popularity, Duration (ms) and Explicit (0/1).
        Fields missing from the response are reported as ``'Unknown'``.
    """
    items = ((data or {}).get('tracks') or {}).get('items') or []

    result = []
    for item in items:
        if not item:
            # Null entries carry no track data.
            continue
        # `or` fallbacks cover keys that are present but null or empty, which
        # the default argument of dict.get does not.
        artists = item.get('artists') or [{}]
        album_info = item.get('album') or {}

        track_data = {
            'Track': item.get('name', 'Unknown'),
            'Artist': artists[0].get('name', 'Unknown'),
            'Album': album_info.get('name', 'Unknown'),
            'Release': album_info.get('release_date', 'Unknown'),
            'Popularity': item.get('popularity', 'Unknown'),
            'Duration (ms)': item.get('duration_ms', 'Unknown'),
            'Explicit': int(bool(item.get('explicit', False))),  # Boolean to 0/1
        }
        result.append(track_data)

    return pd.DataFrame(result)

In [None]:
query = 'Shape of You'
search_results = search_song(query, 'track')
if search_results:
    formatted_data = format_data1(search_results)
    print(formatted_data)
else:
    print("No results found.")

In [None]:
query = 'hello'
search_results = search_playlist(query, 'track')
if search_results:
    formatted_data = format_data2(search_results)
    print(formatted_data)
else:
    print("No results found.")

# **Content-Based Recommendation System**

In [None]:
def content_based_recommendations(df, song_index, top_n=5):
    """Rank the other rows of ``df`` by TF-IDF cosine similarity to one row.

    A text document is built per row from the Track, Artist, Duration (ms)
    and Popularity columns and stored in a ``Feature_engineering_column``
    column that is added to ``df`` in place, so it also appears in the
    returned rows.

    Args:
        df: Frame with the columns Track, Artist, Duration (ms), Popularity.
        song_index: Position of the reference row within ``df``.
        top_n: Maximum number of rows to return.

    Returns:
        pandas.DataFrame: Rows of ``df`` ordered from most to least similar
        to the reference row, which is never included. Empty when ``df`` is
        empty.

    Raises:
        IndexError: If ``song_index`` is out of range for a non-empty ``df``.
    """
    if df.empty:
        # Nothing to vectorize (an empty search result has no columns at all).
        return df.iloc[0:0]

    n_rows = len(df)
    if not -n_rows <= song_index < n_rows:
        raise IndexError(f"song_index {song_index} is out of range for {n_rows} rows")
    song_index %= n_rows  # normalize negative positions

    df['Feature_engineering_column'] =df['Track'] + ' ' + df['Artist'] + ' ' + df['Duration (ms)'].map(str) + ' ' + df['Popularity'].map(str)

    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(df['Feature_engineering_column'])

    # Only the reference row's similarities are needed, not the full n x n matrix.
    sim_to_song = cosine_similarity(tfidf_matrix[song_index], tfidf_matrix).ravel()

    # Drop the reference row by position: relying on it sorting first breaks
    # when another row ties with it at the top.
    sim_scores = [(i, score) for i, score in enumerate(sim_to_song) if i != song_index]
    sim_scores.sort(key=lambda x: x[1], reverse=True)
    song_indices = [i for i, _ in sim_scores[:max(top_n, 0)]]
    return df.iloc[song_indices]


In [None]:
# Empty log frame that recommend_songs1 appends its recommendations to.
columns = [
    'Track',
    'Artist',
    'Album',
    'Release',
    'Popularity',
    'Duration (ms)',
    'Explicit',
    'Feature_engineering_column',
]
track_df = pd.DataFrame(columns=columns)
print(track_df)

In [None]:
def recommend_songs1(query, top_n=5):
    """Search Spotify for ``query`` and recommend tracks similar to the top hit.

    The returned rows are also appended to the module-level ``track_df``.

    Args:
        query: Free-text search string.
        top_n: Maximum number of recommendations.

    Returns:
        pandas.DataFrame: Recommended rows as produced by
        ``content_based_recommendations``. When the search fails or returns
        no tracks, an empty frame with the columns of ``track_df``, so that
        callers can still read its ``Track`` column.
    """
    global track_df

    search_results = search_playlist(query, 'track')
    # search_playlist returns None on failure; format_data2 must not see that.
    df = format_data2(search_results) if search_results else pd.DataFrame()
    if df.empty:
        print("No results found.")
        return pd.DataFrame(columns=track_df.columns)

    song_index = 0  # the first search hit is the reference track
    content_recs = content_based_recommendations(df, song_index, top_n)
    track_df = pd.concat([track_df, content_recs], ignore_index=True)
    print("\nContent-Based Recommendations:")
    return content_recs

query = input("Search : ")
recommended_songs = recommend_songs1(query)
print(recommended_songs)

#print(track_df)

In [None]:
song_list_content= recommended_songs['Track'].tolist()
print(song_list_content)

In [None]:
print(track_df)

# **Hybrid Recommendation System**

In [None]:
def hybrid_recommendations(query, top_n=5):
    """Combine content-based and cluster-based recommendations for a search.

    The first Spotify hit for ``query`` is the reference track. Its
    content-based neighbours and the cluster-based picks for its title are
    interleaved, de-duplicated and cut to ``top_n``.

    Args:
        query: Free-text search string.
        top_n: Maximum number of track names to return.

    Returns:
        list: Up to ``top_n`` track names, alternating between the content
        and the cluster source in a stable order. When the search fails or
        returns no tracks, a ``str`` message is returned instead.
    """
    search_results = search_playlist(query, 'track')
    if not search_results:
        return "No results found on Spotify."

    df = format_data2(search_results)
    if df.empty:
        return "No valid song data found."

    song_index = 0
    content_recs = content_based_recommendations(df, song_index, top_n)

    input_song_name = df.iloc[song_index]['Track']
    cluster_recs = recommend_songs_cluster(input_song_name, top_n)

    # Alternate between the two sources so that the cut to top_n keeps both
    # represented. A set would discard the order and make the cut arbitrary.
    content_names = content_recs['Track'].tolist()
    merged = []
    for i in range(max(len(content_names), len(cluster_recs))):
        merged.extend(content_names[i:i + 1])
        merged.extend(cluster_recs[i:i + 1])

    # dict.fromkeys removes duplicates while keeping first-seen order.
    hybrid_recs = list(dict.fromkeys(merged))
    return hybrid_recs[:top_n]

query = input("Search for a song: ")
song_list_hybrid = hybrid_recommendations(query, top_n=5)
print("\nHybrid Recommendations:")

# hybrid_recommendations returns a message string instead of a list when the
# search yields nothing; looping over that string would print it one
# character per line.
if isinstance(song_list_hybrid, str):
    print(song_list_hybrid)
else:
    for recommendation in song_list_hybrid:
        print(recommendation)

# **Evaluate Recommendation System**

In [None]:
def evaluate_models(cluster_recommendations, content_recommendations, hybrid_recommendations):
    """Plot and return a fixed comparison table for the three recommenders.

    The precision, recall and F1 figures are constants written in this
    function; the three arguments are accepted but never read.
    NOTE(review): the table is therefore not an evaluation of the lists that
    are passed in -- replace the constants with computed metrics once
    relevance labels exist.

    Returns:
        pandas.DataFrame: Columns Model, Precision, Recall and F1-score, one
        row per model.
    """
    model_names = ['K-means Clustering', 'Content-Based Filtering', 'Hybrid']
    precision = [0.65, 0.72, 0.78]
    recall = [0.58, 0.63, 0.70]
    f1_score = [0.61, 0.67, 0.74]

    df_metrics = pd.DataFrame({
        'Model': model_names,
        'Precision': precision,
        'Recall': recall,
        'F1-score': f1_score,
    })

    # Bar chart of the F1 column only.
    plt.figure(figsize=(10, 6))
    sns.barplot(data=df_metrics, x='Model', y='F1-score')
    plt.title('Recommendation System Model Comparison (F1-score)')
    plt.show()

    return df_metrics

cluster_recommendations = song_list_cluster
content_recommendations = song_list_content
hybrid_recommendations = song_list_hybrid

evaluation_results = evaluate_models(cluster_recommendations, content_recommendations, hybrid_recommendations)
evaluation_results


In [None]:
best_model = evaluation_results.loc[evaluation_results['F1-score'].idxmax()]
print(f"The best performing model is:\n{best_model}")


# **Dashboard**

In [None]:
# Interactive (Plotly) version of the popularity-correlation heatmap.
feature_names = [
    'danceability', 'energy', 'valence', 'acousticness', 'year', 'loudness',
    'explicit', 'tempo', 'duration_ms', 'key', 'mode', 'liveness',
    'speechiness', 'instrumentalness',
]
correlation_matrix = data_df[feature_names + ['popularity']].corr()

# Long form (Feature, Correlation), strongest positive correlation first.
correlation_df = (
    correlation_matrix[['popularity']]
    .reset_index()
    .rename(columns={'index': 'Feature', 'popularity': 'Correlation'})
    .sort_values(by='Correlation', ascending=False)
)

# Single-row matrix in the same order, as expected by imshow.
popularity_row = correlation_matrix[['popularity']].sort_values(by='popularity', ascending=False).T

fig = px.imshow(
    popularity_row,
    labels={'x': "Feature", 'y': "Popularity", 'color': "Correlation"},
    x=correlation_df['Feature'],
    y=['Popularity'],
    color_continuous_scale=px.colors.sequential.RdBu,
    text_auto=True,
)

fig.update_layout(title="Feature Correlation with Popularity", xaxis_title="Feature", yaxis_title="Popularity")
fig.show()

In [None]:
def evaluate_models(cluster_recommendations=None, content_recommendations=None, hybrid_recommendations=None):
    """Plotly version of the model comparison chart.

    This definition replaces the earlier Matplotlib ``evaluate_models`` once
    the cell runs. The three optional parameters are never read; they exist
    only so that calls written for the earlier three-argument signature keep
    working after the replacement.

    The precision, recall and F1 figures are constants written in this
    function, not values computed from any recommendation list.

    Returns:
        pandas.DataFrame: Columns Model, Precision, Recall and F1-score, one
        row per model.
    """
    metrics = {
        'Model': ['K-means Clustering', 'Content-Based Filtering', 'Hybrid'],
        'Precision': [0.65, 0.72, 0.78],
        'Recall': [0.58, 0.63, 0.70],
        'F1-score': [0.61, 0.67, 0.74],
    }
    df_metrics = pd.DataFrame(metrics)

    fig = px.bar(df_metrics, x='Model', y='F1-score',
                 title='Recommendation System Model Comparison (F1-score)',
                 color='Model',
                 text='F1-score')

    # Show each bar's value with two decimals above the bar.
    fig.update_traces(texttemplate='%{text:.2f}', textposition='outside')
    fig.update_layout(yaxis_title='F1-score', xaxis_title='Model')

    fig.show()

    return df_metrics

evaluation_results = evaluate_models()

In [None]:
! pip install pyngrok
!pip install dash spotipy flask-ngrok pandas numpy
import dash
from dash import dcc, html, Input, Output
import plotly.express as px
from flask_ngrok import run_with_ngrok  # Ngrok for Google Colab
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from pyngrok import ngrok


app = dash.Dash(__name__)
#run_with_ngrok(app.server)  # Allow running on Colab using Ngrok


spotify_logo_url = "https://upload.wikimedia.org/wikipedia/commons/2/26/Spotify_logo_with_text.svg"

# Each "artists" cell is parsed as the text of a Python list of names.
# ast.literal_eval reads such literals without executing arbitrary code from
# the dataset, which eval would do.
artist_names = sorted({
    artist
    for artists_literal in data_df["artists"].unique()
    for artist in ast.literal_eval(artists_literal)
})

app.layout = html.Div([
    # Header: logo and title.
    html.Div([
        html.Img(src=spotify_logo_url, style={'height': '80px', 'display': 'block', 'margin': 'auto'}),
        html.H1("Spotify Dashboard", style={'textAlign': 'center', 'color': '#1DB954'}),
    ], style={'backgroundColor': '#000', 'padding': '20px'}),

    # Song search with recommendation output.
    html.Div([
        html.Label("Search for a Song", style={'color': 'white'}),
        dcc.Input(id='song-query', type='text', value='', debounce=True, style={'margin': '10px'}),
        html.Button('Search', id='search-button', n_clicks=0, style={'margin': '10px'}),
        # Dash style dictionaries use camelCase keys (marginTop, not margin-top).
        html.Div(id='recommendations-output', style={'color': 'white', 'marginTop': '20px'})
    ], style={'padding': '20px', 'backgroundColor': '#333'}),

    # Artist picker with popularity trend graph.
    html.Div([
        html.Label("Select an artist:", style={'color': 'white'}),
        dcc.Dropdown(
            id='artist-dropdown',
            options=[{'label': a, 'value': a} for a in artist_names],
            value="Justin Bieber",
            style={'width': '50%', 'margin': 'auto'}
        ),
        dcc.Graph(id='artist-graph')
    ], style={'padding': '20px', 'backgroundColor': '#222'})
])


@app.callback(
    Output('recommendations-output', 'children'),
    Input('search-button', 'n_clicks'),
    Input('song-query', 'value')
)
def recommend_songs(n_clicks, query):
    """Dash callback: list content-based recommendations for the typed query.

    Args:
        n_clicks: Click count of the search button.
        query: Current text of the search input.

    Returns:
        An ``html.Ul`` with one "<track> by <artist>" item per recommendation,
        a short message when the search yields nothing, or ``""`` until the
        button has been clicked with a non-empty query.
    """
    if not (n_clicks and query):
        return ""

    search_results = search_playlist(query, 'track')
    # search_playlist returns None on failure; format_data2 must not see that.
    if not search_results:
        return "No results found."

    df = format_data2(search_results)
    if df.empty:
        return "No results found."

    song_index = 0  # the first search hit is the reference track
    content_recs = content_based_recommendations(df, song_index, top_n=5)
    return html.Ul([html.Li(f"{row['Track']} by {row['Artist']}") for _, row in content_recs.iterrows()])



@app.callback(
    Output('artist-graph', 'figure'),
    Input('artist-dropdown', 'value')
)
def update_graph(selected_artist):
    """Dash callback: plot popularity over release date for one artist.

    Args:
        selected_artist: Artist name chosen in the dropdown, or ``None`` when
            the dropdown has been cleared.

    Returns:
        A Plotly line figure; an empty titled figure when there is nothing
        to plot.
    """
    if not selected_artist:
        return px.line(title="No artist selected")

    # The "artists" column holds list text such as "['A', 'B']". Searching for
    # the quoted form of the name (its repr) matches whole names only, so
    # picking "Drake" no longer also selects longer names that contain it.
    # NOTE(review): assumes the column text was written by Python's str(list)
    # -- confirm against the dataset.
    is_by_artist = data_df["artists"].str.contains(repr(selected_artist), regex=False, na=False)
    filtered_data = data_df[is_by_artist]

    if filtered_data.empty:
        return px.line(title=f"No data available for {selected_artist}")

    # sort_values returns a new frame; an in-place sort on a filtered slice
    # triggers pandas' chained-assignment warning.
    filtered_data = filtered_data.sort_values(by="release_date")
    fig = px.line(filtered_data, x="release_date", y="popularity",
                  title=f"Popularity Trend of {selected_artist} Over Time",
                  markers=True)

    fig.update_layout(template="plotly_dark", plot_bgcolor="#222", paper_bgcolor="#222")
    return fig



In [None]:
if __name__ == '__main__':
    # Start ngrok tunnel manually
    tunnel = ngrok.connect(8050)  # Connect to port 8050
    print(' * Public URL:', tunnel.public_url)  # Print the public URL

    try:
        # app.run is the current entry point; run_server is the deprecated
        # alias and is no longer available in recent Dash releases.
        app.run(debug=False)  # Start the Dash app (blocks until stopped)
    finally:
        # Close the tunnel even when the server stops with an exception
        ngrok.disconnect(tunnel.public_url)

In [None]:
! ngrok authtoken 2sXsoLt5xwYuC0avuQMuurJvRWK_3zCmfurJPduAkDUrj2W6k

In [None]:
ngrok.kill()

# **Miscellaneous**

In [None]:
random.seed(42)

In [None]:
random_indices = np.random.choice(data_df.index, 5000, replace=False)
data_df.loc[random_indices, 'ratings'] = np.random.uniform(2.5, 9.5, 5000)
data_df.loc[random_indices, 'user_id'] = np.random.randint(1, 1001, 5000)

In [None]:
print(test_data_kmeans.head())

Add a new `user` column to `test_data_kmeans` that numbers the rows from 1 to the total number of rows.

In [None]:
test_data_kmeans['user'] = range(1, len(test_data_kmeans) + 1)

In [None]:
test_data_kmeans = test_data_kmeans[['user','user_rating','name']]

In [None]:
print(test_data_kmeans.head())

In [None]:
song_interaction_count = test_data_kmeans.groupby('name')['user'].count()
popular_songs = song_interaction_count[song_interaction_count >= 3].index
test_data_kmeans = test_data_kmeans[test_data_kmeans['name'].isin(popular_songs)]

utility_matrix = test_data_kmeans.pivot_table(index='user', columns='name', values='user_rating', fill_value=0)


sparse_matrix = csr_matrix(utility_matrix)


def recommend_songs_batched(song_name, utility_matrix, sparse_matrix, batch_size=1000, num_recommendations=5):
    """Find the songs whose rating columns are most similar to ``song_name``.

    Songs are compared by the cosine distance between their columns of
    ``sparse_matrix``. The candidate songs are searched ``batch_size`` at a
    time and the best matches over all batches are returned.

    Args:
        song_name: Column label of the reference song in ``utility_matrix``.
        utility_matrix: DataFrame whose columns are song names.
        sparse_matrix: Matrix with the same column layout as
            ``utility_matrix`` (one column per song).
        batch_size: Number of candidate songs searched per nearest-neighbour
            fit.
        num_recommendations: Maximum number of results.

    Returns:
        list: ``(song_name, similarity)`` tuples sorted by descending
        similarity, where similarity is ``1 - cosine distance``. The
        reference song is never included. Empty, after printing a notice,
        when ``song_name`` is not a column of ``utility_matrix``.
    """
    if song_name not in utility_matrix.columns:
        print(f"Song '{song_name}' not found in the dataset!")
        return []

    song_idx = utility_matrix.columns.get_loc(song_name)

    # Transpose once so that every row is one song.
    song_vectors = csr_matrix(sparse_matrix.T)
    n_songs = song_vectors.shape[0]
    # Slicing (rather than indexing) keeps the query two-dimensional.
    query_vector = song_vectors[song_idx:song_idx + 1]

    recommendations = []
    for start in range(0, n_songs, batch_size):
        end = min(start + batch_size, n_songs)

        # One extra neighbour leaves room to discard the reference song;
        # kneighbors rejects n_neighbors larger than the fitted sample count,
        # which a short final batch would otherwise trigger.
        k = min(num_recommendations + 1, end - start)
        knn = NearestNeighbors(metric='cosine', algorithm='brute', n_neighbors=k)
        knn.fit(song_vectors[start:end])

        distances, indices = knn.kneighbors(query_vector, n_neighbors=k)

        for distance, idx in zip(distances.ravel(), indices.ravel()):
            global_idx = start + int(idx)
            # Skip the reference song by identity. Skipping "the first hit"
            # would drop the best match in every batch that does not contain
            # the reference song.
            if global_idx == song_idx:
                continue
            recommendations.append((utility_matrix.columns[global_idx], 1 - distance))

    return sorted(recommendations, key=lambda x: -x[1])[:num_recommendations]

song_to_recommend = "Camby Bolongo"
recommendations = recommend_songs_batched(song_to_recommend, utility_matrix, sparse_matrix, batch_size=5000, num_recommendations=5)

print(f"Recommendations for '{song_to_recommend}':")
for rec, score in recommendations:
    print(f"{rec} (Similarity Score: {score:.2f})")

In [None]:
print(sparse_matrix.shape)

In [None]:
print(utility_matrix)