## Mounting drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 1. Loading data

In [None]:
import pickle
import numpy as np
import random

In [None]:
with open('/content/drive/MyDrive/Colab Notebooks/Data/gnu_data.pkl', 'rb') as file:
    Xd = pickle.load(file,encoding='latin1')
# open a file, where you stored the pickled data


FileNotFoundError: ignored

In [None]:
Xd['X']
Xd['lbl']

X     = []
modul = []
snr   = []

for i in range(len(Xd['X'])):
    x_ = Xd['X'][i]
    x_ = np.expand_dims(x_.transpose([1,0]),axis=2)
    X.append(x_)
    
    lbl = Xd['lbl'][i]
    modul.append(lbl[0])
    snr.append(int(lbl[1]))

### 1.1. Train/Test Split

In [None]:
train_test_split = 0.8

random.seed(100)
random.shuffle(X)

random.seed(100)
random.shuffle(modul)

random.seed(100)
random.shuffle(snr)


X_train = np.array(X[:int(train_test_split*len(X))])
X_test  = np.array(X[int(train_test_split*len(X)):])

modul_train = modul[:int(train_test_split*len(X))]
modul_test  = modul[int(train_test_split*len(X)):]

snr_train = snr[:int(train_test_split*len(X))]
snr_test  = snr[int(train_test_split*len(X)):]

### 1.2. Generating One-hot encoded

In [None]:
mods = list(set(modul_train))
print(len(mods))

def to_onehot(yy):
    yy1 = np.zeros([1, len(mods)])
    yy1[0,mods.index(yy)] = 1
    return yy1

Y_train = np.zeros([len(modul_train),len(mods)])
Y_test = np.zeros([len(modul_test),len(mods)])

for i in range(len(modul_train)):
  Y_train[i,:] = to_onehot(modul_train[i])

for i in range(len(modul_test)):
  Y_test[i,:] = to_onehot(modul_test[i]) 

## 2. Train Deep Neural Networks

### 2.1. Train Auto-Encoder (AE)

In [None]:
import keras

# This is our input image
input_data     = keras.Input(shape=(128,2,1,))

x = keras.layers.Conv2D(8, (3, 3), activation='tanh', padding='same')(input_data)
x = keras.layers.MaxPooling2D((2, 2), padding='same')(x)
x = keras.layers.Conv2D(16, (3, 3), activation='tanh', padding='same')(x)
x = keras.layers.MaxPooling2D((2, 2), padding='same')(x)
x = keras.layers.Conv2D(32, (3, 3), activation='tanh', padding='same')(x)
encoded = keras.layers.MaxPooling2D((2, 2), padding='same', name='encoding')(x)

x = keras.layers.Conv2D(32, (3, 3), activation='tanh', padding='same')(encoded)
x = keras.layers.UpSampling2D((2, 2))(x)
x = keras.layers.Conv2D(16, (3, 3), activation='tanh', padding='same')(x)
x = keras.layers.UpSampling2D((2, 1))(x)
x = keras.layers.Conv2D(8, (3, 3), activation='tanh', padding='same')(x)
x = keras.layers.UpSampling2D((2, 1))(x)
decoded = keras.layers.Conv2D(1, (3, 3), activation='tanh', padding='same')(x)

# This model maps an input to its reconstruction
autoencoder = keras.Model(input_data, decoded)
autoencoder.compile(loss='mse', optimizer='adam')
autoencoder.summary()

In [None]:
# Set up some params 
nb_epoch = 1000     # number of epochs to train on
batch_size = 32  # training batch size

filepath = '/content/drive/My Drive/upwork/amadou/AE.h5'
autoencoder.fit(X_train,
    X_train,
    batch_size=batch_size,
    epochs=nb_epoch,
    verbose=1,
    validation_data=(X_test,X_test),
    callbacks = [
        keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=0, save_best_only=True, mode='auto'),
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='auto')]
    )
# we re-load the best weights once training is finished
autoencoder.load_weights(filepath)

### 2.2. Define Encoder model (trained)

In [None]:
encoder_model = keras.Model(inputs=autoencoder.input,
                                 outputs=autoencoder.get_layer('encoding').output)

### 2.3. Calculate Encoded data (train & test) using trained Encoder Model

In [None]:
encoded_train = encoder_model.predict(X_train, verbose=1)
encoded_test  = encoder_model.predict(X_test, verbose=1)


In [None]:
encoded_train = encoded_train.reshape(encoded_train.shape[0],-1)
encoded_test  = encoded_test.reshape(encoded_test.shape[0],-1)

In [None]:
encoded_train.shape

### 2.4. Train MLP Classifier

In [None]:
model = keras.models.Sequential()
model.add(keras.layers.Dense(256, activation='relu', name="dense1", input_shape=(512,)))
model.add(keras.layers.Dropout(0.1))
model.add(keras.layers.Dense(256, activation='relu', name="dense2"))
model.add(keras.layers.Dropout(0.1))
model.add(keras.layers.Dense( 11, name="output" ))
model.add(keras.layers.Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam',metrics='acc')
model.summary()

In [None]:
filepath = '/content/drive/My Drive/upwork/amadou/AE_MLP.h5'

model.fit(encoded_train,
    Y_train,
    batch_size=batch_size,
    epochs=nb_epoch,
    verbose=1,
    validation_data=(encoded_test,Y_test),
    callbacks = [
        keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=0, save_best_only=True, mode='auto'),
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='auto')]
    )
# we re-load the best weights once training is finished
model.load_weights(filepath)

## 3. Evaluating the trained classifier

In [None]:
snrs= list(set(snr))
snrs.sort()

import matplotlib.pyplot as plt
def plot_confusion_matrix(cm, title='Confusion matrix', cmap=plt.cm.Blues, labels=[]):
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=45)
    plt.yticks(tick_marks, labels)
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:

# Plot confusion matrix
acc = {}
for snr_value in snrs:

    # extract classes @ SNR
    test_X_i = X_test[np.where(np.array(snr_test)==snr_value)]
    test_Y_i = Y_test[np.where(np.array(snr_test)==snr_value)]    

    # estimate classes
    test_encoded = encoder_model.predict(test_X_i)
    test_encoded = test_encoded.reshape(test_encoded.shape[0],-1)
    test_Y_i_hat = model.predict(test_encoded)
    conf = np.zeros([len(mods),len(mods)])
    confnorm = np.zeros([len(mods),len(mods)])
    for i in range(0,test_X_i.shape[0]):
        j = list(test_Y_i[i,:]).index(1)
        k = int(np.argmax(test_Y_i_hat[i,:]))
        conf[j,k] = conf[j,k] + 1
    for i in range(0,len(mods)):
        confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
    plt.figure()
    plot_confusion_matrix(confnorm, labels=mods, title="AE Confusion Matrix (SNR=%d)"%(int(snr_value)))
    
    cor = np.sum(np.diag(conf))
    ncor = np.sum(conf) - cor
    print ("Overall Accuracy: ", cor / (cor+ncor))
    acc[snr_value] = 1.0*cor/(cor+ncor)

In [None]:
acc_ = [acc[snr_value] for snr_value in snrs]

# Plot accuracy curve
plt.plot(snrs, acc_)
plt.xlabel("Signal to Noise Ratio")
plt.ylabel("Classification Accuracy")
plt.title("AE Classification Accuracy on RadioML 2016.10 Alpha")

## Display data

In [None]:
import pandas as pd
print('real')
pd.DataFrame(X_train[:,:,0,0]).head()

In [None]:
print('imaginary')
pd.DataFrame(X_train[:,:,1,0]).head()