In [None]:
import numpy as np
import pickle
import os 
from keras.models import Sequential, Model
from keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Input, UpSampling2D, Dropout ,LSTM, merge
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
import pickle
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from keras.layers import BatchNormalization
from tensorflow.keras.optimizers import Adam # - Works


## Data Preprocessing
### This notebook uses the RadioML 2016.10A dataset
### https://www.deepsig.ai/datasets


In [None]:
# Mount Google Drive at /content/drive so the dataset path used below is reachable
# from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Location of the pickled dataset inside the mounted Google Drive.
path_data = '/content/drive/MyDrive/data.pkl'

In [None]:
def digitizer(labels):
    """Assign a 1-based integer code to every distinct label.

    Parameters
    ----------
    labels : sequence
        Labels to encode; anything ``np.unique`` accepts.

    Returns
    -------
    tuple
        ``(label_dict, digit_label)``: ``label_dict`` maps each unique label,
        taken in the sorted order returned by ``np.unique``, to ``1..K``;
        ``digit_label`` is the list of codes for ``labels`` in input order.
    """
    label_dict = {name: code for code, name in enumerate(np.unique(labels), start=1)}
    digit_label = [label_dict[name] for name in labels]
    return label_dict, digit_label
             
def onehot_encoder(L_dict,  labels):
    """Return the one-hot vector of a single label.

    Parameters
    ----------
    L_dict : dict
        Mapping from label to its 1-based integer code.
    labels : hashable
        The label to encode; must be a key of ``L_dict``.

    Returns
    -------
    numpy.ndarray
        Float vector of length ``len(L_dict)`` that is zero everywhere except
        for a 1 at position ``L_dict[labels] - 1``.
    """
    encoded = np.zeros(len(L_dict))
    hot_position = L_dict[labels] - 1  # codes are 1-based, array indices 0-based
    encoded[hot_position] = 1
    return encoded

# Load the raw pickle. Every key of `d` is indexed below as key[0] (class label)
# and key[1] (the value used to group samples, stored in `SNRs`).
# NOTE(security): pickle.load can execute arbitrary code -- only load a file you trust.
with open(path_data, "rb") as p:
    d = pickle.load(p, encoding='latin1')

# Distinct class names, in first-seen order.
classes = []
for i in d.keys():
    if i[0] not in classes:
        classes.append(i[0])
# creating class dictionary for strings to digits transformation.
label_dict,  digit_label = digitizer(classes)

# Group every sample by key[1]. Each stored sample is [one-hot label, array].
# setdefault creates the per-SNR list exactly once, so the grouping is done in a
# single pass and an already-filled list can never be reset by accident.
SNRs = {}
for key, samples in d.items():
    bucket = SNRs.setdefault(key[1], [])
    for arrays in samples:
        # convert labels to one-hot encoders.
        bucket.append([onehot_encoder(label_dict, key[0]), np.array(arrays)])

# Persist the regrouped dataset and the class mapping; `with` guarantees the
# files are flushed and closed even if pickling fails.
with open('dataset', 'wb') as outfile:
    pickle.dump(SNRs, outfile)

with open('class_dict', 'wb') as outfile:
    pickle.dump(label_dict, outfile)

# Output folders used by the training and plotting cells.
os.makedirs('./history', exist_ok=True)
os.makedirs('./figures', exist_ok=True)

## Train Dataset

In [None]:

def Cnn_Model():
    """Build the (uncompiled) CNN classifier.

    The network takes inputs of shape (2, 128, 1) and stacks four stages of
    Conv2D(3x3, relu, 'same') -> BatchNormalization -> MaxPooling2D((1, 2)) ->
    Dropout(0.3) with 256, 128, 64 and 64 filters, followed by Flatten,
    Dense(128, relu), BatchNormalization and an 11-unit softmax layer.

    Returns
    -------
    Sequential
        The assembled model; compiling is left to the caller.
    """
    conv_filters = (256, 128, 64, 64)

    model = Sequential()
    for stage, n_filters in enumerate(conv_filters):
        # Only the very first layer declares the input shape.
        extra = {'input_shape': (2, 128, 1)} if stage == 0 else {}
        model.add(Conv2D(n_filters, (3, 3), activation='relu', padding='same', **extra))
        model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=(1, 2), padding='valid', data_format=None))
        model.add(Dropout(.3))

    # Classification head.
    model.add(Flatten())
    model.add(Dense(128, activation='relu', kernel_initializer='he_normal'))
    model.add(BatchNormalization())
    model.add(Dense(11, activation='softmax', kernel_initializer='he_normal'))

    return model


In [None]:

# Train one CNN per SNR value and store, for each SNR, the training history,
# the test-set predictions and the running list of test accuracies.
# './dataset' is the pickle written by the preprocessing cell: a dict whose
# values are lists of [one-hot label, sample array] pairs.
with open('./dataset', 'rb') as file:
    data = pickle.load(file)
   
