# etl_silver_to_gold
---
This notebook runs the `ETL` process that moves the data from the **Silver** layer to the **Gold** layer. It covers normalization, file handling and loading of the data into *PostgreSQL*, which completes the pipeline.


In [1]:
# Parameters

run_mode = "latest"
run_date = None

silver_path = "/opt/airflow/data-layer/silver"
gold_path = "/opt/airflow/data-layer/gold"

aggregated_name = "flights_aggregated.parquet"
postgres_conn_id = "AIRFLOW_VAR_POSTGRES_CONN_ID"


In [2]:
import os
import shutil
from datetime import datetime
from pathlib import Path

from transformer.utils.file_io import find_partition
from transformer.utils.logger import get_logger
from transformer.utils.spark_helpers import get_spark_session, load_to_postgres, read_from_postgres
from transformer.utils.postgre_helpers import assert_table_rowcount
from transformer.utils.quality_gates_gold import run_quality_gates_gold

from pyspark.sql import DataFrame, functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import DateType


## Job 1: build_and_load_gold_star_schema

This job builds the star schema of the **Gold** layer. It reads the `silver.silver_flights` table from *PostgreSQL*, materializes the dimension and fact tables from it, saves them in `parquet` format to the **Gold** layer (this write currently sits in a disabled debug block of the runner) and loads them into *PostgreSQL* according to the layer's DDL.
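In the star schema, each fact row stops carrying natural keys (airline IATA code, origin and destination IATA codes, flight date) and carries the dimensions' surrogate keys instead, obtained through left joins. The minimal pure-Python sketch below illustrates that lookup under stated assumptions: the in-memory dictionaries `air_keys`, `apt_keys`, `dat_keys`, the function `resolve_keys` and the sample row are all hypothetical, and an unmatched key yields `None`, the way a Spark left join yields `NULL`.

```python
from datetime import date

# Hypothetical, tiny dimensions expressed as natural key -> surrogate key.
air_keys = {"AA": 1, "DL": 2}
apt_keys = {"ATL": 1, "JFK": 2, "LAX": 3}
dat_keys = {date(2015, 1, 1): 1, date(2015, 1, 2): 2}


def resolve_keys(row: dict) -> dict:
    """Replace a Silver row's natural keys with surrogate keys, like the left joins that build fat_flt.

    dict.get returns None for an unmatched key, which mirrors the NULL a left join produces.
    """
    return {
        "srk_dat": dat_keys.get(row["flight_date"]),
        "srk_air": air_keys.get(row["airline_iata_code"]),
        # The airport dimension is looked up twice: once as origin, once as destination.
        "srk_ori": apt_keys.get(row["origin_airport_iata_code"]),
        "srk_dst": apt_keys.get(row["dest_airport_iata_code"]),
        "dep_dly": row["departure_delay"],  # measures are carried over, only renamed
    }


silver_row = {
    "flight_date": date(2015, 1, 2),
    "airline_iata_code": "DL",
    "origin_airport_iata_code": "ATL",
    "dest_airport_iata_code": "SEA",  # absent from apt_keys, so srk_dst comes back as None
    "departure_delay": 12,
}
print(resolve_keys(silver_row))
```

Using left rather than inner joins keeps every flight in the fact table even when a dimension lookup misses; the missing key then shows up as `NULL` instead of silently dropping the row.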


In [3]:
log = get_logger("build_and_load_gold_star_schema")

spark = get_spark_session("BuildLoadGoldStarSchema")
log.info("[BuildLoad] SparkSession iniciada.")


/usr/local/lib/python3.12/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/usr/local/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c9643096-85a2-4dc2-acab-117d373dfd24;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in central
:: resolution report :: resolve 160ms :: artifacts dl 8ms
	:: modules in use:
	org.checkerframework#checker-qual;3.42.0 from central in [default]
	org.postgresql#postgresql;42.7.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------

### Defining the materialization function
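The function in the next cell builds `dim_apt` by stacking the origin and destination airport columns, deduplicating them, mapping state codes to names with an `"Unknown"` fallback, and numbering the rows with `row_number()` over a window ordered by IATA code. A minimal pure-Python sketch of the same steps, assuming only a few columns and an abbreviated state map; `STATE_NAMES`, `build_airport_dim` and the sample flights are hypothetical.

```python
# Abbreviated, hypothetical stand-in for the notebook's full state_map.
STATE_NAMES = {"GA": "Georgia", "NY": "New York", "CA": "California"}


def build_airport_dim(flights: list[dict]) -> list[tuple]:
    """Stack origin and destination airports, deduplicate, and number them by IATA code."""
    stacked = []
    for flight in flights:
        # Equivalent of the union of the origin_* and dest_* selections.
        for side in ("origin", "dest"):
            state_code = flight[f"{side}_state"]
            stacked.append((
                flight[f"{side}_airport_iata_code"],
                state_code,
                # Same fallback as F.coalesce(state_map[...], F.lit("Unknown")).
                STATE_NAMES.get(state_code, "Unknown"),
            ))
    # set() plays the role of distinct(); sorting puts apt_iat first, like Window.orderBy("apt_iat").
    distinct_rows = sorted(set(stacked))
    # enumerate(start=1) plays the role of F.row_number(), which also starts at 1.
    return [(srk, *row) for srk, row in enumerate(distinct_rows, start=1)]


flights = [
    {"origin_airport_iata_code": "JFK", "origin_state": "NY",
     "dest_airport_iata_code": "ATL", "dest_state": "GA"},
    {"origin_airport_iata_code": "ATL", "origin_state": "GA",
     "dest_airport_iata_code": "XXX", "dest_state": "ZZ"},
]
for row in build_airport_dim(flights):
    print(row)
```

Because deduplication runs over the whole tuple, an airport whose attributes differ between rows (for example, a missing latitude on some of them) would receive two surrogate keys; the Spark version behaves the same way, since `distinct()` also compares every selected column.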

In [4]:
def materialize_gold_layer(df: DataFrame) -> dict[str, DataFrame]:
    """
    Materializa as tabelas dimensionais e fato da camada gold a partir do DataFrame agregado da Silver.

    Args:
        df (DataFrame): DataFrame consolidado da camada Gold.

    Returns:
        dict[str, DataFrame]: DataFrames correspondentes a dim_air, dim_apt, dim_dat e fat_flt.
    """

    # US federal holidays in 2015 (UTC), listed on their actual calendar dates rather than observed dates
    us_holidays_2015 = [
        "2015-01-01",
        "2015-01-19",
        "2015-02-16",
        "2015-05-25",
        "2015-07-04",
        "2015-09-07",
        "2015-10-12",
        "2015-11-11",
        "2015-11-26",
        "2015-12-25",
    ]
    holidays_df = (
        spark.createDataFrame([(d,) for d in us_holidays_2015], ["holiday_date"])
            .withColumn("holiday_date", F.col("holiday_date").cast(DateType()))
    )
    
    log.info("[Materialize] Iniciando materialização da camada Gold.")

    # dim_air
    log.info("[Materialize] Materializando 'dim_air'.")
    w_air = Window.orderBy("air_iat")
    dim_air = (
        df.select(
            F.col("airline_iata_code").alias("air_iat"),
            F.col("airline_name").alias("air_nam")
        )
        .distinct()
        .withColumn("srk_air", F.row_number().over(w_air))
        .select("srk_air", "air_iat", "air_nam")
    )

    # dim_apt
    log.info("[Materialize] Materializando 'dim_apt'.")
    
    # Maps US state codes to state names; F.create_map takes alternating key/value literals.
    state_map = F.create_map(
        [F.lit(x) for x in [
            "AL","Alabama","AK","Alaska","AZ","Arizona","AR","Arkansas","CA","California",
            "CO","Colorado","CT","Connecticut","DE","Delaware","FL","Florida","GA","Georgia",
            "HI","Hawaii","ID","Idaho","IL","Illinois","IN","Indiana","IA","Iowa","KS","Kansas",
            "KY","Kentucky","LA","Louisiana","ME","Maine","MD","Maryland","MA","Massachusetts",
            "MI","Michigan","MN","Minnesota","MS","Mississippi","MO","Missouri","MT","Montana",
            "NE","Nebraska","NV","Nevada","NH","New Hampshire","NJ","New Jersey","NM","New Mexico",
            "NY","New York","NC","North Carolina","ND","North Dakota","OH","Ohio","OK","Oklahoma",
            "OR","Oregon","PA","Pennsylvania","RI","Rhode Island","SC","South Carolina",
            "SD","South Dakota","TN","Tennessee","TX","Texas","UT","Utah","VT","Vermont",
            "VA","Virginia","WA","Washington","WV","West Virginia","WI","Wisconsin","WY","Wyoming",
            "DC","District of Columbia",
        ]]
    )
    
    apt_base = (
        df.select(
            F.col("origin_airport_iata_code").alias("apt_iat"),
            F.col("origin_airport_name").alias("apt_nam"),
            F.col("origin_state").alias("ste_cod"),
            F.coalesce(state_map[F.col("origin_state")], F.lit("Unknown")).alias("ste_nam"),
            F.col("origin_city").alias("cty_nam"),
            F.col("origin_latitude").alias("lat_val"),
            F.col("origin_longitude").alias("lon_val")
        )
        .union(
            df.select(
                F.col("dest_airport_iata_code").alias("apt_iat"),
                F.col("dest_airport_name").alias("apt_nam"),
                F.col("dest_state").alias("ste_cod"),
                F.coalesce(state_map[F.col("dest_state")], F.lit("Unknown")).alias("ste_nam"),
                F.col("dest_city").alias("cty_nam"),
                F.col("dest_latitude").alias("lat_val"),
                F.col("dest_longitude").alias("lon_val")
            )
        )
    )

    w_apt = Window.orderBy("apt_iat")
    dim_apt = (
        apt_base.distinct()
        .withColumn("srk_apt", F.row_number().over(w_apt))
        .select(
            "srk_apt", "apt_iat", "apt_nam",
            "ste_cod", "ste_nam", "cty_nam",
            "lat_val", "lon_val"
        )
    )

    # dim_dat
    log.info("[Materialize] Materializando 'dim_dat'.")
    dat_base = (
        df.select(F.col("flight_date").alias("ful_dat"))
          .distinct()
          .withColumn("yer", F.year("ful_dat"))
          .withColumn("mth", F.month("ful_dat"))
          .withColumn("day", F.dayofmonth("ful_dat"))
          .withColumn("dow", F.dayofweek("ful_dat"))
          .withColumn("qtr", F.quarter("ful_dat"))
          .join(holidays_df, F.col("ful_dat") == F.col("holiday_date"), "left")
          .withColumn("hol_flg", F.col("holiday_date").isNotNull())
          .drop("holiday_date")
    )

    w_dat = Window.orderBy("ful_dat")
    dim_dat = (
        dat_base.distinct()
          .withColumn("srk_dat", F.row_number().over(w_dat))
          .select("srk_dat", "ful_dat", "yer", "mth", "day", "dow", "qtr", "hol_flg")
    )

    # fat_flt: replace the natural keys with the dimensions' surrogate keys via left joins
    fat = (
        df
        .withColumnRenamed("flight_date", "ful_dat")
        .join(
            dim_air.select("srk_air", "air_iat"),
            df.airline_iata_code == F.col("air_iat"), "left")
        .join(
            dim_apt.select(
                F.col("srk_apt").alias("srk_ori"),
                F.col("apt_iat").alias("ori_iat")
            ),
            df.origin_airport_iata_code == F.col("ori_iat"),
            "left"
        )
        .join(
            dim_apt.select(
                F.col("srk_apt").alias("srk_dst"),
                F.col("apt_iat").alias("dst_iat")
            ),
            df.dest_airport_iata_code == F.col("dst_iat"),
            "left"
        )
        .join(
            dim_dat.select("srk_dat", "ful_dat"),
            "ful_dat",
            "left"
        )
    )

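    # Generate the sequential surrogate key for fat_flt. A Window without partitionBy
    # moves every row to a single partition, so this step does not parallelize.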
    w_flt = Window.orderBy("ful_dat", "airline_iata_code", "origin_airport_iata_code",
                           "dest_airport_iata_code", "scheduled_departure")

    fat_flt = (
        fat.withColumn("srk_flt", F.row_number().over(w_flt))
           .select(
               "srk_flt", "srk_dat", "srk_air", "srk_ori", "srk_dst",
               F.col("scheduled_departure").alias("sch_dep"),
               F.col("departure_time").alias("dep_tme"),
               F.col("scheduled_arrival").alias("sch_arr"),
               F.col("arrival_time").alias("arr_tme"),
               F.col("distance").alias("dis_val"),
               F.col("air_time").alias("air_tme"),
               F.col("elapsed_time").alias("elp_tme"),
               F.col("scheduled_time").alias("sch_tme"),
               F.col("departure_delay").alias("dep_dly"),
               F.col("arrival_delay").alias("arr_dly"),
               F.col("air_system_delay").alias("sys_dly"),
               F.col("security_delay").alias("sec_dly"),
               F.col("airline_delay").alias("air_dly"),
               F.col("late_aircraft_delay").alias("acf_dly"),
               F.col("weather_delay").alias("wea_dly"),
               F.col("is_overnight_flight").alias("ovn_flg")
           )
    )

    return {
        "dim_air": dim_air,
        "dim_apt": dim_apt,
        "dim_dat": dim_dat,
        "fat_flt": fat_flt
    }
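
The `dim_dat` attributes come from Spark's date functions, and `F.dayofweek` numbers days from 1 = Sunday to 7 = Saturday, unlike Python's `isoweekday()` (1 = Monday to 7 = Sunday). The sketch below, with hypothetical helper names, rebuilds one `dim_dat` row in pure Python and derives the hardcoded 2015 holiday list from the federal rules in force that year (Juneteenth only became a federal holiday in 2021). Like the notebook, it uses actual dates: 2015-07-04 fell on a Saturday and was observed on Friday 2015-07-03, yet only the 4th is flagged.

```python
from datetime import date, timedelta


def nth_weekday(year: int, month: int, weekday: int, n: int) -> date:
    """n-th (1-based) occurrence of weekday (Mon=0 .. Sun=6) in the given month."""
    first = date(year, month, 1)
    offset = (weekday - first.weekday()) % 7
    return first + timedelta(days=offset + 7 * (n - 1))


def last_weekday(year: int, month: int, weekday: int) -> date:
    """Last occurrence of weekday (Mon=0 .. Sun=6) in the given month."""
    next_month = date(year + month // 12, month % 12 + 1, 1)
    last = next_month - timedelta(days=1)
    return last - timedelta(days=(last.weekday() - weekday) % 7)


def us_federal_holidays(year: int) -> list[date]:
    """Actual (not observed) dates of the ten US federal holidays in force before 2021."""
    MON, THU = 0, 3
    return sorted([
        date(year, 1, 1),               # New Year's Day
        nth_weekday(year, 1, MON, 3),   # Martin Luther King Jr. Day
        nth_weekday(year, 2, MON, 3),   # Washington's Birthday
        last_weekday(year, 5, MON),     # Memorial Day
        date(year, 7, 4),               # Independence Day
        nth_weekday(year, 9, MON, 1),   # Labor Day
        nth_weekday(year, 10, MON, 2),  # Columbus Day
        date(year, 11, 11),             # Veterans Day
        nth_weekday(year, 11, THU, 4),  # Thanksgiving Day
        date(year, 12, 25),             # Christmas Day
    ])


def date_dim_row(d: date, holidays: set[date]) -> dict:
    """Calendar attributes matching the Spark functions used for dim_dat."""
    return {
        "ful_dat": d,
        "yer": d.year,
        "mth": d.month,
        "day": d.day,
        # F.dayofweek: 1 = Sunday .. 7 = Saturday; isoweekday(): 1 = Monday .. 7 = Sunday.
        "dow": d.isoweekday() % 7 + 1,
        "qtr": (d.month - 1) // 3 + 1,  # same result as F.quarter
        "hol_flg": d in holidays,
    }


# The list hardcoded in materialize_gold_layer.
NOTEBOOK_LIST = [
    "2015-01-01", "2015-01-19", "2015-02-16", "2015-05-25", "2015-07-04",
    "2015-09-07", "2015-10-12", "2015-11-11", "2015-11-26", "2015-12-25",
]
holidays_2015 = us_federal_holidays(2015)
print([d.isoformat() for d in holidays_2015] == NOTEBOOK_LIST)
print(date_dim_row(date(2015, 7, 4), set(holidays_2015)))
```

Deriving the list from rules would let `dim_dat` cover more than one year without editing literals, at the cost of encoding the holiday rules themselves.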


### Runner for the `build_and_load_gold_star_schema` job
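The runner in the next cell follows a load, count, roll-back pattern: it counts each DataFrame, overwrites the target table, compares the stored row count with the expected one and, on a mismatch, truncates the table and aborts. The sketch below reproduces that control flow with the standard-library `sqlite3` module as a stand-in for PostgreSQL; the function `load_and_validate`, the two-column table and the use of `INSERT OR IGNORE` to simulate silently dropped rows are all assumptions for illustration.

```python
import sqlite3


def load_and_validate(conn: sqlite3.Connection, table: str, rows: list[tuple]) -> None:
    """Overwrite a two-column `table` with `rows`, then check the stored row count.

    On a mismatch the table is emptied and a ValueError is raised, mirroring the
    runner's TRUNCATE-and-raise fallback. Table names are trusted internal identifiers.
    """
    expected_count = len(rows)  # counted before the load, like df.count()
    conn.execute(f"DELETE FROM {table}")  # stand-in for mode="overwrite"
    # OR IGNORE silently skips rows that violate the primary key, simulating data loss.
    conn.executemany(f"INSERT OR IGNORE INTO {table} VALUES (?, ?)", rows)
    conn.commit()

    actual_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    if actual_count != expected_count:
        conn.execute(f"DELETE FROM {table}")  # SQLite has no TRUNCATE statement
        conn.commit()
        raise ValueError(
            f"Row count mismatch for '{table}': expected {expected_count}, got {actual_count}."
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_air (srk_air INTEGER PRIMARY KEY, air_iat TEXT)")
load_and_validate(conn, "dim_air", [(1, "AA"), (2, "DL")])  # counts match

try:
    # Duplicate surrogate key: one row is ignored, so the count check fails.
    load_and_validate(conn, "dim_air", [(1, "AA"), (1, "DL")])
except ValueError as err:
    print(err)
print(conn.execute("SELECT COUNT(*) FROM dim_air").fetchone()[0])
```

Clearing the table on failure keeps a partially loaded table from being read downstream; the trade-off is that the previous contents are gone as well, so the job has to be rerun to restore them.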

In [5]:
try:
    log.info("[BuildLoad] Iniciando job de materialização da gold.")

    df = read_from_postgres(
        spark=spark,
        db_conn_id=postgres_conn_id,
        table_name="silver.silver_flights",
    )

    log.info(f"[BuildLoad] Datasets carregado a partir do PostgreSQL.")

    # Materializando
    tables = materialize_gold_layer(df)
    dim_air = tables["dim_air"]
    dim_apt = tables["dim_apt"]
    dim_dat = tables["dim_dat"]
    fat_flt = tables["fat_flt"]

    # Run the quality gate
    log.info("[BuildLoad] Iniciando quality gate.")
    run_quality_gates_gold(
        dim_air=dim_air,
        dim_apt=dim_apt,
        dim_dat=dim_dat,
        fat_flt=fat_flt
    )
    log.info("[BuildLoad] Quality gate concluído com sucesso.")

    """
    # *** DEBUG ***
    # Define partição de saída
    processing_date = datetime.now().strftime("%Y-%m-%d")
    output_dir = Path(gold_path) / processing_date / "PARQUET"
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # Escreve os arquivos na gold
    log.info("[BuildLoad] Iniciando escrita dos arquivos na camada gold.")

    dim_air.write.mode("overwrite").parquet(str(output_dir / "dim_air.parquet"))
    dim_apt.write.mode("overwrite").parquet(str(output_dir / "dim_apt.parquet"))
    dim_dat.write.mode("overwrite").parquet(str(output_dir / "dim_dat.parquet"))
    fat_flt.write.mode("overwrite").parquet(str(output_dir / "fat_flt.parquet"))

    log.info("[BuildLoad] Escrita concluída com sucesso.")
    """
    
    log.info("[BuildLoad] Iniciando carga da gold.")

    tables = {
        "dim_air": dim_air,
        "dim_apt": dim_apt,
        "dim_dat": dim_dat,
        "fat_flt": fat_flt,
    }

    # Load each table into PostgreSQL and validate it.
    # `gold_df` avoids shadowing the Silver `df` read above.
    for table_name, gold_df in tables.items():
        full_table_name = f"gold.{table_name}"

        log.info(f"[BuildLoad] Carregando tabela: {full_table_name}.")
        expected_count = gold_df.count()

        # Load into PostgreSQL
        load_to_postgres(
            df=gold_df,
            db_conn_id=postgres_conn_id,
            table_name=full_table_name,
            mode="overwrite"
        )

        log.info(f"[BuildLoad] Tabela '{full_table_name}' carregada. Validando integridade.")

        # Validate the row count; on failure, empty the table and abort the job
        try:
            assert_table_rowcount(
                db_conn_id=postgres_conn_id,
                table_name=full_table_name,
                expected_count=expected_count,
            )
        except Exception as e:
            log.error(f"[BuildLoad][ERROR] Validação falhou para '{full_table_name}'. Limpando tabela.")

            import psycopg2

            conn_pg = psycopg2.connect(
                host=os.getenv("DB_HOST", "localhost"),
                port=os.getenv("DB_PORT", "5432"),
                dbname=os.getenv("DB_NAME", "postgres"),
                user=os.getenv("DB_USER", "postgres"),
                password=os.getenv("DB_PASSWORD", "postgres"),
            )
            try:
                # `with conn_pg` commits on success but does not close the connection,
                # hence the explicit close() in the finally block.
                with conn_pg, conn_pg.cursor() as cur:
                    # CASCADE also truncates tables that reference this one through foreign keys.
                    cur.execute(f"TRUNCATE TABLE {full_table_name} CASCADE;")
            finally:
                conn_pg.close()

            raise ValueError(f"[BuildLoad][ERROR] Falha na validação da tabela '{full_table_name}'.") from e

        log.info(f"[BuildLoad] Validação concluída com sucesso: {full_table_name}.")

    log.info("[BuildLoad] Carga de todas as tabelas concluída com sucesso.") 

except Exception as e:
    log.exception(f"[BuildLoad][ERROR] Falha na construção do esquema estrela: {e}.")
    raise

finally:
    log.info("[BuildLoad] Job de materialização da gold encerrado.")


2025-12-11 02:46:29 [INFO] build_and_load_gold_star_schema: [BuildLoad] Iniciando job de materialização da gold.
2025-12-11 02:46:29 [INFO] spark_helpers: [READ] Iniciando leitura de 'silver.silver_flights'.
2025-12-11 02:46:29 [WARN] spark_helpers: [WARN] Airflow indisponível, usando variáveis de ambiente para conexão PostgreSQL.
2025-12-11 02:46:34 [INFO] spark_helpers: [READ] Leitura concluída: 'silver.silver_flights'. Linhas: 5208259
2025-12-11 02:46:34 [INFO] build_and_load_gold_star_schema: [BuildLoad] Datasets carregado a partir do PostgreSQL.
2025-12-11 02:46:34 [INFO] build_and_load_gold_star_schema: [Materialize] Iniciando materialização da camada Gold.
2025-12-11 02:46:34 [INFO] build_and_load_gold_star_schema: [Materialize] Materializando 'dim_air'.
2025-12-11 02:46:34 [INFO] build_and_load_gold_star_schema: [Materialize] Materializando 'dim_apt'.
2025-12-11 02:46:35 [INFO] build_and_load_gold_star_schema: [Materialize] Materializando 'dim_dat'.
2025-12-11 02:46:35 [INFO] b

In [6]:
%%script false --no-raise-error # Comment out this line to run the cell (for example, when debugging).

# Inspect the schema and a sample row of each materialized DataFrame

df_show = {
    "dim_air": dim_air,
    "dim_apt": dim_apt,
    "dim_dat": dim_dat,
    "fat_flt": fat_flt
}

for name, d in df_show.items():
    print(f"\n{name}\n")
    d.printSchema()
    d.limit(1).show(truncate=True)

# Inspect the tables loaded into PostgreSQL (fat_flt is left out of this check)

jdbc_url = f"jdbc:postgresql://{os.getenv('DB_HOST', 'localhost')}:{os.getenv('DB_PORT', '5432')}/{os.getenv('DB_NAME', 'postgres')}"
connection_properties = {
    "user": os.getenv("DB_USER", "postgres"),
    "password": os.getenv("DB_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

tables_to_check = ["dim_air", "dim_apt", "dim_dat"]
for tbl in tables_to_check:
    print(f"\n gold.{tbl} \n")
    df_check = spark.read.jdbc(url=jdbc_url, table=f"gold.{tbl}", properties=connection_properties)
    df_check.limit(1).show(truncate=True)
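The cell above assembles the JDBC URL and connection properties inline from the `DB_*` environment variables, and the runner's `psycopg2` fallback reads the same variables separately. A small shared helper, sketched below under the assumption that the same defaults apply, would keep those two places from drifting apart; the name `jdbc_settings` is hypothetical, and passing a plain dictionary instead of `os.environ` makes it easy to test.

```python
import os
from typing import Mapping


def jdbc_settings(env: Mapping[str, str] = os.environ) -> tuple[str, dict[str, str]]:
    """Build the PostgreSQL JDBC URL and Spark connection properties from DB_* variables.

    Falls back to the same defaults the notebook uses when a variable is unset.
    """
    url = (
        f"jdbc:postgresql://{env.get('DB_HOST', 'localhost')}:"
        f"{env.get('DB_PORT', '5432')}/{env.get('DB_NAME', 'postgres')}"
    )
    properties = {
        "user": env.get("DB_USER", "postgres"),
        "password": env.get("DB_PASSWORD", "postgres"),
        "driver": "org.postgresql.Driver",
    }
    return url, properties


url, props = jdbc_settings({"DB_HOST": "db", "DB_NAME": "dw"})
print(url)
```

The returned pair plugs directly into `spark.read.jdbc(url=url, table="gold.dim_air", properties=props)`, the same call the inspection cell already makes.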


In [7]:
# Stop the Spark session.
spark.stop()
log.info("[BuildLoad] Sessão Spark finalizada.")


2025-12-11 03:00:18 [INFO] build_and_load_gold_star_schema: [BuildLoad] Sessão Spark finalizada.
