In [None]:
# importing libraries
import pandas as pd
import numpy as np
import pyswarms as ps

from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

from surprise import Dataset, Reader
from surprise import SVD, NMF, KNNWithZScore, CoClustering
from surprise.model_selection import ShuffleSplit
from surprise.model_selection import train_test_split
from sklearn.metrics.pairwise import cosine_similarity
from surprise import accuracy

from collections import defaultdict
from statistics import mean
import random

In [None]:
# EchoNest "train triplets": one row per (user, song, play count).
# The file is tab-separated and is read without a header row, so the three
# column names are attached right after loading.
train_triplets = 'K:/Datasets/EchoNest/Train Triplets/train_triplets.txt'
train_ratings = pd.read_csv(train_triplets, sep="\t", header=None).set_axis(
    ['user_id', 'song_id', 'listen_count'], axis=1
)

# Spotify metadata for the songs (matched to the triplets via `en_song_id`)
song_metadata = pd.read_csv('K:/Notebook Files/spotify_metadata.csv')

# Keep only the listening rows whose song also appears in the metadata table
ratings = train_ratings.loc[train_ratings['song_id'].isin(song_metadata['en_song_id'])]

In [None]:
# Number of song rows per user, i.e. how many songs each user has listened to
songs_by_user = ratings.groupby('user_id')['song_id']
user_counts = songs_by_user.count()

# Show the summary statistics in fixed-point notation rather than scientific notation
user_count_summary = user_counts.describe()
user_count_summary.apply(lambda stat: format(stat, 'f'))

In [None]:
# Number of user rows per song, i.e. how many users listened to each song;
# the summary below includes the average (mean) audience size per song
users_by_song = ratings.groupby('song_id')['user_id']
song_user = users_by_song.count()
song_user.describe()

In [None]:
# Keep users that have listened to at least 16 songs (count > 15).
# The original note tags this cutoff as the 25% mark of `user_counts`.
active_user_mask = user_counts > 15
flt_users = user_counts.loc[active_user_mask].index.to_list()

# Keep songs that have at least 15 listeners (count > 14).
# The original note tags this cutoff as the 50% mark of `song_user`.
popular_song_mask = song_user > 14
flt_songs = song_user.loc[popular_song_mask].index.to_list()

# Restrict the ratings to rows where both the user and the song survived the filters
keep_rows = ratings['user_id'].isin(flt_users) & ratings['song_id'].isin(flt_songs)
ratings_flt = ratings.loc[keep_rows].reset_index(drop=True)

In [None]:
# The 10000 songs with the most listeners (ranked on the unfiltered `song_user` counts)
songs_by_popularity = song_user.sort_values(ascending=False)
top_songs = songs_by_popularity.iloc[:10000].index.to_list()

# The 10000 users with the most listened songs (ranked on the unfiltered `user_counts`)
users_by_activity = user_counts.sort_values(ascending=False)
most_ratings_user = users_by_activity.iloc[:10000].index.to_list()

# Keep only the filtered ratings that involve both a top song and a top user
in_top_songs = ratings_flt['song_id'].isin(top_songs)
in_top_users = ratings_flt['user_id'].isin(most_ratings_user)
ratings_flt_pop = ratings_flt.loc[in_top_songs & in_top_users].reset_index(drop=True)

In [None]:
# Binning technique: turn raw play counts into a 1-10 rating.
# Intervals are right-closed: (0, 1] -> 1, (1, 2] -> 2, ..., (8, 9] -> 9, and
# everything above 9 -> 10.
# The last edge is open-ended (np.inf) instead of `listen_count.max()`: with the
# maximum as the edge, pd.cut raises when the largest play count is <= 9
# (duplicate / non-increasing bin edges). Results are unchanged otherwise.
bins = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, np.inf]
rating_labels = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ratings_flt_pop['rating'] = pd.cut(
    ratings_flt_pop['listen_count'], bins=bins, labels=rating_labels
)
# pd.cut yields a categorical column; convert it to plain integers
ratings_flt_pop['rating'] = ratings_flt_pop.rating.astype('int')

In [None]:
# Reader configured for the 1-10 rating scale produced by the binning step
reader = Reader(rating_scale=(1, 10))

# Build the Surprise dataset from the user / song / rating columns (in that order)
rating_columns = ['user_id', 'song_id', 'rating']
data = Dataset.load_from_df(ratings_flt_pop[rating_columns], reader)

In [None]:
# Adapted from the Surprise library FAQ
# source: https://surprise.readthedocs.io/en/stable/FAQ.html

