# Recommendation System in Spark using ALS

The idea is to obtain the data, convert it into a form that ALS can interpret, and generate the top-k recommendations. The main input is the number of songs each user has listened to per genre, artist, and language. The ALS model predicts the top-k genres, artists, and languages for a user, and the most popular songs belonging to those items are then recommended to the user.

## Creating the Spark session and importing the required libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import os
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

def is_databricks():
    """Return True if the code is running on Databricks, so the data path can be chosen accordingly."""
    # The Databricks runtime sets this environment variable
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")

    # The variable is only defined when running on Databricks
    return db_env is not None

def get_training_filename(data_file_name):
    """Return the path of a data file for the current environment."""
    if is_databricks():
        # On Databricks, build the full path under /FileStore/tables
        full_path_name = "/FileStore/tables/%s" % data_file_name
    else:
        # Otherwise assume the data file is in the same directory as this notebook
        full_path_name = data_file_name

    # Return the full path file name to the caller
    return full_path_name

The dataframe below holds all songs in the KKBox app data.

In [5]:
# inferSchema lets Spark detect column types instead of reading every column as a string
songs_df = spark.read.option('inferSchema','true').option('header','true').csv(
    get_training_filename('songs.csv'))
songs_df.show(5)

+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
|             song_id|song_length|genre_ids|        artist_name|            composer|   lyricist|language|
+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
|CXoTN1eb7AI+DntdU...|     247640|      465|張信哲 (Jeff Chang)|                董貞|     何啟弘|     3.0|
|o0kFgae9QtnYgRkVP...|     197328|      444|          BLACKPINK|TEDDY|  FUTURE BO...|      TEDDY|    31.0|
|DwVvVurfpuz+XPuFv...|     231781|      465|       SUPER JUNIOR|                null|       null|    31.0|
|dKMBWoZyScdxSkihK...|     273554|      465|              S.H.E|              湯小康|     徐世珍|     3.0|
|W3bqWd3T+VeHFzHAU...|     140329|      726|           貴族精選|         Traditional|Traditional|    52.0|
+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
only showing top 5 rows



In [5]:
# Total number of Genres
songs_df.select('genre_ids').distinct().count()

1047

In [6]:
# Total number of songs.
songs_df.count()

2296833

In [7]:
# Total number of artists
songs_df.select('artist_name').distinct().count()

222409

The dataframe below records each user (msno), the songs they listened to (song_id), and the source from which each song was played. The target column is 1 if the user listened to the song again within a month of first listening to it, and 0 otherwise.

In [8]:
# inferSchema lets Spark detect column types instead of reading every column as a string
train_df = spark.read.option('inferSchema','true').option('header','true').csv(
    get_training_filename('train.csv'))
train_df.show(5)

+--------------------+--------------------+-----------------+-------------------+---------------+------+
|                msno|             song_id|source_system_tab| source_screen_name|    source_type|target|
+--------------------+--------------------+-----------------+-------------------+---------------+------+
|FGtllVqz18RPiwJj/...|BBzumQNXUHKdEBOB7...|          explore|            Explore|online-playlist|     1|
|Xumu+NIjS6QYVxDS4...|bhp/MpSNoqoxOIB+/...|       my library|Local playlist more| local-playlist|     1|
|Xumu+NIjS6QYVxDS4...|JNWfrrC7zNN7BdMps...|       my library|Local playlist more| local-playlist|     1|
|Xumu+NIjS6QYVxDS4...|2A87tzfnJTSWqD7gI...|       my library|Local playlist more| local-playlist|     1|
|FGtllVqz18RPiwJj/...|3qm6XTZ6MOCU11x8F...|          explore|            Explore|online-playlist|     1|
+--------------------+--------------------+-----------------+-------------------+---------------+------+
only showing top 5 rows



In [9]:
# Total number of distinct users 
train_df.select('msno').distinct().count()

30755

In [10]:
# Total number of rows in data.
train_df.count()

7377418

In [11]:
# Number of rows with target == 1 (repeat listening within a month)
train_df.where(train_df.target==1).count()

3714656

In [12]:
# Number of rows with target == 0 (no repeat listening within a month)
train_df.where(train_df.target==0).count()

3662762

We now draw a sample of the data. The full dataset is large enough that processing it takes a long time even in Spark, and our compute resources are limited, so predictions are derived from a sample. The same approach can be applied to any other sample.

In [13]:
train_df_sample = train_df.sample(fraction=0.1, seed=10)

In [14]:
# The sample has a nearly even split between target 1 and target 0
train_df_sample.where(train_df_sample.target==1).count()/train_df_sample.count()

0.5036492390977118

In [15]:
train_df_sample.where(train_df_sample.target==0).count()/train_df_sample.count()

0.49635076090228825

## Data Preparation

The data now needs to be prepared for the ALS recommender.

In [17]:
# Number of songs listened to by each user in the sample (first 10 users)
sorted(train_df_sample.groupBy('msno').count().take(10))

[Row(msno='0ImlcZdGXwmJtYpwv7a3Hxwt+IhksSDz+ZYBFfaPANo=', count=31),
 Row(msno='8HdGEX5lHlImmy5Q6wMB0nWq8KxpCIlUz7Yq9t7rzdU=', count=40),
 Row(msno='MCmkCY0A7mQ9Bnam/Ln/bGhVzoyqai1eQ8TcugeH0PI=', count=11),
 Row(msno='VMn4RIfwYGqgC8Cxq39kaOLVGiR6T9+almmLGooO7Uk=', count=53),
 Row(msno='VNBKSYAB2rPCwRr9qF7k2DWahB5a7/vUOnCjdMNKQ4E=', count=8),
 Row(msno='buKhmin3ZO/WIqG2Tq27s5qzpz1rNuoXczuMA6Kj8hI=', count=23),
 Row(msno='cjrLqFlFZ5TlqIaiwzJMmde5o1nYKbNOGPA/FBR/rtU=', count=7),
 Row(msno='dLFvzIVbfES/pWPNYW4RP/NdP01kI3FGv/G2kpbO2sg=', count=37),
 Row(msno='jdJAk1ldbv+15mziwsvfc4/Hoz7kdW9b8F7OvY1Hog8=', count=38),
 Row(msno='vb2J2H2GlF6ensG79o/Gdnku4AqJ4OyBdvDzRrjgnd8=', count=30)]

In [19]:
# songs data
songs_df.show(5)

+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
|             song_id|song_length|genre_ids|        artist_name|            composer|   lyricist|language|
+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
|CXoTN1eb7AI+DntdU...|     247640|      465|張信哲 (Jeff Chang)|                董貞|     何啟弘|     3.0|
|o0kFgae9QtnYgRkVP...|     197328|      444|          BLACKPINK|TEDDY|  FUTURE BO...|      TEDDY|    31.0|
|DwVvVurfpuz+XPuFv...|     231781|      465|       SUPER JUNIOR|                null|       null|    31.0|
|dKMBWoZyScdxSkihK...|     273554|      465|              S.H.E|              湯小康|     徐世珍|     3.0|
|W3bqWd3T+VeHFzHAU...|     140329|      726|           貴族精選|         Traditional|Traditional|    52.0|
+--------------------+-----------+---------+-------------------+--------------------+-----------+--------+
only showing top 5 rows



In [20]:
# user and songs
train_df.show(5)

+--------------------+--------------------+-----------------+-------------------+---------------+------+
|                msno|             song_id|source_system_tab| source_screen_name|    source_type|target|
+--------------------+--------------------+-----------------+-------------------+---------------+------+
|FGtllVqz18RPiwJj/...|BBzumQNXUHKdEBOB7...|          explore|            Explore|online-playlist|     1|
|Xumu+NIjS6QYVxDS4...|bhp/MpSNoqoxOIB+/...|       my library|Local playlist more| local-playlist|     1|
|Xumu+NIjS6QYVxDS4...|JNWfrrC7zNN7BdMps...|       my library|Local playlist more| local-playlist|     1|
|Xumu+NIjS6QYVxDS4...|2A87tzfnJTSWqD7gI...|       my library|Local playlist more| local-playlist|     1|
|FGtllVqz18RPiwJj/...|3qm6XTZ6MOCU11x8F...|          explore|            Explore|online-playlist|     1|
+--------------------+--------------------+-----------------+-------------------+---------------+------+
only showing top 5 rows



In [21]:
# genre_ids can hold several pipe-separated values in one cell, meaning the song belongs to multiple genres
songs_df.select('genre_ids').rdd.map(lambda x:x.genre_ids).take(15)

['465',
 '444',
 '465',
 '465',
 '726',
 '864|857|850|843',
 '458',
 '465',
 '465',
 '352|1995',
 '2157',
 '465',
 '726',
 '458',
 '359']

In [22]:
# Split genre_ids into individual genres, repeating the song row once per genre
songs_df = songs_df.withColumn('genre',explode(split(songs_df.genre_ids, r"\|")))
songs_df.select('genre').take(20)

[Row(genre='465'),
 Row(genre='444'),
 Row(genre='465'),
 Row(genre='465'),
 Row(genre='726'),
 Row(genre='864'),
 Row(genre='857'),
 Row(genre='850'),
 Row(genre='843'),
 Row(genre='458'),
 Row(genre='465'),
 Row(genre='465'),
 Row(genre='352'),
 Row(genre='1995'),
 Row(genre='2157'),
 Row(genre='465'),
 Row(genre='726'),
 Row(genre='458'),
 Row(genre='359'),
 Row(genre='359')]
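
For readers less familiar with `split` and `explode`, the same transformation can be sketched in plain Python. The rows and the helper name `explode_genres` below are made up for illustration and are not part of the notebook or of Spark.

```python
# Plain-Python illustration of split + explode on a pipe-delimited column.
# The rows are invented examples, not real KKBox records.
rows = [
    {"song_id": "s1", "genre_ids": "465"},
    {"song_id": "s2", "genre_ids": "864|857|850|843"},
    {"song_id": "s3", "genre_ids": "352|1995"},
]

def explode_genres(rows):
    """Return one output row per (song, genre) pair."""
    exploded = []
    for row in rows:
        # A song with several genres is repeated once per genre
        for genre in row["genre_ids"].split("|"):
            exploded.append({**row, "genre": genre})
    return exploded

exploded = explode_genres(rows)
for row in exploded:
    print(row["song_id"], row["genre"])
```

One difference worth keeping in mind: in Spark, `explode` produces no output row when the array is null, so songs without a `genre_ids` value drop out of `songs_df` at this step, while `explode_outer` would keep them with a null genre.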

Now the user and song data need to be merged to identify the genre, artist, and language of each song a user listened to.

In [23]:
train_df_cut = train_df.join(songs_df, 'song_id')
train_df_cut.show(5)

+--------------------+--------------------+-----------------+-------------------+-------------------+------+-----------+---------+------------+--------+--------+--------+-----+
|             song_id|                msno|source_system_tab| source_screen_name|        source_type|target|song_length|genre_ids| artist_name|composer|lyricist|language|genre|
+--------------------+--------------------+-----------------+-------------------+-------------------+------+-----------+---------+------------+--------+--------+--------+-----+
|++XsB7MZl/8xl/d0/...|CUtaQQ1S5S03VKAed...|       my library|Local playlist more|      local-library|     0|     452092|      947|C'est La Vie| Honey B|    null|    -1.0|  947|
|++XsB7MZl/8xl/d0/...|EbFZYurZKp8cx+mUO...|           search|        Artist more|top-hits-for-artist|     0|     452092|      947|C'est La Vie| Honey B|    null|    -1.0|  947|
|++XsB7MZl/8xl/d0/...|RoSfblbwJN/izEnFI...|         discover|   Discover Feature|song-based-playlist|     1|     45

In [24]:
# Sampling from the merged data
train_cut_sample = train_df_cut.sample(fraction=0.01, seed=3)

Now we can see how many songs in each Genre the user listened to

In [26]:
train_cut_sample.groupBy('msno','song_id','genre').count().take(10)

[Row(msno='RoSfblbwJN/izEnFIVw8TgOpm8R/NEpUC84Oz/b32HQ=', song_id='++XsB7MZl/8xl/d0/QFmZ4TWIRDKhAQ5saiqmjznmi4=', genre='947', count=1),
 Row(msno='iJvXG1nt/tdFPK8vKBqZMZ6e1X7c07QnyBX4d4eaTgs=', song_id='+KgAKlO7+zYsMtKzuTe3nH+iXXblehvzTNZIt4DXQ9U=', genre='2122', count=1),
 Row(msno='EfcNnWocQIAEn5gw6sHktzvZXBC9c+u2HpuzrX5leOA=', song_id='+tGKT7z6HuwhZa8mRI9hE+rQDgQFZhtVCGVrQtt7bVw=', genre='465', count=1),
 Row(msno='pXrqb5LRcKCG83LnFoa20jebHP4/gRnEvzq8Ik22jzY=', song_id='0yfLxyewedjmipOOlVzrr/Od05t5Sy+TeFQE3TWFVPA=', genre='465', count=1),
 Row(msno='flouUtxucgcV+QCOOCqv09HJCI77R36fhmTDG2ZLhwc=', song_id='1H7Za3rqrfGZhzWonEKkW3BR1LfJaWlErO8nNZaldZM=', genre='444', count=1),
 Row(msno='0D+9iiB5E2zcWdnMf6BmwoihK+oIqsnRvDog21wHnec=', song_id='1H7Za3rqrfGZhzWonEKkW3BR1LfJaWlErO8nNZaldZM=', genre='444', count=1),
 Row(msno='lc8/HRnjiaPsc/cZ6o6f6uKCAwn1opP43648Mx5dSe4=', song_id='1H7Za3rqrfGZhzWonEKkW3BR1LfJaWlErO8nNZaldZM=', genre='444', count=1),
 Row(msno='HZykoHL6T7YcO50XiCxQfrt+aQAjT

The count per genre then serves as the user's genre interest.

In [27]:
# Genre Interest
user_genre = train_cut_sample.groupBy('msno','genre').count().toDF('User', 'genre', 'genre_interest')
user_genre.orderBy('genre_interest', ascending=False).show(5)

+--------------------+-----+--------------+
|                User|genre|genre_interest|
+--------------------+-----+--------------+
|KGXNZ/H3VxvET/+rG...|  465|            30|
|MXIMDXO0j3UpaT7Fv...|  465|            28|
|cqjRBV/jWN2ujhc+z...|  465|            25|
|JkQacE3rvmhh65R04...|  465|            24|
|mGDObQQojFOJfK2rJ...|  465|            23|
+--------------------+-----+--------------+
only showing top 5 rows
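
The interest score is just a count of listening events per (user, genre) pair. A minimal plain-Python sketch of that aggregation, using invented user and genre values, could look like this:

```python
from collections import Counter

# Invented (user, genre) listening events; each tuple is one row of the sample
events = [
    ("user_a", "465"),
    ("user_a", "465"),
    ("user_a", "458"),
    ("user_b", "465"),
]

# Counting identical pairs mirrors groupBy('msno', 'genre').count()
genre_interest = Counter(events)

# Highest interest first, like orderBy('genre_interest', ascending=False)
for (user, genre), interest in genre_interest.most_common():
    print(user, genre, interest)
```

Because the score is a raw count, users who listen more overall get larger values across all of their genres.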



We do the same for artist interest and language interest; the resulting dataframes are shown below.

In [28]:
# Artist interest
user_artist = train_cut_sample.groupBy('msno','artist_name').count().toDF('User', 'artist_name', 'artist_interest')
user_artist.orderBy('artist_interest', ascending=False).show(5)

+--------------------+-------------------------+---------------+
|                User|              artist_name|artist_interest|
+--------------------+-------------------------+---------------+
|cg775T0DlPU4gthIR...|        江蕙 (Jody Jiang)|              6|
|nkSobo2abUYPOD4T2...|          五月天 (Mayday)|              6|
|2tmUzRCcD0l3et0ck...|証聲音樂圖書館 ECHO MUSIC|              6|
|MXIMDXO0j3UpaT7Fv...|証聲音樂圖書館 ECHO MUSIC|              6|
|l49SPru9KzNKnBIzi...|            吳兆南+魏龍豪|              6|
+--------------------+-------------------------+---------------+
only showing top 5 rows



In [29]:
# Language Interest
user_language = train_cut_sample.groupBy('msno','language').count().toDF('User', 'language', 'language_interest')
user_language.orderBy('language_interest', ascending=False).show(5)

+--------------------+--------+-----------------+
|                User|language|language_interest|
+--------------------+--------+-----------------+
|MXIMDXO0j3UpaT7Fv...|    52.0|               51|
|JkQacE3rvmhh65R04...|     3.0|               40|
|SZ5NNypqaTWljFO1H...|    52.0|               32|
|cqjRBV/jWN2ujhc+z...|     3.0|               31|
|KGXNZ/H3VxvET/+rG...|     3.0|               30|
+--------------------+--------+-----------------+
only showing top 5 rows



The next task is to find the popular songs in each genre, so that they can be recommended to the user.

In [98]:
# Aggregating the dataframe to find songs popular in each Genre
genre_pop = train_cut_sample.groupBy('genre','song_id').count().toDF('genre', 'song', 'genre_song_popularity')
genre_pop.orderBy('genre_song_popularity', ascending=False).show(5)

+-----+--------------------+---------------------+
|genre|                song|genre_song_popularity|
+-----+--------------------+---------------------+
|  465|wBTWuHbjdjxnG1lQc...|                  143|
|  458|FynUyq0+drmIARmK1...|                  141|
|  458|reXuGcEWDDCnL0K3T...|                  139|
|  451|YN4T/yvvXtYrBVN8K...|                  135|
|  458|T86YHdD4C9JSc274b...|                  121|
+-----+--------------------+---------------------+
only showing top 5 rows



In [99]:
# Aggregating the dataframe to find songs popular for each artist
artist_pop = train_cut_sample.groupBy('artist_name','song_id').count().toDF('artist_name', 'song', 'artist_song_popularity')
artist_pop.orderBy('artist_song_popularity', ascending=False).show(5)

+-------------+--------------------+----------------------+
|  artist_name|                song|artist_song_popularity|
+-------------+--------------------+----------------------+
|  Alan Walker|J4qKkLIoW7aYACuTu...|                   211|
|  Alan Walker|v/3onppBGoSpGsWb8...|                   186|
|  Alan Walker|zHqZ07gn+YvF36FWz...|                   145|
|田馥甄 (Hebe)|wBTWuHbjdjxnG1lQc...|                   143|
|  Eric 周興哲|FynUyq0+drmIARmK1...|                   141|
+-------------+--------------------+----------------------+
only showing top 5 rows



In [100]:
# Aggregating the dataframe to find songs popular in each language
language_pop = train_cut_sample.groupBy('language','song_id').count().toDF('language', 'song', 'language_song_popularity')
language_pop.orderBy('language_song_popularity', ascending=False).show(5)

+--------+--------------------+------------------------+
|language|                song|language_song_popularity|
+--------+--------------------+------------------------+
|    52.0|J4qKkLIoW7aYACuTu...|                     211|
|    52.0|v/3onppBGoSpGsWb8...|                     186|
|    52.0|zHqZ07gn+YvF36FWz...|                     145|
|     3.0|wBTWuHbjdjxnG1lQc...|                     143|
|     3.0|FynUyq0+drmIARmK1...|                     141|
+--------+--------------------+------------------------+
only showing top 5 rows



In [33]:
# Distinct Genres in the data
train_cut_sample.select('genre').distinct().count()

111

In [34]:
# Number of rows (listening events) in the sample; this is not the number of distinct users
train_cut_sample.select('msno').count()

75846

In [102]:
# User vs. genre, artist, and language interest.
# Joining on User alone pairs every genre row with every artist and language row of the same user.
User_side = user_genre.join(user_artist, "User", 'outer').join(user_language, 'User','outer')
User_side.show(5)

+--------------------+-----+--------------+------------------+---------------+--------+-----------------+
|                User|genre|genre_interest|       artist_name|artist_interest|language|language_interest|
+--------------------+-----+--------------+------------------+---------------+--------+-----------------+
|+XsXM6G4UNrdF3AWA...| 1259|             1|              ZIZO|              1|    31.0|                1|
|+qr25BVs1befBijPS...|  921|             1|            玖壹壹|              1|     3.0|                2|
|+qr25BVs1befBijPS...|  921|             1|            玖壹壹|              1|    52.0|                2|
|+qr25BVs1befBijPS...|  921|             1|ZAYN| Taylor Swift|              1|     3.0|                2|
|+qr25BVs1befBijPS...|  921|             1|ZAYN| Taylor Swift|              1|    52.0|                2|
+--------------------+-----+--------------+------------------+---------------+--------+-----------------+
only showing top 5 rows
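
The repeated rows for the same user in the output above follow from joining three tables on `User` alone. A small plain-Python sketch shows how the row count multiplies; the dictionaries and the helper `join_on_user` are hypothetical and simplify the outer join by assuming every user appears in all three tables.

```python
from itertools import product

# Invented per-user values; in the notebook these come from
# user_genre, user_artist, and user_language
user_genres = {"user_a": ["465", "458"]}
user_artists = {"user_a": ["artist_x", "artist_y"]}
user_languages = {"user_a": ["3.0", "52.0"]}

def join_on_user(genres, artists, languages):
    """Combine the three tables on the user key only."""
    rows = []
    for user in genres:
        # Every genre is paired with every artist and every language
        combos = product(genres[user], artists.get(user, [None]),
                         languages.get(user, [None]))
        for genre, artist, language in combos:
            rows.append((user, genre, artist, language))
    return rows

joined = join_on_user(user_genres, user_artists, user_languages)
print(len(joined))
```

Two genres, two artists, and two languages already give eight rows for one user, which is why the ALS models further down are trained on the separate tables rather than on this joined one.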



In [103]:
# Popular songs in each category (joined on song, so a song with several genres appears in several rows)
song_side = genre_pop.join(artist_pop, 'song', 'outer').join(language_pop, "song", 'outer')
song_side.show(5)

+--------------------+-----+---------------------+------------------+----------------------+--------+------------------------+
|                song|genre|genre_song_popularity|       artist_name|artist_song_popularity|language|language_song_popularity|
+--------------------+-----+---------------------+------------------+----------------------+--------+------------------------+
|++XsB7MZl/8xl/d0/...|  947|                    1|      C'est La Vie|                     1|    -1.0|                       1|
|+KgAKlO7+zYsMtKzu...| 2122|                    1|   Astrud Gilberto|                     1|    52.0|                       1|
|+tGKT7z6HuwhZa8mR...|  465|                    1|周華健 (Emil Chau)|                     1|     3.0|                       1|
|0yfLxyewedjmipOOl...|  465|                    1|陳零九 (Nine Chen)|                     1|     3.0|                       1|
|1H7Za3rqrfGZhzWon...|  444|                    3|      BANGTAN BOYS|                     3|    31.0|                

# Song Recommendations using ALS

The columns must be converted to integer indices before they can be fed into ALS.

In [37]:
from pyspark.ml.feature import StringIndexer, IndexToString

In [38]:
indexer = StringIndexer(inputCol = "User", outputCol="UserIndex")

In [39]:
indexed = indexer.fit(user_genre).transform(user_genre).drop('User')

In [40]:
# This is the dataframe format ALS expects; we use it to generate predictions
indexed.show(5)

+-----+--------------+---------+
|genre|genre_interest|UserIndex|
+-----+--------------+---------+
|  465|             6|    651.0|
| 2122|             2|    502.0|
|  465|             2|   3039.0|
|  465|             2|  16734.0|
|  458|             6|    158.0|
+-----+--------------+---------+
only showing top 5 rows
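
`StringIndexer` orders labels by descending frequency by default, so the most frequent user string receives index 0. The plain-Python sketch below imitates that mapping; the helper name `fit_string_index`, the sample values, and the alphabetical tie-breaking are assumptions made for the illustration.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to a float index, most frequent first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically in this sketch
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}

# Invented user ids: "user_b" occurs most often, so it gets index 0.0
users = ["user_b", "user_a", "user_b", "user_c", "user_a", "user_b"]
user_index = fit_string_index(users)
print(user_index)
```

The indices are floats, as in the `UserIndex` column above, which is why the notebook casts them to integers before training.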



In [41]:
indexed = indexed.selectExpr('cast(genre as int) as genre_id', 'cast(UserIndex as int)', 'genre_interest')

In [42]:
indexed.printSchema()

root
 |-- genre_id: integer (nullable = true)
 |-- UserIndex: integer (nullable = true)
 |-- genre_interest: long (nullable = true)



In [43]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [44]:
# Splitting into training and testing
training, test = indexed.randomSplit([0.8,0.2])

In [45]:
# Initialising the ALS
als = ALS()\
.setMaxIter(5)\
.setRegParam(0.01)\
.setUserCol("UserIndex")\
.setItemCol("genre_id")\
.setRatingCol("genre_interest")

In [46]:
# The hyperparameters available on ALS are listed below
print(als.explainParams())

alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: genre_id)
maxIter: max number of iterations (>= 

In [47]:
# Training the ALS
alsModel = als.fit(training)
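
To make the alternating idea behind `als.fit` concrete, here is a minimal plain-Python sketch with a single latent factor (rank 1). It is not Spark's implementation: the function name `als_rank1`, the toy ratings, and the plain L2 penalty are assumptions for illustration, whereas Spark's ALS uses rank 10 by default and scales the regularization by the number of ratings.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, iterations=5):
    """Rank-1 alternating least squares on a dict {(user, item): rating}."""
    user_f = [1.0] * n_users
    item_f = [1.0] * n_items
    for _ in range(iterations):
        # Hold item factors fixed and solve each user factor in closed form
        for i in range(n_users):
            rated = [(j, r) for (user, j), r in ratings.items() if user == i]
            num = sum(r * item_f[j] for j, r in rated)
            den = reg + sum(item_f[j] ** 2 for j, _ in rated)
            user_f[i] = num / den
        # Hold user factors fixed and solve each item factor in closed form
        for j in range(n_items):
            rated = [(i, r) for (i, item), r in ratings.items() if item == j]
            num = sum(r * user_f[i] for i, r in rated)
            den = reg + sum(user_f[i] ** 2 for i, _ in rated)
            item_f[j] = num / den
    return user_f, item_f

# Toy ratings generated from hidden factors [1, 2, 3] x [2, 1, 4];
# the pair (0, 2), whose true value is 4, is left unobserved
ratings = {(0, 0): 2.0, (0, 1): 1.0, (1, 0): 4.0,
           (1, 2): 8.0, (2, 1): 3.0, (2, 2): 12.0}
user_f, item_f = als_rank1(ratings, n_users=3, n_items=3)

# Predicted score for the unobserved pair
predicted = user_f[0] * item_f[2]
print(round(predicted, 2))
```

Each half-step is an ordinary regularized least-squares problem, which is what makes the method easy to distribute across users and items.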

In [48]:
# Generating Predictions
predictions = alsModel.transform(test)

In [49]:
# So for each user, we get 10 top recommended Genres
recomm_genre = alsModel.recommendForAllUsers(10).selectExpr('UserIndex', 'explode(recommendations) as recommended')
recomm_genre.show(10)

+---------+-----------------+
|UserIndex|      recommended|
+---------+-----------------+
|     1580| [545, 7.8692727]|
|     1580|[1995, 5.6934133]|
|     1580|[2086, 5.2582836]|
|     1580|  [850, 4.759304]|
|     1580|[1572, 4.7450814]|
|     1580|  [481, 4.554379]|
|     1580| [873, 4.4750085]|
|     1580|  [516, 4.389927]|
|     1580|  [465, 3.836222]|
|     1580| [388, 3.2870884]|
+---------+-----------------+
only showing top 10 rows



In [50]:
# Generated predictions
top_genre_user = recomm_genre.where(recomm_genre.UserIndex == 1580).rdd.map(lambda x: x.recommended).map(lambda x: x.genre_id).collect()
top_genre_user

[545, 1995, 2086, 850, 1572, 481, 873, 516, 465, 388]

In [104]:
# Song recommendations: most popular songs in the second recommended genre (index 1)
song_side.sort('genre', 'genre_song_popularity', ascending=False).where(song_side.genre == top_genre_user[1]).select('song').show(10)

+--------------------+
|                song|
+--------------------+
|UMzNsDKRVjcNUYhlT...|
|Sbz9GfHVOAERJA7o4...|
|1iHRL+cugxRVTz0Zb...|
|KnZV077x3kJoBnJm/...|
|d90L+EZxCshVGS4rU...|
|5DfRZqAreMU3ovigj...|
|lTvKbkQhBBLCYrVyR...|
|fh+mNN016KM5hFM7D...|
|9LaJP77sG3MTpRIDO...|
|sjXtOEX2/uwyf36Y5...|
+--------------------+
only showing top 10 rows
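
The lookup step can be sketched in plain Python as follows. Since the joined song table can contain the same song in several rows, the sketch also removes repeats; the helper name `top_songs_for_genre` and the data are hypothetical.

```python
def top_songs_for_genre(popularity, genre, k=10):
    """Return up to k distinct songs of a genre, most popular first.

    popularity is a list of (genre, song, count) tuples.
    """
    candidates = [(count, song) for g, song, count in popularity if g == genre]
    # Highest count first; song id breaks ties so the order is deterministic
    candidates.sort(key=lambda pair: (-pair[0], pair[1]))
    seen, result = set(), []
    for _, song in candidates:
        if len(result) >= k:
            break
        if song not in seen:
            seen.add(song)
            result.append(song)
    return result

# Invented popularity rows; "song_a" is duplicated on purpose
popularity = [
    ("465", "song_a", 143),
    ("458", "song_b", 141),
    ("465", "song_c", 20),
    ("465", "song_a", 143),
    ("465", "song_d", 57),
]
print(top_songs_for_genre(popularity, "465", k=2))
```

Looping this over every entry of `top_genre_user`, rather than a single index, would give recommendations covering all of the predicted genres.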



# Song recommendations based on artists the user is interested in

In [52]:
# converting columns via string indexing
indexer = StringIndexer(inputCol = "User", outputCol="UserIndex")
indexer_artist = StringIndexer(inputCol = 'artist_name', outputCol = 'artist_index')

In [53]:
indexed_artist = indexer.fit(user_artist).transform(user_artist)
indexed_artist = indexer_artist.fit(indexed_artist).transform(indexed_artist)

In [54]:
indexed_artist.show(5)

+--------------------+--------------------+---------------+---------+------------+
|                User|         artist_name|artist_interest|UserIndex|artist_index|
+--------------------+--------------------+---------------+---------+------------+
|GH5CcteDnxOzg5zkI...|  信樂團 (Shin Band)|              1|    630.0|       120.0|
|S7FV7CKTmx2ymejZJ...|      Rag'N'Bone Man|              1|    541.0|      1400.0|
|5fFQq48H5HIiyfYkk...|   Peter Paul & Mary|              1|  11529.0|      4717.0|
|K5x68e9t0PySwLAoe...|周湯豪 (NICKTHEREAL)|              1|   3066.0|        26.0|
|4Lxw2WOLQw7LBrp6W...|              詹雅雯|              1|   9940.0|       206.0|
+--------------------+--------------------+---------------+---------+------------+
only showing top 5 rows



In [83]:
indexed_artist_als = indexed_artist.selectExpr('cast(artist_index as int)', 'cast(UserIndex as int)', 'artist_interest', 'artist_name')
indexed_artist_als.show(5)

+------------+---------+---------------+--------------------+
|artist_index|UserIndex|artist_interest|         artist_name|
+------------+---------+---------------+--------------------+
|         120|      630|              1|  信樂團 (Shin Band)|
|        1400|      541|              1|      Rag'N'Bone Man|
|        4717|    11529|              1|   Peter Paul & Mary|
|          26|     3066|              1|周湯豪 (NICKTHEREAL)|
|         206|     9940|              1|              詹雅雯|
+------------+---------+---------------+--------------------+
only showing top 5 rows



In [84]:
training_artist, test_artist = indexed_artist_als.randomSplit([0.8, 0.2])

In [85]:
als = ALS()\
.setMaxIter(5)\
.setRegParam(0.01)\
.setUserCol("UserIndex")\
.setItemCol("artist_index")\
.setRatingCol("artist_interest")

In [86]:
alsModel_artist = als.fit(training_artist)

In [87]:
predictions = alsModel_artist.transform(test_artist)

In [89]:
# Recommended artists(string indexed values)
recomm_artist = alsModel_artist.recommendForAllUsers(10).selectExpr('UserIndex', 'explode(recommendations) as recommended')
recomm_artist.show(10)

+---------+-----------------+
|UserIndex|      recommended|
+---------+-----------------+
|     1580| [1651, 4.933778]|
|     1580| [633, 4.1720943]|
|     1580|  [828, 3.925313]|
|     1580|[1781, 3.5492597]|
|     1580|[1559, 3.5222251]|
|     1580|[1094, 3.5146184]|
|     1580|[1118, 3.4517207]|
|     1580| [764, 3.4117632]|
|     1580|[1713, 3.3856668]|
|     1580| [2444, 3.355938]|
+---------+-----------------+
only showing top 10 rows



In [61]:
# Recommended artists (string-indexed values) with predicted ratings for one user
top_artist_user = recomm_artist.where(recomm_artist.UserIndex == 1580).rdd.map(lambda x: x.recommended).collect()
artist_user = sc.parallelize(top_artist_user).toDF()
artist_user.show(10)

+------------+------------------+
|artist_index|            rating|
+------------+------------------+
|         637| 4.602288246154785|
|         603|4.6009321212768555|
|        1603| 4.285262107849121|
|         497| 4.222316265106201|
|         479|  4.03449010848999|
|         686| 4.012558460235596|
|         971| 4.010550022125244|
|        1354| 3.862031936645508|
|         498| 3.829875946044922|
|         594|3.8296868801116943|
+------------+------------------+



In [105]:
# obtaining artist names from above
recomm_artists = indexed_artist.select(indexed_artist.artist_index.cast('int'), 'artist_name').join(artist_user, "artist_index", 'inner').distinct().rdd.map(
lambda x: x.artist_name).collect()
recomm_artists

['Muse',
 '約書亞樂團',
 'Richard Clayderman',
 'Eir Aoi (藍井エイル)',
 'SCANDAL',
 'High School Musical Original Soundtrack',
 'Gallant x Tablo x Eric Nam',
 'Ella Fitzgerald',
 'Nogizaka46 (乃木坂46)',
 '滾石金韻民歌百大精選']
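
Because the join is followed by `distinct()`, the order of the collected names is not guaranteed to follow the predicted ratings. If ranking matters, one option is to sort by rating before mapping indices back to names. The sketch below shows the idea in plain Python; the helper `names_in_rating_order`, the index-to-name mapping, and the ratings are invented for illustration.

```python
def names_in_rating_order(recommended, index_to_name):
    """Map (artist_index, rating) pairs to names, highest rating first."""
    ranked = sorted(recommended, key=lambda pair: pair[1], reverse=True)
    # Skip indices that have no known name instead of failing
    return [index_to_name[index] for index, _ in ranked if index in index_to_name]

# Hypothetical lookup table and recommendations
index_to_name = {637: "artist_a", 603: "artist_b", 1603: "artist_c"}
recommended = [(603, 4.60), (1603, 4.29), (637, 4.61)]

names = names_in_rating_order(recommended, index_to_name)
print(names)
```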

In [122]:
# Recommended songs: most popular songs of one of the recommended artists (index 1)
song_side.sort('artist_name', 'artist_song_popularity', ascending=False).where(song_side.artist_name == recomm_artists[1]).select('song').show(10)

+--------------------+
|                song|
+--------------------+
|s8KQo0qsDIS42y5xg...|
|sOYNImNuRdogfOvo9...|
|s8KQo0qsDIS42y5xg...|
|cR3/3Zf2wzp9uQteN...|
|t/HLyst7l4EjgPCuQ...|
|gJwLE8jSKMh2YEuLX...|
|QLDzqRYCPufbZve5u...|
|1fruU7bvpRMfDMkco...|
|2hyXkI5tyKL0oEkTS...|
|FHLjRKekX6BnbLjDj...|
+--------------------+
only showing top 10 rows



# Song recommendations based on language

In [143]:
indexer = StringIndexer(inputCol = "User", outputCol="UserIndex")

In [147]:
indexed_language = indexer.fit(user_language).transform(user_language)

In [148]:
indexed_language = indexed_language.select(indexed_language.language.cast('int'), 'UserIndex', 'language_interest')
indexed_language.show()

+--------+---------+-----------------+
|language|UserIndex|language_interest|
+--------+---------+-----------------+
|       3|    719.0|                7|
|      52|   1305.0|                3|
|      52|  15835.0|                2|
|       3|    614.0|                5|
|       3|   1077.0|                9|
|       3|   3951.0|                2|
|      52|    108.0|               11|
|      52|  11014.0|                1|
|       3|   7422.0|                1|
|       3|   9733.0|                5|
|      52|  12981.0|                1|
|      52|   7342.0|                1|
|      52|    615.0|                5|
|       3|  16188.0|                2|
|      -1|   3441.0|                1|
|       3|  13053.0|                3|
|       3|   7082.0|                4|
|      52|   7750.0|                4|
|       3|   9763.0|                2|
|      52|    510.0|                4|
+--------+---------+-----------------+
only showing top 20 rows



In [163]:
# Replace missing language codes with 0
indexed_language = indexed_language.fillna({'language': 0})

In [164]:
als = ALS()\
.setMaxIter(5)\
.setRegParam(0.01)\
.setUserCol("UserIndex")\
.setItemCol("language")\
.setRatingCol("language_interest")

In [165]:
training_language, test_language = indexed_language.randomSplit([0.8, 0.2])

In [167]:
alsModel_language = als.fit(training_language)

In [168]:
predictions = alsModel_language.transform(test_language)

In [171]:
# Top k languages recommended for the user
recomm_language = alsModel_language.recommendForAllUsers(10).selectExpr('UserIndex', 'explode(recommendations) as recommended')
recomm_language.show(10)

+---------+----------------+
|UserIndex|     recommended|
+---------+----------------+
|     1580|  [3, 3.7705626]|
|     1580| [52, 1.8470466]|
|     1580| [24, 0.9479289]|
|     1580|  [10, 0.895591]|
|     1580|[45, 0.89142084]|
|     1580| [59, 0.8420744]|
|     1580| [38, 0.6820886]|
|     1580|[31, 0.57250977]|
|     1580|[17, 0.54873055]|
|     1580|[-1, 0.36088538]|
+---------+----------------+
only showing top 10 rows



In [178]:
# Top 10 languages for a user
top_languages_user = recomm_language.where(recomm_language.UserIndex == 1580).rdd.map(lambda x: x.recommended).map(lambda x: x.language).collect()
top_languages_user

[3, 52, 24, 10, 45, 59, 38, 31, 17, -1]

In [179]:
# Recommended songs: most popular songs in the second recommended language (index 1)
song_side.sort('language', 'language_song_popularity', ascending=False).where(song_side.language == top_languages_user[1]).select('song').show(10)

+--------------------+
|                song|
+--------------------+
|J4qKkLIoW7aYACuTu...|
|J4qKkLIoW7aYACuTu...|
|v/3onppBGoSpGsWb8...|
|v/3onppBGoSpGsWb8...|
|zHqZ07gn+YvF36FWz...|
|zHqZ07gn+YvF36FWz...|
|+LztcJcPEEwsikk6+...|
|IKMFuL0f5Y8c63Hg9...|
|9YYrODwrXpDcCjOJy...|
|9YYrODwrXpDcCjOJy...|
+--------------------+
only showing top 10 rows



The models are not evaluated with RMSE here, because the interaction data is incomplete: not every user has a value for every item, so predictions cannot be compared against actual values for all user-item pairs. The recommendations are meant to surface songs that the user has not discovered yet.
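
If a rough error estimate on the held-out rows were still wanted, one option is to compute RMSE only over the pairs that actually received a prediction. The `explainParams` output earlier lists `nan` as the default `coldStartStrategy` and `drop` as the alternative; the plain-Python sketch below mimics the effect of dropping NaN predictions. The function name `rmse_dropping_nan` and the numbers are made up for illustration.

```python
import math

def rmse_dropping_nan(pairs):
    """RMSE over (actual, predicted) pairs, ignoring NaN predictions."""
    valid = [(actual, pred) for actual, pred in pairs if not math.isnan(pred)]
    if not valid:
        # Nothing to score, e.g. every test user was unseen in training
        return float("nan")
    squared_error = sum((actual - pred) ** 2 for actual, pred in valid)
    return math.sqrt(squared_error / len(valid))

# Invented test rows; the third prediction is NaN (unseen user or item)
pairs = [(3.0, 2.0), (1.0, 1.0), (5.0, float("nan")), (2.0, 4.0)]
score = rmse_dropping_nan(pairs)
print(round(score, 3))
```

Such a number only describes how well the listening counts are reconstructed; it says nothing about whether the recommended songs are actually useful to the user.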

# Conclusion:

1. The ALS model is widely used by companies for making recommendations, but rarely as the only tool. It is typically combined with other collaborative filtering models to get closer predictions of items.
2. Recommender systems are difficult to evaluate. Classical metrics such as MSE, accuracy, recall, or precision can be used, but some desired properties, such as diversity (serendipity) and explainability, cannot be assessed this way. Evaluation under real conditions (such as A/B testing or sample testing) is ultimately the only real way to evaluate a new recommender system, but it requires a certain confidence in the model.
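
As an illustration of the classical metrics mentioned in point 2, precision and recall can be computed over the top-k recommendations for a single user. This is a minimal plain-Python sketch; the helper `precision_recall_at_k` and the set of relevant genres are hypothetical, and only the recommended list reuses genre ids from the output shown earlier.

```python
def precision_recall_at_k(recommended, relevant, k):
    """Precision@k and recall@k for one user's ranked recommendations."""
    top_k = recommended[:k]
    hits = sum(1 for item in top_k if item in relevant)
    precision = hits / k if k else 0.0
    recall = hits / len(relevant) if relevant else 0.0
    return precision, recall

# Recommended genre ids, plus an invented set of genres the user
# actually listened to in a held-out period
recommended = [545, 1995, 2086, 850, 1572]
relevant = {1995, 465, 850}

precision, recall = precision_recall_at_k(recommended, relevant, k=5)
print(precision, recall)
```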