## Configurando libs e dependências

In [None]:
!pip install pyspark

In [None]:
!pip install findspark

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [5]:
# Create (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

In [6]:
# Read the ratings file as plain text (one row per line, exposed in the
# `value` column) and work with the underlying RDD.
lines = (
    spark.read
    .text("/content/drive/MyDrive/FIAP/Fase 03/sample_movielens_ratings.txt")
    .rdd
)

In [7]:
# Break every raw text line into its "::"-separated string fields.
parts = lines.map(
    lambda line: line.value.split("::")
)

In [10]:
# Turn each list of string fields into a typed Row:
# userId, movieId and timestamp as int, rating as float.
ratingsRDD = parts.map(
    lambda fields: Row(
        userId=int(fields[0]),
        movieId=int(fields[1]),
        rating=float(fields[2]),
        timestamp=int(fields[3]),
    )
)

In [11]:
# Build a DataFrame from the RDD of Rows; the schema is inferred from the Row fields.
ratings = spark.createDataFrame(data=ratingsRDD)

In [None]:
# Preview only the first few raw lines. `collect()` would pull the entire
# dataset to the driver and flood the notebook output.
lines.take(5)

In [12]:
# Print the first 20 rows of the ratings table (long values truncated).
ratings.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
|     0|     12|   2.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     17|   1.0|1424380312|
|     0|     19|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
|     0|     23|   1.0|1424380312|
|     0|     26|   3.0|1424380312|
|     0|     27|   1.0|1424380312|
|     0|     28|   1.0|1424380312|
|     0|     29|   1.0|1424380312|
|     0|     30|   1.0|1424380312|
|     0|     31|   1.0|1424380312|
|     0|     34|   1.0|1424380312|
|     0|     37|   1.0|1424380312|
|     0|     41|   2.0|1424380312|
+------+-------+------+----------+
only showing top 20 rows



In [14]:
# 80/20 train/test split. A fixed seed makes the split — and therefore the
# evaluation metric computed later — reproducible across runs.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [16]:
# Configure the ALS collaborative-filtering estimator on the rating columns.
# - coldStartStrategy="drop" discards rows whose prediction is NaN
#   (users/items not seen during training) so the evaluation metric is not NaN.
# - seed fixes the random initialisation of the factors so that training is
#   reproducible across runs.
als = ALS(maxIter=5,
          regParam=0.01,
          userCol="userId",
          itemCol="movieId",
          ratingCol="rating",
          coldStartStrategy="drop",
          seed=42)

In [17]:
# Fit the ALS estimator on the training split.
model = als.fit(dataset=training)

In [19]:
# Score the held-out test split and measure the prediction error.
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
# The metric is the *root* mean squared error, so the label must say so
# (the previous label described the plain mean squared error).
print("Raiz do erro quadrático médio (RMSE) = " + str(rmse))

Erro médio quadrático = 1.9019665108148909


In [20]:
# Top-10 movie recommendations for every user known to the model.
userRec = model.recommendForAllUsers(numItems=10)

In [21]:
# Print the first 20 users with their (truncated) recommendation lists.
userRec.show(n=20, truncate=True)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 4.6087985},...|
|    10|[{46, 5.086785}, ...|
|     0|[{49, 3.96598}, {...|
|     1|[{53, 5.0424814},...|
|    21|[{49, 5.463806}, ...|
|    11|[{83, 6.09783}, {...|
|    12|[{85, 5.706329}, ...|
|    22|[{52, 6.3327794},...|
|     2|[{7, 5.3883266}, ...|
|    13|[{39, 3.8023062},...|
|     3|[{22, 4.800506}, ...|
|    23|[{49, 5.094453}, ...|
|     4|[{29, 4.0230165},...|
|    24|[{22, 5.2664003},...|
|    14|[{63, 5.021929}, ...|
|     5|[{55, 5.090911}, ...|
|    15|[{46, 4.8168697},...|
|    25|[{62, 4.24228}, {...|
|    26|[{90, 6.3747683},...|
|     6|[{25, 4.714786}, ...|
+------+--------------------+
only showing top 20 rows



In [22]:
# Top-10 user recommendations for every movie known to the model.
movieRecs = model.recommendForAllItems(numUsers=10)

In [23]:
# Print the first 20 movies with their (truncated) recommendation lists.
movieRecs.show(n=20, truncate=True)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{5, 2.7796068}, ...|
|     40|[{2, 4.0709033}, ...|
|     10|[{26, 4.0654464},...|
|     50|[{11, 3.938213}, ...|
|     80|[{3, 4.136849}, {...|
|     70|[{21, 3.9405656},...|
|     60|[{22, 3.6726198},...|
|     90|[{26, 6.3747683},...|
|     30|[{11, 4.905892}, ...|
|      0|[{16, 2.7652712},...|
|     31|[{12, 3.687317}, ...|
|     81|[{28, 4.365523}, ...|
|     91|[{21, 4.0208364},...|
|      1|[{25, 2.9720063},...|
|     41|[{23, 4.1433153},...|
|     61|[{6, 2.667994}, {...|
|     51|[{26, 5.1060047},...|
|     21|[{17, 3.3865867},...|
|     11|[{16, 1.8616736},...|
|     71|[{16, 3.0007436},...|
+-------+--------------------+
only showing top 20 rows



In [24]:
# Distinct user ids found in the ratings, selected through the user column
# name configured on the ALS estimator.
users = (
    ratings
    .select(als.getUserCol())
    .distinct()
)

In [26]:
# Print the first 20 distinct user ids.
users.show(n=20, truncate=True)

+------+
|userId|
+------+
|    26|
|    29|
|    19|
|     0|
|    22|
|     7|
|    25|
|     6|
|     9|
|    27|
|    17|
|    28|
|     5|
|     1|
|    10|
|     3|
|    12|
|     8|
|    11|
|     2|
+------+
only showing top 20 rows



In [27]:
# Keep the user id and, from every struct in the recommendations array,
# only the movie id field (dropping the predicted rating).
# NOTE(review): the field is spelled 'movieid' while the ratings column is
# 'movieId'; this appears to rely on Spark's case-insensitive name
# resolution — confirm before enabling spark.sql.caseSensitive.
UserRecsOnlyItemId = userRec.select(
    userRec['userId'],
    userRec['recommendations']['movieid'],
)

In [28]:
# Print the first 10 users with their full (untruncated) list of recommended movie ids.
UserRecsOnlyItemId.show(n=10, truncate=False)

+------+----------------------------------------+
|userId|recommendations.movieid                 |
+------+----------------------------------------+
|20    |[22, 94, 90, 77, 75, 32, 62, 52, 36, 96]|
|10    |[46, 54, 85, 40, 94, 25, 92, 89, 69, 49]|
|0     |[49, 29, 2, 9, 92, 32, 62, 70, 7, 25]   |
|1     |[53, 90, 22, 17, 68, 52, 75, 62, 10, 77]|
|21    |[49, 53, 29, 2, 62, 9, 91, 70, 74, 7]   |
|11    |[83, 39, 44, 32, 69, 27, 79, 48, 65, 30]|
|12    |[85, 64, 17, 37, 35, 58, 16, 31, 50, 40]|
|22    |[52, 75, 74, 63, 88, 30, 22, 69, 64, 62]|
|2     |[7, 93, 83, 39, 79, 37, 40, 34, 92, 19] |
|13    |[39, 93, 76, 29, 72, 83, 38, 32, 74, 25]|
+------+----------------------------------------+
only showing top 10 rows

