Student: Jun Pan

Requirement

The goal of this project is to give you practice beginning to work with a distributed recommender system.  It is sufficient for this assignment to build out your application on a single node.
 
Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.
 
Please include in your conclusion:  For your given recommender system's data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?
 
You may work on any platform of your choosing, including Databricks Community Edition or in local mode.  You are encouraged but not required to work in a small group on this project. 



###Music Recommendation System Using Apache Spark & Python

Objective: To build a music recommendation system that recommends new musical artists to users based on their listening history. The system uses Apache Spark and collaborative filtering, with the Alternating Least Squares (ALS) algorithm as the underlying implementation. The project uses publicly available song data from Audioscrobbler (Audioscrobbler.com).

Music Listening Dataset: contains profiles for around 150,000 real people
The dataset lists the artists each person listens to, and a counter
indicating how many times each user played each artist.

FILES:
user_artist_data.txt
    3 columns: userid artistid playcount

artist_data.txt
    2 columns: artistid artist_name

artist_alias.txt
    2 columns: badid, goodid
    known incorrectly spelt artists and the correct artist id. 
    you can correct errors in user_artist_data as you read it in using this file
    (we're not yet finished merging this data)   


In [1]:
#Import Necessary Packages
# Only ALS is used in the cells below, so it is imported by name
from pyspark.mllib.recommendation import ALS

In [2]:
# Load Data into RDD variables
from pyspark import SparkContext
sc =SparkContext()

artistData=sc.textFile('C:/Users/tbao/Desktop/Data612 Project5/artist_data_small.txt')
artistAlias=sc.textFile('C:/Users/tbao/Desktop/Data612 Project5/artist_alias_small.txt')
userArtistData=sc.textFile('C:/Users/tbao/Desktop/Data612 Project5/user_artist_data_small.txt')

Data Exploration
In the block below, we sum each user's play counts (the sum of all counters) to find the three users with the highest totals.

In [3]:
#DATA EXPLORATION
# Split each line into separate fields and store the IDs and counts as int
artistData=artistData.map(lambda x: x.split("\t")).map(lambda x: (int(x[0]), x[1]))
artistAlias=artistAlias.map(lambda x: x.split("\t")).map(lambda x: (int(x[0]), int(x[1])))
userArtistData = userArtistData.map(lambda x: x.split(" ")).map(lambda x: (int(x[0]), int(x[1]), int(x[2])))

# Create a dictionary of the 'artistAlias' dataset
artistAliasDict =  dict(artistAlias.collect())

# If the artistid is a known alias, replace it with the corrected id from artistAlias, else retain the original
userArtistData = userArtistData.map(lambda x: (x[0], artistAliasDict.get(x[1], x[1]), x[2]))

# Create an RDD consisting of 'userid' and 'playcount' objects of original tuple
userPlays=userArtistData.map(lambda x: (x[0],x[2]))

# Count instances by key and store in broadcast variable
userCount=sc.broadcast(userPlays.countByKey())

# Compute and display users with the highest playcount along with their mean playcount across artists
topThreeUsers=userPlays.reduceByKey(lambda a,b: a+b).takeOrdered(3,key=lambda x:-x[1])
for x in topThreeUsers:
    print('User %d has a total play count of %d and a mean play count of %d.'%(x[0],x[1],x[1]/userCount.value[x[0]]))

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.
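
The alias correction and per-user aggregation in the cell above can be checked on a handful of rows without Spark. The sketch below is plain Python over made-up records; the IDs, play counts, and helper names (`clean`, `user_stats`) are illustrative assumptions, not values from the Audioscrobbler files.

```python
from collections import defaultdict

# Hypothetical sample rows: (userid, artistid, playcount)
rows = [(1, 10, 5), (1, 11, 3), (2, 10, 7), (2, 99, 2)]
# Hypothetical alias table: badid -> goodid
alias = {99: 11}

def clean(rows, alias):
    """Replace aliased artist IDs with their corrected IDs; keep others unchanged."""
    return [(user, alias.get(artist, artist), plays) for user, artist, plays in rows]

def user_stats(rows):
    """Return {userid: (total_plays, mean_plays_per_record)}."""
    totals = defaultdict(int)
    counts = defaultdict(int)
    for user, _artist, plays in rows:
        totals[user] += plays
        counts[user] += 1
    return {user: (totals[user], totals[user] / counts[user]) for user in totals}

cleaned = clean(rows, alias)
stats = user_stats(cleaned)
# Users ordered by total play count, highest first
top = sorted(stats.items(), key=lambda kv: -kv[1][0])[:3]
for user, (total, mean) in top:
    print("User %d: total %d, mean %.2f" % (user, total, mean))
```

Note that the notebook cell formats the mean with `%d`, which truncates it to an integer; the sketch keeps the fractional part.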


The Recommender Model
For this project, we will train the model with implicit feedback. You can read more information about this from the collaborative filtering page: http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html. The function you will be using has a few tunable parameters that will affect how the model is built. Therefore, to get the best model, we will do a small parameter sweep and choose the model that performs the best on the validation set.

Therefore, we must first devise a way to evaluate models. Once we have a method for evaluation, we can run a parameter sweep, evaluate each combination of parameters on the validation data, and choose the optimal set of parameters. The parameters then can be used to make predictions on the test data.

Model Evaluation
Although there may be several ways to evaluate a model, we will use a simple method here. Suppose we have a model and some dataset of true artist plays for a set of users. This model can be used to predict the top X artist recommendations for a user, and these recommendations can be compared with the artists that the user actually listened to (here, X will be the number of artists in the dataset of true artist plays). Then, the fraction of overlap between the top X predictions of the model and the X artists that the user actually listened to can be calculated. This process can be repeated for all users and an average value returned.

For example, suppose a model predicted [1,2,4,8] as the top X=4 artists for a user. Suppose that user actually listened to the artists [1,3,7,8]. Then, for this user, the model would have a score of 2/4=0.5. To get the overall score, this would be performed for all users, with the average returned.
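
The scoring rule in this example can be written as a few lines of plain Python. This is a minimal sketch that works on ordinary lists rather than Spark RDDs; the function names `overlap_score` and `mean_overlap` are made up for illustration and are not part of the notebook.

```python
def overlap_score(predicted, actual):
    """Fraction of the actual artists that also appear in the predictions."""
    actual_set = set(actual)
    if not actual_set:
        raise ValueError("actual must contain at least one artist")
    return len(set(predicted) & actual_set) / len(actual_set)

def mean_overlap(per_user):
    """Average overlap_score over an iterable of (predicted, actual) pairs."""
    scores = [overlap_score(predicted, actual) for predicted, actual in per_user]
    return sum(scores) / len(scores)

# The worked example from the text: two of the four artists match
example = overlap_score([1, 2, 4, 8], [1, 3, 7, 8])
print(example)
```

Because the number of predictions equals the number of artists the user actually played, dividing by the size of either list gives the same fraction.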

NOTE: when using the model to predict the top-X artists for a user, do not include the artists listed with that user in the training data.

Name your function modelEval and have it take a model (the output of ALS.trainImplicit) and a dataset as input. For parameter tuning, the dataset parameter should be set to the validation data (validationData). After parameter tuning, the model can be evaluated on the test data (testData).

In [None]:
def modelEval(model,dataset):
    trueArtists = dataset.map(lambda x:(x[0],x[1])).groupByKey()
    # 'userArtistDataTemp' is not defined anywhere in this notebook; use the cleaned 'userArtistData' RDD
    allArtists = userArtistData.map(lambda x:(x[1])).distinct()
    score_list = []
    for artist in trueArtists.collect():
        nonTrainArtists = allArtists.subtract(trainData.filter(lambda x : (x[0] == artist[0])).map(lambda x:(x[1])))
        nonTrainArtists = nonTrainArtists.map(lambda x : (artist[0],x))
        predictedResults = model.predictAll(nonTrainArtists).map(lambda r: (r[0], r[1], r[2]))
        recommendations =  sc.parallelize(predictedResults.takeOrdered(len(artist[1]), key=lambda x: -x[2])).map(lambda x : x[1])
        score_list.append(calculate_score(artist[1], recommendations))
    return sum(score_list)/len(score_list)

def calculate_score(actual_list, predicted_list):
    score = float(sc.parallelize(actual_list).intersection(predicted_list).count())/float(len(actual_list))
    return score

In [4]:
#Splitting Data for Testing
#using randomSplit function to divide the data (userArtistData) into:
#a train set (40%), a validation set (40%) and a test set (20%).
trainData, validationData, testData = userArtistData.randomSplit([4,4,2], seed=123)
trainData.cache()
validationData.cache()
testData.cache()

# Display the first 3 records of each dataset followed by the total count of records for each datasets
print(trainData.take(3))
print(validationData.take(3))
print(testData.take(3))
print(trainData.count())
print(validationData.count())
print(testData.count())

[(1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000062, 11)]
[(1059637, 1000113, 5), (1059637, 1000123, 2), (1059637, 1000241, 188)]
[(1059637, 1000010, 238), (1059637, 1000263, 180), (1059637, 1000427, 20)]
19751
19681
10049
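
As a quick sanity check on the split, the weights [4,4,2] can be normalized and compared with the record counts printed above. randomSplit normalizes weights that do not sum to 1 and assigns each record at random, so the observed fractions should be close to 0.4, 0.4 and 0.2 but not exact. The sketch below is plain Python; the helper name `normalize` is an assumption made for illustration.

```python
weights = [4, 4, 2]
counts = [19751, 19681, 10049]  # train, validation, test counts printed above

def normalize(values):
    """Scale a list of non-negative numbers so that it sums to 1."""
    total = float(sum(values))
    return [value / total for value in values]

expected = normalize(weights)
observed = normalize(counts)
for name, exp, obs in zip(["train", "validation", "test"], expected, observed):
    print("%-10s expected %.3f  observed %.3f" % (name, exp, obs))
```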


In [5]:
#Define the model evaluation function (ALS docs: http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html)
def modelEval(model, dataset):
    
    # Set of all distinct artists in the 'userArtistData' dataset (built once, outside the loop)
    allArtists = set(userArtistData.map(lambda x: x[1]).distinct().collect())
    
    # Set of all users in the current (Validation/Testing) dataset
    userArtists = set(dataset.map(lambda x: x[0]).collect())
    
    # Dictionary of user -> set of artists for the current (Validation/Testing) dataset
    userArtistsDict = dict(dataset.map(lambda x: (x[0], x[1])).groupByKey().mapValues(set).collect())
    
    # Dictionary of user -> set of artists for the training dataset
    userArtistTrain = dict(trainData.map(lambda x: (x[0],x[1])).groupByKey().mapValues(set).collect())
    
    # For each user, calculate the prediction score i.e. overlap between predicted and actual artists
    total = 0
    for key in userArtists:
        # Artists the user did not play in the training dataset (all artists if the user has no training rows)
        nonTrainArtists = allArtists - userArtistTrain.get(key, set())
        # Artists actually listened to by the user
        origArtists = userArtistsDict[key]
        # Count of artists
        origArtistsCnt = len(origArtists)
        # Pair the user with each candidate artist and create an RDD
        userArtistTest = sc.parallelize([(key, x) for x in nonTrainArtists])
        # Predict the top artists listened to by the user
        predArtists = model.predictAll(userArtistTest).sortBy(ascending=False, keyfunc = lambda x: x[2]).map(lambda x:x[1]).take(origArtistsCnt)
        # Add this user's score to the running total
        total += (float(len(set(predArtists).intersection(origArtists))) / origArtistsCnt)
        
    # Print the average score of the model over all users.
    # Note: 'rank' is the notebook-level loop variable, not a value read from 'model'
    print(rank,float(total)/len(userArtists))

Model Construction
Now we can build the best possible model using the validation data and the modelEval function. Although there are a few parameters we could optimize, for the sake of time we will try only a few values of the rank parameter (leave everything else at its default value, except make seed=345). Loop through the values [2, 10, 20] and figure out which one produces the highest score based on your model evaluation function.

Note: this procedure may take several minutes to run.
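
The sweep described above amounts to "train, score, keep the best". The sketch below shows that pattern in plain Python with stand-in functions, so it runs without Spark. It assumes an evaluator that returns its score (the notebook's modelEval prints the score instead), and the names `sweep`, `fake_train` and `fake_evaluate` are hypothetical. The scores for ranks 2 and 10 are the values recorded in this notebook; the failure for rank 20 is simulated.

```python
def sweep(ranks, train, evaluate):
    """Train and score one model per rank; return (best_rank, scores).

    A run that raises is recorded as None so the remaining ranks still run.
    """
    scores = {}
    for rank in ranks:
        try:
            scores[rank] = evaluate(train(rank))
        except Exception as exc:
            print("rank %d failed: %s" % (rank, exc))
            scores[rank] = None
    completed = {r: s for r, s in scores.items() if s is not None}
    best_rank = max(completed, key=completed.get) if completed else None
    return best_rank, scores

# Stand-ins for ALS.trainImplicit and a score-returning evaluator
recorded = {2: 0.08780208882767691, 10: 0.09986291068065163}

def fake_train(rank):
    return rank  # the "model" is just its rank in this sketch

def fake_evaluate(model):
    if model not in recorded:
        raise RuntimeError("simulated worker failure")
    return recorded[model]

best_rank, scores = sweep([2, 10, 20], fake_train, fake_evaluate)
print(best_rank, scores)
```

Recording a failed run instead of letting it abort the loop keeps the scores already computed, which matters when each evaluation takes several minutes.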

In [9]:
#Construct model
rankList = [2,10,20]
for rank in rankList:
    model = ALS.trainImplicit(trainData, rank , seed=123)
    modelEval(model,validationData)

2 0.08780208882767691
10 0.09986291068065163


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 13149.0 failed 1 times, most recent failure: Lost task 2.0 in stage 13149.0 (TID 6531, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor84.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 14 more


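The rank 20 run did not complete: it aborted with the Spark error above (Python worker failed to connect back), so only ranks 2 and 10 were scored. Rank 10 scored higher, so we train bestModel with rank=10 and check the results over the test data. Note that the label 20 in the output below is the leftover value of the notebook's loop variable rank, which modelEval prints; the model itself was trained with rank=10.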

In [10]:
bestModel = ALS.trainImplicit(trainData, rank=10, seed=345)
modelEval(bestModel, testData)

20 0.060453764195471694


Trying Some Artist Recommendations
Using the best model above, predict the top 5 artists for user 1059637 using the recommendProducts function. Map the results (integer IDs) to the real artist names using artistData. Print the results.

In [12]:
# Find the top 5 artists for a particular user and list their names
topRating = bestModel.recommendProducts(1059637, 5)
artistRating = map(lambda x: x.product, topRating)
artistDataDict = dict(artistData.collect())
count = 0
for key in artistRating:
    print("Artist " + str(count) + ":", artistDataDict[key])
    count += 1

Artist 0: Counting Crows
Artist 1: Bright Eyes
Artist 2: Taking Back Sunday
Artist 3: Green Day
Artist 4: The Movielife
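
The ID-to-name lookup in the cell above raises a KeyError if a recommended artist ID is missing from artistData. The sketch below shows one way to guard against that. It is plain Python with a namedtuple standing in for Spark's Rating (fields user, product, rating); the IDs, names, and the helper `artist_names` are made up for illustration.

```python
from collections import namedtuple

# Stand-in for pyspark.mllib.recommendation.Rating
Rating = namedtuple("Rating", ["user", "product", "rating"])

def artist_names(recommendations, names_by_id):
    """Map each recommended artist ID to its name, with a fallback for unknown IDs."""
    return [names_by_id.get(r.product, "unknown artist %d" % r.product)
            for r in recommendations]

# Hypothetical recommendations and name table (not taken from the dataset)
recs = [Rating(1, 101, 0.9), Rating(1, 102, 0.8), Rating(1, 999, 0.7)]
names = {101: "Artist A", 102: "Artist B"}

for position, name in enumerate(artist_names(recs, names)):
    print("Artist %d: %s" % (position, name))
```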


Discussion:

Even running just a local instance, Spark improved overall performance. Spark is a platform for cluster computing: it spreads data and computations over a cluster with multiple nodes (think of each node as a separate computer). Each node works on its own subset of the data and carries out part of the total calculation, so data processing and computation run in parallel across the cluster. In local mode, the same partitioned execution runs across the cores of a single machine. Parallel computation can make certain types of tasks much faster, and this is the biggest advantage of distributed processing.

The biggest disadvantage is a more complex implementation, and I believe this is the main tradeoff. For a simple recommender system, Spark is overkill: the benefit does not justify the effort, and a relatively small dataset is better handled by a single server. On the other hand, something like Netflix, a well-worn recommender system example, definitely needs very high performance.

In addition, it is important to consider how often a model should be updated. If a recommender system needs to respond to changes quickly, performance again becomes key and distributed processing is worth the effort.