# 02_etl_raw_to_silver
---
Este notebook executa o processo `ETL` que transfere os dados da camada **Raw** para a **Silver**, englobando transformações, padronização, movimentação dos arquivos e carga dos dados no **PostgreSQL**, dando continuidade ao pipeline.


In [25]:
# Parameters

run_mode = "latest"
run_date = None

raw_path = "/opt/airflow/data-layer/raw"
silver_path = "/opt/airflow/data-layer/silver"

postgres_conn_id = "AIRFLOW_VAR_POSTGRES_CONN_ID"


In [None]:
import os
import shutil
from pathlib import Path

from pyspark import StorageLevel
from pyspark.sql import DataFrame, functions as F, Window
from pyspark.sql.column import Column
from pyspark.sql.types import DoubleType, StringType

from transformer.utils.file_io import find_partition
from transformer.utils.helpers import to_date_from_ymd
from transformer.utils.logger import get_logger
from transformer.utils.spark_helpers import get_spark_session, load_to_postgres
from transformer.utils.postgre_helpers import assert_table_rowcount
from transformer.utils.quality_gates_silver_aggregated import run_quality_gates_silver_aggregated
from transformer.utils.quality_gates_silver_base import run_quality_gates_silver_base
from transformer.utils.quality_gates_silver_flights import run_quality_gates_silver_flights


## Job 1: airlines_transform

Este job realiza a transformação e validação do dataset de companhias aéreas (`airlines`) da camada **Raw** para a camada **Silver**.

In [None]:
log = get_logger("transform_airlines")

spark = get_spark_session("TransformAirlines")
log.info("[Airlines] Sessão Spark iniciada.")

# Ajustes de performance para o Spark
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "32")


### Definindo função de transformação do dataset `airlines`

In [4]:
def transform_airlines(df: DataFrame) -> DataFrame:
    """
    Transforma e valida o DataFrame de companhias aéreas para a camada silver.

    Args:
        df (DataFrame): DataFrame bruto da camada Bronze.

    Returns:
        DataFrame: DataFrame padronizado pronto para a camada Silver.
    """
    log.info("[Airlines] Iniciando transformações.")

    # Verifica se as colunas obrigatórias estão presentes
    log.info("[Airlines] Verificando presença de colunas obrigatórias.")
    required = {"IATA_CODE", "AIRLINE"}
    missing = required - set(df.columns)
    if missing:
        raise KeyError(f"[Airlines][ERROR] Colunas faltando no dataset: {missing}.")

    # Renomeia e converte tipos de colunas
    log.info("[Airlines] Renomeando e convertendo os tipos das colunas.")
    df2 = (
        df.withColumnRenamed("IATA_CODE", "airline_iata_code")
          .withColumnRenamed("AIRLINE", "airline_name")
          .withColumn("airline_iata_code", F.col("airline_iata_code").cast(StringType()))
          .withColumn("airline_name", F.col("airline_name").cast(StringType()))
    )

    # Verifica duplicidade na chave primária
    log.info("[Airlines] Verificando duplicidade da pk.")
    dup_count = df2.groupBy("airline_iata_code").count().filter(F.col("count") > 1).count()
    if dup_count > 0:
        raise ValueError(f"[Airlines][ERROR] airline_iata_code não é único: {dup_count} duplicatas.")

    # Padroniza os nomes das colunas para minúsculo
    log.info("[Airlines] Padronizando nomes de colunas para minúsculo.")
    df2 = df2.toDF(*[c.lower() for c in df2.columns])

    log.info("[Airlines] Transformação concluída com sucesso.")

    return df2


### Runner para o job `airlines_transform`

