In [1]:
!pip install Flask pyshark ipywidgets
!pip install pushbullet.py
import time
import threading
from scapy.all import sniff, IP
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, clear_output
from pushbullet import Pushbullet
from collections import defaultdict



In [2]:
# Pushbullet API Key
PUSHBULLET_API_KEY = "o.AVkuGhXw0o5A05yrpIE0mnOlCMwxmyiR"  # Replace with your API key
pb = Pushbullet(PUSHBULLET_API_KEY)

In [3]:
# Monitoring variables
monitoring = False
cycle_count = 0
packet_counts = defaultdict(int)  # Count packets by IP
packet_sizes = defaultdict(int)  # Track total packet sizes by IP
alerted_ips = set()  # Track alerted IPs to avoid duplicate notifications

In [4]:
# Thresholds for DoS detection
DOS_THRESHOLD = 100  # Minimum number of packets to consider a DoS attack
SIZE_THRESHOLD = 5000  # Minimum total packet size to consider a DoS attack


In [5]:
# Widgets for frontend
alert_widget = widgets.HTML(value="<b>No alerts yet</b>", layout=widgets.Layout(width='100%', height='50px'))
log_output = widgets.Output()
start_button = widgets.Button(description="Start Monitoring", button_style='success')
stop_button = widgets.Button(description="Stop Monitoring", button_style='danger', disabled=True)


In [6]:
# Function to send alerts
def send_alert(ip, reason):
    """Send an alert to Pushbullet and update the alert widget."""
    message = f"Anomaly Detected: DoS Attack from {ip} ({reason})"
    print(f"ALERT: {message}")
    alert_widget.value = f"<b style='color:red;'>{message}</b>"
    try:
        pb.push_note("Network Alert", message)
        print("Notification sent successfully.")
    except Exception as e:
        print(f"Error sending Pushbullet alert: {e}")
    with log_output:
        print(message)

In [7]:
# Simulate packet data for real-time monitoring
def generate_packet():
    """Generate simulated packet data."""
    return {
        'source_ip': f"192.168.1.{random.randint(1, 10)}",  # IP range for simulation
        'size': random.randint(50, 1500)  # Packet size in bytes
    }


In [8]:
# Real-time monitoring logic
def monitor_traffic():
    """Monitor simulated network traffic in real-time."""
    global monitoring, cycle_count, packet_counts, packet_sizes, alerted_ips
    alerted_ips.clear()  # Reset alerted IPs
    packet_counts.clear()  # Reset packet counts
    packet_sizes.clear()  # Reset packet sizes

    while monitoring:
        cycle_count += 1
        new_packet = generate_packet()
        source_ip = new_packet['source_ip']
        packet_size = new_packet['size']
        
        # Update packet counts and sizes
        packet_counts[source_ip] += 1
        packet_sizes[source_ip] += packet_size
        
        # Check for DoS attack based on thresholds
        if (packet_counts[source_ip] > DOS_THRESHOLD or 
            packet_sizes[source_ip] > SIZE_THRESHOLD) and source_ip not in alerted_ips:
            reason = (f"High packet count: {packet_counts[source_ip]} packets"
                      if packet_counts[source_ip] > DOS_THRESHOLD 
                      else f"High data volume: {packet_sizes[source_ip]} bytes")
            send_alert(source_ip, reason)
            alerted_ips.add(source_ip)  # Avoid duplicate alerts for this IP
        
        # Update frontend
        alert_widget.value = f"<b style='color:blue;'>Monitoring running... ({cycle_count} records processed)</b>"
        time.sleep(0.5)  # Simulate packet processing delay


In [9]:
# Start monitoring
def start_monitoring(_):
    """Start the monitoring process."""
    global monitoring
    if monitoring:
        print("Monitoring already running.")
        return
    monitoring = True
    start_button.disabled = True
    stop_button.disabled = False
    alert_widget.value = "<b style='color:blue;'>Monitoring started...</b>"
    threading.Thread(target=monitor_traffic, daemon=True).start()
    print("Monitoring started.")

In [10]:
# Stop monitoring
def stop_monitoring(_):
    """Stop the monitoring process."""
    global monitoring
    monitoring = False
    start_button.disabled = False
    stop_button.disabled = True
    alert_widget.value = "<b style='color:gray;'>Monitoring stopped.</b>"
    print("Monitoring stopped.")

In [11]:
# Button interactions
start_button.on_click(start_monitoring)
stop_button.on_click(stop_monitoring)


In [12]:
# Display widgets
display(alert_widget, log_output, start_button, stop_button)


HTML(value='<b>No alerts yet</b>', layout=Layout(height='50px', width='100%'))

Output()

Button(button_style='success', description='Start Monitoring', style=ButtonStyle())

Button(button_style='danger', description='Stop Monitoring', disabled=True, style=ButtonStyle())