# NOTE(review): `c` is never read in this cell.
c = 0
# Collected as [test accuracy, SNR key] pairs, one per trained model.
accuracies_All = []
for key in data.keys():
    
    dataset = []
    labels = []
    
    # Split the stored pairs into parallel lists of labels and samples.
    for values in data[key]:
        labels.append(values[0])
        dataset.append(values[1]) 

        
    print('Starting training for SNR:', key)
    # data splitting

    # Shuffle samples and labels with the same random permutation.
    # NOTE(review): no random seed is set in this cell, so the split differs between runs.
    N = len(dataset)
    shuffled_indeces = np.random.permutation(range(N))
    new_dataset = np.array(dataset)[shuffled_indeces,:,:]
    new_labels = np.array(labels)[shuffled_indeces,:]
    
    # First 80% of the shuffled data is train(+validation), the rest is test.
    num_train = int(0.8*N)
    
    x_train = new_dataset[:num_train,:,:]
    y_train = new_labels[:num_train,:]

    # The first 10% of the training portion is held out for validation.
    num_val = int(0.1*len(x_train))
    
    x_val = x_train[:num_val,:,:]
    # The reshape appends a trailing axis so the arrays match the 4-D input the CNN expects.
    x_val = x_val.reshape(x_val.shape[0],x_val.shape[1],x_val.shape[2], -1)
    y_val = y_train[:num_val,:]
    
    x_train = x_train[num_val:,:,:]
    x_train = x_train.reshape(x_train.shape[0],x_train.shape[1],x_train.shape[2], -1)
    y_train = y_train[num_val:,:]
    
    x_test = new_dataset[num_train:,:,:]
    x_test = x_test.reshape(x_test.shape[0],x_test.shape[1],x_test.shape[2], -1)
    y_test = new_labels[num_train:,:] 

    
    # A fresh, untrained model is built for every SNR.
    models = Cnn_Model()
    
 
    opt = Adam(learning_rate=0.0001)
    models.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    
    num_epochs = 50
    #earlyStopping = EarlyStopping(monitor='val_loss', patience=10, verbose=0, mode='min')
    # Keep only the checkpoint with the best validation accuracy; the same file
    # name is reused (and overwritten) for every SNR.
    mcp_save = ModelCheckpoint('.mdl_wts.hdf5', save_best_only=True, monitor='val_accuracy')
    #reduce_lr_loss = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1, epsilon=1e-4, mode='min')
    #EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='auto')
    
    history = models.fit(x_train,
                        y_train,
                        epochs=num_epochs,
                        batch_size=10,
                        callbacks = [mcp_save],
                        validation_data=(x_val, y_val))
    
    
    # Evaluate with the best checkpointed weights rather than the last-epoch weights.
    models.load_weights(".mdl_wts.hdf5") 
    loss, acc = models.evaluate(x_test,y_test, verbose=2)
    accuracies_All.append([acc,key])   
    
    # Predicted class index (argmax over the softmax outputs) for every test sample.
    prediction = models.predict(x_test)
    labels_pred =np.argmax(prediction, axis = 1)
    
    
    with open('./history/SNR_{}_history.pkl'.format(key), 'wb') as file_pi:
        pickle.dump(history.history, file_pi)
    
    # '1' holds the predicted class indices, '2' the one-hot true labels.
    DICTED = {'1':labels_pred,'2':y_test}
    with open('./history/SNR_{}_prediction.pkl'.format(key), 'wb') as file_pi:
        pickle.dump(DICTED, file_pi)
        
    # Rewritten after every SNR, so the file always holds the accuracies gathered so far.
    with open('./SNR_accuracies.pkl', 'wb') as file_pi:
        pickle.dump(accuracies_All, file_pi)

## Plotting Accuracy and Loss

In [None]:
# Plot, for every saved training history, the loss and the accuracy curves
# (train vs. validation) and store each figure under ./figures.
for item in os.listdir('history'):
    # History files are named 'SNR_<snr>_history.pkl'. Any other entry
    # (prediction files, '.ipynb_checkpoints', stray files) is skipped instead
    # of raising IndexError on the missing third name component.
    i = item.split('_')
    if len(i) != 3 or i[2] != 'history.pkl':
        continue

    with open('./history/' + item, 'rb') as file:
        history = pickle.load(file, encoding = 'latin')

    # (history key, axis/legend wording, file-name suffix) for each figure.
    for metric, wording, suffix in (('loss', 'Loss', 'loss'),
                                    ('accuracy', 'Accuracy', 'Accuracy')):
        # One explicit figure per plot, so curves can never pile up on a shared
        # implicit figure when plt.show() does not reset the pyplot state.
        fig, ax = plt.subplots()
        ax.plot(history[metric], label=f'Train {wording}')
        ax.plot(history['val_' + metric], label=f'Validation {wording}')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(wording)
        ax.set_title(f'Model {wording} vs. Epoch (SNR= {i[1]})')
        ax.legend()
        fig.savefig(f"./figures/SNR {i[1]} -{suffix}.jpg")
        plt.show()
        plt.close(fig)

