# Spark, MLlib and recommendation system based on last.fm data

# Task 1.
### Spark initialization and libraries

In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.port.maxRetries", "60").getOrCreate()

print(spark.version)

3.3.0


# Task 2.
### Read data from csv files and create DataFrame structures

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# prepare structure for user artist data
struct_uad = StructType([ \
StructField("UserId", IntegerType(), True),
StructField("ArtistId", IntegerType(), True),
StructField("Plays", IntegerType(), True)])

# prepare structure for artist data
struct_ad = StructType([ \
StructField("AId", IntegerType(), True),
StructField("ArtistName", StringType(), True)])

# load data from user_artist_data.txt
df_uad = spark.read.format("csv") \
.option("header", "false") \
.option("delimiter", " ") \
.schema(struct_uad) \
.load("user_artist_data.txt")

# load data from artist_data.txt
df_ad = spark.read.format("csv") \
.option("header", "false") \
.option("delimiter", "\t") \
.schema(struct_ad) \
.load("artist_data.txt")

# Task 3.
### Display the first 10 rows from the df_uad collection by substituting the artist's ArtistName instead of its ArtistId identifier.

In [3]:
df_uad.join(df_ad, df_uad.ArtistId == df_ad.AId, "inner"). \
 select("UserId", "ArtistName", "Plays").show(10)

+-------+--------------------+-----+
| UserId|          ArtistName|Plays|
+-------+--------------------+-----+
|1059637|           Aerosmith|  238|
|1059637|     Edna's Goldfish|    1|
|1059637|The Mighty Mighty...|    1|
|1059637|        Foo Fighters|   11|
|1059637|  The Bouncing Souls|    1|
|1059637|       Alkaline Trio|  423|
|1059637|         The Beatles|    5|
|1059637|           Pennywise|    2|
|1059637|             Incubus|    2|
|1059637|         Bright Eyes|19129|
+-------+--------------------+-----+
only showing top 10 rows



# Task 4.
### Split the data from the df_uad collection into a training collection and a test collection in a ratio of 0.2 to 0.8. Display the number of elements of each collection.

In [4]:
[df_train, df_test] = df_uad.randomSplit([0.8, 0.2])

In [5]:
print("df_test length: " + str(df_test.count()))
print("df_train length: " + str(df_train.count()))
print("df_uad length: " + str(df_uad.count()))
print("df_ad length: " + str(df_ad.count()))

df_test length: 9924
df_train length: 39557
df_uad length: 49481
df_ad length: 30537


# Task 5.
### Build a model for the recommender system using the ALS (Alternating Least Squares) algorithm from the MLlib library and the training set.

In [6]:
# pip install numpy
# restart kernel

from pyspark.ml.recommendation import ALS

als = ALS( \
seed=327, \
implicitPrefs=True, \
rank = 10, \
maxIter = 5, \
userCol="UserId", \
itemCol="ArtistId", \
ratingCol="Plays", \
)
model = als.fit(df_train)

# Task 6.

### Run the model on the test collection. Additionally, set up the deletion of rows in the df_predictions collection that will contain null values.

In [7]:
model.setColdStartStrategy("drop")
df_predictions= model.transform(df_test)

# Task 7.

### Generate the top 10 recommendations found for each user using the recommendForAllUsers method of the ALS model class. Display the recommendations for the first few users and the recommendations for them.

In [8]:
rec4user10 = model.recommendForAllUsers(10)
rec4user10.show(5,250)