In [None]:
try:
    log.info("[Airlines] Iniciando job de trasnformação de 'airlines'.")

    # Resolve partição e caminhos
    source_partition = find_partition(raw_path, mode=run_mode, date_str=run_date)
    src = Path(raw_path) / source_partition / "PARQUET" / "airlines.parquet"
    dst = Path(silver_path) / source_partition / "PARQUET" / "airlines.parquet"

    if not src.exists():
        raise FileNotFoundError(f"[Airlines][ERROR] Arquivo não encontrado: {src}.")

    # Leitura do dataset bruto
    log.info(f"[Airlines] Lendo dataset: {src}.")
    df = spark.read.parquet(str(src))

    # Aplica transformações
    df_tf = transform_airlines(df)

    # Executa quality gates
    required_cols = ["airline_iata_code", "airline_name"]
    pk_cols = ["airline_iata_code"]
    
    log.info("[Airlines] Executando quality gates.")
    
    run_quality_gates_silver_base(
        df=df_tf,
        name="airlines_silver",
        required_columns=required_cols,
        pk_columns=pk_cols,
    )
    
    log.info("[Airlines] Quality gates concluídos com sucesso.")

    # Cria diretório de destino e grava o resultado na silver
    dst.parent.mkdir(parents=True, exist_ok=True)
    df_tf.write.mode("overwrite").parquet(str(dst))
    log.info(f"[Refinement][Airlines] Dataset salvo na camada silver: {dst}.")

except Exception as e:
    log.exception(f"[Airlines][ERROR] Falha na execução do job: {e}.")
    raise
finally:
    log.info("[Airlines] Job de trasnformação de 'airlines' encerrado.")


In [6]:
%%script false --no-raise-error # Comentar essa linha se estiver em debug ou se quiser rodar a célula.

df_tf.printSchema()

df_tf.limit(5).show(truncate=False)


In [None]:
# Encerra a sessão Spark
spark.stop()
log.info("[Airlines] Sessão Spark finalizada.")


## Job 2: airports_transform

Este job realiza a transformação e validação do dataset de aeroportos (`airports`) da camada **Raw** para a camada **Silver**.

In [None]:
log = get_logger("transform_airports")

spark = get_spark_session("TransformAirports")
log.info("[Airports] Sessão Spark iniciada.")

# Ajustes de performance para o Spark
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "32")


### Definindo função de trasformação do dataset `airports`

In [9]:
def transform_airports(df: DataFrame) -> DataFrame:
    """
    Transforma e valida o DataFrame de aeroportos para a camada silver.

    Args:
        df (DataFrame): DataFrame bruto lido da camada bronze.

    Returns:
        DataFrame: DataFrame limpo e padronizado para a camada silver.
    """
    log.info("[Airports] Iniciando transformações.")

    # Verifica colunas obrigatórias
    required = {"IATA_CODE", "LATITUDE", "LONGITUDE"}
    missing = required - set(df.columns)
    if missing:
        raise KeyError(f"[Airports][ERROR] Colunas faltando no dataset: {missing}.")

    # Correções de coordenadas faltosas
    corrections = {
        "ECP": {"LATITUDE": 30.3549, "LONGITUDE": -86.6160},
        "PBG": {"LATITUDE": 44.6895, "LONGITUDE": -68.0448},
        "UST": {"LATITUDE": 42.0703, "LONGITUDE": -87.9539},
    }

    # Tipagem e renomeação de colunas principais
    df2 = (
        df.withColumnRenamed("IATA_CODE", "airport_iata_code")
          .withColumn("airport_iata_code", F.col("airport_iata_code").cast(StringType()))
          .withColumn("LATITUDE", F.col("LATITUDE").cast(DoubleType()))
          .withColumn("LONGITUDE", F.col("LONGITUDE").cast(DoubleType()))
    )

    # Remove coluna COUNTRY
    if "COUNTRY" in df2.columns:
        df2 = df2.drop("COUNTRY")

    # Aplica correções manuais de coordenadas
    for code, coords in corrections.items():
        df2 = df2.withColumn(
            "LATITUDE",
            F.when(F.col("airport_iata_code") == code, F.lit(coords["LATITUDE"])).otherwise(F.col("LATITUDE"))
        ).withColumn(
            "LONGITUDE",
            F.when(F.col("airport_iata_code") == code, F.lit(coords["LONGITUDE"])).otherwise(F.col("LONGITUDE"))
        )

    # Renomeia colunas e força lowercase
    rename_map = {
        "AIRPORT": "airport_name",
        "CITY": "city",
        "STATE": "state",
        "LATITUDE": "latitude",
        "LONGITUDE": "longitude",
    }
    for old, new in rename_map.items():
        if old in df2.columns:
            df2 = df2.withColumnRenamed(old, new)

    # Normaliza nomes para minúsculo
    df2 = df2.toDF(*[c.lower() for c in df2.columns])

    log.info("[Airports] Transformação concluída com sucesso.")

    return df2


### Runner para o job `airports_transform`

