In [None]:
!conda install -c cyclus java-jdk

In [None]:
# Import libraries
import findspark
findspark.init()

from pyspark.mllib.recommendation import *
import random
from operator import *
from collections import defaultdict
from pyspark import SparkContext, SparkConf

In [None]:
# Initialize Spark Context
# Stop whichever context is currently active first (presumably so that
# re-running this cell does not fail on an already-running context),
# then start a fresh local one named 'Recommender'.
SparkContext.getOrCreate().stop()
spark = SparkContext(master='local', appName='Recommender')

In [None]:
# Load the raw text files into RDDs.
# 'artist_data.txt' is tab-separated: the first field is parsed as an int id,
# the second field is kept as a string.
artistData = (
    spark.textFile('./data_raw/artist_data.txt')
    .map(lambda line: line.split("\t"))
    .map(lambda fields: (int(fields[0]), fields[1]))
)
# The other two files stay as RDDs of raw lines; they are parsed in later cells.
artistAlias = spark.textFile('./data_raw/artist_alias.txt')
userArtistData = spark.textFile('./data_raw/user_artist_data.txt')

In [None]:
# Split each space-separated line into separate fields and store the first three as ints.
# NOTE: this cell rebinds 'userArtistData', so it must run exactly once after the load cell.
userArtistData = (
    userArtistData
    .map(lambda line: line.split(" "))
    .map(lambda fields: (int(fields[0]), int(fields[1]), int(fields[2])))
)

In [None]:
# Keep only the first and third fields of every record: (userid, playcount)
userSum = userArtistData.map(lambda rec: (rec[0], rec[2]))
# Total play count per user
playCount1 = userSum.reduceByKey(lambda left, right: left + right)
# Number of records per user (every record contributes 1)
playCount2 = userSum.mapValues(lambda _: 1).reduceByKey(lambda left, right: left + right)
# Join both aggregates: (userid, (total, record_count))
playSumAndCount = playCount1.leftOuterJoin(playCount2)

In [None]:
# Create a driver-side dictionary from the 'artistAlias' dataset:
# each tab-separated line maps its first int column to its second int column.
dataValue = (
    artistAlias
    .map(lambda line: line.split("\t"))
    .map(lambda fields: (int(fields[0]), int(fields[1])))
)
# collectAsMap() gathers the pairs on the driver; a repeated key keeps its last value.
artistAliasDictionary = dataValue.collectAsMap()

In [None]:
# Replace the artist id with its entry from 'artistAliasDictionary' when one exists;
# ids without an entry are kept as they are.
userArtistData = userArtistData.map(
    lambda rec: (rec[0], artistAliasDictionary.get(rec[1], rec[1]), rec[2])
)

In [None]:
# Turn every (userid, (total, record_count)) entry into
# (userid, total, mean), where mean is total / record_count truncated to an int.
# NOTE: this cell rebinds 'playSumAndCount', so it must run exactly once after the join cell.
def _withMeanPlayCount(entry):
    user, (total, recordCount) = entry
    return (user, total, int(total / recordCount))

playSumAndCount = playSumAndCount.map(_withMeanPlayCount)

In [None]:
# Show the three users with the largest total play count, together with their mean play count.
# Each entry is (userid, total, mean); ranking is by the total (index 1).
TopThree = playSumAndCount.top(3, key=lambda entry: entry[1])
for user, total, mean in TopThree:
    print('User '+str(user)+' has a total play count of '+str(total)+' and a mean play count of '+str(mean)+'.')

In [None]:
# Randomly split 'userArtistData' 40/40/20 into training, validation and test RDDs.
# The fixed seed is passed so the split call is repeatable; all three parts are
# cached because they are accessed repeatedly in the cells below.
splitWeights = [0.4, 0.4, 0.2]
trainData, validationData, testData = userArtistData.randomSplit(splitWeights, seed=13)
trainData.cache()
validationData.cache()
testData.cache()