## Confusion Matrix Comparing Predicted and Target Label

In [None]:

# Rebuild the class-name mapping from the raw dataset keys.
# NOTE(security): pickle.load can execute arbitrary code -- only load a file you trust.
with open(path_data, "rb") as p:
    d = pickle.load(p, encoding='latin1')

classes = []
for i in d.keys():
    if i[0] not in classes:
        classes.append(i[0])

# creating class dictionary for strings to digits transformation.
label_dict,  digit_label = digitizer(classes)

# Class names ordered by their 1-based code, so position k in `labels`
# corresponds to one-hot / argmax index k.
labels = [str(name) for name, _ in sorted(label_dict.items(), key=lambda kv: kv[1])]
class_ids = np.arange(len(labels))

for item in os.listdir('history'):
    # Prediction files are named 'SNR_<snr>_prediction.pkl'; skip everything
    # else (including names without two underscores, which used to raise IndexError).
    itm = item.split('_')
    if len(itm) != 3 or itm[2] != 'prediction.pkl':
        continue

    with open('./history/' + item, 'rb') as file:
        y = pickle.load(file, encoding = 'latin')
    y_pred = y['1']                       # predicted class indices
    y_true = np.argmax(y['2'], axis=1)    # one-hot labels -> class indices

    # Passing `labels` pins the matrix to one row/column per known class, so the
    # tick labels stay aligned even when a class is absent from this split.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    # Row-normalise; rows without any true sample stay at 0 instead of becoming NaN.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm = np.divide(cm.astype('float'), row_totals,
                   out=np.zeros(cm.shape, dtype=float), where=row_totals != 0)

    fig, ax = plt.subplots()
    fig.set_figheight(10)
    fig.set_figwidth(10)
    im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    ax.figure.colorbar(im, ax=ax)
    # One tick per class, derived from the number of classes rather than hard-coded.
    ax.set_xticks(class_ids)
    ax.set_yticks(class_ids)
    ax.set_xticklabels(labels, rotation=45)
    ax.set_yticklabels(labels, rotation=45)
    ax.set (title= f'Confusion Matrix Comparing Predicted and Target Label for SNR {itm[1]}', 
            ylabel='True label',
            xlabel='Predicted label')

    # Annotate every cell; switch to white text on dark cells for readability.
    fmt = '.2f'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.savefig(f"./figures/SNR {itm[1]} -cm.jpg")
    plt.show()
    plt.close(fig)

## Performance of all Trained Models over RadioML2016.10a Dataset

In [None]:
def _sort_by_snr(dataset):
    """Split ``[accuracy, snr]`` pairs into (snr values, accuracies), ordered by ascending SNR."""
    ordered = sorted(dataset, key=lambda pair: pair[1])
    return [pair[1] for pair in ordered], [pair[0] for pair in ordered]


def sorter(dataset1, legend1 = None, dataset2 = None, legend2 = None, dataset3 = None, legend3 = None, model = ''):
    """Plot accuracy against SNR for up to three result sets and save the figure.

    Parameters
    ----------
    dataset1 : sequence of [accuracy, snr] pairs
        Results that are always plotted.
    legend1 : str, optional
        Legend entry for ``dataset1``.
    dataset2, dataset3 : sequence of [accuracy, snr] pairs, optional
        Additional result sets; skipped when ``None``.
    legend2, legend3 : str, optional
        Legend entries for ``dataset2`` / ``dataset3``.
    model : str
        Figure title.

    Every result set is sorted by SNR before plotting. Pairs that share the same
    SNR each yield exactly one point. A legend is drawn whenever at least one
    plotted series has a label, including the single-dataset case. The figure is
    written to ``./figures/Performance.jpg`` and then shown.
    """
    series = [(dataset1, legend1)]
    for extra, extra_legend in ((dataset2, legend2), (dataset3, legend3)):
        if extra is not None:
            series.append((extra, extra_legend))

    fig, ax = plt.subplots(figsize = (10,8))
    for points, legend in series:
        x, y = _sort_by_snr(points)
        ax.plot(x, y, linestyle = '--', marker = 'o', label = legend)

    # Calling legend() without any labelled line only produces a warning.
    if any(legend is not None for _, legend in series):
        ax.legend()

    ax.set_title(model)
    ax.set_xlabel('SNR [dB]')
    # NOTE(review): values are plotted exactly as supplied; if callers pass
    # fractions in [0, 1] the '[%]' unit in this label is misleading -- confirm.
    ax.set_ylabel('Accuracy [%]')
    ax.grid()
    fig.savefig("./figures/Performance.jpg")
    plt.show()
    plt.close(fig)

In [None]:
# Load the [accuracy, SNR] pairs written by the training cell and plot them.
with open('./SNR_accuracies.pkl', 'rb') as file:
    CNN = pickle.load(file, encoding = 'Latin')

sorter(
    dataset1=CNN,
    legend1='CNN',
    model='Performance of all Trained Models over RadioML2016.10a Dataset',
)