In [None]:
!pip install pyspark



In [None]:
from pyspark.mllib.recommendation import *
import random
import pandas as pd
from operator import *

In [None]:
from pyspark import SparkContext

# Reuse the kernel's SparkContext if one already exists, so the cell can be re-run.
sc = SparkContext.getOrCreate()

# Tab-separated rows: artist id, artist name -> (int id, name).
artistData = (
    sc.textFile("/content/sample_data/artist_data_small.txt")
    .map(lambda row: row.split("\t"))
    .map(lambda cols: (int(cols[0]), cols[1]))
)

# Tab-separated rows: bad artist id, good artist id -> (int, int).
artistAlias = (
    sc.textFile("/content/sample_data/artist_alias_small.txt")
    .map(lambda row: row.split("\t"))
    .map(lambda cols: (int(cols[0]), int(cols[1])))
)
# Bring the (small) alias table to the driver as a {bad_id: good_id} dict.
artistAliasDict = artistAlias.collectAsMap()

def canonical(ele, aliases=None):
    """
    Map an artist id to its canonical ("good") id via the alias table.

    Args:
        ele: artist id, either an int or a numeric string such as a field
            split from a line of user_artist_data_small.txt.
        aliases: optional {bad_id: good_id} mapping with int keys. Defaults
            to the notebook-global ``artistAliasDict``.

    Returns:
        int: the aliased id when ``int(ele)`` is a known bad id, otherwise
        ``int(ele)`` itself.
    """
    if aliases is None:
        aliases = artistAliasDict
    # The alias dict is keyed by int, but callers pass the raw string field;
    # convert BEFORE the lookup, otherwise no alias would ever match.
    artistId = int(ele)
    return aliases.get(artistId, artistId)
 
# Space-separated rows: user id, artist id, play count.
# Artist ids are passed through canonical() so aliases share one id.
userArtistData = (
    sc.textFile("/content/sample_data/user_artist_data_small.txt")
    .map(lambda row: row.split(" "))
    .map(lambda cols: (int(cols[0]), canonical(cols[1]), int(cols[2])))
)

In [None]:
import array
from collections import namedtuple

from pyspark import SparkContext, since
from pyspark.rdd import RDD
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
from pyspark.mllib.util import JavaLoader, JavaSaveable
from pyspark.sql import DataFrame

# NOTE(review): __all__ only controls what "from <module> import *" exports;
# inside a notebook cell it does not affect how any name here is used.
__all__ = ['MatrixFactorizationModel', 'ALS', 'Rating']


class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    """
    A single (user, product, rating) record.

    NOTE(review): this cell shadows the ``Rating`` already brought in by
    ``from pyspark.mllib.recommendation import *``. The model-style methods
    that used to live here were copied from ``MatrixFactorizationModel`` and
    dereferenced ``self._java_model`` / ``self.call``, which a namedtuple never
    has, so every one of them always raised AttributeError. They are kept as
    explicit stubs (same names and signatures, same exception type) that say
    where the real implementation lives. Consider deleting this cell entirely.
    """

    # No per-instance __dict__: a rating is just its three tuple fields.
    __slots__ = ()

    def __reduce__(self):
        # Pickle with the fields coerced to int/int/float, as pyspark's own
        # Rating does.
        return self.__class__, (int(self.user), int(self.product), float(self.rating))

    @staticmethod
    def _modelOnly(name):
        # Build the error raised by the model-only methods below.
        return AttributeError(
            "Rating.%s is unavailable: Rating is a plain (user, product, rating) "
            "record; call MatrixFactorizationModel.%s on a trained model instead"
            % (name, name)
        )

    def predict(self, user, product):
        """Unsupported on a rating record; see MatrixFactorizationModel.predict."""
        raise self._modelOnly("predict")

    def predictAll(self, user_product):
        """Unsupported on a rating record; see MatrixFactorizationModel.predictAll."""
        raise self._modelOnly("predictAll")

    def userFeatures(self):
        """Unsupported on a rating record; see MatrixFactorizationModel.userFeatures."""
        raise self._modelOnly("userFeatures")

    def productFeatures(self):
        """Unsupported on a rating record; see MatrixFactorizationModel.productFeatures."""
        raise self._modelOnly("productFeatures")

    def recommendUsers(self, product, num):
        """Unsupported on a rating record; see MatrixFactorizationModel.recommendUsers."""
        raise self._modelOnly("recommendUsers")

    def recommendProducts(self, user, num):
        """Unsupported on a rating record; see MatrixFactorizationModel.recommendProducts."""
        raise self._modelOnly("recommendProducts")

    def recommendProductsForUsers(self, num):
        """Unsupported on a rating record; see MatrixFactorizationModel.recommendProductsForUsers."""
        raise self._modelOnly("recommendProductsForUsers")

