# Music Recommender System using Apache Spark and Python


## Necessary Package Imports

In [1]:
from pyspark.mllib.recommendation import *
import random
from operator import *

## Loading data

Load the three datasets into RDDs and name them `artistData`, `artistAlias`, and `userArtistData`. View the README, or the files themselves, to see how this data is formatted. Only the small sample datasets are used for the model.

In [2]:
def parser(s, delimeters=" ", to_int=None):
    """Split one text line into a tuple of fields.

    Args:
        s: the line to split.
        delimeters: separator string handed to ``str.split``.
        to_int: optional collection of field positions to convert with
            ``int``; when it is ``None`` or empty every field stays a string.

    Returns:
        A tuple of the fields, with the positions named in ``to_int``
        converted to ``int``.

    Raises:
        ValueError: if a field selected by ``to_int`` is not a valid integer.
    """
    fields = s.split(delimeters)
    if not to_int:
        return tuple(fields)
    converted = []
    for position, field in enumerate(fields):
        # Only the requested positions are converted; the rest pass through.
        converted.append(int(field) if position in to_int else field)
    return tuple(converted)
# Tab-separated artist records; field 0 is parsed as an integer.
artistData = sc.textFile("artist_data_small.txt").map(lambda line: parser(line, '\t', [0]))

# Tab-separated alias records, both fields parsed as integers, collected to
# the driver as a dict mapping the first field to the second.
artistAlias = sc.textFile("artist_alias_small.txt").map(lambda line: parser(line, '\t', [0, 1]))
artistAliasMap = artistAlias.collectAsMap()

# Space-separated records of three integers.  The second field is replaced by
# its entry in artistAliasMap when one exists and is kept unchanged otherwise.
userArtistData = (
    sc.textFile("user_artist_data_small.txt")
    .map(lambda line: parser(line, ' ', [0, 1, 2]))
    .map(lambda rec: (rec[0], artistAliasMap.get(rec[1], rec[1]), rec[2]))
)


## Data Exploration



