In [1]:
import os
import logging
import io
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
import json
from minio import Minio
import pandas as pd
import unicodedata
import re

In [2]:
minio_connection = ""  

In [None]:
try:
    minio_conn = json.loads(minio_connection)
except json.JSONDecodeError:
    with open('../variables/minio_connection.json', "r") as minio_connection_file:
        minio_conn = json.loads(minio_connection_file.read())

In [4]:
class LazySparkSession:
    packages = [
        "io.delta:delta-spark_2.13:4.0.0",
        "org.apache.hadoop:hadoop-aws:3.4.0",
        "com.amazonaws:aws-java-sdk-bundle:1.12.787",
    ]

    def __init__(self, access_key, secret_key, endpoint):
        self._access_key = access_key
        self._secret_key = secret_key
        self._endpoint = endpoint
        

    def start(
        self,
        app_name: str = "Airflow Spark Delta Minio App",
        executor_memory: str = "1g",
        driver_memory: str = "1g",
        driver_maxresultsize: str = "1g",
        master_url: str = "local[*]",
    ):

        builder = (
            SparkSession
            .Builder()
            .appName("XLSX to Delta Bronze")
            .config("spark.hadoop.fs.s3a.access.key", self._access_key)
            .config("spark.hadoop.fs.s3a.secret.key", self._secret_key)
            .config("spark.hadoop.fs.s3a.endpoint", self._endpoint)
            .config("spark.hadoop.delta.enableFastS3AListFrom", "true")
            #
            .config("spark.executor.memory", executor_memory)
            .config("spark.driver.memory", driver_memory)
            .config("spark.driver.maxResultSize", driver_maxresultsize)
            #
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            #
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            #
            .config("spark.jars.packages", ",".join(self.packages))
            .config("spark.ui.port", "0")
            .master(master_url)
            
        )

        return builder.getOrCreate()

In [5]:
spark = LazySparkSession(
    access_key=minio_conn.get("access_key"), 
    secret_key=minio_conn.get("key"), 
    endpoint=minio_conn.get("endpoint")
).start()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/04 13:12:28 WARN Utils: Your hostname, DESKTOP-EDEM2DH, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/04 13:12:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/edcarlos/projeto-lakehouse/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/edcarlos/.ivy2.5.2/cache
The jars for the packages stored in: /home/edcarlos/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-92ff157b-9486-422c-b652-8e0604e09622;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central

In [6]:
endpoint_raw = minio_conn["endpoint"]
endpoint_sem_http = endpoint_raw.replace("http://", "").replace("https://", "")

s3_client = Minio(
    endpoint=endpoint_sem_http,
    access_key=minio_conn['access_key'],
    secret_key=minio_conn['key'],
    secure=endpoint_raw.startswith("https")
)

In [7]:
# Configuração básica de logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

In [8]:
# Variáveis do projeto
bucket_landing = "landing"
bucket_bronze = "bronze"
prefix_landing = "anp/serie_levantamento_precos/"

In [9]:
# Limpar e padronizar nome das colunas
def limpar_nome_colunas_spark(spark_df):
    """
    Limpa e padroniza as colunas do Dataframe.
    Remove acentos.
    Substitui caracteres não alfanumericos por underscore (_).
    Remove múltiplos underscores.
    """

    novas_colunas=[]
    for coluna in spark_df.columns:
        coluna_sem_acento = unicodedata.normalize("NFKD", coluna).encode("ASCII", "ignore").decode("ASCII")
        coluna_limpa = re.sub(r'[^a-zA-Z0-9_-]', '-', coluna_sem_acento)
        coluna_lower = coluna_limpa.lower()
        coluna_final = re.sub(r'_+', '_', coluna_lower).strip('_')
        novas_colunas.append(coluna_final)

    # Renomeia todas as colunas de uma vez
    return spark_df.toDF(*novas_colunas)

In [10]:
# Listar arquivos .xlsx
def listar_arquivos_xlsx(bucket: str, prefix: str) -> list:
    try:
        arquivos = s3_client.list_objects(bucket, prefix=prefix, recursive=True)
        lista = [obj.object_name for obj in arquivos if obj.object_name.endswith(".xlsx")]
        logging.info(f"{len(lista)} arquivos .xlsx encontrados em s3://{bucket}/{prefix}")
        return lista
    except Exception as e:
        logging.error(f"Erro ao listar arquivos: {e}")
        return []

In [11]:
# Leitura específica para este formato de arquivo
def ler_arquivos_anp(bucket: str, caminho_arquivo: str) -> pd.DataFrame:
    try:
        logging.info(f"Lendo arquivo com layout específico: {caminho_arquivo}")
        resposta = s3_client.get_object(bucket, caminho_arquivo)

        # Parâmetros descobertos na inspeção
        df = pd.read_excel(
            io.BytesIO(resposta.read()),
            engine="openpyxl",
            header=11,              # Cabeçalho indice 11
            skipfooter=12           # Conteúdo começa indice 12
        )
        
        # Remove colunas que foram criadas sem nome
        df.dropna(axis=1, how='all', inplace=True)

        return df
    except Exception as e:
        logging.error(f"Erro ao ler arquivo {caminho_arquivo}: {e}")
        raise

In [12]:
# Salvar como Delta
def salvar_como_delta(spark_df, nome_arquivo: str):
    nome_limpo = os.path.splitext(os.path.basename(nome_arquivo))[0]
    nome_limpo_sanitizado = re.sub(r'[^a-zA-Z0-9_-]', '-', nome_limpo)

    caminho_bronze = f"s3a://{bucket_bronze}/anp/precos-combustiveis/{nome_limpo_sanitizado}"

    df_final = (
        spark_df.withColumn("_carga_data", current_timestamp())
                .withColumn("_arquivo_origem", lit(nome_arquivo))
    )

    df_final.write.format("delta").mode("overwrite").save(caminho_bronze)
    logging.info(f"Arquivo salvo com sucesso: {caminho_bronze}")

In [13]:
# --- Execução principal
arquivos = listar_arquivos_xlsx(bucket_landing, prefix_landing)

for arquivo in arquivos:
    try:
        df_pandas = ler_arquivos_anp(bucket_landing, arquivo)
        df_spark = spark.createDataFrame(df_pandas)
        df_spark_limpo = limpar_nome_colunas_spark(df_spark)
        salvar_como_delta(df_spark_limpo, os.path.basename(arquivo))
    except Exception as e:
        logging.error(f"Erro ao processar {arquivo}: {e}")

2025-08-04 13:12:36,858 - INFO - 10 arquivos .xlsx encontrados em s3://landing/anp/serie_levantamento_precos/
2025-08-04 13:12:36,859 - INFO - Lendo arquivo com layout específico: anp/serie_levantamento_precos/mensal-brasil-2001-a-2012.xlsx
2025-08-04 13:12:36,859 - INFO - Lendo arquivo com layout específico: anp/serie_levantamento_precos/mensal-brasil-2001-a-2012.xlsx
25/08/04 13:12:38 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
25/08/04 13:12:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/08/04 13:12:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 b