In [0]:
# Fetch the Kafka connection settings from the 'sc-adls' secret scope so that
# no credential is ever written in the notebook itself.
UsuarioKafka, PasswordKafka, ServerKafka = (
    dbutils.secrets.get(scope='sc-adls', key=secret_key)
    for secret_key in ('UsuarioKafka', 'PasswordKafka', 'ServerKafka')
)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import *

In [0]:
# Layout of the JSON document carried in each Kafka message value.
schema = "struct<dni:string,nombres:string,curso:string, nota:integer, fechaRegistro:string>"

# Streaming DataFrame over the "notasAlumnos" topic, authenticated with SASL/PLAIN
# over TLS and read from the earliest available offset.
# Resulting columns: the fields declared in `schema` plus `fechaRegistroKafka`.
dfStreamingKafka = (
    spark.readStream.format("kafka")
        .option("subscribe", "notasAlumnos")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config",
                f"org.apache.kafka.common.security.plain.PlainLoginModule required username='{UsuarioKafka}' password='{PasswordKafka}';")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.bootstrap.servers", ServerKafka)
        .option("startingOffsets", "earliest")
        .load()
        # The payload is cast to text and parsed as JSON in a single projection;
        # the Kafka metadata columns (topic, offset, ...) are not needed downstream.
        .select(from_json(col("value").cast("string"), schema).alias("value"))
        # Flatten the parsed struct and stamp each row with the processing time.
        .select("value.*", current_timestamp().alias("fechaRegistroKafka"))
        # Discard every row that has a null in any column (for example a message
        # whose JSON did not yield all declared fields).
        .dropna()
)

In [0]:
# Continuously append the parsed Kafka records to the Delta table db_alumnos.tbalumnos.
# NOTE(review): the checkpoint path appears to sit inside the table's own storage
# directory under a name that does not start with "_" -- confirm that table
# maintenance (e.g. VACUUM) cannot remove it. Relocating it would reset the stream.
(
    dfStreamingKafka.writeStream
        .outputMode("append")  # each micro-batch only adds its newly received rows to the table
        .option(
            "checkpointLocation",  # keeps the streaming query's state so it can recover after a failure
            "dbfs:/user/hive/warehouse/db_alumnos.db/tbalumnos/checkpoint/alumnos",
        )
        .trigger(processingTime='1 seconds')
        .toTable("db_alumnos.tbalumnos")
)

In [0]:
# Re-open the ingested table as a streaming source for the de-duplication stage.
dfStreamingtbalumnos = spark.readStream.table("db_alumnos.tbalumnos")

In [0]:
# Debug aid: uncomment to preview the streaming DataFrame interactively.
#display(dfStreamingtbalumnos)

In [0]:
def foreachBatchMethod(dfMicrobatch, id):
    """Upsert the newest row per (dni, curso) of one micro-batch into db_alumnos.tbalumnos_unique.

    Intended to be passed to ``writeStream.foreachBatch``.

    Args:
        dfMicrobatch: DataFrame holding the rows of the current micro-batch.
        id: Micro-batch identifier supplied by ``foreachBatch`` (not used here).
    """
    # Rank the rows of each (dni, curso) pair from newest to oldest.
    # fechaRegistroKafka comes from current_timestamp() at ingestion, so rows that
    # were ingested together share the same value; fechaRegistro breaks those ties
    # so the selected row is deterministic.
    # NOTE(review): assumes the fechaRegistro strings sort chronologically -- TODO confirm.
    newestFirst = Window.partitionBy("dni", "curso").orderBy(
        col("fechaRegistroKafka").desc(),
        col("fechaRegistro").desc(),
    )
    dfUniques = (
        dfMicrobatch.withColumn("ranked", row_number().over(newestFirst))
            .where(col("ranked") == 1)
            .drop("ranked")
    )

    # Handle to the de-duplicated target table. The session is taken from the
    # micro-batch itself rather than from the notebook-global `spark`, so the
    # callback does not depend on driver-side global state.
    main_table = DeltaTable.forName(dfMicrobatch.sparkSession, "db_alumnos.tbalumnos_unique")

    # Update the existing (curso, dni) row when there is one, insert otherwise.
    (
        main_table.alias("main")
            .merge(dfUniques.alias("news"), "main.curso = news.curso and main.dni = news.dni")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
    )

In [0]:
# Every 10 seconds, hand the newly arrived rows of db_alumnos.tbalumnos to
# foreachBatchMethod, which merges them into the de-duplicated table.
# NOTE(review): the checkpoint path appears to sit inside the tbalumnos storage
# directory under a name that does not start with "_" -- confirm that table
# maintenance (e.g. VACUUM) cannot remove it. Relocating it would reset the stream.
(
    dfStreamingtbalumnos.writeStream
        .foreachBatch(foreachBatchMethod)
        .outputMode("update")
        .option(
            "checkpointLocation",
            "dbfs:/user/hive/warehouse/db_alumnos.db/tbalumnos/checkpoint/alumnos_unique",
        )
        .trigger(processingTime='10 seconds')
        .start()
)