In [None]:
# This Paper is directly implemented from: 
#T. J. O’Shea, J. Corgan, and T. C. Clancy, “Convolutional radio modulation recognition networks,” in International
#conference on engineering applications of neural networks. Springer, 2016, pp. 213–226.

import tarfile
import pickle
import random
import gc

from tensorflow import keras
import tensorflow as tf
import scipy
import numpy as np
import pandas as pd
from pandas import DataFrame as df
import matplotlib.pyplot as plt
import seaborn as sb

In [None]:
from keras.utils import np_utils, plot_model
from keras.layers.convolutional import Conv2D, MaxPooling2D, SeparableConv2D
from keras.layers import UpSampling2D, GlobalAveragePooling2D, AveragePooling2D, BatchNormalization
from keras.layers import Input, LSTM, Dense, Concatenate,add
from keras.layers.core import Reshape, Flatten, Dropout, Activation
from keras.callbacks import EarlyStopping
from keras.models import Sequential, load_model, Model
import keras.models as models
from keras import metrics
from keras.regularizers import *
from keras.optimizers import Adam
from keras.layers.noise import GaussianNoise

from sklearn.preprocessing import LabelBinarizer as LB
from sklearn.preprocessing import normalize

In [None]:
data = open("RML2016.10b.dat",'rb') 
dt = pickle.load(data, encoding = 'bytes') 
snrs, mods = map(lambda j: sorted(list(set(map(lambda x: x[j], dt.keys())))), [1,0])
X = [] 
lbl = []
for mod in mods:
    for snr in snrs:
        X.append(dt[(mod,snr)])
        for i in range(dt[(mod,snr)].shape[0]):  lbl.append((mod,snr))
X = np.vstack(X)

In [None]:
# the data is split into training set, Validation set and test set
#  randomly initializing the seed
np.random.seed(2021)
n_example = X.shape[0]
# 80 % of input data was chosen for training
n_train = n_example * 0.80
# randomly choosing the input training signals
train_idx = np.random.choice(range(0, n_example), size=int(n_train), replace=False)
# test set
test_idx = list(set(range(0, n_example)) - set(train_idx))
X_train = X[train_idx]
X_test = X[test_idx]

# Function for Hot Encoding the vectors
def onehot(vec):
    vec_hot = np.zeros([len(vec), max(vec) + 1])
    vec_hot[np.arange(len(vec)), vec] = 1
    return vec_hot

Y_train = onehot(list(map(lambda x: mods.index(lbl[x][0]), train_idx)))
Y_test = onehot(list(map(lambda x: mods.index(lbl[x][0]), test_idx)))
labels = np.array(lbl)
snr_train = labels[train_idx][:,1].astype(int)
snr_test = labels[test_idx][:,1].astype(int)
in_shp = list(X_train.shape[1:])
classes = mods
print(X_train.shape,Y_train.shape, in_shp)
print('classes:', mods)

In [None]:
#BASE CNN MODEL
dr = 0.6
model = models.Sequential()
inp = Input(shape=(1,2, 128)) 

l1=(Conv2D(64, (1, 3), padding='same', activation="relu", name="conv1", kernel_initializer='glorot_uniform',
                 data_format="channels_first"))(inp)
l1=(Dropout(dr))(l1)
l2=(Conv2D(16, (2, 3), padding='same', activation="relu", name="conv2", kernel_initializer='glorot_uniform',
                 data_format="channels_first"))(l1)
l2=(Dropout(dr))(l2)
f = (Flatten())(l2)
d1=(Dense(128, activation='relu',  name="dense1"))(f)
d2=(Dense(len(classes),  name="dense2"))(d1)
act=(Activation('softmax'))(d2)
out=(Reshape([len(classes)]))(act)
model = models.Model(inputs=inp, outputs=out)


model.compile(loss='categorical_crossentropy', optimizer='Adam', metrics=['accuracy'])
model.summary()

X_train = np.reshape(X_train, (-1,1,2,128))
X_test = np.reshape(X_test, (-1,1,2,128))

In [None]:
history=model.fit(X_train, Y_train, epochs=100, validation_data=(X_test, Y_test), batch_size=1024, callbacks=[EarlyStopping(patience=35, restore_best_weights = True)])

In [None]:
model.save("cnn.h5")
print("Saved model to disk")

