In [0]:
import re
import unicodedata

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def tratar_cnpj(cnpj):
    """Normalize a raw CNPJ value to its bare 14 digits.

    Args:
        cnpj: CNPJ with or without mask punctuation, e.g. "12.345.678/0001-90".
            Non-string values are converted with ``str()`` first.
            NOTE(review): if the source column is numeric, leading zeros are
            already lost and such CNPJs will come back as "" — confirm the type.

    Returns:
        None when ``cnpj`` is None; the 14 ASCII digits (no mask) when the
        input contains exactly 14 of them; otherwise an empty string.
    """
    if cnpj is None:
        return None
    # Keep only ASCII 0-9. r'\D' would also keep non-ASCII Unicode digits
    # (e.g. full-width '１'), letting them count toward the 14-digit check.
    cnpj_num = re.sub(r'[^0-9]', '', str(cnpj))
    # The output is the bare digit string; invalid lengths map to "".
    return cnpj_num if len(cnpj_num) == 14 else ""

def tratar_string(valor):
    """Normalize free text to upper-case ASCII letters, digits and single spaces.

    Accented letters are transliterated ("São" -> "SAO", "Ç" -> "C") instead of
    being dropped. Any other character outside A-Z / 0-9 / space is removed.
    Whitespace runs collapse to one space, and the result is trimmed.

    Args:
        valor: raw text value; non-string values are converted with ``str()``.

    Returns:
        None when ``valor`` is None, otherwise the cleaned string (possibly "").
    """
    if valor is None:
        return None
    # NFKD splits "Ã" into "A" + combining tilde; dropping the combining marks
    # keeps the base letter, so the [^A-Z0-9 ] filter below does not eat it.
    decomposto = unicodedata.normalize("NFKD", str(valor))
    sem_acento = "".join(ch for ch in decomposto if not unicodedata.combining(ch))
    # Tabs/newlines/repeated spaces become a single space so words stay separated.
    texto = re.sub(r'\s+', ' ', sem_acento).upper()
    texto = re.sub(r'[^A-Z0-9 ]', '', texto)
    # Removing characters may leave doubled or edge spaces ("A & B" -> "A  B").
    return re.sub(r' {2,}', ' ', texto).strip()

# Wrap both plain-Python cleaners as Spark UDFs that return StringType
# (None in -> null out, since both functions return None for None input).
tratar_cnpj_udf, tratar_string_udf = (
    udf(funcao, StringType()) for funcao in (tratar_cnpj, tratar_string)
)

In [0]:
from datetime import datetime, timedelta
import pytz

# Incremental read: only bronze rows whose data_carga is later than
# "now minus JANELA_MINUTOS" (computed as America/Sao_Paulo wall-clock time).
# NOTE(review): this is a sliding window, not a watermark. If a run is delayed or
# skipped for longer than the window, rows loaded in the gap are never processed;
# deriving the cutoff from max(insert_date) in the silver table would be safer.
# NOTE(review): the cutoff is sent as a zone-less string literal. For a timestamp
# column Spark parses it in spark.sql.session.timeZone, so this only lines up if
# data_carga holds Sao Paulo wall-clock times (or the session TZ is Sao Paulo) — confirm.
JANELA_MINUTOS = 30
local_tz = pytz.timezone("America/Sao_Paulo")
data_corte = datetime.now(local_tz) - timedelta(minutes=JANELA_MINUTOS)
df_brz_clientes = spark.table("data_catalog_01_d.bronze.clientes").filter(
    f"data_carga > '{data_corte.strftime('%Y-%m-%d %H:%M:%S')}'"
)



In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import current_timestamp, from_utc_timestamp

# Keep only the most recent bronze version of each customer (latest data_carga
# per id). Then project the Portuguese bronze columns onto the English silver
# schema, cleaning text fields with tratar_cnpj_udf / tratar_string_udf.
# NOTE(review): ties on data_carga for the same id are broken arbitrarily by
# row_number(); add a secondary sort key if the bronze table has one.
window_spec = Window.partitionBy("id").orderBy(F.desc("data_carga"))
df_silver_clientes = (
    df_brz_clientes
    # A null id can never satisfy "target.customer_id = source.customer_id" in the
    # MERGE (NULL = NULL is not true), so such rows would be inserted again as
    # brand-new customers on every run. Drop them before deduplicating.
    .filter(F.col("id").isNotNull())
    .withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    # Single projection: same output columns, same order as the silver schema.
    .select(
        F.col("id").alias("customer_id"),
        tratar_cnpj_udf(F.col("cnpj")).alias("document_number"),
        tratar_string_udf(F.col("razao_social")).alias("company_name"),
        tratar_string_udf(F.col("fantasia")).alias("nick_name"),
        tratar_string_udf(F.col("vendedor")).alias("sales_person"),
        tratar_string_udf(F.col("endereco")).alias("address"),
        F.col("numero").alias("address_number"),
        tratar_string_udf(F.col("cidade")).alias("city_name"),
        tratar_string_udf(F.col("estado")).alias("state"),
        F.col("data_carga").alias("insert_date"),
        # Shifts the current UTC instant to Sao Paulo wall-clock time; the stored
        # timestamp therefore represents local time, not UTC.
        from_utc_timestamp(current_timestamp(), "America/Sao_Paulo").alias("update_date"),
    )
)


In [0]:
from delta.tables import DeltaTable

# Upsert the deduplicated customers into the silver table, keyed on customer_id.
# Matched rows get the mapped business columns and update_date overwritten.
# insert_date is not in the mapping, so an existing row keeps its stored value.
# Unmatched rows are inserted with all source columns.
delta_table = DeltaTable.forName(spark, "data_catalog_01_d.silver.customers")

colunas_atualizadas = {
    "document_number": "source.document_number",
    "company_name": "source.company_name",
    "nick_name": "source.nick_name",
    "sales_person": "source.sales_person",
    "address": "source.address",
    "address_number": "source.address_number",
    "city_name": "source.city_name",
    "state": "source.state",
    "update_date": "source.update_date"
}

builder_merge = delta_table.alias("target").merge(
    source=df_silver_clientes.alias("source"),
    condition="target.customer_id = source.customer_id",
)
builder_merge = builder_merge.whenMatchedUpdate(set=colunas_atualizadas)
builder_merge = builder_merge.whenNotMatchedInsertAll()
builder_merge.execute()