# Notebook cell In [5]:
# Run with: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 app_if.py

import os
import joblib
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Kafka connection settings and output locations
SERVER = "broker:9092"
TOPIC = "pm10"
CHECKPOINT_PATH = "/home/jovyan/notebooks/spark/checkpoints"
ANOMALY_PARQUET_PATH = "/home/jovyan/notebooks/spark/anomalies/iforest"

# Schema of the JSON payload carried (as a string) in each Kafka message.
# Fields are added one by one; every field keeps the default nullability.
SCHEMA = (
    StructType()
    .add("station_id", IntegerType())
    .add("reading_date", StringType())
    .add("datetime_from_sensor", TimestampType())
    .add("value", DoubleType())
    .add("unit", StringType())
)

# Load the trained Isolation Forest model once, at module import time
isoforest_model = joblib.load("/home/jovyan/notebooks/spark/isoforest_model.pkl")

# Prediction function wrapped as a Spark UDF below
def predict_anomaly(value):
    """Flag a single reading as anomalous using the pre-loaded model.

    Args:
        value: Numeric sensor reading, or ``None`` when the field is missing.

    Returns:
        ``1`` when ``isoforest_model.predict`` labels the reading ``-1``
        (outlier), otherwise ``0``. Missing readings and readings the model
        fails to score are reported as ``0`` (best effort).
    """
    if value is None:
        return 0
    try:
        # The model is called with a single sample holding a single feature.
        prediction = isoforest_model.predict([[value]])
        return int(prediction[0] == -1)
    except Exception:
        # Best effort: a reading that cannot be scored is treated as normal.
        # Catching ``Exception`` (not a bare ``except``) lets
        # KeyboardInterrupt/SystemExit propagate instead of being swallowed.
        return 0

# Register the Python predictor as a Spark UDF producing an integer column.
predict_anomaly_udf = udf(f=predict_anomaly, returnType=IntegerType())

# Isolation Forest anomaly detection for a single micro-batch
def detect_isoforest_anomalies(df, epoch_id):
    """Score one micro-batch and publish the rows flagged as anomalies.

    Used as the ``foreachBatch`` callback of the streaming query. Rows whose
    ``value`` is flagged by ``predict_anomaly_udf`` are printed, appended as
    Parquet under ``ANOMALY_PARQUET_PATH`` and written as JSON to the
    ``if_anomalies`` Kafka topic on ``SERVER``.

    Args:
        df: Micro-batch DataFrame; the code reads the ``station_id``,
            ``datetime_from_sensor`` and ``value`` columns.
        epoch_id: Micro-batch identifier, used only in log messages.

    Returns:
        None.

    Errors raised while scoring or writing are printed and swallowed, so the
    callback itself does not fail.
    NOTE(review): presumably the stream then treats the batch as processed
    even when a sink write failed - confirm this best-effort policy is wanted.
    """
    # take(1) checks for emptiness without converting the DataFrame to an RDD.
    if not df.take(1):
        print(f"⚠️ Batch {epoch_id} is empty – skipping.")
        return

    anomalies = None
    try:
        print(f"🌲 [Batch {epoch_id}] Isolation Forest anomaly detection...")
        # Several actions follow (emptiness check, show, Parquet, Kafka).
        # Persisting keeps the Python UDF from being re-run for each of them.
        anomalies = (
            df.withColumn("anomaly", predict_anomaly_udf(col("value")))
            .filter(col("anomaly") == 1)
            .persist()
        )

        if not anomalies.take(1):
            print(f"ℹ️ No anomalies in batch {epoch_id}")
            return

        anomalies.select("station_id", "datetime_from_sensor", "value").show(truncate=False)

        # Append to the Parquet dataset
        anomalies.write.mode("append").parquet(ANOMALY_PARQUET_PATH)

        # Send to the Kafka topic: station id as key, the whole row as JSON value
        kafka_ready = anomalies.selectExpr(
            "CAST(station_id AS STRING) AS key",
            "to_json(struct(*)) AS value"
        )
        kafka_ready.write.format("kafka") \
            .option("kafka.bootstrap.servers", SERVER) \
            .option("topic", "if_anomalies") \
            .save()

        print("📤 Anomalie wysłane do Kafka topic: 'if_anomalies'")

    except Exception as e:
        print(f"❌ Error during ISOForest detection: {e}")
    finally:
        # Release the cached batch on every exit path (early return or error).
        if anomalies is not None:
            anomalies.unpersist()

# Application entry point
if __name__ == "__main__":
    # Create (or reuse) the session and limit Spark logging to errors.
    spark = SparkSession.builder.appName("IsolationForest-PM10").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Streaming source: subscribe to the Kafka topic
    kafka_source_options = {
        "kafka.bootstrap.servers": SERVER,
        "subscribe": TOPIC,
        "startingOffsets": "latest",
        "failOnDataLoss": "false",
    }
    raw = spark.readStream.format("kafka").options(**kafka_source_options).load()

    # Decode the Kafka value as a JSON string, parse it with SCHEMA and
    # flatten the resulting struct into top-level columns.
    json_strings = raw.selectExpr("CAST(value AS STRING) AS json_str")
    decoded = json_strings.select(from_json("json_str", SCHEMA).alias("data"))
    parsed = decoded.select("data.*")

    # Streaming query that runs Isolation Forest detection on each micro-batch
    batch_writer = parsed.writeStream.outputMode("append")
    batch_writer = batch_writer.foreachBatch(detect_isoforest_anomalies)
    batch_writer = batch_writer.option(
        "checkpointLocation", f"{CHECKPOINT_PATH}/iforest_detection"
    )
    batch_writer = batch_writer.trigger(processingTime="5 minutes")
    isoforest_query = batch_writer.start()

    # Block the driver until one of the active streaming queries terminates.
    spark.streams.awaitAnyTermination()


# Notebook output: Writing app_if.py
