In [None]:
!pip install scikit-surprise

Collecting scikit-surprise
  Downloading scikit-surprise-1.1.3.tar.gz (771 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/772.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/772.0 kB[0m [31m2.6 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m768.0/772.0 kB[0m [31m11.6 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m772.0/772.0 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: scikit-surprise
  Building wheel for scikit-surprise (setup.py) ... [?25l[?25hdone
  Created wheel for scikit-surprise: filename=scikit_surprise-1.1.3-cp310-cp310-linux_x86_64.whl size=3163489 sha256=7d8a834fd459e8cc5ed46bb359b1c4687869573022be6f99c54b1ca0f8920207
  Stored in directory: /root/.cache/pip/wheels

In [None]:
# Mount Google Drive (Colab only); every data/model path below lives under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import json
from collections import defaultdict
from typing import List

import numpy as np
import pandas as pd
from surprise import Dataset
from surprise import SVD
from surprise.dump import load
from surprise.model_selection import train_test_split
from surprise.reader import Reader
from tensorflow import keras
from tqdm import tqdm

In [None]:
# Locations on the mounted shared drive used throughout the notebook.
# Saved Keras model, read with keras.models.load_model in the model-loading cell.
CNN_MODEL_PATH = "/content/drive/Shareddrives/SWM Project/models/cnn_model"
# JSON file holding the sampled playlists.
DATA_PATH = "/content/drive/Shareddrives/SWM Project/data/50k_sampled_playlists.json"
# Surprise dump file the SVD model is written to and later read back from.
MF_MODEL_PATH = "/content/drive/Shareddrives/SWM Project/models/sample_split"
# JSON file with the per-track Spotify feature records.
SPOTIFY_AUGMENTED_DATA = "/content/drive/Shareddrives/SWM Project/data/spotify_augmentation.json"
# NumPy file holding the array of track URIs.
UNIQUE_TRACKS_PATH = "/content/drive/Shareddrives/SWM Project/data/unique_tracks.npy"

# Loading the playlists and related information

In [None]:
# JSON object keys are always strings; the stored keys are float-formatted, so
# each one is parsed as float first and then truncated to an int.
RAW_TO_INNER_MAP_PATH = "/content/drive/Shareddrives/SWM Project/data/raw_to_svd_index.json"
with open(RAW_TO_INNER_MAP_PATH) as fp:
    raw2inner_id_items = {
        int(float(raw_id)): inner_id
        for raw_id, inner_id in json.load(fp).items()
    }

In [None]:
# NOTE: DATA_PATH is re-assigned here with the same value it gets in the configuration cell.
DATA_PATH = '/content/drive/Shareddrives/SWM Project/data/50k_sampled_playlists.json'
# Parse the playlists; later cells index this list and read each entry's "tracks" list.
with open(DATA_PATH) as fp:
    playlists = json.load(fp)

print("Read input file...")

Read input file...


In [None]:
# Load the previously saved array of track URIs.
# NOTE(review): the same file path is written by the np.save call in the
# "Extract the unique tracks" section further down, so on a first run this
# load needs the file to exist already — confirm the intended cell order.
unique_tracks = np.load(UNIQUE_TRACKS_PATH)

# converting the `unique_tracks` list into a dictionary for convenient encoding
# (track URI -> position of that URI in `unique_tracks`)
unique_tracks_dict = { track_uri: idx for idx, track_uri in enumerate(unique_tracks) }

# Extract the unique tracks from the dataset

In [None]:
from tqdm import tqdm
from collections import defaultdict
from typing import List

def _get_unique_tracks(playlists: List[dict], song_threshold=0) -> List:
    """
    Gets a list of unique songs from all the playlists. Returns the list of
    unique songs and the overall number of songs
    """
    total_songs = 0
    unique_tracks = defaultdict(int)
    for i in tqdm(range(len(playlists))):
        playlist = playlists[i]
        total_songs += len(playlist["tracks"])
        for track in playlist["tracks"]:
            unique_tracks[track["track_uri"]] += 1

    # filter the songs with threshold less than what's mentioned
    unique_tracks = [ k for k, v in unique_tracks.items() if v >= song_threshold ]

    return [
        unique_tracks, total_songs
    ]

In [None]:
import numpy as np
# Same file that UNIQUE_TRACKS_PATH points to in the configuration cell.
SAVE_UNIQUE_SONGS_PATH = '/content/drive/Shareddrives/SWM Project/data/unique_tracks.npy'

print("Fetching unique tracks...")
# song_threshold=5 keeps only URIs that occur at least five times, so the
# "unique" count printed below is the number of URIs passing that filter.
unique_tracks, total_tracks = _get_unique_tracks(playlists, song_threshold=5)
print(f"Found {total_tracks} tracks in the playlists, out of which {len(unique_tracks)} tracks were unique.")
# Persist the filtered URI list; this overwrites the file at SAVE_UNIQUE_SONGS_PATH.
np.save(SAVE_UNIQUE_SONGS_PATH, unique_tracks)

Fetching unique tracks...


100%|██████████| 50000/50000 [00:02<00:00, 18303.70it/s]


Found 3324891 tracks in the playlists, out of which 80283 tracks were unique.


# Building adjacency matrix

In [None]:
def _build_adjacency_matrix(playlists: List[dict], song_idxs: dict):
    """
    Builds a song-playlist adjacency matrix such that if a song is present in a
    playlist, the value in the matrix is set to 1, else 0.
    """
    # the rows of the adjacency matrix are the playlists and the columns are the song indices
    adj_matrix = np.empty((0, 3))
    temp_adj_matrix = np.empty((0, 3))
    for i in tqdm(range(len(playlists))):
        playlist = playlists[i]
        for track in playlist["tracks"]:
            track_idx = song_idxs.get(track["track_uri"], -1)
            if track_idx > -1:
                temp_adj_matrix = np.vstack((temp_adj_matrix, np.array([i + 1, track_idx + 1, 1])))
                if temp_adj_matrix.shape[0] >= 1000:
                    adj_matrix = np.vstack((adj_matrix, temp_adj_matrix))
                    temp_adj_matrix = np.empty((0, 3))
    return adj_matrix

In [None]:
# Rebuild the URI -> index lookup from the freshly extracted `unique_tracks`,
# then turn every (playlist, track) occurrence into an interaction row.
unique_tracks_dict = { track_uri: idx for idx, track_uri in enumerate(unique_tracks) }
adj_matrix = _build_adjacency_matrix(playlists, unique_tracks_dict)

100%|██████████| 50000/50000 [01:01<00:00, 812.20it/s]


# Training data if required

In [None]:
# Long-format interaction table built from the three columns of adj_matrix:
# playlist id as "user_id", track id as "item_id", third column as "rating".
data_df = pd.DataFrame.from_dict({
    "user_id": adj_matrix[:, 0],
    "item_id": adj_matrix[:, 1],
    "rating": adj_matrix[:, 2]
})
# Load the dataset
reader = Reader(rating_scale=(0, 1))
data = Dataset.load_from_df(data_df, reader=reader)
# Use every interaction for training; no hold-out split is made in this cell.
trainset = data.build_full_trainset()

In [None]:
# Sanity check: item count first, then user (playlist) count, as registered by the trainset.
print(trainset.n_items, trainset.n_users)

80283 49627


In [None]:
# Use SVD algorithm
model = SVD()

# Train the model
model.fit(trainset)

<surprise.prediction_algorithms.matrix_factorization.SVD at 0x7ef2214a4610>

In [None]:
from surprise.dump import dump

# Persist the trained model to MF_MODEL_PATH.
# NOTE(review): Surprise documents the signature as
# dump(file_name, predictions=None, algo=None, verbose=0), so `model` is passed
# in the `predictions` position here. The loading cell unpacks
# `mf_model, _ = load(MF_MODEL_PATH)`, i.e. it reads the first element of the
# returned pair, so the two cells agree with each other — confirm before
# changing either one, and change both together.
dump(MF_MODEL_PATH, model)

# Load models - CNN and SVD

In [None]:
# Playlist-embedding network, loaded from CNN_MODEL_PATH.
cnn_model = keras.models.load_model(CNN_MODEL_PATH)
# NOTE(review): surprise.dump.load is documented to return (predictions, algo).
# The first element is taken as the model here, which matches the dump cell's
# `dump(MF_MODEL_PATH, model)` call that passes the model positionally in the
# `predictions` slot — confirm before changing either cell.
mf_model, _ = load(MF_MODEL_PATH)

# Prediction

In [None]:
def _vectorize_playlist(playlist, item_matrix, track_index, raw_to_inner_item_map, rows_after_padding=100):
    """
    Converts a playlist into a matrix where every row is a track vector.
    If the number of songs in a playlist is <100, then the rest of the rows are
    filled with zeros.
    Returns: a matrix of shape `max(POSSIBLE_OUTCOMES)` * `item_matrix.shape[1]`
    """
    playlist_matrix = np.empty((0, item_matrix.shape[1]))
    for track in playlist["tracks"]:
        # check if the track exists in the global map
        _track_idx = track_index.get(track["track_uri"], -1)

        # check if the track is mapped in the SVD matrix generated (all song vectors weren't generated)
        _track_idx = raw_to_inner_item_map.get(_track_idx, -1)
        if _track_idx > -1:
            playlist_matrix = np.vstack((playlist_matrix, item_matrix[_track_idx]))

    padding_rows = rows_after_padding - playlist_matrix.shape[0]
    playlist_matrix = np.vstack((
        playlist_matrix,
        np.zeros((padding_rows, item_matrix.shape[1]))
    ))

    return playlist_matrix

In [None]:
def _generate_recommendations(playlist_vector: np.array, track_matrix: np.ndarray, n_recomm: int):
    similarity_scores = np.dot(track_matrix, playlist_vector)
    sorted_indices = np.argsort(similarity_scores)
    similarity_scores = similarity_scores[sorted_indices]

    return sorted_indices[-n_recomm:][::-1], similarity_scores[-n_recomm:][::-1]

In [None]:
def _get_track_uris_from_recommendation_ids(recommendation_ids, idx_to_uri, inner2raw_id_items):
    track_uris = []
    for _id in recommendation_ids:
        track_uri = idx_to_uri.get(inner2raw_id_items[_id])
        if track_uri is not None:
            track_uris.append(track_uri)

    return track_uris

In [None]:
# Invert the trainset's raw-id -> inner-id mapping (a private Surprise attribute).
# NOTE: `trainset` is only defined by the training cell, so that cell must have run.
_inner2raw_id_items = {v: k for k, v in trainset._raw2inner_id_items.items()}
# Invert URI -> index so recommended indices can be turned back into track URIs.
idx_to_track_uri = { v: k for k, v in unique_tracks_dict.items() }

In [None]:
from copy import deepcopy

# Evaluation settings (previously magic numbers inside the loop).
N_EVAL_PLAYLISTS = 100   # playlists 0..N_EVAL_PLAYLISTS-1 are evaluated
MAX_SEED_LENGTH = 100    # _vectorize_playlist pads/accepts 100 rows by default
HOLDOUT_SIZE = 10        # trailing tracks hidden from the model
N_CANDIDATES = 20000     # size of the candidate list taken from the MF space

playlist_ind = []
r_prec = []

SAMPLE_PLAYLIST_INDEX = 1000  # not used by the loop below; kept so the name stays defined

for _sample_playlist_idx in range(N_EVAL_PLAYLISTS):
    curr_playlist = deepcopy(playlists[_sample_playlist_idx])

    # Always hide the last HOLDOUT_SIZE tracks and seed with at most
    # MAX_SEED_LENGTH of the remaining ones. The earlier rule (first 100 tracks
    # for any playlist with >= 100 tracks) let the seed overlap the hidden
    # tracks for playlists with 100-109 tracks.
    seed_length = min(len(curr_playlist["tracks"]) - HOLDOUT_SIZE, MAX_SEED_LENGTH)

    if seed_length < 1:
        print(f"Seed length is less than one: got {seed_length}")
        continue

    # generate playlist matrix
    curr_playlist["tracks"] = curr_playlist["tracks"][:seed_length]
    playlist_matrix = _vectorize_playlist(
        curr_playlist,
        mf_model.qi,
        unique_tracks_dict,
        trainset._raw2inner_id_items
    )
    playlist_matrix = playlist_matrix.reshape((1, playlist_matrix.shape[0], playlist_matrix.shape[1]))

    # generating the playlist embedding from the playlist matrix
    prediction = cnn_model.predict(playlist_matrix, verbose=0)
    prediction = prediction.reshape(prediction.shape[1])

    # fetching the uri's of the spotify tracks already in the current playlist, but not used when generating the embedding
    playlist_uris = [track["track_uri"] for track in playlists[_sample_playlist_idx]["tracks"]][-HOLDOUT_SIZE:]

    # actual recommendation procedure
    # (`scores` and `track_uris` of the last evaluated playlist are reused by later cells)
    recommendation_indices, scores = _generate_recommendations(prediction, mf_model.qi, N_CANDIDATES)

    track_uris = _get_track_uris_from_recommendation_ids(
        recommendation_indices,
        idx_to_track_uri,
        _inner2raw_id_items
    )

    # check for the intersection between the generated recommendations and the playlist URIs hidden when generating the embedding
    res = set(track_uris).intersection(set(playlist_uris))

    # Record every evaluated playlist, including those with no hit; dropping
    # the zero scores (as before) inflates the mean reported further down.
    # Hits are counted within the top N_CANDIDATES candidates.
    playlist_ind.append(_sample_playlist_idx)
    r_prec.append(len(res) / len(playlist_uris))


Seed length is less than zero: got -4
Seed length is less than zero: got -1
Seed length is less than zero: got -3
Seed length is less than zero: got -4
Seed length is less than zero: got 0


# Reading Spotify Features for the Unique Songs

In [None]:
# Parse the Spotify feature records; the next cell iterates over them and reads each record's 'uri'.
with open(SPOTIFY_AUGMENTED_DATA, 'r') as fp:
    spotify_song_features = json.load(fp)

print('Read Spotify song features.....')

Read Spotify song features.....


In [None]:
# Index the feature records by their track URI; if a URI occurs more than
# once, the last record wins.
uri_to_song_features_mapping = {
    features['uri']: features for features in spotify_song_features
}

In [None]:
# Track URIs (despite the variable name) of the playlist whose index was left
# in `_sample_playlist_idx` by the evaluation loop.
album_uris = [item['track_uri'] for item in playlists[_sample_playlist_idx]['tracks']]

# Preprocessing the Spotify Song Features

In [None]:
from sklearn.preprocessing import StandardScaler

def selected_songs(list_of_songs, mappings):
    """
    Look up feature records for the given URIs and standardise them.

    Args:
        list_of_songs: iterable of track URIs; every URI must be a key of
            `mappings` (a missing one raises KeyError).
        mappings: dict from track URI to its feature record (a dict).

    Returns:
        A tuple `(selected, filtered, normalized)`:
        - `selected`: the raw records, in input order;
        - `filtered`: the same records without the identifier fields
          ('type', 'id', 'uri', 'track_href', 'analysis_url');
        - `normalized`: the filtered records after column-wise scaling with
          `StandardScaler`, fitted on this list only.
        All three lists are empty when `list_of_songs` is empty.
    """
    fields_to_remove = {'type', 'id', 'uri', 'track_href', 'analysis_url'}

    selected = [mappings[uri] for uri in list_of_songs]
    filtered = [
        {key: value for key, value in record.items() if key not in fields_to_remove}
        for record in selected
    ]

    # StandardScaler cannot be fitted on zero rows; nothing to normalise anyway.
    if not filtered:
        return selected, filtered, []

    data = pd.DataFrame(filtered)
    # StandardScaler is deterministic, so no random state is needed.
    scaler = StandardScaler()
    normalized_df = pd.DataFrame(scaler.fit_transform(data), columns=data.columns)
    normalized_list = normalized_df.to_dict(orient='records')
    return selected, filtered, normalized_list

In [None]:
# Feature records for the playlist's own tracks (`album_uris`) ...
selected_from_playlist, filtered_from_playlist, normalized_list_playlist = selected_songs(album_uris, uri_to_song_features_mapping)
# ... and for the recommended tracks (`track_uris`). Each call fits its own scaler.
selected_from_recommended, filtered_from_recommended, normalized_list_recommended = selected_songs(track_uris, uri_to_song_features_mapping)

# Compute Similarity Scores

In [None]:
import numpy as np

def _compute_similarity_score(json_a, json_b):
    # Extract relevant features for similarity computation
    features_a = np.array([json_a['danceability'], json_a['energy'], json_a['key'],
                           json_a['loudness'], json_a['mode'], json_a['speechiness'],
                           json_a['acousticness'], json_a['instrumentalness'],
                           json_a['liveness'], json_a['valence'], json_a['tempo'],
                           json_a['duration_ms'], json_a['time_signature']])

    features_b = np.array([json_b['danceability'], json_b['energy'], json_b['key'],
                           json_b['loudness'], json_b['mode'], json_b['speechiness'],
                           json_b['acousticness'], json_b['instrumentalness'],
                           json_b['liveness'], json_b['valence'], json_b['tempo'],
                           json_b['duration_ms'], json_b['time_signature']])

    # Compute the similarity score using the dot product
    similarity_score = np.dot(features_a, features_b)

    return similarity_score

def compute_similarity_matrix(list_a, list_b):
    # Initialize a matrix to store similarity scores
    similarity_matrix = np.zeros((len(list_a), len(list_b)))

    # Iterate through each JSON object in list A
    for i, json_a in enumerate(list_a):
        # Iterate through each JSON object in list B
        for j, json_b in enumerate(list_b):
            # Compute the similarity score and store it in the matrix
            similarity_matrix[i, j] = _compute_similarity_score(json_a, json_b)

    return similarity_matrix

In [None]:
# Rows correspond to recommended tracks, columns to the playlist's own tracks.
similarity_matrix = compute_similarity_matrix(normalized_list_recommended, normalized_list_playlist)

# Fetch Top-K Recommendations

In [None]:
def fetch_top_k_recommendations(similarity_scores, weights, recommended_songs, top_k):
    """
    Re-rank recommended songs and return the URIs of the best `top_k`.

    The score of song `i` is `sum(similarity_scores[i]) * weights[i]`. Songs
    are ordered by descending score; songs with equal scores keep their input
    order. (The previous implementation keyed a dict by score, so songs with
    equal scores overwrote each other and were lost.)

    Args:
        similarity_scores: one iterable of similarity values per song.
        weights: one multiplicative weight per song.
        recommended_songs: song dicts with a 'uri' key, aligned by position
            with `similarity_scores` and `weights`.
        top_k: maximum number of URIs to return; values <= 0 yield an empty list.

    Returns:
        A list of at most `top_k` track URIs, best first.
    """
    scores = [sum(row) * weight for row, weight in zip(similarity_scores, weights)]

    # Only positions that have both a score and a song can be ranked.
    n_ranked = min(len(scores), len(recommended_songs))
    # sorted() is stable, also with reverse=True, so ties keep input order.
    ranked = sorted(range(n_ranked), key=lambda idx: scores[idx], reverse=True)

    return [recommended_songs[idx]['uri'] for idx in ranked[:max(top_k, 0)]]

In [None]:
# Re-rank the candidates, weighting each row of the similarity matrix by the
# matching entry of `scores` (left over from the last evaluated playlist).
# NOTE(review): `scores` and `selected_from_recommended` are matched by position;
# `_get_track_uris_from_recommendation_ids` can drop entries, which would shift
# that alignment — confirm the two always have the same length.
top_k = fetch_top_k_recommendations(similarity_matrix, scores, selected_from_recommended, 20)

**Ranking Similarity Score (Kendall's Tau)**

In [None]:
def find_positions(a1, a2):
    """
    For every element of `a2`, return its 0-based position in `a1`.

    Elements that do not occur in `a1` map to -1. If an element occurs more
    than once in `a1`, its last position is reported.
    """
    index_of = {item: position for position, item in enumerate(a1)}
    return [index_of.get(item, -1) for item in a2]

def map_to_01(x):
    """Shift and halve `x`, so that -1 maps to 0 and 1 maps to 1."""
    return (x + 1) / 2

def kendall_tau(actual, predicted):
    """
    Kendall's tau (tau-a) between two equally long rankings.

    A pair of positions is concordant when both sequences order it the same
    way and discordant when they order it oppositely; pairs tied in either
    sequence count as neither.

    Args:
        actual: sequence of comparable values.
        predicted: sequence of comparable values, indexed like `actual`.

    Returns:
        `(concordant - discordant) / (n * (n - 1) / 2)` with `n = len(actual)`,
        or NaN when `n < 2`, where no pair exists and the statistic is
        undefined (previously this raised ZeroDivisionError).
    """
    n = len(actual)
    if n < 2:
        return float("nan")

    concordant_pairs = 0
    discordant_pairs = 0

    for i in range(n - 1):
        for j in range(i + 1, n):
            # -1 / 0 / +1 ordering of the pair in each sequence
            actual_order = (actual[i] > actual[j]) - (actual[i] < actual[j])
            predicted_order = (predicted[i] > predicted[j]) - (predicted[i] < predicted[j])
            agreement = actual_order * predicted_order
            if agreement > 0:
                concordant_pairs += 1
            elif agreement < 0:
                discordant_pairs += 1

    # Calculate Kendall's Tau
    tau = (concordant_pairs - discordant_pairs) / (0.5 * n * (n - 1))

    return tau

In [None]:
# Tracks present both in the re-ranked top-k list and in the held-out tracks.
commom_item = set(top_k).intersection(set(playlist_uris))
if len(commom_item) > 0:
    # Positions of the common tracks in each list; the set is iterated in the
    # same order for both calls, so the two results are aligned.
    predicted_order = find_positions(top_k, commom_item)
    actual_order = find_positions(playlist_uris, commom_item)

    # Replace raw positions by their rank among the common tracks, then append
    # a trailing -1 so each list holds at least two entries.
    sorted_arr_actual = sorted(actual_order)
    new_actual_order = [sorted_arr_actual.index(position) for position in actual_order]
    new_actual_order.append(-1)

    sorted_arr_predicted = sorted(predicted_order)
    new_predicted_order = [sorted_arr_predicted.index(position) for position in predicted_order]
    new_predicted_order.append(-1)

    similarity_score = map_to_01(kendall_tau(new_actual_order, new_predicted_order))

    print(f"Ranking Similarity Score (Kendall's Tau): {similarity_score}")


# R-Precision

In [None]:
# One row per playlist recorded by the evaluation loop: its index and its R-precision.
df_r_precision = pd.DataFrame({'Playlist_No': playlist_ind,'R-Precison':r_prec})

In [None]:
# Display the per-playlist table.
df_r_precision

Unnamed: 0,Playlist_No,R-Precison
0,0,0.2
1,1,0.1
2,3,0.1
3,4,0.1
4,6,0.1
...,...,...
65,90,0.1
66,94,0.2
67,95,0.1
68,96,0.1


In [None]:
# Column-wise means; only the 'R-Precison' value is meaningful, the mean of
# 'Playlist_No' is just the average playlist index.
df_r_precision.mean()

Playlist_No    48.271429
R-Precison      0.195714
dtype: float64

In [None]:
# Write the table to r_prec.csv in the current working directory, without the index column.
df_r_precision.to_csv('r_prec.csv', index=False)

In [None]:
# Locate the row with the highest R-precision and show it. The row is looked
# up through the computed index label instead of a hard-coded position, so the
# cell stays correct when the results change.
max_index_column1 = df_r_precision['R-Precison'].idxmax()
print(max_index_column1)
print(df_r_precision.loc[max_index_column1])

30
Playlist_No    43.0
R-Precison      0.5
Name: 30, dtype: float64


# A Look at the Recommendations

In [None]:
# Inspect the recommendations for one hand-picked playlist.
playlistNo=31
curr_playlist = deepcopy(playlists[playlistNo])

# Show the whole playlist for reference.
for song in curr_playlist["tracks"]:
    print(song['track_name']+" by "+song['artist_name'])

# Seed with at most 100 tracks: _vectorize_playlist builds a 100-row matrix by
# default, and a longer seed would make its padding size negative.
seed_length = min(len(curr_playlist["tracks"]), 100)

# generate playlist matrix
curr_playlist["tracks"] = curr_playlist["tracks"][:seed_length]
playlist_matrix = _vectorize_playlist(
    curr_playlist,
    mf_model.qi,
    unique_tracks_dict,
    trainset._raw2inner_id_items
)
playlist_matrix = playlist_matrix.reshape((1, playlist_matrix.shape[0], playlist_matrix.shape[1]))
# generating the playlist embedding from the playlist matrix
prediction = cnn_model.predict(playlist_matrix, verbose=0)
prediction = prediction.reshape(prediction.shape[1])
# actual recommendation procedure
recommendation_indices, scores = _generate_recommendations(prediction, mf_model.qi, 20000)
track_uris = _get_track_uris_from_recommendation_ids(
    recommendation_indices,
    idx_to_track_uri,
    _inner2raw_id_items
)
# URIs of all tracks in the playlist (not only the seed), used for the feature comparison.
album_uris = []
for item in playlists[playlistNo]['tracks']:
    album_uris.append(item['track_uri'])
selected_from_playlist, filtered_from_playlist, normalized_list_playlist = selected_songs(album_uris, uri_to_song_features_mapping)
selected_from_recommended, filtered_from_recommended, normalized_list_recommended = selected_songs(track_uris, uri_to_song_features_mapping)
similarity_matrix = compute_similarity_matrix(normalized_list_recommended, normalized_list_playlist)
# Keep the 20 best candidates after re-ranking by audio-feature similarity.
top_k = fetch_top_k_recommendations(similarity_matrix, scores, selected_from_recommended, 20)

All Me by Drake
Gold Digger by Kanye West
Butterfly Effect by Travis Scott
Down by Marian Hill
It's Time by Imagine Dragons
Thunder by Imagine Dragons
TiK ToK by Kesha
Die Young by Kesha
Blow by Kesha
Burnin' Up by Jonas Brothers
We R Who We R by Kesha
Cruise by Florida Georgia Line
Year 3000 by Jonas Brothers
Your Love Is My Drug by Kesha
Ridin' Solo by Jason Derulo
No Money by Galantis
Runaway (U & I) by Galantis
Peanut Butter Jelly by Galantis
Rich Boy by Galantis
Play That Song by Train
Lush Life by Zara Larsson
Lip Gloss by Lil Mama
Tell Me Something I Don't Know by Selena Gomez & The Scene
Promiscuous by Nelly Furtado
Congratulations by Post Malone
California Gurls - feat. Snoop Dogg by Katy Perry
A Thousand Miles by Vanessa Carlton
Eenie Meenie by Justin Bieber
Don't Stop The Music by Rihanna
Baby by Justin Bieber
7 Things by Miley Cyrus
Fire Burning by Sean Kingston
Fireflies by Owl City
Take It Off by Kesha
Worth It by Fifth Harmony
Here It Is (feat. Chris Brown) by Flo Rida
S

In [None]:
! pip install spotipy
import spotipy
my_spotify_config = {
    'username' : '31z5y2uydez3i2l2se5jy2uvvbvm',
    'client_id' : 'ab9520defec1419d8b7adab309912116',
    'client_secret' : '1c823c86239342b6b40d78ce6fa4035f',
    'redirect_uri' : 'http://localhost:8008/'
}
spotify_account_handle = spotipy.Spotify(
    auth_manager = spotipy.SpotifyOAuth(
    client_id = my_spotify_config['client_id'],
    client_secret = my_spotify_config['client_secret'],
    redirect_uri = my_spotify_config['redirect_uri'],
    scope = 'user-library-read',
    open_browser = False))



In [None]:
# Resolve each recommended URI through the Spotify client (one request per
# track) and print it as "<track name> by <artist>".
for track_id in top_k:
    track_info = spotify_account_handle.track(track_id)
    track_name = track_info['name']
    artist_name = track_info['artists'][0]['name']  # Assuming the first artist in the list
    print(f"{track_name} by {artist_name}")