In [10]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import h5py
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import tensorflow as tf
from keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, concatenate, BatchNormalization,LSTM,Dropout,Reshape
from keras.models import Model, Sequential
from keras.regularizers import l2

In [2]:
# Seed for every random source in the notebook (value = group number).
seed = 23

# train_test_split receives `seed` explicitly through random_state; the global
# NumPy and TensorFlow generators must be seeded as well, otherwise model
# training differs from run to run (as far as the backend allows determinism).
np.random.seed(seed)
tf.random.set_seed(seed)

In [11]:
def load_data(file_name, csv_file):
    """Load the traces and their labels, then split them 70/30 into train/test.

    At most the first 20000 rows of the label file are used. Trace ``i`` is
    read from the HDF5 dataset ``STEAD_data/<i>``.

    Parameters
    ----------
    file_name : str
        Path to the HDF5 file containing the traces.
    csv_file : str
        Path to the CSV file whose ``trace_category`` column holds the labels.

    Returns
    -------
    X_train, X_test, y_train, y_test
        Arrays produced by ``train_test_split`` with ``test_size=0.3`` and the
        notebook-level ``seed`` as ``random_state``.

    Raises
    ------
    KeyError
        If a trace expected by the label file is missing from the HDF5 file.
    """
    # read the csv file with labels
    df = pd.read_csv(csv_file)

    # cap the number of samples that are loaded
    num_samples = min(df.shape[0], 20000)

    # the encoder is fitted on every label in the file, then truncated
    y = LabelEncoder().fit_transform(df["trace_category"])[:num_samples]

    # read the traces; the context manager closes the HDF5 handle even if a
    # read fails part-way through
    data = []
    with h5py.File(file_name, 'r') as dtfl:
        for i in range(num_samples):
            key = "STEAD_data/{}".format(i)
            dataset = dtfl.get(key)
            if dataset is None:
                # .get() returns None for a missing key; fail loudly instead of
                # letting a None entry end up inside X
                raise KeyError("dataset '{}' not found in {}".format(key, file_name))
            data.append(np.array(dataset))
    X = np.array(data)

    # train test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=seed)

    # no preprocessing required, so return the splits
    return X_train, X_test, y_train, y_test

In [12]:
# HDF5 file holding the waveform traces
file_name = r"C:\Users\krish\OneDrive\Documents\spring23\DM\CSE572-Project\CSE572-Project\STEAD_data.hdf5"

# CSV file holding the label of each trace
csv_file = r"C:\Users\krish\OneDrive\Documents\spring23\DM\CSE572-Project\CSE572-Project\STEAD_labels.csv"

# read both files and obtain the train / test partitions
X_train, X_test, y_train, y_test = load_data(file_name, csv_file)

In [5]:
# dimensions of the training features and of the training labels, one per line
print(X_train.shape, y_train.shape, sep="\n")

(13042, 6000, 3)
(13042,)


In [13]:
# stop once validation loss has gone 5 epochs without improving, then restore
# the weights from the best epoch
callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

In [None]:
# # reshape data for nn
# X_train_cnn = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
# X_test_cnn = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

In [14]:
import tensorflow as tf
from tensorflow.keras import layers, regularizers