In [None]:

# Alias table as a DataFrame (columns badid, goodid) for a quick look.
ALIAS_CSV = "/content/sample_data/artist_alias_small.csv"
temp1 = pd.read_csv(ALIAS_CSV)
temp1.head(5)


Unnamed: 0,badid,goodid
0,1027859,1252408
1,1017615,668
2,6745885,1268522
3,1018110,1018110
4,1014609,1014609


In [None]:
# Summary statistics (count, mean, std, min, quartiles, max) of the alias ids.
temp1.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,badid,goodid
count,587.0,587.0
mean,3348950.0,1469856.0
std,3223688.0,1830129.0
min,15.0,15.0
25%,1060312.0,1006480.0
50%,1291110.0,1034635.0
75%,6664274.0,1259498.0
max,10713440.0,10361610.0


In [None]:
# Artist table as a DataFrame (columns artistid, artist_name).
ARTIST_CSV = "/content/sample_data/artist_data_small.csv"
temp2 = pd.read_csv(ARTIST_CSV)
temp2.head(5)


Unnamed: 0,artistid,artist_name
0,1240113,riow arai
1,1240132,Outkast & Rage Against the Machine
2,6776115,小松正夫
3,1030848,Raver's Nature
4,6671601,"Erguner, Kudsi"


In [None]:
# Summary statistics of the numeric columns; artist_name is text and is skipped.
temp2.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,artistid
count,30537.0
mean,2723610.0
std,2875962.0
min,1.0
25%,1033180.0
50%,1238800.0
75%,2164639.0
max,10788220.0


In [None]:
# User/artist play-count table as a DataFrame (columns userid, artistid, playcount).
USER_ARTIST_CSV = "/content/sample_data/user_artist_data_small.csv"
temp3 = pd.read_csv(USER_ARTIST_CSV)
temp3.head(5)

Unnamed: 0,userid,artistid,playcount
0,1059637,1000010,238
1,1059637,1000049,1
2,1059637,1000056,1
3,1059637,1000062,11
4,1059637,1000094,1


In [None]:
# Summary statistics of the numeric columns.
# NOTE(review): the saved output lists only `userid`, which suggests artistid /
# playcount were not parsed as numbers — check temp3.dtypes.
temp3.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,userid
count,49481.0
mean,1328420.0
std,452991.3
min,1000647.0
25%,1024631.0
50%,1059245.0
75%,2010008.0
max,2288164.0


