In [0]:
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession


In [0]:
spark = SparkSession.builder.getOrCreate()

In [0]:
def get_merge_condition(tabela_destino: str) -> str:
    """
    Retorna a condi√ß√£o de MERGE (string SQL) de acordo com a tabela de destino.
    Essa fun√ß√£o √© pura (n√£o depende de Spark) e √© f√°cil de testar.
    """
    if tabela_destino in [
        "workspace.bronze_etl.ipca",
        "workspace.bronze_etl.boi_gordo",
    ]:
        return "t.data = s.data AND t.valor = s.valor"

    return (
        "t.data = s.data "
        "AND t.ipca = s.ipca "
        "AND t.boi_gordo = s.boi_gordo"
    )


def merge_delta_table(df_join: DataFrame, tabela_destino: str) -> None:
    """
    Realiza MERGE (upsert) de um DataFrame em uma tabela Delta Lake.

    Regras de chaves:
    -----------------
    - Para 'workspace.bronze_etl.ipca' ou 'workspace.bronze_etl.boi_gordo': usa as colunas 'data', 'valor'
    - Para demais tabelas: usa as colunas 'data', 'ipca', 'boi_gordo'
    """

    if df_join is None or df_join.limit(1).count() == 0:
        print("‚ö†Ô∏è Nenhum dado para atualizar.")
        return

    total = df_join.count()
    print(f"‚úÖ Total de registros carregados: {total}")

    if spark.catalog.tableExists(tabela_destino):
        print(f"üì¶ Tabela {tabela_destino} j√° existe ‚Äî atualizando dados...")

        delta_table = DeltaTable.forName(spark, tabela_destino)

        condicao_merge = get_merge_condition(tabela_destino)
        print(f"üîë Condi√ß√£o de merge usada: {condicao_merge}")

        (
            delta_table.alias("t")
            .merge(
                df_join.alias("s"),
                condicao_merge,
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

        print(f"‚úÖ MERGE conclu√≠do com sucesso em {tabela_destino}")

    else:
        print(f"üÜï Tabela {tabela_destino} n√£o existe ‚Äî criando nova tabela...")

        (
            df_join.write.format("delta")
            .partitionBy("data")
            .mode("append")
            .option("overwriteSchema", "true")
            .saveAsTable(tabela_destino)
        )

        print(f"‚úÖ Tabela {tabela_destino} criada e dados inseridos com sucesso.")
