# In [None]:

import ipaddress
import os
import re
import subprocess

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils import class_weight
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Load the dataset
def load_dataset(file_path, random_state=None):
    """Load a labelled CSV and return scaled train/validation/test splits.

    The CSV must contain a ``Label`` column; every other column is treated as
    a feature and is expected to be numeric (the scaling arithmetic below
    fails otherwise). Labels are cast to ``str`` and mapped to consecutive
    integers in order of first appearance.

    The data is split 80/20 into train/test, then 10% of the training part is
    held out for validation; both splits are stratified on the label.

    Features are min-max scaled with statistics taken from the training split
    only, so nothing about the validation/test rows leaks into preprocessing.
    Validation/test values may therefore fall slightly outside ``[0, 1]``.
    Columns that are constant in the training split are mapped to 0 rather
    than NaN (the naive formula would compute 0/0).

    Args:
        file_path: Path of the CSV file to read.
        random_state: Optional seed forwarded to both ``train_test_split``
            calls to make the splits reproducible. ``None`` (the default)
            keeps the splits unseeded.

    Returns:
        Tuple ``(train_data, train_labels, val_data, val_labels, test_data,
        test_labels)``: features as DataFrames, labels as integer Series.
    """
    data = pd.read_csv(file_path)
    labels = data['Label'].astype(str)

    # Encode labels as integers in order of first appearance
    label_mapping = {label: idx for idx, label in enumerate(labels.unique())}
    labels = labels.map(label_mapping)

    # Every non-label column is a feature
    features = data.drop(columns=['Label'])

    # Split first so that scaling statistics come from training rows only
    train_data, test_data, train_labels, test_labels = train_test_split(
        features, labels, test_size=0.2, stratify=labels,
        random_state=random_state,
    )
    train_data, val_data, train_labels, val_labels = train_test_split(
        train_data, train_labels, test_size=0.1, stratify=train_labels,
        random_state=random_state,
    )

    # Min-max scaling fitted on the training split
    feature_min = train_data.min()
    feature_range = train_data.max() - feature_min
    # A zero range means a constant column; divide by 1 so it becomes 0, not NaN
    feature_range = feature_range.replace(0, 1)

    train_data = (train_data - feature_min) / feature_range
    val_data = (val_data - feature_min) / feature_range
    test_data = (test_data - feature_min) / feature_range

    return train_data, train_labels, val_data, val_labels, test_data, test_labels