In [None]:
try:
    log.info("[Airports] Iniciando job de trasnformação de 'airports'.")

    # Resolve partição e caminhos
    source_partition = find_partition(bronze_path, mode=run_mode, date_str=run_date)
    src = Path(bronze_path) / source_partition / "PARQUET" / "airports.parquet"
    dst = Path(silver_path) / source_partition / "PARQUET" / "airports.parquet"

    if not src.exists():
        raise FileNotFoundError(f"[Airports][ERROR] Arquivo não encontrado: {src}.")

    # Lê dataset e aplica transformações
    log.info(f"[Airports] Lendo dataset: {src}.")
    df = spark.read.parquet(str(src))
    
    df_tf = transform_airports(df)

    # Executa quality gates
    log.info("[Airports] Executando quality gates.")

    required_cols = ["airport_iata_code", "airport_name", "city", "state", "latitude", "longitude"]
    pk_cols = ["airport_iata_code"]

    run_quality_gates_silver_base(
        df=df_tf,
        name="airports_silver",
        required_columns=required_cols,
        pk_columns=pk_cols,
    )

    log.info("[Airports] Quality gates concluídos.")

    # Cria diretório e escreve resultado
    dst.parent.mkdir(parents=True, exist_ok=True)
    df_tf.write.mode("overwrite").parquet(str(dst))

    log.info(f"[Airports] Dataset salvo na camada silver: {dst}.")

except Exception as e:
    log.exception(f"[Airports][ERROR] Falha na execução do job: {e}")
    raise
finally:
    log.info("[Airports] Job de trasnformação de 'airports' encerrado.")


In [11]:
%%script false --no-raise-error # Comentar essa linha se estiver em debug ou se quiser rodar a célula.

df_tf.printSchema()

df_tf.limit(5).show(truncate=False)


In [None]:
# Encerra a sessão Spark
spark.stop()
log.info("[Airlines] Sessão Spark finalizada.")


## Job 3: flights_transform

Este job transforma e normaliza o dataset de voos (`flights`) da **Raw** para a **Silver** (pré-join), gerando um arquivo intermediário a ser consumido na etapa de agregação.

In [None]:
log = get_logger("transform_flights_silver")

spark = get_spark_session("TransformFlightsSilver")
log.info("[Flights] Sessão Spark iniciada.")

# Ajustes performance
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "32")


### Definindo funções de normalização das colunas de datas, de diferença de tempo e transformação

In [14]:
def normalize_time_expr(col_name: str) -> F.Column:
    """
    Normaliza valores de horário removendo casas decimais e preenchendo zeros à esquerda.
    """
    return F.when(
        F.col(col_name).isNotNull(),
        F.lpad(F.regexp_replace(F.col(col_name).cast("string"), r"\.0$", ""), 4, "0")
    )


def abs_min_diff(c1: str, c2: str) -> F.Column:
    """
    Retorna a diferença absoluta entre dois horários em minutos.
    """
    return F.abs(F.col(c1).cast("long") - F.col(c2).cast("long")) / 60.0


