# Importing Libraries 

In [1]:
from pyspark import SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '15g')
sc = SparkContext("local", "App Name")

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, col, max
from pyspark.ml.feature import StringIndexer
from pyspark.ml.param import Param, Params
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

## Creating the spark session 

In [3]:
spark = SparkSession.builder.appName('lastfm').getOrCreate()

# Loading the Dataset

In [4]:
file_path = '/Users/taylorgonzalez/Downloads/Music Recommender Dataset/listenings.csv'
df_listenings = spark.read.format('csv').option('header', True).option('inferSchema', True).load(file_path)
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      

# Cleaning Tables 

In [5]:
# Dropping date column
df_listenings = df_listenings.drop('date')
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [6]:
# Dropping missing values in data 
df_listenings = df_listenings.na.drop()
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

# Aggregation

In [7]:
row_numbers = df_listenings.count()
column_numbers = len(df_listenings.columns)
print(row_numbers, column_numbers)

13758905 4


In [8]:
# Ordering data by user_id and track with count per track
df_listenings_agg = df_listenings.select('user_id', 'track').groupby('user_id', 'track').agg(count('*').alias('count')).orderBy('user_id')
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Airplanes [feat H...|    1|
| --Seph|              Monday|    1|
| --Seph|         The Embrace|    1|
| --Seph|Hungarian Dance No 5|    1|
| --Seph|       Life On Mars?|    1|
| --Seph|Belina (Original ...|    1|
| --Seph| White Winter Hymnal|    3|
| --Seph|               Leloo|    1|
| --Seph|      Hour for magic|    2|
| --Seph|Hungarian Rhapsod...|    1|
| --Seph|     The Way We Were|    1|
| --Seph| Air on the G String|    1|
| --Seph|Vestido Estampado...|    1|
| --Seph|        Window Blues|    1|
| --Seph|  California Waiting|    1|
| --Seph|       Phantom Pt II|    1|
| --Seph|Virus (Luke Fair ...|    1|
| --Seph|   Summa for Strings|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [9]:
# Displaying newly formed data 
row_numbers = df_listenings_agg.count()
column_numbers = len(df_listenings_agg.columns)
print(row_numbers, column_numbers)

9930128 3


# Converting string columns into index columns 

In [10]:
# Creating a sample size to work with
df_listenings_agg = df_listenings_agg.limit(20000)

In [22]:
# Iterating over data and adding indexes per row 
indexer = [StringIndexer(inputCol=col, outputCol=col+'_index').setHandleInvalid("keep").fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns) - set(['count']))]

pipeline = Pipeline(stages=indexer)

data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+-----+-------------+-----------+
|user_id|               track|count|user_id_index|track_index|
+-------+--------------------+-----+-------------+-----------+
| --Seph|          Nightmares|    1|         69.0|    10600.0|
| --Seph|Virus (Luke Fair ...|    1|         69.0|    15893.0|
| --Seph|Airplanes [feat H...|    1|         69.0|      521.0|
| --Seph|Belina (Original ...|    1|         69.0|     3280.0|
| --Seph|              Monday|    1|         69.0|      334.0|
| --Seph|Hungarian Dance No 5|    1|         69.0|     7555.0|
| --Seph|       Life On Mars?|    1|         69.0|     1164.0|
| --Seph|  California Waiting|    1|         69.0|      195.0|
| --Seph|       Phantom Pt II|    1|         69.0|     1378.0|
| --Seph|   Summa for Strings|    1|         69.0|    13737.0|
| --Seph|      Hour for magic|    2|         69.0|     7492.0|
| --Seph|Hungarian Rhapsod...|    1|         69.0|     7556.0|
| --Seph|     The Way We Were|    1|         69.0|    1

In [23]:
data = data.select('user_id_index' ,'track_index','count').orderBy('user_id_index')

In [24]:
data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|          0.0|    10628.0|    1|
|          0.0|     3338.0|    1|
|          0.0|    12168.0|    1|
|          0.0|    11626.0|    2|
|          0.0|    10094.0|    4|
|          0.0|      427.0|    1|
|          0.0|    16878.0|    1|
|          0.0|    11722.0|    1|
|          0.0|    15074.0|    1|
|          0.0|     1359.0|    1|
|          0.0|     5874.0|    1|
|          0.0|    11184.0|    1|
|          0.0|     2372.0|    2|
|          0.0|    14316.0|    1|
|          0.0|     5346.0|    1|
|          0.0|    11194.0|    1|
|          0.0|     2241.0|    1|
|          0.0|     2864.0|    1|
|          0.0|     2663.0|    4|
|          0.0|     6064.0|    1|
+-------------+-----------+-----+
only showing top 20 rows



# Train and Test Data

In [25]:
(training,test) = data.randomSplit([0.5,0.5])

# Create Model

In [26]:
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count'

als = ALS(maxIter=5 , regParam=0.01, userCol=USERID, itemCol=TRACK, ratingCol=COUNT)
model = als.fit(training)

predictions = model.transform(test)

# Top 10 Track recommendations for each user 

In [27]:
recs = model.recommendForAllUsers(10)



In [28]:
recs.show()

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|            0|[{1739, 11.687727...|
|            1|[{16908, 12.54488...|
|            2|[{11940, 13.24107...|
|            3|[{1739, 10.787095...|
|            4|[{461, 7.879473},...|
|            5|[{225, 9.314997},...|
|            6|[{11940, 42.61984...|
|            7|[{225, 12.201307}...|
|            8|[{11940, 12.63299...|
|            9|[{11940, 25.75350...|
|           10|[{12890, 8.892447...|
|           11|[{11940, 14.54535...|
|           12|[{308, 5.45276}, ...|
|           13|[{12890, 6.350863...|
|           14|[{308, 14.327518}...|
|           15|[{233, 11.043821}...|
|           16|[{1739, 11.146429...|
|           17|[{308, 8.705842},...|
|           18|[{308, 10.246171}...|
|           19|[{11940, 7.087863...|
+-------------+--------------------+
only showing top 20 rows



In [29]:
recs.take(1)

[Row(user_id_index=0, recommendations=[Row(track_index=1739, rating=11.687726974487305), Row(track_index=5292, rating=9.350181579589844), Row(track_index=3345, rating=8.483217239379883), Row(track_index=233, rating=8.144327163696289), Row(track_index=12060, rating=8.022287368774414), Row(track_index=1439, rating=7.7392168045043945), Row(track_index=2128, rating=7.078488349914551), Row(track_index=14537, rating=7.078488349914551), Row(track_index=2245, rating=7.012636184692383), Row(track_index=14792, rating=7.012636184692383)])]