In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=f0c988d3ffd6f71bae82abce1410cb0dabb971d7c01a75c6e234c0a3fe2274f8
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
!pip install confluent-kafka

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting confluent-kafka
  Downloading confluent_kafka-2.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m87.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-2.1.1


In [4]:
!pip install streamlit


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting streamlit
  Downloading streamlit-1.23.1-py2.py3-none-any.whl (8.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.9/8.9 MB[0m [31m108.9 MB/s[0m eta [36m0:00:00[0m
Collecting blinker<2,>=1.0.0 (from streamlit)
  Downloading blinker-1.6.2-py3-none-any.whl (13 kB)
Collecting importlib-metadata<7,>=1.4 (from streamlit)
  Downloading importlib_metadata-6.7.0-py3-none-any.whl (22 kB)
Collecting pympler<2,>=0.9 (from streamlit)
  Downloading Pympler-1.0.1-py3-none-any.whl (164 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m164.8/164.8 kB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
Collecting validators<1,>=0.2 (from streamlit)
  Downloading validators-0.20.0.tar.gz (30 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gitpython!=3.1.19,<4,>=3 (from streamlit)
  Downloading GitPython-3.1.31-py3-none-any.whl (184 kB)
[2K 

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from confluent_kafka import Consumer, KafkaError, Producer
import streamlit as st
import matplotlib.pyplot as plt
# Standard-library imports stay below the wildcard imports above so that
# `import *` cannot shadow these names.
import os
from datetime import datetime
from getpass import getpass

In [6]:
# Obtain the SparkSession for this notebook (reuses an existing one if present)
spark = (
    SparkSession.builder
    .appName("Real-Time Network Traffic Analysis")
    .getOrCreate()
)


In [7]:
# Configure Kafka connection details.
# SECURITY: the Confluent Cloud API key and secret used to be hardcoded in this
# cell. Any key that was ever saved in the notebook must be treated as leaked
# and rotated. Credentials are now read from the environment, with an
# interactive prompt as a fallback, so they never land in the notebook source.
bootstrap_servers = os.environ.get(
    'KAFKA_BOOTSTRAP_SERVERS',
    'pkc-6ojv2.us-west4.gcp.confluent.cloud:9092',
)
sasl_username = (
    os.environ.get('KAFKA_SASL_USERNAME')
    or getpass('Kafka SASL username (API key): ')
)
sasl_password = (
    os.environ.get('KAFKA_SASL_PASSWORD')
    or getpass('Kafka SASL password (API secret): ')
)
kafka_topic = 'network-traffic'
processed_topic = 'processed-data'

In [8]:
# Schema of one network-traffic record; every column is declared nullable
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("source_ip", StringType()),
        ("destination_ip", StringType()),
        ("bytes_sent", IntegerType()),
        ("event_time", TimestampType()),
    )
])
# Kafka client configuration, assembled in three steps: broker address,
# SASL/PLAIN authentication over TLS, then consumer-group behaviour.
conf = {}
conf['bootstrap.servers'] = bootstrap_servers
conf.update({
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_username,
    'sasl.password': sasl_password,
})
conf['group.id'] = 'network-traffic-group'
conf['auto.offset.reset'] = 'earliest'

In [9]:
# Create the Kafka consumer from the full client configuration
consumer = Consumer(conf)

# 'group.id' and 'auto.offset.reset' are consumer-only properties. The producer
# is therefore built from the shared connection/authentication settings only,
# instead of being handed properties it ignores and warns about.
_CONSUMER_ONLY_KEYS = ('group.id', 'auto.offset.reset')
producer = Producer({
    key: value
    for key, value in conf.items()
    if key not in _CONSUMER_ONLY_KEYS
})

# Subscribe to the Kafka topic
consumer.subscribe([kafka_topic])

In [10]:
# Read data from Kafka and perform real-time visualization
def read_from_kafka():
    # Create a Streamlit app
    st.title("Real-Time Network Traffic Analysis for Telecommunications")

    # Create a plot to visualize processed data
    fig, ax = plt.subplots()

    # Initialize an empty list to store processed data and event times
    processed_data = []
    event_times = []

    # Function to process incoming Kafka messages and update the plot
    def process_message(message):
        nonlocal processed_data, event_times
        if message is None:
            return
        if message.error():
            if message.error().code() == KafkaError._PARTITION_EOF:
                return
            else:
                print(f"Error: {message.error()}")
                return

        value = message.value().decode('utf-8')
        data = value.split(',')
        source_ip = data[0]
        destination_ip = data[1]
        bytes_sent = int(data[2])
        event_time = datetime.now()

        row = (source_ip, destination_ip, bytes_sent, event_time)
        df = spark.createDataFrame([row], schema=schema)

        # Perform window-based aggregations
        aggregated_df = df \
            .groupBy("source_ip") \
            .agg(sum("bytes_sent").alias("total_bytes_sent")) \
            .orderBy(desc("total_bytes_sent"))

        # Convert the aggregated DataFrame to JSON
        json_data = aggregated_df.select(to_json(struct("*")).alias("value")).first().value

        # Publish processed data to Kafka topic
        producer.produce(processed_topic, value=json_data.encode('utf-8'))

        # Wait for the message to be delivered to Kafka
        producer.flush()

        # Process the Kafka message and update the plot
        processed_data.append(bytes_sent)
        event_times.append(event_time)
        ax.clear()
        ax.plot(event_times, processed_data)
        ax.set_xlabel("Event Time")
        ax.set_ylabel("Processed Data")
        st.pyplot(fig)

In [None]:
# Continuously read messages from Kafka topic
while True:
      message = consumer.poll(1.0)

      if message is None:
            continue

      if message.error():
            if message.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Error: {message.error()}")
                break

                process_message(message)

# Run the Streamlit app
read_from_kafka()
