# Large-scale Matrix Factorization for Music Recommendations

Xiaoyu Lu

The objective of this assignment is to predict personalized music recommendations based on the users' ratings of the music. I used the last.fm dataset, which include a main file of more than 140000 users, 1.5 million unique artists and 24 million users' information regarding how often they played a give song/artist. We also have supplementary files which have the names of artists and artists IDs.

In [1]:
from os import chdir, getcwd
from glob import glob
import numpy as np
import pyspark
import matplotlib.pyplot as plt
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

%matplotlib inline

In [2]:
path = getcwd()
chdir(path)

sc = SparkContext()

In [45]:
userartist = sc.textFile('user_artist_data_small.txt').map(lambda x: x.split(' '))
alias = sc.textFile('artist_alias_small.txt').map(lambda x: x.split('\t'))
artist = sc.textFile("artist_data_small.txt").map(lambda x: x.split('\t'))

Print out first 10 of each file:

In [25]:
for rdd in [userartist, alias, artist]:
    print(rdd.take(10))

[['1059637', '1000010', '238'], ['1059637', '1000049', '1'], ['1059637', '1000056', '1'], ['1059637', '1000062', '11'], ['1059637', '1000094', '1'], ['1059637', '1000112', '423'], ['1059637', '1000113', '5'], ['1059637', '1000114', '2'], ['1059637', '1000123', '2'], ['1059637', '1000130', '19129']]
[['1027859', '1252408'], ['1017615', '668'], ['6745885', '1268522'], ['1018110', '1018110'], ['1014609', '1014609'], ['6713071', '2976'], ['1014175', '1014175'], ['1008798', '1008798'], ['1013851', '1013851'], ['6696814', '1030672']]
[['1240105', 'André Visior'], ['1240113', 'riow arai'], ['1240132', 'Outkast & Rage Against the Machine'], ['6776115', '小松正夫'], ['1030848', "Raver's Nature"], ['6671601', 'Erguner, Kudsi'], ['1106617', 'Bloque'], ['1240185', 'Lexy & K. Paul'], ['6671631', 'Rev. W.M. Mosley'], ['6671632', 'Labelle, Patti']]


In [47]:
userartist.flatMap(alias).first()

Traceback (most recent call last):
  File "/Users/Larry/anaconda3/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 148, in dump
    return Pickler.dump(self, obj)
  File "/Users/Larry/anaconda3/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/Users/Larry/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/Larry/anaconda3/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/Users/Larry/anaconda3/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/Larry/anaconda3/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/Users/Larry/anaconda3/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/Users/Larry/anaconda3/lib/python3.6/pickle.py", line 476, 

PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

## 1. Pre-processing the dataset

In [48]:
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.ml.classification import NaiveBayes

path = getcwd()
chdir(path)

spark = SparkSession\
        .builder\
        .appName("ALS")\
        .getOrCreate()

In [3]:
df = spark.read.csv("training_test_data.txt",header=True,inferSchema=True)

Below is a udf (user defined function) that split the body text. The lines directly beneath applies the funciton to the `body` column meanwhile change the type to `ArrayType`.

In [4]:
def text_split(text):
    """
    user-defined funtion to split the text
    """
    text = text.split()
    return text

In [5]:
clean_udf = udf(text_split, ArrayType(StringType()))
df = df.withColumn("body", clean_udf("body"))

In [6]:
#following section transforms the text using TFIDF
start = time.clock()
hashingTF = HashingTF(inputCol="body", outputCol="term_freq")
df = hashingTF.transform(df)
idf = IDF(inputCol="term_freq", outputCol="tfidf")
idfModel = idf.fit(df)
df = idfModel.transform(df)
print ("pyspark TFIDF processing time: {0:.5f} s".format(time.clock() - start))

pyspark TFIDF processing time: 0.01982 s


## 4. Building a Naive Bayes Classifier

The first step is to convert the topics (nominal) to a list of discrete integers