In [None]:
def modelEval(model, dataset):
    """Score a recommendation model against a held-out dataset.

    For every user in ``dataset`` the model predicts a rating for all artists
    found in ``userArtistData``. Artists the user already has in ``trainData``
    are discarded, and the ``N`` highest-rated remaining artists are compared
    with the artists the user has in ``dataset`` (``N`` is the number of
    records that user has in ``dataset``). The per-user score is
    ``overlap / N``; the model score is the mean of the per-user scores.

    The function reads the notebook-level RDDs ``userArtistData`` and
    ``trainData``; records are indexed as ``[0]`` = user, ``[1]`` = artist.

    Parameters
    ----------
    model : object with ``predictAll(rdd_of_(user, artist))`` and ``rank``
        The trained model to evaluate.
    dataset : RDD of records whose first two fields are (user, artist)
        The validation or test split.

    Returns
    -------
    float
        The mean per-user score, which is also printed. ``0.0`` is reported
        when ``dataset`` contains no users.
    """
    # Distinct artists stay distributed (no driver round trip) and are cached
    # because the same RDD is reused for every user below.
    allArtists = userArtistData.map(lambda row: row[1]).distinct().cache()

    # user -> list of artists in the evaluated dataset, built in ONE pass
    # instead of one filter+collect job per user.
    actualByUser = (
        dataset.map(lambda row: (row[0], row[1]))
        .groupByKey()
        .mapValues(list)
        .collectAsMap()
    )

    # user -> set of artists in the training data, restricted to evaluated users.
    evalUsers = set(actualByUser)
    trainedByUser = (
        trainData.filter(lambda row: row[0] in evalUsers)
        .map(lambda row: (row[0], row[1]))
        .groupByKey()
        .mapValues(set)
        .collectAsMap()
    )

    totalScore = 0.0
    for user, actualArtists in actualByUser.items():
        # A user without training records simply has nothing to exclude
        # (a plain dict lookup here would raise KeyError).
        alreadyPlayed = trainedByUser.get(user, frozenset())

        # Default arguments bind the current user's values into the lambdas,
        # so only this user's data is shipped with each closure.
        candidates = allArtists.map(lambda artist, user=user: (user, artist))
        unseenPredictions = model.predictAll(candidates).filter(
            lambda rating, played=alreadyPlayed: rating[1] not in played
        )

        # Prediction records are (user, artist, rating): rank by rating.
        topPredictions = unseenPredictions.top(len(actualArtists), key=lambda rating: rating[2])
        recommended = {rating[1] for rating in topPredictions}
        totalScore += len(recommended.intersection(actualArtists)) / len(actualArtists)

    allArtists.unpersist()

    # Guard against an empty dataset instead of dividing by zero.
    userCount = len(actualByUser)
    score = totalScore / userCount if userCount else 0.0

    # Print average score of the model for all users for the specified rank
    print("The model score for rank "+str(model.rank)+" is ~"+str(score))
    return score

In [None]:
 # Train one implicit-feedback ALS model per candidate rank on the training split
 # and print its score on the validation split.
 rankList = [10,50,200]
 for rank in rankList:
     model = ALS.trainImplicit(ratings=trainData, rank=rank, seed=5028)
     modelEval(model=model, dataset=validationData)

In [None]:
# Train the model kept as 'bestModel' (rank 10, same seed as the sweep above)
# and score it on the held-out test split.
bestModel = ALS.trainImplicit(ratings=trainData, rank=10, seed=5028)
modelEval(model=bestModel, dataset=testData)

In [None]:
# Find the top 5 artists for a particular user and list their names
TopFive = bestModel.recommendProducts(1059637,5)

# Resolve all recommended artist ids to names with a single Spark job
# instead of one filter+collect job per recommendation.
recommendedIds = {rec[1] for rec in TopFive}
artistNames = {}
for artistId, artistName in artistData.filter(lambda x: x[0] in recommendedIds).collect():
    # Keep the first name seen for an id (same result as a first-match lookup).
    artistNames.setdefault(artistId, artistName)

# Iterate over what was actually returned, so fewer than 5 recommendations
# cannot raise IndexError; an id without a known name gets a placeholder
# instead of crashing the cell.
for item, rec in enumerate(TopFive):
    print("Artist "+str(item)+": "+artistNames.get(rec[1], "<unknown artist "+str(rec[1])+">"))