In [0]:
from pyspark.sql.functions import *

# Bronze -> Silver: parse the raw "timestamp" string, drop rows where parsing
# produced NULL, upper-case the log level, and derive log_date, which is used
# as the partition column of silver_logs.
# NOTE(review): with spark.sql.ansi.enabled=true, to_timestamp may raise on
# malformed input instead of returning NULL, so the isNotNull() filter below
# would never see those rows — confirm the cluster's ANSI setting.
spark.sql("USE CATALOG real_time_projects")
spark.sql("USE SCHEMA realtime_log_analytics_alerting")

silver_checkpoint_path = "/Volumes/real_time_projects/realtime_log_analytics_alerting/project_volume/checkpoints/silver"

parsed_bronze = spark.readStream.table("bronze_logs").withColumn(
    "timestamp", to_timestamp("timestamp")
)

silver_df = (
    parsed_bronze
    .where(col("timestamp").isNotNull())
    .withColumn("level", upper(col("level")))
    .withColumn("log_date", to_date("timestamp"))
)

# availableNow=True processes everything currently available, then stops.
(
    silver_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", silver_checkpoint_path)
    .partitionBy("log_date")
    .trigger(availableNow=True)
    .table("silver_logs")
)


In [0]:
#------------------ Data Quality Checks ------------

from pyspark.sql.functions import *

valid_levels = ["INFO", "WARN", "ERROR"]

spark.sql("USE CATALOG real_time_projects")
spark.sql("USE SCHEMA realtime_log_analytics_alerting")

# Each silver row gets exactly one dq_error code (the first failing rule wins)
# or "VALID". Comparisons against a NULL column yield NULL, and `when` treats
# NULL as false. Without coalesce(..., False), a NULL level or NULL
# response_time_ms would skip its rule and be labelled "VALID".
level_ok = coalesce(col("level").isin(valid_levels), lit(False))
response_time_ok = coalesce(col("response_time_ms") > 0, lit(False))

dq_df = (
    spark.readStream.table("silver_logs")
    .withColumn(
        "dq_error",
        when(col("event_id").isNull(), "NULL_EVENT_ID")
        .when(col("service").isNull(), "NULL_SERVICE")
        .when(~level_ok, "INVALID_LEVEL")
        .when(~response_time_ok, "INVALID_RESPONSE_TIME")
        .when(col("timestamp") > current_timestamp(), "FUTURE_TIMESTAMP")
        .otherwise("VALID")
    )
)

#------------- Valid Stream -----------------
# dq_error is never NULL (otherwise("VALID")), so the two filters below
# partition dq_df exactly. Each write is an independent streaming query with
# its own checkpoint.
dq_df.filter(col("dq_error") == "VALID") \
    .drop("dq_error") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/real_time_projects/realtime_log_analytics_alerting/project_volume/checkpoints/silver_valid") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .table("silver_logs_valid")

#------------ Quarantine Stream ----------------
# Quarantined rows keep their dq_error code for later inspection.
dq_df.filter(col("dq_error") != "VALID") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/real_time_projects/realtime_log_analytics_alerting/project_volume/checkpoints/quarantine") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .table("quarantine_logs")



In [0]:
from pyspark.sql.functions import *

#------------- SCD Type-2 Initial Load ----------------
# Seeds dim_service with one open version per service: is_current = True and
# effective_to = NULL, with effective_from stamped at load time.
# mode("overwrite") replaces the table's contents, so re-running this cell
# discards any history recorded by later incremental loads.

spark.sql("USE CATALOG real_time_projects")
spark.sql("USE SCHEMA realtime_log_analytics_alerting")

columns = ["service", "owner", "tier"]

service_data = [
    ("payment-service", "payments", "critical"),
    ("order-service", "orders", "high"),
    ("inventory-service", "inventory", "medium"),
    ("auth-service", "security", "critical"),
]