In [3]:
def summary(user_id):
    """Print the total and mean play count for one user.

    Looks up every (artist, play count) record stored for ``user_id`` in the
    global ``userArtistData`` RDD and prints a one-line summary.  The mean is
    the total divided by the number of records, using floor division so the
    printed value is a whole number on both Python 2 and Python 3.

    Args:
        user_id: the user id, as found in the first field of each
            ``userArtistData`` record.
    """
    play_list = userArtistData.map(lambda x: (x[0], (x[1], x[2]))).lookup(user_id)
    if not play_list:
        # lookup() found nothing for this user; report it instead of
        # dividing by zero below.
        print("User %s has no play records." % (user_id,))
        return
    total = sum(x[1] for x in play_list)
    print("User %s has a total play count of %s and a mean play count of %s."
          % (user_id, total, total // len(play_list)))
# Print the play-count summary for three users, in this order.
for user_id in (1059637, 2064012, 2069337):
    summary(user_id)

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.


####  Splitting Data for Testing


* A training set, `trainingData`, that will be used to train the model. This set should constitute 40% of the data.
* A validation set, `validationData`, used to perform parameter tuning. This set should constitute 40% of the data.
* A test set, `testData`, used for a final evaluation of the model. This set should constitute 20% of the data.


In [4]:
# Split with weights 40/40/20 and a fixed seed (13) so the split is repeatable.
trainingData, validationData, testData = userArtistData.randomSplit([40, 40, 20], 13)
splits = (trainingData, validationData, testData)

# Cache each split; all three are reused by later cells.
for split in splits:
    split.cache()

# Preview the first three records of each split, then print each split's size.
for split in splits:
    print(split.take(3))
for split in splits:
    print(split.count())

[(1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000113, 5)]
[(1059637, 1000010, 238), (1059637, 1000062, 11), (1059637, 1000112, 423)]
[(1059637, 1000094, 1), (1059637, 1000130, 19129), (1059637, 1000139, 4)]
19817
19633
10031


## The Recommender Model



In [5]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def cal_score(predict, actual):
    """Return the fraction of ``actual`` items found among the top predictions.

    ``predict`` is cut down to its first ``len(actual)`` entries when it is
    longer, and the score is the number of distinct items common to both
    sequences divided by ``len(actual)``.  Duplicates in ``actual`` therefore
    count toward the denominator but at most once toward the hits.

    Args:
        predict: sequence of predicted items, best first.
        actual: sequence of items that were really observed.

    Returns:
        A float in [0.0, 1.0]; 0.0 when ``actual`` is empty.
    """
    if not actual:
        # Nothing to recover: define the score as 0.0 rather than dividing by zero.
        return 0.0
    if len(actual) < len(predict):
        predict = predict[:len(actual)]
    hits = len(set(predict) & set(actual))
    return float(hits) / len(actual)

def modelEval(model, dataset):
    """Score ``model`` against the artists each user has in ``dataset``.

    For every user appearing in ``dataset`` that also has rows in the global
    ``trainingData``, the model predicts a rating for each artist in the
    global ``userArtistData`` that the user does not have in ``trainingData``.
    The predictions are ranked best-first and passed to ``cal_score`` together
    with the user's artists from ``dataset``.  The result is the sum of those
    per-user scores divided by the number of distinct users in ``dataset``, so
    users without any prediction contribute zero.

    Args:
        model: a trained model exposing ``predictAll``.
        dataset: RDD of (user, artist, play count) records to evaluate on.

    Returns:
        The average score as a float; 0.0 when ``dataset`` has no users.
    """
    # Every distinct artist in the whole data set: the candidate pool.
    all_artists = userArtistData.map(lambda x: x[1]).distinct().collect()
    # Users to evaluate.  A set keeps the membership test in the filter below
    # constant-time instead of scanning a list for every training record.
    test_users = set(dataset.map(lambda p: p[0]).distinct().collect())
    if not test_users:
        return 0.0
    # Artists each evaluated user already has in the training data, held as a
    # set so the exclusion test in the flatMap is constant-time too.
    seen_by_user = (trainingData
                    .filter(lambda x: x[0] in test_users)
                    .map(lambda x: (x[0], x[1]))
                    .groupByKey()
                    .mapValues(set))
    # Candidate (user, artist) pairs: every artist not seen in training.
    candidates = seen_by_user.flatMap(
        lambda x: [(x[0], a) for a in all_artists if a not in x[1]])
    # Artists each user has in the evaluated dataset, collected to the driver.
    actual_by_user = (dataset
                      .map(lambda x: (x[0], x[1]))
                      .groupByKey()
                      .map(lambda x: (x[0], list(x[1])))
                      .collectAsMap())
    # Rank each user's predicted artists by predicted rating, best first.
    ranked = (model.predictAll(candidates)
              .map(lambda x: (x[0], (x[1], x[2])))
              .groupByKey()
              .map(lambda x: (x[0], sorted(x[1], key=lambda y: y[1], reverse=True))))
    scores = ranked.map(
        lambda x: cal_score([y[0] for y in x[1]], actual_by_user[x[0]]))
    # sum() yields 0 on an empty RDD, whereas reduce() would raise.
    return scores.sum() * 1.0 / len(test_users)
    

### Model Construction



In [6]:
# Convert each (user, artist, play count) record into a Rating for ALS.
training = trainingData.map(lambda row: Rating(int(row[0]), int(row[1]), float(row[2])))

# Train one implicit-feedback model per candidate rank (fixed seed) and
# report its score on the validation split.
for r in [2, 10, 20]:
    model = ALS.trainImplicit(training, rank=r, seed=345)
    score = modelEval(model, validationData)
    print("The model score for rank %s is %s" % (r, score))


The model score for rank 2 is 0.0884421125678
The model score for rank 10 is 0.0980943620399
The model score for rank 20 is 0.0902867925232


In [7]:
# Retrain with rank 10 and the same seed, then report the score on the test split.
bestModel = ALS.trainImplicit(training, rank=10, seed=345)
testScore = modelEval(bestModel, testData)
print(testScore)

0.0571818264943


## Trying Some Artist Recommendations


In [8]:
# Product ids of the five recommendations returned for user 1059637.
recommended = [rating.product for rating in bestModel.recommendProducts(1059637, 5)]
for i, artist in enumerate(recommended):
    # Print the first value artistData holds for this artist id.
    print("Artist %s: %s" % (i, artistData.lookup(artist)[0]))

Artist 0: Taking Back Sunday
Artist 1: Brand New
Artist 2: Death Cab for Cutie
Artist 3: Elliott Smith
Artist 4: Jimmy Eat World
