In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import boto3
import json
import os
import traceback

In [4]:

os.environ["PYSPARK_SUBMIT_ARGS"] = (
     "--packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.1.1 pyspark-shell"
)

In [5]:

spark = (
    SparkSession.builder
    .appName("msk-to-s3-alerts")
    .getOrCreate()
)



In [6]:
BOOTSTRAP = "kafka:9092" #os.environ["KAFKA_BOOTSTRAP"]          # "b-1.xxx:9092,b-2.xxx:9092"
TOPIC = os.environ.get("KAFKA_TOPIC", "clickstream_events")



In [7]:

WATERMARK = os.environ.get("WATERMARK", "10 minutes")
ALERT_BAD_RATE = float(os.environ.get("ALERT_BAD_RATE", "0.02"))  # 2%


In [8]:
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_type", StringType(), True),
    StructField("event_ts", StringType(), False),  # parse to timestamp
    StructField("user_id", StringType(), True),
    StructField("session_id", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("status", StringType(), True),
    StructField("device", StructType([
        StructField("os", StringType(), True),
        StructField("app_ver", StringType(), True)
    ]), True),
    StructField("ip", StringType(), True),
])

In [9]:

try:
    kdf = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP)
        .option("subscribe", TOPIC)
        .option("startingOffsets", "latest")
        .load()
    )
except Exception as e:
    print(type(e), e)
    traceback.print_exc()

In [10]:

raw = kdf.select(
    F.col("key").cast("string").alias("key"),
    F.col("value").cast("string").alias("value"),
    F.col("timestamp").alias("kafka_ingest_ts"),
    F.col("topic"),
    F.col("partition"),
    F.col("offset")
)

parsed = raw.withColumn("json", F.from_json(F.col("value"), event_schema))


In [11]:

good = (
    parsed
    .filter(F.col("json").isNotNull())
    .select("key", "kafka_ingest_ts", "topic", "partition", "offset", F.col("json.*"))
    .withColumn("event_time", F.to_timestamp("event_ts"))   # assumes ISO-8601
    .drop("event_ts")
)


In [12]:
bad = (
    parsed
    .filter(F.col("json").isNull())
    .select("key", "kafka_ingest_ts", "topic", "partition", "offset", "value")
    .withColumn("reason", F.lit("SCHEMA_PARSE_FAILED"))
)



In [13]:
deduped = (
    good
    .withWatermark("event_time", WATERMARK)
    .dropDuplicates(["event_id"])
)

In [14]:
out = (
    deduped
    .withColumn("dt", F.to_date("event_time"))
    .withColumn("hh", F.date_format("event_time", "HH"))
)

In [15]:

def publish_alert(subject: str, message: dict):
    print("publish_alert", json.dumps(message, default=str))
    return 
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject=subject[:100],
        Message=json.dumps(message, default=str)
    )

In [16]:
S3_OUT = 'S3_OUT'

In [27]:

def alerting_and_write(batch_df, batch_id: int):
    df = (batch_df
          .withColumn("device_os", F.col("device.os"))
          .withColumn("device_app_ver", F.col("device.app_ver"))
          .drop("device"))

    (df.write.mode("append")
       .partitionBy("dt", "hh")
       .csv(S3_OUT))
    
    # 1) Write good data
    #(batch_df
    # .write.mode("append")
    # .partitionBy("dt", "hh")
    # .parquet(S3_OUT))

    # 2) Compute simple quality metrics per micro-batch (example)
    total = batch_df.count()
    if total == 0:
        publish_alert(
            "Streaming pipeline: zero events",
            {"batch_id": batch_id, "topic": TOPIC, "note": "No events in this microbatch"}
        )


In [18]:
CHECKPOINT = 'CHECKPOINT'

In [28]:
# --- Write streams ---
# Good stream
q1 = (
    out.writeStream
    .outputMode("append")
    .foreachBatch(alerting_and_write)
    .option("checkpointLocation", CHECKPOINT + "/good")
    .trigger(processingTime="1 minute")
    .start()
)
q1.awaitTermination(180)

