In [1]:
import os

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.cluster import DBSCAN, KMeans
from sklearn.svm import OneClassSVM
from sklearn.metrics import accuracy_score, average_precision_score, f1_score, silhouette_score
# import xgboost as xgb

In [2]:
# Folder holding the car-hacking dataset files, given relative to this notebook.
data_dir = '../car_hacking_data/'
# Bare last expression: the cell displays the directory listing as its output.
os.listdir(data_dir)

['Fuzzy_dataset.csv',
 'normal_run_data.txt',
 'gear_dataset.csv',
 '.DS_Store',
 'RPM_dataset.csv',
 'DoS_dataset.csv']

In [3]:
# Full paths of the benign capture and the DoS attack capture inside data_dir.
benign_data_path, dos_data_path = (
    os.path.join(data_dir, file_name)
    for file_name in ("normal_run_data.txt", 'DoS_dataset.csv')
)

In [4]:
def hex_to_dec(x):
    """Return the integer value of the hexadecimal string ``x`` (e.g. 'ff' -> 255)."""
    return int(x, 16)

## The DLC (payload length) varies between frames (2, 5, 6 or 8 bytes in this dataset).
## To keep every row aligned, payloads with DLC < 8 are padded with '00' bytes.

def shift_columns(df):
    """Right-align the payload/flag columns of rows whose DLC is below 8.

    For every DLC value smaller than 8 that occurs in ``df['dlc']``, the
    columns from the fourth one onward (in ``read_attack_data`` these are
    ``data0``..``data7`` followed by ``flag``) are shifted right by
    ``8 - dlc`` positions for the matching rows, and the vacated cells on the
    left are filled with the string ``'00'``.

    Unlike a hard-coded DLC list, this handles any short DLC present in the
    frame, so no short row is left misaligned.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``dlc`` column; it is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, after the shift.
    """
    full_dlc = 8
    # Everything after the first three columns is treated as payload + flag.
    payload_cols = df.columns[3:]

    # NaN DLCs compare False against full_dlc and are therefore skipped.
    short_dlcs = [dlc for dlc in df['dlc'].unique() if dlc < full_dlc]

    for dlc in short_dlcs:
        rows = df['dlc'] == dlc
        df.loc[rows, payload_cols] = df.loc[rows, payload_cols].shift(
            periods=full_dlc - int(dlc), axis='columns', fill_value='00'
        )

    return df
    

In [5]:
def read_attack_data(data_path):
    """Load an attack CSV and turn it into a numeric feature frame.

    Steps performed:
      1. read the header-less CSV with the fixed column names below;
      2. realign rows with a short DLC via ``shift_columns``;
      3. replace every remaining NaN with the string ``'00'``;
      4. concatenate ``data0``..``data7`` into one hexadecimal string column
         ``data`` and drop the individual byte columns;
      5. convert ``can_id`` and ``data`` from hexadecimal strings to integers;
      6. add ``IAT``, the difference between consecutive ``timestamp`` values
         (0 for the first row).

    Parameters
    ----------
    data_path : str
        Path of the CSV file to read.

    Returns
    -------
    pandas.DataFrame
        Columns ``timestamp``, ``can_id``, ``dlc``, ``flag``, ``data``, ``IAT``.
    """
    columns = ['timestamp', 'can_id', 'dlc', 'data0', 'data1', 'data2', 'data3', 'data4',
               'data5', 'data6', 'data7', 'flag']

    data = pd.read_csv(data_path, names=columns)

    data = shift_columns(data)

    # Replace all NaNs with '00'. Use np.nan: the np.NaN alias was removed in
    # NumPy 2.0 and raises AttributeError there.
    data = data.replace(np.nan, '00')

    # Join all byte columns so the whole payload lives in a single column.
    data_cols = ['data0', 'data1', 'data2', 'data3', 'data4', 'data5', 'data6', 'data7']

    # The joined payload is still a hexadecimal string at this point.
    data['data'] = data[data_cols].apply(''.join, axis=1)
    data = data.drop(columns=data_cols)

    # Convert the hexadecimal columns to decimal integers.
    data['can_id'] = data['can_id'].apply(hex_to_dec)
    data['data'] = data['data'].apply(hex_to_dec)

    # Inter-arrival time between consecutive rows, in file order.
    data = data.assign(IAT=data['timestamp'].diff().fillna(0))

    return data

    

In [6]:
timestamps = []
ids = []
dlcs = []
data = []

