In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, explode

## 1 - Create a Spark Session

In [2]:
spark = SparkSession.builder.appName('collaborative_filtering').master('yarn').getOrCreate()

23/01/27 14:03:36 WARN Utils: Your hostname, miray resolves to a loopback address: 127.0.1.1; using 172.24.33.252 instead (on interface wlo1)
23/01/27 14:03:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/mahdi/.ivy2/cache
The jars for the packages stored in: /home/mahdi/.ivy2/jars
org.apache.hbase#hbase-shaded-mapreduce added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a7697366-6d9e-4e22-ae8e-c587c2f9e402;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hbase#hbase-shaded-mapreduce;2.4.14 in local-m2-cache
	found org.slf4j#slf4j-api;1.7.33 in central
	found org.apache.htrace#htrace-core4;4.2.0-incubating in central
	found jakarta.annotation#jakarta.annotation-api;1.3.5 in central
	found jakarta.validation#jakarta.validation-api;2.0.2 in central
	found org.glassfish.hk2.external#jakarta.inject;2.6.1 in central
	found org.javassist#javassist;3.25.0-GA in central
	found org.apache.hadoop#hadoop-distcp;2.10.0 in central
	found org.apache.hadoop#hadoop-annotations;2.10.0 in central
	found org.apache.yetus#audience-annotations;0.5.0 in central
:: resolution report :: resolve 201ms :: artifacts dl 7ms
	:: modules in use:
	jakarta.annotation#jakarta.annotation-api;1.3.5 from central in [default]
	jakarta.validation#jakarta.validation-api;2.0.2 from central in [default]
	org.apache.hadoop#hadoop-annotations;2.10.0 from central in [default]
	org.apache.hadoop#hadoop-distcp;2.10.0 from central in [default]
	org.apache.hbase#hba

## 2 - Put the data into HDFS

In [3]:
!hdfs dfs -mkdir /user/homework4
!hdfs dfs -put games.csv /user/homework4/games.csv
!hdfs dfs -put ratings.csv /user/homework4/ratings.csv

