$\Large \text{Projects Technical Interview Code Workshop}$

In [None]:
import ast
import os
import random
import string
import time

import kagglehub
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import spotipy
from dotenv import load_dotenv
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from spotipy.oauth2 import SpotifyClientCredentials

$\text{Making the dataset - we're not going to go over this in our workshop, but you should definitely check it out at some point.}$

In [None]:
# Download the Kaggle streaming-history dataset and point `path` at the CSV inside it.
path = kagglehub.dataset_download("sgoutami/spotify-streaming-history")
path = f"{path}/spotify_history.csv"

# Load the raw streams and turn the 'ts' column into real timestamps.
raw_data = pd.read_csv(path)
raw_data['ts'] = pd.to_datetime(raw_data['ts'])

# Spotify credentials are read from the environment (populated from a .env file).
load_dotenv()
client_id = os.environ.get("CLIENT_ID_2")
client_secret = os.environ.get("CLIENT_SECRET_2")
auth_manager = SpotifyClientCredentials(
    client_id=client_id,
    client_secret=client_secret,
)
sp = spotipy.Spotify(auth_manager=auth_manager)

In [None]:
def generate_user_id(length):
    """Build a random alphanumeric identifier.

    Args:
        length: Number of characters to generate.

    Returns:
        A string of `length` characters, each drawn with `random.choice`
        from ASCII letters (both cases) and digits.
    """
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)

def assign_user_ids(df, user_id_labels):
    """Shuffle `df` and attach a synthetic 'userId' column.

    Random user IDs are generated until there are more labels than rows; each
    ID is repeated for a number of rows drawn from a normal distribution
    (mean 100, standard deviation 20). The labels are then truncated to the
    frame length and assigned to the shuffled rows.

    Args:
        df: DataFrame of streams to label.
        user_id_labels: Labels to start from (normally an empty list). The
            list is copied, so the caller's object is left untouched.

    Returns:
        A shuffled copy of `df` (fixed `random_state=42`, index reset) with a
        'userId' column.
    """
    labels = list(user_id_labels)

    # Iterate instead of recursing: every generated user adds one stack frame
    # in a recursive formulation, so a frame with enough rows can exceed the
    # interpreter's recursion limit.
    while len(labels) <= len(df):
        temp_user_id = generate_user_id(6)
        tracks = int(round(np.random.normal(loc=100, scale=20)))
        # A non-positive draw simply contributes no rows for this user.
        labels.extend([temp_user_id] * tracks)

    df = df.sample(frac=1.0, random_state=42).reset_index(drop=True)
    df['userId'] = labels[:len(df)]
    return df

def get_song_metadata(uris, sp):
    """Look up track and primary-artist genre metadata for a batch of tracks.

    Args:
        uris: Track identifiers passed straight to `sp.tracks`.
        sp: Client object exposing `tracks(ids)` and `artists(ids)`, each
            returning a dict with a "tracks" / "artists" list.

    Returns:
        A list with one dict per resolved track, holding the keys
        track_name, artists (list of names), album, release_date, genres
        (first genre of the first listed artist, or "" if none),
        duration_ms, popularity, explicit and uri. Tracks the service could
        not resolve (returned as None) are skipped.
    """
    # Spotify's "Get Several Artists" endpoint accepts at most 50 IDs per call.
    max_artists_per_call = 50

    # Unresolvable tracks come back as None; drop them so one bad ID does not
    # cost the whole batch.
    tracks_response = [track for track in sp.tracks(uris)["tracks"] if track]

    artist_ids = list({
        artist["id"]
        for track in tracks_response
        for artist in track["artists"]
        if artist.get("id")
    })

    # Fetch genres in chunks, since the tracks in one batch can easily
    # reference more artists than a single request allows.
    artist_genre_map = {}
    for start in range(0, len(artist_ids), max_artists_per_call):
        batch = artist_ids[start:start + max_artists_per_call]
        for artist in sp.artists(batch)["artists"]:
            if artist:
                artist_genre_map[artist["id"]] = artist.get("genres") or []

    metadata_list = []
    for track in tracks_response:
        track_artists = track["artists"]
        primary_artist_id = track_artists[0]["id"] if track_artists else None
        genres = artist_genre_map.get(primary_artist_id, [])
        genre = genres[0] if genres else ""

        metadata_list.append({
            "track_name": track["name"],
            "artists": [artist["name"] for artist in track_artists],
            "album": track["album"]["name"],
            "release_date": track["album"]["release_date"],
            "genres": genre,
            "duration_ms": track["duration_ms"],
            "popularity": track["popularity"],
            "explicit": track["explicit"],
            "uri": track["uri"],
        })
    return metadata_list

