In [None]:
import pandas as pd
import numpy as np
import os
import time
from itertools import cycle
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt

# Function to optimize DataFrame memory usage
def optimize_dataframe(df):
    for col in df.select_dtypes(include=['int']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    for col in df.select_dtypes(include=['float']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    return df

# Safe label encoding to handle unseen labels
def safe_label_encode(encoder, values, default_value=-1):
    unique_classes = set(encoder.classes_)
    return [encoder.transform([v])[0] if v in unique_classes else default_value for v in values]

# Load data from a directory
def load_data_from_directory(directory_path):
    data_frames = []
    for file_name in os.listdir(directory_path):
        if file_name.endswith('.csv'):
            file_path = os.path.join(directory_path, file_name)
            df = pd.read_csv(file_path)
            data_frames.append(df)
    return pd.concat(data_frames, ignore_index=True)

# Preprocess data
def preprocess_data(df):
    df['timestamp'] = pd.to_datetime(df['timestamp']).astype(np.int64) // 10**9
    df['arbitration_id'] = df['arbitration_id'].apply(lambda x: int(x, 16) if isinstance(x, str) else x)
    df['data_field'] = df['data_field'].apply(lambda x: int(x, 16) if isinstance(x, str) else x)
    df = optimize_dataframe(df)
    return df

# Extract features and labels
def extract_features_labels(df, label_col='attack'):
    X = df.drop(columns=label_col)
    y = df[label_col]
    return X, y

# Train Decision Tree Model
def train_decision_tree(X_train, y_train):
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    clf = DecisionTreeClassifier()
    start_time = time.time()
    clf.fit(X_train_scaled, y_train)
    training_time = time.time() - start_time
    return clf, scaler, training_time

# Function to Test Decision Tree Model and Calculate ROC
def test_decision_tree(clf, scaler, X_test, y_test):
    X_test_scaled = scaler.transform(X_test)
    start_time = time.time()
    y_pred = clf.predict(X_test_scaled)
    y_pred_prob = clf.predict_proba(X_test_scaled)
    testing_time = time.time() - start_time

    # ROC and AUC Curve Calculation
    n_classes = len(np.unique(y_test))
    if n_classes == 2:
        fpr, tpr, _ = roc_curve(y_test, y_pred_prob[:, 1])
        roc_auc = auc(fpr, tpr)
        plt.figure()
        plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = {0:0.4f})'.format(roc_auc))
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc="lower right")
        plt.show()
        print(f"AUC: {roc_auc:.4f}")
    else:
        y_test_bin = label_binarize(y_test, classes=np.arange(n_classes))
        fpr, tpr, roc_auc = {}, {}, {}
        
        for i in range(n_classes):
            fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_pred_prob[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])
        
        plt.figure(figsize=(10, 7))
        colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'red', 'green'])
        for i, color in zip(range(n_classes), colors):
            plt.plot(fpr[i], tpr[i], color=color, lw=2, label=f'ROC curve of class {i} (area = {roc_auc[i]:0.2f})')
        
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc="lower right")
        plt.show()
        
        for i in range(n_classes):
            print(f"AUC of class {i}: {roc_auc[i]:.4f}")

    # Evaluate model
    conf_matrix = confusion_matrix(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)
    f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)

    return {
        'conf_matrix': conf_matrix,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'roc_auc': roc_auc,
        'testing_time': testing_time
    }

# Load, preprocess, and encode data
train_directories = [
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_01\train_01",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_02\train_01",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_03\train_01",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_04\train_01"
]

df_train = pd.concat([load_data_from_directory(dir) for dir in train_directories], ignore_index=True)

# Check and remove NaN values in the training dataset
if df_train.isna().any().any():
    print("NaN values detected in the training dataset.")
    print(f"Initial training data shape: {df_train.shape}")
    df_train = df_train.dropna()  # Removes rows with any NaN values
    print(f"Training data shape after removing NaN values: {df_train.shape}")
else:
    print("No NaN values detected in the training dataset.")

