# Ejercicio : Filtrado Colaborativo con Pandas/numpy/pyspark.ml.recommendation

`pandas` y `numpy` son librerías que reúnen muchas herramientas para manejo de dataframe, vectores y matrices, junto a ellas se utilizará la componente `ml.recommendation`

Puedes ejecutar cada una de las celdas de código haciendo click en ellas y presionando `Shift + Enter`. 

También puedes editar cualquiera de estas celdas. Las celdas no son independientes. Es decir, sí importa el orden en el que las ejecutes, y cualquier cambio que hagas se reflejará en las celdas que ejecutes después.


# Filtrado Coalborativo

El filtrado colaborativo es una técnica utilizada por los sistemas de recomendación para solventar los problemas derivados de la sobreinformación que los consumidores sufren en Internet. Esta tendencia crece cada día más, debido a su enorme funcionalidad son más los usuarios que se valen de esta herramienta en sus búsquedas.

`Existen diferentes tipos de filtrado a la hora de establecer las recomendaciones, se pueden clasificar en cuatro:`

- Filtrados basado en contenido: las recomendaciones se hacen según los contenidos que puedan gustar o interesar.

- Filtrados demográficos: se realizan por las características de los usuarios (edad, sexo, estudios…).

- Filtrados colaborativos: las recomendaciones están basadas en las búsquedas con votos positivos de usuarios similares.

- Filtrados híbridos: mezclan los dos o tres de los filtrados anteriores para una mejor experiencia.


`Hay dos enfoques distintos para emplear el filtrado colaborativo:`

- El filtrado basado en la memoria calcula las similitudes entre los productos o los usuarios.

- El filtrado basado en modelos intenta aprender el patrón subyacente que dicta cómo los usuarios califican o interactúan con los elementos.

En este Ejercicio se refuerza los conceptos de _aprendizaje supervisado_ y a mostrar cómo usar `ml.recommendation` que implementa el algoritmo de los mínimos cuadrados alternos (ALS) para entrenar los modelos de nuestro primer Filtrado Colaborativo.

`Mínimos cuadrados` es una técnica de análisis numérico enmarcada dentro de la optimización matemática, en la que, dados un conjunto de pares ordenados —variable independiente, variable dependiente— y una familia de funciones, se intenta encontrar la función continua, dentro de dicha familia, que mejor se aproxime a los datos (un "mejor ajuste"), de acuerdo con el criterio de mínimo error cuadrático.

Cargamos el dataset de películas que podemos obtener en el sitio: https://grouplens.org/datasets/movielens/

![](http://spark-mooc.github.io/web-assets/images/cs110x/movie-camera.png)

Se crea la instancia de spark, utilizando `findspark`, donde podemos comprobar la ruta de donde se carga el spark con `findspark.find()`.

In [None]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="rating")

from pyspark.sql import SQLContext, SparkSession 
sq = SQLContext(sc)
spark = SparkSession(sc)
findspark.find()

Cargamos los dos archivos proporcionados: `la locación depende de donde se tenga en el equipo` donde tenemos:

Archivo movies:

- `movieId`: identificación de la película

- `title`: nombre de la película

- `genres`: género de la película

Archivo ratings:

- `user_id`: identificación del usuario

- `movieId`: identificación de la película

- `rating`: es la votación para la película de un usuario, está en una escala de 1-10 donde 10 es mejor (solo calificaciones enteras)


In [None]:
from pyspark.sql.types import IntegerType, FloatType, StructType, StructField, StringType
schema = StructType([StructField("movieId", IntegerType()),
                     StructField("title", StringType()),
                     StructField("genres", StringType())])
df_movies = spark.read.csv('file:///C:/movies.csv', header=True, inferSchema = True, schema=schema);
df_movies.show(3)
df_movies.printSchema()

from pyspark.sql.types import IntegerType, StructType, StructField, StringType
schema_gender = StructType([StructField("user_id", IntegerType()),
                            StructField("movieId", IntegerType()),
                             StructField("rating", FloatType())])
df_rating = spark.read.csv('file:///C:/ratings.csv', header=True, inferSchema = True, schema=schema_gender);
df_rating.show(3)
df_rating.printSchema()

ratingsCount = df_rating.count()
moviesCount = df_movies.count()

