## Dependencias



In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import os
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
DISPLAY = False # True/False para ver os displays

In [2]:
# Não manipule diretamente esses df, faça uma copia (ex: df_links_media = df_links)

links_path = '../input/links.csv' # usando small pra ir mais rapido
df_links = spark.read.csv(links_path, header=True)

ratings_path = '../input/ratings.csv'# usando small pra ir mais rapido
df_ratings = spark.read.csv(ratings_path, header=True)

movies_metadata_path = '../input/movies_metadata.csv'
df_movies = spark.read.csv(movies_metadata_path, header=True)

keywords_path = '../input/keywords.csv'
df_keywords = spark.read.csv(keywords_path, header=True)

credits_path = '../input/credits.csv'
df_credits = spark.read.csv(credits_path, header=True)

## Preparação do dataset



Aqui vamos preparar o dataframe pra receber exatamente o que queremos no treino do modelo.

Cada instancia do nosso treino representa uma nota feita por um usuário, tenha isso em mente quando desenvolver as features.

No final dessa sesssão teremos um dataframe com algumas informações basicas, a ideia sera fazer joins nele para incluir as features.

In [5]:
df_treino = (
  df_ratings
  .join(df_links,on=['movieId'] ,how="inner")
)

## Features

### Genero
- **user_genero_avg**: Histórico da media das notas do usuário por genero
- **user_genero_count**: Histórico de generos vistos pelo usuário
- **generos_movie**: Generos que o filme avaliado pelo usuário pertence

In [6]:
schema = T.ArrayType(T.StructType([
    T.StructField("id", T.StringType(), True),
    T.StructField("name", T.StringType(), True)
]))

w = Window().partitionBy('genres_explode.id')

df_usuario = (
    df_ratings
  .join(df_links, on=['movieId'])
)

df_movies_edit = (
  df_movies
  .withColumn("genres_array", F.from_json("genres", schema))
  .withColumn("imdbId", F.substring(F.col("imdb_id"), 3, 1000))# tirando tt
  .filter(F.col('genres_array.name').isNotNull())
)

df_genero = (
  df_movies_edit
  .join(df_usuario, on=['imdbId'])
  .withColumn("genres_explode", F.explode("genres_array"))
  .withColumn('num_aparicao', F.count("*").over(w))
  .where(F.col("num_aparicao") > 2)
  #.groupBy("genres_explode.id").agg(F.count("*")).orderBy(F.desc("count(1)"))
  .selectExpr("imdbId","userId","genres_explode.name as genero","rating","timestamp")
)

In [7]:
w = Window().partitionBy('userId','genero').orderBy("timestamp").rangeBetween(Window.unboundedPreceding, 0)

df_genero = (
  df_genero
  .withColumn("timestamp", F.col("timestamp").cast("int"))
  .withColumn('user_genero_count', F.count("*").over(w))
  .withColumn("rating", F.col("rating").cast("float"))
  .withColumn('user_genero_avg', F.round((F.avg("rating").over(w)/5),2))

)

In [None]:
# Ordenando a lista de genero em ordem alfabética
list_generos = df_genero.agg(F.collect_set("genero").alias("generos_unicos")).collect()[0]["generos_unicos"]
list_generos.sort()
print(list_generos)

w = Window().partitionBy('userId').orderBy("timestamp").rangeBetween(Window.unboundedPreceding, 0)

In [None]:
def map_generos(geners):
    if geners is None:
        return []
    return [1 if genre in geners else 0 for genre in list_generos]

map_generos_udf = F.udf(map_generos, T.ArrayType(T.IntegerType()))

df_generos_filme = (
    df_movies_edit
    .withColumn("generos_movie", map_generos_udf(F.col("genres_array.name")))
    .select('imdbId', 'generos_movie')
)


In [None]:
df_genero_count = (
  df_genero
  .groupby("timestamp","userId","imdbId")
  .pivot("genero", values= list_generos)
  .agg(F.first("user_genero_count"))
)

for genero in list_generos:
  df_genero_count = df_genero_count.withColumn(genero, F.when(F.col(genero).isNull(), F.count(genero).over(w)).otherwise(F.col(genero)))

df_genero_count = df_genero_count.select("timestamp","userId","imdbId",F.array(*list_generos).alias("user_genero_count"))

In [None]:
df_genero_avg = (
  df_genero
  .groupby("timestamp","userId","imdbId")
  .pivot("genero", values= list_generos)
  .agg(F.first("user_genero_avg"))
  #.select("timestamp","userId",F.array(*list_generos).alias("user_genero_count"))
)

nota_neutra = 0.2 # estudar a disparidade dessa nota

for genero in list_generos:
  df_genero_avg = (
      df_genero_avg
      .withColumn(genero, (
            F.when(F.col(genero).isNull(), (
                F.when(F.last(genero, ignorenulls=True).over(w).isNull(), F.lit(nota_neutra))
            ).otherwise(F.last(genero, ignorenulls=True).over(w)))
        ).otherwise(F.col(genero))
      )
  )

df_genero_avg = df_genero_avg.select("timestamp","userId","imdbId",F.array(*list_generos).alias("user_genero_avg"))

In [None]:
df_treino = (
  df_treino
  .join(df_generos_filme, on=["imdbId"], how="inner")
  .join(df_genero_count, on=["timestamp","userId","imdbId"], how="inner")
  .join(df_genero_avg, on=["timestamp","userId","imdbId"], how="inner")

)

O dataframe nao tem nota zero, o que faz com que 2,5 nao seja o valor medio exato.

**NOTAS**:
- Uma investigação para entender melhor as notas dos usuários;
- Abordar a normalizacao de outra forma tbm é interessante;
- Podemos colocar nulo no lugar do valor neutro?;
- Analise da feature pra saber se está correta;

### Filme
- **movie_overall_grade**: Nota geral do filme
- **movie_popularity**: Popularidade do filme


In [None]:
df_geral = (
  df_movies
  .withColumn("imdbId", F.substring(F.col("imdb_id"), 3, 1000))# tirando tt
  .select("imdbId","popularity","vote_average")
  .withColumn('popularity', F.log1p(F.col('popularity').cast("float")))
  .withColumn('vote_average', F.col('vote_average').cast("float"))
  .dropna()
  .filter((F.col('vote_average') >= 0) & (F.col('vote_average') <= 10))
)


In [None]:
df_geral_standardized = (
  df_geral
  .withColumn('movie_overall_grade', F.round((F.col("vote_average")/10),2))
  .withColumn("movie_popularity", F.round(F.when(F.col("popularity")> 3, F.lit(1)).otherwise(F.col("popularity")/3),4))
  .select("imdbId","movie_popularity","movie_overall_grade")
)

In [None]:
df_treino = (
  df_treino
  .join(df_geral_standardized, on=["imdbId"], how="inner")
)

## Salvando treino

In [None]:
df_treino.show(truncate=False)

+-------+---------+------+-------+------+------+------------------------------------------------------------+------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+----------------+-------------------+
|imdbId |timestamp|userId|movieId|rating|tmdbId|generos_movie                                               |user_genero_count                                                 |user_genero_avg                                                                                               |movie_popularity|movie_overall_grade|
+-------+---------+------+-------+------+------+------------------------------------------------------------+------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+----------------+-------------------+
|0114709|823264602|230253

In [None]:
train_path = os.path.join('./','output-treino')
df_treino.repartition(5).write.mode('overwrite').parquet(train_path)