In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import h5py 
import matlab_helpers as mh 
# from sklearn.model_selection import test_test_split
from sklearn.preprocessing import StandardScaler
from keras.utils import to_categorical
import copy 
from scipy.io import savemat

## Load dataset 
Steps:
1. Load the training data.
2. Pad every sequence to the same length so that all calls can be stacked into a single array and trained in parallel (batched).

In [None]:
root_out_dir = '../Datasets/sgRNN_vIHC/PyData/'
PAD_VALUE = -(2**7)  # lowest int8 value; written into the time steps added by padding


def pad_and_stack(items, target_len, pad_value=PAD_VALUE):
    """Right-pad each 2-D array along axis 1 to `target_len`, then stack them.

    Parameters
    ----------
    items : iterable of 2-D arrays
        Every item must have the same size along axis 0; axis 1 may differ.
    target_len : int
        Size of axis 1 of every item after padding.
    pad_value : scalar, optional
        Value written into the appended columns.

    Returns
    -------
    np.ndarray
        int8 array of shape (n_items, target_len, item.shape[0]).

    Raises
    ------
    ValueError
        If an item is longer than `target_len`.
    """
    padded = []
    for item in items:
        n_pad = target_len - item.shape[1]
        if n_pad < 0:
            raise ValueError(f"Item of length {item.shape[1]} exceeds the target length {target_len}")
        padded.append(np.concatenate([item, pad_value*np.ones((item.shape[0], n_pad))], axis=1))
    # Stack to (n_items, rows, target_len), then move the padded axis to position 1
    return np.transpose(np.asarray(padded, dtype=np.int8), (0, 2, 1))


# NOTE: allow_pickle=True unpickles objects stored in the file -- only load trusted files.
# The `with` block closes the file handle once the arrays have been read.
with np.load(root_out_dir + 'sgRNN_data_train_file.npz', allow_pickle=True) as train_dataset:
    print(train_dataset.files)
    X_train = train_dataset["data_sgRNN_x"][:] # train set features
    y_train = to_categorical(np.ravel(np.array(train_dataset["data_label_y"][:]))) # one-hot train set labels

# Per-item statistics before padding (lengths drive the padding; min/max are kept for inspection,
# e.g. to check that the values survive the int8 cast -- TODO confirm they always do)
train_len_pre = np.array([item.shape[1] for item in X_train])
min_val = np.array([np.min(np.ravel(item)) for item in X_train])
max_val = np.array([np.max(np.ravel(item)) for item in X_train])

# Preprocess to make the data the same length
max_len = train_len_pre.max()
X_train = pad_and_stack(X_train, max_len)
train_len_post = np.array([item.shape[0] for item in X_train])  # every entry equals max_len

print(X_train.shape)

# If we want to train using clean calls only, then take the last 1203 points from the training dataset
n_clean_calls = 1203
X_train_clean = X_train[-n_clean_calls:,:,:]
y_train_clean = y_train[-n_clean_calls:,:]

with np.load(root_out_dir + 'sgRNN_data_test_snrInf.npz', allow_pickle=True) as test_dataset:
    print(test_dataset.files)
    X_test = test_dataset["data_sgRNN_x"][:] # test set features
    y_test = to_categorical(np.ravel(np.array(test_dataset["data_label_y"][:]))) # one-hot test set labels
# Test items are padded to the training length so both sets share one input shape
X_test = pad_and_stack(X_test, max_len)

print(f"Type: X_train={type(X_train)}, y_train={type(y_train)}")
print(f"Shapes: X_train={X_train.shape}, y_train={y_train.shape},X_test={X_test.shape}, y_test={y_test.shape}")

# Count samples per class. y_train is one-hot, so the class index must be recovered first;
# np.unique on the one-hot matrix itself would only count its 0 and 1 entries.
unq_vals, unq_counts = np.unique(np.argmax(y_train, axis=1), return_counts=True)
print(dict(zip(unq_vals,unq_counts)))


## Create an RNN to fit using noisy data 

In [None]:
from tensorflow.keras.layers import LSTM, Dense, TimeDistributed, Bidirectional
tf.random.set_seed(1234)  # applied to achieve consistent results

