In [57]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import Can_Algorithms as alg
import logging as log
import datetime
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import precision_recall_fscore_support
from sklearn.impute import SimpleImputer
from scipy.interpolate import interp1d

class ScanResult:
    """Plain record bundling a model with its evaluation scores.

    Attributes:
        model: Whatever identifies the evaluated model (stored as given).
        precision: Precision score (stored as given).
        recall: Recall score (stored as given).
        f1_score: F1 score (stored as given).
        support: Support value (stored as given).
    """

    def __init__(self, model, precision, recall, f1_score, support):
        # Each argument is stored unchanged under the attribute of the same name.
        self.model, self.precision = model, precision
        self.recall, self.f1_score = recall, f1_score
        self.support = support

# --- Dataset configuration -------------------------------------------------
dsType = "dos"
dos_dataPath = "DoS_dataset.csv"
# dos_dataPath="Dataset/1000doS_dataset.csv"
fuzzy_dataPath = "Dataset/1000fuzzy_dataset.csv"
# Names of the datasets the evaluation loop iterates over.
datasets = ['dos']

# --- Logging configuration -------------------------------------------------
# Log records go to a file inside a dedicated folder, created on demand.
log_folder = 'logs'
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, 'log_file.txt')

log.basicConfig(
    filename=log_file,
    level=log.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

#Function to convert hex string to integer
def hex_to_int(x):
    """Convert a hexadecimal string to an integer.

    Args:
        x: Value to convert. Strings are parsed as base-16 numbers; any other
            value is passed through untouched.

    Returns:
        The parsed integer for a valid hex string, ``np.nan`` for a string
        that is not valid hexadecimal, or ``x`` itself when it is not a string.
    """
    # Non-string values are returned as-is. Previously the fallback `else`
    # was attached to the `try` statement, where it was unreachable, so every
    # non-string input silently became None.
    if not isinstance(x, str):
        return x
    try:
        return int(x, 16)
    except ValueError:
        return np.nan

def getDate():
    """Return the current time twice: as a datetime and as formatted text.

    Returns:
        A ``(timestamp, text)`` tuple where ``timestamp`` is the value of
        ``datetime.datetime.now()`` and ``text`` is that same instant rendered
        as ``YYYY-MM-DD HH:MM:SS``.
    """
    now = datetime.datetime.now()
    # format() on a datetime delegates to strftime with the given pattern.
    return now, format(now, "%Y-%m-%d %H:%M:%S")

def PreprocessData(data, testSize=0.2):
    """Turn a raw 12-column frame into imputed train/test feature matrices.

    The frame is modified in place: its columns are renamed and the
    ``Flag``, ``Timestamp``, ``CAN_ID``, ``DLC`` and ``DATA0``..``DATA7``
    columns are overwritten with their converted values.

    Args:
        data: DataFrame with exactly twelve columns, in the order
            Timestamp, CAN_ID, DLC, DATA0..DATA7, Flag.
        testSize: Fraction of rows passed to ``train_test_split`` as
            ``test_size``.

    Returns:
        ``(X_train_imputed, X_test_imputed, y_train, y_test)`` where the two
        feature matrices are the outputs of a mean ``SimpleImputer`` fitted
        on the training features only, and the targets are the label-encoded
        ``Flag`` values.
    """
    payload_cols = [f"DATA{i}" for i in range(8)]

    def parse_can_id(raw):
        # String ids are hexadecimal; anything else is kept unchanged.
        return int(raw, 16) if isinstance(raw, str) else raw

    # Name the columns positionally.
    data.columns = ['Timestamp', 'CAN_ID', 'DLC', *payload_cols, 'Flag']

    # Encode the target labels as integers.
    data['Flag'] = LabelEncoder().fit_transform(data['Flag'])

    data["Timestamp"] = pd.to_datetime(data["Timestamp"])

    # Numeric conversion of the id, length and payload columns.
    data["CAN_ID"] = data["CAN_ID"].apply(parse_can_id)
    data["DLC"] = data["DLC"].astype(int)
    for col in payload_cols:
        data[col] = data[col].apply(hex_to_int).astype(float)

    # Separate the target from the features, then split with a fixed seed.
    features = data.drop(columns='Flag')
    target = data['Flag']
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=testSize, random_state=42
    )

    # The timestamp is not used as a model input.
    X_train = X_train.drop(columns='Timestamp')
    X_test = X_test.drop(columns='Timestamp')

    # Fill missing values with the column means learned from the training set.
    imputer = SimpleImputer(strategy='mean')
    X_train_imputed = imputer.fit_transform(X_train)
    X_test_imputed = imputer.transform(X_test)
    return X_train_imputed, X_test_imputed, y_train, y_test

