# Multiclass Classification Using Bi-LSTM for ADFA-WD Dataset

In [None]:
# mounting drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# importing needed libraries
import os
import gensim
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Bidirectional, Dense, Embedding
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt


In [None]:
# mappings
label_map = {'N1': 1, 'N2': 2, 'N3': 3, 'N4': 4,
             'P1': 5, 'P2': 6, 'P3': 7, 'P4': 8,
             'S1': 9, 'S2': 10, 'S3': 11, 'S4': 12,
             'Train': 0, 'Valid': 0
             }

# using 3 grams for context capturing
N_GRAM=3


In [None]:
# generate n-grams as strings
def generate_ngrams(sequence, n):
    """Generate n-grams from a sequence as strings of n words."""
    return [' '.join(sequence[i:i + n]) for i in range(len(sequence) - n + 1)]


# load data and create n-grams
def load_data(dataset_path, n):
    sequences = []
    labels = []

    for root, dirs, files in os.walk(dataset_path):
        for file in files:
            if file == '.DS_Store':
                continue
            file_path = os.path.join(root, file)
            with open(file_path, 'r', encoding='ISO-8859-1') as f:
                #print(file)
                #print(root)
                print(file_path)
                sequence = f.read().strip().split()
                '''all.extend(sequence)'''
                n_grams = generate_ngrams(sequence, n)
                sequences.append(n_grams)

                for Lab_substring, label in label_map.items():
                    if Lab_substring in root:
                      labels.append(label)
                      break

    return sequences, labels

In [None]:
# paths
train_path  = "/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Training_Data"
val_path    = "/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Validation_Data"
attack_path = "/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data"

# prepare training and validation data
train_sequences  , train_labels   = load_data(train_path  , n=N_GRAM)
val_sequences    , val_labels     = load_data(val_path    , n=N_GRAM)
attack_sequences , attack_labels  = load_data(attack_path , n=N_GRAM)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wireless-Karma-N4-4/V8-Wireless-Karma-N4-4_1112.GHC
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wireless-Karma-N4-4/V8-Wireless-Karma-N4-4_412.GHC
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wireless-Karma-N4-4/V8-Wireless-Karma-N4-4_940.GHC
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wireless-Karma-N4-4/V8-Wireless-Karma-N4-4_1328.GHC
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wireless-Karma-N4-4/V8-Wireless-Karma-N4-4_2688.GHC
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wireless-Karma-N4-4/V8-Wireless-Karma-N4-4_2396.GHC
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wireless-Karma-N4-4/V8-Wireless-Karma-N4-4_1864.GHC
/content/drive/MyDrive/PGSLDATASET/ORDATASET/Full_Trace_Attack_Data/V8-Wirel

In [None]:
sattack_lists = {i: [] for i in range(13)}
# populate the dictionary based on integer values as we will pick from each class
for attack_num, sublist in zip(attack_labels, attack_sequences):
    if attack_num in sattack_lists:  # Check if the value is in the desired range
        sattack_lists[attack_num].append(sublist)


In [None]:
from math import ceil

# dictionaries to hold the split data for each key
attack_train_data = {} # 70% of orignal
attack_test_data   = {} # 30% of orignal(for valid)

# split each list of lists in data_dict
# key is representing the attack number
for key, data in sattack_lists.items():
    total_length = len(data)
    train_size = ceil(total_length * 0.7) #Breaking test data in 70 and 30
    test_size = total_length - train_size

    # split the data and store in the corresponding dictionaries
    attack_train_data[key] = data[:train_size]
    attack_test_data[key] = data[train_size:]


'\nattack_num=4\nprint("Train data:", len(attack_train_data[attack_num]))\nprint("Validation data:", len(attack_val_data[attack_num]))\nprint("Test data:", len(attack_test_data[attack_num]))\n'

In [None]:
import copy

T_DATLIST = []
T_LABLIST = []
ftrain_sequences = []
ftrain_labels = []
ftrain_sequences = copy.deepcopy(train_sequences)
ftrain_labels = train_labels[:]

# iterate over each key and its list of lists (merge attack data with train data)
for key, lists in attack_train_data.items():
    print(key)
    # extend merged_data with each sublist in the current key's list of lists
    T_DATLIST.extend(lists)
    # extend key_labels with the key repeated for each sublist
    T_LABLIST.extend([key] * len(lists))

ftrain_sequences.extend(T_DATLIST)
ftrain_labels.extend(T_LABLIST)

print("Merged Data:", len(ftrain_sequences))
print("Key Labels:",   len(ftrain_labels))
print(ftrain_labels)

0
1
2
3
4
5
6
7
8
9
10
11
12
Merged Data: 4239
Key Labels: 4239
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [None]:
import copy

T_DATLIST = []
T_LABLIST = []
ftest_sequences = []
ftest_labels = []

