# Sistema de Recomendação - SparkMLLib

In [1]:
# Imports
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

In [2]:
# Spark session: the entry point used when working with DataFrames in Spark.
# SparkSession is imported explicitly in the imports cell so this cell does not
# depend on a kernel that happens to preload it.
# getOrCreate() returns an already-running session if there is one.
spSession = (
    SparkSession.builder
    .master("local")
    .appName("DSA-SparkMLLib")
    .getOrCreate()
)

In [3]:
# Load the ratings in the layout used for ALS: one "user,item,rating" record per line.
# The SparkContext is taken from spSession instead of a global `sc`, which this
# notebook never defines and therefore only exists if the kernel injects it.
ratingsRDD = spSession.sparkContext.textFile("data/user-item.txt")

# The file is tiny, so collecting every line to the driver for inspection is fine.
ratingsRDD.collect()

['1001,9001,10',
 '1001,9002,1',
 '1001,9003,9',
 '1002,9001,3',
 '1002,9002,5',
 '1002,9003,1',
 '1002,9004,10',
 '1003,9001,2',
 '1003,9002,6',
 '1003,9003,2',
 '1003,9004,9',
 '1003,9005,10',
 '1003,9006,8',
 '1003,9007,9',
 '1004,9001,9',
 '1004,9002,2',
 '1004,9003,8',
 '1004,9004,3',
 '1004,9010,10',
 '1004,9011,9',
 '1004,9012,8',
 '1005,9001,8',
 '1005,9002,3',
 '1005,9003,7',
 '1005,9004,1',
 '1005,9010,9',
 '1005,9011,10',
 '1005,9012,9',
 '1005,9013,8',
 '1005,9014,1',
 '1005,9015,1',
 '1006,9001,7',
 '1006,9002,4',
 '1006,9003,8',
 '1006,9004,1',
 '1006,9010,7',
 '1006,9011,6',
 '1006,9012,9']

In [4]:
# Convert each raw text line into a typed (user, item, rating) tuple.
def _parse_rating(line):
    """Split a 'user,item,rating' line and cast it to (int, int, float)."""
    fields = line.split(',')
    return (int(fields[0]), int(fields[1]), float(fields[2]))


ratingsRDD2 = ratingsRDD.map(_parse_rating)

In [5]:
# Build a DataFrame from the parsed tuples, naming the three columns.
rating_columns = ["user", "item", "rating"]
ratingsDF = spSession.createDataFrame(ratingsRDD2, schema=rating_columns)

In [6]:
# Print the first 20 rows (the default row count of show()) as a text table.
ratingsDF.show(n=20)

+----+----+------+
|user|item|rating|
+----+----+------+
|1001|9001|  10.0|
|1001|9002|   1.0|
|1001|9003|   9.0|
|1002|9001|   3.0|
|1002|9002|   5.0|
|1002|9003|   1.0|
|1002|9004|  10.0|
|1003|9001|   2.0|
|1003|9002|   6.0|
|1003|9003|   2.0|
|1003|9004|   9.0|
|1003|9005|  10.0|
|1003|9006|   8.0|
|1003|9007|   9.0|
|1004|9001|   9.0|
|1004|9002|   2.0|
|1004|9003|   8.0|
|1004|9004|   3.0|
|1004|9010|  10.0|
|1004|9011|   9.0|
+----+----+------+
only showing top 20 rows



In [7]:
# Build the model.
# ALS = Alternating Least Squares --> recommendation algorithm that optimizes the
# loss function and works very well in parallelized environments.
#
# The column names are spelled out (they match the ALS defaults) so the link to
# ratingsDF's schema is visible, and the seed is fixed so the fitted factors and
# the predictions derived from them can be reproduced after a kernel restart.
als = ALS(
    rank=10,
    maxIter=5,
    userCol="user",
    itemCol="item",
    ratingCol="rating",
    seed=42,
)
modelo = als.fit(ratingsDF)

In [8]:
# Inspect the learned per-user factor vectors (rows of `id` and `features`),
# sorted by user id so the output order is stable.
user_factors_sorted = modelo.userFactors.sort("id")
user_factors_sorted.collect()



[Row(id=1001, features=[-1.0203386545181274, 0.158844456076622, 0.4343518018722534, 0.4210808277130127, -0.08316308259963989, -0.22315622866153717, 0.43449491262435913, -0.016361340880393982, -0.09652023762464523, 1.2523633241653442]),
 Row(id=1002, features=[-0.7278648018836975, -0.3039030432701111, -1.4372806549072266, 0.41016703844070435, -0.20219342410564423, 0.3068915903568268, 0.23943811655044556, -0.3149692118167877, -0.4226529002189636, -0.5618031024932861]),
 Row(id=1003, features=[-0.27540671825408936, -0.020169362425804138, -1.2622171640396118, 0.6222057342529297, -0.33872950077056885, 0.6792863607406616, -0.05991802737116814, 0.23600471019744873, -0.2451867312192917, -0.479026198387146]),
 Row(id=1004, features=[-1.180858850479126, -0.19838424026966095, 0.10948377102613449, 0.7140416502952576, -0.034305863082408905, -0.058910198509693146, 0.4041875898838043, -0.052296675741672516, -0.1987917274236679, 0.6823390126228333]),
 Row(id=1005, features=[-0.520517110824585, 0.53832

In [9]:
# Build a small test set of (user, item) pairs to be scored by the model.
test_user = 1001
test_pairs = [(test_user, item_id) for item_id in (9003, 9004, 9005)]
testeDF = spSession.createDataFrame(test_pairs, schema=["user", "item"])

In [10]:
# Predictions.
# The higher the affinity score, the more likely the user is to accept the
# recommendation.
scored_pairs = modelo.transform(testeDF)
previsoes = scored_pairs.collect()
previsoes

[Row(user=1001, item=9003, prediction=9.008316993713379),
 Row(user=1001, item=9004, prediction=-0.6660882234573364),
 Row(user=1001, item=9005, prediction=-2.7070765495300293)]