# Label every stream with a synthetic user, then keep only the columns used downstream.
new_dataset = assign_user_ids(raw_data, [])[
    ["userId", "spotify_track_uri", "track_name", "ms_played", "ts"]
]


In [None]:
# Unique track identifiers to look up. Missing values are dropped first so a
# NaN can never end up inside a request batch.
tracks = new_dataset['spotify_track_uri'].dropna().unique().tolist()
track_information = []

# Query the API in batches of 25 tracks.
for start in range(0, len(tracks), 25):
    stop = min(start + 25, len(tracks))
    # Every 1000 tracks, back off for two minutes before continuing.
    if start % 1000 == 0 and start != 0:
        print(f"Processed {start} songs. Pausing for 2 minutes to avoid going over Spotify's rate limits.")
        time.sleep(120)
    uris = tracks[start:stop]
    try:
        temp_info = get_song_metadata(uris, sp)
    except Exception as e:
        # Best effort: report the failed batch and carry on with the next one.
        print(f"Didn't process batch {start} to {stop} due to error: {e}.")
        temp_info = []
    track_information.extend(temp_info)

saved_tracks_df = pd.DataFrame(track_information)
# Written with the .csv extension so that the later
# pd.read_csv("spotify_song_metadata.csv") finds the file.
saved_tracks_df.to_csv("spotify_song_metadata.csv", index=False)
new_dataset.to_csv("spotify_streaming_history.csv", index=False)