print('hay {0} clasificaciones y {1} películas en datasets.'.format(ratingsCount, moviesCount))

# Normalización:

El método que se utilizará para normalizar es el `MinMaxScaler` de la componente `pyspark.ml.feature`. Este método estadisticamente realiza la normalización de la siguiente manera:

`MinMaxScaler` transforma un conjunto de datos de filas de vectores, normalizando cada característica individualmente de modo que esté en el rango dado. Toma parámetros:

1.- `min`: 0.0 por defecto. Límite inferior después de la transformación, compartido por todas las características.

2.- `max`: 1.0 por defecto. Límite superior después de la transformación, compartido por todas las características.


Nota: `Para utilizar los escalamientos debemos realizar las trasnformaciones de variables a vectores y uan vez escalado volvemos a dejar la variable como antes para continuar nuestra ejecución.`

In [None]:
from pyspark.ml.feature import VectorAssembler

vector_features = VectorAssembler(inputCols=["rating"], outputCol="features")
df_rating_vec = vector_features.transform(df_rating)
print(df_rating_vec)

In [None]:
from pyspark.ml.feature import MinMaxScaler

standardizer = MinMaxScaler(min=0.0, max=1.0,
                              inputCol='features',
                              outputCol='norm_rating')
modelstd = standardizer.fit(df_rating_vec)
df_rating_norm = modelstd.transform(df_rating_vec)

In [None]:
from pyspark.sql import functions as f

firstelement=f.udf(lambda v:float(v[0]),FloatType())
df_rating_norm = df_rating_norm.withColumn("norm_rating", firstelement("norm_rating"))
print(df_rating_norm.show(5))

# Graficamos la distribución de rating:

Realizamos el gráfico mediante `violin plot` para ver la distribución de los 10.000 primeras marcas de rating

Dato: El `violin plot` combina la funcionalidad del box plot -mostrando los cuartiles de la distribución- y la de la estimación de densidad kernel.

https://seaborn.pydata.org/generated/seaborn.violinplot.html

In [None]:
import pandas as pd
import seaborn as sns
# Convert Spark dataframe to Pandas to plot data distribution
pandas_df = df_rating_norm.limit(100000).toPandas()
sns.violinplot([pandas_df.norm_rating])

# Set de entranamiento, validación y pruebas

Para nuestro caso crearemos un set de entranamiento, de validación y de pruebas, donde:

- Training set (DataFrame), se utilizará para entrenar el modelo

- Validation set (DataFrame), se utilizará para elegir el mejor modelo, validando lo obtenido en el entrenamiento.

- Test set (DataFrame), Se utilizará para ejecutar nuestro entrenamiento.

Entonces:

In [None]:
seed=42
(trainingDFT, testDF) = df_rating_norm.randomSplit([0.8, 0.2], seed=seed)
(trainingDF, valDF) = trainingDFT.randomSplit([0.8, 0.2], seed=seed)

print('Training: {0}, val: {1}, test: {2}'.format(trainingDF.count(), valDF.count(), testDF.count()))

# Entrenamiento

Utilizaremos el filtrado colaborativo basado en modelos:

