In [1]:
#---import required libraries---

import os
import numpy as np
import pandas as pd
import spotipy
import pyodbc

from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from scipy.spatial.distance import cdist
from collections import defaultdict
from spotipy.oauth2 import SpotifyClientCredentials
from azure.identity import AzureCliCredential

import warnings
warnings.filterwarnings("ignore")

#---import data from csv---

# Use a forward slash like every other path in this notebook: the backslash
# form only resolved on Windows and relied on an invalid string escape.
data = pd.read_csv('data/data_features.csv')

#---import data from db---

driver= 'ODBC Driver 18 for SQL Server'

# Connection settings live in one-value text files. Editors usually append a
# trailing newline; strip it so it cannot end up inside the connection string.
with open('data/server.txt', 'r') as tf:
    server = tf.read().strip()
with open('data/database.txt', 'r') as tf:
    database = tf.read().strip()
with open('data/username.txt', 'r') as tf:
    username = tf.read().strip()
with open('data/password.txt', 'r') as tf:
    # Only drop line endings here: other whitespace could be part of the password.
    password = tf.read().rstrip('\r\n')

#with pyodbc.connect('Driver='+driver+';Server=tcp:'+server+',1433;Database='+database+';UID='+username+';PWD='+password+';Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;') as conn:
#    with conn.cursor() as cursor:
#        cursor.execute("SELECT TOP (1) [name] FROM [dbo].[data_features]")
#        row = cursor.fetchone()
#        while row:
#            print (str(row[0]) + " " + str(row[0]))
#            row = cursor.fetchone()

#---create spotipy spotify connection---

# Credentials are read from one-value text files; surrounding whitespace
# (typically a trailing newline) is removed so it is not sent to Spotify.
with open('data/spotipyclientid.txt', 'r') as tf:
    SPOTIPY_CLIENT_ID = tf.read().strip()
with open('data/spotipyclientsecret.txt', 'r') as tf:
    SPOTIPY_CLIENT_SECRET = tf.read().strip()

# Client-credentials flow: app-level access, no user login involved.
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET))

#---create pipeline for kmeans---

# Standardise the features, then group the songs into 10 clusters.
# random_state pins the centroid initialisation so the cluster labels are
# reproducible when the notebook is re-run from a fresh kernel.
song_cluster_pipeline = Pipeline([('scaler', StandardScaler()),
                                  ('kmeans', KMeans(n_clusters=10,
                                                    random_state=42,
                                                    verbose=False))
                                 ], verbose=False)

#---fit columns of data to pipeline---

# Feature columns shared by the fit below and by every later lookup/transform.
number_cols = ['track_number', 'disc_number', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 
               'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms', 'time_signature', 'year']

# Fit on exactly number_cols (not on "whatever is numeric in the CSV") so the
# scaler is guaranteed to have seen the same columns, in the same order, that
# are handed to scaler.transform when recommendations are computed.
X = data[number_cols]
song_cluster_pipeline.fit(X)
song_cluster_labels = song_cluster_pipeline.predict(X)
data['cluster_label'] = song_cluster_labels

#---definitions---

