
# BM Coursera Advanced Data Science Capstone
## Sahand Azad


Building an artist recommender system
As its name implies, a recommender system is a tool that helps predicting what a user may or may not like among a list of given items. In some sense, you can view this as an alternative to content search, as recommendation engines help users discover products or content that they may not come across otherwise. For example, Facebook suggests friends and pages to users. Youtube recommends videos which users may be interested in. Amazon suggests the products which users may need... Recommendation engines engage users to services, can be seen as a revenue optimization process, and in general help maintaining interest in a service.

In this notebook, I demonstrate how to build a simple recommender system: we focus on music recommendations, and we use a simple algorithm to predict which items users might like, that is called ALS -- alternating least squares.

## Goals
In this notebook, I expect to:

Revisit (or learn) recommender algorithms

Understand the idea of Matrix Factorization and the ALS algorithm (serial and parallel versions)

Build a simple model for a real usecase: music recommender system

Understand how to validate the results

## Steps
Inspect the data using Spark SQL, and build some basic, but very valuable knowledge about the information we have at hand
Formally define what is a sensible algorithm to achieve our goal: given the "history" of user taste for music, recommend new music to discover. Essentialy, we want to build a statistical model of user preferences such that we can use it to "predict" which additional music the user could like
With our formal definition at hand, we will learn different ways to implement such an algorithm. Our goal here is to illustrate what are the difficulties to overcome when implementing a (parallel) algorithm
Finally, we will focus on an existing implementation, available in the Apache Spark MLLib, which we will use out of the box to build a reliable statistical model
## Furthermore:

One important topic that I will cover in the Notebooks is how to validate the results we obtain, and how to choose good parameters to train models especially when using an "opaque" library for doing the job. As a consequence, I will focus on the statistical validation of our recommender system.
Furthermore:

One important topic that I will cover in the Notebooks is how to validate the results we obtain, and how to choose good parameters to train models especially when using an "opaque" library for doing the job. As a consequence, I will focus on the statistical validation of our recommender system.

## 1. Data
Understanding data is one of the most important part when designing any machine learning algorithm. In this notebook, I will use a data set published by Audioscrobbler - a music recommendation system for last.fm. Audioscrobbler is also one of the first internet streaming radio sites, founded in 2002. It provided an open API for “scrobbling”, or recording listeners’ plays of artists’ songs. last.fm used this information to build a powerful music recommender engine.

1.1. Data schema
Unlike a rating dataset which contains information about users' preference for products (one star, 3 stars, and so on), the datasets from Audioscrobbler only has information about events: specifically, it keeps track of how many times a user played songs of a given artist and the names of artists. That means it carries less information than a rating: in the literature, this is called explicit vs. implicit ratings.

## Reading material
### Implicit Feedback for Inferring User Preference: 
A Bibliography Comparing explicit and implicit feedback techniques for web retrieval: TREC-10 interactive track report
Probabilistic Models for Data Combination in Recommender Systems

...	...
## 1.2. Data Exploration: simple descriptive statistics
In order to choose or design a suitable algorithm for achieving our goals, given the data we have, we should first understand data characteristics. To start, we import the necessary packages to work with regular expressions, Data Frames, and other nice features of our programming environment.

In [1]:
!set -e