def transform_flights(df: DataFrame) -> DataFrame:
    """
    Transforma e valida o DataFrame de voos (pré-join) para a camada Silver.

    Args:
        df (DataFrame): DataFrame bruto lido da camada Bronze.

    Returns:
        DataFrame: DataFrame transformado e validado.
    """
    log.info("[Flights] Iniciando transformações.")

    # Validação mínima de colunas obrigatórias
    log.info("[Flights] Verificando presença de colunas obrigatórias.")
    required = {"YEAR", "MONTH", "DAY", "AIRLINE", "FLIGHT_NUMBER"}
    missing = required - set(df.columns)
    if missing:
        raise KeyError(f"[Flights][ERROR] Colunas faltando: {missing}.")

    # Filtro inicial e padronização de nomes
    log.info("[Flights] Aplicando filtros iniciais e padronizando nomes.")
    df2 = (
        df.filter((F.col("DIVERTED") != 1) & (F.col("CANCELLED") != 1))
          .toDF(*[c.lower() for c in df.columns])
          .withColumn("flight_date", to_date_from_ymd(F.col("year"), F.col("month"), F.col("day")))
          .withColumnRenamed("year", "flight_year")
          .withColumnRenamed("month", "flight_month")
          .withColumnRenamed("day", "flight_day")
          .withColumnRenamed("day_of_week", "flight_day_of_week")
    )

    # Normalização de horários
    log.info("[Flights] Normalização de colunas com horários.")
    time_cols = [
        "scheduled_departure", "departure_time",
        "scheduled_arrival", "arrival_time",
        "wheels_off", "wheels_on",
    ]

    for col_name in time_cols:
        if col_name in df2.columns:
            tmp = f"{col_name}_str"
            df2 = (
                df2.withColumn(tmp, normalize_time_expr(col_name))
                    .withColumn(
                        col_name,
                        F.to_timestamp(
                            F.concat_ws(" ", F.col("flight_date").cast("string"), F.col(tmp)),
                            "yyyy-MM-dd HHmm"
                        )
                    )
                    .drop(tmp)
            )

    # Detecção de horários trocados
    log.info("[Flights] Correção de horários trocados.")
    df2 = (
        df2
        .withColumn("diff_dep_sched_arr", abs_min_diff("departure_time", "scheduled_arrival"))
        .withColumn("diff_dep_sched_dep", abs_min_diff("departure_time", "scheduled_departure"))
        .withColumn("diff_arr_sched_dep", abs_min_diff("arrival_time", "scheduled_departure"))
        .withColumn("diff_arr_sched_arr", abs_min_diff("arrival_time", "scheduled_arrival"))
        .withColumn(
            "is_swapped",
            (F.col("diff_dep_sched_arr") < F.col("diff_dep_sched_dep")) &
            (F.col("diff_arr_sched_dep") < F.col("diff_arr_sched_arr"))
        )
        .withColumn(
            "departure_time_tmp",
            F.when(F.col("is_swapped"), F.col("arrival_time")).otherwise(F.col("departure_time"))
        )
        .withColumn(
            "arrival_time_tmp",
            F.when(F.col("is_swapped"), F.col("departure_time")).otherwise(F.col("arrival_time"))
        )
        .drop("departure_time", "arrival_time")
        .withColumnRenamed("departure_time_tmp", "departure_time")
        .withColumnRenamed("arrival_time_tmp", "arrival_time")
        .drop(
            "diff_dep_sched_arr", "diff_dep_sched_dep",
            "diff_arr_sched_dep", "diff_arr_sched_arr", "is_swapped"
        )
    )

    # Conversão numérica e substituição de nulos
    log.info("[Flights] Normalizando colunas numéricas.")
    numeric_cols = [
        "departure_delay", "arrival_delay", "taxi_out", "taxi_in",
        "air_time", "elapsed_time", "scheduled_time", "distance",
        "air_system_delay", "security_delay", "airline_delay",
        "late_aircraft_delay", "weather_delay",
    ]
    delay_cols = [
        "air_system_delay", "security_delay", "airline_delay",
        "late_aircraft_delay", "weather_delay",
    ]

    for c in numeric_cols:
        if c in df2.columns:
            expr = F.col(c).cast(DoubleType())
            if c in delay_cols:
                expr = F.coalesce(expr, F.lit(0.0))
            df2 = df2.withColumn(c, expr)

    # Ajuste de voos overnight
    log.info("[Flights] Ajustes nos voos que atravessa dia.")
    df2 = (
        df2.withColumn(
            "is_overnight_flight",
            F.when(
                (F.col("arrival_time").isNotNull()) &
                (F.col("departure_time").isNotNull()) &
                (F.hour(F.col("arrival_time")) < F.hour(F.col("departure_time"))),
                F.lit(True)
            ).otherwise(F.lit(False))
        )
        .withColumn(
            "arrival_time",
            F.when(F.col("is_overnight_flight"),
                   F.col("arrival_time") + F.expr("INTERVAL 1 DAY"))
             .otherwise(F.col("arrival_time"))
        )
    )

    # Filtros finais
    log.info("[Flights] Aplicando filtros finais.")
    df2 = df2.filter(
        (F.col("departure_time").isNotNull()) &
        (F.col("arrival_time").isNotNull()) &
        (F.col("arrival_time") > F.col("departure_time")) &
        (~F.col("origin_airport").rlike("^[0-9]+$")) &
        (~F.col("destination_airport").rlike("^[0-9]+$"))
    )

    # Remoção de colunas desnecessárias
    log.info("[Flights] Removendo colunas desnecessárias.")
    drop_cols = [c for c in ["diverted", "cancelled", "cancellation_reason"] if c in df2.columns]
    if drop_cols:
        df2 = df2.drop(*drop_cols)

    log.info("[Flights] Transformação concluída.")

    return df2


