#### Intrusion detection system using LSTM

In [274]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('default')
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

In [275]:
# Load the three datasets. The paths are relative, so the CSV files must be in
# the notebook's working directory.
sonata_data = pd.read_csv('sonata_data.csv')
soul_data = pd.read_csv('soul_data.csv')
spark_data = pd.read_csv('spark_data.csv')

In [276]:
# Report the (rows, columns) shape of each loaded frame.
print("Sonata shape: ", sonata_data.shape)
print("Soul shape: ", soul_data.shape)
print("Spark shape: ", spark_data.shape)

Sonata shape:  (535041, 12)
Soul shape:  (797843, 12)
Spark shape:  (402956, 12)


In [277]:
# Preview the first rows of the Spark frame (raw columns, before any cleaning).
spark_data.head()

Unnamed: 0,timestamp,canId,dlc,data0,data1,data2,data3,data4,data5,data6,data7,flag
0,1513920000.0,04C1,8,0,CC,80,5E,52,08,00,00,R
1,1513920000.0,04C7,3,10,00,00,-1,-1,-1,-1,-1,R
2,1513920000.0,01E1,7,0,00,00,00,00,00,00,-1,R
3,1513920000.0,00C1,8,0,F9,05,41,02,85,8B,91,R
4,1513920000.0,00C5,8,3,52,0F,1D,C3,F4,03,D4,R


In [278]:
# Count how often each 'flag' label occurs in every dataset and show the counts
# side by side. Building the frame from a dict of Series aligns the rows on the
# UNION of labels, so a label that is missing from one dataset is reported as 0
# instead of being dropped or turned into NaN.
flag_values = (
    pd.DataFrame({
        'Sonata': sonata_data['flag'].value_counts(),
        'Soul': soul_data['flag'].value_counts(),
        'Spark': spark_data['flag'].value_counts(),
    })
    .fillna(0)
    .astype(int)
)
flag_values

Unnamed: 0_level_0,Sonata,Soul,Spark
flag,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
R,468527,717489,366510
T,66514,80354,36446


In [279]:
# Remove the 'timestamp' column from every dataset.
# errors='ignore' keeps this cell re-runnable: once the column is gone, a second
# execution is a no-op instead of raising KeyError.
sonata_data = sonata_data.drop(columns=['timestamp'], errors='ignore')
soul_data = soul_data.drop(columns=['timestamp'], errors='ignore')
spark_data = spark_data.drop(columns=['timestamp'], errors='ignore')
spark_data.head()

Unnamed: 0,canId,dlc,data0,data1,data2,data3,data4,data5,data6,data7,flag
0,04C1,8,0,CC,80,5E,52,08,00,00,R
1,04C7,3,10,00,00,-1,-1,-1,-1,-1,R
2,01E1,7,0,00,00,00,00,00,00,-1,R
3,00C1,8,0,F9,05,41,02,85,8B,91,R
4,00C5,8,3,52,0F,1D,C3,F4,03,D4,R


In [280]:
# Split every dataset into its feature columns (X_*) and its 'flag' target (y_*).
X_sonata, y_sonata = sonata_data.drop(columns=['flag']), sonata_data['flag']
X_soul, y_soul = soul_data.drop(columns=['flag']), soul_data['flag']
X_spark, y_spark = spark_data.drop(columns=['flag']), spark_data['flag']

In [281]:
# Preview the Sonata feature columns (the 'flag' target has been removed).
X_sonata.head()

Unnamed: 0,canId,dlc,data0,data1,data2,data3,data4,data5,data6,data7
0,02C0,8,14,0,0,00,00,00,00,0
1,0002,8,0,0,0,00,00,03,0B,11
2,0153,8,0,21,10,FF,00,FF,00,0
3,043F,8,10,40,60,FF,5A,87,08,0
4,0370,8,0,20,0,00,00,00,00,0


In [282]:
# Preview the Sonata target values (still the raw 'flag' labels at this point).
y_sonata.head()

0    R
1    R
2    R
3    R
4    R
Name: flag, dtype: object

In [283]:
# Inspect the column dtypes; object-typed columns still need converting to numbers.
print(X_sonata.dtypes)

canId    object
dlc       int64
data0    object
data1    object
data2    object
data3    object
data4    object
data5    object
data6    object
data7    object
dtype: object


In [284]:
def preprocess_hex_values(data):
    """Convert the hexadecimal text columns of ``data`` to integers, in place.

    The column 'canId' and every column whose name starts with 'data' are
    parsed as base-16 numbers (e.g. '04C1' -> 1217, '-1' -> -1). All other
    columns are left untouched.

    The function is safe to call more than once on the same frame: a column
    that already has a numeric dtype is treated as converted and skipped, so
    re-running the notebook cell does not raise. Values are passed through
    ``str()`` before parsing, so an object column that mixes strings and
    integers (which ``read_csv`` can produce) is still read as hex digits.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame whose column names are strings. It is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``data`` object, for convenient reassignment.

    Raises
    ------
    ValueError
        If a value in a hex column is not a valid base-16 number
        (this includes missing values).
    """
    for col in data.columns:
        if not (col.startswith('data') or col == 'canId'):
            continue
        # Already numeric -> converted by an earlier call; parsing it again
        # would either fail or reinterpret decimal digits as hex.
        if pd.api.types.is_numeric_dtype(data[col]):
            continue
        data[col] = [int(str(value).strip(), 16) for value in data[col]]
    return data

