### 1.环境和数据准备

  以下是所有需要用的包

In [1]:
import numpy as np
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset
from collections import OrderedDict
from torchsummary import summary
import timm
import h5py
import os
import random
import scipy.io as scio
import matplotlib.pyplot as plt
#!pip install torchsummary
#!pip install timm

#gpu_list = '0'
#os.environ["CUDA_VISIBLE_DEVICES"] = gpu_list
def seed_everything(seed=42):
    random.seed(seed)
    os.environ['PYHTONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
SEED = 42
seed_everything(SEED)

def visualize(img):
    if len(img.shape) == 2:
        plt.imshow(img, cmap='gray', vmin = 0, vmax = 1,interpolation='none')
        plt.show()
    else:
        for i in range(2):
            plt.imshow(img[i,:,:,], cmap='gray', vmin = 0, vmax = 1,interpolation='none')
            plt.show()

In [2]:
torch.cuda.is_available()

True

In [3]:
torch.cuda.current_device()

0

In [4]:
!pwd

/python_prj/5g


#### 1.1 训练数据加载和探索

In [6]:
import scipy.io as scio
data_load_address = './data/'
mat = scio.loadmat(data_load_address+'/Htrain.mat')
x_train = mat['H_train']  # shape=8000*126*128*2

x_train = np.transpose(x_train.astype('float32'),[0,3,1,2])
print(np.shape(x_train))

mat = scio.loadmat(data_load_address+'/Htest.mat')
x_test = mat['H_test']  # shape=2000*126*128*2

x_test = np.transpose(x_test.astype('float32'),[0,3,1,2])
print(np.shape(x_test))

(8000, 2, 126, 128)
(2000, 2, 126, 128)


In [9]:
#visualize(1-(x_test[219]-0.5)/0.01)

### 2.模型思路

模型主要参考了两篇论文，
(使用CRNet网络,channel reconstruction network) 和 MRFnet。

论文①:https://arxiv.org/pdf/1910.14322.pdf
<br>中的思路，它设计了一种多分辨率结构的网络，对'信道图'进行编码和解码,
<br>这种结构在没有任何额外信息的情况下，在相同的计算复杂度下，所提出的CRNet优于最先进的CsiNet
<br>将多分辨路径和卷积分解引入CSI反馈任务，并设计了一个退火训练方案优化CRNet性能

还有论文②：https://ieeexplore.ieee.org/document/9495802/ 中提出的MRFNet的结构。
<br>MRFNet可以恢复具有不同感受野和大量卷积通道的丰富特征，从而更好地恢复CSI。
作者用不同数量的卷积通道和多个感受野来可视化最后一个块的输出和MRFNet的最终输出。

2.1 本文融合了上述两种网络，进行复现。

<br>在Encode阶段，使用了CRNet的block结构。
<br>在Deconde阶段，使用MRFNet的Block结果。
<br> 融合的关键在于使用CRnet的编码阶段网络优化,MRFNet的Encode，进一步提升整体性能,联合了两种网络的优点；
<br>并应用于本比赛场景，取得了初步的效果，由于节后开工时间紧迫只提交了一次，后面还待优化，模型代码如下

### 3.model定义

In [16]:
mean = 0.5
std = 0.01

In [10]:
def Num2Bit(Num, B):
    Num_ = Num.type(torch.uint8)

    def integer2bit(integer, num_bits=B * 2):
        dtype = integer.type()
        exponent_bits = -torch.arange(-(num_bits - 1), 1).type(dtype)
        exponent_bits = exponent_bits.repeat(integer.shape + (1,))
        out = integer.unsqueeze(-1) // 2 ** exponent_bits
        return (out - (out % 1)) % 2

    bit = integer2bit(Num_)
    bit = (bit[:, :, B:]).reshape(-1, Num_.shape[1] * B)
    return bit.type(torch.float32)


def Bit2Num(Bit, B):
    Bit_ = Bit.type(torch.float32)
    Bit_ = torch.reshape(Bit_, [-1, int(Bit_.shape[1] / B), B])
    num = torch.zeros(Bit_[:, :, 1].shape).cuda()
    for i in range(B):
        num = num + Bit_[:, :, i] * 2 ** (B - 1 - i)
    return num


class Quantization(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, B):
        ctx.constant = B
        step = 2 ** B
        out = torch.round(x * step - 0.5)
        out = Num2Bit(out, B)
        return out

    @staticmethod
    def backward(ctx, grad_output):
        # return as many input gradients as there were arguments.
        # Gradients of constant arguments to forward must be None.
        # Gradient of a number is the sum of its B bits.
        b, _ = grad_output.shape
        grad_num = torch.sum(grad_output.reshape(b, -1, ctx.constant), dim=2) / ctx.constant
        return grad_num, None


class Dequantization(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, B):
        ctx.constant = B
        step = 2 ** B
        out = Bit2Num(x, B)
        out = (out + 0.5) / step
        return out

    @staticmethod
    def backward(ctx, grad_output):
        # return as many input gradients as there were arguments.
        # Gradients of non-Tensor arguments to forward must be None.
        # repeat the gradient of a Num for B time.
        b, c = grad_output.shape
        grad_output = grad_output.unsqueeze(2) / ctx.constant
        grad_bit = grad_output.expand(b, c, ctx.constant)
        return torch.reshape(grad_bit, (-1, c * ctx.constant)), None


class QuantizationLayer(nn.Module):

    def __init__(self, B):
        super(QuantizationLayer, self).__init__()
        self.B = B

    def forward(self, x):
        out = Quantization.apply(x, self.B)
        return out


class DequantizationLayer(nn.Module):

    def __init__(self, B):
        super(DequantizationLayer, self).__init__()
        self.B = B

    def forward(self, x):
        out = Dequantization.apply(x, self.B)
        return out

In [11]:
def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=True)