### Runner para o job `flights_transform`

In [None]:
try:
    log.info("[Flights] Iniciando job de trasnformação de 'flights'.")

    # Resolve partição e caminhos
    source_partition = find_partition(raw_path, mode=run_mode, date_str=run_date)
    src = Path(raw_path) / source_partition / "PARQUET" / "flights.parquet"
    dst_dir = Path(silver_path) / source_partition / "PARQUET"
    dst = dst_dir / "flights_pre_join.parquet"
    airports_src = dst_dir / "airports.parquet"

    if not src.exists():
        raise FileNotFoundError(f"[Flights][ERROR] Arquivo não encontrado: {src}.")
    if not airports_src.exists():
        raise FileNotFoundError(f"[Flights][ERROR] Airports não encontrado na silver: {airports_src}.")

    # Leitura e transformação
    log.info(f"[Flights] Lendo datasets: {src} e {airports_src}.")
    
    df = spark.read.parquet(str(src))
    airports_df = spark.read.parquet(str(airports_src))
    df_tf = transform_flights(df)
    
    # Quality gate
    run_quality_gates_silver_flights(df_tf, airports_df)

    # Escrita do resultado intermediário
    dst_dir.mkdir(parents=True, exist_ok=True)

    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    df_tf.coalesce(1).write.mode("overwrite").parquet(str(dst))
    spark.conf.set("spark.sql.codegen.wholeStage", "true")

    log.info(f"[Flights] Dataset salvo na silver: {dst}.")

    # Libera cache
    df_tf.unpersist()

except Exception as e:
    log.exception(f"[Flights][ERROR] Falha na execução do job: {e}.")
    raise
finally:
    log.info("[Flights] Fim do job de transformação de 'flights'.")


In [16]:
%%script false --no-raise-error # Comentar essa linha se estiver em debug ou se quiser rodar a célula.

path_df = Path(silver_path) / "2025-11-12" / "PARQUET" / "flights_pre_join.parquet" # Verificar a data quando rodar

df = spark.read.parquet(str(path_df))

df.printSchema()

df.limit(5).show(truncate=True)


In [None]:
# Encerra a sessão Spark
spark.stop()
log.info("[Flights] Sessão Spark finalizada.")


## Job 4: silver_aggregate

Este job realiza a agregação da camada **Silver**, unindo os datasets já tratados de `flights`, `airlines` e `airports` em um único dataset consolidado `flights_aggregated.parquet`, conforme o **DDL** da camada.

In [None]:
log = get_logger("silver_aggregate")

spark = get_spark_session("RefinementSilverAggregate")
log.info("[Aggregate] Sessão Spark iniciada.")

# Ajustes performance
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "32")


### Definido função de agregação