# Convert the Sonata hex columns to integers. Only X_sonata is converted in this
# cell; X_soul and X_spark are not passed through preprocess_hex_values here.
X_sonata = preprocess_hex_values(X_sonata)

In [285]:
# Encode the 'flag' labels as integers.
# The encoder is fitted ONCE on the labels of all three datasets and then only
# applied, so the same label always maps to the same integer in every dataset.
# Refitting per dataset would change the mapping whenever one dataset lacks a
# label that another one has. Afterwards label_encoder.classes_ holds the full
# label set and can be used with inverse_transform for any of the datasets.
label_encoder = LabelEncoder()
label_encoder.fit(np.concatenate([np.asarray(y_sonata), np.asarray(y_soul), np.asarray(y_spark)]))
y_sonata = label_encoder.transform(y_sonata)
y_soul = label_encoder.transform(y_soul)
y_spark = label_encoder.transform(y_spark)

In [286]:
# Hold out 20% of the Sonata rows for testing. random_state fixes the shuffle so
# the split is reproducible. NOTE(review): the split is not stratified although
# the flag counts are imbalanced; consider stratify=y_sonata.
X_train_sonata, X_test_sonata, y_train_sonata, y_test_sonata = train_test_split(X_sonata, y_sonata, test_size=0.2, random_state=42)


class CanBusDataset(Dataset):
    """Map-style dataset that pairs one feature row with one label.

    Parameters
    ----------
    X : pandas.DataFrame or array-like
        Feature rows; must be convertible to a float32 array.
    y : pandas.Series or array-like
        One label per feature row; must be convertible to float32.

    Raises
    ------
    ValueError
        If ``X`` and ``y`` do not contain the same number of samples.

    Notes
    -----
    Both arrays are cast to float32 once, at construction time, rather than
    on every ``__getitem__`` call. Non-numeric data therefore fails here,
    early, instead of in the middle of a training loop.
    """

    def __init__(self, X, y):
        features = X.values if isinstance(X, pd.DataFrame) else np.array(X)
        targets = y.values if isinstance(y, pd.Series) else np.array(y)
        if len(features) != len(targets):
            raise ValueError(
                f"X and y must have the same number of samples, got {len(features)} and {len(targets)}"
            )
        self.X = features.astype(np.float32)
        self.y = targets.astype(np.float32)

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for ``idx`` as float32 tensors."""
        # torch.tensor copies, so the tensors handed out never alias self.X / self.y.
        return torch.tensor(self.X[idx]), torch.tensor(self.y[idx])
    

# Wrap the Sonata train/test splits as datasets ...
train_dataset_sonata = CanBusDataset(X_train_sonata, y_train_sonata)
test_dataset_sonata = CanBusDataset(X_test_sonata, y_test_sonata)

# ... and serve them in batches of 512. Only the training loader shuffles; the
# test loader keeps the row order so predictions line up with y_test_sonata.
train_loader_sonata = DataLoader(train_dataset_sonata, batch_size=512, shuffle=True)
test_loader_sonata = DataLoader(test_dataset_sonata, batch_size=512, shuffle=False)


In [307]:
# Sanity check: fetch only the first training batch and print the shapes of its
# features and labels.
for idx, data in enumerate(train_loader_sonata):
    datas = data[0]
    labels = data[1]
    print("Datas shape:", datas.shape)
    print("Labels shape:", labels.shape)
    break  # one batch is enough for the shape check

Datas shape: torch.Size([512, 10])
Labels shape: torch.Size([512])


In [309]:
# Number of samples in the train and test datasets.
len(train_dataset_sonata), len(test_dataset_sonata)

(428032, 107009)

In [335]:
class MyNeuralNetwork(nn.Module):
    """LSTM followed by a linear layer and a sigmoid.

    The output of the last time step of the LSTM is fed through ``fc`` and
    squashed to (0, 1), so with ``output_size=1`` the model yields one
    probability per sample and can be trained with ``nn.BCELoss``.

    Parameters
    ----------
    input_size : int
        Number of features per time step.
    hidden_size : int
        Size of the LSTM hidden state.
    output_size : int
        Number of outputs of the final linear layer.
    num_layers : int
        Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.activation = nn.Sigmoid()

    def forward(self, X):
        """Return sigmoid outputs of shape ``[batch_size, output_size]``.

        ``X`` is either ``[batch_size, input_size]`` (treated as a sequence of
        length 1) or ``[batch_size, seq_len, input_size]``.
        """
        if X.dim() == 2:
            X = X.unsqueeze(1)  # add a sequence dimension -> [batch, 1, input_size]
        # No explicit (h_0, c_0): nn.LSTM then starts from zeros created on the
        # same device/dtype as the input, which keeps the model usable on GPU.
        output, _ = self.lstm(X)
        return self.activation(self.fc(output[:, -1]))

    def _device(self):
        """Return the device the model parameters live on."""
        return next(self.parameters()).device

    def _run_epoch(self, dataloader, loss_function, optimizer=None):
        """Run one pass over ``dataloader`` and return the mean batch loss.

        Weights are updated only when ``optimizer`` is given. Returns NaN if
        the dataloader yields no batches.
        """
        device = self._device()
        total_loss = 0.0
        n_batches = 0
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device).float()
            if optimizer is not None:
                optimizer.zero_grad()
            # Drop only the trailing output dimension; a bare squeeze() would
            # also remove the batch dimension of a size-1 batch.
            y_pred = self(X_batch).squeeze(-1)
            if y_batch.numel() == y_pred.numel():
                y_batch = y_batch.reshape(y_pred.shape)
            loss = loss_function(y_pred, y_batch)
            if optimizer is not None:
                loss.backward()
                optimizer.step()
            total_loss += loss.item()
            n_batches += 1
        return total_loss / n_batches if n_batches else float('nan')

    def fit(self, train_dataloader, test_dataloader, epochs, optimizer, loss_function):
        """Train for ``epochs`` epochs, evaluating on the test data after each.

        Returns
        -------
        (list of float, list of float)
            Mean train loss and mean test loss per epoch.
        """
        train_losses = []
        test_losses = []

        for epoch in range(epochs):
            self.train()
            train_losses.append(self._run_epoch(train_dataloader, loss_function, optimizer))

            self.eval()
            with torch.no_grad():
                test_losses.append(self._run_epoch(test_dataloader, loss_function))

            print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_losses[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}")

        return train_losses, test_losses

    def predict(self, dataloader):
        """Return hard 0/1 predictions (threshold 0.5) for every sample.

        The result is a float32 array of shape ``[n_samples, output_size]``,
        in dataloader order; it is an empty array if there are no batches.
        """
        device = self._device()
        chunks = []
        self.eval()
        with torch.no_grad():
            for X_batch, _ in dataloader:
                outputs = self(X_batch.to(device))
                predicted = (outputs > 0.5).float()
                chunks.append(predicted.cpu().numpy())
        return np.concatenate(chunks, axis=0) if chunks else np.array([])


