In [1]:
!pip install confluent-kafka
!pip install pyspark==3.1.1



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, sum, when, window
from pyspark.sql.types import StructType, StringType, LongType,TimestampType



# We set the environment variable PYSPARK_SUBMIT_ARGS to include the necessary
# package dependency for Kafka integration in the Spark application.
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell"

bootstrap_server = "localhost:9092"
consumer_topic = "network-traffic"
producer_topic = "network-traffic-producer"



# Create SparkSession
spark = SparkSession.builder \
    .appName("NetworkTrafficAnomalyDetection") \
    .getOrCreate()

# Define the schema for incoming JSON messages
schema = StructType() \
    .add("source_ip", StringType()) \
    .add("destination_ip", StringType()) \
    .add("protocol", StringType()) \
    .add("bytes", LongType())\
    .add("timestamp",TimestampType())

# Create a streaming DataFrame that reads from a Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_server) \
    .option("subscribe", consumer_topic) \
    .option("startingOffsets", "latest") \
    .load()

# Convert the value column from binary to string
df = df.withColumn("value", df["value"].cast("string"))

# Parse the JSON data and select required columns
parsed_df = df \
    .select(from_json(df["value"], schema).alias("data")) \
    .select("data.source_ip", "data.destination_ip", "data.protocol", "data.bytes","data.timestamp")

# Perform network traffic analysis with sliding window
anomaly_df = parsed_df \
    .withColumn("is_anomaly", when(col("bytes") > 10000, 1).otherwise(0)) \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "10 minutes", "5 minutes"), col("source_ip")) \
    .agg(sum("is_anomaly").alias("anomaly_count"))

# Select relevant columns for producing to Kafka
output_df = anomaly_df \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("source_ip"),
        col("anomaly_count").cast("string").alias("value")
    )

# Start writing the results to another Kafka topic
write_query = output_df \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_server) \
    .option("topic", producer_topic) \
    .option("checkpointLocation", "/tmp/checkpoints" ) \
    .start()

# Wait for the streaming query to finish
write_query.awaitTermination()


23/07/06 19:19:36 WARN StreamingQueryManager: Stopping existing streaming query [id=ca710588-a9c3-461e-85f5-a018e2b850b4, runId=0108aac1-5040-455b-a8dd-74c26e6a3894], as a new run is being started.
23/07/06 19:19:36 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@6d67c9b1 is aborting.
23/07/06 19:19:36 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@6d67c9b1 aborted.
23/07/06 19:19:36 ERROR Utils: Aborting task
org.apache.spark.executor.CommitDeniedException: Commit denied for partition 53 (task 77271, attempt 0, stage 771.0)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:431)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452

KeyboardInterrupt: 

                                                                                