# Tarea Big Data 
------------------------------------
    
## Sistema de Recomendación (Filtrado Colaborativo)
### Rubén Giménez Martín
**- Evaluar el modelo de filtrado colaborativo sobre el dataset de películas (Movies)**

**- Escribir una implementación del mismo para Spark**

###  Introducción

En el siguiente ejercicio se mostrará la implementación, a través de Spark, de un Sistema de Recomendación de tipo filtrado colaborativo.

Se conoce como sistema de recomendación a aquel sistema de filtrado de información que permita predecir las preferencias de determinados usuarios con el objetivo de ofrecerles productos o servicios más afines a dichas preferencias. Estos sistemas son comúnmente utilizados en Internet con multitud de aplicaciones y objetivos, desde aumentar los ratios de conversión en negocios de comercio electrónico hasta mejorar la personalización de determinados sitios web o incrementar la satisfacción de sus usuarios. 

Comúnmente se definen tres fases en un sistema de recomendación:

1.	Captura de las preferencias de los usuarios

     - Preferencias Implícitas: La opinión del usuario es inferida a partir del uso del sitio web o de la app. Como, por ejemplo, número de visitas, número de clics, etc.

    - Preferencias Explícitas: El usuario valora los servicios o los productos de manera consciente. Por ejemplo, los likes y dislikes, o las valoraciones por puntación.  

2.	Extracción de conocimiento. El sistema interpreta la información con el objetivo de ser capaz de predecir las preferencias.

3.	Fase de predicción.

En el caso de este ejercicio, el conjunto de datos proporcionado consta de preferencias explícitas de los usuarios, es decir, de valoraciones o *ratings*. En estos casos, el objetivo de los algorítmos de recomendación es predecir los datos faltanttes de la matriz original de *ratings*, la cual presentará, como se puede ver en la imágen inferior, un alto número de datos faltantes, ya que es extremadamente difícil encontrar un usuario que haya valorado todos los ítems de la matriz.

En este ejercicio, se utilizará el algorítmo ALS (alternating least squares) como sistema de recomendación. Este algortimo interpreta a los usuarios y a los ítems como un pequeño conjunto de factores latentes con los que, mediante la factorización de matrices, es capaz de predecir los registros faltantes, siempre y cuando haya, al menos, un dato por cada fila y cada columna.

