# ETL: Structured Streaming e Delta Lake

In [0]:
%sql
CREATE TABLE bronze_layer
(id INT, first_name STRING, age DOUBLE);

In [0]:
%sql
INSERT INTO bronze_layer
VALUES (1, "Mike", 45), (2, "Omar", 23), (3, "Nick", 35)

num_affected_rows,num_inserted_rows
3,3


In [0]:
%sql
SELECT * FROM bronze_layer

id,first_name,age
1,Mike,45.0
2,Omar,23.0
3,Nick,35.0


In [0]:
%sql
DESCRIBE FORMATTED bronze_layer

col_name,data_type,comment
id,int,
first_name,string,
age,double,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,default,
Table,bronze_layer,
Created Time,Fri Feb 14 04:14:19 UTC 2025,
Last Access,UNKNOWN,


Ao usar Spark Structured Streaming nos adicionamos uma coluna para capturar o timestamp e escrever o dado dentro da table silver como um unico batch

Como estamos usando o Structured Streaming, podemos pensar o processo como um trigger batch com mudançasincrementais

In [0]:
from pyspark.sql.functions import *

def run_silver():
    query = (
        spark.readStream.\
            table("bronze_layer").\
                withColumn("processed_time", current_timestamp()).\
                    writeStream.option("checkpointLocation", "/FileStore/silver").\
                        trigger(availableNow=True).\
                            table("silver_layer")
    )

    query.awaitTermination()

In [0]:
run_silver()

"triger(availableNow-True)" para diminuir o processamento de dado combinado com "querry.awaytTermination() ate o batch ser processado

In [0]:
%sql
SELECT * FROM silver_layer

id,first_name,age,processed_time
1,Mike,45.0,2025-02-14T04:25:28.250+0000
2,Omar,23.0,2025-02-14T04:25:28.250+0000
3,Nick,35.0,2025-02-14T04:25:28.250+0000


In [0]:
%sql
DESCRIBE EXTENDED silver_layer

col_name,data_type,comment
id,int,
first_name,string,
age,double,
processed_time,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,default,
Table,silver_layer,
Created Time,Fri Feb 14 04:15:54 UTC 2025,


# Escrevendo em Multiplas Tabelas

"foreachBach" metodo oferece a opção de executar diferentes logicas para escrever dados em cada microbatch durante o streaming de dados O Databricks garante que ao utilizar Delta Table a escrita indepotente , ate mesmo quando o processo ocorrer em multiplas tabelas se nós setarmos as opções "txnVersion"  e "txnAppld"

In [0]:
from pyspark.sql.functions import *

def write_two_sinks(microBatchDF, batchId):
    appId = "write_two_sinks"
    print ("batchId==", batchId)

    microBatchDF.\
        select("id", "first_name", current_timestamp().alias("processed_time")).\
        write.option("txnVersion", batchId).\
        option("txnAppId", appId).\
        mode("append").\
        saveAsTable("silver_first_name")

    microBatchDF.\
        select("id", "age", current_timestamp().alias("processed_time")).\
        write.option("txnVersion", batchId).\
        option("txnAppId", appId).\
        mode("append").\
        saveAsTable("silver_age")


def two_streams():
    query=(
        spark.readStream.\
                    table("bronze_layer").\
                        writeStream.\
                            foreachBatch(write_two_sinks).\
                                option("checkpointLocation", "/FileStore/two_streams").\
                                    trigger(once=True).\
                                        start()
    )
    
    query.awaitTermination()

In [0]:
two_streams()

batchId== 0


In [0]:
%sql
SELECT * FROM silver_first_name

id,first_name,processed_time
1,Mike,2025-02-14T04:36:24.140+0000
2,Omar,2025-02-14T04:36:24.140+0000
3,Nick,2025-02-14T04:36:24.140+0000


In [0]:
%sql
SELECT * FROM silver_age

id,age,processed_time
1,45.0,2025-02-14T04:36:29.590+0000
2,23.0,2025-02-14T04:36:29.590+0000
3,35.0,2025-02-14T04:36:29.590+0000


# Agregação de dados

Na proxima função iremos definir uma logica de agregação para a tabela da camada silver 

In [0]:
def define_aggregates():
  query = (
    spark.readStream
    .table("silver_layer")
    .groupBy("id")
    .agg(
      sum("age").alias("total_age"),
      mean("age").alias("avg_age"),
      count("age").alias("record_count")
      )
    .writeStream
    .option("checkpointLocation", "FileStore/aggregates")
    .outputMode("complete")
    .trigger(once=True)
    .table("silver_aggregates")
  )

  query.awaitTermination()

In [0]:
define_aggregates()

In [0]:
%sql

SELECT * FROM silver_aggregates

id,total_age,avg_age,record_count
1,45.0,45.0,1
3,35.0,35.0,1
2,23.0,23.0,1


# Processando Mudanças nos Dados

Aqui a tabela "bronze_cdc" ira representar a informação de CDC raw.

In [0]:
%sql

CREATE OR REPLACE TABLE bronze_cdc
(user_ID INT,
  first_name STRING,
  update_type STRING,
  processed_timestamp TIMESTAMP);


INSERT INTO bronze_cdc
VALUES (1, "James", "insert", current_timestamp()),
    (2, "John", "update", current_timestamp()),
    (3, "Kimberly", "update", current_timestamp()),
    (4, "Irene", "update", current_timestamp()),
    (5, null, "delete", current_timestamp())


