# Sistema de recomendacion para un Marketplace

##### Estudiantes:

- Victoria Alvarez
- Karen Velasquez
- Ana Uran
- Nicolas Prieto
- Pablo A. Saldarriaga

In [1]:
### Imports iniciales
import os
from time import time
from pyspark.sql import *
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit,CrossValidator

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1600268116866_0003,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
sc.install_pypi_package("pandas")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas
  Using cached https://files.pythonhosted.org/packages/74/69/18b96b520519818e00b04dd08d7cbc5e764f1465f5a280cf96173f34c54e/pandas-1.1.2-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.7.3 (from pandas)
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.1.2 python-dateutil-2.8.1

In [None]:
#%%configure -f 
#{"driverMemory": "20000M"}

# Preprocesamiento de la información (NO SE EJECUTA)

El paso inicial consiste en leer la información alojada en diferentes buckets de S3

Inicialmente se da lectura a los metadatos de los productos con el fin de que estos sean la base para aplicación de modelos cold star, esta metadata fue descargada del enlace http://jmcauley.ucsd.edu/data/amazon/ y posteriormente almacenada en un bucket propio de S3.

Adicionalmente, leemos los datos del amazon customer review data set que se encuentran en un bucket público de amazon 

In [None]:
#### Lectura de la metadata de los productos
df_metadata = spark.read.csv("s3://info-proyecto-mineria-datos/metadata/products_*.csv", header = True, multiLine=True, quote='"',escape='"',sep=',').drop_duplicates(subset=['asin'])
df_metadata = df_metadata.select('customer_id','product_id','star_rating')
df_metadata.persist()
#Contar filas de productos
print(f'Cantidad de productos en la metadata: {df_metadata.count()}')

In [3]:
### Cargamos la informacion de todos los reviews almacenados en S3
df_reviews = spark.read.csv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_*.tsv.gz", sep= '\t', header = True)
df_reviews = df_reviews.select('customer_id','product_id','star_rating')
df_reviews.persist()
### Imprimimos el esquema del conjunto de datos y mostramos las primeras 5 filas
df_reviews.printSchema()
df_reviews.show(5)

