## In this notebook we attempt to train a ResNet structure
 - Inputs : Time sequences with 1024 samples
 - Label: 23 possible modulations (M_FSK, LFM, M_PSK, ...)

More information on the residual stack from ResNet [here](https://towardsdatascience.com/intuition-behind-residual-neural-networks-fa5d2996b2c7)

In [None]:
import h5py
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, optimizers
from tensorflow.keras.layers import Input, Dense, Activation, ZeroPadding2D, BatchNormalization, Flatten, Add, Conv1D,Convolution2D, Bidirectional, LSTM, GRU, AlphaDropout, MaxPooling1D
from tensorflow.keras.layers import MaxPooling2D, Dropout
from tensorflow.keras.models import Model
from sklearn.metrics import confusion_matrix
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import sklearn, json
import scipy.io as io
from typing import Any, Dict

In [15]:
from includes.clr_callback import *
import matplotlib.pyplot as plt

In [2]:
path = '/Users/mariavictoriadacostarivas/Documents/GMR/'
dataset_path = path + 'Dataset/'

In [3]:
with h5py.File(dataset_path +'X_train.mat', 'r') as f:
    X_train = np.array(f['X_train']).T
print(X_train.shape)
with h5py.File(dataset_path +'X_test.mat', 'r') as f:
    X_test = np.array(f['X_test']).T

(126500, 1024, 2)


In [4]:
lbl_train = io.loadmat(dataset_path + 'lbl_train.mat')['lbl_train']
lbl_test = io.loadmat(dataset_path + 'lbl_test.mat')['lbl_test']
print(lbl_test.shape)
print(lbl_train.shape)
Y_train = io.loadmat(dataset_path + 'Y_train.mat')
Y_train = Y_train['Y_train']
Y_test = io.loadmat(dataset_path + 'Y_test.mat')
Y_test = Y_test['Y_test']

(25300, 6)
(126500, 6)


In [5]:
classes = ['LFM', '2FSK', '4FSK', '8FSK', 'Costas', '2PSK', '4PSK', '8PSK', 'Barker', 'Huffman', 'Frank', 'P1', 'P2',
           'P3', 'P4', 'Px', 'Zadoff-Chu', 'T1', 'T2', 'T3', 'T4', 'NM', 'ruido']
I_x = X_train[:, :, 0]
Q_x = X_train[:, :, 1]
X_train[:, :, 1] = np.arctan(Q_x, I_x) / np.pi
X_train[:, :, 0] = np.abs(I_x + 1j * Q_x)
I_t = X_test[:, :, 0]
Q_t = X_test[:, :, 1]
X_test[:, :, 0] = np.arctan(Q_t, I_t) / np.pi
X_test[:, :, 1] = np.abs(I_t + 1j * Q_t)

In [6]:
np.random.seed(2022)
X_train, Y_train, lbl_train = sklearn.utils.shuffle(X_train[:], Y_train[:], lbl_train[:], random_state=2022)
X_test, Y_test, lbl_test = sklearn.utils.shuffle(X_test[:], Y_test[:], lbl_test[:], random_state=2022)

### We have to create the structure of a residual stack

In [7]:
def res_stack(x, f):
    """
    x : input vector
    f : output shape
    """
    if x.shape[1] != f:
        x = Conv1D(f, 1, strides=1, padding='same', data_format='channels_last')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
    x_shortcut = x
    x = Conv1D(f, 5, strides=1, padding="same", data_format='channels_last')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Conv1D(f, 5, strides=1, padding="same", data_format='channels_last')(x)
    x = BatchNormalization()(x)
    """Now we add the skip connection"""
    if x.shape[1] == x_shortcut.shape[1]:
        x = Add()([x, x_shortcut])
    else:
        raise Exception("Skip function not implemented")
    """After the addition the relu function is applied"""
    x = Activation('relu')(x)
    x = MaxPooling1D(pool_size=2, strides=None, padding='valid', data_format='channels_last')(x)
    return x

In [12]:
def ResNet(input_shape):
    X_input = Input(input_shape)
    x = res_stack(X_input, 32)
    x = res_stack(x, 32)
    x = res_stack(x, 32)
    x = res_stack(x, 32)
    x = res_stack(x, 32)
    x = res_stack(x, 32)
    x = Flatten()(x)
    x = Dense(128, activation='selu')(x)
    x = AlphaDropout(0.6)(x)
    x = Dense(128, activation='selu')(x)
    x = AlphaDropout(0.6)(x)
    x = Dense(128, activation='softmax')(x)
    # Create model. This creates your Keras model instance, you'll use this instance to train/test the model.
    model = Model(inputs = X_input, outputs = x)
    model.summary()
    return model

In [13]:
model = ResNet(X_train.shape[1:])

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 1024, 2)]    0                                            
__________________________________________________________________________________________________
conv1d (Conv1D)                 (None, 1024, 32)     96          input_2[0][0]                    
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 1024, 32)     128         conv1d[0][0]                     
__________________________________________________________________________________________________
activation (Activation)         (None, 1024, 32)     0           batch_normalization[0][0]        
______________________________________________________________________________________________

In [16]:
output_path = path + 'Results/models'
clr_triangular = CyclicLR(mode='triangular', base_lr=1e-7, max_lr=1e-3, step_size= 4 * (X_train.shape[0] // 1200))
c=[clr_triangular,ModelCheckpoint(filepath= output_path +'best_model.h5', monitor='val_loss', save_best_only=True)]
model.compile(optimizer=optimizers.Adam(1e-5), loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
Train = True
if Train:
    history = model.fit(X_train, Y_train, epochs = 200, batch_size = 150, callbacks = c, validation_data=(X_test, Y_test))
    with open(output_path +'history_rnn.json', 'w') as f:
        json.dump(history.history, f)
    model_json = model.to_json()
    with open(output_path +'model_rnn.json', "w") as json_file:
        json_file.write(model_json)
else:
    model.load_weights(output_path +'best_model.h5')
    with open(output_path +'history_rnn.json', 'r') as f:
        history = json.load(f)

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['test', 'val'])
plt.show()
plt.savefig(path+ '\graphs\model_loss.pdf')

In [None]:
def getConfusionMatrixPlot(true_labels, predicted_labels,title):
    # Compute confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)
    cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    cm_norm = np.nan_to_num(cm_norm)
    cm = np.round(cm_norm,2)
    print(cm)

    # create figure
    width = 18
    height = width / 1.618
    fig = plt.figure(figsize=(width, height))
    plt.clf()
    ax = fig.add_subplot(111)
    ax.set_aspect(1)
    ax.set_xlabel('Predicted label')
    ax.set_ylabel('True label')
    res = ax.imshow(cm, cmap=plt.cm.binary,
                    interpolation='nearest', vmin=0, vmax=1)

    # add color bar
    plt.colorbar(res)

    # annotate confusion entries
    width = len(cm)
    height = len(cm[0])

    for x in range(width):
        for y in range(height):
            ax.annotate(str(cm[x][y]), xy=(y, x), horizontalalignment='center',
                        verticalalignment='center', color=getFontColor(cm[x][y]))

    # add genres as ticks
    alphabet = classes
    plt.xticks(range(width), alphabet[:width], rotation=30)
    plt.yticks(range(height), alphabet[:height])
    plt.title(title)
    return plt