import CNN models
import map-based data

loop over number of data samples
    apply CNN model to data
    de-normalize
    calculate NMSE


In [1]:
import h5py
import torch
import torch.nn as nn
import random


import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import datetime
from tempfile import TemporaryFile
from scipy.io import loadmat
from scipy.io import savemat
from torch.utils.data import DataLoader, TensorDataset



### Load min, max variables for normalize and denormalize 

In [18]:
rowss = '1500_1509_3916_3920'
save_path = '../model/static/CNN/BS16/'+ rowss + '/'
snr = 0
variables = torch.load(save_path + str(snr) + 'dB/CNN_1_variable.pth')

trainData_LS_min = variables['train_min_LS']
trainData_LS_max = variables['train_max_LS']
trainData_LI_min = variables['train_min_LI']
trainData_LI_max = variables['train_min_LI']
trainLabels_min  = variables['train_label_min']
trainLabels_max  = variables['train_label_max']


# Load Model

In [19]:
sys.path.append('../helper')
from utils import CNN_Est
import loader

In [20]:
model_LS_CNN = CNN_Est()

checkpoint_LS = torch.load(save_path + str(snr) + 'dB/CNN_1_LS_CNN_model.pth')
model_LS_CNN.load_state_dict(checkpoint_LS['model_state_dict'])

model_LS_CNN.eval()

