%md
# Projeto de Análise de Dados - IMDB

## Objetivo do Notebook 3

Neste notebook, realizei a etapa de tratamento dos dados brutos do projeto. Foram aplicadas limpezas nas tabelas para corrigir inconsistências, como anos de nascimento e falecimento fora de faixas válidas, além do tratamento de campos nulos e eliminação de duplicidades. Também foi calculada a idade das pessoas com base nas informações disponíveis. Por fim, foram criadas views temporárias que servirão como base para a construção da camada gold no próximo passo do projeto.


##1 - Importar Biblioteca

In [0]:
from pyspark.sql.functions import col, year, current_date, when
from pyspark.sql.types import IntegerType, DoubleType

##2 - Dataframes da Camada Anterior

In [0]:
df_name_basics = spark.table("default.name_basics")
df_title_basics = spark.table("default.title_basics")
df_title_crew = spark.table("default.title_crew")
df_title_episode = spark.table("default.title_episode")
df_title_ratings = spark.table("default.title_ratings")
df_title_principals = spark.table("default.title_principals")
df_title_akas = spark.table("default.title_akas")

##3 - Tratamento dos Dados

In [0]:
#Ano Atual
ano_atual = year(current_date())

3.1 - Tabela Name_Basics

In [0]:
#Adicionando a Coluna de Idade
df_name_basics_tratado = (
    df_name_basics
    .dropDuplicates()
    .withColumn("birthYear", col("birthYear").cast(IntegerType()))
    .withColumn("deathYear", col("deathYear").cast(IntegerType()))

#Tornar Datas aceitáveis
    .withColumn("birthYear", when((col("birthYear") > 1800) & (col("birthYear") <= ano_atual), col("birthYear")).otherwise(None))
    .withColumn("deathYear", when((col("deathYear") >= col("birthYear")) & (col("deathYear") <= ano_atual), col("deathYear")).otherwise(None))

#Preencher Nulos
 .na.fill({"birthYear": 0, "deathYear": 0})

 #Calculo da Idade
  .withColumn(
        "idade",
        when(col("deathYear") > 0, col("deathYear") - col("birthYear"))
        .when(col("birthYear") > 0, ano_atual - col("birthYear"))
        .otherwise(None)
    )
)

3.2 - Tabela Title_Basics

In [0]:
df_title_basics_tratado = (
    df_title_basics
    .dropDuplicates()
    .withColumn("startYear", col("startYear").cast(IntegerType()))
    .withColumn("endYear", col("endYear").cast(IntegerType()))
    .withColumn("runtimeMinutes", col("runtimeMinutes").cast(IntegerType()))

#Datas aceitáveis
  .withColumn("startYear", when((col("startYear") > 1800) & (col("startYear") <= ano_atual), col("startYear")).otherwise(None))
    .withColumn("endYear", when((col("endYear") >= col("startYear")) & (col("endYear") <= ano_atual), col("endYear")).otherwise(None))
    .na.fill({"startYear": 0, "endYear": 0, "runtimeMinutes": 0})
)

3.3 Tabela Title_Crew

In [0]:
## 3.3 title_crew
df_title_crew_tratado = (
    df_title_crew
    .dropDuplicates()
)


3.4 - Tabela Title_Episode

In [0]:
df_title_episode_tratado = (
    df_title_episode
    .dropDuplicates()
    .withColumn("seasonNumber", col("seasonNumber").cast(IntegerType()))
    .withColumn("episodeNumber", col("episodeNumber").cast(IntegerType()))
    .na.fill({"seasonNumber": 0, "episodeNumber": 0})
)

3.5 - Tabela Title_Ratings

In [0]:
df_title_ratings_tratado = (
    df_title_ratings
    .dropDuplicates()
    .na.fill({"averageRating": 0.0, "numVotes": 0})
)

3.6 - Tabela Title_Principals

In [0]:
df_title_principals_tratado = (
    df_title_principals
    .dropDuplicates()
    .withColumn("ordering", col("ordering").cast(IntegerType()))
)

3.7 - Tabela Title_Akas

In [0]:
df_title_akas_tratado = (
    df_title_akas
    .dropDuplicates()
    .withColumn("ordering", col("ordering").cast(IntegerType()))
    .withColumn("isOriginalTitle", col("isOriginalTitle").cast(IntegerType()))
)

##4 - Verificações de integridade (joins para manter somente chaves existentes)

In [0]:
## Verificar se tconst existe em title_basics
df_title_ratings_tratado = df_title_ratings_tratado.join(df_title_basics_tratado, on="tconst", how="inner")
df_title_episode_tratado = df_title_episode_tratado.join(df_title_basics_tratado, on="tconst", how="inner")
df_title_crew_tratado = df_title_crew_tratado.join(df_title_basics_tratado, on="tconst", how="inner")
df_title_principals_tratado = df_title_principals_tratado.join(df_title_basics_tratado, on="tconst", how="inner")

## Join com titleId (que corresponde a tconst)
df_title_akas_tratado = (
    df_title_akas_tratado
    .join(df_title_basics_tratado, df_title_akas_tratado["titleId"] == df_title_basics_tratado["tconst"], "inner")
    .select(df_title_akas_tratado["*"])
)

## Verificar se nconst existe no name_basics (para principals)
df_title_principals_tratado = df_title_principals_tratado.join(df_name_basics_tratado, on="nconst", how="inner")


##5 - Criar Views Temporárias

In [0]:
df_name_basics_tratado.createOrReplaceTempView("tratado_name_basics")
df_title_basics_tratado.createOrReplaceTempView("tratado_title_basics")
df_title_crew_tratado.createOrReplaceTempView("tratado_title_crew")
df_title_episode_tratado.createOrReplaceTempView("tratado_title_episode")
df_title_ratings_tratado.createOrReplaceTempView("tratado_title_ratings")
df_title_principals_tratado.createOrReplaceTempView("tratado_title_principals")
df_title_akas_tratado.createOrReplaceTempView("tratado_title_akas")

##6 - Tabelas Silver Salvas no Metastore

In [0]:
df_name_basics_tratado.write.mode("overwrite").saveAsTable("silver_name_basics")
df_title_basics_tratado.write.mode("overwrite").saveAsTable("silver_title_basics")
df_title_crew_tratado.write.mode("overwrite").saveAsTable("silver_title_crew")
df_title_episode_tratado.write.mode("overwrite").saveAsTable("silver_title_episode")
df_title_ratings_tratado.write.mode("overwrite").saveAsTable("silver_title_ratings")
df_title_principals_tratado.write.mode("overwrite").saveAsTable("silver_title_principals")
df_title_akas_tratado.write.mode("overwrite").saveAsTable("silver_title_akas")

print("✅ Tabelas silver salvas no Metastore com sucesso.")


✅ Tabelas silver salvas no Metastore com sucesso.
