In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
from sklearn.model_selection import cross_val_score
from scipy.io import arff
from sklearn.model_selection import GridSearchCV

# Never truncate the column list when a wide DataFrame is printed.
pd.options.display.max_columns = None

In [None]:
# Location of the NSL-KDD training split (ARFF format).
path = r'D:\Studies\Fourth Semester\Autonomous Systems A\Truck-Platooning-Simulation-CARLA\Seminar Papers\Mohamed Amer\ML model\NSL-KDD new dataset\KDDTrain+.arff'

# loadarff hands back the records together with the attribute metadata.
data, meta = arff.loadarff(path)
df = pd.DataFrame(data)

# Object-typed columns hold byte strings after loading; convert them to text.
byte_columns = df.select_dtypes(include=[object]).columns
for col in byte_columns:
    df[col] = df[col].str.decode('utf-8')

# Summary statistics for every column, numeric and non-numeric alike.
print(df.describe(include='all'))

In [None]:
# Per-column count of missing values.
print(df.isna().sum())

# Remove the columns the author identified as always zero.
always_zero_cols = ['num_outbound_cmds', 'is_host_login', 'is_guest_login']
df = df.drop(columns=always_zero_cols)

In [None]:
# Replace the target labels with integer codes. `le` is kept around so the
# same label-to-code mapping can be applied to the test labels later on.
le = LabelEncoder()
encoded_target = le.fit_transform(df['class'])
df['class'] = encoded_target
y = df['class']


In [None]:
# Names of the columns that are still object-typed, i.e. not yet numeric.
object_typed = df.select_dtypes(include='object')
categorical_cols = object_typed.columns
print(categorical_cols)

In [None]:
# One-hot encode the categorical features of the training data.
# The column list is defined once so the encoder input, the generated column
# names and the drop below cannot drift apart.
onehot_input_cols = ['protocol_type', 'service', 'flag', 'land', 'logged_in']

# scikit-learn 1.2 renamed `sparse` to `sparse_output` and 1.4 removed the old
# keyword, so `OneHotEncoder(sparse=False)` raises TypeError on current
# releases. Try the new keyword first and fall back for older installations.
try:
    encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    encoder = OneHotEncoder(sparse=False)

encoded_categories = encoder.fit_transform(df[onehot_input_cols])

# Reuse df's index: pd.concat(axis=1) aligns rows on index labels, so a fresh
# RangeIndex would silently misalign rows if df were ever filtered or re-indexed.
encoded_df = pd.DataFrame(
    encoded_categories,
    columns=encoder.get_feature_names_out(onehot_input_cols),
    index=df.index,
)

# Non-categorical columns (still including the target) ...
X_copy = df.drop(columns=onehot_input_cols)
# ... joined with the one-hot columns, then the target is removed.
X_full = pd.concat([X_copy, encoded_df], axis=1).drop(columns=['class'])

In [None]:
# Baseline forest fitted on every feature; its importances are used to rank
# the features.
baseline_params = dict(
    n_estimators=500,
    max_depth=None,
    min_samples_split=10,
    min_samples_leaf=5,
    max_features='log2',
    random_state=42,
    class_weight='balanced',
)
rf = RandomForestClassifier(**baseline_params)
rf.fit(X_full, y)

# Importance table, highest importance first.
feature_names = X_full.columns
importances = rf.feature_importances_
feature_importance_df = pd.DataFrame(
    {'feature': feature_names, 'importance': importances}
).sort_values(by='importance', ascending=False)


In [None]:

# Chart the `top_n` highest-ranked features and keep only those columns.
top_n = 30
top_rows = feature_importance_df.head(top_n)

fig, ax = plt.subplots(figsize=(10, 6))
ax.set_title("Top Feature Importances")
# Reversed so that the most important feature ends up as the top bar.
ax.barh(top_rows['feature'].iloc[::-1], top_rows['importance'].iloc[::-1])
ax.set_xlabel("Importance")
fig.tight_layout()
plt.show()

# Reduced training matrix restricted to the selected features.
top_features = top_rows['feature'].to_numpy()
X_reduced = X_full[top_features]


In [None]:

# Hyper-parameter search space: 2 * 3 * 2 * 2 * 2 * 1 = 48 combinations.
param_grid = dict(
    n_estimators=[100, 300],
    max_depth=[None, 10, 30],
    min_samples_split=[2, 10],
    min_samples_leaf=[1, 5],
    max_features=['sqrt', 'log2'],
    class_weight=['balanced'],
)

