## Proyecto final Upskilling Big Data - IT Academy

El conjunto de datos escogidos se trata de tres datasets en formato .json de la plataforma de videojuegos Steam. Los mismos contienen información sobre los juegos, reviews que hacen los usuarios sobre éstos, precios de los videojuegos, etc.<br>
<br>
Fuentes de los datos:<br>
https://www.kaggle.com/datasets/trolukovich/steam-games-complete-dataset<br>
https://cseweb.ucsd.edu/~jmcauley/datasets.html#steam_data<br>
<br>
Como se puede observar a través de los nombres de cada campo de los dataset, los datos contienen numerosas variables. Entre las más destacadas, podemos nombrar: precios de los videojuegos, género del videojuego, tiempo de juego del usuario, nombre del usuario, cantidad de juegos por usuario, reviews realizadas por los usuarios.<br>
Esto abre la puerta a la posibilidad de realizar un análisis diverso sobre el mundo de los videojuegos.<br>


**Objetivos del proyecto** <br>
<br>
Objetivo general: 
* Utilizar las tecnologías aprendidas durante el curso para realizar una limpieza de datos y un análisis exploratorio entre las variables de los dataset con el fin de ver si hay algún patrón interesante en el que valga la pena profundizar. 

<br>
Objetivos específicos:

* Obtener las horas de juego para cada género por año
* Obtener el top 10 histórico de usuarios con más horas jugadas por género.
* Obtener el top 5 histórico de juegos más recomendados por los usuarios.
* Obtener una lista con la cantidad de registros de reseñas de usuarios categorizadas con un análisis de sentimiento, según el año de lanzamiento.
* Desarrollar un sistema de recomendación item-item (eligiendo un videojuego, que el sistema recomiende videojuegos similares).

In [1]:
import os
import sys

In [2]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.conf import SparkConf
import pyspark.sql.functions as F
import pyspark.sql.types as T
import plotly.express as px


In [4]:
conf = SparkConf().setAppName('PF_Maria_Soledad_Alvarez')\
            .setMaster("local[*]")\
            .set("spark.executor.memory", "4g")\
            .set("spark.driver.memory", "8g")\
            .set("spark.driver.maxResultSize", "2g")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
conf = spark.sparkContext.getConf()
print("spark.app.name = ", conf.get("spark.app.name"))
print("spark.master = ", conf.get("spark.master"))
print("spark.executor.memory = ", conf.get("spark.executor.memory"))

In [94]:
#spark.stop()

In [5]:
steam_games = spark.read.option("multiline","true").parquet('./parquet_files/steam_games.parquet')
#steam_games.write.mode("overwrite").parquet("./parquet_files/steam_games.parquet")

In [6]:
steam_reviews = spark.read.option("multiline","true").parquet('./parquet_files/steam_reviews.parquet')
#steam_reviews.write.mode("overwrite").parquet("./parquet_files/steam_reviews.parquet")

In [7]:
steam_users = spark.read.option("multiline","true").parquet('./parquet_files/steam_users.parquet')
#steam_users.write.mode("overwrite").parquet("./parquet_files/steam_users.parquet")

In [None]:
sqlContext = SQLContext(spark)

In [9]:
sqlContext.sql("DROP TABLE IF EXISTS steam_games")
sqlContext.registerDataFrameAsTable(steam_games, "steam_games")

In [10]:
sqlContext.sql("DROP TABLE IF EXISTS steam_users")
sqlContext.registerDataFrameAsTable(steam_users, "steam_users")

In [11]:
sqlContext.sql("DROP TABLE IF EXISTS steam_reviews")
sqlContext.registerDataFrameAsTable(steam_reviews, "steam_reviews")

In [12]:
def lower_case(x):
    if type(x) == type(None):
        return ['no genre']
    else:
        res = []
        for x_ in x:
            res.append(x_.lower())
        return res

In [10]:
spark.catalog.listTables()