df = (
    spark.createDataFrame(service_data, columns)
    .withColumn("effective_from", current_timestamp())
    .withColumn("effective_to", lit(None).cast("timestamp"))
    .withColumn("is_current", lit(True))
)

(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_service")
)


In [0]:
from delta.tables import DeltaTable

#---------------- SCD Type-2 Incremental Load ------------------
# A plain MERGE keyed on service cannot both expire a service's current row
# AND insert its new version. The source row matches the current row, so the
# NOT MATCHED insert clause never fires for it. The standard Delta SCD2
# staging pattern is used instead. Every changed service is staged twice:
#   * merge_key = service -> matches the current row, which is expired
#   * merge_key = NULL    -> never matches, so the new version is inserted
# Brand-new services have no current row. Their merge_key = service copy
# falls through to the insert clause. Unchanged services match, but fail the
# update condition and are left untouched.
# Assumes updates_df has at most one row per service (MERGE fails if several
# source rows match the same target row).
from pyspark.sql.functions import col, current_timestamp, lit

spark.sql("USE CATALOG real_time_projects")
spark.sql("USE SCHEMA realtime_log_analytics_alerting")

updates = [
    ("order-service", "orders", "critical")  # tier changed
]

updates_df = spark.createDataFrame(updates, ["service", "owner", "tier"])

delta_table = DeltaTable.forName(spark, "dim_service")

# Type-2 tracked attributes. The NULL-safe <=> makes a NULL <-> value change
# count as a change. A plain <> would evaluate to NULL and ignore it.
changed_condition = "NOT (t.tier <=> s.tier AND t.owner <=> s.owner)"

current_rows = delta_table.toDF().filter(col("is_current"))

# Incoming rows whose tracked attributes differ from the current version.
changed_rows = (
    updates_df.alias("s")
    .join(current_rows.alias("t"), col("s.service") == col("t.service"))
    .where(changed_condition)
    .select("s.*")
)

staged_updates = (
    changed_rows.withColumn("merge_key", lit(None).cast("string"))
    .unionByName(updates_df.withColumn("merge_key", col("service")))
)

(
    delta_table.alias("t")
    .merge(
        staged_updates.alias("s"),
        "t.service = s.merge_key AND t.is_current = true",
    )
    .whenMatchedUpdate(
        condition=changed_condition,
        set={
            "effective_to": current_timestamp(),
            "is_current": lit(False),
        },
    )
    .whenNotMatchedInsert(
        values={
            "service": "s.service",
            "owner": "s.owner",
            "tier": "s.tier",
            "effective_from": current_timestamp(),
            "effective_to": lit(None).cast("timestamp"),
            "is_current": lit(True),
        },
    )
    .execute()
)


In [0]:
#---------- Enrichment (Streaming Join) -------------
# Stream-static LEFT join: each valid log row is joined with the dim_service
# row flagged is_current. Logs whose service has no current dimension row are
# kept, with NULL dimension columns. The version used is the one current when
# the micro-batch runs, not the one current at the log's event time. Per the
# Databricks stream-static join docs, the static Delta side is re-read at its
# latest version for each micro-batch.
# The output keeps the original column layout: all silver_logs_valid columns
# (service renamed to silver_service) followed by all dim_service columns.
from pyspark.sql.functions import col

spark.sql("USE CATALOG real_time_projects")
spark.sql("USE SCHEMA realtime_log_analytics_alerting")

silver_logs = (
    spark.readStream.table("silver_logs_valid")
    .withColumnRenamed("service", "silver_service")
)

# Explicit alias so "dim_service.<col>" does not depend on Spark resolving the
# table name as an implicit qualifier.
dim_service_df = spark.table("dim_service").alias("dim_service")

enriched_df = silver_logs.join(
    dim_service_df,
    (col("silver_service") == col("dim_service.service"))
    & col("dim_service.is_current"),
    "left",
)

enriched_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/real_time_projects/realtime_log_analytics_alerting/project_volume/checkpoints/enriched") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .table("enriched_logs")