![](https://courses.edx.org/c4x/BerkeleyX/CS100.1x/asset/Collaborative_filtering.gif)

Para entrenar el modelo utilizaremos la componente pyspark.ml.recommendation import ALS, la definición de la misma en la figura:

![](http://spark-mooc.github.io/web-assets/images/matrix_factorization.png)

# Selección del modelo

El proceso de selección del modelo se realizará a través del análisis de validación cruzada manual y en base a la componente `pyspark.ml.tuning import CrossValidator` (cross-validation) con ajuste automático de hiperparámetros. 
Este ajuste se hace definiendo los posibles valores de los hiperparámetros del modelo y ejecutando una búsqueda en rejilla (grid-search) sobre éstas, para comparar la respuesta de los modelos resultantes y finalmente obtener el óptimo. Los hiperparámetros del modelo ALS son:

- rank = la cantidad de factores latentes en el modelo (4, 8 y 12 como valores seleccionados)
- maxIter = el número máximo de iteraciones (valor predeterminado)
- regParam = el parámetro de regularización (0.1, 0.05 y 0.01 como valores seleccionados)

Utilizaremos como medida a mse (mean squared error), entonces el modelo de menor mse será el elegido.

`Mean squared error (MSE):` En estadística, el error cuadrático medio (ECM) de un estimador mide el promedio de los errores al cuadrado, es decir, la diferencia entre el estimador y lo que se estima. El ECM es una función de riesgo, correspondiente al valor esperado de la pérdida del error al cuadrado o pérdida cuadrática. La diferencia se produce debido a la aleatoriedad o porque el estimador no tiene en cuenta la información que podría producir una estimación más precisa.

![](https://wikimedia.org/api/rest_v1/media/math/render/svg/53ab02a5a1847aa3ff5c6eb69b4023bfb73655f5)

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np

# Let's initialize our ALS learner
als = ALS()
als.setNonnegative
# Now we set the parameters for the method
als.setMaxIter(5)\
   .setSeed(10)\
   .setUserCol("user_id")\
   .setItemCol("movieId")\
   .setRatingCol("norm_rating")\
   .setPredictionCol("predictions")\
   .setColdStartStrategy("drop")\
   .setNonnegative=True

regEval = RegressionEvaluator(predictionCol="predictions", labelCol="norm_rating", metricName="mse")

ranks = [4, 8, 12]
errors = [0, 0, 0]
models = np.matrix(np.zeros(shape=(3,3), dtype=list))
regparams = [0.1, 0.05, 0.01]
err = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:
  # Set the rank here:
  als.setRank(rank)
  reg = 0
  for regparam in regparams:
      als.setRegParam(regparam)
      # Create the model with these parameters.
      model = als.fit(trainingDF)
      # Run the model to create a prediction. Predict against the validation_df.
      predict_df = model.transform(valDF)

      # Remove NaN values from prediction (due to SPARK-14489)
      predicted_ratings_df = predict_df.filter(predict_df.predictions != float('nan'))

      # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
      error = regEval.evaluate(predicted_ratings_df)
      errors[err] = error
      models[err,reg] = model
      print("For rank %s and Regularizaión %s the RMSE is %s",(rank, regparam, error))
      if error < min_error:
        min_error = error
        best_rank = err
        best_regp = reg
      reg += 1
  err += 1

als.setRank(ranks[best_rank])
als.setRegParam(regparams[best_regp])
print("The best model was trained with rank %s y regularización %s y error mínimo es %s" % (ranks[best_rank], regparams[best_regp], min_error))
my_model = models[best_rank,best_regp]

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

alscv = ALS()
alscv.setMaxIter(5)\
     .setUserCol("user_id")\
     .setItemCol("movieId")\
     .setRatingCol("norm_rating")\
     .setPredictionCol("predictions")\
     .setColdStartStrategy("drop")\
     .setNonnegative=True

paramGrid = ParamGridBuilder().addGrid(alscv.regParam, [0.1, 0.05, 0.01]).addGrid(alscv.rank, [4, 8, 12]).build()

modelEvaluator = RegressionEvaluator(predictionCol="predictions", labelCol="norm_rating", metricName="mse")

crossval = CrossValidator(estimator=alscv,
                          estimatorParamMaps=paramGrid,
                          evaluator=modelEvaluator,
                          numFolds=5)

cvModel = crossval.fit(trainingDF)
best_als_model = cvModel.bestModel
predict_dfcv = best_als_model.transform(valDF)
# Remove NaN values from prediction (due to SPARK-14489)
predicted_ratings_df = predict_df.filter(predict_df.predictions != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
errorcv = modelEvaluator.evaluate(predicted_ratings_df)

print("Best number of latent factors (rank parameter): " + str(best_als_model.rank))
print("Best value of regularization factor: " + str(best_als_model._java_obj.parent().getRegParam()))
print("Max Iterations: " + str(best_als_model._java_obj.parent().getMaxIter()))
print("Error mejor modelo: %s",errorcv)

In [None]:
## Validación de cual es el mejor modelo

if min_error < errorcv:
    model_final = best_als_model
    print("Mejor modelo de cross validation manual",  model_final)
else:
    model_final = my_model
    print("Mejor modelo de cross validation Componente",  model_final)



# Evaluación de medidas

Para validar efectividad del modelo también podemos obtener otras medidas que pueden ser útiles para comparar con la nuestra con el objetivo de validar la minimización del error en nuestro modelo, estas son:

`Root Mean Square Error (RMSE):` es una forma estándar de medir el error de un modelo en la predicción de datos cuantitativos. Formalmente se define de la siguiente manera:

![](https://miro.medium.com/max/966/1*lqDsPkfXPGen32Uem1PTNg.png)


`Mean Absolute Error (MAE):` mide la magnitud promedio de los errores en un conjunto de predicciones, sin considerar su dirección. Es el promedio sobre la muestra de prueba de las diferencias absolutas entre la predicción y la observación real donde todas las diferencias individuales tienen el mismo peso.

![](https://miro.medium.com/max/630/1*OVlFLnMwHDx08PHzqlBDag.gif)


`r² metric:` en estadística, el coeficiente de determinación, denominado R² y pronunciado R cuadrado, es un estadístico usado en el contexto de un modelo estadístico cuyo principal propósito es predecir futuros resultados o probar una hipótesis. El coeficiente determina la calidad del modelo para replicar los resultados, y la proporción de variación de los resultados que puede explicarse por el modelo

![](https://wikimedia.org/api/rest_v1/media/math/render/svg/999c9c8c995140c586aff2c9342301846011d44f)

https://es.wikipedia.org/wiki/Coeficiente_de_determinaci%C3%B3n

In [None]:
predict_dfT = model_final.transform(testDF)

def medidas_evaluation(predictions):
    # Model evaluation in test - ratings regression evaluation
    print("Model evaluation on test data:")
    predictions = predictions.na.drop()
    # RMSE
    rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="norm_rating", predictionCol="predictions")
    rmse = rmse_evaluator.evaluate(predictions)
    print("Root-mean-square error (RMSE) = " + str(rmse))
    # R2
    r2_evaluator = RegressionEvaluator(metricName="r2", labelCol="norm_rating", predictionCol="predictions")
    r2 = r2_evaluator.evaluate(predictions)
    print("r² metric = " + str(r2))
    # MAE
    mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="norm_rating", predictionCol="predictions")
    mae = mae_evaluator.evaluate(predictions)
    print("Mean Absolute Error (MAE) = " + str(mae))

    return [rmse, r2, mae]

random_test_eval = medidas_evaluation(predict_dfT)



# Recomendaciones

Finalmente con el modelo ya entrenado podemos validar las recomendaciones, para ello crearemos una función que nos permitirá realizar:

`Recomendación por Usuarios`: Ser realizará una recomendación por una cantidad de usuarios.

`Recomendación de películas para usuarios`: Se realizará una recomendación de película para un grupo de usuarios

`Recomendación para un usuario en particular`: Se realizará una recomendación de películas para un usuario en particular

`Recomendación de película en particular para usuarios`: Se realizará recomendación de 1 película en particular para usuarios 

In [None]:
##Mostrar recomendación es una función que retorna el despliegue de datos recomendados por identificación.
##Variables de entrada son:
## Modelo: Modelo entrenado
## df_2: es el dataframe complemento, que no se entreno pero que nos entrega información.
## ident: Es el identificador de lo que queremos desplegar (columna de la predicción)
## Cantidad: Corresponde a la cantidad de datos que desplegará.
## Cantidad_recomendacion: Corresponde a la cantidad de recomendaciones por dato.
## tipo: Si la recomendación será por "Usuario" o por "película"
## variable: Corresponde a un dato opcional, que se ingresa cuando queremos recomendar para un dato en particular o película 
##o usuario

def mostrar_recomendacion(modelo, df_2, ident, cantidad, cantidad_recomendacion, tipo, variable = 0):
    datodf = df_2.toPandas()
    if tipo == 'Usuario':
        arrayrecomend = modelo.recommendForAllUsers(cantidad_recomendacion)
        arrayuser = modelo.recommendForAllUsers(cantidad_recomendacion).select(ident)
        if variable > 0:
            recomend = np.array(arrayrecomend.select('recommendations').where(arrayrecomend[ident] == variable).collect(), dtype=int)
            dato = np.array(arrayuser.where(arrayuser[ident] == variable).collect(), dtype=int)
        else:
            recomend = np.array(arrayrecomend.select('recommendations').collect(), dtype=int)
            dato = np.array(arrayuser.collect(), dtype=int)   
        
        recomend_data  = recomend[:, 0, :, 0]
        matrizrecomendacion = np.append(dato,recomend_data, axis = 1)
        #print(matrizrecomendacion)
        nombre = 'Recomendacion'
        recomendacion = pd.DataFrame({tipo: matrizrecomendacion[:cantidad, 0]})
        for j in range(1, cantidad_recomendacion+1):
            pelicula = []
            for i in range(cantidad):
                pelicula.append(datodf['title'][datodf['movieId']==matrizrecomendacion[i, j]].iloc[0])

            recomendacion[nombre+str(j)] = pelicula
    else:
        arrayrecomend = modelo.recommendForAllItems(cantidad_recomendacion)
        arrayuser = modelo.recommendForAllItems(cantidad_recomendacion).select(ident)
        if variable > 0:
            recomend = np.array(arrayrecomend.select('recommendations').where(arrayrecomend[ident] == variable).collect(), dtype=int)
            dato = np.array(arrayuser.where(arrayuser[ident] == variable).collect(), dtype=int)
        else:
            recomend = np.array(arrayrecomend.select('recommendations').collect(), dtype=int)
            dato = np.array(arrayuser.collect(), dtype=int)
        
        recomend_data  = recomend[:, 0, :, 0]
        nombre = 'Recomendacion'
        recomendacion = pd.DataFrame({tipo: dato[:cantidad, 0]})
        init =0
        for i in range(init, cantidad):
            if i == init:
                recomendacion["Nombre"] = None
            recomendacion["Nombre"].loc[i] = datodf['title'][datodf['movieId']==int(dato[i])].iloc[0]
            for j in range(cantidad_recomendacion):
                if i == init:
                    recomendacion[nombre+str(j+1)] = None
                recomendacion[nombre+str(j+1)].loc[i] = recomend_data[i, j]
   
    return recomendacion

`Recomendación por Usuarios`, se realizará recomendación de 5 películas para 5 usuarios (se puede variar el número)

In [None]:
from IPython.display import display
despliegue = mostrar_recomendacion(model_final, df_movies, 'user_id', 5, 5, 'Usuario')
display(despliegue)

`Recomendación de películas para usuarios`: Se realizarán recomendación de para 5 usuarios para cada película

In [None]:
despliegue2 = mostrar_recomendacion(model_final, df_movies, 'movieId', 3, 5, 'Pelicula')
display(despliegue2)

`Recomendación para un usuario en particular`: Se recomendará 3 películas para el usuario 471

In [None]:
despliegue3 = mostrar_recomendacion(model_final, df_movies, 'user_id', 1, 5, 'Usuario', 471)
display(despliegue3)

`Recomendación de película en particular para usuarios`: Se recomendará la película `Out Cold (2001)` para 4 usuarios

In [None]:
despliegue4 = mostrar_recomendacion(model_final, df_movies, 'movieId', 1, 5, 'Pelicula', 4900)
display(despliegue4)

# Conclusión

Los algoritmos de filtrado colaborativo nos ayudan a generar recomendaciones a personas de acuerdo a sus gustos o preferencias de personas con gustos similares. Esto es de vital importancia para el negocio, dado que podemos establecer con precisión lo que necesita un usuario y por consiguiente mayor posibilidad de captar una venta.

# Referencias

Filtrado Colaborativo definición: https://es.wikipedia.org/wiki/Filtrado_colaborativo

Filtrado Colaborativo definición y contexto: https://www.iebschool.com/blog/filtrado-colaborativo-sirve-e-commerce/

Filtrado Colaborativo con pyspark: https://medium.com/datos-y-ciencia/intro-als-pyspark-7de7f3ba3b0a

Componente filtrado Colaborativo pyspark: http://spark.apache.org/docs/latest/ml-collaborative-filtering.html

k-nereast Neightbord en python: https://www.aprendemachinelearning.com/clasificar-con-k-nearest-neighbor-ejemplo-en-python/

k-nereast Neightbord aproximación en pyspark: https://spark.apache.org/docs/2.2.0/ml-features.html#approximate-nearest-neighbor-search

Ejemplo: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2799933550853697/2823893187441173/2202577924924539/latest.html


In [None]:
sc.stop()