In [19]:
def create_aggregated_flights_df(
    flights_silver_df: DataFrame,
    airlines_silver_df: DataFrame,
    airports_silver_df: DataFrame,
) -> DataFrame:
    """
    Constrói o DataFrame agregado da camada Silver (flights_aggregated), unindo:
        - flights_pre_join.parquet
        - airlines.parquet
        - airports.parquet

    Args:
        flights_silver_df (DataFrame): Dataset de voos já transformado (pré-join).
        airlines_silver_df (DataFrame): Dataset transformado de companhias aéreas.
        airports_silver_df (DataFrame): Dataset transformado de aeroportos.

    Returns:
        DataFrame: Dataset consolidado no formato final da camada Silver.
    """

    log.info("[Aggregate] Iniciando agregação dos datasets Silver.")

    # Detecta coluna de companhia aérea no dataset 'flights'
    if "airline_iata_code" in flights_silver_df.columns:
        airline_col = "airline_iata_code"
    elif "airline" in flights_silver_df.columns:
        airline_col = "airline"
    else:
        raise KeyError("Nenhuma coluna de companhia aérea encontrada no dataset flights.")

    # Detecta colunas de origem e destino no dataset 'flights'
    if "origin_airport_iata_code" in flights_silver_df.columns:
        origin_col = "origin_airport_iata_code"
    elif "origin_airport" in flights_silver_df.columns:
        origin_col = "origin_airport"
    else:
        raise KeyError("Coluna de aeroporto de origem não encontrada.")

    if "dest_airport_iata_code" in flights_silver_df.columns:
        dest_col = "dest_airport_iata_code"
    elif "destination_airport" in flights_silver_df.columns:
        dest_col = "destination_airport"
    else:
        raise KeyError("Coluna de aeroporto de destino não encontrada.")

    # Join com airlines
    df_joined = flights_silver_df.join(
        airlines_silver_df,
        flights_silver_df[airline_col] == airlines_silver_df["airline_iata_code"],
        how="left",
    )

    # Seleciona campos para aeroportos de origem
    df_origin = (
        airports_silver_df.select(
            F.col("airport_iata_code").alias("origin_airport_iata_code"),
            F.col("airport_name").alias("origin_airport_name"),
            F.col("city").alias("origin_city"),
            F.col("state").alias("origin_state"),
            F.col("latitude").alias("origin_latitude"),
            F.col("longitude").alias("origin_longitude"),
        )
    )

    # Seleciona campos para aeroportos de destino
    df_dest = (
        airports_silver_df.select(
            F.col("airport_iata_code").alias("dest_airport_iata_code"),
            F.col("airport_name").alias("dest_airport_name"),
            F.col("city").alias("dest_city"),
            F.col("state").alias("dest_state"),
            F.col("latitude").alias("dest_latitude"),
            F.col("longitude").alias("dest_longitude"),
        )
    )

    # Join com aeroportos de origem e destino
    df_joined = (
        df_joined.join(
            df_origin,
            df_joined[origin_col] == F.col("origin_airport_iata_code"),
            how="left",
        )
        .join(
            df_dest,
            df_joined[dest_col] == F.col("dest_airport_iata_code"),
            how="left",
        )
    )

    # Mapeamento dos tipos
    schema_casts = {
        "flight_id": "bigint",
        "flight_year": "smallint",
        "flight_month": "smallint",
        "flight_day": "smallint",
        "flight_day_of_week": "smallint",
        "flight_date": "date",

        "airline_iata_code": "string",
        "airline_name": "string",

        "flight_number": "int",
        "tail_number": "string",

        "origin_airport_iata_code": "string",
        "origin_airport_name": "string",
        "origin_city": "string",
        "origin_state": "string",
        "origin_latitude": "double",
        "origin_longitude": "double",

        "dest_airport_iata_code": "string",
        "dest_airport_name": "string",
        "dest_city": "string",
        "dest_state": "string",
        "dest_latitude": "double",
        "dest_longitude": "double",

        "scheduled_departure": "timestamp",
        "departure_time": "timestamp",
        "scheduled_arrival": "timestamp",
        "arrival_time": "timestamp",
        "wheels_off": "timestamp",
        "wheels_on": "timestamp",

        "departure_delay": "double",
        "arrival_delay": "double",
        "taxi_out": "double",
        "taxi_in": "double",
        "air_time": "double",
        "elapsed_time": "double",
        "scheduled_time": "double",
        "distance": "double",

        "is_overnight_flight": "boolean",

        "air_system_delay": "double",
        "security_delay": "double",
        "airline_delay": "double",
        "late_aircraft_delay": "double",
        "weather_delay": "double",
    }

    for col_name, spark_type in schema_casts.items():
        if col_name in df_joined.columns:
            df_joined = df_joined.withColumn(col_name, F.col(col_name).cast(spark_type))

    # Seleção final conforme o ddl
    final_df = df_joined.select(
        "flight_year",
        "flight_month",
        "flight_day",
        "flight_day_of_week",
        "flight_date",
        "airline_iata_code",
        "airline_name",
        "flight_number",
        "tail_number",
        "origin_airport_iata_code",
        "origin_airport_name",
        "origin_city",
        "origin_state",
        "origin_latitude",
        "origin_longitude",
        "dest_airport_iata_code",
        "dest_airport_name",
        "dest_city",
        "dest_state",
        "dest_latitude",
        "dest_longitude",
        "scheduled_departure",
        "departure_time",
        "scheduled_arrival",
        "arrival_time",
        "wheels_off",
        "wheels_on",
        "departure_delay",
        "arrival_delay",
        "taxi_out",
        "taxi_in",
        "air_time",
        "elapsed_time",
        "scheduled_time",
        "distance",
        "air_system_delay",
        "security_delay",
        "airline_delay",
        "late_aircraft_delay",
        "weather_delay",
    )

    # Cria pk sequencial ordenada
    window_spec = (
        Window
        .orderBy(
            F.col("flight_date").asc(),
            F.col("airline_iata_code").asc(),
            F.col("flight_number").asc(),
            F.col("origin_airport_iata_code").asc(),
            F.col("departure_time").asc(),
        )
    )

    # Gera pk determinística e sequencial
    final_df = final_df.withColumn(
        "flight_id",
        F.row_number().over(window_spec)
    )

    # Reordena colunas para a pk ser a primeira
    final_df = final_df.select(
        "flight_id",
        *[c for c in final_df.columns if c != "flight_id"]
    )

    log.info("[Aggregate] Agregação concluída com sucesso.")

    return final_df