def precision_recall_at_k(predictions, k=20, threshold=3):
    """Compute per-user precision@k and recall@k.

    Args:
        predictions: iterable of 5-tuples ``(uid, iid, true_r, est, details)``;
            only ``uid``, ``true_r`` and ``est`` are used.
        k: number of top-ranked (by estimated rating) items considered per user.
        threshold: a rating (true or estimated) at or above this value counts
            as relevant / recommended.

    Returns:
        A ``(precisions, recalls)`` pair of dicts keyed by user id. When a user
        has no recommended items in the top k, precision is set to 1; when a
        user has no relevant items at all, recall is set to 1.
    """
    # Group (estimated, true) rating pairs by user
    pairs_by_user = defaultdict(list)
    for prediction in predictions:
        uid, _iid, true_r, est, _details = prediction
        pairs_by_user[uid].append((est, true_r))

    precisions, recalls = {}, {}
    for uid, pairs in pairs_by_user.items():
        # Rank this user's items from highest to lowest estimated rating
        ranked = sorted(pairs, key=lambda pair: pair[0], reverse=True)
        top_k = ranked[:k]

        # Relevant items are counted over ALL of the user's items, not just the top k
        n_rel = sum((actual >= threshold) for (_, actual) in ranked)
        # Items in the top k whose estimate clears the threshold
        n_rec_k = sum((estimate >= threshold) for (estimate, _) in top_k)
        # Items in the top k that are both relevant and recommended
        n_rel_and_rec_k = sum(
            ((actual >= threshold) and (estimate >= threshold))
            for (estimate, actual) in top_k
        )

        # Precision@K: share of recommended items that are relevant
        if n_rec_k != 0:
            precisions[uid] = n_rel_and_rec_k / n_rec_k
        else:
            precisions[uid] = 1

        # Recall@K: share of relevant items that are recommended
        if n_rel != 0:
            recalls[uid] = n_rel_and_rec_k / n_rel
        else:
            recalls[uid] = 1

    return precisions, recalls

In [None]:
# open-source code by https://github.com/mikema2019/tune-hyperparameters-with-PSO

class optimisation():
    """Particle-swarm hyper-parameter search for a prediction algorithm.

    The search space handed to :meth:`PSO` maps each hyper-parameter name to a
    spec list ``[converter, lower_bound, upper_bound, ...]``:

    * ``converter`` is called with the particle's coordinate for that
      dimension. When ``converter.__name__ == 'choice'`` the result is used as
      an index into ``spec[3]`` (the list of candidate values); otherwise the
      result is used directly as the hyper-parameter value.
    * ``lower_bound`` / ``upper_bound`` bound the swarm in that dimension.

    Entries of a spec beyond the ones listed above are not read.
    """

    def __init__(self, n_particles=10, iters=10, options=None):
        """Store the swarm configuration.

        Args:
            n_particles: number of particles in the swarm.
            iters: number of PSO iterations run by :meth:`PSO`.
            options: pyswarms options dict (``c1``, ``c2``, ``w``); defaults to
                ``{'c1': 0.5, 'c2': 0.6, 'w': 0.8}``.
        """
        self.n_particles = n_particles
        self.iters = iters
        if options is None:
            options = {'c1': 0.5, 'c2': 0.6, 'w': 0.8}
        self.options = dict(options)

    def PSO(self, train_set, test_set, algo, parameters):
        """Search the hyper-parameter space, then fit and test the best model.

        Args:
            train_set: training data passed to ``algo(...).fit``.
            test_set: test data passed to ``algo(...).test``.
            algo: algorithm class, instantiated as ``algo(**hyper_parameters)``.
            parameters: search-space dict (see the class docstring).

        Returns:
            The predictions of the final model on ``test_set``.
        """
        self.train_set = train_set
        self.test_set = test_set
        self.algo = algo
        self.parameters = parameters
        self.dimensions = len(parameters)

        # Lower / upper boundary of every dimension, in the dict's key order
        minimum = np.array([spec[1] for spec in parameters.values()], dtype=float)
        maximum = np.array([spec[2] for spec in parameters.values()], dtype=float)
        boundaries = (minimum, maximum)

        # GlobalBestPSO searches for the position with the lowest cost
        optimizer = ps.single.GlobalBestPSO(n_particles=self.n_particles, dimensions=self.dimensions,
                                            options=self.options, bounds=boundaries)

        # optimize returns a tuple with the best cost and the position that produced it
        best_cost, best_pos = optimizer.optimize(self.searchParameters, iters=self.iters)
        self.best_cost = best_cost

        # Train and test a model with the hyper-parameters of the best position
        self.BestParams = self._decode_position(best_pos)
        print("\n")
        self.final_model = self.algo(**self.BestParams)
        self.final_model.fit(train_set)
        print(f"PSO selected hyper-parameters: {self.BestParams}")

        # testing model and returning predictions
        predictions = self.final_model.test(test_set)
        result = accuracy.rmse(predictions)
        print(result)

        return predictions

    def _decode_position(self, position):
        """Translate one particle position into a hyper-parameter dict."""
        decoded = {}
        for dim, (key, spec) in enumerate(self.parameters.items()):
            converter = spec[0]
            if converter.__name__ == 'choice':
                candidates = spec[3]
                # Clamp the index: a coordinate sitting exactly on the upper
                # bound would otherwise point one past the last candidate.
                index = min(max(converter(position[dim]), 0), len(candidates) - 1)
                decoded[key] = candidates[index]
            else:
                decoded[key] = converter(position[dim])
        return decoded

    # objective function that scores every particle's combination of hyper-parameters
    def searchParameters(self, parameters):
        """Return the test RMSE of each particle.

        Args:
            parameters: array of particle positions, one row per particle and
                one column per hyper-parameter.

        Returns:
            1-D array with one RMSE per particle. The RMSE is returned as-is
            (not negated) because the optimizer minimises the cost and a lower
            RMSE is better.
        """
        n_candidates = len(parameters)
        score_array = np.zeros(n_candidates)

        for i in range(n_candidates):
            # Decode ALL hyper-parameters of this particle before fitting once
            SearchParams = self._decode_position(parameters[i])

            algorithm = self.algo(**SearchParams)
            algorithm.fit(self.train_set)

            predictions = algorithm.test(self.test_set)
            score_array[i] = accuracy.rmse(predictions)

        # report the swarm's average RMSE for this iteration
        print(f'AVG: {np.mean(score_array)}')
        return score_array

