In [2]:
from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

from pyspark.sql import Row
 

In [3]:
# Create (or reuse) the Spark session for the recommender. The MongoDB
# connector package is requested via spark.jars.packages, and the same URI
# (database "rec", collection "recomendacao") is set for reads and writes.
spark = (
    SparkSession.builder
    .appName("RecSys")
    .config("spark.mongodb.read.connection.uri", "mongodb://172.17.0.2:27017/rec.recomendacao")
    .config("spark.mongodb.write.connection.uri", "mongodb://172.17.0.2:27017/rec.recomendacao")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")
    .getOrCreate()
)

In [11]:
# Parse the MovieLens sample file: every line is split on "::" into
# userId, movieId, rating and timestamp (in that order).
linhas = spark.read.text("./dados/sample_movielens_ratings.txt").rdd
partes = linhas.map(lambda row: row.value.split("::"))
ratingsRDD = partes.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]), timestamp=int(p[3])))
# Build the DataFrame straight from the RDD. Calling .collect() first would
# pull every row onto the driver only to distribute it again.
ratings = spark.createDataFrame(ratingsRDD)

In [12]:
# Sanity check: display the Python type of `ratings` (bare expression so the
# notebook renders it as the cell output).
type(ratings)

pyspark.sql.dataframe.DataFrame

In [13]:
# Peek at the first 10 parsed ratings.
ratings.show(n=10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
|     0|     12|   2.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     17|   1.0|1424380312|
|     0|     19|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
+------+-------+------+----------+
only showing top 10 rows



In [15]:
# Hold out roughly 20% of the ratings for evaluation. The fixed seed makes the
# split repeatable, so re-running the notebook evaluates on the same rows.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [16]:
# Fit an ALS collaborative-filtering model on the training ratings.
# coldStartStrategy="drop" discards rows whose prediction would be NaN
# (per the Spark docs: users/items not seen during training), so the
# evaluation metric computed later is not NaN.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,  # fixed seed so repeated runs start from the same initial factors
)
model = als.fit(training)

In [19]:
# Score the held-out ratings with the fitted model, then set up an evaluator
# that compares the "rating" label against the "prediction" column using RMSE.
predicoes = model.transform(test)
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [20]:
# The evaluator is configured with metricName="rmse", so the value is the
# *root* of the mean squared error; label it accordingly instead of as the
# mean squared error itself.
erro = evaluator.evaluate(predicoes)
print("Raiz do erro quadrático médio (RMSE) = " + str(erro))

Erro médio quadrático = 1.848448167851217


In [21]:
# Top-3 recommended movies for every user known to the model.
userRecs = model.recommendForAllUsers(numItems=3)

In [22]:
# Show 10 users with their recommendation arrays, without truncating the column.
userRecs.show(n=10, truncate=False)

+------+---------------------------------------------------+
|userId|recommendations                                    |
+------+---------------------------------------------------+
|20    |[{83, 5.294015}, {63, 4.963829}, {52, 4.8285}]     |
|10    |[{62, 4.8162794}, {93, 4.517762}, {29, 4.3242645}] |
|0     |[{62, 3.92786}, {9, 3.291734}, {40, 3.0559452}]    |
|1     |[{58, 5.31978}, {47, 4.5226946}, {25, 3.8565567}]  |
|21    |[{29, 5.1598487}, {53, 4.7203784}, {2, 3.8707504}] |
|11    |[{32, 5.254582}, {27, 5.0810575}, {30, 4.9302206}] |
|12    |[{9, 5.5025663}, {35, 5.0510454}, {55, 4.7976894}] |
|22    |[{22, 5.078688}, {75, 4.91083}, {88, 4.8385706}]   |
|2     |[{32, 5.9938793}, {49, 5.986617}, {8, 5.111637}]   |
|13    |[{93, 3.7036908}, {53, 3.2669418}, {74, 3.0079873}]|
+------+---------------------------------------------------+
only showing top 10 rows



In [27]:
# Split the array of recommendation structs into two parallel array columns:
# one with the movie ids and one with the predicted ratings, keyed by userId.
recomendacoes = userRecs.select(
    userRecs["userId"],
    userRecs["recommendations"]["movieId"].alias("movieId"),
    userRecs["recommendations"]["rating"].alias("rating"),
)

In [29]:
# Show 20 rows of the reshaped recommendations, without truncating the arrays.
recomendacoes.show(n=20, truncate=False)

+------+------------+---------------------------------+
|userId|movieId     |rating                           |
+------+------------+---------------------------------+
|20    |[83, 63, 52]|[5.294015, 4.963829, 4.8285]     |
|10    |[62, 93, 29]|[4.8162794, 4.517762, 4.3242645] |
|0     |[62, 9, 40] |[3.92786, 3.291734, 3.0559452]   |
|1     |[58, 47, 25]|[5.31978, 4.5226946, 3.8565567]  |
|21    |[29, 53, 2] |[5.1598487, 4.7203784, 3.8707504]|
|11    |[32, 27, 30]|[5.254582, 5.0810575, 4.9302206] |
|12    |[9, 35, 55] |[5.5025663, 5.0510454, 4.7976894]|
|22    |[22, 75, 88]|[5.078688, 4.91083, 4.8385706]   |
|2     |[32, 49, 8] |[5.9938793, 5.986617, 5.111637]  |
|13    |[93, 53, 74]|[3.7036908, 3.2669418, 3.0079873]|
|3     |[51, 94, 18]|[5.1235037, 4.4322443, 4.14624]  |
|23    |[96, 55, 65]|[5.2013264, 5.133546, 5.072284]  |
|4     |[2, 62, 29] |[4.1768446, 4.0502434, 3.929776] |
|24    |[25, 22, 96]|[6.7787385, 6.0487514, 5.055647] |
|14    |[29, 63, 2] |[5.189298, 4.4338837, 4.261

In [30]:
# Persist the recommendations through the "mongodb" data source. No target is
# given here, so it relies on the write connection URI set on the Spark session.
# NOTE: mode("append") adds rows on every run; re-running this cell will
# presumably duplicate documents in the collection - confirm that is intended.
(
    recomendacoes.write
    .format("mongodb")
    .mode("append")
    .save()
)