###### **1º Passo: Destino dos dados raw -> silver**

In [1]:
lk_lakehouse_raw = 'lk_raw_dados_publicos'
lk_lakehouse_silver = 'lk_silver_dados_publicos'

StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 3, Finished, Available, Finished)

###### **2º Passo: Bibliotecas**

In [2]:
from pyspark.sql.functions import *

StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 4, Finished, Available, Finished)

###### **3º Passo: Tabelas Dimensões**

In [3]:
# 1. Checagem de existência

if not spark.catalog.tableExists(f"{lk_lakehouse_silver}.tb_dim_uf_mun"):


#2. Leitura das tabelas RAW

    df_mun = spark.table(f"{lk_lakehouse_raw}.tb_dim_uf_mun")
    df_geo = spark.table(f"{lk_lakehouse_raw}.tb_dim_municipios_geoloc")


#3. Normalização de nomes (para fallback)

    df_mun = df_mun.withColumn(
        "nome_norm",
        lower(regexp_replace(col("NO_MUN_MIN"), "[^a-zA-Z0-9 ]", ""))
    )

    df_geo = df_geo.withColumn(
        "nome_norm",
        lower(regexp_replace(col("name_muni"), "[^a-zA-Z0-9 ]", ""))
    )


#4. Tabela de correções manuais de código IBGE

    correcoes = [
        (3406607, 3506607),  # Biritiba Mirim
        (5320157, 5220157),  # São Luiz do Norte
        (3450001, 3550001),  # São Luiz do Paraitinga
        (3430805, 3530805),  # Mogi Mirim
        (3422158, 3515004),  # Embu das Artes
        (3415004, 5000609),  # Amambai
        (5200609, 3522158)   # Itaoca
    ]

    df_correcoes = spark.createDataFrame(
        correcoes,
        ["co_mun_origem", "co_mun_corrigido"]
    )


#5. Aplicação das correções

    df_mun = (
        df_mun
        .join(
            df_correcoes,
            col("co_mun_geo").cast("int") == col("co_mun_origem"),
            "left"
        )
        .withColumn(
            "co_mun_join",
            coalesce(col("co_mun_corrigido"), col("co_mun_geo")).cast("int")
        )
    )


#6. Join principal (código forte + fallback por nome)

    tb_dim_uf_mun = (
        df_mun
        .join(
            df_geo,
            (col("co_mun_join") == col("code_muni").cast("int"))
            | (df_mun.nome_norm == df_geo.nome_norm),
            "left"
        )
        .select(
            col("code_muni").cast("string"),
            col("name_muni").cast("string"),
            col("abbrev_state").cast("string"),
            col("name_region").cast("string"),
            col("latitude").cast("double"),
            col("longitude").cast("double"),
            col("CO_MUN_GEO").cast("int").alias("co_mun_geo"),
            initcap(col("NO_MUN")).alias("no_mun"),
            col("NO_MUN_MIN").alias("no_mun_min"),
            col("SG_UF").alias("sg_uf")
        )
    )


#7. Gravação na camada Silver

    (
        tb_dim_uf_mun
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"{lk_lakehouse_silver}.tb_dim_uf_mun")
    )

StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 5, Finished, Available, Finished)

In [5]:
if not spark.catalog.tableExists(f"{lk_lakehouse_silver}.tb_dim_pais"):

    df_paises  = spark.table(f"{lk_lakehouse_raw}.tb_dim_pais");
    df_blocos  = spark.table(f"{lk_lakehouse_raw}.tb_dim_pais_bloco");

    df_join = (
        df_paises.alias("p")
        .join(
            df_blocos.alias("r"),
            col("p.co_pais") == col("r.co_pais"),
            "left"
        )
        .select(
            col("p.co_pais"),
            col("p.no_pais"),
            col("p.no_pais_ing"),
            col("p.no_pais_esp"),
            
            col("r.no_bloco"),
            col("r.no_bloco_ing"),
            col("r.no_bloco_esp")
        )
    );

    (
        df_join
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"{lk_lakehouse_silver}.tb_dim_pais")
    )

StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 6, Finished, Available, Finished)

In [35]:
#Outras tabelas