# Take the layer sizes from the data instead of hard-coding them, so the cell keeps
# working if the padded length, feature count or number of classes changes.
input_shape_noisy = X_train.shape[1:]  # (padded sequence length, features per step)
n_classes_noisy = y_train.shape[1]     # width of the one-hot label matrix

# Two stacked bidirectional LSTMs followed by a softmax classifier.
# NOTE(review): padded steps are fed to the LSTM as ordinary input (no Masking layer) -- confirm this is intended.
RNNmodel_noisy = tf.keras.models.Sequential()
RNNmodel_noisy.add(Bidirectional(LSTM(20, return_sequences=True), input_shape=input_shape_noisy))
RNNmodel_noisy.add(Bidirectional(LSTM(20, return_sequences=False)))
RNNmodel_noisy.add(Dense(n_classes_noisy, activation='softmax'))

RNNmodel_noisy.compile(optimizer="adam", loss='categorical_crossentropy', metrics=['accuracy'])
RNNmodel_noisy.summary()

# NOTE: these two names are rebound here from the loaded .npz handles to batched tf.data pipelines;
# the clean-data cell below reuses `test_dataset` as validation data.
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).batch(32)
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)
history_noisy = RNNmodel_noisy.fit(train_dataset, epochs=100, validation_data=test_dataset)

## Create an RNN to fit using clean data 

In [None]:
from tensorflow.keras.layers import LSTM, Dense, TimeDistributed, Bidirectional
tf.random.set_seed(1234)  # applied to achieve consistent results

# Take the layer sizes from the data instead of hard-coding them, so the cell keeps
# working if the padded length, feature count or number of classes changes.
input_shape_clean = X_train_clean.shape[1:]  # (padded sequence length, features per step)
n_classes_clean = y_train_clean.shape[1]     # width of the one-hot label matrix

# Same architecture as the noise-trained model; only the training data differs.
RNNmodel_clean = tf.keras.models.Sequential()
RNNmodel_clean.add(Bidirectional(LSTM(20, return_sequences=True), input_shape=input_shape_clean))
RNNmodel_clean.add(Bidirectional(LSTM(20, return_sequences=False)))
RNNmodel_clean.add(Dense(n_classes_clean, activation='softmax'))

RNNmodel_clean.compile(optimizer="adam", loss='categorical_crossentropy', metrics=['accuracy'])
RNNmodel_clean.summary()

train_dataset_clean = tf.data.Dataset.from_tensor_slices((X_train_clean, y_train_clean)).batch(32)
# Build the validation pipeline here rather than relying on `test_dataset` having been
# rebound to a tf.data.Dataset by an earlier cell; this cell then runs on its own.
test_dataset_clean = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)
history_clean = RNNmodel_clean.fit(train_dataset_clean, epochs=100, validation_data=test_dataset_clean)

## Check the output of the noise-trained NN model

In [None]:
# Predicted class index = position of the largest softmax output of the noise-trained model
y_train_pred_NN = RNNmodel_noisy.predict(X_train).argmax(axis=1)
y_test_pred_NN = RNNmodel_noisy.predict(X_test).argmax(axis=1)
# Integer class labels recovered from the one-hot targets
y_train_numeric = y_train.argmax(axis=1)
y_test_numeric = y_test.argmax(axis=1)

print(f"Unique values in y_train_pred_NN={np.unique(y_train_pred_NN)}")
print(f"Unique values in y_test_pred_NN={np.unique(y_test_pred_NN)}")

# Top panel: true training labels; bottom panel: label error (true - predicted) per call
fig, ax = plt.subplots(nrows=2, ncols=1, figsize=(4, 4))
label_axis, error_axis = ax
label_axis.plot(y_train_numeric)
train_label_error = y_train_numeric - y_train_pred_NN
error_axis.plot(train_label_error, color="orangered")
error_axis.set_xlabel("Call numbers")

# Accuracy = number of matching labels / number of samples
train_hits = (y_train_numeric == y_train_pred_NN).sum()
test_hits = (y_test_numeric == y_test_pred_NN).sum()
print(f"Training accuracy={train_hits/len(y_train)}\n Testing accuracy={test_hits/len(y_test_numeric)}\n")

