## Import Packages

In [1]:
#import packages
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import pywt
import time
import math
import random
import argparse
import scipy
import scipy.stats as st
from scipy import spatial
import sklearn
from PIL import Image
import pickle as pk
from __future__ import print_function

import torch
import torchvision
import torch.optim as optim
import torch.nn as nn
from torch.nn import Linear, ReLU, CrossEntropyLoss, Sequential, Conv2d, MaxPool2d, Module, Softmax, BatchNorm2d, Dropout
from torch.autograd import Variable as V
from torch.nn import functional as F
from torchvision import datasets, models, transforms, utils
from torch.utils import data
from torch.utils.data import DataLoader

# Device configuration
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

## Upload Data

In [2]:
#upload pickle
f = open('D:/论文/work/2022.07/0721-CNN and loss function simulation/RML2016.10a_dict.pkl','rb')
signal_data = pk.load(f, encoding = 'latin1')
Y = ['QPSK', 'PAM4', 'AM-DSB', 'GFSK', 'QAM64', 'AM-SSB', '8PSK', 'QAM16', 'WBFM', 'CPFSK', 'BPSK']
SNR = np.arange(-20, 20, 2)
X = np.zeros([11, 20, 1000, 2, 128])
ii = 0
for i in Y:
    jj = 0
    for j in SNR:
        X[ii, jj, :, :, :] = signal_data[(i, j)]
        jj += 1
    ii += 1
# plot_x = range(1, 129)
# plt.plot(plot_x, X[0,0,0,0, :])
# plt.show()

In [3]:
# Select a type of snr
def select_snr(x, snr):
    X_choosed = x[:, snr, :, :, :]
    X_choosed = np.reshape(X_choosed, [X_choosed.shape[0], 1000, 2, 128])
    return X_choosed

## Add Noise

In [4]:
def noisy_label(y, noise_ratio):
    noise_index = random.sample(list(np.arange(len(y) / 11).astype(int)), int(len(y) * noise_ratio / 11))
    tt = np.arange(11)
    for k in range(11):
        ttt = np.setdiff1d(tt, [k])
        for i in noise_index:
            y[int(i + k*len(y)/11)] = np.random.choice(ttt, replace = True)
    
    return y

In [5]:
def decomp_noise(x, snr, noise_ratio, train_ratio):
    X_choosed = select_snr(x, snr)
    n = 1000
    n1 = int(n * train_ratio)
    n2 = n - n1
    n_noise = int(n1 * noise_ratio)
    class_num = X_choosed.shape[0]
    Y = np.zeros([1, int(n)], dtype = int)
    
    #创建空数组
    train_index = np.zeros([class_num, n1], dtype = int)
    test_index = np.zeros([class_num, n2], dtype = int)
    
    x_train = np.zeros([class_num, n1, 2, 128])
    y_train = np.zeros([class_num, n1])
    x_test = np.zeros([class_num, n2, 2, 128])
    y_test = np.zeros([class_num, n2])
    
    #数据集分割
    for i in range(1, class_num):
        Y = np.concatenate((Y, np.ones([1, n]) * i), 0)
    for j in range(class_num):
        train_index[j, :] = random.sample(list(np.arange(n)), n1)
        test_index[j, :] = np.setdiff1d(np.arange(n), train_index[j, :])

        x_train[j, :, :] = X_choosed[j, train_index[j, :], :]
        y_train[j, :] = Y[j, train_index[j, :]]
        x_test[j, :, :] = X_choosed[j, test_index[j, :], :]
        y_test[j, :] = Y[j, test_index[j, :]]

    #reshape并对train与test添加错误标签
    x_train = np.reshape(x_train, [class_num * n1, 2, 128])
    y_train = np.reshape(y_train, [class_num * n1])
    y_train = noisy_label(y_train, noise_ratio).astype(int)
    
    x_test = np.reshape(x_test, [class_num * n2, 2, 128])
    y_test = np.reshape(y_test, [class_num * n2])


#     train_shuffle = np.random.choice(np.arange(X_choosed.shape[0] * int(1000 * sample_ratio1)), 
#                                      size=X_choosed.shape[0] * int(1000 * sample_ratio1), replace=False)
#     x_train = x_train[train_shuffle, :]
#     y_train = y_train[train_shuffle]
    
   
    return x_train, y_train, x_test, y_test

## Achieve the clean data

In [6]:
# achieve the clean sample
x_train, y_train, x_test, y_test = decomp_noise(X, 10, 0.5, 0.7)

In [7]:
x_test.shape

(3300, 2, 128)

In [8]:
train_label = np.zeros(7700)
for i in range(11):
    train_label[(700*i) : (700 * (i+1))] = i * np.ones(700)