In [288]:
def verify_parameters(y_test, y_pred_tab, parameters):
    """Score several prediction sets against the same ground truth.

    Parameters
    ----------
    y_test : array-like
        True labels.
    y_pred_tab : sequence of array-like
        One set of predictions per entry of ``parameters``;
        ``y_pred_tab[i]`` belongs to ``parameters[i]``.
    parameters : iterable
        Labels for the prediction sets (e.g. hyper-parameter settings).
        ``str(parameter)`` becomes the row label; if two entries share the
        same string, the later one replaces the earlier one.

    Returns
    -------
    pandas.DataFrame
        One row per parameter with the columns 'Accuracy', 'Precision',
        'Recall' and 'F1 Score'. Precision, recall and F1 are macro-averaged.
    """
    columns = ['Accuracy', 'Precision', 'Recall', 'F1 Score']
    # Collect the rows first and build the frame once, instead of growing an
    # initially empty DataFrame row by row through .loc.
    rows = {}
    for i, parameter in enumerate(parameters):
        y_pred = y_pred_tab[i]
        rows[str(parameter)] = [
            accuracy_score(y_test, y_pred),
            precision_score(y_test, y_pred, average='macro'),
            recall_score(y_test, y_pred, average='macro'),
            f1_score(y_test, y_pred, average='macro'),
        ]

    return pd.DataFrame.from_dict(rows, orient='index', columns=columns)

In [289]:
def losses_chart(train_losses, test_losses, title):
    """Draw the per-epoch train and test losses on one 6x4 inch chart.

    The y-axis is fixed to the range [0, 2]. ``title`` is used as the chart
    title. The figure is shown immediately and nothing is returned.
    """
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.set_ylim(0, 2)
    for curve, curve_label in ((train_losses, 'Train loss'), (test_losses, 'Test loss')):
        ax.plot(curve, label=curve_label)
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.legend()
    plt.show()

In [290]:
# Number of feature columns in the training data; used as the model's input size.
input_size = X_train_sonata.shape[1]
input_size


10

In [None]:
# Train the LSTM on the Sonata split and evaluate it on the held-out rows.
# input_size comes from the data (computed above) instead of a hard-coded 10, so
# the cell keeps working if the feature columns change.
lstm = MyNeuralNetwork(input_size=input_size, hidden_size=512, output_size=1, num_layers=1)
optimizer = optim.Adam(lstm.parameters(), lr=0.0001)
loss = lstm.fit(train_loader_sonata, test_loader_sonata, 10, optimizer, nn.BCELoss())
train_losses, test_losses = loss  # fit returns (train losses, test losses) per epoch

predictions = lstm.predict(test_loader_sonata)
# predict returns one column per model output; flatten it to match y_test_sonata.
accuracy_score_test = accuracy_score(y_test_sonata, predictions.ravel())
accuracy_score_test