# Exhaustive search with 3-fold cross-validation, scored by macro-averaged F1,
# run on the reduced feature set using all available cores.
# NOTE(review): the feature ranking behind X_reduced was computed on all
# training rows, so these CV scores may be slightly optimistic - confirm this
# is acceptable.
base_forest = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(
    estimator=base_forest,
    param_grid=param_grid,
    scoring='f1_macro',
    cv=3,
    n_jobs=-1,
)

grid_search.fit(X_reduced, y)



In [None]:

# Winning parameter combination and its mean cross-validated score.
print("Best parameters found:", grid_search.best_params_, sep="\n")
print("Best F1 macro score:", grid_search.best_score_, sep="\n")

In [None]:
# Final classifier with hand-entered hyper-parameters.
# NOTE(review): these values are presumably the grid-search winners copied in
# by hand - confirm against grid_search.best_params_.
# NOTE(review): the search ran on X_reduced but this model is fitted on X_full;
# confirm that training on the full feature set is intended.
final_params = {
    'n_estimators': 100,
    'max_depth': None,
    'min_samples_split': 2,
    'min_samples_leaf': 1,
    'max_features': 'sqrt',
    'random_state': 42,
    'class_weight': 'balanced',
}
rf = RandomForestClassifier(**final_params)
rf.fit(X_full, y)

In [None]:
# Location of the NSL-KDD test split (ARFF format).
test_path = r'D:\Studies\Fourth Semester\Autonomous Systems A\Truck-Platooning-Simulation-CARLA\Seminar Papers\Mohamed Amer\ML model\NSL-KDD new dataset\KDDTest+.arff'

# Load the records and attribute metadata, then wrap the records in a frame.
data_test, meta_test = arff.loadarff(test_path)
df_test = pd.DataFrame(data_test)

# Object-typed columns hold byte strings after loading; convert them to text.
byte_columns_test = df_test.select_dtypes(include=[object]).columns
for col in byte_columns_test:
    df_test[col] = df_test[col].str.decode('utf-8')


In [None]:
# Encode the test split with the encoder that was fitted on the training data
# (transform only - the encoder is not refitted here).
onehot_input_cols_test = ['protocol_type', 'service', 'flag', 'land', 'logged_in']
# Columns that were removed from the training frame before modelling.
dropped_in_training = ['num_outbound_cmds', 'is_host_login', 'is_guest_login']

encoded_categories_test = encoder.transform(df_test[onehot_input_cols_test])

# Reuse df_test's index: pd.concat(axis=1) aligns rows on index labels, so a
# fresh RangeIndex would silently misalign rows if df_test were ever filtered.
encoded_df_test = pd.DataFrame(
    encoded_categories_test,
    columns=encoder.get_feature_names_out(onehot_input_cols_test),
    index=df_test.index,
)

# Build a new frame instead of mutating a copy in place, so the cell can be
# re-run safely.
df_test_copy = df_test.drop(columns=onehot_input_cols_test + dropped_in_training)

X_test_full = pd.concat([df_test_copy, encoded_df_test], axis=1)

In [None]:
# Separate the test target from the test features.
y_test = df_test['class']
X_test_full = X_test_full.drop(columns=['class'])

# NOTE(review): this reduced view is assigned to a lower-case name and is not
# referenced anywhere else in the visible notebook; prediction uses
# X_test_full. Confirm whether the reduced feature set was meant to be used.
x_test_full = X_test_full[top_features]

In [None]:

# Encode the test labels with the encoder fitted on the training labels, so
# the integer codes line up with what the model was trained on.
y_test_encoded = le.transform(y_test)
y_pred = rf.predict(X_test_full)

# Overall accuracy followed by the per-class precision / recall / F1 table.
test_accuracy = accuracy_score(y_test_encoded, y_pred)
print("Accuracy:", test_accuracy)
print(classification_report(y_test_encoded, y_pred))

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix

# --- Classification report rendered as a heatmap ---------------------------
report = classification_report(y_test_encoded, y_pred, output_dict=True)
report_df = pd.DataFrame(report).transpose()
plt.figure(figsize=(10, 6))
# The last row and last column are left out of the heatmap (presumably the
# weighted average and the support counts - confirm against the report layout).
sns.heatmap(report_df.iloc[:-1, :-1], annot=True, cmap='Blues', fmt=".2f")
plt.title("Classification Report Heatmap")
plt.show()

# --- Confusion matrix -------------------------------------------------------
# LabelEncoder assigns integer codes in the sorted order of the original
# labels, so hard-coded tick labels can end up attached to the wrong class.
# Take the names from the encoder itself and pin the matrix row/column order to
# the same codes, so every axis tick is guaranteed to match its class.
class_ids = np.arange(len(le.classes_))
class_names = list(le.classes_)
cm = confusion_matrix(y_test_encoded, y_pred, labels=class_ids)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Oranges', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()