df_train = preprocess_data(df_train)
X_train, y_train = extract_features_labels(df_train)

# Encoder to handle label encoding
encoder = LabelEncoder()
y_train_encoded = encoder.fit_transform(y_train)

# Train the Decision Tree model
decision_tree_model, scaler, train_time = train_decision_tree(X_train, y_train_encoded)

# Test data
test_files = [
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_01\test_01_known_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_01\test_01_known_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_01\test_02_unknown_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_01\test_02_unknown_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_02\test_03_known_vehicle_unknown_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_02\test_03_known_vehicle_unknown_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_02\test_04_unknown_vehicle_unknown_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_02\test_04_unknown_vehicle_unknown_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_03\test_01_known_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_03\test_01_known_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_03\test_02_unknown_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_03\test_02_unknown_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_04\test_01_known_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_04\test_01_known_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_04\test_02_unknown_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_04\test_02_unknown_vehicle_known_attack\DoS-4.csv"
]

# Load and preprocess test data
df_test = pd.concat([pd.read_csv(file) for file in test_files], ignore_index=True)

# Check and remove NaN values in the test dataset
if df_test.isna().any().any():
    print("NaN values detected in the test dataset.")
    print(f"Initial test data shape: {df_test.shape}")
    df_test = df_test.dropna()  # Removes rows with any NaN values
    print(f"Test data shape after removing NaN values: {df_test.shape}")
else:
    print("No NaN values detected in the test dataset.")

df_test = preprocess_data(df_test)
X_test, y_test = extract_features_labels(df_test)

# Encode test labels
y_test_encoded = safe_label_encode(encoder, y_test, default_value=-1)

# Test the Decision Tree model
results = test_decision_tree(decision_tree_model, scaler, X_test, y_test_encoded)

# Print results
print(f"Confusion Matrix:\n{results['conf_matrix']}")
print(f"Accuracy: {results['accuracy']:.4f}")
print(f"Precision: {results['precision']:.4f}")
print(f"Recall: {results['recall']:.4f}")
print(f"F1 Score: {results['f1_score']:.4f}")
print(f"Testing Time: {results['testing_time']:.2f} seconds")


In [1]:
import pandas as pd
import numpy as np
import os
import time
from itertools import cycle
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt

In [2]:
# Function to optimize DataFrame memory usage
def optimize_dataframe(df):
    for col in df.select_dtypes(include=['int']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    for col in df.select_dtypes(include=['float']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    return df

# Safe label encoding to handle unseen labels
def safe_label_encode(encoder, values, default_value=-1):
    unique_classes = set(encoder.classes_)
    return [encoder.transform([v])[0] if v in unique_classes else default_value for v in values]

# Load data from a directory
def load_data_from_directory(directory_path):
    data_frames = []
    for file_name in os.listdir(directory_path):
        if file_name.endswith('.csv'):
            file_path = os.path.join(directory_path, file_name)
            df = pd.read_csv(file_path)
            data_frames.append(df)
    return pd.concat(data_frames, ignore_index=True)

# Preprocess data
def preprocess_data(df):
    df['timestamp'] = pd.to_datetime(df['timestamp']).astype(np.int64) // 10**9
    df['arbitration_id'] = df['arbitration_id'].apply(lambda x: int(x, 16) if isinstance(x, str) else x)
    df['data_field'] = df['data_field'].apply(lambda x: int(x, 16) if isinstance(x, str) else x)
    df = optimize_dataframe(df)
    return df

# Extract features and labels
def extract_features_labels(df, label_col='attack'):
    X = df.drop(columns=label_col)
    y = df[label_col]
    return X, y

In [3]:
# Train Decision Tree Model
def train_decision_tree(X_train, y_train):
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    clf = DecisionTreeClassifier()
    start_time = time.time()
    clf.fit(X_train_scaled, y_train)
    training_time = time.time() - start_time
    return clf, scaler, training_time

# Function to Test Decision Tree Model and Calculate ROC
def test_decision_tree(clf, scaler, X_test, y_test):
    X_test_scaled = scaler.transform(X_test)
    start_time = time.time()
    y_pred = clf.predict(X_test_scaled)
    y_pred_prob = clf.predict_proba(X_test_scaled)
    testing_time = time.time() - start_time

    # ROC and AUC Curve Calculation
    n_classes = len(np.unique(y_test))
    if n_classes == 2:
        fpr, tpr, _ = roc_curve(y_test, y_pred_prob[:, 1])
        roc_auc = auc(fpr, tpr)
        plt.figure()
        plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = {0:0.4f})'.format(roc_auc))
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc="lower right")
        plt.show()
        print(f"AUC: {roc_auc:.4f}")
    else:
        y_test_bin = label_binarize(y_test, classes=np.arange(n_classes))
        fpr, tpr, roc_auc = {}, {}, {}
        
        for i in range(n_classes):
            fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_pred_prob[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])
        
        plt.figure(figsize=(10, 7))
        colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'red', 'green'])
        for i, color in zip(range(n_classes), colors):
            plt.plot(fpr[i], tpr[i], color=color, lw=2, label=f'ROC curve of class {i} (area = {roc_auc[i]:0.2f})')
        
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) Curve')
        plt.legend(loc="lower right")
        plt.show()
        
        for i in range(n_classes):
            print(f"AUC of class {i}: {roc_auc[i]:.4f}")

    # Evaluate model
    conf_matrix = confusion_matrix(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)
    f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)

    return {
        'conf_matrix': conf_matrix,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'roc_auc': roc_auc,
        'testing_time': testing_time
    }

