In [0]:
from pyspark.sql.functions import col, current_timestamp

# Table names: streaming source, user-profile lookup table, and alert sink.
raw_table, core_table, analytics_table = (
    "default.raw_transactions",
    "default.user_behavior_profiles",
    "default.real_time_alerts",
)

# Checkpoint location dedicated to this stream; it is kept separate from the
# checkpoint used by the ingestion stream.
storage_account_name = "adlshydra"
checkpoint_path = (
    f"abfss://raw@{storage_account_name}.dfs.core.windows.net/checkpoint_alerts"
)

In [0]:
# Static side: the user behavior profiles table, loaded as a batch DataFrame.
profiles_df = spark.read.table(core_table)

# Streaming side: the raw transactions table, read as a stream.
transactions_stream_df = spark.readStream.table(raw_table)

In [0]:
# Stream-static inner join on user_id: each transaction gains the columns of
# its user's profile; transactions with no matching profile row are dropped.
enriched_stream_df = transactions_stream_df.join(profiles_df, "user_id", "inner")

In [0]:
# Detection sensitivity: how many standard deviations above the average
# amount a transaction must be before it is flagged.
N_STD_DEVS = 2

# Per-row alert threshold built from the profile columns added by the join.
# NOTE(review): if stddev_amount is NULL for a user, the comparison below
# evaluates to NULL and the row is filtered out — confirm this is intended.
alert_threshold = col("avg_amount") + N_STD_DEVS * col("stddev_amount")

alerts_df = (
    enriched_stream_df
    .filter(col("amount_brl") > alert_threshold)
    .withColumn("alert_timestamp", current_timestamp())
)

# Append the alerts to the analytics Delta table. The availableNow trigger
# processes the data available at start-up and then stops the query.
alerts_stream_query = (
    alerts_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(analytics_table)
)

# toTable() starts the query and returns immediately. Block until the run
# finishes so that later cells read the complete result, and so that a failure
# in the stream is raised here instead of going unnoticed.
alerts_stream_query.awaitTermination()

In [0]:
%sql
-- Inspect the rows currently stored in the alerts table.
SELECT * FROM default.real_time_alerts

In [0]:
# Cleanup cell: uncomment and run only when the alerts stream must be reset (it deletes the stream's checkpoint)
#checkpoint_to_clear = "abfss://raw@adlshydra.dfs.core.windows.net/checkpoint_alerts"
#dbutils.fs.rm(checkpoint_to_clear, recurse=True)
# print(f"Checkpoint de alertas em '{checkpoint_to_clear}' foi removido com sucesso.")