# Read the data from the file
with open(benign_data_path, 'r') as file:
    for line in file:
        line = line.strip()

        # Skip blank lines (e.g. a trailing newline at the end of the file);
        # they carry no fields and would otherwise raise IndexError below.
        if not line:
            continue

        # Extract information from each line
        ts = line.split('Timestamp: ')[1].split(' ')[0]
        can_id = line.split('ID: ')[1].split(' ')[0]
        # Everything after 'DLC: ': first token is the DLC, the rest are payload bytes.
        dlc_fields = line.split('DLC: ')[1].split(' ')
        dlc = dlc_fields[0]
        can_data = ''.join(dlc_fields[1:])

        # Converting hexadecimal entries to decimal format
        timestamps.append(float(ts))
        ids.append(hex_to_dec(can_id))
        dlcs.append(int(dlc))
        # An empty payload is stored as 0, matching the all-'00' padding used
        # for the attack data; int('', 16) would raise ValueError.
        data.append(hex_to_dec(can_data) if can_data else 0)

benign_data = pd.DataFrame({
    'timestamp': timestamps,
    'can_id': ids,
    'dlc': dlcs,
    'data': data
})

# Order frames chronologically so the differences below are real inter-arrival times.
benign_data = benign_data.sort_values(by=['timestamp'])

# Creating IAT column (0 for the first frame), then dropping the raw timestamp.
benign_data = benign_data.assign(IAT=benign_data['timestamp'].diff().fillna(0))
benign_data = benign_data.drop(columns=['timestamp'])

In [7]:
# Feature matrix built from the first 30,000 benign frames.
X = benign_data[['can_id', 'dlc', 'data', 'IAT']].values[:30_000]

# One label per row of X; every benign frame is labelled 0.
y = np.zeros(len(X))

In [8]:
# Hold out 20% of the benign frames for validation. The fixed random_state
# makes the shuffle reproducible across Restart & Run All.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

In [9]:
# Load and preprocess the DoS attack capture with read_attack_data.
dos_data = read_attack_data(dos_data_path)

In [10]:
# Keep only the rows flagged 'T', capped at the first 20,000 of them.
dos_data = dos_data[dos_data['flag'] == 'T'][:20_000]

X_test = dos_data[['can_id', 'dlc', 'data', 'IAT']].values
# Build integer labels directly (1 where flag == 'T'). Series.replace on an
# object column relies on deprecated silent downcasting and can leave an
# object-dtype array that sklearn metrics reject.
y_test = (dos_data['flag'] == 'T').astype(int).values

In [11]:
# Standardise the features: the scaler is fitted on the training split only and
# the same fitted statistics are then applied to the validation and test sets.
scaler = StandardScaler()

# NOTE: these assignments overwrite the unscaled arrays, so re-running this cell
# without re-running the cells above would scale already-scaled data.
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

In [12]:
def mse(inp, reconstructions):
    """Return the per-row mean squared error between two 2-D arrays.

    Parameters
    ----------
    inp : array-like
        Original samples, one per row.
    reconstructions : array-like
        Reconstructed samples, same shape as ``inp`` (or broadcastable to it).

    Returns
    -------
    numpy.ndarray
        1-D array with the mean of the squared differences along axis 1.
    """
    # The difference must be taken before squaring: np.square(a, b) treats
    # `b` as the `out` buffer, i.e. it squares `a` and overwrites `b`.
    return np.mean(np.square(np.asarray(inp) - np.asarray(reconstructions)), axis=1)

def generate_predictions(inp, reconstruction, threshold):
    """Flag the rows whose reconstruction error is strictly above ``threshold``.

    The error of each row is whatever ``mse(inp, reconstruction)`` returns; the
    result is the element-wise comparison of that error against ``threshold``.
    """
    return mse(inp, reconstruction) > threshold

In [13]:
# print("XGBOOOST")

# xgb_param_grid = {
#         'max_depth' : [5, 7],
#         'booster' : ['gbtree', 'gblinear'],
#         'eta' : [0.3],
#         'eval_metric' : ['rmse'],
#         'n_estimators' : [100, 200],
#         'subsample' : [0.8]}


# xgb_model = xgb.XGBRegressor()


# print("Grid Searching for best model!")
# xgb_gs = GridSearchCV(xgb_model, param_grid=xgb_param_grid, cv = 10, scoring = 'neg_mean_squared_error', verbose = 2)
# xgb_gs.fit(X_train, X_train)

# print("Best Parameters:",xgb_gs.best_params_)

XGBOOOST


NameError: name 'xgb' is not defined

In [None]:
# xgb_best = xgb_gs.best_estimator_

# train_reconstructions = xgb_best.predict(X_train)
# train_reconstruction_error = mse(X_train, train_reconstructions)

# val_reconstructions = xgb_best.predict(X_val)

# test_reconstructions = xgb_best.predict(X_test)

# thresholds_to_test = np.arange(0.1, 1.0, 0.05)

# for thresholds in thresholds_to_test:
    
#     threshold = np.quantile(train_reconstruction_error, thresholds)
    
#     print("Currently testing threshold:", thresholds)

#     val_preds = generate_predictions(X_val, val_reconstructions, threshold)
#     print("Validation Accuracy:", accuracy_score(y_val, val_preds))