In [5]:
# Load, preprocess, and encode data
train_directories = [
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_01\train_01",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_02\train_01",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_03\train_01",
    r"C:\Users\Lerin\Project Master's\New\can-train-and-test\set_04\train_01"
]

df_train = pd.concat([load_data_from_directory(dir) for dir in train_directories], ignore_index=True)
df_train = preprocess_data(df_train)
df_train = preprocess_data(df_train)
X_train, y_train = extract_features_labels(df_train)

# Encoder to handle label encoding
encoder = LabelEncoder()
y_train_encoded = encoder.fit_transform(y_train)

In [6]:
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
import numpy as np
import time

# Assuming X_train and y_train_encoded are already defined

# Step 1: Handle missing values with SimpleImputer
imputer = SimpleImputer(strategy='mean')  # You can use 'median' or 'most_frequent' based on your data
X_train_imputed = imputer.fit_transform(X_train)

# Step 2: Standardize the data (Optional but recommended)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_imputed)

# Step 3: Train the Decision Tree model
start_time = time.time()
decision_tree_model = DecisionTreeClassifier()
decision_tree_model.fit(X_train_scaled, y_train_encoded)
train_time = time.time() - start_time

print(f"Model training completed in {train_time} seconds.")


Model training completed in 199.18320417404175 seconds.


In [13]:
test_files = [
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_01\test_01_known_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_01\test_01_known_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_01\test_02_unknown_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_01\test_02_unknown_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_02\test_03_known_vehicle_unknown_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_02\test_03_known_vehicle_unknown_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_02\test_04_unknown_vehicle_unknown_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_02\test_04_unknown_vehicle_unknown_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_03\test_01_known_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_03\test_01_known_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_03\test_02_unknown_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_03\test_02_unknown_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_04\test_01_known_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_04\test_01_known_vehicle_known_attack\DoS-4.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_04\test_02_unknown_vehicle_known_attack\DoS-3.csv",
    r"C:\Users\Lerin\Project Masters\New\can-train-and-test\set_04\test_02_unknown_vehicle_known_attack\DoS-4.csv"
]


df_test = pd.concat([pd.read_csv(file) for file in test_files], ignore_index=True)
df_test = preprocess_data(df_test)
X_test, y_test = extract_features_labels(df_test)
y_test_encoded = encoder.transform(y_test)

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Lerin\\Project Masters\\New\\can-train-and-test\\set_03\\test_01_known_vehicle_known_attack\\DoS-3.csv'