### Numero de registros que tiene el conjunto de datos
print(f'La cantidad de reviews que tenemos es: {df_reviews.count()}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[customer_id: string, product_id: string, star_rating: string]

In [None]:
### Al conjunto de datos de los reviews le agregamos la metadata de los productos
reviews = df_reviews.alias('reviews')
products = df_metadata.alias('products')

all_data = reviews.join(products, reviews.product_id == products.asin,how='left')
all_data.select('product_id','asin','star_rating','price','title').show(5)
all_data.persist()

### Verificar que no cambie la cantidad de reviews despues del join
print(f'La cantidad de reviews despues del join es: {all_data.count()}')

In [None]:
### Eliminamos los registros que no tengan informacion de asin, precio y titulo
df_clean = df_reviews.na.drop(subset = ['asin'])
df_clean.select('product_id','asin','star_rating','price','title').show()
df_clean.persist()

print(f'La cantidad de reviews despues de la eliminacion es: {df_clean.count()}')

In [None]:
### Obtenemos la cantidad de productos diferentes que se tiene en el conjunto de datos de los reviewa
df_clean.agg(countDistinct(col("product_id")).alias("count")).show()

In [None]:
df_clean2.persist()
df_clean2.count()

In [50]:
# Debido a que existen productos con muy pocos reviews y esto afecta el desempeño de los modelos implementados 
# se filtran la base inicialmente parap productos con más de 150 reviews y se juega con este parámetro para conocer la capacidad
# máxima del cluster y de esta forma escalar los modelos.

df_reviews.createOrReplaceTempView("df_clean")
df_clean2 = spark.sql("""with category as (
                                    SELECT 
                                        product_id, 
                                        count(*) as n_reviews 
                                        FROM 
                                        df_clean  group by product_id), 
                                        category_def as (select * from category where n_reviews >= 25) select t1.* 
                                        from 
                                        df_clean as t1 inner join category_def as t2 on t1.product_id = t2.product_id 
                                        where t1.star_rating in ('1','2','3','4','5') """)
df_clean2.persist()
df_clean2.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

90983354

In [51]:
# Adicionalmente se incluyen solo reseñas de clientes que hallan hecho más de 5 reviews, con el fin de mejorar el performance 
# teniendo en cuenta que en el desarrollo del proyecto se tratará el problema del cold star.
df_clean2.createOrReplaceTempView("df_clean2")
df_final = spark.sql("""with category as (
                                    SELECT 
                                        customer_id, 
                                        count(*) as n_customers 
                                        FROM 
                                        df_clean2  group by customer_id), 
                                        category_def as (select * from category where n_customers >= 5) select t1.* 
                                        from 
                                        df_clean2 as t1 inner join category_def as t2 on t1.customer_id = t2.customer_id 
                                        where t1.star_rating in ('1','2','3','4','5') """)

df_final.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[customer_id: string, product_id: string, star_rating: string]

In [52]:
df_final.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

58547703

In [53]:
#Generamos un index para cada código de producto
indexer_product_id = StringIndexer(inputCol="product_id", outputCol="item")
df = indexer_product_id.fit(df_final).transform(df_final)

# Generamos un indice para cada transacción con el fin de luego poderla identificar
ratings_df = df.select("*").withColumn("id", monotonically_increasing_id())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
#Garantizamos que estén incluidos los nuevos campos
ratings_df.printSchema()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
#Cantidad de reviews analizados
ratings_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

58547703

In [56]:
#Guardamos en un bucket S3 reviews asociados a productos que tienen más de 150 de estos
#ratings_df.write.parquet("s3a://mineria-info/muestra_genera14M.parquet")#>50

ratings_df.write.parquet("s3a://mineria-datos-20202/muestra_genera14M.parquet")#>50

#ratings_df.write.parquet("s3a://mineria-info/muestra_genera31M.parquet")#>100
#ratings_df.write.parquet("s3a://mineria-info/muestra_genera24M.parquet")#>150 prods ESTE

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Lectura de muestra guardada en S3

Tomamos la información de la muestra base con la que desarrollaremos el modelamiento incial, que es el resultado del procedimiento anterior

In [3]:
#df = spark.read.parquet("s3a://mineria-info/muestra_general.parquet")
df = spark.read.parquet("s3a://mineria-datos-20202/muestra_genera14M.parquet") ### 14 24 44 49 62
print(f'La cantidad de reviews a considerar es: {df.count()}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

La cantidad de reviews a considerar es: 14919839

A continuación, se extrae la cantidad de productos, usuarios y reviews analizados

In [4]:
df.createOrReplaceTempView("df_base")


info_cuentas = spark.sql(""" SELECT 
                                COUNT(DISTINCT product_id) as cuenta_productos, 
                                COUNT(DISTINCT customer_id) as cuenta_usuarios, 
                                COUNT(customer_id) as cuenta_reviews 
                                FROM 
                                df_base""")

info_cuentas.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+---------------+--------------+
|cuenta_productos|cuenta_usuarios|cuenta_reviews|
+----------------+---------------+--------------+
|          115434|         765213|      14919839|
+----------------+---------------+--------------+

# Muestreo para training, validation y test

A partir de la muestra , se realiza un muestreo estratificado por producto donde se calcula la proporción de cada ítem dentro de la muestra total de 14.9 millones, con el fin de mantener dicha proporción en los grupos de entrenamiento, validación y testeo.



Esta muestra de 14.9 millones de reviews fue divida en un 60% - 20% - 20% para datos de entrenamiento, validación y testeo

**Procedimiento:**

In [5]:
#Seleccionamos los las variables relevantes

ratings_df = spark.sql(""" SELECT 
                                cast(t1.star_rating as int) as star_rating_integer, 
                                cast(t1.customer_id as int) as customer_id_integer, 
                                t1.product_id, 
                                t1.item , 
                                t1.id 
                                FROM 
                                df_base t1""")
ratings_df.persist()
ratings_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

14919839

In [6]:
#Selección de muestra: Debido a que es necesario garantizar que exista participación de todos los productos en el set de datos

NH2 = ratings_df.groupBy('product_id').count() #Poblacion por cada pruduct category
n = 0.6

list_of_lat2 = NH2.rdd.map(lambda r: (r.product_id,n)).collectAsMap()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**training**

In [7]:
ratings_df_training = ratings_df.sampleBy("product_id", fractions=list_of_lat2,seed=10)
ratings_df_training.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[star_rating_integer: int, customer_id_integer: int, product_id: string, item: double, id: bigint]

**Definición de muestras para validación y test ***

In [8]:
ratings_df_training.createOrReplaceTempView("df_training")
ratings_df.createOrReplaceTempView("df_general")

#Proceso para excluir lo que ya se encuentra en el set de training
ratings_df_aux = spark.sql(""" SELECT 
                                    t1.*  
                                    FROM 
                                    df_general as t1 left join df_training as t2 on t1.id = t2.id 
                                    WHERE 
                                    t1.id is null or t2.id is null """)
ratings_df_aux.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[star_rating_integer: int, customer_id_integer: int, product_id: string, item: double, id: bigint]

In [9]:
### Obtención de conjunto de validación y test
NH3 = ratings_df_aux.groupBy('product_id').count()
n = 0.5

list_of_lat3 = NH3.rdd.map(lambda r: (r.product_id,n)).collectAsMap()

### Test
ratings_df_test = ratings_df_aux.sampleBy("product_id", fractions=list_of_lat3,seed=10)
ratings_df_test.persist()


### Validation
ratings_df_test.createOrReplaceTempView("df_test")
ratings_df_aux.createOrReplaceTempView("df_aux")

ratings_df_val = spark.sql("""SELECT 
                                    t1.*  
                                    FROM 
                                    df_aux as t1 left join df_test as t2 on t1.id = t2.id 
                                    WHERE 
                                    t1.id is null or t2.id is null """)
ratings_df_val.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[star_rating_integer: int, customer_id_integer: int, product_id: string, item: double, id: bigint]

A continuación se observa la división de cada uno de las muestras, siempre garantizando que exista la participación de todos los productos (items) en cada grupo de datos

In [10]:
#cantidad de registros en los conjuntos de datos, la particion se hizo 60-20-20
print(f'Cantidad de registros en training: {ratings_df_training.count()}')
print(f'Cantidad de registros en validation: {ratings_df_val.count()}')
print(f'Cantidad de registros en test: {ratings_df_test.count()}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cantidad de registros en training: 8952274
Cantidad de registros en validation: 2983493
Cantidad de registros en test: 2984072

## **Implementación de modelos con información histórica**

Con el fin de tener la información requerida para el desarrollo de los modelos, se agregan las variables de sesgo al set de datos

**Training**

In [11]:
ratings_df_training.createOrReplaceTempView("data_reviews")

query = """ with data_summary as (
                            SELECT
                                avg(star_rating_integer) as promedio_global
                                FROM data_reviews
                            ),
                        cliente as(
                            SELECT
                                customer_id_integer,
                                avg(star_rating_integer) as promedio_usuario
                                FROM data_reviews
                                GROUP BY customer_id_integer
                        ),
                        producto as (
                            SELECT
                                product_id,
                                avg(star_rating_integer) as promedio_producto
                                FROM data_reviews
                                GROUP BY product_id
                        ),
                        summary_promg_cli as(
                            SELECT v.*,
                                data_summary.promedio_global as mu
                                FROM
                                (SELECT data_reviews.*,
                                cliente.promedio_usuario as promedio_usuario
                                FROM data_reviews
                                LEFT JOIN cliente ON data_reviews.customer_id_integer=cliente.customer_id_integer) v
                                CROSS JOIN data_summary)
                            SELECT *,
                                cast(star_rating_integer as int) as star_rating_int, 
                                cast(customer_id_integer as int) as customer_id_int,
                                promedio_producto-mu as bx,
                                promedio_usuario-mu as bi,
                                star_rating_integer - mu - (promedio_producto-mu) -(promedio_usuario-mu) as rating_normalizado
                                FROM (
                                SELECT summary_promg_cli.*,
                                    producto.promedio_producto as promedio_producto
                                    FROM
                                    summary_promg_cli
                                    LEFT JOIN producto ON summary_promg_cli.product_id=producto.product_id)"""


training_norm = spark.sql(query)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Validación**

In [12]:
training_norm.createOrReplaceTempView("train")
ratings_df_val.createOrReplaceTempView("val")
#test.createOrReplaceTempView("test")

query = """ with tabla_CLIENTES as(
                            SELECT customer_id_integer,
                                    max(bi) as bi
                                    FROM
                                    train
                                    GROUP BY customer_id_integer
                                    ),
                tabla_ITEMS as(
                                SELECT
                                    item,
                                    max(bx) as bx
                                    FROM
                                    train
                                    GROUP BY item),
                tabla_bi as (
                SELECT 
                    val.customer_id_integer,
                    val.star_rating_integer,
                    val.item,
                    tabla_CLIENTES.bi
                    FROM
                    tabla_CLIENTES RIGHT JOIN val ON 
                    val.customer_id_integer=tabla_CLIENTES.customer_id_integer ),

                    data_summary as (
                                SELECT
                                    MAX(mu) as mu
                                    FROM train
                                )
                SELECT *
                    from (
                    select
                        tabla_bi.customer_id_integer,
                        tabla_bi.item,
                        tabla_bi.star_rating_integer,
                        tabla_ITEMS.bx,
                        tabla_bi.bi
                        FROM
                        tabla_bi LEFT JOIN tabla_ITEMS ON tabla_bi.item=tabla_ITEMS.item) CROSS JOIN data_summary """

val = spark.sql(query)
val = val.fillna(0)
val.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[customer_id_integer: int, item: double, star_rating_integer: int, bx: double, bi: double, mu: double]

**Test**

In [13]:
training_norm.createOrReplaceTempView("train")
ratings_df_test.createOrReplaceTempView("test")

query = """ with tabla_CLIENTES as(
                            SELECT customer_id_integer,
                                    max(bi) as bi
                                    FROM
                                    train
                                    GROUP BY customer_id_integer
                                    ),
                tabla_ITEMS as(
                                SELECT
                                    item,
                                    max(bx) as bx
                                    FROM
                                    train
                                    GROUP BY item),
                tabla_bi as (
                SELECT 
                    test.customer_id_integer,
                    test.star_rating_integer,
                    test.item,
                    tabla_CLIENTES.bi
                    FROM
                    tabla_CLIENTES RIGHT JOIN test ON 
                    test.customer_id_integer=tabla_CLIENTES.customer_id_integer ),

                    data_summary as (
                                SELECT
                                    MAX(mu) as mu
                                    FROM train
                                )
                SELECT *
                    from (
                    select
                        tabla_bi.customer_id_integer,
                        tabla_bi.item,
                        tabla_bi.star_rating_integer,
                        tabla_ITEMS.bx,
                        tabla_bi.bi
                        FROM
                        tabla_bi LEFT JOIN tabla_ITEMS ON tabla_bi.item=tabla_ITEMS.item) CROSS JOIN data_summary """

test = spark.sql(query)
test = test.fillna(0)
test.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[customer_id_integer: int, item: double, star_rating_integer: int, bx: double, bi: double, mu: double]

In [14]:
training = training_norm.select('rating_normalizado','customer_id_integer','star_rating_integer','item')
training.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[rating_normalizado: double, customer_id_integer: int, star_rating_integer: int, item: double]

# Etapa de modelamiento

## Modelo Baseline

El modelo baseline consiste en obtener la calificación promedio para predecir la siguiente calificación. A partir de allí calculamos el error cuadrático medio, Los modelos baseline consisten en extraer la calificación promedio para estimar la reseña que van a dar los clientes a los productos que aún no compran. En este caso, se ejecutaron cuatro modelos baseline empleando la mediana, media global, la media por producto y por usuario.esta es la referencia para definir si los siguientes modelos presentan mejor rendimiento.

In [15]:
### Baseline con la Mediana
t_ini = time()

training.createOrReplaceTempView("training_df")
median = spark.sql("select percentile_approx(star_rating_integer, 0.5) as median from training_df")
median = float(median.toPandas()['median'][0])

print(f'La mediana en training es: {median}')

se_rdd = val.rdd.map(lambda x: (x['star_rating_integer']-median)**2)
row = Row("valor")
se_df = se_rdd.map(row).toDF()
se_df.createOrReplaceTempView('se_df_median')
baseline_median = spark.sql('SELECT SQRT(AVG(valor)) as RMSE  FROM se_df_median')
baseline_median.show()

print(f'Tiempo: {time()-t_ini}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

La mediana en training es: 5.0
+------------------+
|              RMSE|
+------------------+
|1.3090958166826092|
+------------------+

Tiempo: 172.58927488327026

In [16]:
### Baseline con el promedio
t_ini = time()

mean = float(training.describe().toPandas()['star_rating_integer'][1]) # mean

print(f'El promedio en trainig es: {mean}')

se_rdd = val.rdd.map(lambda x: (x['star_rating_integer']-mean)**2)
row = Row("valor")
se_df = se_rdd.map(row).toDF()
se_df.createOrReplaceTempView('se_df')
baseline = spark.sql('SELECT SQRT(AVG(valor)) as RMSE  FROM se_df')
baseline.show()

print(f'Tiempo: {time()-t_ini}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

El promedio en trainig es: 4.313211704646216
+-----------------+
|             RMSE|
+-----------------+
|1.114904805061795|
+-----------------+

Tiempo: 28.577411890029907

In [17]:
### Baseline promedio por producto
t_ini = time()
training.createOrReplaceTempView("training_df")
val.createOrReplaceTempView("validation_df")
mean_prod = spark.sql("""with tabla_prods as(
                        select 
                            item, 
                            AVG(star_rating_integer) as prom_prod 
                            FROM 
                            training_df
                            GROUP BY item),
                        table_aux as(
                        SELECT 
                            validation_df.*,
                            tabla_prods.prom_prod,
                            POWER(validation_df.star_rating_integer-tabla_prods.prom_prod,2) as SE
                            FROM
                            validation_df LEFT JOIN tabla_prods ON validation_df.item=tabla_prods.item
                            )
                            SELECT SQRT(AVG(SE)) as RMSE
                            FROM
                            table_aux
                            """)
                        
mean_prod.show()
print(f'Tiempo: {time()-t_ini}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+
|             RMSE|
+-----------------+
|1.055840559903122|
+-----------------+

Tiempo: 29.988484621047974

In [18]:
### Modelo baseline promedio por usuario

t_ini = time()
training.createOrReplaceTempView("training_df")
val.createOrReplaceTempView("validation_df")
mean_cli = spark.sql("""with tabla_customers as(
                        select 
                            customer_id_integer, 
                            AVG(star_rating_integer) as prom_prod 
                            FROM 
                            training_df
                            GROUP BY customer_id_integer)
                        SELECT 
                            validation_df.*,
                            tabla_customers.prom_prod
                            FROM
                            validation_df LEFT JOIN tabla_customers ON validation_df.customer_id_integer=tabla_customers.customer_id_integer
                            
                            """)
### para aquellos que no se les asigno un promedio por usuario
### ponemos el promedio global
mean_cli = mean_cli.fillna(mean)

mean_cli.createOrReplaceTempView("tabla_clientes_f")
mean_cli_final = spark.sql("""with tabla_errores as(
                                SELECT
                                    POWER(tabla_clientes_f.star_rating_integer-tabla_clientes_f.prom_prod,2) as SE
                                    FROM
                                    tabla_clientes_f )
                                    SELECT SQRT(AVG(SE)) as RMSE
                                        FROM
                                        tabla_errores
                                    """)

mean_cli_final.show()
print(f'Tiempo: {time()-t_ini}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+
|              RMSE|
+------------------+
|1.0676053136258488|
+------------------+

Tiempo: 32.09034276008606

## Modelo de Factores latentes

### ALS 

Se emplea modelos de factores latentes utilizando el algoritmo ALS, el modelo fue entrenado con el conjunto de training previamente mencionado y validado con el conjunto validation, los inputs recibidos por el modelo son el user id correspondiente a la identificación del usuario que califico el producto,  el  star rating que corresponde a la puntuación generada por el usuario y  el item correspondiente al producto calificado por el usuario, siendo estas las variables iniciales para el modelo; como se menciono previamente se realizo la aplicación de diferentes combinaciones entre hiper parámetros generando un total de 9 modelos, a continuación se listan los hiper parámetros (iteraciones, factores latentes y regularización) y resultados de cada modelo:

**Modelo inicial**
- **rank** = 10 por defecto
- **maxIter** = 10 por defecto 
- **regParam** = 0.1 por defectp

In [19]:
### Entrenamiento del modelo
start_time = time()

als = ALS(userCol="customer_id_integer", itemCol="item", ratingCol="star_rating_integer",
          coldStartStrategy="drop",nonnegative=True) #rank=10, maxIter=10, regParam=0.1
model = als.fit(ratings_df_training)

elapsed_time = time() - start_time
print("Elapsed time: %.10f seconds." % elapsed_time)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Elapsed time: 62.3497815132 seconds.

In [57]:
### Metrica del modelo en validación
start_time = time()
predictions = model.transform(ratings_df_val)
predictions_corr = predictions.withColumn("preds_corr",
                               when(col("prediction") < 1 , 1)
                              .when(col("prediction") > 5 , 5)
                              .otherwise(col("prediction")))
predictions_corr.persist()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="star_rating_integer",predictionCol="prediction")
rmse = evaluator.evaluate(predictions_corr)

print("Root-mean-square error = " + str(rmse))
print("Elapsed time: %.10f seconds." % elapsed_time)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Root-mean-square error = 1.2609236011967255
Elapsed time: 223.0072054863 seconds.

### Ajuste de hipeparámetros

Con el fin de realizar las combinaciones previamente mencionadas se entranan modelos con diferentes hiperparámetros

- **rank** = la cantidad de factores latentes en el modelo (5, 10 y 15 como valores seleccionados)
- **maxIter** = el número máximo de iteraciones (valor predeterminado 10)
- **regParam** = el parámetro de regularización (0.01, 0.1 y 0.50 como valores seleccionados)

In [20]:
def als_pipeline(df_train,df_val,s,rank, maxiter):
    
    als = ALS(maxIter=maxiter, 
              rank =rank , 
              regParam=s, 
              userCol="customer_id_integer", 
              itemCol="item", 
              ratingCol="star_rating_integer",
              coldStartStrategy="drop")
    
    model = als.fit(df_train)
    predictions = model.transform(df_val)
    predictions_corr = predictions.withColumn("preds_corr",
                               when(col("prediction") < 1 , 1)
                              .when(col("prediction") > 5 , 5)
                              .otherwise(col("prediction")))
    
    predictions_corr.persist()
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="star_rating_integer",predictionCol="preds_corr")
    testset_rmse = evaluator.evaluate(predictions_corr)
    
    return testset_rmse,s,rank

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
downsamples = [0.01, 0.1, 0.5]
rango = [5, 10, 15] 
rmses = [] 
for s in downsamples:
    for d in rango:
        start_time = time()
        print(f" variables - {s}- {d}")
        test_rmse,s,rank = als_pipeline(training,val,s,d, 10)
        elapsed_time = time() - start_time
        rmses.append([test_rmse,s,elapsed_time,rank])
        
df_metrica2 = spark.createDataFrame(rmses,['rmse','regParam','tiempo','rank'])
df_metrica2.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 variables - 0.01- 5
 variables - 0.01- 10
 variables - 0.01- 15
 variables - 0.1- 5
 variables - 0.1- 10
 variables - 0.1- 15
 variables - 0.5- 5
 variables - 0.5- 10
 variables - 0.5- 15
+------------------+--------+------------------+----+
|              rmse|regParam|            tiempo|rank|
+------------------+--------+------------------+----+
|1.2848505714790919|    0.01|120.71894645690918|   5|
|1.6607101670554205|    0.01|128.82892322540283|  10|
|2.1466029049154316|    0.01|137.08186554908752|  15|
|  1.14548601464932|     0.1|120.69456386566162|   5|
|1.1661973080989725|     0.1|129.01196765899658|  10|
| 1.159792668536722|     0.1|140.32774305343628|  15|
|1.1225435993849013|     0.5| 128.7129738330841|   5|
|1.1213726094604428|     0.5| 131.4477515220642|  10|
| 1.121463316816976|     0.5|  142.866845369339|  15|
+------------------+--------+------------------+----+

Como se observa en la tabla previa, el mejor modelo basado en la métrica RMSE es el que tiene como hiper parámetros 10 iteraciones, 10 factores latentes y 0.5 como parámetro de regularización, permitiéndonos obtener el menor RMSE con un valor  1.121373; de acuerdo a estos resultados, se elige como modelo para ser comparado con el resto de modelos incluidos dentro de la sección, buscando elegir el modelo con mejor desempeño que permita realizar las recomendaciones finales.

## Modelos de factores latentes con sesgo

Se incluyen sesgos al modelo de factores latentes. Se consideraron sesgos globales (media global), a nivel de usuario (media por usuario menos media global) y a nivel de producto (media por producto menos media global). Estos fueron adicionados a la predicción del modelo de filtros colaborativos.

In [25]:
def als_bias_pipeline(df_train,df_test,s,rank, maxiter):
    
    ### realizamos una instancia del modelo
    als = ALS(maxIter=maxiter, 
              rank =rank , 
              regParam=s, 
              userCol="customer_id_integer", 
              itemCol="item", 
              ratingCol="rating_normalizado",
              coldStartStrategy="drop")
    
    ### Entrenamos el modelo
    model = als.fit(df_train)
    
    ### Realizamos la prediccion del modelo en conjutno de validacion
    ### e incorporamos el sesgo a la prediccion
    predictions = model.transform(df_test)
    predictions= predictions.withColumn("Preds_bias", predictions.prediction + 
                                                      predictions.mu + 
                                                      predictions.bx +
                                                      predictions.bi)
    
    predictions.persist()
    predictions_corr = predictions.withColumn("preds_corr",
                                   when(col("Preds_bias") < 1 , 1)
                                  .when(col("Preds_bias") > 5 , 5)
                                  .otherwise(col("Preds_bias")))
    ### Obtenemos la metrica en el conjunto de validacion
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="star_rating_integer",predictionCol="Preds_bias")
    testset_rmse = evaluator.evaluate(predictions_corr)
    
    return testset_rmse, s, rank

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
### Realizamos una ejecucion del modelo con bias
testset_rmse_B,s_B,rank_B = als_bias_pipeline(training,val,0.01,10,10)
print(testset_rmse_B)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1.4641029019407945

In [27]:
downsamples = [0.01, 0.1, 0.5]# list of percentages to downsample training set
rango = [5, 10, 15] 
rmses = [] 
for s in downsamples:
    for d in rango:
        start_time = time()
        print(f" variables - {s}- {d}")
        test_rmse,s,rank = als_bias_pipeline(training,val,s,d, 10)
        elapsed_time = time() - start_time
        rmses.append([test_rmse,s,elapsed_time,rank])
        
df_metricaB = spark.createDataFrame(rmses,['rmse','regParam','tiempo','rank'])
df_metricaB.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-27:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 3457



 variables - 0.01- 5
 variables - 0.01- 10
 variables - 0.01- 15
 variables - 0.1- 5
 variables - 0.1- 10
 variables - 0.1- 15
 variables - 0.5- 5
 variables - 0.5- 10
 variables - 0.5- 15
+------------------+--------+------------------+----+
|              rmse|regParam|            tiempo|rank|
+------------------+--------+------------------+----+
|1.4937858560486668|    0.01|109.81022238731384|   5|
|1.4641029019407945|    0.01|120.35951352119446|  10|
|1.3523364347797808|    0.01| 125.5230360031128|  15|
| 1.136435419937925|     0.1|110.04429459571838|   5|
|1.1230690346479222|     0.1|118.56872224807739|  10|
|1.1014438485647313|     0.1|127.20243740081787|  15|
|1.0301370926307816|     0.5|113.56155252456665|   5|
|1.0301242393107255|     0.5|119.25893378257751|  10|
|1.0301196359631826|     0.5|125.81444311141968|  15|
+------------------+--------+------------------+----+

Como se observa en la tabla previa, el mejor modelo basado en la métrica RMSE es el que tiene como hiper parámetros 10 iteraciones, 15 factores latentes y 0.5 como parámetro de regularización, permitiéndonos obtener el menor RMSE con un valor  1.030120; de acuerdo a estos resultados, se elige como modelo para ser comparado con el resto de modelos incluidos dentro de la sección, buscando elegir el modelo con mejor desempeño que permita realizar las recomendaciones finales.

### Seleccion de modelo

Con base en los resultados anteriores, se tiene que el mejor modelo es el implementado con FACTORES LATENTES CON SESGOS con un RMSE en el conjunto de validacion de 1.030120, por lo cual este será elegido como el modelo final de modelos con información histórica.

# Evaluacion del modelo en conjunto de prueba

Basado en el mejor resultado anterior

In [29]:
time_ini = time()
als = ALS(maxIter=10, 
          rank =15 , 
          regParam=0.5, 
          userCol="customer_id_integer", 
          itemCol="item", 
          ratingCol="rating_normalizado",
          coldStartStrategy="drop")

### Entrenamos el modelo
model = als.fit(training)

### Realizamos la prediccion del modelo en conjutno de validacion
### e incorporamos el sesgo a la prediccion
predictions = model.transform(test)
predictions= predictions.withColumn("Preds_bias", predictions.prediction + 
                                                  predictions.mu + 
                                                  predictions.bx +
                                                  predictions.bi)

predictions.persist()
predictions_corr = predictions.withColumn("preds_corr",
                               when(col("Preds_bias") < 1 , 1)
                              .when(col("Preds_bias") > 5 , 5)
                              .otherwise(col("Preds_bias")))

### Obtenemos la metrica en el conjunto de validacion
evaluator = RegressionEvaluator(metricName="rmse", labelCol="star_rating_integer",predictionCol="preds_corr")
testset_rmse = evaluator.evaluate(predictions_corr)
print(testset_rmse)
print(f'Tiempo ejecucion: {time()-time_ini}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1.019881018369282
Tiempo ejecucion: 231.606201171875

Al aplicar el mejor modelo en el conjunto de test obtenemos que el RMSE en es de 1.019881 obteniendo una mejor métrica e indicando que estos resultados son consistentes e incluso mejores con respecto a los obtenidos en validación.

### Caso de aplicacion - Recomendacion de productos a usuarios

A continuación podemos ver el top número 1 para cada usuario 

In [30]:
userRecs = model.recommendForAllUsers(1)
userRecs.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+--------------------+
|customer_id_integer|     recommendations|
+-------------------+--------------------+
|              10206|[[115069, 0.01608...|
|              12566|[[101600, 0.02665...|
|              13406|[[101600, 0.00660...|
|              15062|[[114125, 0.00672...|
|              23123|[[5883, 0.0037734...|
|              23619|[[113655, 0.00304...|
|              24252|[[115292, 4.60627...|
|              31264|[[111962, 0.00516...|
|              37004|[[114184, 0.00800...|
|              37827|[[102789, 0.00289...|
|              43548|[[114125, 0.02941...|
|              44185|[[113237, 0.00129...|
|              46318|[[108465, 0.00669...|
|              47067|[[108805, 0.00248...|
|              48077|[[111560, 0.05213...|
|              51528|[[104777, 0.06587...|
|              54110|[[114485, 0.08473...|
|              60683|[[113329, 0.00162...|
|              62320|[[100366, 5.73255...|
|              67630|[[105027, 0.00586...|
+----------

## Modelos cold start

Este modelo usa un MinHashLSH primero para hallar los articulos mas similares a un cierto producto basandose en el título de éste. Este producto podria ser un "producto nuevo" sin ningun problema, lo cual lo hace un modelo apto para el cold start (simplemente se necesita tener la metadata de este articulo). 
Usando LSH, se hallan los "n" productos mas similares para este articulo.
Luego de este paso, en una segunda etapa se usan las variables de precio, categoría y marca del artículo y de sus "n" artículos más similares para computar un score de afinidad, y con este score podremos determinar cuales son los "m" artículos más afines al producto de interés de entre los "n" artículos similares hallados en la etapa 1.
Estos "m" artículos que salen del segundo paso serían los artículos que se recomiendan junto con este artículo. Así, a las personas que hayan comprado alguno de los "m" productos, se les debe recomendar también el producto de interés.

Para validar el rendimiento de este modelo, se extraen algunos artículos del conjunto de datos general y se dejan aparte en un conjunto de validación. Para predecir los ratings de estos artículos, se toma el promedio de los "m" artículos más afines según este procedimiento. Éste sería el rating predicho para este artículo (sin importar el usuario) y con este rating se calcularía la métrica (RMSE) del modelo (comparando los ratings predichos contra los reales) para ver su desempeño.

In [1]:
#### El proceso descrito anteriormente usariamos 3 hiperparametros principales, los cuales son
import time
time1 = time.time()
## "n", numero maximo de vecinos que se obtendrian con el algoritmo de LSH
n = 15

## "m", numero maximo de productos mas afines que se sacarian calculando un score para los n vecinos obtenidos con LSH
m = 5

## "numhasht", numero de tablas de Hash a usar en el objeto MinHashLSH
numhasht = 5
print(str(time.time()-time1)+'s')

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1599766849228_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

7.414817810058594e-05s

In [2]:
### Lectura de parquet con ratings por producto
ratings_producto = spark.read.parquet("s3://info-proyecto-mineria-datos/ratings_producto.parquet")
ratings_producto.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+
|      asin|avg_rating|
+----------+----------+
|0001839233|       4.0|
|0002199009|       4.5|
|0002239221|      4.25|
|0002261952|       4.0|
|0002553465|       5.0|
|0002554623|       5.0|
|0006931863|       5.0|
|0007135653|       5.0|
|0007180691|       4.0|
|0007216521|       5.0|
|0007230060|       5.0|
|0007242980|       4.0|
|0007338155|       4.5|
|0007391382|       5.0|
|0007416865|       4.0|
|0007439237|      4.25|
|0007514794|       5.0|
|0007823819|       5.0|
|0013073915|       4.5|
|0020183402|       4.0|
+----------+----------+
only showing top 20 rows

In [3]:
### Lectura del dataframe de metadata
### Limitar filas para que corra en tiempo prudente
#limite = 3000000
df_metadata = spark.read.csv("s3://info-proyecto-mineria-datos/metadata/products_*.csv", header = True, multiLine=True, quote='"',escape='"',sep=',').drop_duplicates(subset=['asin'])


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
### El dataset que se usara es el df_metadata. Descartaremos todos los productos que tengan campos nulos
df_metadata_full = df_metadata.na.drop()
df_metadata_full = df_metadata_full.filter(df_metadata_full.price.contains('$'))

### Dejar solo los que existan los ratings (solo estos cuentan como productos de nuestro dataset)
df_metadata_full = ratings_producto.select('asin').join(df_metadata_full, 'asin', how='inner')

df_metadata_full.persist()
df_metadata_full.count()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3704294

In [44]:
### Dividir los datos en entrenamiento y validacion
df_metadata_cl=df_metadata_full.sample(False, 0.999987, seed=1) ## se ajusta para que sean aprox 40 productos en validacion
df_metadata_cl.printSchema()
df_metadata_cl.persist()
print('Productos de entrenamiento: '+str(df_metadata_cl.count()))

df_metadata_val=df_metadata_full.join(df_metadata_cl,['asin'], "left_anti")
df_metadata_val.persist()

print('Productos de validacion: '+str(df_metadata_val.count()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- asin: string (nullable = true)
 |-- price: string (nullable = true)
 |-- title: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: string (nullable = true)
: string (nullable = true)

Productos de entrenamiento: 3704250
Productos de validacion: 44

In [45]:
df_metadata_val.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------+--------------------+--------------------+--------------------+--------------------+
|
+----------+-------+--------------------+--------------------+--------------------+--------------------+
|B004GW5UM6|  $0.91|Maracas Rattle Ha...|                Jive|['Toys & Games', ...|['', '<b> Materia...|
|B005J4MYYM|  $6.92|Black Zipper Earr...|     Forum Novelties|['Clothing, Shoes...|['BLACK ZIPPER EA...|
|B00TOLW7N4|$166.99|OEM Evinrude E-Te...|Johnson Evinrude OMC|['Automotive', 'M...|['Brand new, genu...|
|B00BW7MV98|$129.99|Beautiful Life Ur...| Beautiful Life Urns|['Home & Kitchen'...|['', 'With an ele...|
|B00J5YV7PY| $12.99|T-Power (6.6ft Lo...|             T POWER|['Electronics', '...|['T-Power Made wi...|
|0578150492|  $6.10|Kathleen and Oona...|Visit Amazon's Wi...|['Books', 'Parent...|["This book is a ...|
|B0079V2P9M| $22.98|.925 Sterling Sil...|          Queenberry|['Clothing, Shoes...|['It fits major b...|
|1575053861|  $5.97|Octopuses (Nature...|Visit Amazon

In [46]:
### Tokenizar los textos y crear conteos de las palabras (shingling)
time1 = time.time()
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType

import pyspark.sql.functions as f
regexTokenizer = RegexTokenizer(inputCol="title", outputCol="words", pattern='\w+', gaps=False)
characterDataFrame = regexTokenizer.transform(df_metadata_cl)

from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="words", outputCol="features", binary=True)
count_model = cv.fit(characterDataFrame)

### Quitar nulos
resulta = count_model.transform(characterDataFrame)
resu = resulta.withColumn("label", resulta["features"].cast(StringType()))
resu = resu.filter(~resu.label.contains('[]'))
result = resu.drop('label')

result.show(4,truncate=True)
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               words|            features|
+----------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|0001714422|  $5.82|'C' Is for Clown ...|Visit Amazon's St...|                  []|["C IS FOR CLOWN ...|[c, is, for, clow...|(262144,[3,12,105...|
|0002168383| $32.25|The Beatles: For ...|            P. Cowan|                  []|['Original 1st ed...|[the, beatles, fo...|(262144,[0,3,2013...|
|0002216973|$123.94|     Red Adam's Lady|Visit Amazon's Gr...|['Books', 'Litera...|["<div><div><B>Gr...|[red, adam, s, lady]|(262144,[5,44,925...|
|    [running, blind]|(262144,[953,3287...|
+----------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 4 rows

19.77

In [47]:
## Ahora, usar el objeto de MinHashLSH, para entrenar el modelo de MinHashLSH con los productos conocidos de la base de datos
time1 = time.time()
from pyspark.ml.feature import MinHashLSH
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=numhasht, seed=1234)
lsh_model = mh.fit(result)
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.40893054008483887s

#### Calculo de las metrica de RMSE para el modelo de cold start

In [48]:
## Hallemos los n vecinos mas similares para cada uno de los productos de test
time1 = time.time()
### Productos nuevos (conjunto de validacion)
sentences = df_metadata_val.select("title").rdd.flatMap(lambda x: x).collect()
price_val = df_metadata_val.select("price").rdd.flatMap(lambda x: x).collect()
brand_val = df_metadata_val.select("brand").rdd.flatMap(lambda x: x).collect()
category_val = df_metadata_val.select("category").rdd.flatMap(lambda x: x).collect()



### Vecinos similares a cada producto
similares_parte1=[]

### Hallar "n" mas similares para cada producto
for sentence in sentences:
    sentenceDataFrame = spark.createDataFrame([
    (0, sentence)
    ], ["id", "title"])
    char2 = regexTokenizer.transform(sentenceDataFrame)
    outp2 = count_model.transform(char2)
    outputDataFrame = lsh_model.transform(outp2)
    key = outputDataFrame.head().features
    similars = lsh_model.approxNearestNeighbors(result, key, n)
    similars.persist()
    similares_parte1.append(similars)
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

52.70896029472351s

In [49]:
#### Definimos aqui un par de funciones auxiliares

### Funcion para obtener el precio como numero
def obtiene_precio(precio):
    precio1 = precio.replace('$','')
    precio2 = precio1.replace(',','')
    precio3 = precio2.split(' ')
    precio4 = 0
    count=0
    for i in precio3:
        if len(i.strip())>1:
            count = count+1
            precio4 = precio4+float(i)
    return precio4/count


### Funcion para calcular la similaridad de Jaccard (para las listas)
def jaccard_similarity(list1, list2):
    s1 = set(list1)
    s2 = set(list2)
    return len(s1.intersection(s2)) / len(s1.union(s2))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
### Ahora, hallar los m mas afines a los n mas similares de cada producto
time1 = time.time()
count = 0
mas_afines_son = []
for cruzados_sim_1 in similares_parte1:
    ### Extraigamos de la metadata todas las filas relacionados a cada uno de los segun LSH
    #cruzados_sim_1 = vecino_n.select("asin").join(df_metadata_cl, "asin", how='left')
    

    
    ### Listas de asin, precios, marcas y categorias de los n mas similares
    cruzare = cruzados_sim_1.rdd.flatMap(lambda x: x).collect()
    price_son=[]
    asin_son=[]
    brand_son=[]
    category_son=[]
    for i in range(len(cruzare)):
        est = cruzare[i]
        if i%10==0:
            asin_son.append(est)
        if i%10==1:
            price_son.append(est)
        if i%10==3:
            brand_son.append(est)
        if i%10==4:
            category_son.append(est)

    
    ### Listas de asin, precios, marcas y categorias del producto "nuevo"
    price_es = price_val[count]
    brand_es = brand_val[count]
    category_es = category_val[count]
    
    
    ### Preprocesar los precios: transformacion logaritmica y hallar distancia maxima al precio promedio
    import math
    precios_son = []
    precio_es = math.log(obtiene_precio(price_es), 10)
    max_distanc = 0
    for p in price_son:
        estep = math.log(obtiene_precio(p), 10)
        precios_son.append(estep)
        dista = abs(precio_es-estep)
        if dista > max_distanc:
            max_distanc = dista
            
    ## Por si todos eran iguales
    if max_distanc==0:
        max_distanc=1
            
            
    ### Ahora, vamos a calcular entonces los score para cada uno de los n vecinos
    scores = []
    for i in range(len(precios_son)):
        score = 0

        ### Score de marca
        if brand_son[i] == brand_es and brand_es and brand_son[i]:
            score = score+1
        ### Score de categoria (cuantas coinciden, similaridad jaccard)
        if category_es and category_son[i]:
            simi = jaccard_similarity(category_es, eval(category_son[i]))
            score = score + simi

        ### Score de precio
        score = score + (1 - (abs(precio_es-precios_son[i]))/max_distanc)

        scores.append(score)

        
    ### Recomendar entonces los m con mayor score
    import numpy as np
    index_recomendados = np.argsort(scores)[::-1][:m]
    asin_rec = list(np.array(asin_son)[index_recomendados])
    print(asin_rec)
    count = count+1
    mas_afines_son.append(asin_rec)
    #title_rec = list(np.array(title_son)[index_recomendados])
    #print(title_rec)
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['B006HV5TD4', 'B006I3CTX4', 'B006IBYR2W', 'B002YEP528', 'B001GI12IY']
['B00ZSOWJAM', 'B00Q6F7AN8', 'B002HO9QBG', 'B00HBR8A84', 'B00005Y7NC']
['B00R56YRLU', 'B00MAO9UO6', 'B008BVZM30', 'B00K08OQKW', 'B004ZJBL64']
['B00BW7RKKS', 'B00BW7LYC8', 'B00BW7LWHA', 'B00BW7M6TI', 'B00BW7RX64']
['B00CJ26LAU', 'B00JMTVQ3K', 'B00J5YUYVW', 'B00J5YUX18', 'B00DIL1GNS']
['1907056017', '0552527564', '1481817248', '0064431460', '0425266354']
['B00K4T70R8', 'B003XYHF30', 'B004WHU7BY', 'B005J2F5UE', 'B005N38V1Y']
['1575054825', '1575050781', '1575058707', '1575052938', '1575055775']
['1441120726', '0762446455', '0718123123', 'B004VLYHN0', '0750238860']
['B000Z31ZJ2', '1479194670', '0060141557', '0967917360', '0984915109']
['B001RZ2IBQ', 'B006F8Y76E', 'B0074ENRVY', 'B001RZ6KT2', 'B000GUWSPO']
['1891024388', '0824826280', '1559362006', '0140172483', '0989302105']
['B000H6SY28', '0310238188', 'B000AXWHHQ', 'B001TH37O4', '1492149659']
['B00H8XZZFC', 'B00H8Y01SM', 'B00H8Y0146', 'B001HBUDE4', 'B002TA0D4C']
['1494

Ya tenemos los productos mas afines a cada uno de los productos del conjunto de validación. Ahora, calculamos el rating promedio de todos los productos más afines a cada producto, y este sería el rating predicho para cada uno de nuestros productos. Con estos ratings predichos y ya mirando los datos reales, calculamos la métrica RMSE.

In [51]:
### Los product id de los productos del conjunto de validacion son 
time1 = time.time()
asin_val = df_metadata_val.select("asin").rdd.flatMap(lambda x: x).collect()
print(asin_val)
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['B004GW5UM6', 'B005J4MYYM', 'B00TOLW7N4', 'B00BW7MV98', 'B00J5YV7PY', '0578150492', 'B0079V2P9M', '1575053861', '0688076629', '0763647357', 'B006F8Z3N0', '0972249508', '1405791454', 'B00H8XZZXY', '0399115218', '0615657869', 'B0002FTCS4', 'B00007AKDL', 'B00EZ8H12S', 'B0009YWDKM', '8188018066', '0595518036', 'B002IZ9JKW', 'B006YG58H4', 'B00F21EVN4', 'B00CODHUMM', '1569871108', 'B000PSAI90', 'B00CU6GCGS', '0937207039', 'B006O0Y84Y', 'B007YH0IJU', 'B008M4RDH4', '0984638709', 'B005GI8H4M', 'B001G7OU48', 'B003HLM2QO', '0914390260', 'B000BQKTUE', 'B000G0CVOC', '0340768339', 'B00YHV2GVO', '0007455925', '6304560540']
1.0046167373657227s

In [52]:
### Esta funcion recibe la lista de los mas similares a cada producto, y retorna el rating promedio de esos productos
### mas similares. Este rating seria el rating predicho para este producto
import pyspark.sql.functions as F
def prediccion(lista):
    predicho=[]
    for i in range(len(lista)):
        registro=ratings_producto.filter(F.col("asin").isin(lista[i]))
        t = registro.groupBy().agg(F.avg("avg_rating").alias('cnt')).collect()
        predicho.append(t[0].cnt)
    return(predicho)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
### Los ratings predichos para cada uno de los productos del set de validacion serian
time1 = time.time()
ratings_pred_val = prediccion(mas_afines_son)
print(ratings_pred_val)
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-53:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 2396



[3.46, 2.6, 4.6, 4.86590909090909, 4.404999999999999, 4.794047619047619, 3.4850000000000003, 4.55, 4.31904761904762, 4.603361344537815, 5.0, 3.736666666666667, 4.251075268817205, 3.866666666666667, 3.8059758771929824, 4.757333333333333, 5.0, 2.8485134371478233, 3.9468376068376068, 4.15, 4.35, 3.9253869969040247, 5.0, 4.377142857142857, 4.2, 3.9814285714285718, 4.8, 4.597105943152455, 4.0, 4.703488372093023, 4.225957918050942, 4.2, 3.5200000000000005, 3.6625, 4.6, 3.8666666666666663, 4.141733120680489, 4.5, 3.7142857142857144, 4.3681196434520135, 4.809523809523809, 4.303207810320781, 4.7043478260869565, 3.883333333333333]
34.138073682785034s

In [67]:
### Mientras que los ratings reales de esos productos son
time1 = time.time()
ratings_reales=[]
for id_es in asin_val:
    registro_es = ratings_producto.filter(ratings_producto.asin.contains(id_es))
    rating_es = registro_es.groupBy().agg(F.avg("avg_rating").alias('cnt')).collect()
    ratings_reales.append(rating_es[0].cnt)
print(ratings_reales)
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[5.0, 3.0, 4.0, 5.0, 5.0, 5.0, 5.0, 4.5, 3.8, 4.5, 5.0, 5.0, 5.0, 5.0, 4.0, 5.0, 4.857142857142857, 2.573529411764706, 5.0, 4.0, 4.5625, 4.666666666666667, 5.0, 5.0, 5.0, 3.5, 5.0, 5.0, 5.0, 4.8, 4.756756756756757, 5.0, 3.0, 4.333333333333333, 4.75, 5.0, 3.702127659574468, 4.5, 5.0, 2.75, 4.4, 5.0, 5.0, 4.619047619047619]
33.740506649017334s

Exception in thread cell_monitor-67:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 2896



In [68]:
### Calculamos entonces el MSE entre los ratings predichos y los reales
time1 = time.time()
re_es = np.array(ratings_reales)
pr_es = np.array(ratings_pred_val)
rmse = np.sqrt(np.mean((re_es - pr_es)**2))
print('El RMSE del modelo diseñado para cold start en validacion es: ' + str(rmse))
print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

El RMSE del modelo diseñado para cold start en validacion es: 0.7136821332912172
0.00047016143798828125s

In [69]:
### Si hubieramos usado la media global de los ratings de producto, el MSE hubiera sido
### El dataset que se usara es el df_metadata. Descartaremos todos los productos que tengan campos nulos
time1 = time.time()

### Dejar solo los que existan los ratings (solo estos cuentan como productos de nuestro dataset)
df_aux_prom = ratings_producto.join(df_metadata_cl, 'asin', how='inner')

promedio_prod = df_aux_prom.groupBy().agg(F.avg("avg_rating").alias('averag')).collect()


re_es = np.array(ratings_reales)
prbase_es = np.array([promedio_prod[0].averag]*len(ratings_reales))
rmse = np.sqrt(np.mean((re_es - prbase_es)**2))
print('El RMSE de un modelo baseline (promedio general) en validacion es: ' + str(rmse))

print(str(time.time()-time1)+'s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

El RMSE de un modelo baseline (promedio general) en validacion es: 0.7190592527767478
3.587758779525757s

El valor calculado sería la raíz del error cuadrático medio para los productos que quedaron en el conjunto de validación, los cuales no hacían parte del dataset de productos ya conocidos por el MinHashLSH y por ende esto representaría adecuadamente como sería el uso en producción de este modelo de cold start (recibiría productos que no conoce). 

(Nota: Tener en cuenta que aquí el RMSE se calcula comparando el rating predicho para cada producto con el rating promedio real de cada producto. Se hace con los promedios de los ratings de los productos en vez de con las reviews originales para facilitar los cálculos y los procesamientos. Sin embargo, se podría en una fase más avanzada o en una puesta en producción del modelo considerar incluir otros factores como por ejemplo los sesgos (bias) de los usuarios para mejorar aún más las recomendaciones de este modelo.)

Cómo se puede ver, a pesar de no tener un RMSE tan bueno como los modelos de factorización de matrices con factores latentes de la sección anterior que además incluían Bias, el RMSE obtenido por este modelo es de igual forma decente, por lo que es una buena alternativa para cuando se tenga que dar recomendaciones en el ámbito del problema de cold start (cuando hay productos nuevos, el modelo de la sección anterior no estaría diseñado para sugerir recomendaciones para esos productos nuevos, pero este modelo si podría usarse en esos casos). 

De igual forma, se observa que el modelo solo logra superar levemente al baseline de la media, lo que indica que hay amplias oportunidades de mejora y de trabajo futuro, sin embargo, tener en cuenta que el modelo usado involucra MinHashLSH y lo que busca es encontrar los vecinos más similares de cada producto, en vez de intentar optimizar una métrica (como el rmse) como si lo hacen muchos otros modelos de ML, por lo que no consideramos como un problema el hecho de que el RMSE encontrado aquí no sea muy bueno. Lo que es realmente más importante en este modelo es que los productos que encuentra sí sean muy similares a los productos del conjunto de validación (que harían el papel de productos nuevos). 

Esta capacidad de encontrar productos muy similares al producto nuevo para poder dar buenas recomendaciones en el caso de cold start se aprecia mucho mejor en el siguiente ejemplo.

##### Ejemplo para un producto nuevo cualquiera

Hagamos ahora el ejemplo para un producto nuevo cualquiera, para ver como funcionaría el modelo en producción.

In [57]:

### Metadata de producto nuevo
sentence='Girls pink tutu dress'
price_es = '$19.99'
brand_es = 'Hello Kitty'
category_es = ['Dance', 'Sports & Outdoors', 'Clothing', 'Clothing, Shoes & Jewelry']


### Hallar "n" mas similares para cada producto
sentenceDataFrame = spark.createDataFrame([
(0, sentence)
], ["id", "title"])
char2 = regexTokenizer.transform(sentenceDataFrame)
outp2 = count_model.transform(char2)
outputDataFrame = lsh_model.transform(outp2)
key = outputDataFrame.head().features
similares_parte1 = (lsh_model.approxNearestNeighbors(result, key, n))
similares_parte1.persist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

: string, words: array<string>, features: vector, hashes: array<vector>, distCol: double]

In [61]:
### Extraigamos de la metadata todas las filas relacionadas a los similares segun LSH
cruzados_sim_1 = similares_parte1.select('asin', 'price', 'title', 'brand', 'category')
cruzados_sim_1.show(truncate=35)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------+-----------------------------------+----------------------+-----------------------------------+
|      asin|          price|                              title|                 brand|                           category|
+----------+---------------+-----------------------------------+----------------------+-----------------------------------+
|B00M66FYZC|$22.00 - $65.00|Little Girls Birthday Cupcake Pi...|        Ella Blu Store|['Clothing, Shoes & Jewelry', 'G...|
|B00G3JICN4|         $29.89|SOPO Hello Kitty Toddler Dress B...|                  SoPo|['Clothing, Shoes & Jewelry', 'B...|
|B00QGMY3EU|$18.60 - $34.99|Clementine Little Girls' Girls T...|    Clementine Apparel|['Clothing, Shoes & Jewelry', 'G...|
|B006PFJMGW|         $29.99|Rare Editions Little Girls' Tutu...|         Rare Editions|['Clothing, Shoes & Jewelry', 'G...|
|0000031909|          $6.94|Mystiqueshapes Girls Ballet Tutu...|        Mystiqueshapes|['Sports & Outdoors', 'Sports & ...|
|0000031

In [62]:
### Listas de asin, precios, marcas y categorias de similares
asin_son = cruzados_sim_1.select("asin").rdd.flatMap(lambda x: x).collect()
price_son = cruzados_sim_1.select("price").rdd.flatMap(lambda x: x).collect()
brand_son = cruzados_sim_1.select("brand").rdd.flatMap(lambda x: x).collect()
category_son = cruzados_sim_1.select("category").rdd.flatMap(lambda x: x).collect()


### Preprocesar los precios: transformacion logaritmica y hallar distancia maxima al precio promedio
import math
precios_son = []
precio_es = math.log(obtiene_precio(price_es), 10)
max_distanc = 0
for p in price_son:
    estep = math.log(obtiene_precio(p), 10)
    precios_son.append(estep)
    dista = abs(precio_es-estep)
    if dista > max_distanc:
        max_distanc = dista

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [63]:
### Ahora, vamos a calcular entonces los score para cada uno de los n vecinos
scores = []
for i in range(len(precios_son)):
    score = 0
    #print(price_son[i])
    #print(category_son[i]) 
    ### Score de marca
    if brand_son[i] == brand_es and brand_es and brand_son[i]:
        score = score+1.0
    #print(score)

    ### Score de categoria (cuantas coinciden, similaridad jaccard)
    if category_es and category_son[i]:
        simi = jaccard_similarity(category_es, eval(category_son[i]))
    #    print(simi)
        score = score + simi
    #print(score)
    
    ### Score de precio
    score = score + (1 - (abs(precio_es-precios_son[i]))/max_distanc)
    
    scores.append(score)

    #print(score)

scores

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.44686547727749876, 0.7864036194257403, 1.0087743644900575, 0.7983980114364735, 0.375, 0.4483519406675256, 2.0442880176822786, 1.1799248576206023, 0.7279132507452283, 1.001745205571626, 0.6626532628156876, 0.6526433215161971, 0.526153524763874, 0.7415798296182917, 0.556205667456434]

In [65]:
### Recomendar entonces los m con mayor score
import numpy as np
index_recomendados = np.argsort(scores)[::-1][:m]
asin_rec = list(np.array(asin_son)[index_recomendados])


### Los articulos recomendados segun el score serian entonces estos
recomendaciones_cold_start = cruzados_sim_1.filter(col('asin').isin(asin_rec))
recomendaciones_cold_start.show(truncate=36)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+---------------+------------------------------------+------------------+------------------------------------+
|      asin|          price|                               title|             brand|                            category|
+----------+---------------+------------------------------------+------------------+------------------------------------+
|B00QGMY3EU|$18.60 - $34.99|Clementine Little Girls' Girls Ta...|Clementine Apparel|['Clothing, Shoes & Jewelry', 'Gi...|
|B006PFJMGW|         $29.99|Rare Editions Little Girls' Tutu ...|     Rare Editions|['Clothing, Shoes & Jewelry', 'Gi...|
|B00SH78APA|$10.13 - $22.99|  Hello Kitty Baby Girls' Tutu Dress|       Hello Kitty|['Clothing, Shoes & Jewelry', 'No...|
|B0053WLHEI|         $19.95|Rare Editions Little Girls' Tutu ...|     Rare Editions|['Clothing, Shoes & Jewelry', 'Gi...|
|B00P835VGC|$25.99 - $28.00|Bloch Girls' Toddler Tiffany Dres...|             Bloch|['Clothing, Shoes & Jewelry', 'Gi...|
+----------+------------

La tabla anterior muestra los productos más afines al producto nuevo.

Este ejemplo muestra que efectivamente el modelo logra encontrar los productos más parecidos al producto nuevo, no solo según su título, sino también mirando su precio, marca y categorías. Esto indica que el modelo está funcionando de la forma esperada y cumpliría correctamente su función de enfrentarse con el problema de cold start. Así, para alguien que haya adquirido algunos de los productos que se muestran en la tabla anterior, es buena opcion recomendarle el producto nuevo, del cual usamos solamente su metadata. 