def find_song(name, artists, year):
    """Look up one track on Spotify and return its metadata plus audio features.

    Parameters
    ----------
    name : str
        Track title to search for.
    artists : str
        Expected name of the track's first credited artist.
    year : int or str
        Release year interpolated into the search query.

    Returns
    -------
    pandas.DataFrame or None
        A one-row frame with the track's name, year, explicit flag, duration,
        popularity, track/disc numbers and every key returned by
        ``sp.audio_features``. ``None`` when the search has no hit, when the
        top hit's title or first artist differs from the request
        (case-insensitive comparison), or when Spotify has no audio features
        for the track.
    """
    # NOTE(review): Spotify documents its field filters as `track:`, `artist:`
    # and `year:` with no space after the colon. The query text is left
    # untouched here; confirm it is not simply being treated as free text.
    results = sp.search(q='track: {} artists: {} year: {}'.format(name,artists,year), limit=1, type='track')
    items = results['tracks']['items']
    if not items:
        return None

    track = items[0]

    out_name = track['name']
    out_artist = track['artists'][0]['name']
    out_year = track['album']['release_date'][0:4]

    # Only accept the hit when both title and first artist match exactly
    # (ignoring case); otherwise the search returned some other song.
    if (out_name.casefold() != name.casefold()) or (out_artist.casefold() != artists.casefold()):
        return None

    # audio_features returns a list that may hold None for tracks without
    # analysis data; treat that the same as "song not found".
    features = sp.audio_features(track['id'])
    audio_features = features[0] if features else None
    if audio_features is None:
        return None

    song_data = {
        'name': [out_name],
        'year': [int(out_year)],
        'explicit': [int(track['explicit'])],
        'duration_ms': [track['duration_ms']],
        'popularity': [track['popularity']],
        'track_number': [track['track_number']],
        'disc_number': [track['disc_number']],
    }
    # Scalar feature values are broadcast against the one-element lists above;
    # keys present in both (e.g. duration_ms) take the audio-features value.
    song_data.update(audio_features)

    return pd.DataFrame(song_data)

def get_song_data(song, spotify_data):
    """Fetch a song's feature record from the local dataset, else from Spotify.

    Parameters
    ----------
    song : dict
        Must provide the keys ``'name'``, ``'artists'`` and ``'year'``.
    spotify_data : pandas.DataFrame
        Dataset with ``'name'``, ``'artists'`` and ``'year'`` columns.

    Returns
    -------
    pandas.Series, pandas.DataFrame or None
        The first dataset row matching all three fields exactly (a Series);
        otherwise whatever ``find_song`` returns for the same song (a one-row
        DataFrame, or ``None`` when Spotify has no match either).
    """
    # NOTE(review): the notebook output shows 'artists' values such as
    # "['YSN Fab']" (a stringified list), so exact equality against a plain
    # artist name may never match the dataset - confirm against the CSV.
    matches = spotify_data[
        (spotify_data['name'] == song['name'])
        & (spotify_data['artists'] == song['artists'])
        & (spotify_data['year'] == song['year'])
    ]
    if not matches.empty:
        return matches.iloc[0]

    # Query Spotify once and pass the result (possibly None) straight through;
    # calling find_song twice doubled the network round-trips.
    return find_song(song['name'], song['artists'], song['year'])
        

def get_mean_vector(song_list, spotify_data):
    """Average the ``number_cols`` feature vectors of the given songs.

    Parameters
    ----------
    song_list : list of dict
        Songs to look up; each is passed to ``get_song_data``.
    spotify_data : pandas.DataFrame
        Dataset forwarded to ``get_song_data``.

    Returns
    -------
    numpy.ndarray or None
        1-D array with one mean value per entry of ``number_cols``. ``None``
        when ``song_list`` is empty or any song cannot be found (a warning is
        printed for the missing song).
    """
    song_vectors = []

    for song in song_list:
        song_data = get_song_data(song, spotify_data)
        if song_data is None:
            print('Warning: "{}" does not exist in Spotify or in database'.format(song['name']))
            return None
        # A dataset hit is a Series (values shape (n,)) while a Spotify hit is
        # a one-row DataFrame (shape (1, n)). Flatten both to 1-D floats so
        # songs from either source can be stacked together.
        song_vector = np.asarray(song_data[number_cols].values, dtype=float).ravel()
        song_vectors.append(song_vector)

    if not song_vectors:
        # Nothing to average; avoid returning NaN from a mean over no rows.
        return None

    return np.mean(np.vstack(song_vectors), axis=0)


def flatten_dict_list(dict_list):
    """Collapse a list of dicts into one dict mapping each key to a list of values.

    Parameters
    ----------
    dict_list : list of dict
        Dictionaries to merge. May be empty, and the dictionaries do not have
        to share the same keys.

    Returns
    -------
    collections.defaultdict
        For every key seen in any input dict, the list of its values in input
        order. Looking up a key that was never seen raises ``KeyError``.
    """
    flattened_dict = defaultdict(list)

    for dictionary in dict_list:
        for key, value in dictionary.items():
            flattened_dict[key].append(value)

    # Turn the auto-creating behaviour off again so callers that read a
    # missing key get a KeyError instead of silently growing the mapping.
    flattened_dict.default_factory = None
    return flattened_dict


