In [1]:
from google.colab import drive

# Attach Google Drive to the Colab runtime; later cells read the dataset from under this mount point.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
pip install scapy



In [3]:
import re
import statistics
from collections import deque

import joblib
import numpy as np
import pandas as pd
from scapy.all import *
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.preprocessing import OneHotEncoder

In [4]:
# Location of the NSL-KDD training file on the mounted Drive.
data_path = '/content/drive/MyDrive/NSL-KDD/KDDTrain+.txt'

# The 41 connection features followed by the class label.
column_names = ["duration", "protocol_type", "service", "flag", "src_bytes",
                "dst_bytes", "land", "wrong_fragment", "urgent", "hot",
                "num_failed_logins", "logged_in", "num_compromised",
                "root_shell", "su_attempted", "num_root", "num_file_creations",
                "num_shells", "num_access_files", "num_outbound_cmds",
                "is_host_login", "is_guest_login", "count", "srv_count",
                "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
                "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
                "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
                "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
                "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
                "dst_host_srv_serror_rate", "dst_host_rerror_rate",
                "dst_host_srv_rerror_rate", "label"]

# The published KDDTrain+.txt carries one extra trailing field per row (a difficulty score).
# When a row has more fields than `names`, pandas silently turns the first field into the
# index and shifts every name one column to the left, so "label" would end up holding the
# difficulty score and "flag" would hold src_bytes. Naming the extra field keeps the columns
# aligned; it is dropped again so `data` has exactly the columns listed in `column_names`.
# If the file has no such field, pandas fills the extra name with NaN and the drop removes it.
trailing_columns = ["difficulty_level"]
data = (
    pd.read_csv(data_path, header=None, names=column_names + trailing_columns)
    .drop(columns=trailing_columns)
)

In [5]:
# Encode categorical features using one-hot encoding
categorical_features = ["protocol_type", "service", "flag"]

# drop='first' removes the first category of each feature to avoid multicollinearity.
# The dense-output keyword was renamed from `sparse` to `sparse_output` in scikit-learn 1.2
# and the old spelling was removed in 1.4, where it raises TypeError. Try the current name
# first and fall back for older installations.
try:
    encoder = OneHotEncoder(sparse_output=False, drop='first')
except TypeError:
    encoder = OneHotEncoder(sparse=False, drop='first')

encoded_features = encoder.fit_transform(data[categorical_features])
encoded_feature_names = encoder.get_feature_names_out(input_features=categorical_features)

# Wrap the dense array in a DataFrame so the generated column names travel with the values.
encoded_df = pd.DataFrame(encoded_features, columns=encoded_feature_names)



In [6]:
# Give the original dataset and the encoded frame a fresh positional index
# (the previous index is discarded, not kept as a column).
data, encoded_df = (frame.reset_index(drop=True) for frame in (data, encoded_df))

In [7]:
# Swap the raw categorical columns for their one-hot encoded counterparts:
# remove them from the original frame, then append the encoded columns side by side.
non_categorical = data.drop(columns=categorical_features)
data_encoded = pd.concat([non_categorical, encoded_df], axis="columns")

In [8]:
# Separate the target column from the predictors
y = data_encoded["label"]
X = data_encoded.drop(columns=["label"])

KeyboardInterrupt: ignored

In [None]:
# Hold out 20% of the rows for testing; the fixed seed makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Standardize features: the scaler is fitted on the training rows only,
# then the same fitted scaler is applied to both splits.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
# Build the Random Forest intrusion classifier (100 trees, fixed seed) and fit it on the training split
forest_settings = {"n_estimators": 100, "random_state": 42}
intrusion_clf = RandomForestClassifier(**forest_settings)
intrusion_clf.fit(X_train, y_train)

In [None]:
# Persist the trained classifier to disk with joblib (imported in the notebook's import cell).
# The path is relative, so the file is written to the kernel's current working directory.
model_filename = 'intrusion_detection_model.joblib'
joblib.dump(intrusion_clf, model_filename)

In [None]:
# Make predictions for intrusion detection: one predicted label per row of the
# held-out X_test, stored for the evaluation metrics computed afterwards.
intrusion_predictions = intrusion_clf.predict(X_test)

In [None]:
# Evaluate the intrusion detection model: every metric compares the true test labels
# with the predictions, so they are computed in one pass over the metric functions.
intrusion_accuracy, intrusion_conf_matrix, intrusion_classification_report = (
    metric(y_test, intrusion_predictions)
    for metric in (accuracy_score, confusion_matrix, classification_report)
)

In [None]:
# Print the evaluation summary: a title line, then each metric under its heading
print("Intrusion Detection Model:")
for heading, value in (
    ("Accuracy:", intrusion_accuracy),
    ("Confusion Matrix:\n", intrusion_conf_matrix),
    ("Classification Report:\n", intrusion_classification_report),
):
    print(heading, value)

