# Camada Silver – Tratamento e Padronização

## Objetivo
Transformar os dados da camada Bronze em uma base canônica (Silver), com:

- padronização de tipos (ex.: datas)
- normalização de atributos (ex.: duração em minutos / temporadas)
- normalização relacional de campos multivalorados (N:N), garantindo melhor qualidade analítica

## Fonte
- Entrada: `bronze.disney_titles_raw`
- Saídas:
  - `silver.disney_titles_clean` (tabela canônica)
  - tabelas normalizadas:
    - `silver.title_genres`
    - `silver.title_countries`
    - `silver.title_directors`
    - `silver.title_cast`


## Principais Transformações

- Conversão de `date_added` para tipo data
- Normalização de duração (minutos / temporadas)
- Padronização de campos categóricos


In [0]:
from pyspark.sql import functions as F

df = spark.table("bronze.disney_titles_raw")

# Padronizações leves e canônicas
df_silver = (
    df
    # trims básicos
    .withColumn("show_id", F.trim(F.col("show_id")))
    .withColumn("type", F.trim(F.col("type")))
    .withColumn("title", F.trim(F.col("title")))
    .withColumn("rating", F.trim(F.col("rating")))
    .withColumn("listed_in", F.trim(F.col("listed_in")))
    .withColumn("country", F.trim(F.col("country")))
    .withColumn("director", F.trim(F.col("director")))
    .withColumn("cast", F.trim(F.col("cast")))
    .withColumn("description", F.trim(F.col("description")))
    
    # parse de data
    .withColumn("date_added_dt", F.to_date("date_added", "MMMM d, yyyy"))
    
    # normalização de duração (num/unidade)
    .withColumn("duration_num", F.regexp_extract("duration", r"(\d+)", 1).cast("int"))
    .withColumn(
        "duration_unit",
        F.when(F.col("duration").contains("min"), F.lit("min"))
         .when(F.col("duration").contains("Season"), F.lit("season"))
         .otherwise(F.lit(None))
    )
    .withColumn("duration_minutes", F.when(F.col("duration_unit") == "min", F.col("duration_num")))
    .withColumn("seasons", F.when(F.col("duration_unit") == "season", F.col("duration_num")))
    
    # tipagem do ano de lançamento
    .withColumn("release_year_int", F.col("release_year").cast("int"))
    
    # normalização de strings vazias para null (boa prática)
    .withColumn("director", F.nullif(F.col("director"), F.lit("")))
    .withColumn("cast", F.nullif(F.col("cast"), F.lit("")))
    .withColumn("country", F.nullif(F.col("country"), F.lit("")))
    .withColumn("rating", F.nullif(F.col("rating"), F.lit("")))
)

# Remover duplicidades por show_id (se existirem) mantendo a primeira ocorrência
df_silver = df_silver.dropDuplicates(["show_id"])

# Persistência canônica Silver
(df_silver
 .write
 .format("delta")
 .mode("overwrite")
 .saveAsTable("silver.disney_titles_clean"))


## Normalização de Campos Multivalorados (N:N)

O dataset contém campos com múltiplos valores em uma única coluna (separados por vírgula),
o que prejudica análises e modelagem estrela.

Nesta etapa, normalizamos para tabelas auxiliares do tipo:
- show_id + valor_normalizado

Campos normalizados:
- `listed_in` → gêneros (title_genres)
- `country` → países (title_countries)
- `director` → diretores (title_directors)
- `cast` → elenco (title_cast)


In [0]:
from pyspark.sql import DataFrame

def normalize_multivalue(df_base: DataFrame, id_col: str, src_col: str, out_col: str, out_table: str):
    """
    Cria uma tabela normalizada (N:N) a partir de uma coluna multivalorada separada por vírgula.
    - Remove espaços extras
    - Remove valores vazios
    - Remove duplicidades show_id + valor
    """
    normalized = (
        df_base
        .select(F.col(id_col).alias(id_col), F.col(src_col).alias(src_col))
        .filter(F.col(src_col).isNotNull())
        .withColumn(out_col, F.explode(F.split(F.col(src_col), r",\s*")))
        .withColumn(out_col, F.trim(F.col(out_col)))
        .filter(F.col(out_col).isNotNull() & (F.col(out_col) != ""))
        .select(id_col, out_col)
        .dropDuplicates([id_col, out_col])
    )
    
    (normalized
     .write
     .format("delta")
     .mode("overwrite")
     .saveAsTable(out_table))
    
    return normalized


In [0]:
base = spark.table("silver.disney_titles_clean")

# Gêneros: listed_in
normalize_multivalue(
    df_base=base,
    id_col="show_id",
    src_col="listed_in",
    out_col="genre",
    out_table="silver.title_genres"
)

# Países: country
normalize_multivalue(
    df_base=base,
    id_col="show_id",
    src_col="country",
    out_col="country_name",
    out_table="silver.title_countries"
)

# Diretores: director
normalize_multivalue(
    df_base=base,
    id_col="show_id",
    src_col="director",
    out_col="director_name",
    out_table="silver.title_directors"
)

# Elenco: cast
normalize_multivalue(
    df_base=base,
    id_col="show_id",
    src_col="cast",
    out_col="cast_name",
    out_table="silver.title_cast"
)


## Validações de Qualidade (Silver)

Objetivos:
- confirmar que a Silver canônica foi criada
- medir completude (nulos) por atributo relevante
- confirmar volume das tabelas normalizadas
- identificar possíveis inconsistências em domínios (ex.: type, rating)


In [0]:
%sql
SELECT
  COUNT(*) AS total,
  SUM(CASE WHEN show_id IS NULL THEN 1 ELSE 0 END) AS null_show_id,
  SUM(CASE WHEN type IS NULL THEN 1 ELSE 0 END) AS null_type,
  SUM(CASE WHEN title IS NULL THEN 1 ELSE 0 END) AS null_title,
  SUM(CASE WHEN listed_in IS NULL THEN 1 ELSE 0 END) AS null_listed_in,
  SUM(CASE WHEN release_year_int IS NULL THEN 1 ELSE 0 END) AS null_release_year,
  SUM(CASE WHEN date_added_dt IS NULL THEN 1 ELSE 0 END) AS null_date_added
FROM silver.disney_titles_clean;


In [0]:
%sql
SELECT type, COUNT(*) AS qtd
FROM silver.disney_titles_clean
GROUP BY type
ORDER BY qtd DESC;


In [0]:
%sql
SELECT 'title_genres' AS tabela, COUNT(*) AS linhas FROM silver.title_genres
UNION ALL
SELECT 'title_countries', COUNT(*) FROM silver.title_countries
UNION ALL
SELECT 'title_directors', COUNT(*) FROM silver.title_directors
UNION ALL
SELECT 'title_cast', COUNT(*) FROM silver.title_cast;


## Evidência e Rastreabilidade

As transformações desta camada (tipagem, normalização de duração e normalização relacional)
são reproduzíveis a partir deste notebook versionado no GitHub integrado ao Databricks.

As tabelas geradas na Silver serão usadas na Gold para:
- modelo estrela (dimensões + fato)
- análises de diversidade e perfis
- agregações e métricas de negócio


Observações importantes:

- Não normalizei `description` porque não é multivalorado e normalmente é usado em NLP/EDA (Gold/ML).
- Não normalizei `rating` porque é categórico simples e já está canônico na Silver.
- Mantive as tabelas normalizadas na camada Silver porque facilitam criar dimensões e pontes na Gold e evitam repetição de parsing/explode em toda query analítica
