In [18]:
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib import pyplot
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

In [37]:
class Data:
    def __init__(self, file_path):
        self.data = pd.read_csv(file_path)
        self.numeric_data = self.data.drop(columns=['timestamp', 'source', 'destination'])
        self.X = self.numeric_data.drop(columns=['protocol']).values
        self.Y = self.numeric_data['protocol'].values
        self.scaler = StandardScaler()
        self.X_scaled = self.scaler.fit_transform(self.X)
        '''self.data = pd.read_csv(file_path)
        self.label_encoder = LabelEncoder()
        self.data['protocol_encoded'] = self.label_encoder.fit_transform(self.data['protocol'])
        self.numeric_data = self.data.drop(columns=['timestamp', 'source', 'destination', 'protocol'])
        self.X = self.numeric_data.values
        self.Y = self.data['protocol_encoded'].values
        self.scaler = StandardScaler()
        self.X_scaled = self.scaler.fit_transform(self.X)'''

    def TrainTestSplit(self, trainTestSplitRatio=0.6):
        numeric_data = self.data.drop(columns=['timestamp', 'source', 'destination'])
        numeric_data['protocol'] = pd.Categorical(numeric_data['protocol']).codes
        self.X = numeric_data.drop(columns=['protocol']).values
        self.Y = numeric_data['protocol'].values
        self.scaler = StandardScaler()
        self.X_scaled = self.scaler.fit_transform(self.X)
        self.Y_encoded = to_categorical(self.Y)
        self.n_train = int(round(len(self.data) * trainTestSplitRatio, 0))
        self.trainX, self.testX = self.X_scaled[:self.n_train, :], self.X_scaled[self.n_train:, :]
        self.trainY, self.testY = self.Y_encoded[:self.n_train], self.Y_encoded[self.n_train:]
        self.trainY = np.argmax(self.trainY, axis=1)
        self.testY = np.argmax(self.testY, axis=1)
        return (self.trainX, self.trainY, self.testX, self.testY)
        '''self.Y_encoded = to_categorical(self.Y)
        self.n_train = int(round(len(self.data) * trainTestSplitRatio, 0))
        self.trainX, self.testX = self.X_scaled[:self.n_train, :], self.X_scaled[self.n_train:, :]
        self.trainY, self.testY = self.Y_encoded[:self.n_train], self.Y_encoded[self.n_train:]
        return (self.trainX, self.trainY, self.testX, self.testY)'''

In [38]:
data_path = "/home/PFAII/ML/TrainTest.csv"
data = Data(data_path)
trainX, trainY, testX, testY = data.TrainTestSplit()
input_dimensions = (trainX.shape[1], 1)
if len(trainY.shape) == 1:
    trainY_encoded = to_categorical(trainY)
    testY_encoded = to_categorical(testY)
else:
    trainY_encoded, testY_encoded = trainY, testY

In [39]:
model = Sequential([
    LSTM(64, activation='relu', input_shape=input_dimensions, return_sequences=True),
    Dropout(0.2),
    LSTM(32),
    Dropout(0.2),
    Dense(4, activation='softmax')  
])

  super().__init__(**kwargs)


In [40]:
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, 
              loss='categorical_crossentropy', 
              metrics=['accuracy'])
num_classes = 3

print("Shape of trainY:", trainY.shape)
print("Shape of testY:", testY.shape)
print("Shape of trainX:", trainY.shape)
print("Shape of testX:", testY.shape)
history = model.fit(trainX, trainY_encoded, epochs=50, batch_size=32, validation_data=(testX, testY_encoded))
loss, accuracy = model.evaluate(testX, testY_encoded)
print(f'Loss: {loss}, Accuracy: {accuracy}')

Shape of trainY: (3454,)
Shape of testY: (2302,)
Shape of trainX: (3454,)
Shape of testX: (2302,)
Epoch 1/50
[1m108/108[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - accuracy: 0.9481 - loss: 1.0893 - val_accuracy: 0.8723 - val_loss: 0.9222
Epoch 2/50
[1m108/108[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - accuracy: 0.9924 - loss: 0.0502 - val_accuracy: 0.8723 - val_loss: 0.6814
Epoch 3/50
[1m108/108[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - accuracy: 0.9929 - loss: 0.0273 - val_accuracy: 0.9487 - val_loss: 0.5447
Epoch 4/50
[1m108/108[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - accuracy: 0.9986 - loss: 0.0110 - val_accuracy: 0.9487 - val_loss: 0.4452
Epoch 5/50
[1m108/108[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - accuracy: 0.9993 - loss: 0.0102 - val_accuracy: 0.9431 - val_loss: 0.3700
Epoch 6/50
[1m108/108[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - accu

In [43]:
predicted_probs = model.predict(testX)
predicted_labels = np.argmax(predicted_probs, axis=1)
n_train = data.n_train
true_labels = data.Y[n_train:]

[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 834us/step


In [44]:
accuracy = accuracy_score(true_labels, predicted_labels)
print(f'Accuracy: {accuracy:.4f}')

precision = precision_score(true_labels, predicted_labels, average='macro')
recall = recall_score(true_labels, predicted_labels, average='macro')
f1 = f1_score(true_labels, predicted_labels, average='macro')

print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')

cm = confusion_matrix(true_labels, predicted_labels)
print('Confusion Matrix:')
print(cm)

Accuracy: 0.9487
Precision: 0.4861
Recall: 0.5000
F1 Score: 0.4929
Confusion Matrix:
[[   0    0   84    0]
 [   0  176    0    0]
 [   0    0 2008    0]
 [   0    0   34    0]]


  _warn_prf(average, modifier, msg_start, len(result))