### Runner para o job `silver_aggregate`

In [None]:
try:
    log.info("[Aggregate] Iniciando job de agregação da camada Silver.")

    # Resolve partição e caminhos
    source_partition: str = find_partition(
        base_path=silver_path,
        mode=run_mode,
        date_str=run_date,
    )
    base_dir: Path = Path(silver_path) / source_partition / "PARQUET"

    flights_path: Path  = base_dir / "flights_pre_join.parquet"
    airlines_path: Path = base_dir / "airlines.parquet"
    airports_path: Path = base_dir / "airports.parquet"

    # Verifica existência dos arquivos necessários
    for required_file in [flights_path, airlines_path, airports_path]:
        if not required_file.exists():
            raise FileNotFoundError(
                f"[Refinement][Aggregate][ERROR] Arquivo esperado não encontrado: {required_file}"
            )

    # Leitura dos datasets
    log.info("[Aggregate] Lendo datasets silver (flights, airlines, airports).")

    df_flights  = spark.read.parquet(str(flights_path))
    df_airlines = spark.read.parquet(str(airlines_path))
    df_airports = spark.read.parquet(str(airports_path))

    # Construção do DataFrame agregado
    aggregated_df: DataFrame = create_aggregated_flights_df(
        flights_silver_df=df_flights,
        airlines_silver_df=df_airlines,
        airports_silver_df=df_airports,
    )

    # Filtra registros inválidos de aeroportos
    aggregated_df = aggregated_df.filter(
        F.col("origin_airport_iata_code").isNotNull()
        & F.col("dest_airport_iata_code").isNotNull()
    )

    # Quality gates
    run_quality_gates_silver_aggregated(aggregated_df)

    # Escrita do arquivo final
    output_path: Path = base_dir / "flights_aggregated.parquet"
    aggregated_df.coalesce(1).write.mode("overwrite").parquet(str(output_path))

    log.info(f"[Aggregate] Dataset agregado salvo em: {output_path}")

    # Libera cache após uso
    aggregated_df.unpersist()

except Exception as e:
    log.exception(f"[Aggregate][ERROR] Falha na execução do job: {e}.")
    raise

finally:
    log.info("[Aggregate] Job de agregação encerrado.")


In [21]:
%%script false --no-raise-error # Comentar essa linha se estiver em debug ou se quiser rodar a célula.

aggregated_df.printSchema()

aggregated_df.limit(5).show(truncate=True)


In [None]:
# Encerra a sessão Spark
spark.stop()
log.info("[Aggregate] Sessão Spark finalizada.")


## Job 5: silver_load

Este job realiza a carga do dataset agregado da camada **Silver** (`flights_aggregated.parquet`) para o PostgreSQL, populando a tabela `silver.silver_flights` conforme o ddl da camada.

In [None]:
log = get_logger("silver_load")

spark = get_spark_session("SilverLoad")
log.info("[SilverLoad] Sessão Spark iniciada.")


### Runner para o job `silver_load`

