# STREAMING

## 1. Read Kafka stream

In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import (StructType, StructField, StringType, DoubleType,
                               IntegerType, BooleanType, TimestampType)

# Schema that mirrors the data being sent to kafka
from pyspark.sql.types import StructType, StructField, StringType

# Every field is read from the JSON payload as a string; the proper types are
# applied afterwards when the parsed stream is cast.
# Each entry is (field name, nullable flag), listed in output column order.
# NOTE(review): five fields are declared non-nullable — confirm this is intended,
# since the printed schema of the cast stream reports nullable = true.
_FIELD_SPECS = [
    ("transaction_id", True),
    ("timestamp", True),
    ("sender_account", True),
    ("receiver_account", True),
    ("amount", True),
    ("transaction_type", True),
    ("merchant_category", True),
    ("location", True),
    ("device_used", True),
    ("is_fraud", True),
    ("time_since_last_transaction", True),
    ("spending_deviation_score", True),
    ("velocity_score", True),
    ("geo_anomaly_score", True),
    ("payment_channel", True),
    ("ip_address", True),
    ("device_hash", True),
    ("event_time", True),
    ("time_since_last_tx_tmp", True),
    ("time_since_last_transaction_imp", True),
    ("hour_of_day", True),
    ("day_of_week", True),
    ("is_weekend", False),
    ("month", True),
    ("txns_1h", False),
    ("amt_24h", False),
    ("uniq_rcv_7d", False),
    ("avg_gap_mins_10", False),
]

schema = StructType([
    StructField(field_name, StringType(), is_nullable)
    for field_name, is_nullable in _FIELD_SPECS
])
# Recursively delete whatever is already stored under dbfs:/tmp/stream before the streams start.
# NOTE(review): the Delta sink's checkpoint (/tmp/stream/Streaming_data) sits under this
# directory, so it is presumably wiped on every run as well — confirm that is intended.
dbutils.fs.rm("dbfs:/tmp/stream", recurse=True)
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/stream") # set the session-level streaming checkpoint location




In [0]:
%python
# Open a streaming reader on the "streaming" Kafka topic.
# All source settings are passed together in a single options() call.
raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafkainterface.ddns.net:9092",
        "subscribe": "streaming",
    })
    .load()
)

In [0]:
%python
# Decode the Kafka message value as a string and parse it as JSON against `schema`,
# producing a single struct column named "data".
_json_payload = from_json(col("value").cast("string"), schema).alias("data")

# Expand that struct so each schema field becomes its own top-level column.
parsed_df = raw_df.select(_json_payload).select("data.*")


In [0]:
%python
from pyspark.sql.functions import col, to_timestamp

# Target type for every column that must not remain a string.
# Columns that appear in none of the collections below are passed through unchanged.
_CAST_TYPES = {
    # ────────── doubles ──────────
    "amount": "double",
    "time_since_last_transaction": "double",
    "spending_deviation_score": "double",
    "velocity_score": "double",
    "geo_anomaly_score": "double",
    "time_since_last_tx_tmp": "double",
    "time_since_last_transaction_imp": "double",
    "amt_24h": "double",
    "avg_gap_mins_10": "double",
    # ────────── integers ──────────
    "hour_of_day": "int",
    "day_of_week": "int",
    "is_weekend": "int",
    "month": "int",
    # ────────── longs ──────────
    "txns_1h": "long",
    "uniq_rcv_7d": "long",
}

# Columns parsed with to_timestamp() rather than a plain cast.
_TIMESTAMP_COLS = ("timestamp", "event_time")

# Columns removed from the stream instead of being cast.
# NOTE(review): is_fraud is presumably the target the model predicts — confirm.
_DROPPED_COLS = ("is_fraud",)


