In [1]:
from pyspark.sql import SparkSession

In [2]:
# Reuse (or start) the session that every later cell reads and writes through.
spark = (
    SparkSession.builder
    .appName('Recommender Systems')
    .getOrCreate()
)
spark

In [3]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Load the ratings sample (one row per user/movie rating).
# An explicit schema replaces `inferSchema=True`, which needs an extra full
# pass over the file and leaves column types to inference. ALS needs numeric
# user/item id columns and a numeric rating column, so pin them here.
# With header=True the header row is skipped and the schema is applied by
# position; the order below matches the file's header (movieId, rating, userId).
movies = spark.read.csv('./data/movielens_ratings.csv',
                        header=True,
                        schema='movieId INT, rating DOUBLE, userId INT')

In [5]:
movies.show(n=5)  # peek at the first five ratings

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
+-------+------+------+
only showing top 5 rows



In [6]:
movies.describe().show(n=20, truncate=True)  # count/mean/stddev/min/max per column

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [7]:
# Confirm the column types Spark assigned (ids and rating must be numeric for ALS).
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [8]:
# 80/20 train/test split; the fixed seed makes the split reproducible.
xTrain, xTest = movies.randomSplit(weights=(0.8, 0.2), seed=42)

In [9]:
# Alternating Least Squares collaborative-filtering estimator over the
# (userId, movieId, rating) columns.
# - coldStartStrategy='drop': by default ALS predicts NaN for any user or
#   movie that appears only in the test split, and a single NaN turns the
#   RMSE into NaN. Dropping those rows keeps evaluation meaningful.
# - seed=42: pins ALS's random factor initialisation so a fresh
#   Restart & Run All reproduces the same model (matches the split seed).
# NOTE(review): predictions can fall outside the 1-5 rating range
# (negative scores appear in the test output); consider nonnegative=True.
als = ALS(maxIter=5,
          regParam=0.01,
          userCol='userId',
          itemCol='movieId',
          ratingCol='rating',
          coldStartStrategy='drop',
          seed=42)

In [10]:
model = als.fit(dataset=xTrain)  # learn user/movie latent factors from the training split

In [11]:
predictions = model.transform(dataset=xTest)  # adds a `prediction` column to the held-out ratings

In [12]:
predictions.show(n=20)  # actual rating vs. predicted score

+-------+------+------+------------+
|movieId|rating|userId|  prediction|
+-------+------+------+------------+
|     31|   1.0|    27|   2.5645566|
|     31|   1.0|     5|   1.8662523|
|     31|   1.0|    19|   2.0380602|
|     31|   3.0|    14|   1.8544002|
|     31|   1.0|     0|  0.30410588|
|     85|   3.0|     6|   2.1187246|
|     85|   4.0|     7|   2.6593423|
|     53|   1.0|    12|    2.629088|
|     78|   1.0|    28|  0.38350978|
|     34|   1.0|    16|   1.6747544|
|     34|   1.0|    15|   2.0311506|
|     34|   1.0|     0|     3.60704|
|     28|   3.0|     1|   2.9588513|
|     28|   1.0|     5|  0.84762955|
|     28|   1.0|     2|   -2.614057|
|     76|   1.0|     1|  0.26065665|
|     76|   1.0|    19|-0.024657637|
|     76|   3.0|     7|   0.5248914|
|     76|   1.0|    25|  -1.2432127|
|     76|   1.0|     2| -0.90144485|
+-------+------+------+------------+
only showing top 20 rows



In [13]:
# Root-mean-squared error between the true `rating` and the model's `prediction`.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)

In [14]:
# RMSE on the held-out split. ALS emits NaN predictions for users/movies that
# were not seen in training (unless coldStartStrategy='drop' is set), and a
# single NaN makes the whole metric NaN. Fail loudly instead of reporting a
# meaningless number.
import math

rmse = evaluator.evaluate(predictions)
assert not math.isnan(rmse), (
    "RMSE is NaN - likely test users/movies unseen in training; "
    "set coldStartStrategy='drop' on ALS"
)

In [15]:
# Report the test-set error (same units as the rating column).
print(f'RMSE : {rmse}')

RMSE : 1.6015854711844535


In [16]:
# Held-out (movieId, userId) pairs for one user (id 11) from the test split.
single_user = xTest.where(xTest.userId == 11).select('movieId', 'userId')

In [17]:
single_user.show(n=20)

+-------+------+
|movieId|userId|
+-------+------+
|     11|    11|
|     32|    11|
|     59|    11|
|     62|    11|
|     66|    11|
|     67|    11|
+-------+------+



In [18]:
# Score this user's held-out test movies. These are movies already in the
# test split, so this ranks known items rather than recommending unseen ones
# (TODO: consider model.recommendForUserSubset for true top-k recommendations).
# The variable name is kept as spelled because the next cell references it.
recommandations = model.transform(dataset=single_user)

In [19]:
recommandations.sort(recommandations['prediction'].desc()).show()  # highest predicted score first

+-------+------+-----------+
|movieId|userId| prediction|
+-------+------+-----------+
|     66|    11|   3.241285|
|     11|    11|  2.9160783|
|     59|    11|  1.9382064|
|     67|    11|  1.4041697|
|     32|    11|-0.45979792|
|     62|    11| -0.6789422|
+-------+------+-----------+