In [None]:
try:
    log.info("[SilverLoad] Iniciando execução do job de carga.")

    # Resolve partição e caminhos
    partition = find_partition(
        base_path=silver_path,
        mode=run_mode,
        date_str=run_date,
    )

    base_dir = Path(silver_path) / partition / "PARQUET"
    parquet_path = base_dir / "flights_aggregated.parquet"

    if not parquet_path.exists():
        raise FileNotFoundError(
            f"[SilverLoad][ERROR] Arquivo não encontrado: {parquet_path}."
        )

    log.info(f"[SilverLoad] Lendo dataset agregado: {parquet_path}.")
    df = spark.read.parquet(str(parquet_path))

    log.info(f"[SilverLoad] Tuplas encontradas: {df.count():,}.")

    # Carga para PostgreSQL
    try:
        log.info("[SilverLoad] Inserindo dados em silver.silver_flights.")

        load_to_postgres(
            df=df,
            db_conn_id=postgres_conn_id,
            table_name="silver.silver_flights",
        )

        log.info("[SilverLoad] Validando tuplas da tabela após a carga.")
        
        # Valida carga
        expected_count = df.count()
        assert_table_rowcount(
            db_conn_id=postgres_conn_id,
            table_name="silver.silver_flights",
            expected_count=expected_count,
        )

        log.info("[SilverLoad] Validação pós-carga encerrada.")

    except Exception as e:
        log.exception(f"[Refinement][SilverLoad][ERROR] Erro durante carga no PostgreSQL: {e}")
        raise

    log.info("[SilverLoad] Carga concluída com sucesso.")

except Exception as e:
    log.exception(f"[SilverLoad][ERROR] Falha na execução: {e}")
    raise
finally:
    log.info("[SilverLoad] Job de carga encerrado.")


In [28]:
%%script false --no-raise-error # Comentar essa linha se estiver em debug ou se quiser rodar a célula.

df.printSchema()

df.limit(2).show(truncate=False)


In [None]:
# Encerra a sessão Spark
spark.stop()
log.info("[SilverLoad] Sessão Spark finalizada.")


## Jog 6: silver_cleanup

Este job remove arquivos intermediários da camada **Silver**, mantendo apenas o arquivo final `flights_aggregated.parquet` na partição.

In [None]:
log = get_logger("silver_cleanup")

spark = get_spark_session("SilverCleanupJob")
log.info("[SilverCleanup] Sessão Spark iniciada.")

# Ajustes performance
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "32")

### Runner para o job `silver_cleanup`

In [None]:
try:
    log.info("[SilverCleanup] Iniciando job de limpeza da camada silver.")

    # Resolve partição e caminhos
    partition = find_partition(
        base_path=silver_path,
        mode=run_mode,
        date_str=run_date,
    )
    partition_dir = Path(silver_path) / partition / "PARQUET"

    if not partition_dir.exists():
        raise FileNotFoundError(
            f"[Refinement][SilverCleanup][ERROR] Diretório de partição não encontrado: {partition_dir}."
        )

    # Apenas este diretório deve permanecer
    keep = {"flights_aggregated.parquet"}

    log.info(f"[SilverCleanup] Diretório alvo: {partition_dir}.")
    log.info(f"[SilverCleanup] Mantendo apenas: {keep}.")

    for item in partition_dir.iterdir():
        # Se for o agregado, não mexe
        if item.name in keep:
            log.info(f"[SilverCleanup] Mantendo: {item}")
            continue

        # Remove diretórios intermediários
        if item.is_dir():
            try:
                shutil.rmtree(item)
                log.info(f"[SilverCleanup] Diretório removido: {item}.")
            except Exception as e:
                log.warning(f"[SilverCleanup] Falha ao remover diretório {item}: {e}.")
            continue

        # Remove arquivos soltos
        if item.is_file():
            try:
                item.unlink()
                log.info(f"[SilverCleanup] Arquivo removido: {item}.")
            except Exception as e:
                log.warning(f"[SilverCleanup] Falha ao remover arquivo {item}: {e}.")

    log.info("[SilverCleanup] Limpeza concluída com sucesso.")

except Exception as e:
    log.exception(f"[SilverCleanup][ERROR] Falha durante o cleanup: {e}")
    raise

finally:
    log.info("[SilverCleanup] Job de limpeza da camada silver encerrado.")


In [32]:
%%script false --no-raise-error # Comentar essa linha se estiver em debug ou se quiser rodar a célula.

partition = find_partition(silver_path, mode=run_mode, date_str=run_date)
for item in (Path(silver_path) / partition / "PARQUET").iterdir():
    print(item)


In [None]:
# Encerra a sessão Spark
spark.stop()
log.info("[SilverCleanup] Sessão Spark finalizada.")