![Matriz original de ratings y factorización de la matriz](http://katbailey.github.io/images/matrix_factorization.png)

### Implementación del algorítmo

**Librerías**

In [27]:
# Importing needed Packages
import numpy as np
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

from pyspark.sql.functions import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from time import time
from sklearn.metrics import mean_squared_error
from math import sqrt

# setting a Seed
SEED = 1917
np.random.seed(SEED)

**Funciones utilizadas**

In [2]:
def rename_columns(data, old_cols, new_cols):
    for old, new in zip(old_cols, new_cols):
        data = data.withColumnRenamed(old, new)
    return data

Inicialización de SparkContext y la sesión en Spark

In [3]:
sc = SparkContext()
sql = SQLContext(sc)
spark = SparkSession.builder.appName("recommendation").getOrCreate()

### Tratamiento Inicial de los datos

**Importación de los 3 conjuntos de datos**

Se importan y renombran correctamente las columnas de los conjuntos de datos

In [4]:
# Importing the 3 dataframes Ratings, Movies & USERS
# "::" changed previously to commas ","
ratings = sql.read.format("csv").options(header="false", inferSchema="true", delimiter = ",").load("data/ratings.dat")
movies = sql.read.format("csv").options(header="false", inferSchema="true", delimiter = ",").load("data/movies.dat")
users = sql.read.format("csv").options(header="false", inferSchema="true", delimiter = ",").load("data/users.dat")

# Column names must be fixed. According to README:
ratings_colnames_old = ratings.columns
movies_colnames_old = movies.columns
users_colnames_old = users.columns
# new
ratings_colnames = ["UserID","MovieID","Rating","Timestamp"]
movies_colnames = ["MovieID","Title","Genres"]
users_colnames = ["UserID","Gender","Age","Occupation","Zipcode"]

ratings_new = rename_columns(data = ratings, old_cols = ratings_colnames_old, new_cols = ratings_colnames)
movies_new= rename_columns(data = movies, old_cols = movies_colnames_old, new_cols = movies_colnames)
users_new = rename_columns(data = users, old_cols = users_colnames_old, new_cols = users_colnames)

ratings = ratings_new
movies = movies_new
users = users_new
del(ratings_new, movies_new, users_new)

Se muestran los 5 primeros registros de cada dataframe

In [5]:
print("\033[1mRatings DataFrame\033[0m")
ratings.show(5)
print("\033[1mMovies DataFrame\033[0m")
movies.show(5)
print("\033[1mUsers DataFrame\033[0m")
users.show(5)

[1mRatings DataFrame[0m
+------+-------+------+---------+
|UserID|MovieID|Rating|Timestamp|
+------+-------+------+---------+
|     1|   1193|     5|978300760|
|     1|    661|     3|978302109|
|     1|    914|     3|978301968|
|     1|   3408|     4|978300275|
|     1|   2355|     5|978824291|
+------+-------+------+---------+
only showing top 5 rows

[1mMovies DataFrame[0m
+-------+--------------------+--------------------+
|MovieID|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

[1mUsers DataFrame[0m
+------+------+---+----------+-------+
|UserID|Gender|Age|Occupation|Zipcode|
+------+------+---+----

Se retira la variable `Timestamp` del DataFrame `ratings` dado que no será utilizada.

In [5]:
ratings = ratings.drop("Timestamp")

### Breve exploración y análisis de los datos

Para comenzar, se comprueba la escasez (**sparsity**) de datos de la matriz original de *ratings*.

La fórmula con la que esta escasez calculada se define de la siguiente manera:

$$Sparsity=\frac{\text{Número de Ratings en la Matriz Original}}{\text{Número de Usuarios}\thinspace X \thinspace \text{Número de Items}}$$

In [7]:
rating_count = ratings.count()
users_distinct = ratings.select("UserID").distinct().count()
movies_distinct = ratings.select("MovieID").distinct().count()

sparsity = 1 - (rating_count*1.0 / (users_distinct*movies_distinct))
print ("Sparsity:",  np.round(sparsity, 2)*100, "%")


Sparsity: 96.0 %


Se comprueba que cerca del 96% de las celdas de la matriz original de *ratings* pertenecen a registros faltantes. Sin embargo, el algoritmo, siempre y cuando haya un registro por cada columna y fila, debe ser capaz de inferir todos los valores.

In [7]:
print("\033[1mNumber of rows of the whole DataFrame:\033[0m",ratings.count(), "rows\n")

print("\033[1mMain statistics of the Rating variable:\033[0m")
ratings.select([mean("Rating").alias("Average"), stddev("Rating").alias("standard Deviation"), min("Rating").alias("Min Value"), max("Rating").alias("Max Value")]).show()

print("\033[1mMain statistics of the Age variable:\033[0m")
users.select([mean("Age").alias("Average"), stddev("Age").alias("standard Deviation"), min("Age").alias("Min Value"), max("Age").alias("Max Value")]).show()

print("\033[1mMain statistics of the Occupation variable:\033[0m")
users.select([mean("Occupation").alias("Average"), stddev("Occupation").alias("standard Deviation"), min("Occupation").alias("Min Value"), max("Occupation").alias("Max Value")]).show()

print("\033[1mNumber of Males (M) & Females (F):\033[0m")
# remember 1 in Age means under 18
users.groupBy("Gender").count().show()

print("\033[1mNAs per column in each DataFrame\033[0m")
print("\033[1mRatings:\033[0m")
ratings.select([count(when(isnan(c), c)).alias(c) for c in ratings.columns]).show()
print("\033[1mMovies:\033[0m")
movies.select([count(when(isnan(c), c)).alias(c) for c in movies.columns]).show()
print("\033[1mUsers:\033[0m")
users.select([count(when(isnan(c), c)).alias(c) for c in users.columns]).show()


[1mNumber of rows of the whole DataFrame:[0m 1000209 rows

[1mMain statistics of the Rating variable:[0m
+-----------------+------------------+---------+---------+
|          Average|standard Deviation|Min Value|Max Value|
+-----------------+------------------+---------+---------+
|3.581564453029317|1.1171018453732595|        1|        5|
+-----------------+------------------+---------+---------+

[1mMain statistics of the Age variable:[0m
+------------------+------------------+---------+---------+
|           Average|standard Deviation|Min Value|Max Value|
+------------------+------------------+---------+---------+
|30.639238410596025|12.895961726906837|        1|       56|
+------------------+------------------+---------+---------+

[1mMain statistics of the Occupation variable:[0m
+-----------------+------------------+---------+---------+
|          Average|standard Deviation|Min Value|Max Value|
+-----------------+------------------+---------+---------+
|8.146854304635761| 

## Entrenamiento del Algoritmo ALS

Se recurda que el algoritmo ALS implementado en [Spark](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) cuenta con los siguientes  parámetros:

+ *numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).*

+ *rank is the number of features to use (also referred to as the number of latent factors).*

+ *iterations is the number of iterations of ALS to run. ALS typically converges to a reasonable solution in 20 iterations or less.*

+ *lambda specifies the regularization parameter in ALS.*

+ *implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data. alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.*


Dado que este ejercicio se trata de una demostración de funcionamiento, se entrenará el algoritmo únicamente con un 20% aleatorio de los datos originales con el objetivo de reducir el tiempo de computación.  

In [6]:
(ratings, raw) = ratings.randomSplit([0.2, 0.8], seed = SEED)
del(raw)
ratings.count()

200166

**Partición del conjunto de train y test**

In [7]:
(training, test) = ratings.randomSplit([0.7, 0.3], seed = SEED)
training.count()
print("\033[1mNumber of rows of the training set:\033[0m",training.count(), "rows\n")
print("\033[1mNumber of rows of the test set:\033[0m",test.count(), "rows\n")

[1mNumber of rows of the training set:[0m 139988 rows

[1mNumber of rows of the test set:[0m 60178 rows



**Definición del algoritmo y sus parámetros**

Se tratará de encontrar el mejor modelo posible con los siguientes parámetros:

In [8]:
# Algorithm properties & hyperparameters
als = ALS(userCol="UserID",
          itemCol="MovieID",
          ratingCol="Rating",
          coldStartStrategy="drop",
          nonnegative = True,
          implicitPrefs = False)

param_grid = ParamGridBuilder()\
            .addGrid(als.rank, [5, 15, 20])\
            .addGrid(als.maxIter, [5, 10, 20])\
            .addGrid(als.regParam, [.05, 0.1, 1.5]).build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")

cv = CrossValidator(estimator = als,
                    estimatorParamMaps = param_grid,
                    evaluator = evaluator,
                    numFolds = 5)

**Fase de Entrenamiento**

In [9]:
t0 = time()
model = cv.fit(training)
t1 = time()
print("Time Training: %.2g sec" % (t1 - t0))

Time Training: 4.3e+03 sec


Extracción del mejor modelo y sus parámetros:


In [10]:
best_model = model.bestModel
print ("\033[1mBest Model\033[0m")
print("--------------------")
print ("Rank: ", best_model.rank )
print ("MaxIter: ", best_model._java_obj.parent().getMaxIter())
print ("RegParam: ", best_model._java_obj.parent().getRegParam())

[1mBest Model[0m
--------------------
Rank:  5
MaxIter:  20
RegParam:  0.1


**Predicción**

In [11]:
#predictions: 
predictions = best_model.transform(test)

#rmse
rmse = evaluator.evaluate(predictions)
print ("\033[1mRMSE:\033[0m", rmse)

[1mRMSE:[0m 0.9742822165994155


Se muestra a continuación una muestra de las predicciones realizadas con el modelo:

In [12]:
predictions.show(10)

+------+-------+------+----------+
|UserID|MovieID|Rating|prediction|
+------+-------+------+----------+
|  2383|    148|     2| 3.7267098|
|   319|    463|     2| 2.6483166|
|  5306|    463|     2| 2.7176168|
|  3709|    463|     3| 3.1942973|
|   524|    463|     3| 2.2999346|
|  3224|    463|     3|  2.689817|
|  5222|    471|     4| 2.5919006|
|   406|    471|     2| 3.0395594|
|  4446|    471|     5| 3.0319963|
|  5841|    471|     3| 3.6752293|
+------+-------+------+----------+
only showing top 10 rows



A continuación, mediante un join, se une el DataFrame que contiene las predicciones con los DataFrames `movies`y `users`con el objetivo de observar y entender mejor los datos.

In [13]:
main_df = predictions.join(movies, ["MovieID"], "left")
main_df = main_df.join(users, ["UserID"], "left")
main_df.show()

+------+-------+------+----------+--------------------+--------------------+------+---+----------+-------+
|UserID|MovieID|Rating|prediction|               Title|              Genres|Gender|Age|Occupation|Zipcode|
+------+-------+------+----------+--------------------+--------------------+------+---+----------+-------+
|  2383|    148|     2| 3.7267098|Awfully Big Adven...|           An (1995)|     F| 25|        14|  95125|
|   319|    463|     2| 2.6483166|Guilty as Sin (1993)|Crime|Drama|Thriller|     F| 50|         6|  33436|
|  5306|    463|     2| 2.7176168|Guilty as Sin (1993)|Crime|Drama|Thriller|     F| 25|        17|  92121|
|  3709|    463|     3| 3.1942973|Guilty as Sin (1993)|Crime|Drama|Thriller|     M| 25|        12|  89502|
|   524|    463|     3| 2.2999346|Guilty as Sin (1993)|Crime|Drama|Thriller|     M| 18|         0|  91320|
|  3224|    463|     3|  2.689817|Guilty as Sin (1993)|Crime|Drama|Thriller|     F| 25|        14|  93428|
|  5222|    471|     4| 2.5919006|   

Como ejemplo final, se observan las predicciones para un usuario determinado, transformando el DataFrame a un Pandas DataFrame. En este caso, para el usuario 5841:

In [34]:
main_df = main_df.toPandas()
main_df[main_df.UserID == 5841]

Unnamed: 0,UserID,MovieID,Rating,prediction,Title,Genres,Gender,Age,Occupation,Zipcode
9,5841,471,3,3.675229,Hudsucker Proxy,The (1994),F,35,7,10024
1999,5841,2392,2,2.526871,Jack Frost (1998),Comedy|Drama,F,35,7,10024
4742,5841,1031,5,3.402708,Bedknobs and Broomsticks (1971),Adventure|Children's|Musical,F,35,7,10024
6042,5841,2240,5,3.319477,My Bodyguard (1980),Drama,F,35,7,10024
12695,5841,906,5,3.858044,Gaslight (1944),Mystery|Thriller,F,35,7,10024
13754,5841,3798,3,3.916767,What Lies Beneath (2000),Thriller,F,35,7,10024
16500,5841,1380,3,3.239907,Grease (1978),Comedy|Musical|Romance,F,35,7,10024
27416,5841,586,5,3.019213,Home Alone (1990),Children's|Comedy,F,35,7,10024
28010,5841,3620,2,2.773004,Myth of Fingerprints,The (1997),F,35,7,10024
29338,5841,838,5,4.049261,Emma (1996),Comedy|Drama|Romance,F,35,7,10024
