In [0]:
import os
import time

from delta.tables import DeltaTable
from pyspark.sql import Window, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Explicit schema for the cliente parquet files; every column is declared
# nullable. A streaming file source is given its schema up front via .schema().
schema_cliente = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("cliente_id", StringType()),
            ("nome", StringType()),
            ("estado", StringType()),
            ("data_criacao", TimestampType()),
        )
    ]
)

# Directory the streaming reader loads parquet files from.
parquet_path = "/Volumes/workspace/bravium/bravium_db/cliente/"
# Location of the bronze Delta table that the micro-batches are merged into.
bronze_path = "/Volumes/workspace/bravium/bronze/cliente/"

In [0]:
# Streaming read of the cliente parquet directory using the explicit schema.
df_stream_raw = (
    spark.readStream
    .format("parquet")
    .schema(schema_cliente)
    .load(parquet_path)
)

# The watermark must be declared BEFORE dropDuplicates: only then can Spark
# discard deduplication state older than the watermark. With the calls in the
# opposite order the state store keeps every (cliente_id, data_criacao) pair
# ever seen and grows without bound. The event-time column is part of the
# deduplication key, which is what allows old state to be evicted.
df_stream = (
    df_stream_raw
    .withWatermark("data_criacao", "10 minutes")
    .dropDuplicates(["cliente_id", "data_criacao"])
)

In [0]:
def upsert_to_delta(batch_df, batch_id):
    """Merge one micro-batch of cliente rows into the bronze Delta table.

    Intended as a ``foreachBatch`` sink. Rows are matched to the target on
    ``cliente_id``: matched rows are overwritten with the incoming values and
    unmatched rows are inserted.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch supplied by Structured
            Streaming (unused here).

    Raises:
        Whatever ``DeltaTable.forPath`` raises when ``bronze_path`` does not
        point at an existing Delta table; the table is not created here.
    """
    # Use the session that owns the micro-batch rather than the notebook-level
    # global, which is not guaranteed to be usable inside foreachBatch.
    session = batch_df.sparkSession

    # A batch may contain several versions of the same cliente_id (they differ
    # in data_criacao). MERGE aborts when more than one source row matches a
    # single target row, so keep only the newest version per key. Rows with a
    # null key can never match the merge condition and are passed through
    # untouched, exactly as before.
    newest_first = Window.partitionBy("cliente_id").orderBy(
        F.col("data_criacao").desc_nulls_last()
    )
    source_df = (
        batch_df
        .withColumn("_row_rank", F.row_number().over(newest_first))
        .filter((F.col("_row_rank") == 1) | F.col("cliente_id").isNull())
        .drop("_row_rank")
    )

    delta_table = DeltaTable.forPath(session, bronze_path)

    (
        delta_table.alias("t")
        .merge(
            source_df.alias("s"),
            "t.cliente_id = s.cliente_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Checkpoint directory where the query records its progress between runs.
checkpoint_path = "/Volumes/workspace/bravium/bronze/checkpoints/cliente"

# Each micro-batch is handed to upsert_to_delta. The availableNow trigger
# processes the data available when the query starts and then stops.
cliente_writer = (
    df_stream.writeStream
    .outputMode("update")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .foreachBatch(upsert_to_delta)
)

cliente_query = cliente_writer.start()
# Block the cell until the query has terminated.
cliente_query.awaitTermination()