# ETL Raw para Silver - Movies Dataset

Este notebook executa o ETL da camada Raw para a camada Silver do projeto.

Etapas:
- Extract: leitura do CSV bruto
- Transform: limpeza, padronizacao, enriquecimento e validacao
- Load: gravacao na camada Silver (CSV e, se houver engine disponivel, Parquet) e opcionalmente no PostgreSQL


## 1. Importacoes e configuracao

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import os
from datetime import datetime


In [None]:
# Notebook display settings: no limit on the number of columns shown and
# up to 120 characters of text per cell.
pd.options.display.max_columns = None
pd.options.display.max_colwidth = 120


### 1.1 Caminhos do projeto

In [None]:
# Locate the project root: the first folder, starting at the current working
# directory and walking up through its parents, that contains both the
# "Data Layer" directory and the raw CSV file.
CWD = Path.cwd()
PROJECT_ROOT = next(
    (
        folder
        for folder in (CWD, *CWD.parents)
        if (folder / "Data Layer").exists() and (folder / "base de dados.csv").exists()
    ),
    None,
)

# Stop early with a clear message when no such folder was found.
if PROJECT_ROOT is None:
    raise FileNotFoundError(
        f"Nao encontrei a pasta do projeto a partir de {CWD}. "
        "Verifique se o notebook esta dentro do repositorio."
    )

# Input file and Silver-layer output locations, all relative to the root.
SILVER_DIR = PROJECT_ROOT / "Data Layer" / "silver"
RAW_FILE = PROJECT_ROOT / "base de dados.csv"
SILVER_FILE = SILVER_DIR / "movies_silver.csv"
SILVER_PARQUET = SILVER_DIR / "movies_silver.parquet"

print(
    f"Projeto: {PROJECT_ROOT}",
    f"Raw: {RAW_FILE}",
    f"Silver: {SILVER_FILE}",
    sep="\n",
)


### 1.2 Conexao com banco (opcional)

In [None]:
# Optional PostgreSQL target. Nothing below opens a connection unless
# LOAD_TO_DB is switched to True.
from urllib.parse import quote

# Enable only if the Silver table should also be loaded into PostgreSQL.
LOAD_TO_DB = False

# Connection settings are read from the environment, with local defaults.
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "movies_dw")
DB_USER = os.getenv("DB_USER", "postgres")
DB_PASSWORD = os.getenv("DB_PASSWORD", "postgres")

