# In [1]:  (notebook cell prompt left by the export; commented out so the file parses)
# 06_streaming_inference.py  —  Real‑time scoring with Spark + Kafka
"""
Run with e.g.
    spark-submit \
      --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
      06_streaming_inference.py

Prereqs
--------
* Kafka cluster reachable at BOOTSTRAP_SERVERS (env‑var)
* Topic with raw events already being produced (RAW_TOPIC)
  ▸ Each message = JSON with same schema used during training
* Best feature‑engineering PipelineModel saved at ../dados/best_pipe
* Best estimator Model saved at           ../dados/best_model
* Enough executor memory for VectorAssembler & GBT (~4 GB/executor)

Outputs
-------
* Predictions pushed to PRED_TOPIC (JSON with all original fields + `prediction`)
* Also written to Parquet for audit ▸ ../dados/stream_preds/ (partitioned by date)
"""

import os
from pyspark.sql import SparkSession, functions as F
from pyspark.ml import PipelineModel
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType

# ─────────── 1. Spark Session ────────────────────────────────────────────────
# Reuse an already-running session when there is one, otherwise create a new
# one named "ChicagoStreamingScoring".
spark = SparkSession.builder.appName("ChicagoStreamingScoring").getOrCreate()

# Only warnings and errors are logged from here on.
spark.sparkContext.setLogLevel("WARN")

# ─────────── 2. Config vars (can be overridden per env) ──────────────────────
# Every setting is read from an environment variable and falls back to a
# local-development default when that variable is unset.
BOOTSTRAP = os.environ.get("BOOTSTRAP_SERVERS", "localhost:9092")  # Kafka brokers
RAW_TOPIC = os.environ.get("RAW_TOPIC", "chicago_raw")  # topic consumed
PRED_TOPIC = os.environ.get("PRED_TOPIC", "chicago_pred")  # topic produced
# Root directory for streaming checkpoints; each sink appends its own subfolder.
CHECKPOINT = os.environ.get("CHECKPOINT_DIR", "../dados/checkpoints/stream_pred")
# Parquet audit output directory. Overridable like the other settings so a
# deployment is not tied to a path relative to the working directory.
PARQUET_OUT = os.environ.get("PARQUET_OUT", "../dados/stream_preds")

# ─────────── 3. Load pipeline + model ────────────────────────────────────────
# Fitted feature-engineering stages, applied to the parsed events first.
pipe = PipelineModel.load(path="../dados/best_pipe")
# Fitted estimator, applied to the output of `pipe`.
# NOTE(review): this assumes the artefact was saved as a PipelineModel whose
# last stage is the GBT classifier — confirm against the training script.
model = PipelineModel.load(path="../dados/best_model")

# ─────────── 4. Define input schema (only primitives; no Vector) ─────────────
#  Adjust if you added/removed columns during EDA
# Expected layout of each JSON event. Every field is nullable, which is the
# StructField default.
schema = StructType(
    [
        StructField("ID", StringType()),
        StructField("Date", TimestampType()),
        StructField("Primary Type", StringType()),
        StructField("Location Description", StringType()),
        StructField("Beat", IntegerType()),
        StructField("District", IntegerType()),
        StructField("Latitude", DoubleType()),
        StructField("Longitude", DoubleType()),
        StructField("Hour", IntegerType()),
        StructField("DayOfWeek", IntegerType()),
        StructField("Arrest", BooleanType()),
    ]
)

# ─────────── 5. Read stream from Kafka  ──────────────────────────────────────
# Subscribe to the raw-events topic, starting from the latest offsets.
raw_df = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": BOOTSTRAP,
            "subscribe": RAW_TOPIC,
            "startingOffsets": "latest",
        }
    )
    .load()
)

# Cast the Kafka `value` column to a string, parse it against `schema`, and
# flatten the resulting struct into one top-level column per field.
json_df = raw_df.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")

# ─────────── 6. Feature engineering + prediction  ────────────────────────────
# Apply the fitted feature-engineering stages, then the fitted estimator.
features_df = pipe.transform(json_df)
pred_df = model.transform(features_df)

# Publish only the original event fields plus `prediction`, as the module
# docstring promises. The previous version dropped just `features`, which left
# every other column added by the two transforms in the outgoing payload, and
# it carried a no-op `withColumn("prediction", ...)`.
# Input fields a pipeline stage may have removed are skipped rather than
# failing the select.
out_df = pred_df.select(
    *[name for name in json_df.columns if name in set(pred_df.columns)],
    "prediction",
)

# ─────────── 7A. Write predictions back to Kafka  ────────────────────────────
# Serialise each output row to a single JSON string in the `value` column and
# append it to the predictions topic.
query_kafka = (
    out_df.select(F.to_json(F.struct(*out_df.columns)).alias("value"))
    .writeStream.format("kafka")
    .outputMode("append")
    .options(
        **{
            "kafka.bootstrap.servers": BOOTSTRAP,
            "topic": PRED_TOPIC,
            # Each sink keeps its own checkpoint subfolder.
            "checkpointLocation": CHECKPOINT + "/kafka",
        }
    )
    .start()
)

# ─────────── 7B. Write Parquet sink (partitioned by date) ────────────────────
# Append every scored row to the Parquet audit directory, one partition
# folder per calendar day.
# The previous version partitioned by `DayOfWeek`, which contradicts the
# documented "partitioned by date" layout and yields only seven ever-growing
# partitions.
# The partition column is named `event_date` rather than `date` because Spark
# resolves column names case-insensitively by default, so `date` would
# overwrite the `Date` timestamp column.
# NOTE: if PARQUET_OUT already holds data partitioned by DayOfWeek, point this
# sink at a fresh output path and checkpoint to avoid a mixed layout.
query_parquet = (
    out_df.withColumn("event_date", F.to_date(F.col("Date")))
    .writeStream
    .format("parquet")
    .option("path", PARQUET_OUT)
    .option("checkpointLocation", CHECKPOINT + "/parquet")
    .partitionBy("event_date")
    .outputMode("append")
    .start()
)

print("🔥 Streaming inference running…  Ctrl‑C to stop.")

# Block until any query terminates. Ctrl-C is the advertised way to stop, so
# it is handled as a normal shutdown instead of surfacing as a traceback.
# A failure raised by a query still propagates after the cleanup below.
try:
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    pass
finally:
    # Stop every query that is still running, so one sink ending (or an
    # interrupt) does not leave the other active.
    for _active_query in spark.streams.active:
        _active_query.stop()


# KeyboardInterrupt:  (notebook traceback residue; commented out so the file parses)