# final test data created from validation data + 30% of attack data
lv_VAL_LEN = len(val_sequences)
lv_VAL_SIZ = ceil(lv_VAL_LEN * 0.5) #Breaking val data in 50 %
ftest_sequences = copy.deepcopy(val_sequences[:lv_VAL_SIZ])
ftest_labels    = val_labels[:lv_VAL_SIZ]

# iterate over each key and its list of lists
for key, lists in attack_test_data.items():
    print(key)
    # extend merged_data with each sublist in the current key's list of lists
    T_DATLIST.extend(lists)
    # extend key_labels with the key repeated for each sublist
    T_LABLIST.extend([key] * len(lists))

ftest_sequences.extend(T_DATLIST)
ftest_labels.extend(T_LABLIST)

print("Merged Data:", len(ftest_sequences))
print("Key Labels:",   len(ftest_labels))
print(ftest_labels)

0
1
2
3
4
5
6
7
8
9
10
11
12
Merged Data: 2572
Key Labels: 2572
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [None]:
from gensim.models import Word2Vec
model=Word2Vec(ftrain_sequences, vector_size=100, window=5, min_count=1)

In [None]:
word2vec_dict = model.wv.key_to_index

def ngram_to_indices(ngram, word2vec_dict):
    all_vectors = [vec for vec in word2vec_dict.values()]
    avg_vector = np.mean(all_vectors, axis=0) if all_vectors else np.zeros(word2vec_dict.vector_size)
    return [word2vec_dict.get(token, avg_vector) for token in ngram]

In [None]:
corpus_train_indices = [ngram_to_indices(sentence, word2vec_dict) for sentence in ftrain_sequences]
corpus_test_indices = [ngram_to_indices(sentence, word2vec_dict) for sentence in ftest_sequences]

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Embedding, Bidirectional, LSTM, Dense
from keras.preprocessing.sequence import pad_sequences

# embedding matrix: shape (num_tokens, embedding_size)
embedding_size = 100
num_tokens = len(word2vec_dict)
print(num_tokens)
embedding_matrix = np.zeros((num_tokens, embedding_size))

# populate the embedding matrix with Word2Vec embeddings
for word, idx in word2vec_dict.items():
    embedding_matrix[idx] = model.wv[word]

# pad sequences to ensure uniform input size
max_sequence_length = max(len(sentence) for sentence in corpus_train_indices)
print("Max_seq", max_sequence_length)
max_sequence_length = 3000

9443
Max_seq 1670683


In [None]:
X_train = pad_sequences(corpus_train_indices, maxlen=max_sequence_length, value=num_tokens-1, padding="post", truncating='post')
X_test = pad_sequences(corpus_test_indices, maxlen=max_sequence_length, value=num_tokens-1, padding="post", truncating='post')

In [None]:
from keras.layers import Embedding, Masking, Bidirectional, LSTM, Dense
from keras.models import Sequential
from keras.optimizers import Adam

In [None]:
model = Sequential()

model.add(Masking(mask_value=num_tokens-1, input_shape=(max_sequence_length,)))
model.add(Embedding(input_dim=num_tokens,
                        output_dim=embedding_size,
                        weights=[embedding_matrix],
                        input_length=max_sequence_length,
                        trainable=True,
))

model.add(Bidirectional(LSTM(128, return_sequences=False)))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(13, activation='softmax'))

In [None]:
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 masking_2 (Masking)         (None, 2000)              0         
                                                                 
 embedding_2 (Embedding)     (None, 2000, 100)         944300    
                                                                 
 bidirectional_2 (Bidirecti  (None, 200)               160800    
 onal)                                                           
                                                                 
 dense_6 (Dense)             (None, 100)               20100     
                                                                 
 dense_7 (Dense)             (None, 50)                5050      
                                                                 
 dense_8 (Dense)             (None, 13)                663       
                                                      

In [None]:
model.compile(optimizer=Adam(learning_rate=0.001),loss='categorical_crossentropy',metrics=['accuracy'])


In [None]:
y_train = to_categorical(ftrain_labels)
y_test  = to_categorical(ftest_labels)

X_train = np.array(X_train)
X_test = np.array(X_test)


print(len(X_train))
print(len(y_train))
print(len(X_test))
print(len(y_test))
print(X_test[0])
print(X_train.min(), X_train.max())

In [None]:
# Train the model
history = model.fit(
    X_train,
    y_train,
    epochs=300,
    batch_size=16,
    verbose=0
)

In [None]:
def plot_training_history(history):
    accuracy = history.history['accuracy']
    loss = history.history['loss']

    epochs = range(1, len(accuracy) + 1)
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, accuracy, 'b', label='Accuracy')
    plt.title('Training Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, loss, 'g', label='Training Loss')
    plt.title('Training Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.show()

plot_training_history(history)


In [2]:
loss, test_accuracy = model.evaluate(x_test, y_test)
print("Test accuracy :", test_accuracy)

Test Accuracy: 51.91%