+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| UserId|                                                                                                                                                                                                 recommendations|
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1001440|[{1004294, 2.4304168}, {1004296, 2.1118195}, {1004226, 2.110514}, {3111, 1.9429672}, {1004278, 1.894382}, {1002704, 1.8668932}, {3656, 1.6239448}, {1000781, 1.6211643}, {1002850, 1.6190549}, {313, 1.5811418}]|
|1021940|     [{1000024, 2.2526505}, {1007903, 2.199112}, {234, 2.188242}, {1004278, 1.8247613}, {1001048, 1.7763709}, {1014

# Task 8.

### View the top 5 recommendations found for two selected users: 1059637 and 2007381.

In [9]:
twoUsers = df_test.select('UserId').where('UserId == 1059637 OR UserId == 2007381')
model.recommendForUserSubset(twoUsers, 5).show(5, 250)

+-------+----------------------------------------------------------------------------------------------------+
| UserId|                                                                                     recommendations|
+-------+----------------------------------------------------------------------------------------------------+
|2007381|[{1004278, 2.822596}, {4496, 2.3065803}, {1003681, 1.9420762}, {1004296, 1.8899902}, {1, 1.8526235}]|
|1059637|    [{1854, 1.7078325}, {1002704, 1.7000285}, {1330, 1.6864382}, {3150, 1.6025684}, {304, 1.553655}]|
+-------+----------------------------------------------------------------------------------------------------+



# Task 9.

### Display recommendations for one selected user by replacing artist IDs with their names.

In [10]:
from pyspark.sql.functions import *

user = df_train.select('UserId').where('UserId == 1059637')
rec_for_user = model.recommendForUserSubset(user, 5)

rec_for_user.join(df_ad, expr("array_contains(recommendations.ArtistId, AId)")).show()

+-------+--------------------+-------+---------------+
| UserId|     recommendations|    AId|     ArtistName|
+-------+--------------------+-------+---------------+
|1059637|[{1854, 1.7078325...|1002704|         Atreyu|
|1059637|[{1854, 1.7078325...|    304|Ennio Morricone|
|1059637|[{1854, 1.7078325...|   1330|      Tori Amos|
|1059637|[{1854, 1.7078325...|   3150|     The Police|
|1059637|[{1854, 1.7078325...|   1854|    Linkin Park|
+-------+--------------------+-------+---------------+



# Task 10.

###  Check if the recommendations from Task 13 are accurate by displaying a list of artists this user has listened to most often.

In [11]:
df_test.where('UserId == 1042223').orderBy("Plays", ascending=False).show()

+-------+--------+-----+
| UserId|ArtistId|Plays|
+-------+--------+-----+
|1042223| 1002287| 2197|
|1042223| 1000413| 1204|
|1042223| 1011967|  526|
|1042223| 1006087|  406|
|1042223| 1080742|  359|
|1042223| 1022845|  350|
|1042223| 1002061|  334|
|1042223| 1014769|  309|
|1042223| 1000873|  273|
|1042223| 1000427|  266|
|1042223| 1000062|  246|
|1042223| 1001035|  241|
|1042223| 1000693|  213|
|1042223| 1160349|  213|
|1042223| 1002457|  195|
|1042223| 1000569|  162|
|1042223| 1000458|  161|
|1042223| 1004461|  158|
|1042223| 1016808|  141|
|1042223| 1001277|  140|
+-------+--------+-----+
only showing top 20 rows



# Task 11.

### Show the top 8 suggested user recommendations for each artist.

In [12]:
model.recommendForAllItems(8).show(8, 120)

+--------+------------------------------------------------------------------------------------------------------------------------+
|ArtistId|                                                                                                         recommendations|
+--------+------------------------------------------------------------------------------------------------------------------------+
|       1|[{2007381, 1.8526235}, {2000668, 1.7103702}, {1047812, 1.6991439}, {1070932, 1.6399794}, {1059765, 1.571018}, {104222...|
|      13|[{1029563, 0.9975569}, {1026084, 0.9834517}, {2023686, 0.95316607}, {1046559, 0.9491897}, {1059765, 0.7754498}, {2069...|
|      26|[{2023686, 0.95340776}, {1024631, 0.91846883}, {1026084, 0.80602825}, {2005710, 0.70534116}, {1072684, 0.68763524}, {...|
|      27|[{1059334, 0.36629048}, {1059637, 0.30299821}, {1070932, 0.2803811}, {1001440, 0.24842867}, {1072684, 0.2419595}, {10...|
|      28|[{1026084, 1.7549269}, {1072684, 1.2410318}, {1042223, 1.1404506},

# Task 12.

### Show the top 4 user recommendations found for three selected artists: 1205, 1007027, 1007735.

In [13]:
artists = df_train.where('ArtistId == 1205 OR ArtistId == 1007027 OR ArtistId == 1007735')
model.recommendForItemSubset(artists, 4).show(10,150)

+--------+----------------------------------------------------------------------------------------+
|ArtistId|                                                                         recommendations|
+--------+----------------------------------------------------------------------------------------+
| 1007735|[{1072684, 2.1886528}, {1059334, 2.1097171}, {1042223, 1.9921455}, {1058890, 1.6462091}]|
|    1205| [{1026084, 1.686818}, {1072684, 1.4231238}, {1059245, 1.4082575}, {1042223, 1.2573684}]|
| 1007027|[{1072684, 1.5570898}, {1059765, 1.4161333}, {1029563, 1.2807263}, {1042223, 1.1453718}]|
+--------+----------------------------------------------------------------------------------------+



# Task 13.

### Show top 6 user recommendations for five most listened to artists.

In [14]:
countPlaysByArtist = df_test.groupBy("ArtistId").sum("Plays")
topFiveArtists = countPlaysByArtist.select("ArtistId", col("sum(Plays)").alias("Plays")).orderBy("sum(Plays)", ascending=False).limit(5)
model.recommendForItemSubset(topFiveArtists, 6).show(10, 150)

+--------+---------------------------------------------------------------------------------------------------------------------------------------+
|ArtistId|                                                                                                                        recommendations|
+--------+---------------------------------------------------------------------------------------------------------------------------------------+
|    2823|       [{1026084, 1.5712866}, {1059765, 1.170742}, {1029563, 1.1288203}, {2023686, 1.101523}, {2010008, 1.068036}, {2007381, 1.065754}]|
| 1000094|    [{2020513, 1.6666232}, {2062243, 1.3838259}, {1072684, 1.3686398}, {1059765, 1.2664607}, {1046559, 1.247421}, {1031009, 1.1613114}]|
| 1250104|[{1026084, 0.9157724}, {1052054, 0.8655541}, {1070641, 0.75656307}, {1029563, 0.7108729}, {1024631, 0.68852544}, {1046559, 0.64743644}]|
| 1002095|   [{1035511, 1.1575044}, {1029563, 1.1462435}, {2030069, 1.1451075}, {1072684, 1.0785761}, {1059765, 1.0476

# Task 14.

### Show top 6 user recommendations for five most listened to artists.

In [15]:
countPlaysByUser = df_test.groupBy("UserId").agg(count("ArtistId").alias("Plays")).orderBy("Plays", ascending=False).limit(6)
model.recommendForUserSubset(countPlaysByUser, 10).show(10, 150)

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| UserId|                                                                                                                                       recommendations|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|1001440|[{1004294, 2.4304168}, {1004296, 2.1118195}, {1004226, 2.110514}, {3111, 1.9429672}, {1004278, 1.894382}, {1002704, 1.8668932}, {3656, 1.6239448}, ...|
|1024631|[{1000024, 2.419117}, {234, 2.1759033}, {1231740, 1.9924719}, {1001909, 1.917037}, {1006160, 1.815618}, {1026440, 1.7800384}, {1005820, 1.7629379},...|
|1035511|[{1004278, 3.708456}, {1004296, 3.0369194}, {1231740, 2.9717412}, {1000445, 2.249554}, {969, 2.11371}, {979, 1.9243355}, {1307, 1.9210337}, {100020...|
|1059334|[{1854, 2.5147612}, {1307

# Close Spark session

In [16]:
spark.stop()