In [None]:
# `choice` and `uniform` are the converters referenced by the search spaces
# below. They are not defined or imported anywhere else in the visible
# notebook, so they are defined here to keep the cell runnable on a fresh kernel.
# NOTE(review): if another definition of these helpers exists outside this
# view, confirm it has the same semantics before keeping both.
def choice(position):
    """Truncate a particle coordinate to an integer index into a candidate list."""
    return int(position)


def uniform(position):
    """Use a particle coordinate directly as a continuous hyper-parameter value."""
    return float(position)


algos = [SVD, NMF, KNNWithZScore, CoClustering]
test_size = ['0.25', '0.30', '0.40']
n_splits = 5

results = {}

# Search space per algorithm, keyed by the algorithm's class name.
# Spec layout: [converter, lower_bound, upper_bound, ...]. For `choice` specs the
# 4th entry is the candidate list that the converted coordinate indexes into.
# NOTE(review): for the `uniform` specs only the first two numbers are read by
# `optimisation.PSO` (as lower / upper bound); the remaining numbers are ignored.
parameters = defaultdict(dict)
parameters['SVD'] = {
    'n_factors':[choice, 0, 6, [20, 30, 50, 75, 100, 125]],
    'lr_all':[uniform, 0.001, 0.002, 0.003, 0.05],
    'reg_all':[uniform, 0.002, 0.004, 0.006, 0.08]
}

parameters['NMF'] = {
    'n_factors':[choice, 0, 6, [25, 30, 50, 75, 100, 125]],
    'reg_pu': [uniform, 0.001, 0.003, 0.005, 0.01],
    'reg_qi':[uniform, 0.001, 0.003, 0.005, 0.01]
}

parameters['KNNWithZScore'] = {
    'k':[choice, 0, 5, [5, 10, 15, 20, 30]],
    'min_k':[choice, 0, 3, [2, 4, 6]],
}

# NOTE(review): the upper bound here is 4 for 5 candidates, unlike the other
# `choice` specs where the upper bound equals the number of candidates —
# confirm whether the last candidate (20) is meant to be reachable.
parameters['CoClustering'] = {
    'n_cltr_u':[choice, 0, 4, [3, 5, 10, 15, 20]],
    'n_cltr_i': [choice, 0, 4, [3, 5, 10, 15, 20]]
}

for algorithm in algos:
    print(f'Current algorithm: {algorithm}')

    # the class name is the key into `parameters`
    algorithm_name = algorithm.__name__

    for size in test_size:
        # split the dataset into a training set and testing set
        trainset, testset = train_test_split(data, test_size=float(size))

        # run the particle swarm search for this algorithm / split
        pso = optimisation()
        pso.PSO(trainset, testset, algorithm, parameters[algorithm_name])