## Recomendador en Python

#### Kevin Craig Alisauskas


Importamos librerías y cargamos pyspark (modificar como sea necesario según el pc).

In [1]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

import findspark
findspark.init('/home/kubote/spark/spark-2.4.1-bin-hadoop2.7/')

import pyspark
from pyspark.sql import Row

try:
    sc.stop()
    print("Stopped and restarted")
except:
    print("Nothing to stop")

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

sc = SparkContext(master = "local[*]")
sqlContext = SQLContext(sc)

spark = SparkSession.builder.getOrCreate()

Nothing to stop


Importamos los datos

In [2]:
# Leemos las filas del dataset una por una
lines = spark.read.text("./Datos/ratings.dat").rdd
lines2 = spark.read.text("./Datos/movies.dat").rdd

# Dividimos las filas en columnas.
parts = lines.map(lambda row: row.value.split("::"))
parts2 = lines2.map(lambda row: row.value.split("::"))

# Definimos cada variable con su tipo y nombre
ratingsRDD = parts.map(lambda p: Row(UserID = int(p[0]), MovieID = int(p[1]), Rating = float(p[2]), Timestamp = int(p[3])))
moviesRDD = parts2.map(lambda p: Row(MovieID = int(p[0]), Title = str(p[1]), Genres = str(p[2])))

# Lo transformamos en Dataframe.
ratings = spark.createDataFrame(ratingsRDD)
ratings = ratings.drop('Timestamp')

movies = spark.createDataFrame(moviesRDD)

En la implementación en Python realizamos dos pasos, una selección de usuarios y películas posibles por Nearest Neighbors y una selección de películas por la correlación entre las posibles y las vistas por el usuario a recomendar. 
En esta ocasión, existe bastante documentación sobre el filtrado colaborativo en pyspark utilizando la función ALS (Alternating Least Squares), sin embargo, la implementación es muy sencilla y se limita a cargar los datos y ejecutar ALS, indicando las columnas de Item, Usuarios y Ratings, con la posibilidad de mejorar los parámetros del modelo para aumentar su rendimiento. Dada la sencillez de ese método, he decidido adaptar de la mejor manera posible el método utilizado en la implementación en Python, que procedo a explicar paso a paso junto con el código, ya que he tenido que enfrentar algunos problemas.

Para empezar, guardamos los valores únicos de películas y usuarios.

In [3]:
distinct_movies = ratings.select('MovieID').distinct().count()
distinct_users = ratings.select('UserID').distinct().count()

Creamos una tabla de contingencia para obtener posibles usuarios similares según las películas vistas por ambos.

In [4]:
crossed = ratings.crosstab("UserID", "MovieID")

# Necesitaremos los id de los usuarios posteriormente.
users_total_id = [int(row['UserID_MovieID']) for row in crossed.select('UserID_MovieID').collect()]

Comprobamos que la tabla tenga las dimensiones correctas.

In [5]:
cierto = True if distinct_users == crossed.count() else False
print("El número de filas de la tabla de contingencia es correcto: ", cierto)

cierto = True if distinct_movies == len(crossed.columns) - 1 else False
print("El número de columnas de la tabla de contingencia es correcto: ", cierto)

El número de filas de la tabla de contingencia es correcto:  True
El número de columnas de la tabla de contingencia es correcto:  True


Ahora bien, puestos a utilizar Nearest Neighbors como en Python, nos encotramos con que no existe en Spark. Así pues, vamos a usar la tabla de contingencia para obtener un KMeans de 100 clústers, de esa forma encontraremos como pareja del seleccionado por una variable user_id_position (posición en la lista de usuarios) al perteneciente a su mismo cluster.

In [6]:
user_id_position = 1

# Número de clústers
n_clus = 100

In [7]:
# Transformamos la matriz para el KMeans.
from pyspark.ml.feature import VectorAssembler

vec_assembler = VectorAssembler(inputCols = crossed.columns[1:], outputCol='features')
kmeans_data = vec_assembler.transform(crossed)

# Ajustamos el kmeans.
from pyspark.ml.clustering import KMeans

kmeans = KMeans(featuresCol='features', k = n_clus)
model = kmeans.fit(kmeans_data)

# Obtenemos las predicciones (esto añade una columna "prediction" al dataframe).
predictions = model.transform(kmeans_data)

Buscamos el clúster al que pertenece nuestro usuario en la posición 1 (0 en python, el primero) y lo mostramos.

In [8]:
user_id = users_total_id[user_id_position]

cluster_id = predictions.filter(predictions.UserID_MovieID == user_id).select('prediction').collect()[0]['prediction']
print("El clúster que buscamos es el :", cluster_id)

El clúster que buscamos es el : 9


Filtramos los usuarios pertenecientes a el cluster buscado y seleccionamos uno aleatorio (similar_user) a partir del cual recomendar películas a user_id.

In [9]:
# Posibles usuarios (pertenecientes al mismo cluster)
possible_users = predictions.filter(predictions.prediction == cluster_id).filter(predictions.UserID_MovieID != user_id).select('UserID_MovieID')

# Usuario aleatorio dentro del mismo cluster.
similar_user = np.random.randint(possible_users.count())
similar_user_movies = predictions.filter(predictions.UserID_MovieID == possible_users.collect()[similar_user]['UserID_MovieID'])


Calculamos las diferencias con el vector del usuario seleccionado con el similar. Primero definimos los arrays de películas similares y películas vistas por el usuario.


In [10]:
similar_user_movies_array = np.array(similar_user_movies.drop("UserID_MovieID", "features", "prediction").collect())

user_movies_array = np.array(predictions.filter(predictions.UserID_MovieID == user_id).drop("UserID_MovieID", "features", "prediction").collect())

