In [13]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

import os, sys
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['HADOOP_HOME'] = "C:\\apps\hadoop"
os.environ['SPARK_HOME'] = "C:\\apps\spark-3.3.3-bin-hadoop3"

import findspark
findspark.init()

In [None]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

# Read data

In [15]:
df = spark.read.text('./sample_movielens_ratings.txt')
df_rdd = spark.read.text('./sample_movielens_ratings.txt').rdd

df.show(5)
df_rdd.collect()

+--------------------+
|               value|
+--------------------+
| 0::2::3::1424380312|
| 0::3::1::1424380312|
| 0::5::2::1424380312|
| 0::9::4::1424380312|
|0::11::1::1424380312|
+--------------------+
only showing top 5 rows



[Row(value='0::2::3::1424380312'),
 Row(value='0::3::1::1424380312'),
 Row(value='0::5::2::1424380312'),
 Row(value='0::9::4::1424380312'),
 Row(value='0::11::1::1424380312'),
 Row(value='0::12::2::1424380312'),
 Row(value='0::15::1::1424380312'),
 Row(value='0::17::1::1424380312'),
 Row(value='0::19::1::1424380312'),
 Row(value='0::21::1::1424380312'),
 Row(value='0::23::1::1424380312'),
 Row(value='0::26::3::1424380312'),
 Row(value='0::27::1::1424380312'),
 Row(value='0::28::1::1424380312'),
 Row(value='0::29::1::1424380312'),
 Row(value='0::30::1::1424380312'),
 Row(value='0::31::1::1424380312'),
 Row(value='0::34::1::1424380312'),
 Row(value='0::37::1::1424380312'),
 Row(value='0::41::2::1424380312'),
 Row(value='0::44::1::1424380312'),
 Row(value='0::45::2::1424380312'),
 Row(value='0::46::1::1424380312'),
 Row(value='0::47::1::1424380312'),
 Row(value='0::48::1::1424380312'),
 Row(value='0::50::1::1424380312'),
 Row(value='0::51::1::1424380312'),
 Row(value='0::54::1::1424380312

Colunas separadas por '::', devem ser separadas utilizando a função map

# Columns and DataFrame creation

In [16]:
# Split string by column separator
parts = df_rdd.map(lambda row: row.value.split('::'))
parts.collect()

[['0', '2', '3', '1424380312'],
 ['0', '3', '1', '1424380312'],
 ['0', '5', '2', '1424380312'],
 ['0', '9', '4', '1424380312'],
 ['0', '11', '1', '1424380312'],
 ['0', '12', '2', '1424380312'],
 ['0', '15', '1', '1424380312'],
 ['0', '17', '1', '1424380312'],
 ['0', '19', '1', '1424380312'],
 ['0', '21', '1', '1424380312'],
 ['0', '23', '1', '1424380312'],
 ['0', '26', '3', '1424380312'],
 ['0', '27', '1', '1424380312'],
 ['0', '28', '1', '1424380312'],
 ['0', '29', '1', '1424380312'],
 ['0', '30', '1', '1424380312'],
 ['0', '31', '1', '1424380312'],
 ['0', '34', '1', '1424380312'],
 ['0', '37', '1', '1424380312'],
 ['0', '41', '2', '1424380312'],
 ['0', '44', '1', '1424380312'],
 ['0', '45', '2', '1424380312'],
 ['0', '46', '1', '1424380312'],
 ['0', '47', '1', '1424380312'],
 ['0', '48', '1', '1424380312'],
 ['0', '50', '1', '1424380312'],
 ['0', '51', '1', '1424380312'],
 ['0', '54', '1', '1424380312'],
 ['0', '55', '1', '1424380312'],
 ['0', '59', '2', '1424380312'],
 ['0', '61', '

In [21]:
ratings_rdd = parts.map(lambda x: Row(userId=int(x[0]),\
                                     movieId=int(x[1]),\
                                     rating=float(x[2]),\
                                     timestamp=int(x[3])))

ratings_rdd.collect()

[Row(userId=0, movieId=2, rating=3.0, timestamp=1424380312),
 Row(userId=0, movieId=3, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=5, rating=2.0, timestamp=1424380312),
 Row(userId=0, movieId=9, rating=4.0, timestamp=1424380312),
 Row(userId=0, movieId=11, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=12, rating=2.0, timestamp=1424380312),
 Row(userId=0, movieId=15, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=17, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=19, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=21, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=23, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=26, rating=3.0, timestamp=1424380312),
 Row(userId=0, movieId=27, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=28, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=29, rating=1.0, timestamp=1424380312),
 Row(userId=0, movieId=30, rating=1.0, timestamp=1424380312),
 Row(userId=

In [24]:
ratings_df = spark.createDataFrame(ratings_rdd)

print(f'N lines: {ratings_df.count()}')
ratings_df.show()

N lines: 1501
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
|     0|     12|   2.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     17|   1.0|1424380312|
|     0|     19|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
|     0|     23|   1.0|1424380312|
|     0|     26|   3.0|1424380312|
|     0|     27|   1.0|1424380312|
|     0|     28|   1.0|1424380312|
|     0|     29|   1.0|1424380312|
|     0|     30|   1.0|1424380312|
|     0|     31|   1.0|1424380312|
|     0|     34|   1.0|1424380312|
|     0|     37|   1.0|1424380312|
|     0|     41|   2.0|1424380312|
+------+-------+------+----------+
only showing top 20 rows



# Train and Test set split

In [27]:
train_set, test_set = ratings_df.randomSplit([0.8, 0.2])

print(f'N lines training: {train_set.count()}')
print(f'N lines test: {test_set.count()}')

N lines training: 1195
N lines test: 306


# ALS algorithm

In [30]:
als_model = ALS(maxIter=5, regParam=0.01,\
            userCol='userId', itemCol='movieId',\
            ratingCol='rating', coldStartStrategy='drop')

In [31]:
model = als_model.fit(train_set)

In [33]:
predictions = model.transform(test_set)

In [35]:
predictions.show()

+------+-------+------+----------+------------+
|userId|movieId|rating| timestamp|  prediction|
+------+-------+------+----------+------------+
|    28|      7|   1.0|1424380312|   0.7075938|
|    28|     23|   3.0|1424380312|   1.0342724|
|    28|     38|   2.0|1424380312|   1.3245434|
|    28|     49|   4.0|1424380312|   0.7454979|
|    28|     54|   1.0|1424380312|  0.46670848|
|    28|     57|   3.0|1424380312|   1.6913445|
|    28|     58|   1.0|1424380312|  0.43619207|
|    28|     63|   1.0|1424380312|   1.1125308|
|    28|     65|   1.0|1424380312|  -2.3221834|
|    28|     81|   5.0|1424380312|-0.049452484|
|    28|     85|   1.0|1424380312|   2.4463272|
|    28|     89|   4.0|1424380312|   4.2550488|
|    26|     22|   5.0|1424380312|  -1.7819904|
|    26|     27|   1.0|1424380312|    2.950621|
|    26|     35|   1.0|1424380312|   1.3410832|
|    26|     58|   1.0|1424380312|   2.2109134|
|    26|     61|   1.0|1424380312|   1.1850226|
|    26|     81|   3.0|1424380312|  -0.6

# Evaluate model

In [37]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

rmse = evaluator.evaluate(predictions)
print(f'RMSE: {rmse}')

RMSE: 1.9867064619048382


# Recommend items for users

In [38]:
user_recommendations = model.recommendForAllUsers(10)

user_recommendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 4.762234}, ...|
|    10|[{94, 4.468815}, ...|
|     0|[{92, 4.126448}, ...|
|     1|[{54, 4.596189}, ...|
|    21|[{88, 5.4728146},...|
|    11|[{85, 7.584094}, ...|
|    12|[{55, 5.313814}, ...|
|    22|[{74, 5.0085053},...|
|     2|[{39, 5.2809434},...|
|    13|[{47, 3.8272634},...|
|     3|[{51, 4.8490534},...|
|    23|[{76, 5.474644}, ...|
|     4|[{25, 4.5183096},...|
|    24|[{96, 5.084022}, ...|
|    14|[{52, 4.8943124},...|
|     5|[{17, 5.36756}, {...|
|    15|[{46, 4.9984207},...|
|    25|[{83, 6.129487}, ...|
|    26|[{60, 5.571095}, ...|
|     6|[{62, 5.1006765},...|
+------+--------------------+
only showing top 20 rows



In [39]:
# Para cada usuário, tem-se uma lista com os 10 filmes mais relevantes
user_recommendations.collect()[0]['recommendations']

[Row(movieId=22, rating=4.762234210968018),
 Row(movieId=62, rating=4.19888973236084),
 Row(movieId=94, rating=4.1922078132629395),
 Row(movieId=20, rating=4.178473472595215),
 Row(movieId=75, rating=3.8430469036102295),
 Row(movieId=74, rating=3.7401721477508545),
 Row(movieId=69, rating=3.725153684616089),
 Row(movieId=2, rating=3.509711742401123),
 Row(movieId=51, rating=3.509058713912964),
 Row(movieId=77, rating=3.496901273727417)]

In [41]:
user_recommendations.count()

30

# Recommend users for movies

In [40]:
movies_recommendations = model.recommendForAllItems(10)

movies_recommendations.show(5)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{17, 4.908371}, ...|
|     40|[{2, 3.4968219}, ...|
|     10|[{23, 3.8588986},...|
|     50|[{23, 4.006258}, ...|
|     80|[{3, 3.964463}, {...|
+-------+--------------------+
only showing top 5 rows



# Create DF with only the movie recommendations

In [46]:
user_recommendations.select(user_recommendations['recommendations']['movieId']).show(5)

+-----------------------+
|recommendations.movieId|
+-----------------------+
|   [22, 62, 94, 20, ...|
|   [94, 23, 2, 27, 8...|
|   [92, 9, 7, 26, 24...|
|   [54, 62, 90, 20, ...|
|   [88, 29, 53, 18, ...|
+-----------------------+
only showing top 5 rows



In [48]:
user_recommendations_only_items = user_recommendations.select(user_recommendations['userId'], user_recommendations['recommendations']['movieId'].alias('movieId_recommendations'))

user_recommendations_only_items.show()

+------+-----------------------+
|userId|movieId_recommendations|
+------+-----------------------+
|    20|   [22, 62, 94, 20, ...|
|    10|   [94, 23, 2, 27, 8...|
|     0|   [92, 9, 7, 26, 24...|
|     1|   [54, 62, 90, 20, ...|
|    21|   [88, 29, 53, 18, ...|
|    11|   [85, 47, 48, 27, ...|
|    12|   [55, 49, 17, 27, ...|
|    22|   [74, 75, 22, 88, ...|
|     2|   [39, 83, 8, 93, 3...|
|    13|   [47, 93, 29, 85, ...|
|     3|   [51, 18, 80, 23, ...|
|    23|   [76, 55, 49, 32, ...|
|     4|   [25, 62, 70, 52, ...|
|    24|   [96, 52, 30, 90, ...|
|    14|   [52, 76, 90, 96, ...|
|     5|   [17, 46, 55, 22, ...|
|    15|   [46, 98, 26, 64, ...|
|    25|   [83, 39, 76, 96, ...|
|    26|   [60, 9, 41, 59, 2...|
|     6|   [62, 25, 47, 92, ...|
+------+-----------------------+
only showing top 20 rows