def recommend_songs(song_list, spotify_data, n_songs=10):
    """Recommend the dataset songs closest to the average of the given songs.

    Parameters
    ----------
    song_list : list of dict
        Seed songs; each dict is looked up via ``get_mean_vector`` and its
        ``'name'`` is excluded from the recommendations.
    spotify_data : pandas.DataFrame
        Dataset containing ``number_cols`` plus ``'name'``, ``'year'`` and
        ``'artists'``.
    n_songs : int, default 10
        Maximum number of recommendations to return.

    Returns
    -------
    list of dict or str
        Up to ``n_songs`` records with keys ``'name'``, ``'year'`` and
        ``'artists'``, ordered by increasing cosine distance to the scaled
        mean vector. A fixed message string is returned instead when a seed
        song cannot be located.
    """
    metadata_cols = ['name', 'year', 'artists']
    song_dict = flatten_dict_list(song_list)

    song_center = get_mean_vector(song_list, spotify_data)
    if song_center is None:
        return "Song Could Not Be Located in Spotify or Tunit Database"

    # Re-use the scaler fitted inside the clustering pipeline so the seed
    # vector and the dataset are compared in the same standardised space.
    scaler = song_cluster_pipeline.steps[0][1]
    scaled_data = scaler.transform(spotify_data[number_cols])
    scaled_song_center = scaler.transform(np.asarray(song_center).reshape(1, -1))
    distances = cdist(scaled_song_center, scaled_data, 'cosine')[0]

    # Rank every row first and drop the seed songs *before* truncating.
    # Truncating first returned fewer than n_songs whenever a seed song was
    # among its own nearest neighbours.
    ranked = np.argsort(distances)
    is_seed = spotify_data['name'].isin(song_dict['name']).to_numpy()
    keep = ranked[~is_seed[ranked]][:n_songs]

    return spotify_data.iloc[keep][metadata_cols].to_dict(orient='records')

# Demo: recommendations seeded by three tracks. The call stays wrapped in a
# list so the cell displays a one-element list holding the result.
[
    recommend_songs(
        [
            dict(name='How to Save A Life', artists='The Fray', year=2005),
            dict(name='If I Die Young', artists='The Band Perry', year=2010),
            dict(name='Somebody That I Used To Know', artists='Gotye', year=2011),
        ],
        data,
    )
]

In [2]:
# Single-seed request: recommendations derived from one track only.
[recommend_songs([{'name': 'It Must Be A Pain', 'artists': 'sewerperson', 'year':2022}],  data)]

[[{'name': 'Lovers or Friends', 'year': 2020, 'artists': "['YSN Fab']"},
  {'name': 'Butterflies', 'year': 2020, 'artists': "['Macca Wiles']"},
  {'name': 'Glitter 17', 'year': 2018, 'artists': "['MAX BLACK', 'Seul']"},
  {'name': 'Better Than This', 'year': 2020, 'artists': "['Paloma Faith']"},
  {'name': 'In the Spirit World Now - Synthetic Remix',
   'year': 2020,
   'artists': "['Ceremony']"},
  {'name': 'Van Horn - GOLDHOUSE Remix',
   'year': 2020,
   'artists': "['Saint Motel', 'GOLDHOUSE']"},
  {'name': "Ain't That a Shame",
   'year': 2016,
   'artists': "['The Fogcutters']"},
  {'name': 'The Girl Who Tried', 'year': 2018, 'artists': "['Astari Nite']"},
  {'name': 'Ok 4 Now', 'year': 2019, 'artists': "['Lil Skies']"},
  {'name': "Innocence d'aimer", 'year': 2018, 'artists': "['Jeanne Mas']"}]]

In [3]:
# Not-found case: the output below is the fallback message string rather than a list of tracks.
[recommend_songs([{'name': 'looking for a song, any song', 'artists': 'The Police', 'year':2000}],  data)]



['Song Could Not Be Located in Spotify or Tunit Database']