In [None]:
from sklearn.metrics import accuracy_score
def plot_results(predicted_labels, true_labels, snrs):
  sorted_snrs = np.sort(np.unique(snrs))
  x_axis = []
  y_axis = []
  for snr in sorted_snrs:
    idx = np.where(snrs == snr)
    #print('snr =', snr, '-->', accuracy_score(np.argmax(true_labels[idx], axis = 1), np.argmax(predicted_labels[idx], axis = 1)))
    x_axis.append(snr)
    y_axis.append(accuracy_score(np.argmax(true_labels[idx], axis = 1), np.argmax(predicted_labels[idx], axis = 1)))
  
  plt.xlabel('SNR (in dB)')
  plt.ylabel('Accuracy (%)')
  plt.title('Classification Accuracy over different SNRs for CNN Model')
  plt.plot(x_axis, np.array(y_axis) * 100, 'g*--')
  plt.grid(True)

def print_results(predicted_labels, true_labels, snrs):
  sorted_snrs = np.sort(np.unique(snrs))
  x_axis = []
  y_axis = []
  for snr in sorted_snrs:
    idx = np.where(snrs == snr)
    #print('snr =', snr, '-->', accuracy_score(np.argmax(true_labels[idx], axis = 1), np.argmax(predicted_labels[idx], axis = 1)))
    x_axis.append(snr)
    y_axis.append(accuracy_score(np.argmax(true_labels[idx], axis = 1), np.argmax(predicted_labels[idx], axis = 1)))
  return df(data = np.array(y_axis).reshape(1, -1) * 100,  columns = sorted_snrs, index = ['Accuracy (%)']).round(2)

In [None]:
cnn_model = keras.models.load_model('cnn.h5')
y_pred_cnn = cnn_model.predict(X_test)
plot_results(y_pred_cnn, Y_test, snr_test)
print_results(y_pred_cnn, Y_test, snr_test)

In [None]:
#RUN THIS CELL RIGHT AFTER TRAINING THE MODEL AS IT WILL NOT BE STORED IN AN .h5 FILE.
# Show loss curves
plt.figure()
plt.title('Training performance for CNN Model')
plt.plot(history.epoch, history.history['loss'], label='train loss+error')
plt.plot(history.epoch, history.history['val_loss'], label='val_error')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()
plt.grid(True)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

y_pred=np.argmax(y_pred_cnn, axis=1)
y_test=np.argmax(Y_test, axis=1)
cm = confusion_matrix(y_test, y_pred)
# Normalise
cmn = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
fig, ax = plt.subplots(figsize=(10,5))
sns.heatmap(cmn, annot=True, cmap='Greens', fmt='.2f', xticklabels=classes, yticklabels=classes)
plt.title('Confusion Matrix for CNN Model')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show(block=True)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import itertools

test_Y_hat = y_pred_cnn #model.predict(X_test, batch_size=1024)
conf = np.zeros([len(classes),len(classes)])
confnorm = np.zeros([len(classes),len(classes)])
for i in range(0,X_test.shape[0]):
    j = list(Y_test[i,:]).index(1)
    k = int(np.argmax(test_Y_hat[i,:]))
    conf[j,k] = conf[j,k] + 1
for i in range(0,len(classes)):
    confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
y_pred=np.argmax(y_pred_cnn, axis=1)
y_test=np.argmax(Y_test, axis=1)
cm = confusion_matrix(y_test,y_pred)
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion Matrix for CNN Model without Normalization', cmap=plt.cm.Greens):
    fig1 = plt.figure(figsize=(7, 7), dpi=65)
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion Matrix for CNN Model without Normalization')

         
    print(cm)

    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
            horizontalalignment="center",
            color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.title('Confusion Matrix for CNN Model without Normalization')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
cm_plot_labels = ['8PSK', 'AM-DSB', 'BPSK', 'CPFSK', 'GFSK', 'PAM4', 'QAM16', 'QAM64', 'QPSK', 'WBFM']
plot_confusion_matrix(cm=cm, classes=cm_plot_labels, title='Confusion Matrix')

In [None]:
# FOR ROC CURVE
from sklearn.metrics import roc_curve, auc
from scipy import interp
from itertools import cycle
lw=2
n_classes=10

fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test[:, i], y_pred_cnn[:, i])  #####
    roc_auc[i] = auc(fpr[i], tpr[i])


