In [11]:
# danceability,energy,key,loudness,mode,speechiness,acousticness,instrumentalness,liveness,valence,tempo

!pip install spotipy
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import os
from os import listdir
from os.path import isfile, join
import json
import re
import csv
import random

import tensorflow as tf
from tensorflow import keras

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [12]:
# load spotify api credentials from .env file

# Upload the .env file to the colab, then you'll be able to use the spotify API (sp)

!pip install python-dotenv
from dotenv import load_dotenv
load_dotenv()

CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [13]:
#Authentication - without user
client_credentials_manager = SpotifyClientCredentials(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager = client_credentials_manager)
sp

<spotipy.client.Spotify at 0x7fa6cd558e80>

In [14]:
datasetPath = "/content/subset" 

# read json files of playlists.
# returns array of playlists (playlists -> array of track ids)
def get_playlists_from_json(fileNames):
  playlists = [];

  for fileName in fileNames:
    f = open(datasetPath + "/" + fileName)
    resp = json.load(f)
    f.close()

    for playlist in resp["playlists"]:
      track_ids = []
      for track in playlist["tracks"]:
        track_ids.append(track["track_uri"])
      playlists.append(track_ids)
  
  return playlists

In [15]:
playlists = get_playlists_from_json(["mpd.slice.0-999.json"])
playlists2 = get_playlists_from_json(["mpd.slice.1000-1999.json"])
playlists3 = get_playlists_from_json(["mpd.slice.2000-2999.json"])
playlists4 = get_playlists_from_json(["mpd.slice.3000-3999.json"])
playlists5 = get_playlists_from_json(["mpd.slice.4000-4999.json"])
playlists6 = get_playlists_from_json(["mpd.slice.5000-5999.json"])
playlists7 = get_playlists_from_json(["mpd.slice.6000-6999.json"])
playlists8 = get_playlists_from_json(["mpd.slice.7000-7999.json"])
playlists9 = get_playlists_from_json(["mpd.slice.8000-8999.json"])
playlists10 = get_playlists_from_json(["mpd.slice.9000-9999.json"])
playlists = playlists + playlists2 + playlists3 + playlists4 + playlists5 + playlists6 + playlists7 + playlists8 + playlists9 + playlists10
len(playlists)

10000

In [16]:
# randomly generate a triplet tuple (anchor, positive, negative)
def random_triplet_generator():
  random_playlist = random.sample(range(10000), 2)
  playlist_len1 = len(playlists[random_playlist[0]])
  playlist_len2 = len(playlists[random_playlist[1]])
  random_indices = random.sample(range(playlist_len1), 2)
  random_index_neg = random.randint(0, len(playlists[random_playlist[1]])-1)
  random_song_anchor = random_indices[0]
  random_song_positive = random_indices[1]

  anchor = playlists[random_playlist[0]][random_song_anchor]
  
  positive = playlists[random_playlist[0]][random_song_positive]

  while True:
    if playlists[random_playlist[1]][random_index_neg] not in playlists[random_playlist[0]]:
      negative = playlists[random_playlist[1]][random_index_neg]
      break
    else:
      random_index_neg = random.randint(0, len(playlists[random_playlist[1]])-1)
  
  return anchor, positive, negative

In [17]:
# number of pairs to randomly generate.
NUM_PAIRS = 5000

# Get feature vectors for the track id triplets.
def data_to_feature_vector_triplets():
  pairings = []
  for i in range(NUM_PAIRS):
    pairings.append(random_triplet_generator())

  pairings_as_feature_vectors = []
  for each_tuple in pairings:
    resp = sp.audio_features(each_tuple)
    resp = [list(x.values())[:11] for x in resp]
    pairings_as_feature_vectors.append(resp)
  return pairings, pairings_as_feature_vectors         

In [18]:
pairings, feature_vectors = data_to_feature_vector_triplets()

In [19]:
# save ids & triplets to txt file.

f = open("triplets.txt", "w")
for ids, features in zip(pairings, feature_vectors):
  ids_str = " ".join([id.split(":")[-1] for id in ids])
  features_str = " ".join([str(tuple(f)).replace(" ", "") for f in features])
  f.write(ids_str + " " + features_str + "\n")
f.close()

In [20]:
# load triplets from triplets.txt file.

import ast

id_triplet_tuples = []
features_triplet_tuples = []

with open("triplets.txt") as file:
    for line in file:
        arr = line.strip().split(" ")
        if len(arr) == 6:
          id_triplet_tuples.append((arr[0], arr[1], arr[2]))
          features_triplet_tuples.append((ast.literal_eval(arr[3]), ast.literal_eval(arr[4]), ast.literal_eval(arr[5])))

print(id_triplet_tuples[:5])
print(features_triplet_tuples[:5])