In [None]:
# Per user: (userId, total play count, mean play count per record).
# The mean uses floor division, so it is reported as a whole number.
userPlayList = (
    userArtistData
    .map(lambda x: (x[0], x[2]))  # (user, playcount); the artist is not needed here
    .aggregateByKey(
        (0, 0),  # (number of records, summed play count)
        lambda acc, plays: (acc[0] + 1, acc[1] + plays),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    .map(lambda t: (t[0], t[1][1], t[1][1] // t[1][0]))
)

# top() keeps only the 3 largest per partition instead of sorting the whole RDD.
top3Users = userPlayList.top(3, key=lambda x: x[1])
for user, total, mean in top3Users:
    print("User " + str(user) + " has a total play count of " + str(total) + " and a mean play count of " + str(mean) + ".")

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.


In [None]:
# 40/40/20 train/validation/test split; the fixed seed makes it reproducible.
trainData, validationData, testData = userArtistData.randomSplit([0.4, 0.4, 0.2], seed=13)

# Mark the splits for caching BEFORE the first actions, so the take()/count()
# calls below materialize the cache instead of computing each split twice.
for split in (trainData, validationData, testData):
    split.cache()

print(trainData.take(3))
print(validationData.take(3))
print(testData.take(3))
print(trainData.count())
print(validationData.count())
print(testData.count())

[(1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000114, 2)]
[(1059637, 1000010, 238), (1059637, 1000062, 11), (1059637, 1000123, 2)]
[(1059637, 1000094, 1), (1059637, 1000112, 423), (1059637, 1000113, 5)]
19769
19690
10022


PythonRDD[26] at RDD at PythonRDD.scala:53

In [None]:
def modelEval(model, dataset):
    """
    Score an implicit-feedback ALS model against a held-out dataset.

    For every user present in ``dataset``:
      * candidates = every artist in ``userArtistData`` that the user does NOT
        have in ``trainData``;
      * the model predicts a rating for each (user, candidate) pair;
      * the top ``c`` predicted artists (c = number of distinct artists the
        user has in ``dataset``) are compared with those actual artists, and
        the user's score is |predicted ∩ actual| / c.
    The model score is the mean per-user score over the users in ``dataset``.

    Relies on the notebook globals ``sc``, ``userArtistData`` (the artist
    universe) and ``trainData`` (artists already seen in training).

    Args:
        model: trained MatrixFactorizationModel (uses ``predictAll`` and ``rank``).
        dataset: RDD of (user, artist, playcount) tuples to evaluate against,
            e.g. ``validationData`` or ``testData``.

    Returns:
        str: "The model score for rank <rank> is <score>".

    Raises:
        ValueError: if ``dataset`` contains no rows.
    """
    def userArtistSets(rdd):
        # user -> set of distinct artists that user has in `rdd`
        return rdd.map(lambda x: (x[0], x[1])).groupByKey().mapValues(set).collectAsMap()

    allArtist = set(userArtistData.map(lambda x: x[1]).distinct().collect())
    trainUserArtist = userArtistSets(trainData)
    # Use the dataset we were asked to evaluate (previously validationData was hard-coded).
    datasetUserArtist = userArtistSets(dataset)
    if not datasetUserArtist:
        raise ValueError("modelEval: dataset is empty, nothing to evaluate")

    overAllScore = 0.0
    for user, actualUserArtist in datasetUserArtist.items():
        # Users with no training rows simply have no artists excluded.
        candidates = allArtist - trainUserArtist.get(user, set())
        if not candidates:
            # predictAll cannot take an empty RDD; this user scores 0.
            continue
        c = len(actualUserArtist)
        testRDD = sc.parallelize([(user, artist) for artist in candidates])
        # top() keeps only the c best ratings rather than sorting every prediction.
        topRatings = model.predictAll(testRDD).top(c, key=lambda r: r.rating)
        predictedSet = {r.product for r in topRatings}
        overAllScore += len(predictedSet & actualUserArtist) / c

    score = overAllScore / len(datasetUserArtist)
    return "The model score for rank " + str(model.rank) + " is " + str(score)


In [None]:
# Compare candidate ranks on the validation split; the fixed seed keeps runs comparable.
for candidateRank in (2, 10, 20):
    model = ALS.trainImplicit(trainData, rank=candidateRank, seed=345)
    print(modelEval(model, validationData))

The model score for rank 2 is 0.08616827592156559
The model score for rank 10 is 0.09441971719854263
The model score for rank 20 is 0.08408995233356337


In [None]:
# Rank 10 scored highest on the validation split in the sweep above;
# retrain it with the same seed and score it on the held-out test split.
BEST_RANK = 10
bestModel = ALS.trainImplicit(trainData, rank=BEST_RANK, seed=345)
modelEval(bestModel, testData)

'The model score for rank 10 is 0.09441971719854263'

In [None]:
# artist id -> artist name, collected to the driver for display.
artistDict = artistData.collectAsMap()
userid = 1059334
ratings = bestModel.recommendProducts(userid, 5)
ratingsArtists = [r.product for r in ratings]

# Build the header as one string: passing comma-separated args to print()
# inserted extra spaces around the user id.
print("Artist recommendations for user with " + str(userid) + " are:")
for i, artist in enumerate(ratingsArtists, start=1):
    # Fall back to the raw id if the artist is missing from artist_data_small.
    print("Artist " + str(i) + ":", artistDict.get(artist, "<unknown artist id " + str(artist) + ">"))

Artist recommendations for user with  1059334  are:
Artist 1: The Shins
Artist 2: My Bloody Valentine
Artist 3: The 5.6.7.8's
Artist 4: The Microphones
Artist 5: Hanson