class ConvBN(nn.Sequential):
    def __init__(self, in_planes, out_planes, kernel_size, stride=1, groups=1):
        if not isinstance(kernel_size, int):
            padding = [(i - 1) // 2 for i in kernel_size]
        else:
            padding = (kernel_size - 1) // 2
        super(ConvBN, self).__init__(OrderedDict([
            ('conv', nn.Conv2d(in_planes, out_planes, kernel_size, stride,
                               padding=padding, groups=groups, bias=False)),
            ('bn', nn.BatchNorm2d(out_planes))
        ]))

class SELayer(nn.Module):
    def __init__(self, channel, reduction=8):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.BatchNorm1d(channel // reduction),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.BatchNorm1d(channel ),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

class CRBlock(nn.Module):
    def __init__(self,ch_nums,norm=True):
        super(CRBlock, self).__init__()
        self.path1 = nn.Sequential(OrderedDict([
            ('conv3x3', ConvBN(2, ch_nums, 3)),
            ('relu1', nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ('conv1x9', ConvBN(ch_nums, ch_nums, [1, 9])),
            ('relu2', nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ('conv9x1', ConvBN(ch_nums, ch_nums, [9, 1])),
        ]))
        self.path2 = nn.Sequential(OrderedDict([
            ('conv1x5', ConvBN(2, ch_nums, [1, 5])),
            ('relu', nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ('conv5x1', ConvBN(ch_nums, ch_nums, [5, 1])),
        ]))
        self.identity = nn.Identity()
        self.norm = norm
        self.relu = nn.LeakyReLU(negative_slope=0.3, inplace=True) 
        self.se = SELayer(ch_nums *2)
        if norm:
            self.out_layer = nn.Sequential(
                nn.BatchNorm2d(2),
                nn.LeakyReLU(negative_slope=0.3, inplace=True),
            )
            self.conv1x1 = ConvBN(ch_nums * 2, 2, 1)
            
        else:
            self.conv1x1 = conv3x3(ch_nums*2,2)
            self.out_layer = nn.Sigmoid()
        

    def forward(self, x):
        identity = self.identity(x)

        out1 = self.path1(x)
        out2 = self.path2(x)
        out = torch.cat((out1, out2), dim=1)
        out = self.se(out)
        out = self.relu(out)
        out = self.conv1x1(out)
        if self.norm:
            out = self.out_layer(out + identity)
        else:
            out = self.out_layer(out)
        return out

class MRFBlock(nn.Module):
    def __init__(self,ch_nums=64):
        super(MRFBlock, self).__init__()
        self.path1 = nn.Sequential(
            ConvBN(ch_nums,ch_nums,5),
            nn.LeakyReLU(negative_slope=0.3, inplace=True)
        )
        self.path2 = nn.Sequential(
            ConvBN(ch_nums,ch_nums,7),
            nn.LeakyReLU(negative_slope=0.3, inplace=True)
        )
        self.path3 = nn.Sequential(
            ConvBN(ch_nums,ch_nums,9),
            nn.LeakyReLU(negative_slope=0.3, inplace=True)
        )
        self.conv1x1 = nn.Sequential(
            ConvBN(ch_nums*3,ch_nums,1),
            nn.BatchNorm2d(ch_nums)
        )
        self.identity = nn.Identity()
        self.relu = nn.LeakyReLU(negative_slope=0.3, inplace=True)               

    def forward(self, x):
        identity = self.identity(x)

        out1 = self.path1(x)
        out2 = self.path2(x)
        out3 = self.path3(x)
        out = torch.cat((out1, out2,out3), dim=1)
        out = self.conv1x1(out)
        out = self.relu(out+identity)
        return out
    
def get_efficientnet_ns(model_name='tf_efficientnet_b0_ns', pretrained=True):
    net = timm.create_model(model_name, pretrained=pretrained)
    n_features = net.classifier.in_features

    return net, n_features


class Encoder(nn.Module):
    B = 4

    def __init__(self, feedback_bits, efn_name='tf_efficientnet_b0_ns'):
        super(Encoder, self).__init__()
        self.fc = nn.Linear(32256, int(feedback_bits // self.B))
        self.bn = nn.BatchNorm1d(int(feedback_bits // self.B))
        self.encoder = nn.Sequential(OrderedDict([
            ("conv5x5_bn", ConvBN(2, 2, 5 )),
            ("relu", nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ("CRBlock1", CRBlock(32)),
        ]))
        self.sig = nn.Sigmoid()
        self.quantize = QuantizationLayer(self.B)
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.xavier_uniform_(m.weight)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = (x-mean)/std      
        x = self.encoder(x)
        out = torch.flatten(x, 1)
        out = self.bn(self.fc(out))
        out = self.sig(out)
        out = self.quantize(out)
        return out

class Decoder(nn.Module):
    B = 4

    def __init__(self, feedback_bits):
        super(Decoder, self).__init__()
        self.feedback_bits = feedback_bits
        self.dequantize = DequantizationLayer(self.B)
        
        self.decoder = nn.Sequential(OrderedDict([
            ("conv5x5_bn", ConvBN(2, 64, 3 )),
            ("relu", nn.LeakyReLU(negative_slope=0.3, inplace=True)),
            ("MRFBlock1", MRFBlock(64)),
            ("MRFBlock1", MRFBlock(64)),
        ]))
        
        self.fc = nn.Linear(int(feedback_bits // self.B), 32256)
        self.bn1d = nn.BatchNorm1d(32256)
        self.sig = nn.Sigmoid()
        self.out_conv = conv3x3(64,2)
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.xavier_uniform_(m.weight)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        out = self.dequantize(x)
        out = out.view(-1, int(self.feedback_bits // self.B))

        out = self.sig(self.fc(out))
        out = out.view(-1, 2, 126, 128)
        out = self.decoder(out)
        out = self.sig(self.out_conv(out))
        return out

# Note: Do not modify following class and keep it in your submission.
# feedback_bits is 512 by default.
class AutoEncoder(nn.Module):

    def __init__(self, feedback_bits, efn_name='tf_efficientnet_b0_ns'):
        super(AutoEncoder, self).__init__()
        self.encoder = Encoder(feedback_bits, efn_name=efn_name)
        self.decoder = Decoder(feedback_bits)

    def forward(self, x):
        feature = self.encoder(x)
        out = self.decoder(feature)
        return out

    def forward_train(self,x):
        feature,en_data = self.encoder.forward_train(x)
        out,de_data = self.decoder.forward_train(feature)
        return out,en_data,de_data


def NMSE(x, x_hat):
    x_real = np.reshape(x[:, :, :, 0], (len(x), -1))
    x_imag = np.reshape(x[:, :, :, 1], (len(x), -1))
    x_hat_real = np.reshape(x_hat[:, :, :, 0], (len(x_hat), -1))
    x_hat_imag = np.reshape(x_hat[:, :, :, 1], (len(x_hat), -1))
    x_C = x_real - 0.5 + 1j * (x_imag - 0.5)
    x_hat_C = x_hat_real - 0.5 + 1j * (x_hat_imag - 0.5)
    power = np.sum(abs(x_C) ** 2, axis=1)
    mse = np.sum(abs(x_C - x_hat_C) ** 2, axis=1)
    nmse = np.mean(mse / power)
    return nmse

def NMSE_cuda(x, x_hat):
    x_real = x[:, 0, :, :].view(len(x), -1) - 0.5
    x_imag = x[:, 1, :, :].view(len(x), -1) - 0.5
    x_hat_real = x_hat[:, 0, :, :].view(len(x_hat), -1) - 0.5
    x_hat_imag = x_hat[:, 1, :, :].view(len(x_hat), -1) - 0.5
    power = torch.sum(x_real ** 2 + x_imag ** 2, axis=1)
    mse = torch.sum((x_real - x_hat_real) ** 2 + (x_imag - x_hat_imag) ** 2, axis=1)
    nmse = mse / power
    return nmse
 
class NMSELoss(nn.Module):
    def __init__(self, reduction='sum'):
        super(NMSELoss, self).__init__()
        self.reduction = reduction
 
    def forward(self, x_hat, x):
        nmse = NMSE_cuda(x, x_hat)
        if self.reduction == 'mean':
            nmse = torch.mean(nmse)
        else:
            nmse = torch.sum(nmse)
        return nmse

def Score(NMSE):
    score = 1 - NMSE
    return score


class DatasetFolder(Dataset):
    def __init__(self, matData, phase='val'):
        self.matdata = matData
        self.phase = phase
        self.data_shape = matData[0].shape
 
    def __getitem__(self, index):
        y = self.matdata[index]
        return y
 
    def __len__(self):
        return self.matdata.shape[0]

### 4.模型的训练

In [17]:
import gc
import math
from torch.optim.lr_scheduler import _LRScheduler

#CRnet论文中的cosine退火学习率
class WarmUpCosineAnnealingLR(_LRScheduler):
    def __init__(self, optimizer, T_max, T_warmup, eta_min=0, last_epoch=-1):
        self.T_max = T_max
        self.T_warmup = T_warmup
        self.eta_min = eta_min
        super(WarmUpCosineAnnealingLR, self).__init__(optimizer, last_epoch)

    def get_lr(self):
        if self.last_epoch < self.T_warmup:
            return [base_lr * self.last_epoch / self.T_warmup for base_lr in self.base_lrs]
        else:
            k = 1 + math.cos(math.pi * (self.last_epoch - self.T_warmup) / (self.T_max - self.T_warmup))
            return [self.eta_min + (base_lr - self.eta_min) * k / 2 for base_lr in self.base_lrs]
gc.collect()
torch.cuda.empty_cache()
gc.collect()

0

模型的初始化和训练准备

In [13]:

batch_size = 32
epochs = 500
learning_rate = 2e-3
feedback_bits = 512

model = AutoEncoder(feedback_bits,efn_name='tf_efficientnet_b0_ns')
criterion = NMSELoss(reduction='mean')  # nn.MSELoss()
criterion_test = NMSELoss(reduction='sum')

train_dataset = DatasetFolder(x_train,phase = 'train')
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,shuffle=True,num_workers=4)
test_dataset = DatasetFolder(x_test,phase = 'val')
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size,num_workers=4)
for d in train_loader:
    print(d.shape)
    break
best_loss =100
model = model.cuda()
summary(model,(2,126,128))

torch.Size([32, 2, 126, 128])
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 2, 126, 128]             100
       BatchNorm2d-2          [-1, 2, 126, 128]               4
         LeakyReLU-3          [-1, 2, 126, 128]               0
          Identity-4          [-1, 2, 126, 128]               0
            Conv2d-5         [-1, 32, 126, 128]             576
       BatchNorm2d-6         [-1, 32, 126, 128]              64
         LeakyReLU-7         [-1, 32, 126, 128]               0
            Conv2d-8         [-1, 32, 126, 128]           9,216
       BatchNorm2d-9         [-1, 32, 126, 128]              64
        LeakyReLU-10         [-1, 32, 126, 128]               0
           Conv2d-11         [-1, 32, 126, 128]           9,216
      BatchNorm2d-12         [-1, 32, 126, 128]              64
           Conv2d-13         [-1, 32, 126, 128]             320
      Bat

  out = integer.unsqueeze(-1) // 2 ** exponent_bits


#### 4.1 训练环境说明

<br>CPU：2.5g   内存：8G   显卡：Ti3060   
<br>epoch:500,训练时长:4hour

In [14]:
import time

modelSave1 = './data/models/encoder.pth.tar'
modelSave2 ='./data/models/decoder.pth.tar'
mse_fn = nn.MSELoss()
learning_rate = 3e-3
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate,weight_decay=0.05,betas=(0.9,0.95))
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.7, patience=4,  min_lr=5e-7, verbose=False)
for epoch in range(epochs):
    print('========================lr:%.4e' % optimizer.param_groups[0]['lr'])
    # 训练
    model.train()
    running_loss = 0.0
    start_time = time.time()
    for i, input in enumerate(train_loader):
        input = input.cuda()
        output= model(input)
 
        loss= mse_fn(output, input)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        if i % 100 == 0:
            print('Epoch: [{0}][{1}/{2}]\t'
                  'Loss {loss:.8f}\t'.format(
                epoch, i, len(train_loader), loss=loss.item()))
        running_loss += loss.item()
    epoch_loss = running_loss / len(train_loader)
    # 打印epoch信息
    print('time for this epch:',time.time()-start_time)
    model.eval()
    total_loss = 0
    mse_loss = 0
    with torch.no_grad():
        for i, input in enumerate(test_loader):
            # convert numpy to Tensor
            input = input.cuda()
            output = model(input)
            total_loss += criterion_test(output, input).item()
            mse_loss += mse_fn(output, input).item()
        average_loss = total_loss / len(test_dataset)
        average_loss_mse = mse_loss / len(test_dataset)
        print('NMSE %.4f' % average_loss,'  MSE %.10f' % average_loss_mse)
        if average_loss < best_loss:
            # model save
            # save encoder
            torch.save({'state_dict': model.encoder.state_dict(), }, modelSave1)
            # save decoder
            torch.save({'state_dict': model.decoder.state_dict(), }, modelSave2)
            print('Model saved!')
            best_loss = average_loss
    scheduler.step(average_loss)



  out = integer.unsqueeze(-1) // 2 ** exponent_bits


Epoch: [0][0/250]	Loss 0.03649970	
Epoch: [0][100/250]	Loss 0.00031055	
Epoch: [0][200/250]	Loss 0.00030923	
time for this epch: 154.34033346176147
NMSE 1.3254   MSE 0.0000078281
Model saved!
Epoch: [1][0/250]	Loss 0.00021043	
Epoch: [1][100/250]	Loss 0.00023766	
Epoch: [1][200/250]	Loss 0.00021129	
time for this epch: 159.75664925575256
NMSE 1.0733   MSE 0.0000065440
Model saved!


### 5.保存和加载model,以备提交

In [19]:
modelSave1 = './data/models/encoder.pth.tar'
torch.save({'state_dict': model.encoder.state_dict(), }, modelSave1)
# save decoder
modelSave2 ='./data/models/decoder.pth.tar'
torch.save({'state_dict': model.decoder.state_dict(), }, modelSave2)

In [None]:
model.encoder.load_state_dict(torch.load('/content/drive/MyDrive/naic_models/encoderb1.pth.tar')['state_dict'])
model.decoder.load_state_dict(torch.load('/content/drive/MyDrive/naic_models/decoderb1.pth.tar')['state_dict'])

<All keys matched successfully>

In [20]:
type(model)

__main__.AutoEncoder