[('55DuTmvSGwNQR4cBgqYuYL', '6V2D8Lls36APk0THDjBDfE', '6KvhbFSMQOCA5PSqPnTtBA'), ('2p07VcUwRZ5sru3mJ0JogS', '6s9ICeczYOfbHHIaSMq9jd', '31bf9SEOppLU6lQ85d8om6'), ('6s9m5J92By7jii22Q2XtY2', '7kB1UXxStzSa78NdiexiIS', '7rWoskZwxQiLsFfRXxFF50'), ('373zqV0VLz9mnrSaY9kaiX', '13qjycCLStZb9sJje6v0MC', '2Fhm0O9VeJdSZgxNiBZTJJ'), ('1mKXFLRA179hdOWQBwUk9e', '0c1gHntWjKD7QShC8s99sq', '0fuQ65fX8W94q6QwTFyqgI')]
[((0.584, 0.531, 0, -6.907, 1, 0.375, 0.0115, 0, 0.126, 0.346, 94.907), (0.794, 0.522, 8, -7.829, 1, 0.159, 0.0328, 0, 0.156, 0.567, 86.318), (0.801, 0.839, 1, -3.267, 0, 0.24, 0.0206, 7.08e-05, 0.0946, 0.792, 170.013)), ((0.677, 0.776, 5, -5.933, 1, 0.0386, 0.42, 0.000689, 0.0954, 0.642, 121.834), (0.497, 0.927, 0, -4.52, 1, 0.0371, 0.00286, 1.26e-06, 0.413, 0.607, 96.991), (0.818, 0.653, 1, -8.396, 1, 0.204, 0.0335, 5.46e-06, 0.22, 0.533, 99.931)), ((0.68, 0.888, 9, -5.308, 1, 0.055, 0.0527, 2.82e-06, 0.0575, 0.484, 90.076), (0.453, 0.587, 4, -7.584, 1, 0.0406, 0.475, 0, 0.109, 0.625, 145.3

In [21]:
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer

class TripletLossLayer(Layer):
  def __init__(self, alpha, **kwargs):
    self.alpha=alpha
    super(TripletLossLayer, self).__init__(**kwargs)

  def get_config(self):
    config = super().get_config().copy()
    config.update({
        'aplha':self.alpha
    })
    return config
  
  def triplet_loss(self, inputs):
    a, p, n, = inputs
    p_dist = K.sum(K.square(a-p), axis=-1)
    n_dist = K.sum(K.square(a-n), axis=1)
    return K.sum(K.maximum(p_dist-n_dist+self.alpha,0),axis=0)

  def call(self, inputs):
    loss = self.triplet_loss(inputs)
    self.add_loss(loss)
    return loss

In [22]:
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.layers import Input

# Input for anchor, positive, and negative images
in_a = Input(shape=(11, 1), name="song_a")
in_p = Input(shape=(11, 1), name="song_p")
in_n = Input(shape=(11, 1), name="song_n")

# create the base model
base = layers.Dense(11)(in_a)
flatten = layers.Flatten()(base)
dense = layers.Dense(11, activation="relu")(flatten) 
dense = layers.BatchNormalization()(dense)
output = layers.Dense(11)(dense)

embedding = Model(in_a, output, name="Embedding")

# Custom vector representation of each song
emb_a, emb_p, emb_n = embedding(in_a), embedding(in_p), embedding(in_n)

# Layer that computes the triplet loss from anchor, positive and negative embedding vectors
triplet_loss_layer = TripletLossLayer(alpha=0.2, name='triplet_loss_layer')([emb_a, emb_p, emb_n])

# Model that can be trained with anchor, positive, and negative feature vectors
model = Model([in_a, in_p, in_n], triplet_loss_layer, name="model_2") 
model.compile(loss=None, optimizer='adam')
model.summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 song_a (InputLayer)            [(None, 11, 1)]      0           []                               
                                                                                                  
 song_p (InputLayer)            [(None, 11, 1)]      0           []                               
                                                                                                  
 song_n (InputLayer)            [(None, 11, 1)]      0           []                               
                                                                                                  
 Embedding (Functional)         (None, 11)           1540        ['song_a[0][0]',                 
                                                                  'song_p[0][0]',           

In [23]:
from tensorflow import convert_to_tensor

# format the triplets to be compatible with the .fit()
input_a = convert_to_tensor([i[0] for i in features_triplet_tuples])
input_p = convert_to_tensor([i[1] for i in features_triplet_tuples])
input_n = convert_to_tensor([i[2] for i in features_triplet_tuples])

In [25]:
# Don't run fit if you already have the weights.hdf5 file.

from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

# Training the Model
EPOCHS = 100 # Max number of epochs

model.fit([input_a, input_p, input_n], 
    epochs = EPOCHS, 
    callbacks=[ModelCheckpoint(filepath='weights.hdf5',
                                monitor = 'loss',
                                save_best_only = True,
                                mode = 'auto',
                                save_weights_only = True,
                                verbose = 1),
                EarlyStopping(monitor='loss',
                              mode='auto',
                              patience=10,
                              verbose=True)])

Epoch 1/100
Epoch 1: loss improved from inf to 11.29639, saving model to weights.hdf5
Epoch 2/100
Epoch 2: loss improved from 11.29639 to 6.47467, saving model to weights.hdf5
Epoch 3/100
Epoch 3: loss improved from 6.47467 to 5.73445, saving model to weights.hdf5
Epoch 4/100
Epoch 4: loss improved from 5.73445 to 5.39839, saving model to weights.hdf5
Epoch 5/100
Epoch 5: loss did not improve from 5.39839
Epoch 6/100
Epoch 6: loss did not improve from 5.39839
Epoch 7/100
Epoch 7: loss improved from 5.39839 to 5.20849, saving model to weights.hdf5
Epoch 8/100
Epoch 8: loss improved from 5.20849 to 5.15758, saving model to weights.hdf5
Epoch 9/100
Epoch 9: loss improved from 5.15758 to 5.09263, saving model to weights.hdf5
Epoch 10/100
Epoch 10: loss did not improve from 5.09263
Epoch 11/100
Epoch 11: loss improved from 5.09263 to 5.06387, saving model to weights.hdf5
Epoch 12/100
Epoch 12: loss did not improve from 5.06387
Epoch 13/100
Epoch 13: loss improved from 5.06387 to 5.04502, sa

<keras.callbacks.History at 0x7fa6c9c00d90>

In [26]:
# load the best weights file created by .fit()
model = Model([in_a, in_p, in_n], triplet_loss_layer) 
model.load_weights("./weights.hdf5")
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 song_a (InputLayer)            [(None, 11, 1)]      0           []                               
                                                                                                  
 song_p (InputLayer)            [(None, 11, 1)]      0           []                               
                                                                                                  
 song_n (InputLayer)            [(None, 11, 1)]      0           []                               
                                                                                                  
 Embedding (Functional)         (None, 11)           1540        ['song_a[0][0]',                 
                                                                  'song_p[0][0]',             

In [27]:
# bypass the triplet input layers (allows us to just input a single song)
base_model = model.get_layer("Embedding")
base_model.summary()

Model: "Embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 song_a (InputLayer)         [(None, 11, 1)]           0         
                                                                 
 dense (Dense)               (None, 11, 11)            22        
                                                                 
 flatten (Flatten)           (None, 121)               0         
                                                                 
 dense_1 (Dense)             (None, 11)                1342      
                                                                 
 batch_normalization (BatchN  (None, 11)               44        
 ormalization)                                                   
                                                                 
 dense_2 (Dense)             (None, 11)                132       
                                                         

In [28]:
import csv

existing = set()
spotify_features_dataset = []

# read tracks & feature vectors from output.csv
with open('/content/output.csv', 'r') as file:
    next(file) # don't read in header
    reader = csv.reader(file)
    for row in reader:
      if row[12] not in existing:
        existing.add(row[12])
        spotify_features_dataset.append((row[12], tuple([float(i) for i in row[:11]])))

print(spotify_features_dataset[:5])

[('5IbCV9Icebx8rR6wAp5hhP', (0.451, 0.258, 2.0, -15.947, 1.0, 0.0681, 0.763, 0.0, 0.136, 0.646, 78.62)), ('6rKVAvjHcxAzZ1BHtwh5yC', (0.588, 0.189, 0.0, -17.737, 1.0, 0.0451, 0.756, 0.0, 0.169, 0.904, 140.467)), ('6Jlkb1Wh08RYHstWScsTvg', (0.281, 0.0652, 6.0, -22.218, 1.0, 0.0388, 0.959, 9.75e-06, 0.102, 0.316, 77.442)), ('0XhC8bfStML9ygBmfOt1JJ', (0.746, 0.3, 8.0, -16.037, 1.0, 0.164, 0.682, 0.0, 0.39, 0.842, 130.248)), ('0ABxAcsRWlqckkyONsfP67', (0.493, 0.235, 7.0, -14.847, 1.0, 0.14, 0.732, 0.0, 0.126, 0.455, 81.576))]


In [29]:
# get embeddings (transformed feature vectors) for every song in output.csv
lookup_table = {}

track_ids = [tup[0] for tup in spotify_features_dataset]
track_vectors = [tup[1] for tup in spotify_features_dataset]

count = 0
while count < len(track_vectors):
  result = base_model.predict(track_vectors[count:count+1800])
  for (id, vec) in zip(track_ids[count: count+1800], result):
    lookup_table[id] = tuple(vec)
  count += 1800



In [30]:
# save transformed track vectors to file

f = open("transformed.txt", "w")
for key, value in lookup_table.items():
  f.write(key.replace(" ", "") + " " + str(value).replace(" ", "") + "\n")
f.close()

In [None]:
# load transformed track vectors from "transformed.txt" file
# lookup_table = {}

# with open('/content/transformed.txt', 'r') as file:
#   for line in file:
#     arr = line.split(" ")
#     if len(arr) == 2:
#       lookup_table[arr[0]] = ast.literal_eval(arr[1])