# Music Recommender System Using PySpark

Recommender systems are everywhere. For example, Amazon suggests products you might be interested in based on what you have bought before, and Spotify suggests new tracks based on the songs you listen to every day. Many of these systems rely on matrix factorization algorithms such as NMF (Non-negative Matrix Factorization) or ALS (Alternating Least Squares).

In this project, we use the ALS algorithm to build a music recommender system that suggests new tracks to users based on the songs they have been listening to.
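To make the idea of alternating least squares concrete, here is a toy sketch in plain Python. It assumes a single latent factor per user and per track (rank 1) and a small made-up table of play counts; it is only an illustration of the alternating update, not Spark's implementation, and all names and values in it are hypothetical.

```python
# Toy rank-1 ALS on a tiny user x track play-count table.
# All values are hypothetical; this is not how Spark implements ALS internally.
ratings = {  # (user, track) -> play count
    (0, 0): 5.0, (0, 1): 3.0, (0, 3): 1.0,
    (1, 0): 4.0, (1, 3): 1.0,
    (2, 0): 1.0, (2, 1): 1.0, (2, 3): 5.0,
    (3, 1): 1.0, (3, 2): 5.0, (3, 3): 4.0,
}
n_users, n_tracks, reg = 4, 4, 0.1


def objective(u, v):
    """Regularized squared error over the observed entries only."""
    err = sum((r - u[a] * v[b]) ** 2 for (a, b), r in ratings.items())
    return err + reg * (sum(x * x for x in u) + sum(x * x for x in v))


u = [1.0] * n_users   # one latent factor per user
v = [1.0] * n_tracks  # one latent factor per track
history = [objective(u, v)]

for _ in range(10):
    # Fix the track factors and solve each user's 1-D ridge regression exactly.
    for a in range(n_users):
        num = sum(v[b] * r for (x, b), r in ratings.items() if x == a)
        den = sum(v[b] ** 2 for (x, b), r in ratings.items() if x == a) + reg
        u[a] = num / den
    # Fix the user factors and solve each track's 1-D ridge regression exactly.
    for b in range(n_tracks):
        num = sum(u[a] * r for (a, y), r in ratings.items() if y == b)
        den = sum(u[a] ** 2 for (a, y), r in ratings.items() if y == b) + reg
        v[b] = num / den
    history.append(objective(u, v))

print([round(h, 3) for h in history])
```

Because each half-step minimizes the objective exactly with the other side held fixed, the printed objective values never increase. A predicted score for an unseen (user, track) pair is then simply `u[user] * v[track]`.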

### Import Libraries

In [1]:
# Importing the modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, col, max
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [2]:
# Creating Spark Session
spark = SparkSession.builder.appName("lastfm").getOrCreate()

### Load Dataset

In [17]:
# Load the dataset

df = spark.read.format("csv").option("header", True).option("inferSchema", True).load('listenings.csv')
df.show()
print("Data read successfully")

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      

### Data Cleaning

In [18]:
# Drop Date Column in Dataset
df = df.drop("date")
df.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [19]:
# Drop NaN Values in Dataset
df = df.na.drop()
df.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [20]:
# Count Rows & Columns

row_numbers = df.count()
column_numbers = len(df.columns)
print(row_numbers, column_numbers)

13758905 4


### Perform Some Aggregation

In [21]:
# Count how many times each user listened to each track
df_agr = df.select("user_id", "track").groupby("user_id", "track").agg(count("*").alias("count")).orderBy("user_id")
df_agr.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|In the Nothing of...|    2|
| --Seph|        Window Blues|    1|
| --Seph|               Julia|    1|
| --Seph|Hungarian Rhapsod...|    1|
| --Seph|     The Way We Were|    1|
| --Seph|Vestido Estampado...|    1|
| --Seph|         The Embrace|    1|
| --Seph|      Hour for magic|    2|
| --Seph|       Life On Mars?|    1|
| --Seph| Air on the G String|    1|
| --Seph|   Summa for Strings|    1|
| --Seph|              Monday|    1|
| --Seph|Belina (Original ...|    1|
| --Seph| White Winter Hymnal|    3|
| --Seph|          Paris 2004|    7|
| --Seph|       Phantom Pt II|    1|
| --Seph|Airplanes [feat H...|    1|
| --Seph|  California Waiting|    1|
| --Seph|Hungarian Dance No 5|    1|
| --Seph|Virus (Luke Fair ...|    1|
+-------+--------------------+-----+
only showing top 20 rows
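The aggregation above can be pictured in plain Python as counting how often each (user, track) pair occurs in the listening log. The sketch below assumes a small hypothetical log and uses `collections.Counter` as a stand-in for the Spark `groupby`/`count`; it is meant only to show what the `count` column represents.

```python
from collections import Counter

# Hypothetical listening log: one (user_id, track) tuple per play event.
listens = [
    ("--Seph", "Paris 2004"),
    ("--Seph", "White Winter Hymnal"),
    ("--Seph", "Paris 2004"),
    ("000Silenced", "Price Tag"),
    ("--Seph", "Paris 2004"),
]

# Stand-in for groupby("user_id", "track").agg(count("*")):
# each distinct (user, track) pair maps to the number of times it occurs.
play_counts = Counter(listens)

# Sort by user (and by track within a user, only to make the printout stable).
for (user_id, track), n in sorted(play_counts.items()):
    print(user_id, track, n)
```

These per-pair play counts are what the model later treats as the rating a user gave a track.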



In [22]:
row_numbers = df_agr.count()
column_numbers = len(df_agr.columns)
print(row_numbers, column_numbers)

9930128 3


In [23]:
# Keep only the first 50,000 aggregated rows
df_agr = df_agr.limit(50000)

### Convert The User_Id & Track Columns Into Unique Integers

In [25]:
# Fit one StringIndexer per ID column (every column except 'count')
indexer = [StringIndexer(inputCol=column, outputCol=column + '_index').fit(df_agr) for column in list(set(df_agr.columns) - set(['count']))]

pipeline = Pipeline(stages=indexer)
data = pipeline.fit(df_agr).transform(df_agr)
data.show()

+-------+--------------------+-----+-----------+-------------+
|user_id|               track|count|track_index|user_id_index|
+-------+--------------------+-----+-----------+-------------+
| --Seph|Virus (Luke Fair ...|    1|    35316.0|        171.0|
| --Seph| White Winter Hymnal|    3|      213.0|        171.0|
| --Seph|Airplanes [feat H...|    1|     2603.0|        171.0|
| --Seph|Belina (Original ...|    1|     9436.0|        171.0|
| --Seph|              Monday|    1|     1866.0|        171.0|
| --Seph|Hungarian Dance No 5|    1|     3936.0|        171.0|
| --Seph|       Life On Mars?|    1|      297.0|        171.0|
| --Seph|  California Waiting|    1|     1335.0|        171.0|
| --Seph|       Phantom Pt II|    1|     4973.0|        171.0|
| --Seph|      Hour for magic|    2|    18143.0|        171.0|
| --Seph|   Summa for Strings|    1|    31010.0|        171.0|
| --Seph|Hungarian Rhapsod...|    1|    18281.0|        171.0|
| --Seph|     The Way We Were|    1|    33435.0|       

In [26]:
data = data.select("user_id_index", "track_index", "count").orderBy("user_id_index")
data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|    16801.0|    1|
|          0.0|     1114.0|    1|
|          0.0|    26200.0|    1|
|          0.0|    36046.0|    1|
|          0.0|    11628.0|    1|
|          0.0|     3480.0|    1|
|          0.0|    28739.0|    1|
|          0.0|    11912.0|    1|
|          0.0|    29071.0|    1|
|          0.0|    26510.0|    1|
|          0.0|    35136.0|    1|
|          0.0|     3951.0|    1|
|          0.0|    18882.0|    1|
|          0.0|    29384.0|    1|
|          0.0|    23775.0|    1|
|          0.0|     8592.0|    1|
|          0.0|     9659.0|    1|
|          0.0|    12023.0|    1|
|          0.0|    11778.0|    1|
|          0.0|    14996.0|    1|
+-------------+-----------+-----+
only showing top 20 rows
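By default, `StringIndexer` orders labels by descending frequency, so index `0.0` goes to the most frequent value in a column. The plain-Python sketch below mimics that ordering on a short hypothetical list of user IDs. The helper name `frequency_desc_index` is made up for illustration, and breaking ties alphabetically is an assumption about recent Spark versions.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Map each label to a float index, most frequent label first.

    Ties are broken alphabetically (an assumption for this sketch).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical column values; "abc" is an invented user ID.
users = ["--Seph", "000Silenced", "000Silenced", "--Seph", "000Silenced", "abc"]
mapping = frequency_desc_index(users)
print(mapping)
```

The indices carry no meaning beyond identifying a user or track; they exist because ALS expects numeric user and item columns.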



### Train & Test Data

In [27]:
# Randomly split the data 50/50 into training and test sets
(training, test) = data.randomSplit([0.5, 0.5])
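Conceptually, a weighted random split assigns each row to one of the splits independently, so the resulting sizes are only approximately proportional to the weights. The plain-Python sketch below illustrates this with a hypothetical helper `random_split`; it is not Spark's code. Spark's `randomSplit` also accepts a `seed` argument, which is worth setting when the split should be reproducible.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split by drawing one uniform number per row."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.5, 1.0].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any rounding slack in the bounds.
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))  # stand-in for the rows of `data`
training, test = random_split(rows, [0.5, 0.5], seed=42)
print(len(training), len(test))
```

With a 50/50 split, only about half of the interactions are available for training; a larger training share such as `[0.8, 0.2]` is a common alternative.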

### Create Model

In [30]:
USERID = "user_id_index"
TRACK = "track_index"
COUNT = "count"

als = ALS(maxIter=5, regParam=0.01, userCol=USERID, itemCol=TRACK, ratingCol=COUNT)
model = als.fit(training)

In [31]:
# Prediction
prediction = model.transform(test)
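The predictions can be scored against the true play counts with a metric such as RMSE. One caveat: by default Spark's ALS uses `coldStartStrategy="nan"`, so test rows whose user or track never appeared in the training split get a `NaN` prediction, and a single `NaN` makes the overall RMSE `NaN`. The plain-Python sketch below shows the calculation with such rows dropped; the rows and the helper name `rmse_ignoring_nan` are hypothetical. In Spark, setting `coldStartStrategy="drop"` on `ALS` and using `RegressionEvaluator` would be the usual way to get the same effect.

```python
import math


def rmse_ignoring_nan(rows):
    """RMSE over (count, prediction) pairs, skipping NaN predictions."""
    kept = [(c, p) for c, p in rows if not math.isnan(p)]
    if not kept:
        return float("nan")
    return math.sqrt(sum((c - p) ** 2 for c, p in kept) / len(kept))


# Hypothetical (count, prediction) pairs; the NaN stands for a cold-start row.
rows = [(1, 1.5), (3, 2.0), (2, float("nan")), (1, 1.0)]
rmse = rmse_ignoring_nan(rows)
print(rmse)
```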

### Generate Top 10 Recommendations For Each User

In [32]:
recommend = model.recommendForAllUsers(10)
recommend.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|          148|[{37408, 17.98018...|
|          243|[{7766, 7.363263}...|
|           31|[{19458, 16.71079...|
|          251|[{59, 21.859558},...|
|           85|[{28192, 16.81901...|
|          137|[{13575, 10.23010...|
|           65|[{13575, 9.84371}...|
|           53|[{19458, 18.22799...|
|          255|[{23666, 8.808033...|
|          133|[{23666, 25.55483...|
|          296|[{29295, 10.99125...|
|          322|[{23666, 26.89952...|
|           78|[{17709, 27.62168...|
|          321|[{27269, 12.46993...|
|          362|[{59, 17.534376},...|
|          155|[{17709, 17.73564...|
|          108|[{23666, 9.553433...|
|          211|[{23666, 24.41329...|
|          193|[{59, 21.473232},...|
|           34|[{59, 10.148405},...|
+-------------+--------------------+
only showing top 20 rows



In [36]:
recommend.take(1)

[Row(user_id_index=148, recommendations=[Row(track_index=37408, rating=17.980186462402344), Row(track_index=59, rating=8.915945053100586), Row(track_index=19458, rating=7.45954704284668), Row(track_index=1192, rating=7.213980197906494), Row(track_index=787, rating=7.069788932800293), Row(track_index=477, rating=5.499311923980713), Row(track_index=252, rating=5.364506244659424), Row(track_index=5321, rating=5.324398994445801), Row(track_index=28192, rating=5.189373016357422), Row(track_index=34417, rating=5.020595073699951)])]
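The recommendations are expressed as `track_index` values, so a final lookup is needed to turn them back into track names. The sketch below shows that step in plain Python. The three index-to-name pairs are taken from the indexed table earlier in this notebook, while the recommendation row, its scores, and the helper `readable` are hypothetical. In Spark, the lookup would typically come from the `labels` list of the fitted `StringIndexerModel` for the `track` column.

```python
# track_index -> track name, copied from the indexed table shown earlier.
index_to_track = {
    213: "White Winter Hymnal",
    297: "Life On Mars?",
    1866: "Monday",
}

# Hypothetical row shaped like one entry of model.recommendForAllUsers(10):
# a user index plus (track_index, rating) pairs.
recommendation = {
    "user_id_index": 171,
    "recommendations": [(297, 7.2), (213, 5.4), (9999, 5.0)],
}


def readable(rec, lookup):
    """Replace track indices with names, keeping unknown indices visible."""
    return [
        (lookup.get(idx, f"<unknown track {idx}>"), round(score, 2))
        for idx, score in rec["recommendations"]
    ]


for name, score in readable(recommendation, index_to_track):
    print(name, score)
```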