# **WELCOME TO THIS NOTEBOOK**

In [None]:
# Mount Google Drive at /content/drive; the dataset path used later in this
# notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Let's install PySpark

In [None]:
!pip install pyspark==3.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Importing the modules

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the Spark session


In [None]:
# Reuse the active session if one exists; otherwise start one named "lastfm".
spark = (
    SparkSession.builder
    .appName("lastfm")
    .getOrCreate()
)

# Loading the dataset

In [None]:
# Read the listening log from Drive. The first line of the CSV is the header,
# and column types are inferred from the data.
file_path = '/content/drive/MyDrive/Colab Notebooks/dataset/listenings.csv'
df_listenings = spark.read.csv(file_path, header=True, inferSchema=True)
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      

In [None]:
# Report how many rows were loaded, then display the column names as the
# cell's output value.
rows = df_listenings.count()
print(f"Number of rows : {rows}")
df_listenings.columns

Number of rows : 14650594


['user_id', 'date', 'track', 'artist', 'album']


# Cleaning tables 

In [None]:
# The 'date' column is not used by any later cell in this notebook, so drop it.
df_listenings = df_listenings.drop('date')
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
# Discard every row that has a null in any of the remaining columns.
df_listenings = df_listenings.dropna()
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [None]:
# Shape of the table after the null rows were removed: (row count, column count).
rows_numbers = df_listenings.count()
colum_numbers = len(df_listenings.columns)
# The f-string formats the tuple itself, so the output looks like "(rows, cols)".
print(f'{rows_numbers, colum_numbers}')

(13758905, 4)


In [None]:
# Row count of the cleaned table, displayed as the cell's output value.
# NOTE(review): this triggers another full count job over the same data.
df_listenings.count()

13758905


# Let's perform some aggregation
to see how many times each user has listened to each track


