In [1]:
import duckdb
import os
from dotenv import load_dotenv, find_dotenv
from deltalake import write_deltalake

load_dotenv(find_dotenv())


def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty. Failing here gives a
            clear message instead of silently embedding the text 'None' in the
            DuckDB secret and the endpoint URL.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name!r} is not set")
    return value


def _sql_literal(value):
    """Escape single quotes so ``value`` can sit inside a single-quoted SQL literal."""
    return value.replace("'", "''")


# MinIO credentials and endpoint, read from the environment (populated from a
# .env file when one is found). Validated before the connection is opened so a
# missing variable does not leave an unused connection behind.
AWS_ACCESS_KEY_ID = _require_env("AWS_ACCESS_KEY_MINIO")
AWS_SECRET_KEY_ID = _require_env("AWS_SECRET_KEY_MINIO")
HOST_MINIO = _require_env("HOST_MINIO")
PORT_MINIO = _require_env("PORT_MINIO")

con = duckdb.connect()

# Register the S3 secret DuckDB uses for s3:// paths: path-style URLs, SSL off.
con.sql(f""" CREATE SECRET secret1 (
            TYPE S3,
            KEY_ID '{_sql_literal(AWS_ACCESS_KEY_ID)}',
            SECRET '{_sql_literal(AWS_SECRET_KEY_ID)}',
            REGION 'us-east-1',
            ENDPOINT '{_sql_literal(HOST_MINIO)}:{_sql_literal(PORT_MINIO)}',
            URL_STYLE 'path',
            USE_SSL 'false'

        );
    """)

# Options handed to write_deltalake so it can reach the same MinIO endpoint
# over plain HTTP, using the credentials read from the environment above.
storage_options = dict(
    AWS_ACCESS_KEY_ID=str(AWS_ACCESS_KEY_ID),
    AWS_SECRET_ACCESS_KEY=str(AWS_SECRET_KEY_ID),
    AWS_ENDPOINT_URL=f"http://{HOST_MINIO}:{PORT_MINIO}",
    AWS_allow_http="true",
    AWS_REGION="us-east-1",
    AWS_S3_ALLOW_UNSAFE_RENAME="true",
)

In [2]:
# Root URIs of the source (bronze) and destination (silver) layers.
path_bronze, path_silver = (
    "s3://bronze/uff/projeto_comex",
    "s3://silver/uff/projeto_comex",
)

In [3]:
# Install, then load, the DuckDB delta extension that provides delta_scan().
for extension_statement in ("INSTALL delta", "LOAD delta"):
    con.sql(extension_statement)

In [4]:
# Full extract: every row of the bronze IMP table, with NOME_NCM, NOME_PAIS,
# NOME_BLOCO and NOME_ESTADO added via LEFT JOINs to the NCM, PAIS, PAIS_BLOCO
# and UF bronze tables.
# NOTE(review): if PAIS_BLOCO holds more than one row per CODIGO_PAIS, the join
# on pais_bloco multiplies the matching IMP rows — confirm this is intended.
sql_full_extract = f""" 
            WITH importacoes AS (
                select * from delta_scan('{path_bronze}/IMP')
                ),
                ncm as (
                select * from delta_scan('{path_bronze}/NCM')   
                ),
                pais as (
                select * from delta_scan('{path_bronze}/PAIS')
                ),
                pais_bloco as (
                select * from delta_scan('{path_bronze}/PAIS_BLOCO')
                ),
                uf as (
                select * from delta_scan('{path_bronze}/UF')
                )
               
            SELECT
            ncm.NOME_NCM as NOME_NCM,
            pais.NOME_PAIS,
            pb.NOME_BLOCO,
            uf.NOME_ESTADO,
            IMP.*
            FROM importacoes IMP
            LEFT JOIN ncm ON IMP.FK_NCM = ncm.CODIGO_NCM
            LEFT JOIN pais ON IMP.FK_PAIS = pais.CODIGO_PAIS
            LEFT JOIN pais_bloco pb ON pais.CODIGO_PAIS = pb.CODIGO_PAIS
            LEFT JOIN uf ON IMP.FK_SIGA_ESTADO = uf.SIGLA_ESTADO            

           """