In [7]:
#Using the OneHotEncoder to convert the topics into discrete integers
stringIndexer = StringIndexer(inputCol="topic", outputCol="topicIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

The entire dataset will be split 3 ways into the training/test/cross-evaluation set, and 3 different split proportions (`50/40/10`, `60/30/10`, and `70/20/10`) were used. The Naive Bayes classfier was trained, and for each split condition our model will train 10 times to evaluate the sensitivity of the model.

A total number of 30 models will be trained, and their parameters and accuracy are stored as key-value pairs in a dictionary.

In [8]:
val_dict = dict()
train_test_cv_split_params = {'50/40/10': [0.5, 0.4, 0.1],
                               '60/30/10': [0.6, 0.3, 0.1], 
                               '70/20/10': [0.7, 0.2, 0.1]}

for split_param in train_test_cv_split_params.keys(): #run the model for each train/test/cv split
    for seed in np.arange(10): #run each model 10 times using different random seed
        train,test,cv = indexed.select("tfidf","topicIndex").randomSplit(train_test_cv_split_params[split_param],seed=seed)

        #Naive bayes
        nb = NaiveBayes(featuresCol="tfidf", labelCol="topicIndex", predictionCol="NB_pred",
                        probabilityCol="NB_prob", rawPredictionCol="NB_rawPred")
        nbModel = nb.fit(train)
        cv = nbModel.transform(cv)
        total = cv.count()
        correct = cv.where(test['topicIndex'] == cv['NB_pred']).count()
        accuracy = correct/total
        val_dict[(split_param, seed)] = accuracy

In [9]:
params = max(val_dict, key = val_dict.get)
print("The combination of parameters that produced the highest accuracy ({0:.2f}): train/test/cv split ratio: {1}, randomseed: {2}".format(max(val_dict.values()),params[0], params[1]))

The combination of parameters that produced the highest accuracy (0.52): train/test/cv split ratio: 70/20/10, randomseed: 5


In [13]:
def meancal(val_dict, split_param):
    l = list()
    for i in val_dict.keys():
        if i[0] == split_param:
            l.append(val_dict[i])
    lmean = np.mean(l)
    lstd = np.std(l)
    return (lmean, lstd)

In [16]:
print('The mean accuracy of the 30 models: {0:.3f}'.format(np.mean(list(val_dict.values()))))

for split_param in train_test_cv_split_params:
    mean_accuracy, std_accuracy = meancal(val_dict, split_param)
    print('The split condition {0} has a mean accuracy of {1:.3f}'.format(split_param, mean_accuracy))
    print('The st.d. of split condition {0} for 10 runs: {1:.3f}'.format(split_param, std_accuracy))

The mean accuracy of the 30 models: 0.491
The split condition 50/40/10 has a mean accuracy of 0.481
The st.d. of split condition 50/40/10 for 10 runs: 0.013
The split condition 60/30/10 has a mean accuracy of 0.495
The st.d. of split condition 60/30/10 for 10 runs: 0.013
The split condition 70/20/10 has a mean accuracy of 0.498
The st.d. of split condition 70/20/10 for 10 runs: 0.012


Generally, the accuracy of the model increases as we have a higher proportion of training data. In our case the highest performing model was produced with a split of `70/20/10` with a mean accuracy of 0.498. The `70/20/10` split also produces the lowest st.d., therefore it is less sensitive to the split of the data compared to the other two. It is possible that if we increase the size of the training data, we will increase the model accuracy.



For this assignment, I built a simple Naive Bayes classifier to classify articles based on the topics. As the
result suggests, when using the highest percent of data for training, the model produced the highest accuracy.
A possible future development might be to gather more data to see if the accuracy improves.

Only a limited number of topics were used for this assignment. It would be interesting to see what the
model performance will be when more topics are used. Also, we only used single topic for each article
entry, without considering multi-labeled articles.

Last, I trained the model based on the TFIDF result created using the cleaned text. I did not consider
the combination of multiple words and their influence to the model. `Ngram` provides such capability, and it
would be interesting to see if the model improves.