# Modelo de recomendacion

A partir de la libreria Surprice, usamos el algoritmo de KNnWhitMeans, que en su caja negra lo que hace es construir una matriz a partir de los ratings puestos por los usuario
sobre los productos. Calcula la distancia por coseno entre vectores(rating) para ponerle valor numerico a la relacion para poder recomendar en base a los productos.

In [2]:
import pandas as pd
from pyspark.sql import SparkSession 
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, desc

In [3]:
from surprise import Reader
from surprise import Dataset
from surprise import accuracy
from surprise.model_selection import cross_validate
from surprise.model_selection import train_test_split
from surprise.model_selection import GridSearchCV
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType
from surprise import SVD, SVDpp, NMF, SlopeOne, CoClustering, KNNBaseline, KNNWithZScore, KNNWithMeans, KNNBasic, BaselineOnly, NormalPredictor



In [4]:
#Iniciamos sesion en Spark
spark = SparkSession.builder.appName('sent').getOrCreate()

In [5]:
#leemos el dataset
dfspark = spark.read.json('../common/data/reviews_processed')

In [6]:
#Definimos una funcion que va a servir para crear un columna nueva
def prom_rating(val, val1):
    ''' Calcula el valor pormedio entre dos valores '''
    return (val + val1)/2   

In [7]:
col_new = udf(prom_rating)

In [8]:
#Creamos una columna nueva que promedia el ranking dado por el usuario y el establecido por el analizis de testo
dfspark = dfspark.withColumn("average_ranq", col_new(col("sentiment"), col('rating')))

In [9]:
#Seleccionamos los campos a utilizar
dfspark1 = dfspark.select('productId', 'reviewerId', 'average_ranq')

In [10]:
#Lista de los productos con mas reviews
mejores_productos_más_calificados = dfspark.groupBy("productId").count().sort(F.col("count").desc()).limit(5000)
productos = mejores_productos_más_calificados.select('productId').rdd.flatMap(lambda x: x).collect()

In [11]:
#Filtramos el dataframe con los productos con mas reviews
dfsparkFiltro = dfspark1.filter((dfspark.productId).isin(productos))

In [12]:
#Cambiamos los nombres de los campos para que el modelo los pueda procesar
modelo = dfsparkFiltro.withColumnRenamed('reviewerId', 'user').withColumnRenamed('productId','item').withColumnRenamed('average_ranq', 'rating')

In [13]:
#Cambiamos el tipo de dato de la columna 'rating'
modelo = modelo.withColumn('rating', F.col('rating').cast(FloatType()))

In [14]:
#Hacemos dos listas
users = modelo.select('user').distinct()
items = modelo.select('item').distinct()
users = users.select('user').rdd.flatMap(lambda x: x).collect()
items = items.select('item').rdd.flatMap(lambda x: x).collect()


In [15]:
#Fortalecemos la conexion de PySpark con Pandas
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [16]:
#Pasamos el dataframe a Pandas
df_pd = modelo.toPandas()

In [17]:
#Instanciamos el modelo
sim_options = {"name": "cosine", "user_based": False,}

algoritmo = KNNWithMeans(sim_options=sim_options)

In [18]:
reader = Reader(rating_scale=(1, 5))

In [19]:
data_n = Dataset.load_from_df(df_pd[["user", "item", "rating"]], reader)

In [20]:
#Definimos la matriz de entrenamiento
trainingSet = data_n.build_full_trainset()

In [21]:
algoritmo.fit(trainingSet)

Computing the cosine similarity matrix...
Done computing similarity matrix.


<surprise.prediction_algorithms.knns.KNNWithMeans at 0x7f0f4acf1dd0>

In [22]:
#El modelo devuelvo los 5 productos mas recomendados para un usuario x
recomend = []
usuario = 'A53MRKZW1IM66'
for item in items:
    p1 = algoritmo.predict(usuario, item[:50])
    if p1.est > 1:
        p2 = [p1.est , p1.iid]
        recomend.append(p2)
recomend.sort()
recomend[-5:]

[[5, 'B00JTKDTLE'],
 [5, 'B00JV858HM'],
 [5, 'B00JVZ0DQG'],
 [5, 'B00KWVZ750'],
 [5, 'BT00DDVMVQ']]

In [32]:
def item_id_to_name(id):
    name = dfspark_prod.filter((dfspark_prod.productId)==id).collect()[0][4]    
    return name

In [None]:
con = 0
while con <= 4:
    calification = recomend[-5:][con][0]
    prod_id = recomend[-5:][con][1]
    print(f'Para el usuario {usuario} la recomendacion es : {item_id_to_name(prod_id)}')
    con += 1

In [33]:
dfspark_prod = spark.read.json('../common/data/products_etl')

In [None]:
nombre = dfspark_prod.filter((dfspark1.productId).isin([recomend[1], recomend[3], recomend[5], recomend[7]]))

In [None]:
nombre.select('categories').show(truncate=False)

In [None]:
df_pd