mkdir: `/user/homework4': File exists
put: `/user/homework4/games.csv': File exists
put: `/user/homework4/ratings.csv': File exists


## 3 - Read the data and ceate dataframes

In [4]:
# Read and parse the data
games = spark.read.csv("/user/homework4/games.csv", header=True, inferSchema=True)
ratings = spark.read.csv("/user/homework4/ratings.csv", header=True, inferSchema=True)

                                                                                

In [5]:
games.show(5)

+-------+--------------------+------------+--------------------+--------------------+
|game_id|                name|release_date|             summary|          meta_score|
+-------+--------------------+------------+--------------------+--------------------+
|      1|The Legend of Zel...|   23-Nov-98|As a young boy, L...|                  99|
|      2|Tony Hawk's Pro S...|   20-Sep-00|As most major pub...|                  98|
|      3| Grand Theft Auto IV|   29-Apr-08|"[Metacritic's 20...| fresh off the bo...|
|      4|         SoulCalibur|    8-Sep-99|This is a tale of...|                  98|
|      5|  Super Mario Galaxy|   12-Nov-07|[Metacritic's 200...|                  97|
+-------+--------------------+------------+--------------------+--------------------+
only showing top 5 rows



In [6]:
ratings.show(5)

+-------+-------+------+
|game_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



## 4 - Check the missing values

We see number of missing values in each column.

In [7]:
df = ratings.toPandas()
print(df.shape)
df.count()

(981548, 3)


game_id    981548
user_id    981548
rating     981548
dtype: int64

As we can see, there are no missing values in the data.

## 5 - Split the data into train and test

We give 80% of the data to train and 20% to test.

In [8]:
# Split the data into train and test
(training, test) = ratings.randomSplit([0.8, 0.2])

## 6 - Normalize the data

We normalize the data to have a better performance. First, we create a vector assembler to combine the features into a single vector. Then, we create a normalizer to normalize the data. 

In [9]:
# Create the normalizer objects
assembler = VectorAssembler(inputCols=["rating"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledRating", withStd=True, withMean=True)
scalerModel = scaler.fit(assembler.transform(training))

                                                                                

In [10]:
# Normalize the training data
training = scalerModel.transform(assembler.transform(training))
training = training.drop("features")

# convert scaledRating to a float column
training = training.withColumn("scaledRating", vector_to_array("scaledRating").getItem(0).cast("float"))
training = training.drop("rating")
training = training.withColumnRenamed("scaledRating", "rating")
training.show(5)

[Stage 10:>                                                         (0 + 1) / 1]

+-------+-------+----------+
|game_id|user_id|    rating|
+-------+-------+----------+
|      1|    588| 1.1611812|
|      1|   1169|0.14537998|
|      1|   1185|0.14537998|
|      1|   2077|0.14537998|
|      1|   2487|0.14537998|
+-------+-------+----------+
only showing top 5 rows



                                                                                

In [11]:
# Normalize the test data
test = scalerModel.transform(assembler.transform(test))
test = test.drop("features")

# convert scaledRating to a float column
test = test.withColumn("scaledRating", vector_to_array("scaledRating").getItem(0).cast("float"))
test = test.drop("rating")
test = test.withColumnRenamed("scaledRating", "rating")
test.show(5)

+-------+-------+-----------+
|game_id|user_id|     rating|
+-------+-------+-----------+
|      1|    314|  1.1611812|
|      1|    439|-0.87042123|
|      1|   3922|  1.1611812|
|      1|  10140| 0.14537998|
|      1|  10335| 0.14537998|
+-------+-------+-----------+
only showing top 5 rows



                                                                                

## 7 - Create the ALS model

We create this model with the following parameters:

* maxIter: 5 (we use 5 iterations to train the model)
* regParam: 0.01 (we use regularization to avoid overfitting)
* coldStartStrategy: "drop" (we drop the missing values)

In [12]:
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="game_id", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training)

                                                                                

## 8 - Evaluate the model

We evaluate the model with the RMSE (Root Mean Squared Error).

In [13]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

                                                                                

Root-mean-square error = 1.174760009038952


## 9 - Make some recommendations for 2 users and 2 items

Here we can see the model can do both item-based and user-based recommendations.

In [14]:
# Generate top 10 game recommendations for each user and each item
userRecs = model.recommendForAllUsers(10)
gameRecs = model.recommendForAllItems(10)
userRecs.show(2)
gameRecs.show(2)

                                                                                

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{2302, 0.2719504...|
|      3|[{4772, 4.970701}...|
+-------+--------------------+
only showing top 2 rows





+-------+--------------------+
|game_id|     recommendations|
+-------+--------------------+
|      1|[{46305, 5.828892...|
|      3|[{34723, 9.687718...|
+-------+--------------------+
only showing top 2 rows



                                                                                

## 10 - Create functions to make recommendations for specific users and items

In [15]:
def recommend_for_user(user_id, num_recommendations):
    user = ratings.filter(ratings.user_id == user_id).select("user_id").distinct()
    recommendations = model.recommendForUserSubset(user, num_recommendations)
    recommendations = recommendations.select(explode("recommendations").alias("recommendations"))
    recommendations = recommendations.select("recommendations.game_id", "recommendations.rating")
    return recommendations.join(games, recommendations.game_id == games.game_id).select(games.game_id, games.name, recommendations.rating)

In [16]:
def recommend_for_item(game_id, num_recommendations):
    game = ratings.filter(ratings.game_id == game_id).select("game_id").distinct()
    recommendations = model.recommendForItemSubset(game, num_recommendations)
    recommendations = recommendations.select(explode("recommendations").alias("recommendations"))
    recommendations = recommendations.select("recommendations.user_id", "recommendations.rating")
    return recommendations

Some exmaples of recommendations for specific users and items.

In [21]:
recommend_for_user(10140, 5).show()

+-------+--------------------+---------+
|game_id|                name|   rating|
+-------+--------------------+---------+
|   3550|            Deathrow|2.6548383|
|   7104|         Risk System|2.5164847|
|   5330|         Tetris Axis|2.2537565|
|   5318|Majin and the For...|2.1659617|
|   9497|           Velocibox|2.1539714|
+-------+--------------------+---------+



                                                                                

In [18]:
recommend_for_item(1, 10).show()



+-------+---------+
|user_id|   rating|
+-------+---------+
|  46305| 5.828892|
|  51328| 5.551867|
|  41205| 5.319552|
|  45633|4.9871006|
|   3740|4.8282356|
|  42854|4.5203958|
|   4800|4.4558706|
|  10207|4.4459324|
|  34246| 4.420498|
|   6919|4.4186625|
+-------+---------+



                                                                                