def _typed_column(name):
    """Return the expression for column `name` converted to its target type.

    The expression is aliased back to `name` so the output column keeps its
    original name; columns with no configured conversion are returned as-is.
    """
    if name in _TIMESTAMP_COLS:
        return to_timestamp(name).alias(name)
    if name in _CAST_TYPES:
        return col(name).cast(_CAST_TYPES[name]).alias(name)
    return col(name)


# Apply all conversions in a single select() instead of one withColumn() per
# column: every withColumn() call adds another projection to the query plan.
# Iterating parsed_df.columns keeps the original column order.
casted_df = parsed_df.select(
    [_typed_column(name) for name in parsed_df.columns if name not in _DROPPED_COLS]
)

In [0]:
# Continuously append the typed stream (casted_df) to the Delta table stored at
# /mnt/datalake/Streaming_data, with its own checkpoint directory.
query = (
    casted_df.writeStream
    .format("delta")
    .outputMode("append")
    .options(checkpointLocation="/tmp/stream/Streaming_data", mergeSchema="true")
    .start("/mnt/datalake/Streaming_data")
)

In [0]:
%python
# Print the column types to check that the dataset is ready to be fed into the preprocessing pipelines.
casted_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- sender_account: string (nullable = true)
 |-- receiver_account: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- location: string (nullable = true)
 |-- device_used: string (nullable = true)
 |-- time_since_last_transaction: double (nullable = true)
 |-- spending_deviation_score: double (nullable = true)
 |-- velocity_score: double (nullable = true)
 |-- geo_anomaly_score: double (nullable = true)
 |-- payment_channel: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- device_hash: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- time_since_last_tx_tmp: double (nullable = true)
 |-- time_since_last_transaction_imp: double (nullable = true)
 |-- hour_of_day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 

## 2. Load Preprocessing Pipeline and Slicer


In [0]:
%python
from pyspark.ml import PipelineModel

# Load the two saved pipeline models, in order: the preprocessing pipeline
# first, then the slicer pipeline.
preproc_model, slicer_model = (
    PipelineModel.load(model_dir)
    for model_dir in (
        "/FileStore/models/preproc_model",
        "/FileStore/models/slicer_top10",
    )
)



In [0]:
# Prepare the model input in two steps: run the preprocessing pipeline on the
# typed stream, then pass its output through the slicer pipeline.
preprocessed_df = preproc_model.transform(casted_df)
sliceed_df = slicer_model.transform(preprocessed_df)


## 3. Load Model and Predict

In [0]:
%python
from pyspark.ml.classification import LogisticRegressionModel, RandomForestClassificationModel

# The model is loaded through RandomForestClassificationModel, so that class must be
# imported here; without the import this cell raises NameError on a fresh kernel.
model_path = "/FileStore/models/rf_top10_weighted_model_40pct"
model = RandomForestClassificationModel.load(model_path)

# transform() adds the model's output columns (probability + prediction) to the stream
scored_df = model.transform(sliceed_df)


## 4. Write data with model predictions to Kafka

In [0]:
# -------------------------------------------------------------------
# 1.  Shape the scored rows into Kafka messages
# -------------------------------------------------------------------
# The transaction id becomes the message key; every column of the row is
# serialised together into one JSON blob that becomes the message value.
out = scored_df.selectExpr(
    "CAST(transaction_id AS STRING) AS key",
    "to_json(struct(*))          AS value",
)

# -------------------------------------------------------------------
# 2.  Stream the messages back to Kafka
# -------------------------------------------------------------------
# The "scored-transactions" topic has to be created before this query starts.
kafka_query = (
    out.writeStream
    .format("kafka")
    .outputMode("append")
    .options(**{
        "kafka.bootstrap.servers": "kafkainterface.ddns.net:9092",
        "topic": "scored-transactions",
        "checkpointLocation": "dbfs:/checkpoints/scored_df→kafka",
    })
    .start()
)

# Conclusion

The model predictions, together with the transaction data, are published to a separate Kafka topic (`scored-transactions`), which streamlines production and makes building the dashboard easier.