### Download dataset
<b>Dataset location: </b>http://files.grouplens.org/datasets/hetrec2011/hetrec2011-lastfm-2k.zip <br /><br />
Given the number of times each user has listened to songs by an artist, recommend artists to that user.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Use Implicit Collaborative Filtering for band recommendations') \
    .getOrCreate()

rawData = spark.read\
               .format('csv')\
               .option('delimiter', '\t')\
               .option('header', 'true')\
               .load('../datasets/lastfm/user_artists.dat')
                
rawData.toPandas().head()

Unnamed: 0,userID,artistID,weight
0,2,51,13883
1,2,52,11690
2,2,53,11351
3,2,54,10300
4,2,55,8983


#### Select the columns and cast their values to int

In [2]:
from pyspark.sql.functions import col

dataset = rawData.select(col('userID').cast('int'), 
                         col('artistID').cast('int'), 
                         col('weight').cast('int')
                        )

dataset

DataFrame[userID: int, artistID: int, weight: int]

#### Examine the weight field
The weight is the number of times the user has listened to songs by that artist.

In [3]:
dataset.select('weight').describe().toPandas()

Unnamed: 0,summary,weight
0,count,92834.0
1,mean,745.2439300256372
2,stddev,3751.32208038768
3,min,1.0
4,max,352698.0


#### Standardize the weight column
* To get recommendations from implicit feedback (such as the number of times a user has listened to an artist), we standardize the weight column: subtract the mean and divide by the standard deviation
* PySpark's built-in StandardScaler works on vector columns only, not on scalar columns, which is why we standardize the column values ourselves
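As a sanity check on the arithmetic, here is a minimal pure-Python sketch of the same standardization. The toy play counts are hypothetical; the mean and standard deviation are the values reported by `describe()` above. It uses the sample standard deviation (n - 1 denominator), which is what Spark's `stddev` function computes.

```python
import statistics

def standardize(values):
    """Return z-scores using the sample standard deviation (n - 1 denominator)."""
    mu = statistics.mean(values)
    sigma = statistics.stdev(values)  # sample standard deviation
    return [(v - mu) / sigma for v in values]

# Toy play counts (hypothetical, not taken from the dataset)
toy_weights = [10, 20, 30, 40, 100]
toy_scaled = standardize(toy_weights)
print(toy_scaled)

# One real value: weight 13883 with the dataset-level statistics from describe()
dataset_mean = 745.2439300256372
dataset_stddev = 3751.32208038768
example_scaled = (13883 - dataset_mean) / dataset_stddev
print(round(example_scaled, 6))
```

After standardization the values have mean 0 and standard deviation 1, so any weight below the dataset mean becomes negative.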

In [4]:
from pyspark.sql.functions import stddev, mean, col

df = dataset.select(mean('weight').alias('mean_weight'), 
               stddev('weight').alias('stddev_weight'))\
       .crossJoin(dataset)\
       .withColumn('weight_scaled' , 
                   (col('weight') - col('mean_weight')) / col('stddev_weight'))
        
df.toPandas().head()

Unnamed: 0,mean_weight,stddev_weight,userID,artistID,weight,weight_scaled
0,745.24393,3751.32208,2,51,13883,3.502167
1,745.24393,3751.32208,2,52,11690,2.917573
2,745.24393,3751.32208,2,53,11351,2.827205
3,745.24393,3751.32208,2,54,10300,2.547037
4,745.24393,3751.32208,2,55,8983,2.195961


#### Split the dataset into training and test sets

In [5]:
(trainingData, testData) = df.randomSplit([0.8, 0.2])

#### Define the ALS model
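ALS models that use implicit feedback are usually evaluated with ranking metrics such as:
* Mean Average Precision (MAP)
* Normalized Discounted Cumulative Gain (NDCG)

Older PySpark releases exposed these only through the RDD-based pyspark.mllib.evaluation.RankingMetrics; the DataFrame-based pyspark.ml.evaluation.RankingEvaluator was added in Spark 3.0. Evaluating the model with these metrics is not covered in this course.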
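To make the two metrics concrete, here is a minimal pure-Python sketch for a single user. It assumes binary relevance (an item is either relevant or not) and divides average precision by the number of relevant items, which is one common definition; other libraries may normalize differently. The function names and the toy lists are hypothetical.

```python
import math

def average_precision(recommended, relevant):
    """Average precision of one ranked list against a set of relevant items."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    hits = 0
    precision_sum = 0.0
    for rank, item in enumerate(recommended, start=1):
        if item in relevant:
            hits += 1
            # Precision at this rank, counted only where a relevant item appears
            precision_sum += hits / rank
    return precision_sum / len(relevant)

def ndcg_at_k(recommended, relevant, k):
    """NDCG@k with binary relevance."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    # Discounted gain of the relevant items found in the top k
    dcg = sum(1.0 / math.log2(rank + 1)
              for rank, item in enumerate(recommended[:k], start=1)
              if item in relevant)
    # Best achievable DCG: all relevant items ranked first
    ideal_hits = min(len(relevant), k)
    idcg = sum(1.0 / math.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    return dcg / idcg

# Toy example (hypothetical item ids)
recommended = ['a', 'b', 'c', 'd']
relevant = {'a', 'c'}
ap = average_precision(recommended, relevant)
ndcg = ndcg_at_k(recommended, relevant, k=4)
print(ap, ndcg)
```

MAP is then the mean of `average_precision` over all users, and NDCG is typically averaged over users in the same way.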

In [6]:
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=10, 
          regParam=0.1, 
          userCol='userID', 
          itemCol='artistID',
          implicitPrefs=True,
          ratingCol='weight_scaled',
          coldStartStrategy='drop')

model = als.fit(trainingData)
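
For context on what `implicitPrefs=True` changes: the rating column is no longer a target to reproduce but a measure of confidence in a binary preference. The sketch below follows the implicit-feedback formulation of Hu, Koren and Volinsky, with negative ratings handled the way I understand Spark's ALS to handle them (preference 0, confidence from the absolute value). Treat that handling as an assumption and check it against the Spark source for your version. The function name is hypothetical, and `alpha=1.0` mirrors the ALS default.

```python
def implicit_preference_and_confidence(rating, alpha=1.0):
    """Map one rating to (preference, confidence) under the stated assumption."""
    # Preference is binary: 1 if the rating is positive, otherwise 0
    preference = 1.0 if rating > 0 else 0.0
    # Confidence grows with the magnitude of the rating and is never below 1
    confidence = 1.0 + alpha * abs(rating)
    return preference, confidence

# Two weight_scaled values from the standardized data
high = implicit_preference_and_confidence(3.502167)    # weight 13883
low = implicit_preference_and_confidence(-0.178136)    # weight 77
print(high, low)
```

One consequence under this assumption: standardized weights below the mean are negative, so those user-artist pairs would count as preference 0 even though the user did listen to the artist.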


#### Get the predictions from our model on the test data

In [7]:
predictions = model.transform(testData)
predictions.toPandas().head()

Unnamed: 0,mean_weight,stddev_weight,userID,artistID,weight,weight_scaled,prediction
0,745.24393,3751.32208,1398,463,698,-0.012594,0.000576
1,745.24393,3751.32208,1137,463,77,-0.178136,0.0
2,745.24393,3751.32208,479,463,23,-0.192531,-0.002222
3,745.24393,3751.32208,1692,463,1018,0.072709,-0.000701
4,745.24393,3751.32208,487,463,170,-0.153344,0.008195


#### Examine the distribution of the original weights and the predictions

In [8]:
predictionsPandas = predictions.select('weight_scaled', 'prediction').toPandas()
predictionsPandas.describe()

Unnamed: 0,weight_scaled,prediction
count,16256.0,16256.0
mean,0.009621,0.042339
std,0.902777,0.100407
min,-0.198395,-0.651778
25%,-0.168539,0.0
50%,-0.125887,0.002403
75%,-0.026189,0.035174
max,53.959578,0.998437


#### Load the Artist information from the artists.dat file
This is used to map each artistID in the recommendations to the artist's name.

In [9]:
artistData = spark.read\
               .format('csv')\
               .option('delimiter', '\t')\
               .option('header', 'true')\
               .load('../datasets/lastfm/artists.dat')
                
artistData.toPandas().head()

Unnamed: 0,id,name,url,pictureURL
0,1,MALICE MIZER,http://www.last.fm/music/MALICE+MIZER,http://userserve-ak.last.fm/serve/252/10808.jpg
1,2,Diary of Dreams,http://www.last.fm/music/Diary+of+Dreams,http://userserve-ak.last.fm/serve/252/3052066.jpg
2,3,Carpathian Forest,http://www.last.fm/music/Carpathian+Forest,http://userserve-ak.last.fm/serve/252/40222717...
3,4,Moi dix Mois,http://www.last.fm/music/Moi+dix+Mois,http://userserve-ak.last.fm/serve/252/54697835...
4,5,Bella Morte,http://www.last.fm/music/Bella+Morte,http://userserve-ak.last.fm/serve/252/14789013...


#### Define a function to get the artist recommendations
* This is similar to what was done in the last exercise for movie recommendations
* Note that the join between artistData and artistsDF is slightly different: the id column has a different name in each table (id in artistData vs artistID in artistsDF)
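The idea of joining on differently named key columns can be sketched without Spark. This pure-Python version uses the first three artists from the output above and two hypothetical recommendation rows; the variable names are illustrative only.

```python
# Artist lookup table: the key column is called 'id'
artists = [
    {'id': 1, 'name': 'MALICE MIZER'},
    {'id': 2, 'name': 'Diary of Dreams'},
    {'id': 3, 'name': 'Carpathian Forest'},
]

# Hypothetical recommendations: the key column is called 'artistID'
recs = [
    {'artistID': 3, 'rating': 0.21},
    {'artistID': 1, 'rating': 0.35},
]

# Inner join on artists.id == recs.artistID
name_by_id = {artist['id']: artist['name'] for artist in artists}
joined = [
    {'name': name_by_id[rec['artistID']], 'rating': rec['rating']}
    for rec in recs
    if rec['artistID'] in name_by_id
]

# Highest rating first, mirroring orderBy('rating', ascending=False)
joined.sort(key=lambda row: row['rating'], reverse=True)
print(joined)
```

Because the two key columns have different names, the join condition has to name both sides explicitly rather than pass a single shared column name.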

In [10]:
from pyspark.sql.types import IntegerType

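def getRecommendationsForUser(userId, numRecs):
    
    # Single-row DataFrame holding the user of interest; the column
    # name matches the userCol the model was trained with
    usersDF = spark.\
    createDataFrame([userId], IntegerType()).\
    toDF('userID')
    
    userRecs = model.recommendForUserSubset(usersDF, numRecs)
    
    # Each recommendation is a Row with artistID and rating fields.
    # collect()[0] raises IndexError if the model has no factors for this user
    artistsList = userRecs.collect()[0].recommendations
    artistsDF = spark.createDataFrame(artistsList)
    
    # Look up the artist names and sort by predicted rating, highest first
    recommendedArtists = artistData\
    .join(artistsDF, 
          artistData.id == artistsDF.artistID)\
    .orderBy('rating', ascending=False)\
    .select('name', 'url', 'rating')
    
    return recommendedArtists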

#### Get full recommendations for a particular userID

In [18]:
getRecommendationsForUser(939, 10).toPandas()

Unnamed: 0,name,url,rating
0,Avenged Sevenfold,http://www.last.fm/music/Avenged+Sevenfold,0.298474
1,30 Seconds to Mars,http://www.last.fm/music/30+Seconds+to+Mars,0.261807
2,My Chemical Romance,http://www.last.fm/music/My+Chemical+Romance,0.232822
3,Linkin Park,http://www.last.fm/music/Linkin+Park,0.210958
4,The Devil Wears Prada,http://www.last.fm/music/The+Devil+Wears+Prada,0.204236
5,Paramore,http://www.last.fm/music/Paramore,0.194978
6,A Day to Remember,http://www.last.fm/music/A+Day+to+Remember,0.189072
7,Escape The Fate,http://www.last.fm/music/Escape+The+Fate,0.187302
8,All Time Low,http://www.last.fm/music/All+Time+Low,0.177143
9,Bring Me The Horizon,http://www.last.fm/music/Bring+Me+The+Horizon,0.176463


#### Compare with the artists this user has actually listened to

In [19]:
userArtistRaw = dataset.filter(dataset.userID == 939)

userArtistsInfo = artistData.join(userArtistRaw, 
          artistData.id==userArtistRaw.artistID)\
    .orderBy('weight', ascending=False)\
    .select('name', 'weight')

userArtistsInfo.toPandas()


Unnamed: 0,name,weight
0,Avenged Sevenfold,10595
1,30 Seconds to Mars,1646
2,A Skylit Drive,1603
3,Sonic Syndicate,1439
4,Paramore,1184
5,Funeral for a Friend,1127
6,Light This City,1125
7,My Chemical Romance,1055
8,As I Lay Dying,1047
9,Dead by April,1044
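
A quick way to compare the two outputs for user 939 is to count the names they share. The sketch below hard-codes the ten rows shown in each output, so it only reflects what is printed here and not the user's full listening history; the variable names are illustrative.

```python
# Rows shown in the recommendation output for user 939
recommended = [
    'Avenged Sevenfold', '30 Seconds to Mars', 'My Chemical Romance',
    'Linkin Park', 'The Devil Wears Prada', 'Paramore',
    'A Day to Remember', 'Escape The Fate', 'All Time Low',
    'Bring Me The Horizon',
]

# Rows shown in the listening-history output for user 939
listened_shown = [
    'Avenged Sevenfold', '30 Seconds to Mars', 'A Skylit Drive',
    'Sonic Syndicate', 'Paramore', 'Funeral for a Friend',
    'Light This City', 'My Chemical Romance', 'As I Lay Dying',
    'Dead by April',
]

# Keep recommendation order while testing membership against a set
listened_set = set(listened_shown)
overlap = [name for name in recommended if name in listened_set]
new_artists = [name for name in recommended if name not in listened_set]
print(overlap)
print(new_artists)
```

Several recommended artists already appear in the listening history shown, which suggests the model has picked up this user's taste; the remaining names are the recommendations not among the rows shown.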