# Build the model
def build_model(input_shape, num_classes):
    """Create and compile the feed-forward classifier.

    The network is two 64-unit ReLU layers, each followed by 50% dropout,
    ending in a softmax layer with one unit per class. It is compiled with
    the Adam optimizer, categorical cross-entropy loss and an accuracy metric.

    Args:
        input_shape: Number of input features; used as the 1-D input shape
            ``(input_shape,)`` of the first layer.
        num_classes: Number of output classes (width of the softmax layer).

    Returns:
        The compiled ``Sequential`` model.
    """
    hidden_units = 64
    dropout_rate = 0.5

    network = Sequential()
    network.add(Dense(hidden_units, activation='relu', input_shape=(input_shape,)))
    network.add(Dropout(dropout_rate))
    network.add(Dense(hidden_units, activation='relu'))
    network.add(Dropout(dropout_rate))
    network.add(Dense(num_classes, activation='softmax'))

    network.compile(
        loss='categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return network

# Actions based on detected threats
def block_ip(ip_address):
    """Append an iptables rule that drops inbound traffic from ``ip_address``.

    The value is validated as an IP address before use and the command is run
    as an argument list without a shell, so a crafted value (for example
    ``"1.2.3.4; rm -rf /"``) cannot inject extra commands. IPv6 addresses are
    routed to ``ip6tables``.

    Failure to launch the command, or a non-zero exit status (e.g. missing
    privileges), is reported on stdout and does not raise.

    Args:
        ip_address: The source address to block, as a string.

    Raises:
        ValueError: If ``ip_address`` is not a valid IPv4/IPv6 address.
    """
    try:
        address = ipaddress.ip_address(str(ip_address).strip())
    except ValueError as err:
        raise ValueError(f"Invalid IP address: {ip_address!r}") from err

    # iptables only understands IPv4; IPv6 rules live in ip6tables
    binary = "ip6tables" if address.version == 6 else "iptables"

    print(f"Blocking IP address: {ip_address}")
    command = [binary, "-A", "INPUT", "-s", str(address), "-j", "DROP"]
    try:
        result = subprocess.run(command, check=False)
    except OSError as err:
        # Binary missing or not executable: report it, stay best-effort
        print(f"Failed to run {binary}: {err}")
        return
    if result.returncode != 0:
        print(f"{binary} exited with status {result.returncode}")

def isolate_device(mac_address):
    """Append an iptables rule that drops inbound traffic from a MAC address.

    The value must be six hexadecimal octets separated by ``:`` (``-`` is
    accepted and normalized to ``:``). It is validated before use and the
    command is run as an argument list without a shell, so neither a missing
    value (``None``) nor a crafted string can produce a bogus or injected
    command.

    Failure to launch the command, or a non-zero exit status (e.g. missing
    privileges), is reported on stdout and does not raise.

    Args:
        mac_address: MAC address of the device, e.g. ``"aa:bb:cc:dd:ee:ff"``.

    Raises:
        ValueError: If ``mac_address`` is not a well-formed MAC address.
    """
    candidate = None
    if isinstance(mac_address, str):
        candidate = mac_address.strip().replace("-", ":")
    if candidate is None or not re.fullmatch(
        r"(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}", candidate
    ):
        raise ValueError(f"Invalid MAC address: {mac_address!r}")

    print(f"Isolating device with MAC address: {mac_address}")
    command = [
        "iptables", "-A", "INPUT",
        "-m", "mac", "--mac-source", candidate,
        "-j", "DROP",
    ]
    try:
        result = subprocess.run(command, check=False)
    except OSError as err:
        # Binary missing or not executable: report it, stay best-effort
        print(f"Failed to run iptables: {err}")
        return
    if result.returncode != 0:
        print(f"iptables exited with status {result.returncode}")

def send_alert(email, threat_type):
    """Announce an alert about ``threat_type`` addressed to ``email``.

    This is currently a stub: it only prints the notification to stdout and
    sends no e-mail. The commented block below sketches SMTP delivery.

    Args:
        email: Recipient address shown in the message.
        threat_type: Name of the detected threat shown in the message.
    """
    notice = f"Sending alert for {threat_type} to {email}"
    print(notice)
    # SMTP delivery sketch (disabled): uncomment and point it at a real
    # SMTP server and credentials to actually send the message.
    # import smtplib
    # server = smtplib.SMTP('smtp.example.com', 587)
    # server.starttls()
    # server.login("your_email@example.com", "your_password")
    # message = f"Subject: Alert: {threat_type} detected!\n\nAction required."
    # server.sendmail("your_email@example.com", email, message)
    # server.quit()

def take_action(threat_type, source_ip, mac_address=None):
    """Run the response that corresponds to a detected threat type.

    * ``'DDoS'``, ``'Brute Force'``: block ``source_ip``.
    * ``'Malware'``, ``'Data Exfiltration'``: isolate the device identified
      by ``mac_address``; if no MAC address was supplied, nothing is isolated
      and the event is only reported.
    * ``'Phishing'``, ``'Reconnaissance'``: alert ``admin@example.com``.
    * Anything else: print a notice that the threat type is unknown.

    Args:
        threat_type: Name of the detected threat.
        source_ip: Source IP address associated with the threat.
        mac_address: MAC address of the offending device, if known.
    """
    if threat_type in ('DDoS', 'Brute Force'):
        block_ip(source_ip)
    elif threat_type in ('Malware', 'Data Exfiltration'):
        if not mac_address:
            # Without this guard the default None would be handed to
            # isolate_device and end up as the MAC in a firewall rule.
            print(
                f"No MAC address available for {threat_type}; "
                "cannot isolate device. Logging for review."
            )
            return
        isolate_device(mac_address)
    elif threat_type in ('Phishing', 'Reconnaissance'):
        send_alert("admin@example.com", threat_type)
    else:
        print("Unknown threat type. Logging for review.")

# File path
file_path = 'pcap_data.csv'  # Replace with your actual file location

# Load the dataset (scaled feature frames and integer-encoded label series)
train_data, train_labels, val_data, val_labels, test_data, test_labels = load_dataset(file_path)

# One-hot encode labels, as required by the categorical_crossentropy loss
num_classes = len(train_labels.unique())
train_labels = to_categorical(train_labels, num_classes)
val_labels = to_categorical(val_labels, num_classes)
test_labels = to_categorical(test_labels, num_classes)

# Build the model; the input width is the number of feature columns
input_shape = train_data.shape[1]
model = build_model(input_shape, num_classes)

# Callbacks: stop after 10 epochs without val_loss improvement and keep the
# best-scoring model on disk so it can be restored for evaluation
early_stopping = EarlyStopping(monitor='val_loss', patience=10, mode='min')
model_checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True, monitor='val_loss', mode='min')

# Train the model
history = model.fit(
    train_data, train_labels,
    validation_data=(val_data, val_labels),
    epochs=100,
    batch_size=64,
    callbacks=[early_stopping, model_checkpoint]
)

# Evaluate the model using the checkpointed (lowest val_loss) weights
model.load_weights('best_model.h5')
test_loss, test_accuracy = model.evaluate(test_data, test_labels)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

# Confusion matrix
predictions = model.predict(test_data).argmax(axis=-1)
true_labels = test_labels.argmax(axis=-1)
# Pass the full label set explicitly: otherwise a class that appears in
# neither the test labels nor the predictions shrinks the matrix and it no
# longer matches the num_classes display labels.
class_ids = list(range(num_classes))
cm = confusion_matrix(true_labels, predictions, labels=class_ids)
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_ids).plot()
plt.show()

# Example proactive action
# NOTE(review): this runs unconditionally with hard-coded values and is not
# driven by the model's predictions; the 'DDoS' branch issues an iptables
# command against the given address. Confirm this is intended before running.
take_action("DDoS", "192.168.1.1")