[Table(name='steam_games', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='steam_reviews', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='steam_users', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

#### Obtener las horas de juego para cada género por año

Vemos que en el dataset steam_users se puede ver el tiempo de juego (play_time) tanto histórico como de las ultimas dos semanas, pero no se distigue por año, por lo que no podremos calcular las horas de juego para cada género por año con esta tabla. <br>
<br>
En cambio, en la tabla steam_reviews tenemos el registro de la fecha que se realizó la review y las horas de juego que tiene acumuladas el jugador para este videojuego en particular -identificado con su product_id- al momento de realizar el comentario. Esto nos permite calcular las horas de juego para cada videojuego separadas por el año. Además, hemos de cruzar esta tabla con la tabla steam_games para obtener el género al que pertenece el juego.

In [23]:
spark.sql('SELECT username, user_id, product_id, COUNT(*) AS count\
           FROM steam_reviews\
           GROUP BY username, user_id, product_id\
           ORDER BY count DESC\
           LIMIT 10').show()

+--------+-------+----------+-----+
|username|user_id|product_id|count|
+--------+-------+----------+-----+
|     123|   NULL|    346110|  238|
|     123|   NULL|    252490|   43|
| quintal|   NULL|    113420|   29|
|Ajit Pai|   NULL|       440|   28|
|     yes|   NULL|       440|   26|
|    Jack|   NULL|       440|   25|
|    Alex|   NULL|       440|   24|
|     123|   NULL|    433850|   23|
|  Monika|   NULL|       440|   23|
|   Ghost|   NULL|       440|   22|
+--------+-------+----------+-----+



In [28]:
spark.sql("SELECT username, user_id, count(text) AS count\
          FROM steam_reviews\
          WHERE username = 123 AND product_id = 346110\
          GROUP BY username, user_id\
          ORDER BY count DESC").show(5)

+--------+-----------------+-----+
|username|          user_id|count|
+--------+-----------------+-----+
|     123|             NULL|  238|
|     123|76561197971448769|    3|
|     123|76561198012216856|    3|
|     123|76561197981368919|    3|
|     123|76561198274978022|    3|
+--------+-----------------+-----+
only showing top 5 rows



In [9]:
spark.sql("SELECT DISTINCT username, user_id, text\
          FROM steam_reviews\
          WHERE username = 123 AND product_id = 346110 AND user_id IS NULL\
        ").count()

181

Vamos a crear una tabla de reviwes quitando campos que no se van a utilizar. Y luego, se eliminarán los registros que estén duplicados. Esto permitirá aumentar la calidad de los datos a la vez que reducirá el tamaño del dataset.<br>
Los campos que quedarán en la tabla serán:
* date
* user_id
* username
* hours
* text
* product_id 

In [9]:
#steam_reviwes_transformed = spark.sql('SELECT DISTINCT date, user_id, username, hours, text, product_id\
#                                    FROM steam_reviews')

In [13]:
#steam_reviwes_transformed.write.mode("overwrite").parquet("./transformed_parquet/steam_reviews.parquet")
steam_reviwes_transformed = spark.read.option("multiline","true").parquet('./transformed_parquet/steam_reviews.parquet')

In [14]:
sqlContext.sql("DROP TABLE IF EXISTS steam_reviwes_transformed")
sqlContext.registerDataFrameAsTable(steam_reviwes_transformed, "steam_reviwes_transformed")

Ahora realizamos un join con la tabla steam_games entre los id de los videojuegos. 

In [50]:
reviews_games_joined = spark.sql('SELECT r.date, r.hours, r.product_id, g.app_name, g.genres\
                                 FROM steam_reviwes_transformed r\
                                 JOIN steam_games g ON r.product_id = g.id')

Verificamos registros nulos

In [51]:
sqlContext.registerDataFrameAsTable(reviews_games_joined, "reviews_games_joined")

In [52]:
reviews_games_joined = spark.sql('SELECT *\
          FROM reviews_games_joined\
          WHERE genres IS NOT NULL AND hours IS NOT NULL')

Para facilitar la busqueda, pasamos a minúsculas el campo de genres.

In [53]:
#reviews_games_joined = reviews_games_joined.withColumn("genres", F.when((F.size(F.col("genres")) == 0), F.lit(None)).otherwise(F.col("genres")))
null_dict = {col:reviews_games_joined.filter(reviews_games_joined[col].isNull()).count() for col in reviews_games_joined.columns}

reviews_games_joined = reviews_games_joined.dropna()

def lower_case(x):
    res = []
    for x_ in x:
        res.append(x_.lower())
    return res

convert_to_lower = F.udf(lower_case, T.ArrayType(T.StringType()))

reviews_games_joined = reviews_games_joined.withColumn("genres", convert_to_lower(F.col("genres")))

Hacemos verificación de registros nulos para asegurarnos que en la tabla no queden registros con campos nulos

In [17]:
null_dict = {col:reviews_games_joined.filter(reviews_games_joined[col].isNull()).count() for col in reviews_games_joined.columns}
null_dict

{'date': 0, 'hours': 0, 'product_id': 0, 'app_name': 0, 'genres': 0}

Finalmente obtenemos una función cuyos argumentos serán el año y el género, los cuales podrán ser ingresados por el usuario.

In [62]:
def playtimeForGenre():
    input_genre = input('Elija el género del videojuego: ')
    genre = input_genre.lower()
    try:
        year = int(input('Elija un año entre 1999 y 2017 (formato AAAA):'))
        if year > 2017 | year < 1999:
            year = int(input('El valor ingresado es incorrecto, elija un año entre 1999 y 2017 (formato AAAA):'))
    except ValueError:
        year = int(input('El valor ingresado es incorrecto, elija un año entre 1999 y 2017 (formato AAAA):'))
    data = reviews_games_joined.filter(F.array_contains(F.col("genres"), genre) & (F.year(F.col('date')) == year))
    if data.isEmpty():
        return "No se han encontrado resultados para los parámetros '{}' y '{}'.".format(input_genre, year)
    else:
        horas = round(data.select(F.sum('hours')).collect()[0][0], 0)
        return f'Las horas acumuladas de juego para el género {input_genre} durante el año {year} es de: ' + str(horas)

In [63]:
playtimeForGenre()

'Las horas acumuladas de juego para el género Action durante el año 2012 es de: 6512807.0'

In [64]:
playtimeForGenre()

'Las horas acumuladas de juego para el género Indie durante el año 2014 es de: 27951930.0'

In [65]:
playtimeForGenre()

"No se han encontrado resultados para los parámetros 'Cualquier cosa' y '2012'."

#### Obtener el top 10 histórico de usuarios con más horas jugadas por género.

En este caso, al buscar el valor histórico de horas de juego podemos utilizar la variable "playTimeForever" de los usuarios de la tabla steam_user. Al cruzar este campo con la tabla de steam_games podremos saber el género de los juegos. Teniendo ya los registros con las horas, los usuarios y el género, se puede crear los filtros deseados.

En primer lugar, observamos que los campos 'items.item_id' e 'items.playtime_forever' son arrays, al cual el primer valor del array en item_id le corresponde el primer valor de playtime_forever, y así con todos los valores del array. Esto no permitirá realizar un join directo con la tabla steam_games. Es necesario desplegar los arrays de manera que para cada registro tengamos un item_id, un playtime_forever con su respectivo user_id.

In [7]:
spark.sql('SELECT user_id, items.item_id, items.playtime_forever\
                                 FROM steam_users').show(5)

+-----------------+--------------------+--------------------+
|          user_id|             item_id|    playtime_forever|
+-----------------+--------------------+--------------------+
|76561197970982479|[10, 20, 30, 40, ...|[6, 0, 7, 0, 0, 0...|
|          js41637|[10, 80, 100, 300...|[0, 0, 0, 220, 0,...|
|        evcentric|[1200, 1230, 1280...|[923, 0, 0, 158, ...|
|       Riot-Punch|[10, 20, 30, 40, ...|[0, 0, 0, 0, 0, 0...|
|            doctr|[300, 20, 50, 70,...|[1131, 89, 178, 1...|
+-----------------+--------------------+--------------------+
only showing top 5 rows



Con la ayuda de las funciones arrays_zip y explode, podemos hacer el despliegue deseado y así obtenemos una nueva tabla con la que se podrá realizar el join con steam_games

In [7]:
steam_users_unfolded= steam_users\
                        .withColumn("tmp",F.arrays_zip("items.item_id", "items.playtime_forever"))\
                        .withColumn("tmp", F.explode("tmp"))\
                        .select("user_id", F.col("tmp.item_id"), F.col("tmp.playtime_forever"))

In [10]:
steam_users_unfolded\
        .join(steam_games, steam_users_unfolded.item_id == steam_games.id)\
        .select(steam_users_unfolded.user_id, steam_users_unfolded.item_id, steam_users_unfolded.playtime_forever, steam_games.genres).show()

+-----------------+-------+----------------+------------------+
|          user_id|item_id|playtime_forever|            genres|
+-----------------+-------+----------------+------------------+
|76561197970982479|     10|               6|          [Action]|
|76561197970982479|     20|               0|          [Action]|
|76561197970982479|     30|               7|          [Action]|
|76561197970982479|     40|               0|          [Action]|
|76561197970982479|     50|               0|          [Action]|
|76561197970982479|     60|               0|          [Action]|
|76561197970982479|     70|               0|          [Action]|
|76561197970982479|    130|               0|          [Action]|
|76561197970982479|    300|            4733|          [Action]|
|76561197970982479|    240|            1853|          [Action]|
|76561197970982479|   3830|             333|          [Action]|
|76561197970982479|   2630|              75|          [Action]|
|76561197970982479|   3900|             

In [8]:
users_games_joined = steam_users_unfolded\
                    .join(steam_games, steam_users_unfolded.item_id == steam_games.id)\
                    .select(steam_users_unfolded.user_id, steam_users_unfolded.item_id, steam_users_unfolded.playtime_forever, steam_games.genres)

In [70]:
users_games_joined.count()

4294257

Vamos a descartar los registros que contengan nulos en alguno de sus campos para evitar errores en el procesamiento posterior

In [12]:
users_games_joined.dropna().count()

4204813

In [9]:
users_games_joined = users_games_joined.dropna()

In [10]:
def lower_case(x):
    if type(x) == type(None):
        return ['no genre']
    else:
        res = []
        for x_ in x:
            res.append(x_.lower())
        return res

convert_to_lower = F.udf(lower_case, T.ArrayType(T.StringType()))

users_games_joined = users_games_joined.withColumn("genres", convert_to_lower(F.col("genres")))

In [15]:
users_games_joined.show(5)

+-----------------+-------+----------------+--------+
|          user_id|item_id|playtime_forever|  genres|
+-----------------+-------+----------------+--------+
|76561197970982479|     10|               6|[action]|
|76561197970982479|     20|               0|[action]|
|76561197970982479|     30|               7|[action]|
|76561197970982479|     40|               0|[action]|
|76561197970982479|     50|               0|[action]|
+-----------------+-------+----------------+--------+
only showing top 5 rows



In [14]:
def top10_historicPlaytimeGenre():
    input_genre = input('Elija el género del videojuego: ')
    genre = input_genre.lower()

    data = users_games_joined.filter(F.array_contains(F.col("genres"), genre))
    if data.isEmpty():
        return "No hay datos para el valor {}".format(input_genre)
    else:
        return data.groupBy('user_id').agg({'playtime_forever':'sum'}).sort('sum(playtime_forever)', ascending=False).show(10)

In [15]:
top10_historicPlaytimeGenre()

+-----------------+---------------------+
|          user_id|sum(playtime_forever)|
+-----------------+---------------------+
|     REBAS_AS_F-T|              2402994|
|         jimmynoe|              1139510|
|      shinomegami|              1111013|
|       Steamified|               998908|
|       idonothack|               834678|
|        clawbot44|               808702|
|76561198058230663|               804067|
|  DownSyndromeKid|               762460|
|          Sp3ctre|               751167|
|76561198063648921|               712732|
+-----------------+---------------------+
only showing top 10 rows



#### Obtener una lista con la cantidad de registros de reseñas de usuarios categorizadas con un análisis de sentimiento, según el año de lanzamiento.

Para llevar a cabo el análisis de sentimiento, Pyspark no cuenta con una librería para el procesamiento natural del lenguaje (NLP) en su biblioteca que permita realizar un análisis de sentimiento directo, por lo que recurrimos NLTK (Natural Language Took Kit).

In [16]:
#!pip install nltk
import nltk
nltk.download('vader_lexicon')

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\AsusPC\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

Se utilizará el modelo SentimentIntensityAnalyzer el cual esta pensado para analizar textos y definir el peso del sentimiento. El método polarity_scores() devuelve 4 valores: compound, neg, neu, pos. El primero puede tomar un valor entre -1 y 1 y será el valor compuesto que resulta del peso de los aportes de las componentes negativas, neutras y positivas. Mientras más negativo el valor de compound se entiende que el sentimiento del mensaje es más negativo, y viceversa.

In [17]:
from nltk.sentiment import SentimentIntensityAnalyzer

In [18]:
analyzer = SentimentIntensityAnalyzer()

Ahora se define una función que servirá para calificar a los comentarios en tres categorias, en función del valor del compound. Se le dará el valor de 1 a los comentarios con sentimiento positivo cuyo compound sea mayor a 0.3, 0 para aquellos comentarios neutros se encuentren entre 0.3 y -0.3 y 1- a los comentarios negativos que se encuentren con un compound por debajo de -0.3.<br>
<br>
Por otro lado, se creará una columna con los valores del compound del comentario.

In [19]:
def sentiment_analysis(review):
    result = analyzer.polarity_scores(review)['compound'] #---> cada review al argumento
    if result > -1 and result < -0.3:
        return -1
    elif result >= -0.3 and result <= 0.3:
        return 0
    else:
        return 1
    
sentiment_udf = F.udf(lambda text: sentiment_analysis(text), T.IntegerType())
compound_udf = F.udf(lambda text: analyzer.polarity_scores(text)['compound'], T.FloatType())

In [20]:
sentiment_table = steam_reviwes_transformed.withColumn('val_sentiment', sentiment_udf(steam_reviwes_transformed.text))\
                         .withColumn('val_compound', compound_udf(steam_reviwes_transformed.text))\
                         .select('product_id', 'val_sentiment', 'val_compound')

Se procede a contar cuantos sentimientos de cada categoría (positivos (1), neutros (0), negativos(-1)) ha recibido cada videojuego.

In [21]:
sentiment_table_count = sentiment_table.join(steam_games, sentiment_table.product_id == steam_games.id).select(steam_games.developer, steam_games.id, steam_games.app_name, steam_games.release_date, steam_games.sentiment, sentiment_table.val_sentiment)

In [56]:
sentiment_table_count.columns

['developer', 'id', 'app_name', 'release_date', 'sentiment', 'val_sentiment']

In [57]:
sentiment_table_count = sentiment_table_count.groupBy(['release_date', 'id', 'val_sentiment']).agg({'val_sentiment':'count'}).withColumnRenamed('count(val_sentiment)', 'count_sentiment')

In [22]:
#sentiment_table_count.write.mode("overwrite").parquet("./transformed_parquet/sentiment_table_count.parquet")
sentiment_table_count = spark.read.option("multiline","true").parquet('./transformed_parquet/sentiment_table_count.parquet')

Ahora definimos una función que nos permita elegir el año de lanzamiento. La misma devuelve una tabla con los videojuegos que se lanzaron ese año junto con la canitdad de comentarios positivos, neutros y negativos que han recibido. 

In [37]:
def sentimentAnalysisPerReleaseDate():
    try:
        year = int(input('Elija un año entre 1999 y 2017 (formato AAAA):'))
        if year > 2017 | year < 1999:
            year = int(input('El valor ingresado es incorrecto, elija un año entre 1999 y 2017 (formato AAAA):'))
    except ValueError:
        year = int(input('El valor ingresado es incorrecto, elija un año entre 1999 y 2017 (formato AAAA):'))
    data = sentiment_table_count.filter(F.year(F.col('release_date')) == year)
    if data.isEmpty():
        return "No se han encontrado resultados para el año '{}'.".format(year)
    else:
        return data.sort(['count_sentiment', 'app_name', 'val_sentiment'], ascending=False).show()

In [38]:
sentimentAnalysisPerReleaseDate()

+------------+--------------------+-------------+---------------+
|release_date|            app_name|val_sentiment|count_sentiment|
+------------+--------------------+-------------+---------------+
|  2012-10-02|        3D-Coat V4.8|           -1|              1|
|  2012-11-13| AirMech® Soundtrack|           -1|              1|
|  2012-11-29|Atlantis: Pearls ...|           -1|              1|
|  2012-05-11|           Avernum 6|           -1|              1|
|  2012-04-23|  Call of the Ninja!|           -1|              1|
|  2012-04-13|   Choice of Zombies|           -1|              1|
|  2012-07-13|         Clean'Em Up|           -1|              1|
|  2012-07-16|Decisive Campaign...|           -1|              1|
|  2012-05-04|Gas Guzzlers: Com...|           -1|              1|
|  2012-01-27|Hubert's Island A...|           -1|              1|
|  2012-08-01|             LogiGun|           -1|              1|
|  2012-09-03|Luxor: Amun Risin...|           -1|              1|
|  2012-04

#### Obtener el top 5 histórico de juegos más recomendados por los usuarios.

En la tabla steam_games se puede observar que existe un campo con el nombre de 'sentiment'. Esta variable se calcula a partir de un relevo de los usuarios que le dan 'pulgar hacia arriba' o 'pulgar hacia abajo' respondiendo a la pregunta de si recomendarían el videojuego como una forma más de calificarlo al momento de realizar una reseña. A partir de que el juego tiene 10 reseñas, esta variable pasa a tomar los siguientes valores:
* Overwhelmingly Positive
* Very Positive
* Positive
* Mostly Positive
* Mixed
* Mostly Negative
* Negative
* Very Negative
* Overwhelmingly Negative<br>

Teniendo en cuenta lo anterior, para llevar a cabo este punto se tomarán dos estrategias donde se busca combinar los valores del campo 'sentiment' con el análisis de sentimiento realizado a los textos de las reviews, y con esto poder determinar cuales han sido los juegos mas recomendados por los usuarios. <br>
Esto se hará mediante dos estrategias. Por un lado se definirá como juego más recomendado al que cumpla las siguientes condiciones: 
* el que cuente con el mayor promedio de la variable 'val_compound' resultante del análisis de sentimiento.
* que en el campo 'sentiment' de la tabla steam_games se encuentre con la calificación más alta. <br>

Por otro lado, se cambiará la primer condición donde se tendrá en cuenta el recuento de recomendaciones calificadas como positivas por el análisis de sentimiento en vez del promedio de 'val_compound. La segunda condición se tomará de la misma manera.


In [23]:
total_count = steam_games.select('*').count()
distribution = spark.sql('SELECT sentiment, count(*) as count\
          FROM steam_games\
          GROUP BY sentiment\
          ORDER BY count')
sqlContext.sql("DROP TABLE IF EXISTS distribution")
sqlContext.registerDataFrameAsTable(distribution, "distribution")


In [52]:
spark.sql(f'SELECT sentiment, count, ROUND(count/{total_count}*100, 2) as percentaje\
          FROM distribution').show()

+--------------------+-----+----------+
|           sentiment|count|percentaje|
+--------------------+-----+----------+
|Overwhelmingly Ne...|    7|      0.02|
|       Very Negative|   29|      0.09|
|            Negative|  123|      0.38|
|Overwhelmingly Po...|  303|      0.94|
|      9 user reviews|  488|      1.52|
|      8 user reviews|  537|      1.67|
|      7 user reviews|  619|      1.93|
|      6 user reviews|  756|      2.35|
|     Mostly Negative|  802|       2.5|
|      5 user reviews|  846|      2.63|
|      4 user reviews|  964|       3.0|
|      3 user reviews| 1231|      3.83|
|      2 user reviews| 1756|      5.46|
|      1 user reviews| 2496|      7.77|
|     Mostly Positive| 2744|      8.54|
|            Positive| 3281|     10.21|
|       Very Positive| 3868|     12.04|
|               Mixed| 4103|     12.77|
|                NULL| 7182|     22.35|
+--------------------+-----+----------+



En la tabla anterior se pueden ver cuantos juegos hay por cada valor de sentimiento. Se observa que hay una gran cantidad de valores NULL (22,35%).

En este punto decido quitar los juegos que tienen menos de diez reseñas ya que no contienen una canitdad suficiente de reviews para obtener un valor en este campo, y separar los juegos que tienen valor Null en una tabla aparte para cruzarlos con la tabla del análisis de sentimiento luego.

In [24]:
sentiment_table_avg = sentiment_table.groupBy('product_id').agg({'val_compound': 'avg'}).withColumnRenamed('avg(val_compound)', 'avg_compound')

In [84]:
sentiment_table_avg.columns

['product_id', 'avg_compound']

In [25]:
#sentiment_table_avg.write.mode("overwrite").parquet("./transformed_parquet/sentiment_table_avg.parquet")
sentiment_table_avg = spark.read.option("multiline","true").parquet('./transformed_parquet/sentiment_table_avg.parquet')

In [28]:
sentiment_dict =   {"Overwhelmingly Positive": 4,
                    "Very Positive": 3, 
                    "Positive": 2, 
                    "Mostly Positive": 1, 
                    "Mixed": 0,
                    "Mostly Negative": -1, 
                    "Negative": -2, 
                    "Very Negative": -3, 
                    "Overwhelmingly Negative": -4}

In [46]:
sentiment_table_avg.show(5)

+----------+-------------------+
|product_id|       avg_compound|
+----------+-------------------+
|    491580| 0.3006371455533164|
|    687280| 0.3943083295598626|
|    220240| 0.3776530251353691|
|    232770|0.07881258463278748|
|    307620| 0.2898327590704992|
+----------+-------------------+
only showing top 5 rows



In [26]:
sentiment_list = ["Overwhelmingly Positive", "Very Positive", "Positive", "Mostly Positive", "Mixed", "Mostly Negative", "Negative", "Very Negative", "Overwhelmingly Negative"]
games_sentiment = steam_games.select('*').where(steam_games.sentiment.isin(sentiment_list))

In [29]:
from itertools import chain

mapping_expr = F.create_map([F.lit(x) for x in chain(*sentiment_dict.items())])
indexed_df = games_sentiment.withColumn("sentiment_label", mapping_expr[F.col("sentiment")])

In [30]:
avg_sentiment = indexed_df.join(sentiment_table_avg, indexed_df.id == sentiment_table_avg.product_id).select(indexed_df.app_name, indexed_df.sentiment, indexed_df.sentiment_label, sentiment_table_avg.avg_compound, sentiment_table_avg.product_id).sort(['sentiment_label', 'avg_compound'], ascending=False)

In [61]:
very_good = avg_sentiment.select('avg_compound').where(indexed_df.sentiment_label == 3).rdd.flatMap(lambda x: x).collect()

In [49]:
for sentiment in sentiment_list:
    x = avg_sentiment.select('avg_compound').where(indexed_df.sentiment == sentiment).rdd.flatMap(lambda x: x).collect()
    fig = px.histogram(x, title=sentiment, nbins = 50)
    fig.show()

Como es de esperar, a medida que el valor de 'sentiment' va hacia definiciones mas negativas, la distribución de los valores promedios del compound va hacia valores mas negativos.

Ahora, haremos graficas con el count de las reviews que se han catalogado como 1, 0 y -1 haciendo referencia a 'positivas', 'neutras', negativas' para cada videojuego. Nuevamente se graficará en función del campo 'sentiment'

In [36]:
sentiment_table_count = spark.read.option("multiline","true").parquet('./transformed_parquet/sentiment_table_count.parquet')

In [38]:
count_sentiment = indexed_df.join(sentiment_table_count, indexed_df.id == sentiment_table_count.id).select(indexed_df.app_name, indexed_df.sentiment, sentiment_table_count.count_sentiment, sentiment_table_count.val_sentiment, sentiment_table_count.id)

In [48]:
for sentiment in sentiment_list:
    df = count_sentiment.select('count_sentiment', 'val_sentiment').where(indexed_df.sentiment == sentiment).sort('val_sentiment')
    fig = px.histogram(df, x='count_sentiment', color='val_sentiment', title=sentiment, nbins = 200)
    fig.show()

Algo interesante a destacar es que en todos los casoso, las reviews catalogadas con el analisis de sientimiento como 'positivas' (1) son más para todos los valores de 'sentiment'. Luego le sigue las reviews neutrales y por último las negativas.

#### Desarrollar un sistema de recomendación item-item (eligiendo un videojuego, que el sistema recomiende videojuegos similares).

Para realizar este sistema de recomendación utilizaremos el concepto de similitud coseno. Esta es una medida de la similitud existente entre dos vectores que posee un producto interior con el que se evalúa el valor del coseno del ángulo comprendido entre ellos. <br>
<br>
Para esto, es necesario tomar ciertas características del producto, en este caso los videojuegos, y vectorizarlas para luego poder realizar el calculo de la similitud del coseno entre el videojuego elegido y los demás que se encuentren en el dataset. Naturalmente, los que posean características similares, tendrán una similutid mas alta y por lo tanto serán los recomendados.

In [9]:
from pyspark.ml.feature import IDF, CountVectorizer
from pyspark.ml.linalg import VectorUDT

El dataset steam_games contiene para cada videojuego un campo 'tags' donde se enumeran etiquetas que caracterizan a este videojuego, entre ellas el género del videojuego. <br>
<br>
Este campo será entonces el que utilizaremos para vectorizarlo y encontrar juegos similares para recomendar.

In [8]:
steam_games.select('genres', 'tags').distinct().show(5, truncate=False)

+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|genres                       |tags                                                                                                                                                                                                          |
+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[Action]                     |[Action, FPS, Classic, World War II, Retro, Shooter, Singleplayer, 1990's]                                                                                                                                    |
|[Racing]                     |[Simulation, 

In [7]:
game_recommendation = steam_games.select('*').sort('tags').dropna(subset ='tags') #Eliminar los registros que no tengan valores para la columna 'tags'.

Para crear los vectores, utilizaremos al medida TF-IDF (del inglés Term frequency – Inverse document frequency), frecuencia de término – frecuencia inversa de documento o sea, la frecuencia de ocurrencia del término en la colección de documentos. Es una medida numérica que expresa cuán relevante es una palabra para un documento en una colección. El valor tf-idf aumenta proporcionalmente al número de veces que una palabra aparece en el documento, pero es compensada por la frecuencia de la palabra en la colección de documentos, lo que permite manejar el hecho de que algunas palabras son generalmente más comunes que otras

Este paso es similar a calcular la frecuencia de término (TF) de cada etiqueta (tag). Esta es una medida de la frecuencia con la que aparece una frase sobre las etiquetas en el conjunto de datos videojuegos. Esto se realiza ajustando la columna 'tags' utilizando la clase CountVectorizer en Pyspark. <br>
<br>
Para controlar el tamaño de los vectores dispersos, se establece un umbral mínimo de frecuencia de 50, filtrando etiquetas menos comunes o potencialmente ruidosas para centrarnos en las ocurren con suficiente frecuencia y relevancia en todo el conjunto de datos.

In [10]:
vectorizer = CountVectorizer(inputCol="tags", outputCol="tags_frequency", minDF=50)

vectorizer_model = vectorizer.fit(game_recommendation)
game_recommendation_vectorise = vectorizer_model.transform(game_recommendation)

In [11]:
game_recommendation_vectorise.show(5)

+--------------------+--------------------+--------------+------------+------------------+------+---------+-----+-----------------+------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            app_name|           developer|discount_price|early_access|            genres|    id|metascore|price|        publisher|release_date|         reviews_url|    sentiment|               specs|                tags|               title|                 url|      tags_frequency|
+--------------------+--------------------+--------------+------------+------------------+------+---------+-----+-----------------+------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  Mayan Death Robots|      Sileni Studios|          NULL|       false|   [Action, Indie]|347470|       69| 6.99|             NULL|  2015-11-20|h

A continuación, se calcula la frecuencia inversa de documentos (IDF) para evaluar la importancia de cada término en todo el conjunto de datos. Las etiquetas que aparecen con mayor frecuencia recibirán una ponderación menor. Este valor IDF luego se escala con los valores TF para calcular las puntuaciones de TF-IDF. El TF-IDF otorga una puntuación que mide la importancia de una etiqueta, en relación con las demás que aparecen en el conjunto de datos de videojuegos.

In [12]:
idf = IDF(inputCol="tags_frequency", outputCol="tags_tfidf")
idf_model = idf.fit(game_recommendation_vectorise)

game_recommendation_vectorise_tfidf = idf_model.transform(game_recommendation_vectorise)

In [13]:
game_recommendation_vectorise_tfidf.show(5)

+--------------------+--------------------+--------------+------------+------------------+------+---------+-----+-----------------+------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            app_name|           developer|discount_price|early_access|            genres|    id|metascore|price|        publisher|release_date|         reviews_url|    sentiment|               specs|                tags|               title|                 url|      tags_frequency|          tags_tfidf|
+--------------------+--------------------+--------------+------------+------------------+------+---------+-----+-----------------+------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  Mayan Death Robots|      Sileni Studios|          NULL|       false|   [Action,

El campo creado 'tags_tfidf' contiene los vectores que se va a comparar, de la clase SparseVectors. Ahora, se define una función para calcular la similitud del coseno entre dos de estos vectores usando la definición matemática: <br>
<br>
cos(a,b) = (a.b) / (||a||*||b||)

In [14]:
def cosine_similarity(v1, v2):

    dot_product = float(v1.dot(v2))   # Producto punto entre SparseVector v1 & v2
    norm_v1 = float(v1.norm(2))   # Norma del SparseVector v1
    norm_v2 = float(v2.norm(2))   # Norma del SparseVector v2

    if norm_v1 * norm_v2 == 0:
        return 0.0

    similarity = dot_product / (norm_v1 * norm_v2)   # Cálculo de la similitud del coseno
    return similarity

cosine_similarity_udf = F.udf(cosine_similarity, T.DoubleType())

In [15]:
sqlContext.registerDataFrameAsTable(game_recommendation_vectorise_tfidf, 'game_recommendation_vectorise_tfidf')

Por cuestiones de cómputo, seleccionamos una muestra de videojuegos para probar el modelo y realizar recomendaciones.

In [23]:
sample_df = game_recommendation_vectorise_tfidf.sample(0.01, 42)

In [22]:
recomendation_list = []

In [None]:
for game in sample_df.collect():
    game_requested = game['app_name']
    game_vector = game['tags_tfidf']

    #Hay que filtrar el juego del que se pide una recomendación para que no aparezca entre ellas, pues la similitud sera obvia.
    filtered_df = sample_df.select('*').where(sample_df.app_name != game_requested)
    def game_tfidf_vector():
        return game_vector
    game_tfidf_vector = F.udf(game_tfidf_vector, VectorUDT())

    game_recommendation_cosine_similarity = filtered_df.withColumn("similarity_score", cosine_similarity_udf(F.col("tags_tfidf"), game_tfidf_vector()))
    recommendations = game_recommendation_cosine_similarity.orderBy(F.col("similarity_score").desc()).limit(5).collect()

    for rank, result in enumerate(recommendations, start=1):
        game_dict = {'game_requested': game_requested, "Nº": rank, "Game": result['app_name'], "Price": result['price'], "Genre": result['genres'], 'Release date': result['release_date'] } 
        recomendation_list.append(game_dict)


In [29]:
len(recomendation_list)/5

385.0

Como se puede ver, se realizaron las recomendaciones de 385 videojuegos. A continuación, se crea un dataframe con estas recomendaciones, que nos permitirá luego filtrar por el videojuego que elijamos.

In [32]:
recommendations_df = spark.createDataFrame(Row(**x) for x in recomendation_list)
recommendations_df.show(10, truncate=False)

+-----------------------------------------------------------------------+---+-----------------------------------------------------------------------+-----+--------+------------+
|game_requested                                                         |Nº |Game                                                                   |Price|Genre   |Release date|
+-----------------------------------------------------------------------+---+-----------------------------------------------------------------------+-----+--------+------------+
|Street Fighter X Tekken: Street Fighter/Tekken Shared Assist Gem Pack 2|1  |Aliens vs. Predator Swarm Map Pack                                     |7.49 |[Action]|2010-03-18  |
|Street Fighter X Tekken: Street Fighter/Tekken Shared Assist Gem Pack 2|2  |Gotham City Impostors Free to Play: Ninja Costume                      |5.99 |[Action]|2012-08-30  |
|Street Fighter X Tekken: Street Fighter/Tekken Shared Assist Gem Pack 2|3  |Dogfight 1942 Russia Under Siege 

In [33]:
recommendations_df.select('game_requested').distinct().collect()

[Row(game_requested='Black Rose'),
 Row(game_requested='One Piece Burning Blood - Golden Luffy'),
 Row(game_requested='Aliens vs. Predator Swarm Map Pack'),
 Row(game_requested='ARSLAN - Scenario Set 3'),
 Row(game_requested="Aliens: Colonial Marines Collector's Edition Pack"),
 Row(game_requested="Tom Clancy's Rainbow Six® Siege - Ash Watch_Dogs Set"),
 Row(game_requested='Slug Blast'),
 Row(game_requested='DW8XLCE - BASE THEME PACK'),
 Row(game_requested='GGXrd System Voice - MILLIA RAGE'),
 Row(game_requested='Gigantus Tank, Bullet Girls Marking'),
 Row(game_requested='Call of Duty®: Ghosts - Inferno Character Pack'),
 Row(game_requested='DARIUSBURST Chronicle Saviours - DoDonPachi Resurrection'),
 Row(game_requested='HELLDIVERS™ - Commando Pack'),
 Row(game_requested="Tom Clancy's Rainbow Six® Siege - Racer 23 Bundle"),
 Row(game_requested='FOR HONOR™ SEASON PASS'),
 Row(game_requested='Upgrade Pack/エージェントゴーストパック'),
 Row(game_requested='Call of Duty®: Advanced Warfare - Jackpot Per

Se crea la siguiente función para filtrar por videojuego

In [40]:
def getRecomendation(game):
    df = recommendations_df.select('Nº', 'Game', 'Price', 'Genre', 'Release date').where(f'game_requested = "{game}"' )
    if df.isEmpty():
        return 'No se encontraron resultados para el juego seleccionado.'
    else:
        return df.show()

**Ejemplos** 

In [41]:
getRecomendation('Counter-Strike: Condition Zero')

+---+--------------------+------------+--------------------+------------+
| Nº|                Game|       Price|               Genre|Release date|
+---+--------------------+------------+--------------------+------------+
|  1|Battlefield: Bad ...|       19.99|            [Action]|  2010-03-02|
|  2|Zombie Panic! Source|        Free|[Action, Free to ...|  2007-12-28|
|  3|      Tribes: Ascend|Free to Play|[Action, Free to ...|  2012-06-27|
|  4|            Far Cry®|        9.99|            [Action]|  2004-03-23|
|  5|Arma 2: Private M...|        8.99|[Action, Simulati...|  2010-11-30|
+---+--------------------+------------+--------------------+------------+



In [42]:
getRecomendation('Slug Blast')

+---+--------------------+-----+--------+------------+
| Nº|                Game|Price|   Genre|Release date|
+---+--------------------+-----+--------+------------+
|  1|Aliens vs. Predat...| 7.49|[Action]|  2010-03-18|
|  2|Street Fighter X ...| 2.99|[Action]|  2012-07-24|
|  3|Gotham City Impos...| 5.99|[Action]|  2012-08-30|
|  4|Dogfight 1942 Rus...| 2.99|[Action]|  2012-09-28|
|  5|Hitman: Absolutio...| 0.99|[Action]|  2012-11-19|
+---+--------------------+-----+--------+------------+



In [43]:
getRecomendation('Cualquier cosa')

'No se encontraron resultados para el juego seleccionado.'