tabelas = [
    "tb_dim_isic_cuci",
    "tb_dim_ncm",
    "tb_dim_ncm_cgce",
    "tb_dim_ncm_cuci",
    "tb_dim_ncm_fat_agreg",
    "tb_dim_ncm_isic",
    "tb_dim_ncm_ppe",
    "tb_dim_ncm_ppi",
    "tb_dim_ncm_sh",
    "tb_dim_via"
]

for tabela in tabelas:
    destino = f"{lk_lakehouse_silver}.{tabela}"

    if spark.catalog.tableExists(destino):
        print(f"Já existe a {tabela}")
        continue

    (
        spark.table(f"{lk_lakehouse_raw}.{tabela}")
             .write
             .format("delta")
             .saveAsTable(destino)
    )

StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 12, Finished, Available, Finished)

Já existe a tb_dim_isic_cuci
Já existe a tb_dim_ncm
Já existe a tb_dim_ncm_cgce
Já existe a tb_dim_ncm_cuci
Já existe a tb_dim_ncm_fat_agreg
Já existe a tb_dim_ncm_isic
Já existe a tb_dim_ncm_ppe
Já existe a tb_dim_ncm_ppi
Já existe a tb_dim_ncm_sh
Já existe a tb_dim_via


###### **4º Passo: Tabelas Fatos**

In [6]:
tb_ft_export = (
    spark.table(f"{lk_lakehouse_raw}.tb_ft_export")
    .select(
#Data de referência (mensal)
        make_date(
            col("CO_ANO").cast("int"),
            col("CO_MES").cast("int"),
            lit(1)
        ).alias("dt_referencia"),

#Chaves
        col("CO_NCM").cast("string").alias("co_ncm"),
        col("CO_UNID").cast("string").alias("co_unid"),
        col("CO_PAIS").cast("string").alias("co_pais"),
        col("SG_UF_NCM").cast("string").alias("sg_uf_ncm"),
        col("CO_VIA").cast("string").alias("co_via"),
        col("CO_URF").cast("string").alias("co_urf"),

#Medidas
        col("QT_ESTAT").cast("double").alias("qt_estat"),
        col("KG_LIQUIDO").cast("double").alias("kg_liquido"),
        col("VL_FOB").cast("double").alias("vl_fob"),

#Atributos
        col("Tipo_Operacao").cast("string").alias("tipo_operacao")
    )
);

StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 7, Finished, Available, Finished)

In [7]:
tb_ft_import = (
    spark.table(f"{lk_lakehouse_raw}.tb_ft_import")
    .select(
#Data de referência (mensal)
        make_date(
            col("CO_ANO").cast("int"),
            col("CO_MES").cast("int"),
            lit(1)
        ).alias("dt_referencia"),

#Chaves
        col("CO_NCM").cast("string").alias("co_ncm"),
        col("CO_UNID").cast("string").alias("co_unid"),
        col("CO_PAIS").cast("string").alias("co_pais"),
        col("SG_UF_NCM").cast("string").alias("sg_uf_ncm"),
        col("CO_VIA").cast("string").alias("co_via"),
        col("CO_URF").cast("string").alias("co_urf"),

#Medidas
        col("QT_ESTAT").cast("double").alias("qt_estat"),
        col("KG_LIQUIDO").cast("double").alias("kg_liquido"),
        col("VL_FOB").cast("double").alias("vl_fob"),
        col("VL_FRETE").cast("double").alias("vl_frete"),
        col("VL_SEGURO").cast("double").alias("vl_seguro"),

#Atributos
        col("Tipo_Operacao").cast("string").alias("tipo_operacao")
    )
);

StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 8, Finished, Available, Finished)

In [8]:
(
    tb_ft_import
    .write
    .format("delta")
    .mode("overwrite")
    #.partitionBy("dt_referencia")
    .saveAsTable(f"{lk_lakehouse_silver}.tb_ft_import")
);


(
    tb_ft_export
    .write
    .format("delta")
    .mode("overwrite")
    #.partitionBy("dt_referencia")
    .saveAsTable(f"{lk_lakehouse_silver}.tb_ft_export")
);


StatementMeta(, 1a6c6b07-11dd-4282-9406-e83e9ec9b768, 9, Finished, Available, Finished)