## Test at different SNRs

In [None]:
# SNRs to evaluate: -20 dB to 10 dB in 5 dB steps, plus the clean (infinite SNR) condition
all_snrs = np.append(np.arange(-20.0, 11, 5), np.inf)

accuracy_vs_snr_noisyRNN = np.zeros(all_snrs.shape)
accuracy_vs_snr_cleanRNN = np.zeros(all_snrs.shape)

for snr_idx, snr_value in enumerate(all_snrs):
    # File names end in 'snrInf' for the clean set and 'snr<integer dB>' otherwise
    snr_tag = 'Inf' if np.isinf(snr_value) else str(int(snr_value))
    out_sgRNN_data_test_file = root_out_dir + 'sgRNN_data_test_snr' + snr_tag + '.npz'
    print(out_sgRNN_data_test_file)

    # NOTE: allow_pickle=True unpickles objects stored in the file -- only load trusted files.
    # The `with` block closes each file so handles do not accumulate across iterations.
    with np.load(out_sgRNN_data_test_file, allow_pickle=True) as test_dataset:
        print(test_dataset.files)
        X_test = test_dataset["data_sgRNN_x"][:] # test set features
        y_test = np.ravel(np.array(test_dataset["data_label_y"][:])) # integer test set labels (not one-hot)

    # Same preprocessing as for training: pad to max_len with -128, cast to int8, move the padded axis to position 1
    X_test = [np.concatenate([item, -(2**7)*np.ones((item.shape[0], max_len-item.shape[1]))],axis=1) for item in X_test]
    X_test = np.asarray(X_test, dtype=np.int8)
    X_test = np.transpose(X_test, (0, 2, 1))

    y_test_pred_NN_noisy = np.argmax(RNNmodel_noisy.predict(X_test), axis=1)
    y_test_pred_NN_clean = np.argmax(RNNmodel_clean.predict(X_test), axis=1)

    # Accuracy = fraction of calls whose predicted class matches the label
    accuracy_vs_snr_noisyRNN[snr_idx] = np.mean(y_test == y_test_pred_NN_noisy)
    accuracy_vs_snr_cleanRNN[snr_idx] = np.mean(y_test == y_test_pred_NN_clean)
    # Report both accuracies (the clean value previously printed a predicted label by mistake)
    print(f"Testing accuracy: Noisy = {accuracy_vs_snr_noisyRNN[snr_idx]}, clean = {accuracy_vs_snr_cleanRNN[snr_idx]}\n")

# Infinite SNR cannot be drawn on a dB axis, so the clean condition is placed at 15 and labelled "Inf"
plot_snr = all_snrs.copy()
plot_snr[np.isinf(plot_snr)] = 15
snr_tick_labels = ["Inf" if np.isinf(snr) else str(int(snr)) for snr in all_snrs]

fig, ax = plt.subplots()
ax.plot(plot_snr, accuracy_vs_snr_noisyRNN, label="Noise-trained")
ax.plot(plot_snr, accuracy_vs_snr_cleanRNN, label="Clean-trained")
ax.set_xticks(plot_snr)
ax.set_xticklabels(snr_tick_labels)
ax.set_xlabel("SNR (dB)")
ax.set_ylabel("Accuracy")
ax.set_title("RNN/LSTM")
ax.legend()

# Save the accuracy curves in both NumPy (.npz) and MATLAB (.mat) formats
out_sgRNN_classify_file = root_out_dir + 'sgRNN_classify_out'
np.savez(out_sgRNN_classify_file + '.npz', all_snrs=all_snrs, accuracy_vs_snr_noisyRNN=accuracy_vs_snr_noisyRNN, accuracy_vs_snr_cleanRNN=accuracy_vs_snr_cleanRNN)
mat_dict= {"all_snrs":all_snrs, "accuracy_vs_snr_noisyRNN":accuracy_vs_snr_noisyRNN, "accuracy_vs_snr_cleanRNN":accuracy_vs_snr_cleanRNN}
savemat(out_sgRNN_classify_file + '.mat', mat_dict)