In [None]:
# One row per (user_id, track) pair with the number of listens in 'count',
# ordered by user_id.
df_listenings_agg = (
    df_listenings
    .select('user_id', 'track')
    .groupBy('user_id', 'track')
    .agg(count('*').alias('count'))
    .orderBy('user_id')
)
df_listenings_agg.show(30)

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|           So Lonely|    2|
| --Seph|               Julia|    1|
| --Seph|Every Direction I...|    2|
| --Seph|        Window Blues|    1|
| --Seph|In the Nothing of...|    2|
| --Seph| The Riders of Rohan|    1|
| --Seph|         If It Works|    1|
| --Seph| Air on the G String|    1|
| --Seph|Vestido Estampado...|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|          I Miss You|    1|
| --Seph|Sunset Soon Forgo...|    1|
| --Seph|   Barbados Carnival|    1|
| --Seph|      Fragile Meadow|    1|
| --Seph|       Phantom Pt II|    1|
| --Seph| White Winter Hymnal|    3|
| --Seph|Belina (Original ...|    1|
| --Seph|   Summa for Strings|    1|
| --Seph|Airplanes [feat H...|    1|
| --Seph|               Leloo|    1|
| --Seph|  California Waiting|    1|
| --Seph|         The Embrace|    1|
| --Seph|Virus (Luke Fair ...|    1|
|

In [None]:
# Shape of the aggregated table: number of (user, track) pairs and number of columns.
rows_numbers = df_listenings_agg.count()
colum_numbers = len(df_listenings_agg.columns)
print(rows_numbers, colum_numbers)

9930128 3


In [None]:
# Work on the first 20,000 aggregated rows only, and cache them. Spark
# DataFrames are lazy, so without the cache every later action (indexer fits,
# transform, ALS iterations) would re-run the CSV read, aggregation and sort
# behind this limit. Rows tied on user_id at the cut-off could then differ
# between evaluations.
df_listenings_agg = df_listenings_agg.limit(20000).cache()

# Let's convert the user id and track columns into unique integers




In [None]:
# Encode every string column (everything except 'count') as a numeric index.
# Iterating the column list directly keeps the output column order
# deterministic, and the loop variable no longer shadows the imported `col`.
index_columns = [name for name in df_listenings_agg.columns if name != 'count']

# Unfitted StringIndexer stages: Pipeline.fit() fits each one, so they do not
# need to be fitted by hand first.
indexer = [
    StringIndexer(inputCol=name, outputCol=name + '_index')
    for name in index_columns
]
pipeline = Pipeline(stages=indexer)
data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+-----+-------------+-----------+
|user_id|               track|count|user_id_index|track_index|
+-------+--------------------+-----+-------------+-----------+
| --Seph| White Winter Hymnal|    3|         69.0|       59.0|
| --Seph|Virus (Luke Fair ...|    1|         69.0|    15896.0|
| --Seph|Airplanes [feat H...|    1|         69.0|      519.0|
| --Seph|Belina (Original ...|    1|         69.0|     3278.0|
| --Seph|              Monday|    1|         69.0|      334.0|
| --Seph|Hungarian Dance No 5|    1|         69.0|     7558.0|
| --Seph|       Life On Mars?|    1|         69.0|     1161.0|
| --Seph|  California Waiting|    1|         69.0|      197.0|
| --Seph|       Phantom Pt II|    1|         69.0|     1377.0|
| --Seph|   Summa for Strings|    1|         69.0|    13739.0|
| --Seph|      Hour for magic|    2|         69.0|     7495.0|
| --Seph|Hungarian Rhapsod...|    1|         69.0|     7559.0|
| --Seph|     The Way We Were|    1|         69.0|    1

In [None]:
# Keep only the numeric columns that ALS consumes. DataFrames are immutable:
# na.drop() returns a new DataFrame, so its result must be assigned. Calling
# it as a bare statement silently discards the filtered frame.
new_data = data.select('user_id_index', 'track_index', 'count').na.drop()
new_data.show()

+-------------+-----------+-----+
|user_id_index|track_index|count|
+-------------+-----------+-----+
|         69.0|       59.0|    3|
|         69.0|    15896.0|    1|
|         69.0|      519.0|    1|
|         69.0|     3278.0|    1|
|         69.0|      334.0|    1|
|         69.0|     7558.0|    1|
|         69.0|     1161.0|    1|
|         69.0|      197.0|    1|
|         69.0|     1377.0|    1|
|         69.0|    13739.0|    1|
|         69.0|     7495.0|    2|
|         69.0|     7559.0|    1|
|         69.0|    14960.0|    1|
|         69.0|     2455.0|    1|
|         69.0|    15850.0|    1|
|         69.0|     1840.0|    1|
|         69.0|     9057.0|    1|
|         69.0|    14387.0|    1|
|         69.0|    11313.0|    7|
|         69.0|     4183.0|    1|
+-------------+-----------+-----+
only showing top 20 rows



# Train and Test data

In [None]:
# 50/50 train/test split. The seed is fixed so the split, and therefore the
# model and recommendations below, can be reproduced across runs.
(training, test) = new_data.randomSplit([0.5, 0.5], seed=42)

# Let's Create our Model

In [None]:
# Column names of the indexed interaction table fed to ALS.
USERID = 'user_id_index'
TRACK = 'track_index'
COUNT = 'count'

# coldStartStrategy='drop': with the default ('nan'), any test row whose user
#   or track never appeared in the training split gets a NaN prediction.
# seed: ALS initialises its factors randomly; fixing it makes runs repeatable.
# NOTE(review): play counts are implicit feedback, so implicitPrefs=True may be
# a better fit. Left unchanged here because it alters the model's semantics.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol=USERID,
    itemCol=TRACK,
    ratingCol=COUNT,
    coldStartStrategy='drop',
    seed=42,
)
model = als.fit(training)

# Predicted play counts for the held-out half of the data.
predications = model.transform(test)


# Generate top 10 Track recommendations for each user

In [None]:
# For every user known to the model, the 10 tracks with the highest predicted rating.
recs = model.recommendForAllUsers(numItems=10)

In [None]:
# Preview the first 20 users; long recommendation arrays are truncated in the display.
recs.show(n=20, truncate=True)

+-------------+--------------------+
|user_id_index|     recommendations|
+-------------+--------------------+
|           31|[[1688, 7.8548727...|
|           85|[[568, 11.067766]...|
|          137|[[7849, 8.964636]...|
|           65|[[3343, 9.272263]...|
|           53|[[102, 8.618941],...|
|          133|[[7849, 17.996504...|
|           78|[[7849, 9.730888]...|
|          108|[[16969, 9.657956...|
|           34|[[5823, 6.2077866...|
|          101|[[954, 10.055234]...|
|          115|[[1738, 10.40938]...|
|          126|[[568, 13.990456]...|
|           81|[[7849, 10.934342...|
|           28|[[12892, 8.42659]...|
|           76|[[7849, 11.532579...|
|           26|[[1738, 8.236904]...|
|           27|[[177, 9.972997],...|
|           44|[[12892, 8.847261...|
|          103|[[954, 11.618872]...|
|           12|[[102, 7.1663237]...|
+-------------+--------------------+
only showing top 20 rows



In [None]:
# Fetch a single user's full recommendation list as a list of Row objects.
recs.limit(1).collect()

[Row(user_id_index=31, recommendations=[Row(track_index=1688, rating=7.854872703552246), Row(track_index=15453, rating=7.547242641448975), Row(track_index=954, rating=6.902073383331299), Row(track_index=568, rating=6.883672714233398), Row(track_index=273, rating=6.391982078552246), Row(track_index=123, rating=6.133982181549072), Row(track_index=113, rating=5.981383323669434), Row(track_index=14108, rating=5.929976463317871), Row(track_index=9323, rating=5.768881320953369), Row(track_index=15776, rating=5.364698886871338)])]