CNN_Est(
  (normalization): BatchNorm2d(1, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv1): Conv2d(1, 64, kernel_size=(9, 9), stride=(1, 1), padding=(4, 4))
  (relu): ReLU()
  (conv2): Conv2d(64, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (conv3): Conv2d(64, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (conv4): Conv2d(64, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (conv5): Conv2d(32, 1, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
)

In [21]:
model_LS_LI_CNN = CNN_Est()


checkpoint_LI = torch.load(save_path + str(snr) + 'dB/CNN_1_LS_CNN_model.pth')
model_LS_LI_CNN.load_state_dict(checkpoint_LI['model_state_dict'])

model_LS_LI_CNN.eval()

CNN_Est(
  (normalization): BatchNorm2d(1, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv1): Conv2d(1, 64, kernel_size=(9, 9), stride=(1, 1), padding=(4, 4))
  (relu): ReLU()
  (conv2): Conv2d(64, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (conv3): Conv2d(64, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (conv4): Conv2d(64, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (conv5): Conv2d(32, 1, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
)

# Load ETS map-based data

In [49]:
mapBaseData_path = '../../CDL Customization/Data/ver1/' + str(snr) + 'dB/1_mapBaseData.mat'
file = h5py.File(mapBaseData_path, 'r')
file['H_data'].shape

(732, 2, 14, 612)

In [50]:
H_true = np.empty((0, 2, 612, 14)) # true channel
H_equal = np.empty((0, 2, 612, 14)) # noisy channel # LS channel
H_linear = np.empty((0, 2, 612, 14)) # noisy channel # LS+Linear Interpolated channel
        
H_true = np.concatenate((H_true, np.transpose(np.array(file['H_data']), (0,1,3,2))) , axis = 0) # N_samples x channel(2) x height(614) x width(14)
H_equal = np.concatenate((H_equal, np.transpose(np.array(file['H_equalized_data']), (0,1,3,2))), axis = 0)
H_linear = np.concatenate((H_linear, np.transpose(np.array(file['H_linear_data']), (0,1,3,2))), axis=0)

H_true = torch.from_numpy(H_true)
H_equal = torch.from_numpy(H_equal)
H_linear = torch.from_numpy(H_linear)


## Normalize data and Make test_loader

In [55]:
device = 'cpu'
BATCH_SIZE = 32
trainData_LS_min = variables['train_min_LS']
trainData_LS_max = variables['train_max_LS']
trainData_LI_min = variables['train_min_LI']
trainData_LI_max = variables['train_min_LI']
trainLabels_min  = variables['val_min']
trainLabels_max  = variables['val_max']

testData_LS_normd   = (H_equal - trainData_LS_min)/ (trainData_LS_max - trainData_LS_min)
testData_LI_normd   = (H_linear - trainData_LI_min)/ (trainData_LI_max - trainData_LI_min)

testData_LS_normd = testData_LS_normd.to(device, dtype=torch.float)
testData_LI_normd = testData_LI_normd.to(device, dtype=torch.float)
H_true = H_true.to(device, dtype=torch.float)

# Create a DataLoader for dataset
test_dataset = TensorDataset(testData_LS_normd, testData_LI_normd, H_true)  # [nSamples, 2, 612, 14]
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)


In [60]:
SNR = [0] # np.arange(0, 31, 5) # 0:5:30 dB
average_nmse_LS_LI = []
average_nmse_LI_NN = []
average_nmse_LS_NN = []

for snr in SNR:
    with torch.no_grad():
        nmse_LS_LI = []
        nmse_LS_NN = []
        nmse_LI_NN = []
        for batch_idx, (LS_inputs, LI_inputs, test_targets) in enumerate(test_loader):
            print(f"Batch {batch_idx + 1}/{len(test_loader)}")
            
            LS_inputs_real = LS_inputs[:,0,:,:].unsqueeze(1)
            LS_inputs_imag = LS_inputs[:,1,:,:].unsqueeze(1)
            
            LI_inputs_real = LI_inputs[:,0,:,:].unsqueeze(1)
            LI_inputs_imag = LI_inputs[:,1,:,:].unsqueeze(1)
            
            # Calculate the variance of the targets
            test_targets_complex = torch.complex(test_targets[:,0,:,:], test_targets[:,1,:,:])
            variance = torch.var(test_targets_complex)
            
            # NMSE of LS+NN
            LS_outputs_real = model_LS_CNN(LS_inputs_real) # 32x1x612x14
            LS_outputs_imag = model_LS_CNN(LS_inputs_imag) 
            LS_outputs = torch.cat((LS_outputs_real, LS_outputs_imag), dim=1) # 32x2x612x14
            # De-normalized
            LS_outputs_denormd = LS_outputs * (trainLabels_max - trainLabels_min) + trainLabels_min
            LS_outputs_complex = torch.complex(LS_outputs_real, LS_outputs_imag)
            # Calculate the mean squared error
            mse_LS_NN = torch.mean(torch.abs(test_targets_complex - LS_outputs_complex) ** 2)
            nmse_LS_NN.append(mse_LS_NN / variance)
            
            # NMSE of LS+LI
            # De-normalized
            LS_LI_outputs_denormd = LI_inputs * (trainLabels_max - trainLabels_min) + trainLabels_min
            LS_LI_outputs_complex = torch.complex(LI_inputs[:,:,0,:], LI_inputs[:,:,1,:])
            # Calculate the mean squared error
            mse_LS_LI = torch.mean(torch.abs(test_targets_complex - LS_LI_outputs_complex) ** 2)
            nmse_LS_LI.append(mse_LS_LI / variance)
            
            # NMSE of LS+LI+NN
            LI_NN_outputs_real = model_LS_LI_CNN(LI_inputs_real)    # 32x1x612x14
            LI_NN_outputs_imag = model_LS_LI_CNN(LI_inputs_imag)
            LI_NN_outputs = torch.cat((LI_NN_outputs_real, LI_NN_outputs_imag), dim=1) # 32x2x612x14
            # De-normalized
            LI_NN_outputs_denormd = LI_NN_outputs * (trainLabels_max - trainLabels_min) + trainLabels_min
            LI_NN_outputs_complex = torch.complex(LI_NN_outputs_real, LI_NN_outputs_imag)
            # Calculate the mean squared error
            mse_LI_NN = torch.mean(torch.abs(test_targets_complex - LI_NN_outputs_complex) ** 2)
            nmse_LI_NN.append(mse_LI_NN / variance)
            
        average_nmse_LS_LI.append(sum(nmse_LS_LI)/len(nmse_LS_LI))
        average_nmse_LS_NN.append(sum(nmse_LS_NN)/len(nmse_LS_NN))
        average_nmse_LI_NN.append(sum(nmse_LI_NN)/len(nmse_LI_NN))

Batch 1/23


TypeError: unsupported operand type(s) for -: 'builtin_function_or_method' and 'Tensor'