# Compute micro-average ROC curve and ROC area
fpr["micro"], tpr["micro"], _ = roc_curve(Y_test.ravel(), y_pred_cnn.ravel()) ####
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))

# Then interpolate all ROC curves at this points
mean_tpr = np.zeros_like(all_fpr)
for i in range(n_classes):
    mean_tpr += interp(all_fpr, fpr[i], tpr[i])

# Finally average it and compute AUC
mean_tpr /= n_classes

fpr["macro"] = all_fpr
tpr["macro"] = mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

# Plot all ROC curves
plt.figure()
plt.plot(fpr["micro"], tpr["micro"],
         label='micro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["micro"]),
         color='deeppink', linestyle=':', linewidth=1)

plt.plot(fpr["macro"], tpr["macro"],
         label='macro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["macro"]),
         color='navy', linestyle=':', linewidth=1)

colors = cycle(['brown', 'darkorange', 'cornflowerblue', 'black', 'green', 'red', 'purple', 'magenta', 'cyan','yellow'])
for i, color in zip(range(n_classes), colors):
    plt.plot(fpr[i], tpr[i], color=color, lw=lw,
             label='ROC curve of class {0} (area = {1:0.2f})'
             ''.format(i, roc_auc[i]))

plt.plot([0, 1], [0, 1], 'k--', lw=lw)
#plt.xlim([0.0, 1.0])
#plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic (ROC) curve for CNN Model')
plt.legend(loc="lower right")
plt.show()

In [None]:
#TO PLOT CONFUSION MATRIX FOR EVERY SINGLE SNR BUT WITHOUT VALUES IN IT.
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import confusion_matrix
def plot_confusion_matrix(cm, title='Confusion matrix', cmap=plt.cm.Greens, labels=[]):
    plt.figure()
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=45)
    plt.yticks(tick_marks, labels)
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    #plt.savefig(title)
    plt.show()


# Plot confusion matrix
test_Y_hat = y_pred_cnn

name = 'CNN Model'
acc = {}
for snr in snrs:

    # extract classes @ SNR
    test_SNRs = list(map(lambda x: lbl[x][1], test_idx))
    test_X_i = X_test[np.where(np.array(test_SNRs) == snr)]
    test_Y_i = Y_test[np.where(np.array(test_SNRs) == snr)]

    # estimate classes
    test_Y_i_hat = model.predict(test_X_i)


    pre_labels_i = []
    for x in test_Y_i_hat:
        tmp = np.argmax(x, 0)
        pre_labels_i.append(tmp)
    true_labels_i = []
    for x in test_Y_i:
        tmp = np.argmax(x, 0)
        true_labels_i.append(tmp)
    cnf_matrix = confusion_matrix(true_labels_i, pre_labels_i)
    # np.set_printoptions(precision=2)
    # Plot non-normalized confusion matrix
    # plt.figure()
    plot_confusion_matrix(cnf_matrix, labels=classes,
     #                            normalize=False,
                                 title='%s Confusion matrix, without normalization (SNR=%d)' % (name, snr))
    #plt.savefig('%s Confusion matrix, without normalization (SNR=%d)' % (name, snr))
    # Plot normalized confusion matrix
    # plt.figure()
    plot_confusion_matrix(cnf_matrix, labels=classes,
                       #          normalize=True,
                                 title='%s Normalized confusion matrix (SNR=%d)' % (name, snr))
   # plt.savefig('%s Normalized confusion matrix (SNR=%d)' % (name, snr))
    plt.show()


    conf = np.zeros([len(classes), len(classes)])
    confnorm = np.zeros([len(classes), len(classes)])
    for i in range(0, test_X_i.shape[0]):
        j = list(test_Y_i[i, :]).index(1)
        k = int(np.argmax(test_Y_i_hat[i, :]))
        conf[j, k] += 1
    for i in range(0, len(classes)):
        confnorm[i, :] = conf[i, :] / np.sum(conf[i, :])
    # plt.figure()
    plot_confusion_matrix(confnorm, labels=classes, title="%s Confusion Matrix (SNR=%d)" % (name, snr))

    cor = np.sum(np.diag(conf))
    ncor = np.sum(conf) - cor
    print ("Overall Accuracy: ", cor / (cor + ncor))
    acc[snr] = 1.0 * cor / (cor + ncor)