 ## Etapa 2 — Tratamento (limpeza e padronização)
 Agora eu pego a tabela bruta (titles_raw) e crio tabelas limpas e estruturadas:
 - titles_clean: base padronizada (snake_case implícito nas saídas e textos em minúsculos).
 - titles_by_country: relacionamento título × país (explode coluna multivalor country).
 - titles_by_genre: relacionamento título × gênero (explode listed_in).

 Nesta etapa eu uso bastante SQL no SQLite por ser leve e repetir a lógica de produção.

In [13]:
from pathlib import Path
import sqlite3
import pandas as pd

CWD = Path.cwd().resolve()
PROJECT_DIR = CWD.parent if CWD.name.lower() in {"scr", "src", "notebooks"} else CWD

DATA_DIR = PROJECT_DIR / "data"
RAW_DIR = DATA_DIR / "raw"
PROC_DIR = DATA_DIR / "processed"
SQLITE_PATH = DATA_DIR / "netflix.db"

Conexão com SQLite
 Abro/uso o banco local netflix.db. É prático para testes e depois dá para migrar
 a mesma lógica para outro SGBD se eu quiser.

In [14]:
conn = sqlite3.connect(SQLITE_PATH.as_posix())

Criação da tabela titles_clean
 Aqui eu:
 - Padronizo texto: lower(trim(...)) para ficar em minúsculas e sem espaços nas pontas.
 - Preencho nulos:
   - country: viro "unknown" quando vazio/nulo (facilita contagens).
   - rating: viro "not_rated" (padroniza o "sem classificação").
 - Datas:
   - "date_added" vem em texto (ex.: "Sep 9, 2019"). Uso substr + instr + printf para
     converter no formato "YYYY-MM-DD" aceito pelo date(). Assim eu posso filtrar por período.
 - Números:
   - release_year vira INTEGER.
 - Duração:
   - duration original vira duas colunas:
     - duration_value
     - duration_unit  (min/season)
   Isso separa a medida do valor, deixando as análises muito mais simples.

In [15]:
create_titles_clean_sql = """
DROP TABLE IF EXISTS titles_clean;

CREATE TABLE titles_clean AS
SELECT
    show_id,
    lower(trim(type)) AS type,
    lower(trim(title)) AS title,
    lower(trim(director)) AS director,
    lower(trim("cast")) AS "cast",
    CASE
        WHEN country IS NULL OR trim(country) = '' THEN 'unknown'
        ELSE lower(trim(country))
    END AS country,
    CASE
        WHEN date_added IS NOT NULL AND length(trim(date_added)) > 0
        THEN date(
            substr(date_added, -4) || '-' ||
            printf('%02d',
                (instr('JanFebMarAprMayJunJulAugSepOctNovDec',
                       substr(date_added, 1, 3)) + 2) / 3
            ) || '-' ||
            printf('%02d', CAST(replace(substr(date_added, instr(date_added, ' ')+1), ',', '') AS INT))
        )
        ELSE NULL
    END AS date_added,
    CAST(release_year AS INTEGER) AS release_year,
    CASE
        WHEN rating IS NULL OR trim(rating) = '' THEN 'not_rated'
        ELSE lower(trim(rating))
    END AS rating,
    lower(trim(duration)) AS duration,
    CASE
        WHEN lower(duration) LIKE '%min%' THEN CAST(replace(lower(duration), 'min', '') AS INT)
        WHEN lower(duration) LIKE '%season%' THEN CAST(replace(replace(lower(duration), 'seasons', ''), 'season', '') AS INT)
        ELSE NULL
    END AS duration_value,
    CASE
        WHEN lower(duration) LIKE '%min%' THEN 'min'
        WHEN lower(duration) LIKE '%season%' THEN 'season'
        ELSE NULL
    END AS duration_unit,
    lower(trim(listed_in)) AS listed_in,
    lower(trim(description)) AS description
FROM titles_raw;
"""
conn.executescript(create_titles_clean_sql)
print("titles_clean criada")