def resnet_block(x, filters, conv_num=2, activation='relu'):
    """Apply one residual block of 1-D convolutions to tensor ``x``.

    The skip path is a kernel-size-1 convolution to ``filters`` channels. The
    residual path is one Conv1D(3) -> BatchNorm -> Activation unit followed by
    ``conv_num - 1`` more of the same. Both paths are added and passed through
    ``activation`` once more.
    """
    def conv_bn_act(tensor):
        # one residual-path unit: width-3 convolution, batch norm, activation
        tensor = layers.Conv1D(filters, 3, padding='same')(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.Activation(activation)(tensor)

    # skip connection, projected so its channel count equals `filters`
    skip = layers.Conv1D(filters, 1, padding='same')(x)

    # the first unit is always applied; the rest depend on conv_num
    residual = conv_bn_act(x)
    for _ in range(conv_num - 1):
        residual = conv_bn_act(residual)

    merged = layers.Add()([skip, residual])
    return layers.Activation(activation)(merged)

def build_resnet(input_shape, num_classes):
    """Build the Conv1D residual network with an LSTM + dense classification head.

    Parameters
    ----------
    input_shape : tuple
        Shape of one sample, passed straight to ``layers.Input``.
    num_classes : int
        Number of target classes. For ``num_classes <= 2`` the model ends in a
        single sigmoid unit (pair with ``binary_crossentropy``). For more
        classes it ends in a ``num_classes``-wide softmax (pair with a
        categorical cross-entropy loss).

    Returns
    -------
    tf.keras.Model
        The uncompiled model.
    """
    inputs = layers.Input(shape=input_shape)

    # stem: one regularised convolution, then halve the temporal length
    x = layers.Conv1D(32, 3, padding='same', kernel_regularizer=regularizers.L2(0.01))(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.MaxPooling1D(2, padding='same')(x)

    x = resnet_block(x, 32, conv_num=2)
    x = resnet_block(x, 32, conv_num=2)

    # averages over the time axis, leaving one value per channel
    x = layers.GlobalAveragePooling1D()(x)

    # Append LSTM layers with dropout and kernel regularization.
    # The reshape turns the per-channel averages into a sequence with a single
    # feature per step, so the LSTMs iterate over channels rather than time.
    # NOTE(review): confirm this is the intended sequence axis.
    x = layers.Reshape((-1, 1))(x)
    x = layers.LSTM(32, return_sequences=True, kernel_regularizer=regularizers.L2(0.01))(x)
    x = layers.Dropout(0.2)(x)
    x = layers.LSTM(16, kernel_regularizer=regularizers.L2(0.01))(x)
    x = layers.Dropout(0.2)(x)

    # Dense layers with kernel regularization
    x = layers.Dense(8, activation='relu', kernel_regularizer=regularizers.L2(0.01))(x)

    # Output head: previously num_classes was ignored and the head was always
    # one sigmoid unit. That remains the result for the binary case; a larger
    # num_classes now yields a matching softmax layer.
    if num_classes <= 2:
        outputs = layers.Dense(1, activation='sigmoid', kernel_regularizer=regularizers.L2(0.01))(x)
    else:
        outputs = layers.Dense(num_classes, activation='softmax', kernel_regularizer=regularizers.L2(0.01))(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    return model

# two-class model over inputs of shape (6000, 3), trained with Adam on
# binary cross-entropy and tracked by accuracy
model = build_resnet(input_shape=(6000, 3), num_classes=2)
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 6000, 3)]    0           []                               
                                                                                                  
 conv1d_7 (Conv1D)              (None, 6000, 32)     320         ['input_2[0][0]']                
                                                                                                  
 batch_normalization_5 (BatchNo  (None, 6000, 32)    128         ['conv1d_7[0][0]']               
 rmalization)                                                                                     
                                                                                                  
 activation_7 (Activation)      (None, 6000, 32)     0           ['batch_normalization_5[0][

In [15]:
# Train with the early-stopping callback `callback` (patience=5,
# restore_best_weights=True) configured in an earlier cell. The second
# EarlyStopping object that used to be created here was never passed to fit()
# and has been removed. 20% of the training data is held out for validation.
history = model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=128,
    validation_split=0.2,
    callbacks=[callback],
)

Epoch 1/50


KeyboardInterrupt: 

In [None]:
# Predict with `model`, the network built and trained in the cells above; the
# name `cnn` used here before is not defined anywhere in the notebook.
# The sigmoid outputs are thresholded at 0.5 in one vectorised step; this gives
# the same labels as round() for values in [0, 1], since round(0.5) == 0.
y_pred_cnn = (model.predict(X_test).flatten() > 0.5).astype(int).tolist()



In [None]:
# per-class precision / recall / F1 of the predicted labels against the test labels
print("Classification report for CNN: ")
print(classification_report(y_true=y_test, y_pred=y_pred_cnn))

Classification report for NN: 
              precision    recall  f1-score   support

           0       0.99      0.08      0.14      2786
           1       0.52      1.00      0.69      2804

    accuracy                           0.54      5590
   macro avg       0.76      0.54      0.41      5590
weighted avg       0.76      0.54      0.42      5590