$\text{Let's start by exploring our datasets and cleaning them.}$

In [None]:
# Reload the two CSV files from the working directory.
streaming_history = pd.read_csv(filepath_or_buffer="spotify_streaming_history.csv")
song_metadata = pd.read_csv(filepath_or_buffer="spotify_song_metadata.csv")

In [None]:
# Ten rows with the largest 'ts' values.
streaming_history.sort_values(by='ts', ascending=False).iloc[:10]

In [None]:
# First ten rows of the song metadata table.
song_metadata.iloc[:10]

In [None]:
# After the CSV round-trip each 'artists' cell is the text of a Python list;
# keep only the first listed artist. ast.literal_eval parses literals only,
# so unlike eval it cannot execute code embedded in the file.
song_metadata["artists"] = song_metadata["artists"].apply(lambda x: ast.literal_eval(x)[0])
# Strip the URI prefix (as a literal string, not a regex) before joining on
# 'spotify_track_uri'.
song_metadata['uri'] = song_metadata['uri'].str.replace("spotify:track:", "", regex=False)
song_metadata = song_metadata[['artists', 'album', 'release_date', 'genres', 'duration_ms', 'popularity', 'uri']]
# Left join keeps every stream, including those without metadata.
streaming_history = streaming_history.merge(song_metadata, left_on='spotify_track_uri', right_on='uri', how='left').drop(columns=['uri'])
# Full merged table, kept for looking up song recommendations later.
song_database = streaming_history.copy()
# Modelling table: only the feature columns, with incomplete rows removed.
streaming_history = streaming_history[['userId', 'ms_played', 'genres', 'duration_ms', 'popularity', 'release_date']].dropna()
# Reduce the release date to its year.
# NOTE(review): release dates may not all share one precision - confirm this
# conversion handles every value present in the data.
streaming_history['release_date'] = streaming_history['release_date'].astype('datetime64[ns]').dt.year

$\text{Next, let's one-hot encode the genres, release dates, and popularity, so we can group by user!}$

In [None]:
# One-hot encode the genre and drop the "genres_" prefix from the new column names.
streaming_history = pd.get_dummies(streaming_history, columns=['genres']).rename(columns=lambda x: x.replace("genres_", ""))

# Decade edges 1920, 1930, ..., 2030. The last edge has to be 2030 so that the
# final left-closed bin is [2020, 2030) and 2020s releases get a decade column.
year_bins = np.arange(1920, 2040, 10)
# The final edge is 101, not 100, so that a popularity of exactly 100 falls in
# the last left-closed bin instead of outside every bin.
popularity_bins = np.array([0, 25, 50, 75, 101])
labels = [f"{start}s" for start in year_bins[:-1]]

# right=False makes every bin closed on the left and open on the right.
streaming_history["decade"] = pd.cut(streaming_history["release_date"], bins=year_bins, labels=labels, right=False)
streaming_history["popularity_bin"] = pd.cut(streaming_history["popularity"], bins=popularity_bins, right=False)

streaming_history = pd.get_dummies(streaming_history, columns=["decade"], prefix="", prefix_sep="")
streaming_history = pd.get_dummies(streaming_history, columns=["popularity_bin"], prefix="popularity_", prefix_sep="")
# The raw columns are now represented by their one-hot bins.
streaming_history = streaming_history.drop(columns=['release_date', 'popularity'])

# Keep only streams where more than 70% of the track was played.
streaming_history = streaming_history[streaming_history['ms_played']/streaming_history['duration_ms'] > 0.7]
song_database = song_database[song_database['ms_played']/song_database['duration_ms'] > 0.7]
# One row per user: the mean of each column, i.e. the share of that user's
# streams falling in each genre / decade / popularity bin.
user_characteristics = streaming_history.groupby('userId').mean().reset_index()

In [None]:
# Display the per-user table of column means built by the groupby above.
user_characteristics

In [None]:
# Display the merged stream + metadata table, filtered to streams played past 70%.
song_database

$\text{Now that we have user characteristics, let's begin choosing a model to find the people who have the most similar listening patterns to any given user.}$

In [None]:
#First approach: KNN.
def euclidean_distance(user):
    """Return a function measuring Euclidean distance to `user`.

    Args:
        user: Reference vector.

    Returns:
        A one-argument callable that takes a vector `x` and returns the
        square root of the summed squared differences between `x` and `user`.
    """
    def distance_to_user(x):
        difference = x - user
        return np.sqrt(np.sum(difference ** 2))

    return distance_to_user

def knn(user_id, k = 30):
    """Find the `k` users whose feature vectors are closest to `user_id`'s.

    Distances are Euclidean and computed over the columns of the global
    `user_characteristics` from position 3 onward. The table itself is not
    modified: writing a 'distance' column into it would make that column part
    of the feature slice on every later call, and part of the clustering
    input as well.

    Args:
        user_id: Value to look up in the 'userId' column.
        k: Number of neighbours to return.

    Returns:
        A list of up to `k` user IDs ordered from nearest to farthest,
        excluding `user_id` itself.

    Raises:
        IndexError: If `user_id` is not present in `user_characteristics`.
    """
    # Helper columns that other code may have appended are not features.
    derived_cols = ('distance', 'cluster')
    feature_cols = [col for col in user_characteristics.columns[3:] if col not in derived_cols]
    features = user_characteristics[feature_cols].to_numpy(dtype=float)

    is_user = (user_characteristics['userId'] == user_id).to_numpy()
    if not is_user.any():
        raise IndexError(f"user_id {user_id!r} not found in user_characteristics")
    user_vector = features[is_user][0]

    # Vectorised Euclidean distance from every row to the target user.
    distances = np.sqrt(((features - user_vector) ** 2).sum(axis=1))

    neighbours = pd.DataFrame({
        'userId': user_characteristics['userId'].to_numpy(),
        'distance': distances,
    })
    nearest_neighbors = neighbours[~is_user].nsmallest(k, 'distance')
    return nearest_neighbors['userId'].tolist()

def generate_playlist(user_id, num_songs=30, num_neighbors=100):
    """Recommend the songs played most often by a user's nearest neighbours.

    Args:
        user_id: User to build the playlist for.
        num_songs: Maximum number of songs to return.
        num_neighbors: How many nearest neighbours (via `knn`) to pool
            streams from.

    Returns:
        A DataFrame with columns 'track_name', 'artists' and 'counts', one
        row per distinct track name, ordered by descending play count among
        the neighbours.
    """
    similar_users = knn(user_id, num_neighbors)
    # Explicit copy: the 'counts' column below must be added to an independent
    # frame, not to a filtered slice of song_database.
    similar_users_songs = song_database[song_database['userId'].isin(similar_users)].copy()

    # Ranking is deterministic: by the number of streams per track name. A
    # weighted random sample on 'ms_played' is a possible alternative.
    song_counts = similar_users_songs['track_name'].value_counts()
    similar_users_songs['counts'] = similar_users_songs['track_name'].map(song_counts)
    recommended_songs = (
        similar_users_songs
        .sort_values(by=['counts'], ascending=False)
        .drop_duplicates(subset=['track_name'])
        .head(num_songs)
    )

    return recommended_songs[['track_name', 'artists', 'counts']]

# Example: a ten-song KNN playlist for one user ID.
generate_playlist(user_id='1RfKDh', num_songs=10)


In [None]:
#Second approach: Clustering with KMeans (PREFERRED).
def kMeans(k, df):
    """Cluster the rows of `df` with KMeans and label each row.

    Features are the columns from position 3 onward, excluding any
    'distance' or 'cluster' helper column already present, so that earlier
    neighbour searches or clustering runs cannot leak into the model input.

    Args:
        k: Number of clusters.
        df: Per-user feature table.

    Returns:
        A copy of `df` with a 'cluster' column holding each row's cluster
        label. The input frame is left unmodified.
    """
    derived_cols = ('distance', 'cluster')
    feature_cols = [col for col in df.columns[3:] if col not in derived_cols]

    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    clustered = df.copy()
    clustered['cluster'] = kmeans.fit_predict(df[feature_cols])
    return clustered

def plot_elbow_method(df, max_k=15):
    """Plot within-cluster sum of squares (WCSS) for k = 1..max_k.

    Features are the columns from position 3 onward, excluding any
    'distance' or 'cluster' helper column already present, so the curve is
    computed on the same inputs regardless of which cells ran before.

    Args:
        df: Per-user feature table.
        max_k: Largest number of clusters to try; capped at the number of
            rows, since KMeans cannot fit more clusters than samples.
    """
    derived_cols = ('distance', 'cluster')
    X = df[[col for col in df.columns[3:] if col not in derived_cols]]
    max_k = min(max_k, len(X))
    k_values = range(1, max_k + 1)

    wcss = []
    for k in k_values:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        kmeans.fit(X)
        # inertia_ is the fitted model's within-cluster sum of squares.
        wcss.append(kmeans.inertia_)

    plt.figure(figsize=(10, 6))
    plt.plot(k_values, wcss, marker='o')
    plt.title('Elbow Method For Optimal k')
    plt.xlabel('Number of clusters (k)')
    plt.ylabel('WCSS')
    plt.xticks(k_values)
    plt.grid()
    plt.show()


def generate_playlist_kmeans(user_id, cluster_characteristics, num_songs=30):
    """Recommend the songs played most often within a user's cluster.

    Args:
        user_id: User to build the playlist for.
        cluster_characteristics: Per-user table with 'userId' and 'cluster'
            columns.
        num_songs: Maximum number of songs to return.

    Returns:
        A DataFrame with columns 'track_name', 'artists' and 'counts', one
        row per distinct track name, ordered by descending play count among
        users in the same cluster as `user_id`.

    Raises:
        IndexError: If `user_id` is not present in `cluster_characteristics`.
    """
    user_rows = cluster_characteristics[cluster_characteristics['userId'] == user_id]
    user_cluster = user_rows['cluster'].values[0]
    cluster_users = cluster_characteristics[cluster_characteristics['cluster'] == user_cluster]['userId'].tolist()
    # Explicit copy: the 'counts' column below must be added to an independent
    # frame, not to a filtered slice of song_database.
    cluster_users_songs = song_database[song_database['userId'].isin(cluster_users)].copy()

    # Ranking is deterministic: by the number of streams per track name. A
    # weighted random sample on 'ms_played' is a possible alternative.
    song_counts = cluster_users_songs['track_name'].value_counts()
    cluster_users_songs['counts'] = cluster_users_songs['track_name'].map(song_counts)
    recommended_songs = (
        cluster_users_songs
        .sort_values(by=['counts'], ascending=False)
        .drop_duplicates(subset=['track_name'])
        .head(num_songs)
    )

    return recommended_songs[['track_name', 'artists', 'counts']]

In [None]:
# Draw the elbow curve for k = 1..15 on the per-user feature table.
plot_elbow_method(df=user_characteristics, max_k=15)

In [None]:
# Cluster users into five groups, then build a ten-song playlist from one user's cluster.
cluster_characteristics = kMeans(k=5, df=user_characteristics)
generate_playlist_kmeans(
    user_id='1RfKDh',
    cluster_characteristics=cluster_characteristics,
    num_songs=10,
)