#     test_preds = generate_predictions(X_test, test_reconstructions, threshold)
#     print("Testing Accuracy Score:", accuracy_score(y_test, test_preds))

#     print()

In [11]:
# Separate scaler for this experiment, fitted on the full benign matrix X.
clust_scaler = StandardScaler()
X_stan = clust_scaler.fit_transform(X)

# Scale the DoS frames with the benign statistics, then split them:
# the first 250 rows for validation, the remainder for testing.
anomaly_data = clust_scaler.transform(
    dos_data[['can_id', 'dlc', 'data', 'IAT']].values
)
validation_data, test_data = anomaly_data[:250], anomaly_data[250:]

In [25]:
# Combine X_stan and validation_data for training
# Stack the scaled benign rows on top of the attack validation rows.
combined_data = np.concatenate([X_stan, validation_data], axis=0)

In [27]:
# Row counts of the benign matrix and of the attack validation slice, one per line.
print(len(X_stan), len(validation_data), sep='\n')

30000
250


In [None]:
print("-----SVM------")

# Manual grid search over One-Class SVM hyper-parameters. Each candidate is
# fitted on the benign training split and scored on the benign validation split.
svm_param_grid = {
    'kernel': ['linear', 'rbf'],
    'nu': [0.01, 0.05, 0.1, 0.2, 0.3],
    'gamma': ['scale', 'auto']
}

best_score = -1
best_params = {}
best_svm = None

total_combinations = len(svm_param_grid['kernel']) * len(svm_param_grid['nu']) * len(svm_param_grid['gamma'])
count = 0

# Iterate over the parameter grid
for kernel in svm_param_grid['kernel']:
    for nu in svm_param_grid['nu']:
        for gamma in svm_param_grid['gamma']:

            count += 1

            print(f"Testing combination {count} of {total_combinations}")
            # Create an instance of OneClassSVM with current parameter combination
            svm = OneClassSVM(kernel=kernel, nu=nu, gamma=gamma)

            # Fit OneClassSVM on training data
            svm.fit(X_train)

            # OneClassSVM.predict returns +1 for inliers and -1 for outliers,
            # while the notebook's labels are 0 = benign and 1 = attack.
            # Map outliers to 1 and inliers to 0 so predictions and labels
            # share one convention; mapping only -1 -> 0 scores them inverted.
            validation_preds = (svm.predict(X_val) == -1).astype(int)
            score = accuracy_score(y_val, validation_preds)

            # Update best parameters if current score is higher
            if score > best_score:
                best_score = score
                best_params = {'kernel': kernel, 'nu': nu, 'gamma': gamma}
                # Keep the already-fitted winner instead of refitting later.
                best_svm = svm

print("Best params:", best_params)

-----SVM------
Testing combination 1 of 20


In [28]:



# # Use the best OneClassSVM model for anomaly detection on validation data
# val_preds = best_svm.fit_predict(X_val)
# val_preds[val_preds == -1] = 1


# # Use the best OneClassSVM model for anomaly detection on test data
# test_preds = best_svm.predict(X_test)
# test_preds[test_preds == -1] = 1

# print("Validation Accuracy:",accuracy_score(y_val,  val_preds))
# print("Testing Accuracy:",accuracy_score(y_test, test_preds))

NameError: name 'best_svm' is not defined

In [None]:
svm = OneClassSVM()

# NOTE(review): the original cell referenced undefined names (`param_grid`,
# `X_scaled`, `y_true`, `f1_score`). They are rebuilt here from objects defined
# earlier in the notebook; `combined_data` (benign rows followed by the attack
# validation rows) is assumed to be the intended input - confirm.
X_scaled = combined_data
# Labels follow OneClassSVM's own output convention: +1 = inlier, -1 = outlier.
y_true = np.concatenate([
    np.ones(len(X_stan), dtype=int),
    -np.ones(len(validation_data), dtype=int),
])

# Perform hyperparameter search using GridSearchCV.
# scoring='f1' needs labels, so y_true is passed to fit (OneClassSVM ignores it
# while training). With the default pos_label this is the F1 of the +1 class.
# Stratified, shuffled folds keep a share of the outlier rows in every fold.
grid_search = GridSearchCV(
    estimator=svm,
    param_grid=svm_param_grid,
    scoring='f1',
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
)

# Fit the model to the data
grid_search.fit(X_scaled, y_true)

# Get the best hyperparameters and best model
# (best_estimator_ is already refitted on the full data because refit=True by default).
best_params = grid_search.best_params_
best_model = grid_search.best_estimator_

# Predict on the data
predictions = best_model.predict(X_scaled)

# Calculate F1 score
f1 = f1_score(y_true, predictions)

print("Best Hyperparameters:", best_params)
print("F1 Score:", f1)