train_label = pd.DataFrame(train_label)
train_label.to_csv("train_label.csv")
test_label = pd.DataFrame(y_test)
test_label.to_csv("test_label.csv")
train_noisy_label = pd.DataFrame(y_train)
train_noisy_label.to_csv("train_noise_label.csv")
origin_train = pd.DataFrame(np.reshape(x_train, [x_train.shape[0], 256]))
origin_test = pd.DataFrame(np.reshape(x_test, [x_test.shape[0], 256]))
origin_train.to_csv("train.csv")
origin_test.to_csv("test.csv")

## CNN（Save Parameter)

In [9]:
#Data Centralization
def train_test(x_train, y_train, x_test, y_test):
#     train_x, y_train, test_x, y_test = decomp_noise(x, 10, 0.3, 0.6, 0.2) 
    train_x = np.reshape(x_train, [x_train.shape[0], 1, 2, 128])
    test_x = np.reshape(x_test, [x_test.shape[0], 1, 2, 128])
    #print(train_x.shape)

    train_x = torch.tensor(train_x).to(device)
    test_x = torch.tensor(test_x).to(device)
    xmean = torch.mean(train_x, dim=0)
    xstd = torch.std(train_x, dim=0)
    # xmean = torch.unsqueeze(xmean, 0)
    # xstd = torch.unsqueeze(xstd, 0)
    #print(xmean.shape)

    train_x = (train_x - xmean) / (xstd + 1e-7)
    test_x = (test_x - xmean) / (xstd + 1e-7)

    y_train = torch.tensor(y_train).to(device)
    y_test = torch.tensor(y_test).to(device)
    
    return train_x, test_x, y_train, y_test

In [10]:
#Dataset Setup

train_x, test_x, y_train, y_test = train_test(x_train, y_train, x_test, y_test)
batch_size = 60
num_epochs = 5
num_classes = 11
learning_rate = 0.001

class MyDataset(data.Dataset):
    def __init__(self, x, y):
        self.x = x.type(torch.float32)
        self.y = y.long()

    def __getitem__(self, index):
        x = self.x[index, :, :, :]
        y = self.y[index]

        return x, y

    def __len__(self):
        return len(self.x[:, :, :, 0])
    

train_dataset = MyDataset(train_x, y_train)
test_dataset = MyDataset(test_x, y_test)

train_loader = DataLoader(train_dataset,
                          shuffle=True,
                          batch_size=batch_size)
test_loader = DataLoader(test_dataset,
                          shuffle=False,
                          batch_size=3300)

In [11]:
#Network Design

class ConvNet(nn.Module):
    def __init__(self, num_classes=11):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 50, kernel_size = (1, 8), stride = (1, 1), padding = (0, 2)),
          nn.Dropout2d(),
          nn.BatchNorm2d(50),
          nn.ReLU()
          #nn.MaxPool2d(kernel_size=(1, 3), stride=(1, 1))
         )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(50, 50, kernel_size=(2, 8), stride=(1, 1), padding=(0, 2)),
            nn.Dropout2d(),
            nn.BatchNorm2d(50),
            nn.ReLU()
#             nn.MaxPool2d(kernel_size=(1, 3), stride=(1, 1))
        )
        self.layer3 = nn.Sequential(
            nn.Linear(6100, 256)
        )
        self.layer4 = nn.Sequential(
            nn.Dropout2d(),
            nn.ReLU(),
            nn.Linear(256, 11)
            
        )

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        temp = self.layer3(out)
        out = self.layer4(temp)
        return out, temp