StreamingQueryException: [STREAM_FAILED] Query [id = 775f9ec1-0be3-4b88-a963-365d4d2bee92, runId = 72bab06d-102c-4081-bc0a-fb8346d841c8] terminated with exception: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null SQLSTATE: XXKST
=== Streaming Query ===
Identifier: [id = 775f9ec1-0be3-4b88-a963-365d4d2bee92, runId = 72bab06d-102c-4081-bc0a-fb8346d841c8]
Current Committed Offsets: {KafkaV2[Subscribe[clickstream_events]]: {"clickstream_events":{"0":86484}}}
Current Available Offsets: {KafkaV2[Subscribe[clickstream_events]]: {"clickstream_events":{"0":93483}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
~WriteToMicroBatchDataSourceV1 ForeachBatchSink, 775f9ec1-0be3-4b88-a963-365d4d2bee92, [checkpointLocation=CHECKPOINT/good], Append
+- ~Project [key#14, kafka_ingest_ts#16, topic#9, partition#10, offset#11L, event_id#18, event_type#19, user_id#21, session_id#22, order_id#23, product_id#24, amount#25, status#26, device#27, ip#28, event_time#29-T600000ms, dt#31, date_format(event_time#29-T600000ms, HH, Some(Etc/UTC)) AS hh#32]
   +- ~Project [key#14, kafka_ingest_ts#16, topic#9, partition#10, offset#11L, event_id#18, event_type#19, user_id#21, session_id#22, order_id#23, product_id#24, amount#25, status#26, device#27, ip#28, event_time#29-T600000ms, to_date(event_time#29-T600000ms, None, Some(Etc/UTC), true) AS dt#31]
      +- ~Deduplicate [event_id#18]
         +- ~EventTimeWatermark f356368c-3e16-401a-9ca3-f30d60304a8d, event_time#29: timestamp, 10 minutes
            +- ~Project [key#14, kafka_ingest_ts#16, topic#9, partition#10, offset#11L, event_id#18, event_type#19, user_id#21, session_id#22, order_id#23, product_id#24, amount#25, status#26, device#27, ip#28, event_time#29]
               +- ~Project [key#14, kafka_ingest_ts#16, topic#9, partition#10, offset#11L, event_id#18, event_type#19, event_ts#20, user_id#21, session_id#22, order_id#23, product_id#24, amount#25, status#26, device#27, ip#28, to_timestamp(event_ts#20, None, TimestampType, Some(Etc/UTC), true) AS event_time#29]
                  +- ~Project [key#14, kafka_ingest_ts#16, topic#9, partition#10, offset#11L, json#17.event_id AS event_id#18, json#17.event_type AS event_type#19, json#17.event_ts AS event_ts#20, json#17.user_id AS user_id#21, json#17.session_id AS session_id#22, json#17.order_id AS order_id#23, json#17.product_id AS product_id#24, json#17.amount AS amount#25, json#17.status AS status#26, json#17.device AS device#27, json#17.ip AS ip#28]
                     +- ~Filter isnotnull(json#17)
                        +- ~Project [key#14, value#15, kafka_ingest_ts#16, topic#9, partition#10, offset#11L, from_json(StructField(event_id,StringType,false), StructField(event_type,StringType,true), StructField(event_ts,StringType,false), StructField(user_id,StringType,true), StructField(session_id,StringType,true), StructField(order_id,StringType,true), StructField(product_id,StringType,true), StructField(amount,DoubleType,true), StructField(status,StringType,true), StructField(device,StructType(StructField(os,StringType,true),StructField(app_ver,StringType,true)),true), StructField(ip,StringType,true), value#15, Some(Etc/UTC), false) AS json#17]
                           +- ~Project [cast(key#7 as string) AS key#14, cast(value#8 as string) AS value#15, timestamp#12 AS kafka_ingest_ts#16, topic#9, partition#10, offset#11L]
                              +- ~StreamingDataSourceV2ScanRelation[key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13] KafkaTable


In [20]:
S3_BAD = 'S3_BAD'

In [21]:
# Bad stream to S3 (DLQ)
q2 = (
    bad.writeStream
    .outputMode("append")
    .format("csv") #("parquet")
    .option("path", S3_BAD)
    .option("checkpointLocation", CHECKPOINT + "/bad")
    .trigger(processingTime="1 minute")
    .start()
)

In [None]:
print("q1 active:", q1.isActive, "id:", q1.id)
print("q2 active:", q2.isActive, "id:", q2.id)
