In [0]:
%sql
-- Unity Catalog volume formula1.landing.estudantes, exposed at
-- /Volumes/formula1/landing/estudantes, where the CSV files are landed.
-- IF NOT EXISTS makes the cell safe to re-run.
CREATE VOLUME IF NOT EXISTS
  formula1.landing.estudantes;

In [0]:
# Batch-load every CSV currently in the landing volume into a new `estudantes` table.
# `path` is otherwise only assigned in a later cell, so it is set here as well;
# without this, a fresh "Run All" stops on a NameError.
# saveAsTable uses the default save mode, so this fails if `estudantes` already exists.
path = '/Volumes/formula1/landing/estudantes'
spark.read.option('header', True).csv(f'{path}/*.csv').write.saveAsTable('estudantes')

In [0]:
%sql
-- (Re)create the `estudantes` target table, empty, with a single STRING column.
-- CREATE OR REPLACE discards any existing table, including rows written by the
-- batch-load cell above.
-- NOTE(review): the streaming MERGE matches on `id_aluno`, but this table only
-- declares `id_estudante`; confirm which key the landing CSVs actually use.
CREATE OR REPLACE TABLE estudantes (
  id_estudante STRING
)

In [0]:
# Landing volume root (catalog `formula1`, schema `landing`, volume `estudantes`).
# The source CSVs and the Auto Loader schema/checkpoint folders all live under it.
path = "/Volumes/formula1/landing/estudantes"

In [0]:
# Incrementally ingest the CSV files in the landing volume with Auto Loader.
# The inferred schema is tracked under `_ckp_schema` and evolved with the
# `addNewColumns` mode.
df = (
    spark.readStream.format('cloudFiles')
    .options(**{
        'cloudFiles.format': 'csv',
        'header': True,
        # NOTE(review): Auto Loader documents `cloudFiles.inferColumnTypes` for CSV
        # type inference; confirm whether `inferSchema` has any effect here.
        'inferSchema': True,
        'cloudFiles.schemaLocation': f'{path}/_ckp_schema',
        'cloudFiles.schemaEvolutionMode': 'addNewColumns',
    })
    .load(f'{path}/*.csv')
)

In [0]:
from delta.tables import *
from pyspark.sql.functions import current_timestamp


def upsetToDelta(microBatchOutputDF, batchId):
    """Upsert one streaming micro-batch into the ``estudantes`` Delta table.

    Passed to ``foreachBatch`` by the streaming cell below. The name is a typo of
    "upsert"; it is kept because that cell refers to it.

    Steps:
        1. Stamp every row with ``ingestion_date``.
        2. Reduce the batch to one row per ``id_aluno``; the row with the greatest
           ``process_date`` wins.
        3. MERGE the result on ``id_aluno``: matched rows are updated with all
           source columns, and unmatched rows are inserted.

    Args:
        microBatchOutputDF: DataFrame for the current micro-batch. Assumed to carry
            ``id_aluno`` and ``process_date`` columns; TODO confirm against the CSV
            header.
        batchId: Micro-batch id supplied by Spark; unused here.
    """
    from delta.tables import DeltaTable
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Resolve the table through the batch's own session rather than the notebook
    # global ``spark``.
    deltaTable = DeltaTable.forName(microBatchOutputDF.sparkSession, "estudantes")

    # ``orderBy(...).dropDuplicates(...)`` does not guarantee which duplicate is kept,
    # so rank rows per key explicitly and keep the newest. Ties on process_date are
    # still broken arbitrarily.
    newest_first = Window.partitionBy("id_aluno").orderBy(F.col("process_date").desc())
    latestPerKey = (
        microBatchOutputDF
        .withColumn("ingestion_date", F.current_timestamp())
        .withColumn("_rn", F.row_number().over(newest_first))
        .filter(F.col("_rn") == 1)
        .drop("_rn")
    )

    (
        deltaTable.alias("t")
        .merge(latestPerKey.alias("s"), "t.id_aluno = s.id_aluno")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Run the Auto Loader stream once over everything in the landing volume, upserting
# each micro-batch through upsetToDelta.
streamQuery = (
    df.writeStream
    # foreachBatch performs the MERGE itself, so no sink format or output mode is
    # set; the previous `.format('delta')` was ignored by foreachBatch.
    .foreachBatch(upsetToDelta)
    # once=True is deprecated in favour of availableNow=True. It is kept because it
    # processes all pending files in a single micro-batch, so the per-batch
    # "latest process_date wins" dedup in upsetToDelta covers every file.
    # availableNow may split the work across several batches.
    .trigger(once=True)
    # Checkpoint path left unchanged so an existing stream resumes where it left off.
    .option('checkpointLocation', f'{path}/_ckp_pessoas_bronze')
    .start()
)

# start() returns immediately; wait for the one-shot run to finish so the query
# cell that follows sees the upserted rows.
streamQuery.awaitTermination()

In [0]:
%sql
-- Inspect the contents of the target table after the streaming upsert.
SELECT *
FROM estudantes