Ahroa obtendremos las películas posibles, es decir, las películas vistas por el usuario similar y no vistas por el usuario para el cual recomendar.

In [15]:
# Y ahora la diferencia.
diff = similar_user_movies_array - user_movies_array

# Los códigos de las películas.
id_peliculas = np.array(similar_user_movies.drop("UserID_MovieID", "features", "prediction").columns)

# Ahora, las posiciones de las posibles películas (las que tengan valor 1).
Lposibles = id_peliculas[np.where(diff == 1)[1]]

# Y las películas vistas por el usuario user_id.
vistas_user = id_peliculas[np.where(user_movies_array == 1)[1]]

# Mostramos las películas vistas y posibles.
print("Películas vistas por el usuario", user_id)
print(vistas_user)

print("Películas Posibles: ")
print(Lposibles)

Películas vistas por el usuario 645
['1' '111' '1136' '1148' '1198' '1204' '1206' '1208' '1213' '1221' '1225'
 '1227' '1228' '1230' '1252' '1256' '1288' '1293' '1299' '1537' '1836'
 '2028' '2064' '2300' '2324' '2396' '260' '2858' '2937' '2966' '3077'
 '3114' '3210' '34' '3424' '3425' '3929' '593' '608' '68' '745' '750'
 '903' '904']
Películas Posibles: 
['1022' '1028' '1032' '1035' '1081' '1083' '1088' '1210' '1380' '1937'
 '1943' '1947' '1951' '2078' '2081' '2087' '2096' '2565' '2628' '2746'
 '2941' '2946' '3061' '3199' '3545' '3549' '3604' '3605' '364' '3675'
 '588' '899' '900' '912' '918' '938']


Ahora queda el paso de elegir la película a recomendar de entre todas las posibles. La idea original era crear una matriz cruzada con los usuarios como filas, las películas como columnas y los ratings como valores, para de ahí calcular la correlación entre películas, sin embargo, aunque he implementado ese método tanto con la distancia euclidea como con la similitud del coseno entre dos columnas del dataframe, es imposible de calcular con mi equipo, surgen errores de memoria que no se solucionar. Dejaré el código de la implementación comentado al final del trabajo.

Así pues, simplemente vamos a seleccionar la película con mayor rating medio de todas las películas posibles.

In [12]:
# Calculo las medias
from pyspark.sql.functions import mean
medias = ratings.groupby('MovieID').agg({'Rating': "mean"})

medias_movies_id = np.array([int(row['MovieID']) for row in medias.select('MovieID').collect()])
medias_movies_avg = [float(row['avg(Rating)']) for row in medias.select('avg(Rating)').collect()]

# Almaceno las medias de únicamente las películas posibles y las comparo entre si para obtener la que tiene la mayor
# que será la película elegida.
if Lposibles.shape[0] > 0:

    Lscores = []
    i = 0

    for p in Lposibles:

        media = medias_movies_avg[np.where(medias_movies_id == int(p))[0][0]]
        # media = ratings_pivoted.select(p).dropna().select(mean(p)).collect()[0]['avg(' + p + ')']

        # Almacenamos la suma de las correlaciones de la posible película con las vistas.
        Lscores.append(media)


    irecom = int(np.array(Lscores).argmax())
    print("Recomendacion Final: ")
    print("Película con MovieID: ", Lposibles[irecom])
    print(movies.filter(movies.MovieID == Lposibles[irecom]).select('Title').show())

Recomendacion Final: 
Película con MovieID:  912
+-----------------+
|            Title|
+-----------------+
|Casablanca (1942)|
+-----------------+

None


######################################################################################################################
#####################################################################################################################

A continuación dejo el código de la implementación original, que sería funcional si no fuera por los errores de memoria.

In [13]:

# Creo la matriz cruzada.
#ratings_pivoted = ratings.groupby("UserID").pivot("MovieID").avg("Rating").fillna(0)

# Defino las funciones de similitud del coseno y distancia euclidea.
#import pyspark.sql.functions as func

#def cosine_similarity(df, col1, col2):

#    df_cosine = df.select(func.sum(df[col1] * df[col2]).alias('dot'),
#                          func.sqrt(func.sum(df[col1]**2)).alias('norm1'),
#                          func.sqrt(func.sum(df[col2] **2)).alias('norm2'))

#    d = df_cosine.rdd.collect()[0].asDict()

#    return d['dot']/(d['norm1'] * d['norm2'])

#def euclidean_distance(df, col1, col2):

#    df_cosine = df.select(func.sqrt(func.sum((df[col1] - df[col2])**2)).alias('dot'))

#    d = df_cosine.rdd.collect()[0].asDict()

#    return d['dot']


# Ahora vamos a trabajar con la matriz ratings_pivoted, buscando la correlación entre películas a partir
# de las valoraciones de usuarios.

#if Lposibles.shape[0] > 0:

#    Lscores = []
#    i = 0

#    for p in Lposibles:

#        simil = 0

#        for u in vistas_user:

#            print("Iteración ", i, " de ", Lposibles.shape[0]*vistas_user.shape[0])
#            i = i + 1

            # Calculamos la similitud entre la película posible y las vistas por el usuario.
#            simil = simil + euclidean_distance(ratings_pivoted, p, u) # ratings_pivoted_filled.corr(p, u) la correlación 
            # da errores de memoria.

        # Almacenamos la suma de las correlaciones de la posible película con las vistas.
#        Lscores.append(simil)


#    irecom = int(np.array(Lscores).argmin())
#    print("Recomendacion Final: ")
#    print(movies.filter(movies.MovieID == Lposibles[irecom]).select('Title').show())