# Percent-encode the credentials: characters such as "@", ":" or "/" inside a
# user name or password would otherwise be read as URL delimiters. Values made
# only of letters and digits (including the defaults) are left unchanged.
CONNECTION_STRING = (
    f"postgresql://{quote(DB_USER, safe='')}:{quote(DB_PASSWORD, safe='')}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# SQLAlchemy is an optional dependency. Any failure while importing it only
# disables the database load; the CSV/Parquet outputs do not need it.
try:
    from sqlalchemy import create_engine, text
    HAS_SQLALCHEMY = True
except Exception:
    HAS_SQLALCHEMY = False

# The engine stays None unless the load was requested and SQLAlchemy imported.
engine = None
if LOAD_TO_DB and HAS_SQLALCHEMY:
    engine = create_engine(CONNECTION_STRING, pool_size=5, max_overflow=10)
    print("Conexao PostgreSQL pronta")
elif LOAD_TO_DB:
    print("SQLAlchemy nao instalado. Desative LOAD_TO_DB ou instale a dependencia.")


## 2. Extract - leitura dos dados brutos

In [None]:
# Extract step: load the raw CSV into df_raw and report its size.
print("Carregando CSV bruto...")
# low_memory=False asks pandas not to parse the file in internal chunks,
# which avoids mixed-type inference within a column.
df_raw = pd.read_csv(RAW_FILE, low_memory=False)

print(
    f"Linhas: {df_raw.shape[0]:,}",
    f"Colunas: {df_raw.shape[1]}",
    f"Memoria: {df_raw.memory_usage(deep=True).sum() / (1024 * 1024):.2f} MB",
    sep="\n",
)

print("Amostra:")
# NOTE(review): display() is not imported in this file; presumably the
# IPython/Jupyter builtin - confirm before running outside a notebook.
display(df_raw.head())

## 3. Transform - limpeza e enriquecimento

In [None]:
def normalize_text(series: pd.Series) -> pd.Series:
    """Return *series* as trimmed pandas strings with placeholders set to NA.

    The values are cast to the ``string`` dtype and stripped of surrounding
    whitespace. Entries that are then exactly ``""``, ``"None"``, ``"nan"``
    or ``"NaN"`` are replaced by ``pd.NA``; the match is case-sensitive.
    """
    placeholders = dict.fromkeys(("", "None", "nan", "NaN"), pd.NA)
    trimmed = series.astype("string").str.strip()
    return trimmed.replace(placeholders)


def clean_csv_list(value: str):
    """Tidy a comma-separated field and return it as ``", "``-joined text.

    Each item is stripped; empty items and items equal to "nan" or "none"
    (case-insensitive) are dropped; repeated items keep only their first
    occurrence, in the original order. Returns ``pd.NA`` when *value* is
    missing or when no item survives the cleaning.
    """
    if pd.isna(value):
        return pd.NA
    tokens = (piece.strip() for piece in str(value).split(","))
    # dict keys keep first-seen order, so this removes repeats without
    # reordering the remaining items.
    unique = dict.fromkeys(
        token for token in tokens if token and token.lower() not in {"nan", "none"}
    )
    return ", ".join(unique) if unique else pd.NA


def normalize_imdb_id(value: str):
    """Return a trimmed IMDb id of at most 12 characters, or ``pd.NA``.

    Missing values, blank strings and anything that does not start with
    ``"tt"`` after stripping yield ``pd.NA``. Longer ids are cut to their
    first 12 characters.
    """
    if pd.isna(value):
        return pd.NA
    candidate = str(value).strip()
    # An empty string also fails the prefix test, so one check covers both.
    if not candidate.startswith("tt"):
        return pd.NA
    # Slicing leaves ids of 12 characters or fewer untouched.
    return candidate[:12]


def parse_bool(value):
    """Convert a loosely typed flag into ``True``, ``False`` or ``pd.NA``.

    The value is compared as trimmed, lower-cased text. Besides the usual
    spellings ("true"/"t"/"1"/"yes" and "false"/"f"/"0"/"no"), the float
    forms "1.0" and "0.0" are accepted: a 0/1 column that contains missing
    values is read by pandas as floats, and those flags would otherwise be
    lost as NA.

    Returns ``pd.NA`` for missing input and for any unrecognised value.
    """
    if pd.isna(value):
        return pd.NA
    token = str(value).strip().lower()
    if token in {"true", "t", "1", "1.0", "yes"}:
        return True
    if token in {"false", "f", "0", "0.0", "no"}:
        return False
    return pd.NA


def cinema_era(year):
    """Map a release year to the name of its cinema era, or ``pd.NA``.

    *year* is converted with ``int()`` before the comparison. Missing years
    return ``pd.NA``; years from 2010 onwards return "Streaming".
    """
    if pd.isna(year):
        return pd.NA
    release_year = int(year)
    # (exclusive upper bound, label) pairs in ascending order; the first
    # bound greater than the year decides the era.
    eras = (
        (1930, "Cinema mudo"),
        (1960, "Era dourada"),
        (1980, "Nova Hollywood"),
        (2000, "Blockbuster"),
        (2010, "Digital"),
    )
    return next(
        (label for upper_bound, label in eras if release_year < upper_bound),
        "Streaming",
    )


In [None]:
# Work on a copy so df_raw keeps the untouched extract.
df = df_raw.copy()

# Text columns that are trimmed and have placeholder strings turned into NA.
text_cols = [
    "title",
    "status",
    "original_language",
    "original_title",
    "overview",
    "tagline",
    "genres",
    "production_companies",
    "production_countries",
    "spoken_languages",
    "keywords",
    "homepage",
    "imdb_id",
    "poster_path",
    "backdrop_path",
]

# Columns missing from the input are skipped rather than treated as errors.
for col in (name for name in text_cols if name in df.columns):
    df[col] = normalize_text(df[col])

# IMDb ids: keep only values starting with "tt", capped at 12 characters.
if "imdb_id" in df.columns:
    df["imdb_id"] = df["imdb_id"].apply(normalize_imdb_id)

# Language codes: lower-case them and blank anything that is not 2 characters.
if "original_language" in df.columns:
    lowered = df["original_language"].str.lower()
    df["original_language"] = lowered
    df.loc[lowered.str.len() != 2, "original_language"] = pd.NA

# Adult flag: map the accepted spellings to True/False, everything else to NA.
if "adult" in df.columns:
    df["adult"] = df["adult"].apply(parse_bool)

# Numeric type conversion: values that cannot be parsed become NaN instead of
# raising.
num_cols = ["id", "vote_average", "vote_count", "revenue", "budget", "runtime", "popularity"]
for col in num_cols:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors="coerce")