num_affected_rows,num_inserted_rows
5,5


In [0]:
%sql

SELECT * FROM bronze_cdc

user_ID,first_name,update_type,processed_timestamp
1,James,insert,2025-02-14T05:39:51.125+0000
2,John,update,2025-02-14T05:39:51.125+0000
3,Kimberly,update,2025-02-14T05:39:51.125+0000
4,Irene,update,2025-02-14T05:39:51.125+0000
5,,delete,2025-02-14T05:39:51.125+0000


In [0]:
%sql

CREATE OR REPLACE TABLE silver_cdc
(user_id INT, first_name STRING, updated_timestamp TIMESTAMP)

o comando "MERGE" pode facilmente ser escrito como SQL para aplicar CDC apropriadamente, dando o tipo de update recebido.

In [0]:
def upsert_cdc (microBatchDF, batchID):

    microBatchDF.createOrReplaceTempView("bronze_cdc_batch")
    
    query = f"""
    MERGE INTO silver_cdc s
    USING bronze_cdc_batch b
    ON b.user_id = s.user_id
    WHEN MATCHED AND b.update_type = "update"
    THEN UPDATE SET user_id=b.user_id, first_name=b.first_name, updated_timestamp=b.processed_timestamp
    WHEN MATCHED AND b.update_type = "delete"
    THEN DELETE
    WHEN NOT MATCHED AND b.update_type = "update" or b.update_type = "insert"
    THEN INSERT (user_id, first_name, updated_timestamp)
    VALUES (b.user_id, b.first_name, b.processed_timestamp)
   """

    # Acessar a spark session local para cada foreachBatch
    microBatchDF._jdf.sparkSession().sql(query)

In [0]:
def cdc_merge():
    query = (
        spark.readStream.table("bronze_cdc").writeStream.foreachBatch(upsert_cdc).option("checkpointLocation", "FileStore/silver_cdc").trigger(availableNow=True).start()
    )

    query.awaitTermination()

In [0]:
cdc_merge()

In [0]:
%sql 

SELECT * FROM silver_cdc

user_id,first_name,updated_timestamp
1,James,2025-02-14T05:39:51.125+0000
2,John,2025-02-14T05:39:51.125+0000
3,Kimberly,2025-02-14T05:39:51.125+0000
4,Irene,2025-02-14T05:39:51.125+0000


Alterando nossa tabela bronze

In [0]:
%sql
INSERT INTO bronze_cdc
VALUES (1, "Lala", "update", current_timestamp()),
    (2, "T4est", "update", current_timestamp()),
    (3, "Apo", "update", current_timestamp()),
    (4, null, "update", current_timestamp()),
    (5, "Test123", "insert", current_timestamp())

num_affected_rows,num_inserted_rows
5,5


In [0]:
cdc_merge()

In [0]:
%sql 

SELECT * FROM silver_cdc

user_id,first_name,updated_timestamp
1,Lala,2025-02-14T05:45:46.735+0000
2,T4est,2025-02-14T05:45:46.735+0000
3,Apo,2025-02-14T05:45:46.735+0000
4,,2025-02-14T05:45:46.735+0000
5,Test123,2025-02-14T05:45:46.735+0000


### Join de tabelas incrementais

In [0]:
def incremental_stream_join():
    first_nameDF=spark.readStream.table("silver_first_name")
    ageDF = spark.readStream.table("silver_age")

    return(
        first_nameDF.join(ageDF, first_nameDF.id == ageDF.id, "inner")
        .select(first_nameDF.id, first_nameDF.first_name, ageDF.age, current_timestamp().alias("joined_timestamp"))
    .writeStream
    .option("checkpointLocation", "FileStore/joined_streams")
    .table("incremental_joined_streams")
    )


In [0]:
incremental_stream_join()

Out[210]: <pyspark.sql.streaming.query.StreamingQuery at 0x7efedcb60430>

In [0]:
display(spark.readStream.table("incremental_joined_streams"))

id,first_name,age,joined_timestamp
1,Mike,45.0,2025-02-14T06:01:26.842+0000
3,Nick,35.0,2025-02-14T06:01:26.842+0000
2,Omar,23.0,2025-02-14T06:01:26.842+0000
15,Jacques,15.9,2025-02-14T06:10:11.602+0000
11,Amelia,11.5,2025-02-14T06:10:11.602+0000
10,Pedro,10.5,2025-02-14T06:10:11.602+0000
14,Daiyu,14.2,2025-02-14T06:10:11.602+0000
12,Diya,12.5,2025-02-14T06:10:11.602+0000
13,li,13.5,2025-02-14T06:10:11.602+0000


### Introduzindo dados novos na camada bronze

In [0]:
%sql
INSERT INTO bronze_layer
VALUES (10, "Pedro", 10.5),
       (11, "Amelia", 11.5),
       (12, "Diya", 12.5),
       (13, "li", 13.5),
       (14, "Daiyu", 14.2),
       (15, "Jacques", 15.9)


num_affected_rows,num_inserted_rows
6,6


In [0]:
two_streams()

batchId== 1


### Comando para encerrar os processos streamings

In [0]:
for stream in spark.streams.active:
    print (f"Stopping {stream.name}")
    stream.stop()
    stream.awaitTermination()

Stopping None
Stopping display_query_1
