<a href="https://colab.research.google.com/github/vinija/musicRecommender/blob/main/The_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **WELCOME TO THIS NOTEBOOK**

In [9]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Let's install pyspark

In [5]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 53.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=3b8c162b29610e39b5b441dc0e570788ddbe6cce1ecc0e1da915f7bebba7953f
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


Importing the modules

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

Creating the spark session


In [7]:
spark = SparkSession.builder.appName("lastfm").getOrCreate()

# Loading the dataset

In [10]:
file_path = '/content/drive/MyDrive/dataset/dataset/dataset/listenings.csv'
df_listenings = spark.read.format('csv').option('header',True).option('inferSchema',True).load(file_path) #data frame, header will infer column types from csv
df_listenings.show()

+-----------+-------------+--------------------+---------------+--------------------+
|    user_id|         date|               track|         artist|               album|
+-----------+-------------+--------------------+---------------+--------------------+
|000Silenced|1299680100000|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|1299679920000|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|1299679440000|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|1299679200000|            Acapella|          Kelis|            Acapella|
|000Silenced|1299675660000|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|1297511400000|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|1294498440000|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|1292438340000|               ObZen|      Meshuggah|               ObZen|
|000Silenced|1292437740000|   Yama's Messengers|      


# Cleaning tables 

In [11]:
df_listenings = df_listenings.drop('date') #drop date and time col  
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [12]:
df_listenings = df_listenings.na.drop()
df_listenings.show()

+-----------+--------------------+---------------+--------------------+
|    user_id|               track|         artist|               album|
+-----------+--------------------+---------------+--------------------+
|000Silenced|           Price Tag|       Jessie J|         Who You Are|
|000Silenced|Price Tag (Acoust...|       Jessie J|           Price Tag|
|000Silenced|Be Mine! (Ballad ...|          Robyn|            Be Mine!|
|000Silenced|            Acapella|          Kelis|            Acapella|
|000Silenced|   I'm Not Invisible|      The Tease|   I'm Not Invisible|
|000Silenced|Bounce (Feat NORE...|       MSTRKRFT|         Fist of God|
|000Silenced|Don't Stop The Mu...|        Rihanna|Addicted 2 Bassli...|
|000Silenced|               ObZen|      Meshuggah|               ObZen|
|000Silenced|   Yama's Messengers|         Gojira|The Way of All Flesh|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For No...|
|000Silenced|On the Brink of E...|   Napalm Death|Time Waits For

In [13]:
rows = df_listenings.count()
cols = len(df_listenings.columns)
print(rows,cols)

13758905 4



# Let's Perform some aggregation
to see how many times each user has listened to specific track


In [14]:
df_listenings_agg = df_listenings.select('user_id', 'track').groupby('user_id', 'track').agg(count('*').alias('count')).orderBy('user_id')
df_listenings_agg.show()

+-------+--------------------+-----+
|user_id|               track|count|
+-------+--------------------+-----+
| --Seph|Chelsea Hotel - L...|    1|
| --Seph|          Paris 2004|    7|
| --Seph|Airplanes [feat H...|    1|
| --Seph|              Monday|    1|
| --Seph|         The Embrace|    1|
| --Seph|Hungarian Dance No 5|    1|
| --Seph|       Life On Mars?|    1|
| --Seph|Belina (Original ...|    1|
| --Seph| White Winter Hymnal|    3|
| --Seph|               Leloo|    1|
| --Seph|      Hour for magic|    2|
| --Seph|Hungarian Rhapsod...|    1|
| --Seph|     The Way We Were|    1|
| --Seph| Air on the G String|    1|
| --Seph|Vestido Estampado...|    1|
| --Seph|        Window Blues|    1|
| --Seph|  California Waiting|    1|
| --Seph|       Phantom Pt II|    1|
| --Seph|Virus (Luke Fair ...|    1|
| --Seph|   Summa for Strings|    1|
+-------+--------------------+-----+
only showing top 20 rows



In [15]:
row = df_listenings_agg.count()
col = len(df_listenings_agg.columns)
print(row,col)

9930128 3


In [16]:
#we want to decrement our row size, its almost 10 million, lets reduce it inorder to process our data much faster
df_listenings_agg = df_listenings_agg.limit(20000)


# Let's convert the user id and track columns into unique integers




In [17]:
indexer = [StringIndexer(inputCol = col, outputCol = col + '_index').fit(df_listenings_agg) for col in list(set(df_listenings_agg.columns)- set(['count']))]

pipeline = Pipeline(stages = indexer)

data = pipeline.fit(df_listenings_agg).transform(df_listenings_agg)
data.show()

+-------+--------------------+-----+-----------+-------------+
|user_id|               track|count|track_index|user_id_index|
+-------+--------------------+-----+-----------+-------------+
| --Seph|          Nightmares|    1|    10600.0|         69.0|
| --Seph|Virus (Luke Fair ...|    1|    15893.0|         69.0|
| --Seph|Airplanes [feat H...|    1|      521.0|         69.0|
| --Seph|Belina (Original ...|    1|     3280.0|         69.0|
| --Seph|              Monday|    1|      334.0|         69.0|
| --Seph|Hungarian Dance No 5|    1|     7555.0|         69.0|
| --Seph|       Life On Mars?|    1|     1164.0|         69.0|
| --Seph|  California Waiting|    1|      195.0|         69.0|
| --Seph|       Phantom Pt II|    1|     1378.0|         69.0|
| --Seph|   Summa for Strings|    1|    13737.0|         69.0|
| --Seph|      Hour for magic|    2|     7492.0|         69.0|
| --Seph|Hungarian Rhapsod...|    1|     7556.0|         69.0|
| --Seph|     The Way We Were|    1|    14958.0|       

In [18]:
data = data.select('user_id_index', 'track_index', 'count').orderBy('user_id_index')



In [None]:
#data.show()

# Train and Test data

In [3]:
(training, test) = data.randomSplit([0.5,0.5])

NameError: ignored

# Let's Create our Model

In [2]:


USERID = "user_id_index"
TRACK = "track_index"
COUNT = "count"

als = ALS(maxIter = 5, regParam = 0.01, userCol = USERID, itemCol = TRACK, ratingCol = COUNT)
print(als)

model = als.fit(training)

# predictions = model.transform(test)

NameError: ignored


# Generate top 10 Track recommendations for each user

In [None]:
# os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# os.environ['PYSPARK_PYTHON'] = sys.executable

NameError: ignored

In [None]:
recs = model.recommendForAllUsers(10)

NameError: ignored

In [None]:
recs.show()

In [None]:
recs.take(1)