In [None]:
# Snort rule-based breach detection (simplified example).
# Maps a short rule name to the Snort-style rule text.
snort_rules = dict(
    ssh_bruteforce='alert tcp any any -> any 22 (content:"Invalid user"; msg:"SSH Bruteforce Attempt";)',
    # Add more rules as needed
)

In [None]:
def _parse_snort_rule(rule_text):
    """Pull the destination port and content patterns out of one rule string.

    Only the small subset of Snort syntax used by ``snort_rules`` is understood:
    a header of the form ``action proto src_ip src_port -> dst_ip dst_port``
    followed by parenthesised options containing ``content:"..."`` entries.

    Returns:
        (dst_port, patterns): ``dst_port`` is an int, or None when the header does
        not name a single numeric port (e.g. ``any``); ``patterns`` is a list of
        bytes objects, one per ``content`` option.
    """
    header, _, options = rule_text.partition("(")
    header_fields = header.split()
    dst_port = None
    # Field 6 of the header is the destination port.
    if len(header_fields) >= 7 and header_fields[6].isdigit():
        dst_port = int(header_fields[6])
    patterns = [text.encode() for text in re.findall(r'content:\s*"([^"]*)"', options)]
    return dst_port, patterns


def detect_breaches(pkt):
    """Print an alert for every rule in ``snort_rules`` that ``pkt`` matches.

    A rule matches when the packet is TCP, its destination port equals the rule's
    port (if the rule names a numeric one), and the TCP payload contains every
    ``content`` pattern of the rule. Rules without a ``content`` option never match.

    The previous implementation searched the payload for the entire rule text,
    which can never occur in traffic, and raised on segments without a payload.

    Args:
        pkt: a scapy packet, as handed over by ``sniff(prn=...)``.

    Returns:
        None. ``sniff`` prints whatever ``prn`` returns, so nothing is returned.
    """
    if not pkt.haslayer(TCP):
        return
    tcp_layer = pkt[TCP]
    # bytes() of the payload is empty for segments that carry no data (SYN, bare ACK, ...).
    payload = bytes(tcp_layer.payload)
    if not payload:
        return

    for rule_name, rule_content in snort_rules.items():
        dst_port, patterns = _parse_snort_rule(rule_content)
        if dst_port is not None and tcp_layer.dport != dst_port:
            continue
        if patterns and all(pattern in payload for pattern in patterns):
            # Packets without an IPv4 layer (e.g. IPv6) have no pkt[IP]; fall back to the summary.
            if pkt.haslayer(IP):
                endpoints = f"{pkt[IP].src} -> {pkt[IP].dst}"
            else:
                endpoints = pkt.summary()
            print(f"Breach Detected ({rule_name}): {endpoints}")

In [None]:
# Sniff network traffic and apply breach detection.
# An unbounded capture never returns, so no later cell would run; the timeout bounds it.
# store=False stops scapy from keeping every captured packet in memory, since each packet
# is only needed inside the callback.
BREACH_SNIFF_TIMEOUT = 60  # seconds; set to None to capture until interrupted
sniff(filter="tcp", prn=detect_breaches, store=False, timeout=BREACH_SNIFF_TIMEOUT)

In [None]:
# Anomalous traffic detection (simplified statistical approach).
# A bounded deque holds the most recent packets; the oldest entry falls out once it is full.
window_size = 10  # number of packets kept; adjust as needed
traffic_window = deque([], maxlen=window_size)

In [None]:
def detect_anomalous_traffic(pkt):
    """Flag ``pkt`` when it is much longer than the packets seen just before it.

    The lengths of the packets currently in ``traffic_window`` form the baseline.
    Once that baseline holds ``window_size`` packets, ``pkt`` is reported if its
    length exceeds the baseline mean by more than two sample standard deviations.
    ``pkt`` is then part of the window for the packets that follow.

    The baseline is taken before ``pkt`` is appended: a packet that contributes to
    its own mean and standard deviation raises the threshold it is compared with.

    Args:
        pkt: any object supporting ``len()``; scapy packets as passed by ``sniff``.

    Returns:
        None. ``sniff`` prints whatever ``prn`` returns, so nothing is returned.
    """
    baseline_lengths = [len(p) for p in traffic_window]
    traffic_window.append(pkt)

    # Wait for a full window; statistics.stdev also needs at least two samples.
    if len(baseline_lengths) < max(window_size, 2):
        return

    mean_length = statistics.mean(baseline_lengths)
    std_dev = statistics.stdev(baseline_lengths)
    threshold = mean_length + 2 * std_dev  # Adjust the threshold as needed
    if len(pkt) > threshold:
        print("Anomalous Traffic Detected")

# Sniff network traffic and apply anomalous traffic detection.
# The timeout keeps the cell from blocking forever. store=False stops scapy from keeping
# its own copy of every captured packet; the detector keeps the few it needs in its window.
ANOMALY_SNIFF_TIMEOUT = 60  # seconds; set to None to capture until interrupted
sniff(prn=detect_anomalous_traffic, store=False, timeout=ANOMALY_SNIFF_TIMEOUT)