titles_clean criada


 Explosão da coluna multivalor country para titles_by_country
 country vem como uma string com países separados por vírgula (ex.: "Brazil, United States").
 Para contar e cruzar corretamente, eu explodo em linhas: um show_id para cada país.

 Como faço isso no SQLite:
 - Transformo a string em um array JSON de países.
 - Uso json_each(...) (extensão JSON do SQLite) para "iterar" pelos elementos.
 - Faço trim e lower para padronizar.


In [16]:
create_by_country_sql = """
DROP TABLE IF EXISTS titles_by_country;

CREATE TABLE titles_by_country AS
WITH exploded AS (
    SELECT
        show_id,
        trim(value) AS country
    FROM titles_clean,
         json_each('["' || replace(ifnull(country,''), ',', '","') || '"]')
    WHERE trim(value) <> ''
)
SELECT show_id, lower(country) AS country
FROM exploded;
"""

conn.executescript(create_by_country_sql)

# Validação
count_countries = conn.execute("SELECT COUNT(*) FROM titles_by_country;").fetchone()[0]
print(f"titles_by_country criada com {count_countries} linhas")


titles_by_country criada com 10843 linhas


Explosão da coluna multivalor listed_in (gêneros) para titles_by_genre
 Mesma ideia da coluna country, mas agora para gêneros. Cada gênero vira uma linha.


In [17]:
create_by_genre_sql = """
DROP TABLE IF EXISTS titles_by_genre;

CREATE TABLE titles_by_genre AS
WITH exploded AS (
    SELECT
        show_id,
        trim(value) AS genre
    FROM titles_clean,
         json_each('["' || replace(ifnull(listed_in,''), ',', '","') || '"]')
    WHERE trim(value) <> ''
)
SELECT show_id, lower(genre) AS genre
FROM exploded;
"""

conn.executescript(create_by_genre_sql)

# Validação
count_genres = conn.execute("SELECT COUNT(*) FROM titles_by_genre;").fetchone()[0]
print(f"titles_by_genre criada com {count_genres} linhas")


titles_by_genre criada com 19323 linhas


 Índices para acelerar consultas
 Índices deixam filtros, joins e ordenações muito mais rápidos nas colunas mais usadas.
 Aqui eu crio índices em:
 - show_id, type, date_added na base limpa.
 - country e genre nas tabelas explodidas.

In [18]:
conn.executescript("""
CREATE INDEX IF NOT EXISTS ix_titles_clean_show_id ON titles_clean(show_id);
CREATE INDEX IF NOT EXISTS ix_titles_clean_type ON titles_clean(type);
CREATE INDEX IF NOT EXISTS ix_titles_clean_date_added ON titles_clean(date_added);
CREATE INDEX IF NOT EXISTS ix_titles_by_country_country ON titles_by_country(country);
CREATE INDEX IF NOT EXISTS ix_titles_by_genre_genre ON titles_by_genre(genre);
""")
print("Índices criados.")


Índices criados.


Exportar tabelas tratadas para data/processed
 Exporto as três tabelas para **Parquet** (formato compactado e rápido).
 Vantagens do Parquet:
 - Colunar (ótimo para leitura seletiva de colunas).
 - Compacto (economiza espaço).

In [19]:
tables = {
    "titles_clean": "SELECT * FROM titles_clean;",
    "titles_by_country": "SELECT * FROM titles_by_country;",
    "titles_by_genre": "SELECT * FROM titles_by_genre;"
}

try:
    for name, query in tables.items():
        df_out = pd.read_sql_query(query, conn)
        pq_path = PROC_DIR / f"{name}.parquet"
        df_out.to_parquet(pq_path, index=False)
        print(f"Exportado: {pq_path}")
except Exception as e:
    print("Parquet não exportado", e)


Exportado: C:\Users\maria\OneDrive\Documentos\Portifolio-dados\netflix-analytics\data\processed\titles_clean.parquet
Exportado: C:\Users\maria\OneDrive\Documentos\Portifolio-dados\netflix-analytics\data\processed\titles_by_country.parquet
Exportado: C:\Users\maria\OneDrive\Documentos\Portifolio-dados\netflix-analytics\data\processed\titles_by_genre.parquet


Encerramento da etapa de tratamento.

In [20]:
conn.close()
print("Tratamento concluído com sucesso")


Tratamento concluído com sucesso
