# Music Recommendation System

## Description

This project builds a recommendation system that suggests music artists to a user based on their listening history. It uses Spark and collaborative filtering.

In [1]:
# import the necessary libraries
import findspark
findspark.init()

# import only the names the notebook uses instead of wildcard imports
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS

In [2]:
# Initialize Spark Context
spark = SparkContext('local', 'music_recommender')

## Loading the data

We'll load the data into three RDDs.

In [3]:
# Load the data files into RDD variables

artistData = spark.textFile('data/artist_data_small.txt')
artistAlias = spark.textFile('data/artist_alias_small.txt')
userArtistData = spark.textFile('data/user_artist_data_small.txt')

## Data Exploration

In this section we'll clean the data and find the three users with the highest total play counts, along with each user's mean play count per artist.

In [4]:
# Parse the artist data into (artistid, name) tuples
artistData = artistData.map(lambda row: row.split('\t'))
artistData = artistData.map(lambda row: (int(row[0]),row[1]))

# Reading the artistAlias data
artistAlias = artistAlias.map(lambda row: row.split('\t'))
artistAlias = artistAlias.map(lambda row: (int(row[0]),int(row[1])))

# Parse the userArtistData into (userid, artistid, playcount) tuples
userArtistData = userArtistData.map(lambda row: row.split(' '))
userArtistData = userArtistData.map(lambda row: (int(row[0]),int(row[1]),int(row[2])))
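
The parsing above assumes that every line is well formed. As a hedged sketch in plain Python (no Spark required), the helpers below apply the same field conversions but return `None` for malformed lines. The names `parse_artist_line` and `parse_play_line` are hypothetical, and the sample lines are made up to mirror the formats used in this notebook.

```python
def parse_artist_line(line):
    """Parse 'artistid<TAB>name' into (int, str); return None if malformed."""
    parts = line.split('\t', 1)
    if len(parts) != 2:
        return None
    artist_id, name = parts
    try:
        return int(artist_id), name.strip()
    except ValueError:
        return None


def parse_play_line(line):
    """Parse 'userid artistid playcount' into three ints; return None if malformed."""
    parts = line.split()
    if len(parts) != 3:
        return None
    try:
        user_id, artist_id, play_count = (int(p) for p in parts)
    except ValueError:
        return None
    return user_id, artist_id, play_count


# Illustrative lines only (assumed, not taken from the data files)
sample_artists = ["1240113\triow arai", "broken line", "abc\tNo Id"]
sample_plays = ["1059637 1000010 238", "1059637 1000049", "x y z"]

parsed_artists = [r for r in map(parse_artist_line, sample_artists) if r is not None]
parsed_plays = [r for r in map(parse_play_line, sample_plays) if r is not None]
print(parsed_artists)
print(parsed_plays)
```

With an RDD, one possible way to use such helpers would be `rdd.map(parse_play_line).filter(lambda r: r is not None)`. Whether the small data files actually contain malformed lines is not established here.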

In [5]:
# show the first 5 rows of the artistData dataset
print(artistData.take(5))

[(1240105, 'André Visior'), (1240113, 'riow arai'), (1240132, 'Outkast & Rage Against the Machine'), (6776115, '小松正夫'), (1030848, "Raver's Nature")]


In [6]:
# show the first 5 rows of the artistAlias dataset
print(artistAlias.take(5))

[(1027859, 1252408), (1017615, 668), (6745885, 1268522), (1018110, 1018110), (1014609, 1014609)]


In [7]:
# show the first 5 rows of the userArtistData dataset
print(userArtistData.take(5))

[(1059637, 1000010, 238), (1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000062, 11), (1059637, 1000094, 1)]


In [8]:
# check the number of rows in each dataset
print('artistData rows:', artistData.count())
print('userArtistData rows:', userArtistData.count())
print('artistAlias rows:', artistAlias.count())

artistData rows: 30537
userArtistData rows: 49481
artistAlias rows: 587


In [9]:
# create a dictionary of the artistAlias dataset
artistAlias_dic={}
for key,val in artistAlias.collect():
    if key not in artistAlias_dic:
        artistAlias_dic[key]=val

# replace each artistid with its canonical id from artistAlias if one exists, otherwise keep the original value
userArtistData = userArtistData.map(lambda row: (row[0], artistAlias_dic.get(row[1], row[1]), row[2]))

# create an RDD with the userid and playcount objects of the original tuple
rdd = userArtistData.map(lambda row: (row[0],row[2]))  

# Count instances by key and store in broadcast variable
broadcast_var = spark.broadcast(rdd.countByKey())  

# Compute and display users with the highest playcount along with their mean playcount across artists
sol = rdd.groupByKey().mapValues(sum)
sol = sol.sortBy(lambda item: item[1], ascending=False).take(3)
for user, count in sol:
    print('User {} has a total play count of {} and a mean play count of {}.'.format(user, count, int(count/ broadcast_var.value[user])))

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.
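
To make the steps in the cell above concrete, here is a minimal plain-Python sketch of the same logic on a tiny dataset: resolve aliases, sum play counts per user, and divide by the number of rows for that user. The two alias pairs come from the sample output shown earlier; the play rows and user ids are made up for illustration.

```python
from collections import defaultdict

# Alias pairs from the sample output above; play rows are invented
alias = {1027859: 1252408, 1017615: 668}
plays = [
    (1, 1027859, 10),  # aliased id, resolves to 1252408
    (1, 1252408, 5),
    (1, 668, 3),
    (2, 1017615, 7),   # aliased id, resolves to 668
]

# Replace each artist id with its canonical id, defaulting to itself
canonical = [(user, alias.get(artist, artist), count) for user, artist, count in plays]

totals = defaultdict(int)  # total play count per user
rows = defaultdict(int)    # number of rows per user
for user, _, count in canonical:
    totals[user] += count
    rows[user] += 1

# Users ordered by total play count, highest first
top = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
for user, total in top:
    print(user, total, total // rows[user])
```

As in the notebook, the mean here is the total divided by the number of rows for the user, so two rows that resolve to the same artist still count as two. On an RDD, `rdd.reduceByKey(lambda a, b: a + b)` would be one alternative to `groupByKey().mapValues(sum)` that combines values before the shuffle.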


## Split the Data for Testing

We'll use the randomSplit function to divide the data into a training set (40%), a validation set (40%), and a test set (20%), using a random seed of 55.

In [10]:
# split the data 40/40/20 with a set seed of 55
train, validation, test = userArtistData.randomSplit([0.4, 0.4, 0.2], 55)

In [11]:
# show the first 5 rows for each dataset to verify the splits are different
print('Train dataset:', train.take(5))
print('Test dataset:', test.take(5))
print('Validation dataset:', validation.take(5))

Train dataset: [(1059637, 1000049, 1), (1059637, 1000123, 2), (1059637, 1000130, 19129), (1059637, 1000289, 2), (1059637, 1000305, 1)]
Test dataset: [(1059637, 1000263, 180), (1059637, 1000527, 1), (1059637, 1000632, 250), (1059637, 1000926, 1), (1059637, 1001249, 2)]
Validation dataset: [(1059637, 1000010, 238), (1059637, 1000056, 1), (1059637, 1000062, 11), (1059637, 1000094, 1), (1059637, 1000112, 423)]


In [12]:
# show the number of rows in each split of the dataset
print('Train dataset rows:', train.count())
print('Test dataset rows:', test.count())
print('Validation dataset rows:', validation.count())

Train dataset rows: 19782
Test dataset rows: 9963
Validation dataset rows: 19736
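
The row counts are close to, but not exactly, 40/40/20 of the 49481 rows, which is what one would expect if each row is assigned to a split independently at random. The sketch below illustrates that idea in plain Python. `random_split` is a hypothetical helper, not Spark's implementation, and it will not reproduce the exact rows Spark selects for seed 55.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed):
    """Assign each row to one bucket with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.4, 0.8, 1.0] for weights [0.4, 0.4, 0.2]
    bounds = [w / total for w in accumulate(weights)]
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket catches any floating-point rounding at the top end
            if draw < bound or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets


rows = list(range(10000))
train_rows, validation_rows, test_rows = random_split(rows, [0.4, 0.4, 0.2], seed=55)
print(len(train_rows), len(validation_rows), len(test_rows))
```

Running the helper twice with the same seed gives the same buckets, which is the property the notebook relies on when it fixes the seed.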


## Model

We will build a function that evaluates a model. For each user, the model ranks every artist the user did not listen to in the training set, and the top X predictions are kept, where X is the number of artists the user has in the evaluation dataset. The user's score is the fraction of those artists that appear among the top X predictions. The function returns the average score over all users.

In [13]:
# create a function to evaluate the model
def modelEval(model, dataset):

    # All artists in the 'userArtistData' dataset
    artist = spark.parallelize(set(userArtistData.map(lambda row: row[1]).collect()))

    # All distinct users in the current (validation/test) dataset
    users = dataset.map(lambda row: row[0]).distinct().collect()

    # For each user, score the overlap between predicted and actual artists
    score = 0
    for user in users:
        # Artists the user already has in the training set are excluded from prediction
        seen = set(train.filter(lambda z: z[0] == user).map(lambda z: z[1]).collect())
        artist_predict = artist.filter(lambda z: z not in seen).map(lambda z: (user, z))
        predictions = model.predictAll(artist_predict)

        # Artists the user actually listened to in the evaluation dataset
        actual = dataset.filter(lambda z: z[0] == user).map(lambda z: z[1]).collect()

        # Keep as many top-rated predictions as there are actual artists
        predicted = predictions.sortBy(lambda z: z[2], ascending=False).take(len(actual))

        correct = set(p[1] for p in predicted).intersection(actual)
        score += len(correct) / len(actual)

    return score / len(users)
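
The per-user score can be illustrated without Spark. The sketch below is a simplified stand-in for the evaluation logic, using plain lists and sets; `overlap_score` and `mean_overlap` are hypothetical names, and the artist ids are made up.

```python
def overlap_score(ranked_artists, held_out):
    """Fraction of held-out artists found among the top len(held_out) ranked artists."""
    held_out = set(held_out)
    if not held_out:
        return 0.0
    top_k = ranked_artists[:len(held_out)]
    return len(held_out.intersection(top_k)) / len(held_out)


def mean_overlap(per_user):
    """Average overlap_score over (ranked_artists, held_out) pairs, one pair per user."""
    scores = [overlap_score(ranked, held) for ranked, held in per_user]
    return sum(scores) / len(scores) if scores else 0.0


# Invented example: ranked predictions and held-out artists for two users
per_user = [
    ([10, 20, 30, 40], {20, 99}),  # top 2 is [10, 20], so 1 of 2 is correct
    ([5, 6, 7, 8], {5, 6, 7}),     # top 3 is [5, 6, 7], so 3 of 3 are correct
]
print(mean_overlap(per_user))
```

One difference from `modelEval` is that the sketch deduplicates the held-out artists before counting them, while the notebook counts rows.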

For this project, we'll test several values of the rank parameter (2, 5, 10, and 20) to see which performs best on the validation set. A seed of 55 will be used for reproducibility.
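
For context, the rank is the number of latent factors $k$ used to represent each user and each artist. A common formulation of implicit-feedback ALS, given here as background and not as a statement of exactly what Spark computes, minimizes

$$
\min_{X, Y} \sum_{u, i} c_{ui} \left( p_{ui} - x_u^{\top} y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right),
\qquad
p_{ui} = \begin{cases} 1 & r_{ui} > 0 \\ 0 & r_{ui} = 0 \end{cases},
\qquad
c_{ui} = 1 + \alpha \, r_{ui}
$$

where $r_{ui}$ is the play count of artist $i$ for user $u$, and $x_u, y_i \in \mathbb{R}^k$ are the user and artist factor vectors. The symbols are introduced here for illustration. In this notebook only the rank $k$ is varied, while the regularization $\lambda$ and confidence weight $\alpha$ are left at their defaults, and Spark's implementation may differ in details such as how regularization is scaled.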

In [14]:
rankParameters = [2, 5, 10, 20]
for rank in rankParameters:
    model = ALS.trainImplicit(train, rank, seed=55)
    score = modelEval(model,validation)
    print("The model score for rank {} is {}".format(rank, score))

The model score for rank 2 is 0.08592256956348664
The model score for rank 5 is 0.0909463603579993
The model score for rank 10 is 0.09248896954043462
The model score for rank 20 is 0.08437272233820214


A rank of 10 scored highest on the validation set (about 0.0925), so we will use it to train our best model and evaluate it on the test set.
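
The selection can also be done programmatically. A minimal sketch, using the validation scores printed above (the dictionary name is an assumption):

```python
# Validation scores reported above, keyed by rank
validation_scores = {
    2: 0.08592256956348664,
    5: 0.0909463603579993,
    10: 0.09248896954043462,
    20: 0.08437272233820214,
}

# Pick the rank with the highest validation score
best_rank = max(validation_scores, key=validation_scores.get)
print(best_rank, validation_scores[best_rank])
```

In the notebook, the same dictionary could be filled inside the rank loop so that the best rank does not have to be read off the printed output by hand.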

In [None]:
bestModel = ALS.trainImplicit(train, rank=10, seed = 55)
modelEval(bestModel, test)

## Testing out Artist Recommendations

We'll look up the top 5 artist recommendations for each of the three users found earlier.

In [None]:
# list the top 5 recommended artists for user 1059637
recommend = bestModel.recommendProducts(1059637, 5)
solution = [i[1] for i in recommend]
artist_dic = artistData.collectAsMap()

for i in range(len(solution)):
    print('Artist {}: {}'.format(i+1, artist_dic[solution[i]] ))

In [None]:
# list the top 5 recommended artists for user 2064012
recommend = bestModel.recommendProducts(2064012, 5)
solution = [i[1] for i in recommend]
artist_dic = artistData.collectAsMap()

for i in range(len(solution)):
    print('Artist {}: {}'.format(i+1, artist_dic[solution[i]] ))

In [None]:
# list the top 5 recommended artists for user 2069337 
recommend = bestModel.recommendProducts(2069337, 5)
solution = [i[1] for i in recommend]
artist_dic = artistData.collectAsMap()

for i in range(len(solution)):
    print('Artist {}: {}'.format(i+1, artist_dic[solution[i]] ))
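
The three cells above repeat the same lookup. As a hedged sketch, the formatting step could be factored into a small helper that also tolerates artist ids with no name in the lookup table. `format_recommendations` is a hypothetical name, the two artist names come from the sample output earlier, and the recommendation tuples (including id 42 and the scores) are made up.

```python
def format_recommendations(recommendations, artist_names):
    """Turn (user, artist_id, score) tuples into numbered display lines."""
    lines = []
    for position, (_, artist_id, _) in enumerate(recommendations, start=1):
        # Fall back to a placeholder when the id has no name in the lookup table
        name = artist_names.get(artist_id, 'unknown artist {}'.format(artist_id))
        lines.append('Artist {}: {}'.format(position, name))
    return lines


# Names from the artistData sample above; recommendation tuples are invented
artist_names = {1240113: 'riow arai', 1030848: "Raver's Nature"}
recommendations = [
    (1059637, 1240113, 0.9),
    (1059637, 1030848, 0.8),
    (1059637, 42, 0.7),  # id with no entry in artist_names
]

for line in format_recommendations(recommendations, artist_names):
    print(line)
```

In the notebook, this could be called as `format_recommendations(bestModel.recommendProducts(user, 5), artist_dic)` inside a loop over the three user ids, with `artist_dic` built once, assuming the returned ratings unpack as (user, product, rating) tuples.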