!mkdir -p dataset
!cd dataset
!wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
!tar xvf profiledata_06-May-2005.tar.gz
!mv profiledata_06-May-2005/* dataset/
!rm -r profiledata_06-May-2005

--2019-05-30 13:03:53--  http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
Resolving www.iro.umontreal.ca (www.iro.umontreal.ca)... 132.204.26.36
Connecting to www.iro.umontreal.ca (www.iro.umontreal.ca)|132.204.26.36|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 135880312 (130M) [application/x-gzip]
Saving to: ‘profiledata_06-May-2005.tar.gz.4’


2019-05-30 13:03:59 (24.5 MB/s) - ‘profiledata_06-May-2005.tar.gz.4’ saved [135880312/135880312]

profiledata_06-May-2005/
profiledata_06-May-2005/artist_data.txt
profiledata_06-May-2005/README.txt
profiledata_06-May-2005/user_artist_data.txt
profiledata_06-May-2005/artist_alias.txt


In [2]:
!cat dataset/user_artist_data.txt | head

1000002 1 55
1000002 1000006 33
1000002 1000007 8
1000002 1000009 144
1000002 1000010 314
1000002 1000013 8
1000002 1000014 42
1000002 1000017 69
1000002 1000024 329
1000002 1000025 1
cat: write error: Broken pipe


In [3]:
!pip install pyspark




In [1]:
import os
import sys
import re
import random
from pyspark import SparkContext

from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *


%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from time import time
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("building a warehouse")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)


# Base directory path
base = "dataset/"


Using SPARK SQL, load data from /dataset/user_artist_data.txt and show the first 20 entries (via function show()).

For this Notebook, from a programming point of view, we are given the schema for the data we use, which is as follows:

           userID: long int
           artistID: long int
           playCount: int
Each line of the dataset contains the above three fields, separated by a "white space".

In [2]:

userArtistDataSchema = StructType([ \
    StructField("userID", LongType(), True), \
    StructField("artistID", LongType(), True), \
    StructField("playCount", IntegerType(), True)])

userArtistDF = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter=' ') \
    .load(base + "user_artist_data.txt", schema = userArtistDataSchema) \
    .cache()

# we can cache an Dataframe to avoid computing it from the beginning everytime it is accessed.
userArtistDF.cache()

#userArtistDF.show()

DataFrame[userID: bigint, artistID: bigint, playCount: int]

In [3]:
userArtistDF.show()

IllegalArgumentException: 'Unsupported class file major version 55'

### How many distinct users do we have in our data?


In [4]:

allusers = userArtistDF.count()
print("All rows in database: ", allusers )
uniqueUsers = userArtistDF.select('userID').distinct().count()
print("Total n. of distinct users: ", uniqueUsers)

Py4JJavaError: An error occurred while calling o30.count.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
   +- *(1) InMemoryTableScan
         +- InMemoryRelation [userID#0L, artistID#1L, playCount#2], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) FileScan csv [userID#0L,artistID#1L,playCount#2] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/sahand/dataset/user_artist_data.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<userID:bigint,artistID:bigint,playCount:int>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2830)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2829)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Unsupported class file major version 55
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.map(RDD.scala:370)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:111)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDDs(InMemoryTableScanExec.scala:155)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 37 more


### What are the maximum and minimum values of column userID ?
One limitation of Spark MLlib's ALS implementation - which we will use later - is that it requires IDs for users and items to be nonnegative 32-bit integers. This means that IDs larger than Integer.MAX_VALUE, or 2147483647, can't be used. So we need to check whether this data set conforms to the strict requirements of our library.


In [None]:
MAX_VALUE = 2147483647
#showing the IDs which are invalid
userArtistDF[(userArtistDF.userID.cast("int") < 0)].show()
userArtistDF[(userArtistDF.userID.cast("int") > MAX_VALUE)].show()
#As the result, we don't see any invalid userID
#Otherwise, we can use function describe() of Spark MLlib to show the statistics
userArtistDF.select("userID").describe().show()

### What is the maximum and minimum values of column artistID ?


In [None]:

#Finding min and max of artistID using function groupby() and max|min()
max_artistID = userArtistDF.groupby().max('artistID').collect()[0].asDict()['max(artistID)']
print('Maximum value of artistID: ',max_artistID)
min_artistID = userArtistDF.groupby().min('artistID').collect()[0].asDict()['min(artistID)']
print('Minimum value of artistID: ',min_artistID)

#Again, we can use describe() for short
userArtistDF.select("artistID").describe().show()

We just discovered that we have a total of 148,111 users in our dataset. Similarly, we have a total of 1,631,028 artists in our dataset. The maximum values of userID and artistID are still smaller than the biggest number of integer type. No additional transformation will be necessary to use these IDs.

One thing we can see here is that SPARK SQL provides very concise and powerful methods for data analytics (compared to using RDD and their low-level API). You can see more examples here.

Next, we might want to understand better user activity and artist popularity.

Here is a list of simple descriptive queries that helps us reaching these purposes:

    #. _How many times each user has played a song? This is a good indicator of who are the most active users of our service. Note that a very active user with many play counts does not necessarily mean that the user is also "curious"! Indeed, she could have played the same song several times.
    #. How many play counts for each artist? This is a good indicator of the artist popularity. Since we do not have time information associated to our data, we can only build a, e.g., top-10 ranking of the most popular artists in the dataset. Later in the notebook, we will learn that our dataset has a very "loose" definition about artists: very often artist IDs point to song titles as well. This means we have to be careful when establishing popular artists. Indeed, artists whose data is "well formed" will have the correct number of play counts associated to them. Instead, artists that appear mixed with song titles may see their play counts "diluted" across their songs.

###  How many times each user has played a song? Display 5 samples of the result.


In [None]:
# Compute user activity
# We are interested in how many playcounts each user has scored.
userActivity = userArtistDF.groupBy('userID').sum('playCount').collect()
print(userActivity[0:5])
len(userActivity)


### Plot CDF (or ECDF) of the number of play counts per User ID.
Look at important percentiles (25%, median, 75%, tails such as >90%)
Discuss about users: we will notice that for some users, there is very little interaction with the system, which means that maybe reccommending something to them is going to be more difficult than for other users who interact more with the system.
Look at outliers and reasons about their impact on the reccommender algorithm

In [None]:
pdf = pd.DataFrame(data=userActivity)
Y=np.sort( pdf[1] )
yvals=np.arange(len(Y))/float(len(Y))

print(np.arange(len(Y)))

plt.semilogx( Y, yvals )
plt.xlabel('Play Counts')
plt.ylabel('ECDF')
plt.grid(True,which="both",ls="-")
plt.title('ECDF of number of play counts per User ID')
plt.show()

#Morever, I would like to show some statistical measurements of userAcitvity
print ('Total =', Y.sum())
print ('Mean =', Y.mean())
print ('Min =', Y.min())
print ('Max =', Y.max())

# look at important percentiles (25%, median, 75%, tails such as >90%)
print('Percentile 25% :' + str(np.percentile(Y,25)))
print('Percentile 50% :' + str(np.percentile(Y,50)))
print('Percentile 75% :' + str(np.percentile(Y,75)))
print('Percentile 90% :' + str(np.percentile(Y,90)))
print('Percentile 95% :' + str(np.percentile(Y,95)))
print('Percentile 99% :' + str(np.percentile(Y,99)))

# look at the percentile has playCount less than 10
print('The percentage of user playing less than 10 times P(Y<=10) =', len(Y[Y<=10])/len(Y))

### We have total 371638969 play counts.
### Inaverage, every user plays 2509 times.
### 25% of the users have the play counts less than or equal to (<=) 204 times.
### 50% of the users have the play counts less than or equal to (<=) 892times.
### 75% of the users have the play counts less than or equal to (<=) 2800 times.
### 95% of the users have the play counts less than or equal to (<=) 10120 times.
### 99% of the users have the play counts less than or equal to (<=) 21569 times.
### The result is plausible with the figure above.
### About 7746 users (5.23%) have the play counts less than or equal to (<=) 10 times. These users have very little interaction with the system, so there is more difficult for recommending for these users than other users creating more impact in the system (have a certain number of playCount).
### How many play counts for each artist?

In [None]:
artistPopularity = userArtistDF.groupBy('artistID').sum('playCount').collect()
print(artistPopularity[0:5]) #print first 5 artistID in dataframe
len(artistPopularity)

In [None]:
pdf1 = pd.DataFrame(data=artistPopularity)
Y1=np.sort( pdf1[1] )
yvals1=np.arange(len(Y1))/float(len(Y1))

print(np.arange(len(Y1)))

plt.semilogx( Y1, yvals1 )
plt.xlabel('Play Counts')
plt.ylabel('ECDF')
plt.grid(True,which="both",ls="-")
plt.title('ECDF of number of play counts per artist ID')
plt.show()



#
print ('Sum =', Y1.sum())
print ('Mean =', Y1.mean())
print ('Min =', Y1.min())
print ('Max =', Y1.max())
print ('Top 5 play counts:', Y1[len(Y1)-5:len(Y1)])
print ('Sum top 5 artist play counts:', Y1[len(Y1)-5:len(Y1)].sum())
print ('Percentage of top 5 artist play counts:', Y1[len(Y1)-5:len(Y1)].sum()/Y1.sum())
print ('P(playCount<=10) =', len(Y1[Y1<=10])/len(Y1))
print ('P(playCount<=1000) =', len(Y1[Y1<=1000])/len(Y1))

In [None]:
sortedArtist = sorted(artistPopularity, key = lambda x: -x[1])[:5]

artistID = [w[0] for w in sortedArtist]

y_pos = range(len(sortedArtist))
frequency = [w[1] for w in sortedArtist]

plt.barh(y_pos, frequency[::-1], align='center', alpha=0.4)
plt.yticks(y_pos, artistID[::-1])
plt.xlabel('Play Count')
plt.ylabel('Artist')
plt.title('Top-5 Artist ID per play counts')
plt.show()

##Answer:
### This resuls seem not reasonable.
### In the previous, 98.74% of the artists have the play count less than or equal to (<=) 1000 times. So, top-5-artists is not sufficient to extract infomation about the data, they are the outliers . We should cut them out of data.

### As the result, top-5-artists take about 2.6% of the total play counts, but 98.74% of the artists have the play count less than or equal to (<=) 1000 times. Then this is obviously that some artists are played much more than other artists.
### All seems clear right now, but ... wait a second! What about the problems indicated above about artist "disambiguation"? Are these artist ID we are using referring to unique artists? How can we make sure that such "opaque" identifiers point to different bands? Let's try to use some additional dataset to answer this question:  artist_data.txt dataset. This time, the schema of the dataset consists in:

artist ID: long int

name: string

We will try to find whether a single artist has two different IDs.


### 1.3 Data Integration¶
### Loading artist data
Load the data from /dataset/artist_data.txt and use the SparkSQL API to show 5 samples.



In [None]:

customSchemaArtist = StructType([ \
    StructField("artistID", LongType(), True), \
    StructField("name", StringType(), True)])

artistDF = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='\t', mode='DROPMALFORMED') \
    .load(base + "artist_data.txt", schema = customSchemaArtist) \
    .cache()
artistDF.show(5)


### Look at top 20 artists whose name contains "Aerosmith". Take a look at artists that have ID equal to 1000010 and 2082323. Are they pointing to the same artist?
HINT: Function locate(sub_string, string) can be useful in this case.

In [None]:

# get artists whose name contains "Aerosmith"
artistDF[locate("Aerosmith", artistDF.name) > 0].show(20, False)

# show two examples
artistDF[artistDF.artistID==1000010].show()
artistDF[artistDF.artistID==2082323].show()

### in my opinion, they are pointing to the same artist
To answer this question correctly, we need to use an additional dataset artist_alias.txt which contains the ids of mispelled artists and standard artists. The schema of the dataset consists in:

mispelledID ID: long int
standard ID: long int
Using SparkSQL API, load the dataset from /dataset/artist_alias.txt then show 5 samples.

In [None]:

customSchemaArtistAlias = StructType([ \
    StructField("mispelledID", LongType(), True), \
    StructField("standardID", LongType(), True)])

artistAliasDF = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='\t', mode='DROPMALFORMED') \
    .load(base + "artist_alias.txt", schema = customSchemaArtistAlias) \
    .cache()

artistAliasDF.show(5)

Verify the answer of "Are artists that have ID equal to 1000010 and 2082323 the same ?" by finding the standard ids corresponding to the mispelled ids 1000010 and 2082323 respectively.¶

In [None]:
artistAliasDF[artistAliasDF.mispelledID == "1000010" ].show()
artistAliasDF[artistAliasDF.mispelledID == "2082323" ].show()

In [None]:
artistAlias = artistAliasDF.rdd.map(lambda row: (row.mispelledID,row.standardID)).collectAsMap()

#checking the total number of standard artistID
len(artistAlias)

In [None]:
from time import time

bArtistAlias = sc.broadcast(artistAlias)

def replaceMispelledIDs(fields):
    finalID = bArtistAlias.value.get(fields[1] ,fields[1])
    return (fields[0], finalID, fields[2])

t0 = time()

newUserArtistDF = sqlContext.createDataFrame(
    userArtistDF.rdd.map(replaceMispelledIDs), 
    userArtistDataSchema
)
newUserArtistDF.show(5)
t1 = time()

print('The script takes %f seconds' %(t1-t0))
newUserArtistDF = newUserArtistDF.cache()

In [None]:
uniqueArtists = newUserArtistDF.select('artistID').distinct().count()

print("Total n. of artists: ", uniqueArtists)

In [None]:
top10ArtistsPC = newUserArtistDF.groupBy('artistID').sum('playCount').orderBy('sum(playCount)', ascending=0).take(10)

y_pos = list(range(len(top10ArtistsPC)))
pdf = pd.DataFrame(data=top10ArtistsPC)

plt.barh(y_pos, pdf[1][::-1], align='center', alpha=0.4)
plt.yticks(y_pos, pdf[0][::-1])
plt.xlabel('Play Count')
plt.ylabel('Artist')
plt.title('Top-10 Artist ID per play counts')
plt.show()

In [None]:
top10UsersByPlayCount = newUserArtistDF.groupBy("userID").sum('playCount').orderBy('sum(playCount)', ascending=0).take(10)

y_pos = list(range(len(top10UsersByPlayCount)))
pdf = pd.DataFrame(data=top10UsersByPlayCount)

plt.barh(y_pos, pdf[1][::-1], align='center', alpha=0.4)
plt.yticks(y_pos, pdf[0][::-1])
plt.xlabel('Play Count') 
plt.ylabel('User')
plt.title('Top-10 Users ID per play counts')
plt.show()

In [None]:
top10UsersByCuriosity = (newUserArtistDF.dropDuplicates(['userID', 'artistID'])
                             .groupBy("userID")
                             .count()
                             .orderBy('count', ascending=0)
                             .take(10)
                         )

#print(top10UsersByCuriosity)
y_pos = range(len(top10UsersByCuriosity))

pdf = pd.DataFrame(data=top10UsersByCuriosity)

plt.barh(y_pos, pdf[1][::-1], align='center', alpha=0.4)
plt.yticks(y_pos, pdf[0][::-1])
plt.xlabel('Number of played artists')
plt.ylabel('User')
plt.title('Top-10 Users ID per Curiosity')
plt.show()

In section Data Integration, I already replaced the ids of mispelled artist IDs by the corresponding standard ids by using SPARK SQL API. However, if the data has the invalid entries such that SPARK SQL API is stuck, the best way to work with it is using an RDD.

Just as a recall, I work with three datasets in user_artist_data.txt, ` andartist_alias.txt`. The entries in these file can be empty or have only one field.

In details our goal now is:

-Read the input user_artist_data.txt and transforms its representation into an output dataset.
-To produce an output "tuple" containing the original user identifier and play counts, but with the artist identifier replaced by its most common alias, as found in the artist_alias.txt dataset.
-Since the artist_alias.txt file is small, we can use a technique called broadcast variables to make such transformation more efficient.
#### Load data from /dataset/artist_alias.txt and filter out the invalid entries to construct a dictionary to map from mispelled artists' ids to standard ids.
NOTE:

-From now on, we will use the "standard" data to train our model.

-If a line contains less than 2 fields or contains invalid numerial values, we can return a special tuple. After that, we can filter out these special tuples.

In [None]:
rawArtistAlias = sc.textFile(base + "artist_alias.txt")

def xtractFields(s):
    # Using white space or tab character as separetors,
    # split a line into list of strings 
    line = re.split("\s|\t",s,1)
    # if this line has at least 2 characters
    if (len(line) > 1):
        try:
            # try to parse the first and the second components to integer type
            return (int(line[0]), int(line[1]))
        except ValueError:
            # if parsing has any error, return a special tuple
            return (-1,-1)
    else:
        # if this line has less than 2 characters, return a special tuple
        return (-1,-1)

artistAlias = (
                rawArtistAlias
                    # extract fields using function xtractFields
                    .map( lambda x: xtractFields(x))

                    # fileter out the special tuples
                    .filter( lambda x: x != (-1,-1) )
                    # collect result to the driver as a "dictionary"
                    .collectAsMap()
                )

#### Prepare RDD userArtistDataRDD by replacing mispelled artists' ids to standard ids. Show 5 samples.
Using broadcast varible can help us increase the effiency.

In [None]:

bArtistAlias = sc.broadcast(artistAlias)
rawUserArtistData = sc.textFile(base + "user_artist_data.txt")

def disambiguate(line):
    [userID, artistID, count] = line.split(' ')
    finalArtistID = bArtistAlias.value.get(artistID,artistID)
    return (userID, finalArtistID,count)


userArtistDataRDD = rawUserArtistData.map(disambiguate)
userArtistDataRDD.take(5)

### 3.4 Training our statistical model
To train a model using ALS, we must use a preference matrix as an input. MLLIB uses the class Rating to support the construction of a distributed preference matrix.

#### Given RDD userArtistDataRDD in previous section, construct a new RDD called trainingData by tranforming each item of it into a Rating object.

In [None]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating


In [None]:
allData = userArtistDataRDD.map(lambda r: Rating( float(r[0]),float(r[1]),int(r[2]))) \
                                            .repartition(12).cache()
allData.take(5)


#### A model can be trained by using ALS.trainImplicit(<training data>, <rank>), where:
training data is the input data you decide to feed to the ALS algorithm
rank is the number of laten features
We can also use some additional parameters to adjust the quality of the model. Currently, let's set

- rank=10
- iterations=5
- lambda_=0.01
- alpha=1.0
 

In [None]:
rank=10
iterations=5
lambda_=0.01
alpha=1.0

#training
t0 = time()
model = ALS.trainImplicit(allData, rank)
t1 = time()
print("finish training model in %f secs" % (t1 - t0))


#### The trained model can be saved into HDFS for later use. This can be done via model.save(sc, <file_name>).
Let's use this function to store our model as name music_rec_model.spark.

In [None]:

#! hdfs dfs -rm -R -f -skipTrash lastfm_model.spark
#print('Delete old model')
model.save(sc,'music_rec_model.spark')
print('Save new model')


#### A saved model can be load from file by using MatrixFactorizationModel.load(sc, <file_name>).
Let's load the saved model from file.

In [None]:
t0 = time()
model = MatrixFactorizationModel.load(sc, 'music_rec_model.spark')
t1 = time()
print("finish loading model in %f secs" % (t1 - t0))


### Show the top-5 artist names recommendated for user 2093760.
The recommendations can be given by function recommendProducts(userID, num_recommendations). These recommendations are only artist ids. We have to map them to artist names by using data in artist_data.txt.

In [None]:
# Make five reccommendations to user 2093760
recommendations = (model.recommendProducts(2093760,5))
print(recommendations)
# construct set of recommendated artists
recArtist = set(rating[1] for rating in recommendations)
recArtist

In [None]:

# construct data of artists (artist_id, artist_name)

rawArtistData = sc.textFile(base + "artist_data.txt")

def xtractFields(s):
    line = re.split("\s|\t",s,1)
    if (len(line) > 1):
        try:
            return (int(line[0]), str(line[1].strip()))
        except ValueError:
            return (-1,"")
    else: 
        return (-1,"")

artistByID = rawArtistData.map(xtractFields).filter(lambda x: x[0] > 0)

In [None]:
# Filter in those artists, get just artist, and print
def artistNames(line):
#     [artistID, name]
    if (line[0] in recArtist):
        return True
    else:
        return False

recList = artistByID.filter(artistNames).values().collect()

print(recList)

#### IMPORTANT NOTE
At the moment, it is necessary to manually unpersist the RDDs inside the model when you are done with it. The following function can be used to make sure models are promptly uncached.

In [None]:
def unpersist(model):
    model.userFeatures().unpersist()
    model.productFeatures().unpersist()

# uncache data and model when they are no longer used  
unpersist(model)


### 3.5 Evaluating Recommendation Quality
In this section, we study how to evaluate the quality of our model. It's hard to say how good the recommendations are. One of serveral methods approach to evaluate a recommender based on its ability to rank good items (artists) high in a list of recommendations. The problem is how to define "good artists". Currently, by training all data, "good artists" is defined as "artists the user has listened to", and the recommender system has already received all of this information as input. It could trivially return the users previously-listened artists as top recommendations and score perfectly. Indeed, this is not useful, because the recommender's is used to recommend artists that the user has never listened to.

To overcome that problem, we can hide the some of the artist play data and only use the rest to train model. Then, this held-out data can be interpreted as a collection of "good" recommendations for each user. The recommender is asked to rank all items in the model, and the rank of the held-out artists are examined. Ideally the recommender places all of them at or near the top of the list.

The recommender's score can then be computed by comparing all held-out artists' ranks to the rest. The fraction of pairs where the held-out artist is ranked higher is its score. 1.0 is perfect, 0.0 is the worst possible score, and 0.5 is the expected value achieved from randomly ranking artists.

AUC(Area Under the Curve) can be used as a metric to evaluate model. It is also viewed as the probability that a randomly-chosen "good" artist ranks above a randomly-chosen "bad" artist.

Next, we split the training data into 2 parts: trainData and cvData with ratio 0.7:0.3 respectively, where trainData is the dataset that will be used to train model. Then we write a function to calculate AUC to evaluate the quality of our model.

#### Split the data into trainData and cvData with ratio 0.9:0.1 and use the first part to train a statistic model with:
- rank=10
- iterations=5
- lambda_=0.01
- alpha=1.0

In [None]:
trainData, cvData = allData.randomSplit([0.7,0.3],1)
trainData.cache()
cvData.cache()

### split the dataset to 70% for training and 30%

In [None]:
#training
t0 = time()
model = ALS.trainImplicit(ratings=trainData,rank=rank,iterations=iterations,lambda_=lambda_ ,alpha=alpha)
t1 = time()
print("finish training model in %f secs" % (t1 - t0))

#### Area under the ROC curve: a function to compute it

In [None]:
allItemIDs = np.array(allData.map(lambda x: x[1]).distinct().collect())
bAllItemIDs = sc.broadcast(allItemIDs)

In [None]:
from random import randint

# Depend on the number of item in userIDAndPosItemIDs,
# create a set of "negative" products for each user. These are randomly chosen
# from among all of the other items, excluding those that are "positive" for the user.
# NOTE 1: mapPartitions operates on many (user,positive-items) pairs at once
# NOTE 2: flatMap breaks the collections above down into one big set of tuples
def xtractNegative(userIDAndPosItemIDs):
    def pickEnoughNegatives(line):
        userID = line[0]
        posItemIDSet = set(line[1])
        #posItemIDSet = line[1]
        negative = []
        allItemIDs = bAllItemIDs.value
        # Keep about as many negative examples per user as positive. Duplicates are OK.
        i = 0
        while (i < len(allItemIDs) and len(negative) < len(posItemIDSet)):
            itemID = allItemIDs[randint(0,len(allItemIDs)-1)]
            if itemID not in posItemIDSet:
                negative.append(itemID)
            i += 1
        
        # Result is a collection of (user,negative-item) tuples
        return map(lambda itemID: (userID, itemID), negative)

    # Init an RNG and the item IDs set once for partition
    # allItemIDs = bAllItemIDs.value
    return map(pickEnoughNegatives, userIDAndPosItemIDs)

def ratioOfCorrectRanks(positiveRatings, negativeRatings):
    
    # find number elements in arr that has index >= start and has value smaller than x
    # arr is a sorted array
    def findNumElementsSmallerThan(arr, x, start=0):
        left = start
        right = len(arr) -1
        # if x is bigger than the biggest element in arr
        if start > right or x > arr[right]:
            return right + 1
        mid = -1
        while left <= right:
            mid = (left + right) // 2
            if arr[mid] < x:
                left = mid + 1
            elif arr[mid] > x:
                right = mid - 1
            else:
                while mid-1 >= start and arr[mid-1] == x:
                    mid -= 1
                return mid
        return mid if arr[mid] > x else mid + 1
    
    ## AUC may be viewed as the probability that a random positive item scores
    ## higher than a random negative one. Here the proportion of all positive-negative
    ## pairs that are correctly ranked is computed. The result is equal to the AUC metric.
    correct = 0 ## L
    total = 0 ## L
    
    # sorting positiveRatings array needs more cost
    #positiveRatings = np.array(map(lambda x: x.rating, positiveRatings))

    negativeRatings = list(map(lambda x:x.rating, negativeRatings))
    
    #np.sort(positiveRatings)
    negativeRatings.sort()# = np.sort(negativeRatings)
    total = len(positiveRatings)*len(negativeRatings)
    
    for positive in positiveRatings:
        # Count the correctly-ranked pairs
        correct += findNumElementsSmallerThan(negativeRatings, positive.rating)
        
    ## Return AUC: fraction of pairs ranked correctly
    return float(correct) / total

def calculateAUC(positiveData, bAllItemIDs, predictFunction):
    # Take held-out data as the "positive", and map to tuples
    positiveUserProducts = positiveData.map(lambda r: (r[0], r[1]))
    # Make predictions for each of them, including a numeric score, and gather by user
    positivePredictions = predictFunction(positiveUserProducts).groupBy(lambda r: r.user)
    
    # Create a set of "negative" products for each user. These are randomly chosen 
    # from among all of the other items, excluding those that are "positive" for the user. 
    negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions(xtractNegative).flatMap(lambda x: x)
    # Make predictions on the rest
    negativePredictions = predictFunction(negativeUserProducts).groupBy(lambda r: r.user)
    
    return (
            positivePredictions.join(negativePredictions)
                .values()
                .map(
                    lambda positive_negativeRatings: ratioOfCorrectRanks(positive_negativeRatings[0], positive_negativeRatings[1])
                )
                .mean()
            )

#### Using part cvData and function calculateAUC to compute the AUC of the trained model.


In [None]:
t0 = time()
auc = calculateAUC( cvData,bAllItemIDs, model.predictAll)
t1 = time()
print("auc=",auc)
print("finish in %f seconds" % (t1 - t0))

### Now we have the AUC of our model, it’s helpful to benchmark this against a simpler approach. For example, consider recommending the globally most-played artists to every user. This is not personalized, but is simple and may be effective.
Implement this simple pupolarity-based prediction algorithm, evaluate its AUC score, and compare to the results achieved by the more sophisticated ALS algorithm.

In [None]:
bListenCount = sc.broadcast(trainData.map(lambda r: (r[1], r[2])).reduceByKey(lambda x,y: x+y).collectAsMap())
def predictMostListened(allData):
    return allData.map(lambda r: Rating(r[0], r[1], bListenCount.value.get( r[1], 0.0)))

In [None]:
auc = calculateAUC(cvData,bListenCount, predictMostListened)
print("AUC score:" + str(auc))

#### 3.6 Personalized recommendations with ALS: Hyperparameters tuning
In the previous section, we build our models with some given paramters without any knowledge about them. Actually, choosing the best parameters' values is very important. It can significantly affect the quality of models. Especially, with the current implementation of ALS in MLLIB, these parameters are not learned by the algorithm, and must be chosen by the caller. The following parameters should get consideration before training models:

rank = 10: the number of latent factors in the model, or equivalently, the number of columns $k$ in the user-feature and product-feature matrices. In non-trivial cases, this is also their rank.

iterations = 5: the number of iterations that the factorization runs. Instead of runing the algorithm until RMSE converged which actually takes very long time to finish with large datasets, we only let it run in a given number of iterations. More iterations take more time but may produce a better factorization.

lambda_ = 0.01: a standard overfitting parameter. Higher values resist overfitting, but values that are too high hurt the factorization's accuracy.

alpha = 1.0: controls the relative weight of observed versus unobserved userproduct interactions in the factorization.

Although all of them have impact on the models' quality, iterations is more of a constraint on resources used in the factorization. So, rank, lambda_ and alpha can be considered hyperparameters to the model. We will try to find "good" values for them. Indeed, the values of hyperparameter are not necessarily optimal. Choosing good hyperparameter values is a common problem in machine learning. The most basic way to choose values is to simply try combinations of values and evaluate a metric for each of them, and choose the combination that produces the best value of the metric.

### Grid Search
For simplicity, assume that we want to explore the following parameter space: $ rank \in \{10, 50\}$, $lambda\_ \in \{1.0, 0.0001\}$ and $alpha \in \{1.0, 40.0\}$.

Find the best combination of them in terms of the highest AUC value.

In [None]:

evaluations = []

for rank in [10, 50]:
    for lambda_ in [1.0, 0.0001]:
        for alpha in [1.0, 40.0]:
            print("Train model with rank=%d lambda_=%f alpha=%f" % (rank, lambda_, alpha))
            # with each combination of params, we should run multiple times and get avg
            # for simple, we only run one time.
            model = ALS.trainImplicit(ratings=trainData,rank=rank,iterations=5,lambda_=lambda_ ,alpha=alpha)
            auc = calculateAUC(cvData,bListenCount,model.predictAll)

            evaluations.append(((rank, lambda_, alpha), auc))

            unpersist(model)

evaluations.sort(key = lambda x: -x[1])
evalDataFrame = pd.DataFrame(data=evaluations)
print(evalDataFrame)

trainData.unpersist()
cvData.unpersist()

#### The combination of parameters that gets the highest AUC is: rank = 10 ; lambda = 1.0 ; alpha = 40
Using "optimal" hyper-parameters obtained, re-train the model and show top-5 artist names recommendated for user 2093760.

In [None]:

model = ALS.trainImplicit(ratings=trainData, rank=10, iterations=5, lambda_=1.0, alpha=40.0)
allData.unpersist()

userID = 2093760
recommendations = model.recommendProducts(userID,5)

recArtist = set(rating[1] for rating in recommendations)

# Filter in those artists, get just artist, and print
def artistNames(line):
#     [artistID, name]
    if (line[0] in recArtist):
        return True
    else:
        return False

recList = artistByID.filter(artistNames).values().collect()
print(recList)

unpersist(model)

it seems that the result of top 5 recommended artists does not change when we add more parameters to the model or modify them (such as lambda, alpha). I think that we should extend to top 50 or top 70 to see how the recommendation changing. Because I think that lambda and alpha parameters affect only when the rating of each artist approximate the mean of total ratings for one user (Top 5 artists have the much larger rating value than others, top 5 artists do not change with modified parameter)
This raises me the question that how strong the impact of those parameters? Is it really nessessary to put into our model when we just need to retrieve a small number of top artists (let's say, less than top 10)?
Additional work
1/ Changing the proportion of splitting data
In this experiment, I will change the percentage of training data to 50%, 80%, 90% and 99% respectively. The purpose is to observe the changing of AUC score

In [None]:
trainData, cvData = allData.randomSplit([0.5,0.5],1)
trainData.cache()
cvData.cache()

t0 = time()
model = ALS.trainImplicit(ratings=trainData,rank=rank,iterations=iterations,lambda_=lambda_ ,alpha=alpha)
t1 = time()
print("finish training model in %f secs" % (t1 - t0))

t0 = time()
auc = calculateAUC( cvData,bAllItemIDs, model.predictAll)
t1 = time()
print("auc=",auc)
print("finish in %f seconds" % (t1 - t0))

In [None]:
In [73]:
trainData, cvData = allData.randomSplit([0.8,0.2],1)
trainData.cache()
cvData.cache()

t0 = time()
model = ALS.trainImplicit(ratings=trainData,rank=rank,iterations=iterations,lambda_=lambda_ ,alpha=alpha)
t1 = time()
print("finish training model in %f secs" % (t1 - t0))

t0 = time()
auc = calculateAUC( cvData,bAllItemIDs, model.predictAll)
t1 = time()
print("auc=",auc)
print("finish in %f seconds" % (t1 - t0))

In [None]:
trainData, cvData = allData.randomSplit([0.9,0.1],1)
trainData.cache()
cvData.cache()

t0 = time()
model = ALS.trainImplicit(ratings=trainData,rank=rank,iterations=iterations,lambda_=lambda_ ,alpha=alpha)
t1 = time()
print("finish training model in %f secs" % (t1 - t0))

t0 = time()
auc = calculateAUC( cvData,bAllItemIDs, model.predictAll)
t1 = time()
print("auc=",auc)
print("finish in %f seconds" % (t1 - t0))

In [None]:
trainData, cvData = allData.randomSplit([0.99,0.01],1)
trainData.cache()
cvData.cache()

t0 = time()
model = ALS.trainImplicit(ratings=trainData,rank=rank,iterations=iterations,lambda_=lambda_ ,alpha=alpha)
t1 = time()
print("finish training model in %f secs" % (t1 - t0))

t0 = time()
auc = calculateAUC( cvData,bAllItemIDs, model.predictAll)
t1 = time()
print("auc=",auc)
print("finish in %f seconds" % (t1 - t0))

inish training model in 325.923175 secs
auc= 0.9856858729938448
finish in 16.474116 seconds
As the above results, we have:

- training 50% -> AUC: 0.9557431751634355
- training 80% -> AUC: 0.9633452139683835
- training 90% -> AUC: 0.9641551756487843
- training 99% -> AUC: 0.9672845649647493

This is obviouly that the proportion of splitting data does affect the result of the AUC score. The more training data, the higher AUC score. In my opinion, having higher AUC score does not mean that the model is working well in general. Because high percentage of training data may prone to overfitting.
###  Expand the the list of recommended artist
As my hypothesis, in this experiment, instead of retrieving top 5 artist, I try to retrieve top 50 and 70 artist with modified parameters to see the differences the result compare to the model with standard parameters
Model with standard parameters (lambda and alpha are set by default)

In [None]:
trainData, cvData = allData.randomSplit([0.7,0.3],1)
trainData.cache()
cvData.cache()

In [None]:

model50 = ALS.trainImplicit(ratings=trainData, rank=10, iterations=5)
trainData.unpersist()

userID = 2093760
recommendations50 = model50.recommendProducts(userID,50)

recArtist = set(rating[1] for rating in recommendations50)

recList50 = artistByID.filter(artistNames).values().collect()

print(recList50)

unpersist(model)

In [None]:
#Model with lamda = 0.0001 , alpha = 40
model50_1 = ALS.trainImplicit(ratings=trainData, rank=10, iterations=5, lambda_= 0.0001, alpha= 40.0)
trainData.unpersist()

userID = 2093760
recommendations50_1 = model50_1.recommendProducts(userID,50)

recArtist = set(rating[1] for rating in recommendations50_1)

recList50_1 = artistByID.filter(artistNames).values().collect()

print(recList50_1)

unpersist(model)

In [None]:
temp3 = [item for item in recList50 if item not in recList50_1]
print(temp3)
print("\n Number of different artists between standard model and model_1:" + str(len(temp3)))

In [None]:
#Model with lambda = 0.1, alpha = 1.0
model50_2 = ALS.trainImplicit(ratings=trainData, rank=10, iterations=5, lambda_= 0.1, alpha= 1.0)
trainData.unpersist()

userID = 2093760
recommendations50_2 = model50_2.recommendProducts(userID,50)

recArtist = set(rating[1] for rating in recommendations50_2)

recList50_2 = artistByID.filter(artistNames).values().collect()

print(recList50_2)

unpersist(model)

In [None]:
temp4 = [item for item in recList50 if item not in recList50_2]
print(temp4)
print("\n Number of different artists between standard model and model_2:" + str(len(temp4)))

 Conclusion: As the result has shown above, I dont need to expand the result list to top 70 artists anymore. It is clearly proving my hypothesis is true
 How we configure the lambda and alpha makes a huge impact on the retrieved result: In my example, the top artists that highly recommended to user="2093760" is ['50Cent', 'Snoopdog', Notorious B.I.G] (they appear in both standard model and modified models). Others artists may vary from different models.
 We may not need to include lambda and alpha to the model if we only retreive the top result less than 10 artists.
#### Proposed method:
 In my opinion: We should cut out outliers (top 5 artists). With this method, recommendation will be more diverse and accurate.
#### Summary
In this notebook, we introduce an algorithm to do matrix factorization and the way of using it to make recommendation. Further more, we studied how to build a large-scale recommender system on SPARK using ALS algorithm and evaluate its quality. Finally, a simple approach to choose good parameters is mentioned.