def PlotBarGraph(models,metrics,values,title,x_label,y_label):
    """Show a grouped bar chart: one group per model, one bar per metric.

    Args:
        models: Sequence of tick labels, one per bar group.
        metrics: Sequence of legend labels, one per bar inside a group.
        values: Sequence where ``values[i]`` holds the bar heights for
            ``metrics[i]``, one height per entry of ``models``.
        title: Figure title.
        x_label: Label of the x axis.
        y_label: Label of the y axis.
    """
    plt.figure(figsize=(5,3))

    bar_width = 0.2
    index = np.arange(len(models))

    # Bar i of every group is shifted right by i bar widths.
    for i, metric in enumerate(metrics):
        plt.bar(index + i * bar_width, values[i], bar_width, label=metric)

    # Bars are centred at offsets 0, w, ..., (n-1)*w, so the middle of a group
    # sits at (n-1)/2 bar widths. This equals the former fixed 1.5 for four
    # metrics and stays centred for any other metric count.
    group_center = bar_width * ((max(len(metrics), 1) - 1) / 2)

    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(title)
    plt.xticks(index + group_center, models)
    plt.legend()
    plt.grid(axis='y')
    plt.tight_layout()
    plt.show()

def PlotLineGraph(models,metrics,values,title,x_label,y_label):
    """Show one smoothed line per metric across the models.

    Args:
        models: Sequence of tick labels, one per x position.
        metrics: Sequence of legend labels, one per line.
        values: Sequence where ``values[i]`` holds the y values for
            ``metrics[i]``, one per entry of ``models``.
        title: Figure title.
        x_label: Label of the x axis.
        y_label: Label of the y axis.
    """
    n_models = len(models)
    positions = np.arange(n_models)

    # A spline of order k needs at least k + 1 points, so the cubic curve is
    # only usable from four models on; use a lower order for fewer models.
    kind = {2: 'linear', 3: 'quadratic'}.get(n_models, 'cubic')

    # The dense x grid is the same for every metric, so build it once.
    x_new = np.linspace(0, n_models - 1, 100) if n_models >= 2 else None

    for i, metric in enumerate(metrics):
        if x_new is None:
            # Nothing to interpolate between: draw the raw point(s) as markers.
            plt.plot(positions, values[i], marker='o', label=metric)
            continue
        f = interp1d(positions, values[i], kind=kind)
        plt.plot(x_new, f(x_new), label=metric)

    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(title)
    plt.xticks(positions, models)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [58]:
# Train and score every configured model on each dataset, then plot the scores.

# Maps a model key to (label used in the log messages, name of the
# Can_Algorithms function called as fn(X_train, y_train, X_test) to obtain the
# test predictions). The function is looked up by name only when the model is
# actually run, so keys absent from `models` never touch the module.
model_runners = {
    'SVC': ('SVC', 'SVC_Scan'),
    'MLP': ('MLP', 'MLP_Scan'),
    'SGD': ('SGD', 'SGD_Scan'),
    'LRG': ('Linear regression', 'Linear_regression_Scan'),
    'CNN': ('CNN', 'CNN_Scan'),
}

dataPath=""
for ds_name in datasets:
    log.info("#####################################")
    log.info("Running dataset: [%s]",ds_name)
    log.info("#####################################")

    # Every dataset name other than 'fuzzy' uses the DoS file.
    dataPath = fuzzy_dataPath if ds_name == 'fuzzy' else dos_dataPath

    #Load Data
    ds = pd.read_csv(dataPath, header=None)

    X_train_imputed,X_test_imputed,y_train,y_test=PreprocessData(ds,0.2)

    # Define the models and their corresponding metrics
    models = ['SVC', 'MLP', 'SGD', 'LRG']
    precision = []
    recall = []
    f1_score = []
    support = []
    results =[]

    metrics=[]
    values=[]
    # Calculate total number of samples in the test set
    total_samples = len(y_test)

    # Run and evaluate each model
    for model_name in models:
        # Fail loudly on an unknown key instead of scoring a stale y_pred
        # left over from the previous iteration.
        if model_name not in model_runners:
            raise ValueError("Unknown model name: %r" % (model_name,))
        label, scan_name = model_runners[model_name]
        scan = getattr(alg, scan_name)

        # The end timestamp is taken only after the model has produced its
        # predictions, so the logged elapsed time covers the actual run.
        start_ts, formatted_start_ts = getDate()
        log.info("Starting %s model...[%s]", label, formatted_start_ts)
        y_pred = scan(X_train_imputed, y_train, X_test_imputed)
        end_ts, formatted_end_ts = getDate()
        log.info('%s model finished. Elapsed time: %s', label, end_ts - start_ts)

        precision_score, recall_score, f1_score_val, _ = precision_recall_fscore_support(y_test, y_pred, average='weighted', zero_division=1)
        precision.append(precision_score * 100)
        recall.append(recall_score * 100)
        f1_score.append(f1_score_val * 100)

        # "Support" here is the percentage of predictions equal to encoded
        # label 0. Computed as a boolean mean so that a model which never
        # predicts 0 yields 0.0 rather than raising KeyError.
        support_score = (pd.Series(y_pred) == 0).mean() * 100
        support.append(support_score)

    # Define the metrics to plot
    metrics = ['Precision', 'Recall', 'F1 Score', 'Support']
    values = [precision, recall, f1_score, support]

    print(values)
    PlotBarGraph(models,metrics,values,'Metrics by Model - '+ds_name,'Models','Score')
    PlotLineGraph(models,metrics,values,'Metrics by Model - '+ds_name,'Models','Score')