model = ConvNet(num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [12]:
#训练
def train(epoch):
    running_loss = 0.0
#     if epoch == 100:
#         optimizer.param_groups[0]['lr'] /= 2
#     if epoch == 200:
#         optimizer.param_groups[0]['lr'] /= 2
    for batch_idx, data_ in enumerate(train_loader, 0):   # enumerate( , 0)  自动编号 从0开始

        inputs, target = data_
#         print(inputs)
#         print(target)
#         break
        optimizer.zero_grad()

        # forward, backward, update
        outputs, temp = model(inputs)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if batch_idx % 30 == 29:
            print('[%d, %5d] loss: %.3f' % (epoch + 1, batch_idx + 1, running_loss / 30))
            running_loss = 0.0
        
            
            
def test():
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs, temp = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print('accuracy on test set: %d %% ' % (100 * correct / total))
    return (100 * correct / total)

In [13]:
# Save the Network
num_epochs = 5
num_classes = 11
batch_size = 60
learning_rate = 0.001
if __name__ == '__main__':
    his10_noiseconv = [0]
    optim = 0
    start = time.time()
    for epoch in range(40):
        train(epoch)
        acc = test()
        optim = max(his10_noiseconv)
        if acc >= optim:
            torch.save(model.state_dict(), "CNN_signal_0.3.pkl")
        his10_noiseconv.append(acc)
        print(optim)
    end = time.time()
    print(end - start)



[1,    30] loss: 2.508
[1,    60] loss: 2.395
[1,    90] loss: 2.381
[1,   120] loss: 2.375
accuracy on test set: 25 % 
0
[2,    30] loss: 2.309
[2,    60] loss: 2.313
[2,    90] loss: 2.277
[2,   120] loss: 2.252
accuracy on test set: 35 % 
25.727272727272727
[3,    30] loss: 2.218
[3,    60] loss: 2.257
[3,    90] loss: 2.201
[3,   120] loss: 2.191
accuracy on test set: 39 % 
35.303030303030305
[4,    30] loss: 2.171
[4,    60] loss: 2.154
[4,    90] loss: 2.154
[4,   120] loss: 2.133
accuracy on test set: 50 % 
39.24242424242424
[5,    30] loss: 2.119
[5,    60] loss: 2.120
[5,    90] loss: 2.102
[5,   120] loss: 2.117
accuracy on test set: 57 % 
50.39393939393939
[6,    30] loss: 2.074
[6,    60] loss: 2.109
[6,    90] loss: 2.028
[6,   120] loss: 2.072
accuracy on test set: 61 % 
57.90909090909091
[7,    30] loss: 2.034
[7,    60] loss: 2.012
[7,    90] loss: 1.999
[7,   120] loss: 1.996
accuracy on test set: 61 % 
61.57575757575758
[8,    30] loss: 1.909
[8,    60] loss: 1.913
[8

## CNN (Feature Extractor)

In [14]:
# Empty network architure
class MyDataset(data.Dataset):
    def __init__(self, x, y):
        self.x = x.type(torch.float32)
        self.y = y.long()

    def __getitem__(self, index):
        x = self.x[index, :, :, :]
        y = self.y[index]

        return x, y

    def __len__(self):
        return len(self.x[:, :, :, 0])

In [15]:
train_x.shape[0]

7700

In [16]:
num_classes = 11
learning_rate = 0.001  

class ConvNet(nn.Module):
    def __init__(self, num_classes=11):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 50, kernel_size = (1, 8), stride = (1, 1), padding = (0, 2)),
          nn.Dropout2d(),
          nn.BatchNorm2d(50),#正则化
          nn.ReLU()
          #nn.MaxPool2d(kernel_size=(1, 3), stride=(1, 1))
         )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(50, 50, kernel_size=(2, 8), stride=(1, 1), padding=(0, 2)),
            nn.Dropout2d(),
            nn.BatchNorm2d(50),
            nn.ReLU()
#             nn.MaxPool2d(kernel_size=(1, 3), stride=(1, 1))
        )
        self.layer3 = nn.Sequential(
            nn.Linear(6100, 256)
        )
        self.layer4 = nn.Sequential(
            nn.Dropout2d(),
            nn.ReLU(),
            nn.Linear(256, 11)
            
        )

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        temp = self.layer3(out)
        out = self.layer4(temp)
        return out, temp

model = ConvNet(num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    
new_model = model                                                   # 调用模型Model
new_model.load_state_dict(torch.load("./CNN_signal_0.3.pkl"))    # 加载模型参数     

<All keys matched successfully>

### for train

In [17]:
#Feature extractor for train
def ch_gen(model, train_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in train_loader:
            images = images#.to(device)
            labels = labels#.to(device)
            outputs, temp = model(images)
            
    return temp

def character_extract():
    train_dataset = MyDataset(train_x, y_train)
    batch_size = train_x.shape[0]
    train_loader = DataLoader(train_dataset,
                          shuffle=False,
                          batch_size=train_x.shape[0])                                            
    

    character = ch_gen(new_model, train_loader)
    character = character.cpu().detach().numpy()
    return character, y_train.cpu().numpy()
    

In [18]:
signal_3_logits_before, signal_3_y = character_extract()

In [19]:
save = pd.DataFrame(signal_3_logits_before)
save.to_csv("train_noise_logits_before.csv")

### for test

In [20]:
#Feature extractor for train
def ch_gen_test(model, train_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in train_loader:
            images = images#.to(device)
            labels = labels#.to(device)
            outputs, temp = model(images)
            
    return temp

def character_extract_test():
    train_dataset = MyDataset(test_x, y_test)
    batch_size = test_x.shape[0]
    train_loader = DataLoader(train_dataset,
                          shuffle=False,
                          batch_size=test_x.shape[0])                                            
    

    character = ch_gen_test(new_model, train_loader)
    character = character.cpu().detach().numpy()
    return character
    

In [21]:
signal_3_logits_before_test = character_extract_test()

In [22]:
save_test = pd.DataFrame(signal_3_logits_before_test)
save_test.to_csv("test_noise_logits_before.csv")