# Run the query and materialise the result as an Arrow table.
full_extract_relation = con.sql(sql_full_extract)
df = full_extract_relation.to_arrow_table()

In [5]:
# Full load: write the extract to the silver IMP Delta table, partitioned by ANO.
# NOTE(review): with mode="append", re-running this cell presumably writes the
# same rows a second time — confirm this is intended.
silver_imp_uri = f"{path_silver}/IMP"
full_load_options = {
    "mode": "append",
    "storage_options": storage_options,
    "partition_by": ["ANO"],
}
write_deltalake(silver_imp_uri, df, **full_load_options)

In [6]:
# Incremental-load watermark: the latest (ANO, MES) already stored in silver.
# It is read from the destination table rather than from the in-memory `df`,
# because `df` is the same bronze extract the incremental query filters with
# "> watermark" — computing it from `df` can never select any row, and it also
# fails on a fresh kernel when the full-load cells are skipped.
silver_imp_scan = f"delta_scan('{path_silver}/IMP')"

max_ano = con.sql(f"SELECT MAX(ANO) FROM {silver_imp_scan}").fetchone()[0]
if max_ano is None:
    # MAX() over zero rows is NULL; interpolating it into the next query would
    # produce invalid SQL, so stop here with an explicit message.
    raise ValueError(
        f"No ANO found in {path_silver}/IMP: run the full load before the incremental load"
    )

max_mes = con.sql(
    f"SELECT MAX(MES) FROM {silver_imp_scan} WHERE ANO = {max_ano}"
).fetchone()[0]

In [7]:
# Incremental extract: same joins as the full extract, restricted to IMP rows
# newer than the (max_ano, max_mes) watermark. Rebinds `df` to the result.
# NOTE(review): if PAIS_BLOCO holds more than one row per CODIGO_PAIS, the join
# on pais_bloco multiplies the matching IMP rows — confirm this is intended.
sql_incremental_extract = f""" 
            WITH importacoes AS (
                select * from delta_scan('{path_bronze}/IMP')
                ),
                ncm as (
                select * from delta_scan('{path_bronze}/NCM')   
                ),
                pais as (
                select * from delta_scan('{path_bronze}/PAIS')
                ),
                pais_bloco as (
                select * from delta_scan('{path_bronze}/PAIS_BLOCO')
                ),
                uf as (
                select * from delta_scan('{path_bronze}/UF')
                )
               
            SELECT
            ncm.NOME_NCM as NOME_NCM,
            pais.NOME_PAIS,
            pb.NOME_BLOCO,
            uf.NOME_ESTADO,
            IMP.*
            FROM importacoes IMP
            LEFT JOIN ncm ON IMP.FK_NCM = ncm.CODIGO_NCM
            LEFT JOIN pais ON IMP.FK_PAIS = pais.CODIGO_PAIS
            LEFT JOIN pais_bloco pb ON pais.CODIGO_PAIS = pb.CODIGO_PAIS
            LEFT JOIN uf ON IMP.FK_SIGA_ESTADO = uf.SIGLA_ESTADO     
           
            WHERE IMP.ANO > {max_ano} OR
            (
                IMP.ANO = {max_ano} AND MES > {max_mes}
            )
                
"""

# Run the query and materialise the result as an Arrow table.
incremental_relation = con.sql(sql_incremental_extract)
df = incremental_relation.to_arrow_table()

In [8]:
# Append the incremental extract to the silver IMP table; skipped when the
# extract has no rows.
has_new_rows = len(df) > 0
if has_new_rows:
    incremental_load_options = {
        "mode": "append",
        "storage_options": storage_options,
        "partition_by": ["ANO"],
    }
    write_deltalake(f"{path_silver}/IMP", df, **incremental_load_options)

In [9]:
# Close the DuckDB connection opened at the top of the notebook.
con.close()