# Whole-number columns use the nullable Int64 dtype so that missing values do
# not force them to floats.
for col in ("id", "vote_count", "runtime"):
    if col in df.columns:
        df[col] = df[col].astype("Int64")

# Blank out-of-range values. Series.mask builds a new column with missing
# values where the condition holds, instead of writing pd.NA item-wise through
# .loc into NumPy-backed int64/float64 columns (a dtype-changing assignment
# that newer pandas versions warn about).
if "vote_average" in df.columns:
    # Ratings are kept only inside the 0-10 interval.
    df["vote_average"] = df["vote_average"].mask(
        (df["vote_average"] < 0) | (df["vote_average"] > 10)
    )
# These measures only need to be non-negative.
for col in ("vote_count", "revenue", "budget", "popularity"):
    if col in df.columns:
        df[col] = df[col].mask(df[col] < 0)
if "runtime" in df.columns:
    # Runtimes of zero or less, or above 600, are treated as missing.
    df["runtime"] = df["runtime"].mask((df["runtime"] <= 0) | (df["runtime"] > 600))

# Dates: parse release_date (unparseable values become NaT) and derive the
# calendar columns from it.
if "release_date" in df.columns:
    released = pd.to_datetime(df["release_date"], errors="coerce")
    df["release_date"] = released
    # (new column, datetime accessor attribute) pairs, in output order.
    calendar_fields = (
        ("year", "year"),
        ("month", "month"),
        ("day", "day"),
        ("day_of_week", "dayofweek"),
        ("quarter", "quarter"),
    )
    for column_name, accessor_attr in calendar_fields:
        df[column_name] = getattr(released.dt, accessor_attr).astype("Int64")
    df["week_of_year"] = released.dt.isocalendar().week.astype("Int64")
    # Floor the year to its decade, e.g. 1994 -> 1990.
    df["decade"] = (df["year"] // 10 * 10).astype("Int64")
    # dayofweek counts Monday as 0, so 5 and 6 are Saturday and Sunday.
    # Rows without a date get False here, not NA.
    df["is_weekend"] = df["day_of_week"].isin([5, 6])
    df["cinema_era"] = df["year"].apply(cinema_era).astype("string")

# Comma-separated list columns: strip items, drop placeholders and repeats.
list_cols = ["genres", "production_companies", "production_countries", "spoken_languages", "keywords"]
for col in (name for name in list_cols if name in df.columns):
    df[col] = df[col].apply(clean_csv_list).astype("string")

# Derived metrics
if "revenue" in df.columns and "budget" in df.columns:
    df["profit"] = df["revenue"] - df["budget"]
    # Return on investment in percent; rows whose budget is not positive get NaN.
    df["roi"] = np.where(
        df["budget"] > 0,
        (df["revenue"] - df["budget"]) / df["budget"] * 100,
        np.nan,
    )

if "vote_average" in df.columns and "vote_count" in df.columns:
    # Rating weighted by the log of the vote count; missing inputs stay missing.
    df["engagement"] = df["vote_average"] * np.log1p(df["vote_count"])

if "revenue" in df.columns and "runtime" in df.columns:
    # runtime is cast to nullable Int64 earlier in this cell, so "runtime > 0"
    # holds pd.NA for missing runtimes and np.where cannot turn that into a
    # boolean mask. Convert both operands to plain float arrays (NA -> NaN)
    # and divide only where the runtime is positive.
    runtime_minutes = df["runtime"].to_numpy(dtype="float64", na_value=np.nan)
    revenue_values = df["revenue"].to_numpy(dtype="float64", na_value=np.nan)
    valid_runtime = runtime_minutes > 0
    per_minute = np.full(len(df), np.nan)
    per_minute[valid_runtime] = revenue_values[valid_runtime] / runtime_minutes[valid_runtime]
    df["revenue_per_minute"] = per_minute

if "vote_average" in df.columns and "vote_count" in df.columns:
    # Same formula as engagement, but missing inputs count as 0 so that every
    # row gets a score; the result is min-max scaled to the 0-100 range.
    quality_raw = (df["vote_average"].fillna(0) * np.log1p(df["vote_count"].fillna(0)))
    q_min = quality_raw.min()
    q_max = quality_raw.max()
    if pd.notna(q_min) and pd.notna(q_max) and q_max != q_min:
        df["quality_score"] = (quality_raw - q_min) / (q_max - q_min) * 100
    else:
        # Scaling is undefined when all values are equal (or there are none).
        # NaN keeps the column numeric, where pd.NA would make it object dtype.
        df["quality_score"] = np.nan

# Range buckets: each numeric column is binned with pd.cut into a labelled
# string column. Bins use pd.cut's default right-closed intervals, so a value
# equal to an edge falls in the lower bucket (e.g. revenue 0 -> "Zero").
# Entries are (source column, new column, bin edges, labels).
range_specs = (
    (
        "revenue",
        "revenue_range",
        [-1, 0, 1e6, 1e7, 5e7, 1e8, 5e8, np.inf],
        ["Zero", "<1M", "1-10M", "10-50M", "50-100M", "100-500M", ">500M"],
    ),
    (
        "budget",
        "budget_range",
        [-1, 0, 1e6, 1e7, 5e7, 1e8, 2.5e8, np.inf],
        ["Zero", "<1M", "1-10M", "10-50M", "50-100M", "100-250M", ">250M"],
    ),
    (
        "vote_average",
        "rating_range",
        [-0.1, 4, 6, 7, 8, 10],
        ["Ruim", "Regular", "Bom", "Muito bom", "Excelente"],
    ),
    (
        "runtime",
        "runtime_range",
        [-1, 60, 90, 120, 150, np.inf],
        ["<60", "60-90", "90-120", "120-150", ">150"],
    ),
    (
        "popularity",
        "popularity_range",
        [-1, 1, 5, 10, 20, 50, np.inf],
        ["<1", "1-5", "5-10", "10-20", "20-50", ">50"],
    ),
)

# Source columns missing from the data are skipped.
for source_col, range_col, edges, names in range_specs:
    if source_col in df.columns:
        df[range_col] = pd.cut(df[source_col], bins=edges, labels=names).astype("string")

# Deduplicate by id: drop rows without an id, then keep one row per id,
# preferring the highest vote_count, then revenue, then budget.
if "id" in df.columns:
    df = df.dropna(subset=["id"])
    # The tie-break columns are optional, like every other column in this
    # cell; sorting only by the ones present avoids a KeyError when one is
    # missing from the input.
    tie_breakers = [name for name in ("vote_count", "revenue", "budget") if name in df.columns]
    df = df.sort_values(
        by=["id", *tie_breakers],
        ascending=[True, *([False] * len(tie_breakers))],
    )
    df = df.drop_duplicates(subset=["id"], keep="first")

# Load metadata: when this run happened and which file the rows came from.
df["load_timestamp"] = pd.Timestamp.now()
df["source"] = "base de dados.csv"

print(f"Linhas finais: {len(df):,}")


### 3.1 Reordenacao das colunas (padrao Silver)

In [None]:
# Target column order for the Silver layer.
silver_columns = [
    # Identification and descriptive text
    "id",
    "title",
    "original_title",
    "imdb_id",
    "overview",
    "tagline",
    "status",
    "adult",
    "original_language",
    # Release date and derived calendar fields
    "release_date",
    "year",
    "month",
    "day",
    "day_of_week",
    "quarter",
    "week_of_year",
    "decade",
    "cinema_era",
    "is_weekend",
    # Measures and derived metrics
    "vote_average",
    "vote_count",
    "popularity",
    "revenue",
    "budget",
    "profit",
    "roi",
    "runtime",
    "engagement",
    "revenue_per_minute",
    "quality_score",
    # Range buckets
    "revenue_range",
    "budget_range",
    "rating_range",
    "runtime_range",
    "popularity_range",
    # List-like and link columns
    "genres",
    "production_companies",
    "production_countries",
    "spoken_languages",
    "keywords",
    "homepage",
    "poster_path",
    "backdrop_path",
    # Load metadata
    "load_timestamp",
    "source",
]

# Keep only the columns that exist in df, in the order listed above; columns
# of df that are not listed are left out of the Silver table.
available_cols = list(filter(lambda name: name in df.columns, silver_columns))
df_silver = df.loc[:, available_cols].copy()

print(f"Colunas Silver: {df_silver.shape[1]}")


## 4. Load - gravacao na camada Silver

In [None]:
# Create the Silver folder (and any missing parents) before writing.
SILVER_DIR.mkdir(parents=True, exist_ok=True)

print("Salvando CSV da camada Silver...")
df_silver.to_csv(SILVER_FILE, index=False, encoding="utf-8")
print(f"Arquivo salvo: {SILVER_FILE}")

# Parquet is a best-effort extra: a failure here must not stop the notebook,
# but the reason should be visible instead of always blaming a missing engine.
try:
    df_silver.to_parquet(SILVER_PARQUET, index=False)
except ImportError:
    # pandas raises ImportError when neither pyarrow nor fastparquet is usable.
    print("Parquet nao gerado (instale pyarrow ou fastparquet se desejar).")
except Exception as exc:
    # Any other failure (e.g. a column the engine cannot convert) is reported
    # with its actual cause.
    print(f"Parquet nao gerado: {type(exc).__name__}: {exc}")
else:
    print(f"Arquivo salvo: {SILVER_PARQUET}")


### 4.1 Carga no PostgreSQL (opcional)

In [None]:
# Optional load into PostgreSQL; skipped unless the load was requested and an
# engine was created in the configuration cell.
if LOAD_TO_DB and engine is not None:
    print("Carregando dados no PostgreSQL...")
    # engine.begin() commits when the block exits without error. With
    # engine.connect() and no explicit commit, SQLAlchemy 2.x rolls the
    # CREATE SCHEMA back, and the to_sql call below then targets a schema
    # that does not exist.
    with engine.begin() as conn:
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS silver"))
    # if_exists="replace" drops and recreates silver.movies on every run.
    df_silver.to_sql(
        "movies",
        engine,
        schema="silver",
        if_exists="replace",
        index=False,
        method="multi",
        chunksize=2000,
    )
    print("Carga concluida.")


## 5. Resumo final

In [None]:
# Closing summary: confirmation, final row count and the CSV output path.
print(
    "ETL Raw para Silver concluido.",
    f"Registros finais: {len(df_silver):,}",
    f"Arquivo: {SILVER_FILE}",
    sep="\n",
)
