In [37]:
import torch
import torch.nn as nn


class BasicBlock(nn.Module):
    def __init__(self, in_ch, out_ch, k_size):
        super().__init__()
        self.add_module("bn", nn.BatchNorm1d(in_ch))
        self.add_module("act", nn.ReLU())
        self.add_module("conv", nn.Conv1d(in_ch, out_ch, k_size, padding=k_size//2))
    
    def forward(self, X):
        for blk in self._models:
            X = blk(X)
        return X
    
    
class DenseBlock(nn.Module):
    def __init__(self, in_ch, conv_num, ch_num):
        super().__init__()
        self._out_ch = in_ch + ch_num * conv_num
        for i in torch.arange(conv_num):
            self.add_module(f"basic_blk{i}", BasicBlock(in_ch, ch_num, 3))
            in_ch += ch_num
    
    def forward(self, X):
        for blk in self._modules:
            Y = blk(X)
            X = torch.cat([X, Y])
        return X
    
    def out_ch(self):
        return self._out_ch
            
            
class TransBlock(nn.Module):
    def __init__(self, in_ch, conv_k_num, conv_k_size, pool_k_size, pool_step):
        super().__init__()
        self.add_module("basic_blk", BasicBlock(in_ch, conv_k_num, conv_k_size))
        self.add_module("pool", nn.MaxPool1d(pool_k_size, pool_step, padding=pool_k_size//2))
        
    def forward(self, X):
        for blk in self._models:
            X = blk(X)
        return X
    
    

In [38]:
import torch.nn.functional as F

class Conv_DenseNet_1D(nn.Module):

    loss = F.cross_entropy
    device = torch.device('cuda:0')

    def __init__(self):
        super().__init__()
        # 密连接部分
        dense_part = nn.Sequential()
        dense_part.add_module("conv1", nn.Conv1d(2, 64, 3, 1, 1))
        dense_part.add_module("trans_blk1", TransBlock(64, 128, 3, 3, 2))
        dense_part.add_module("dense_blk1", DenseBlock(128, 2, 128))
        dense_part.add_module("trans_blk2", TransBlock(128 * 2 + 128, 64, 3, 3, 2))
        dense_part.add_module("dense_blk2", DenseBlock(64, 3, 64))
        dense_part.add_module("trans_blk3", TransBlock(64 * 3 + 64, 64, 3, 3, 2))
        dense_part.add_module("dense_blk3", DenseBlock(64, 4, 64))
        dense_part.add_module("trans_blk4", TransBlock(64 * 4 + 64, 64, 3, 3, 2))
        dense_part.add_module("dense_blk4", DenseBlock(64, 3, 64))
        dense_part.add_module("conv2", nn.Conv1d(64 * 3 + 64, 150, 3, 1, 1))
        self.add_module("dense_part", dense_part)
        # 全局池化部分
        pool_part = {
            "max_pool": nn.AdaptiveMaxPool1d(1),
            "avg_pool": nn.AdaptiveAvgPool1d(1),
        }
        self._pools = pool_part
        self.add_module("global_pool_part", nn.Sequential(pool_part))
        # 全连接部分
        self.add_module("lin_part", nn.Linear(300, 32*2))
    
    def __call__(self, X):
        return self.forward(X)

    def forward(self, X):
        models = self._modules
        pool_part = models["global_pool_part"]
        Y = models["dense_part"](X)
        feature_vec = torch.cat([pool_part[0](Y), pool_part[1](Y)], dim=1)
        y = models["lin_part"](feature_vec)
        return y.reshape([-1, 32, 2])
    
    def predict(self, X):
        torch.no_grad()
        self.eval()
        Y_hat = self(X)
        y_hat = torch.argmax(Y_hat, dim=len(Y_hat.shape)-1)
        return y_hat

In [7]:
from commpy import modulation
import numpy as np
from commpy import channelcoding

arr = np.random.randint(0, 2, [12])
QAM = modulation.QAMModem(16)
sig = QAM.modulate(arr)
gen74 = channelcoding.cyclic_code_genpoly(7, 4)[0]
gen155 = channelcoding.cyclic_code_genpoly(15, 5)[0]

'x^0 + x^2 + x^5 + x^6 + x^8 + x^9 + x^10 '

In [35]:
from IPython import display
import matplotlib.pyplot as plt
import random
import numpy as np

class Marker:
    def __init__(self):
        self._fig = plt.figure()
        self._ax = self._fig.subplots()
        self._train = [1]
        self._test = [1]


    def update(self, ebr_train, ebr_test):
        self._train.append(ebr_train)
        self._test.append(ebr_test)
        # 数据可视化
        self._ax.cla()
        plt.yscale('log')
        self._ax.set_ylim((10e-8, 1))
        epochs = torch.arange(len(self._train))
        self._ax.plot(epochs, self._train, label='train_ebr', color='blue')
        self._ax.plot(epochs, self._test, label='test_ebr', color='green')
        self._ax.legend(loc="upper left")
        display.display(self._fig)
        display.clear_output(wait=True)


def split_data(data_set, split_rate, shuffle=True):
    feature, label = data_set
    data_num = len(feature)
    index = torch.arange(data_num)
    if shuffle:
        random.shuffle(index)
    split_gap = np.array(split_rate).cumsum()
    split_point = (split_gap * data_num / split_gap[-1]).astype(int)
    index_start = [0, *split_point[:-1]]
    index_end = split_point
    data_parts = []
    for start, end in zip(index_start, index_end):
        i = index[start:end]
        data_parts.append((feature[i], label[i]))
    return data_parts
    

def load_data(data_set, batch_size):
    feature, label = data_set
    data_num = len(feature)
    for start in range(0, data_num, batch_size):
        end = min(start+batch_size, data_num)
        yield feature[start:end], label[start:end]


def run_epoch(net, data_set, batch_size, optimizer=None):
    err_bits = 0
    is_train = optimizer is not None
    for X, y in load_data(data_set, batch_size, is_train):
        X = X.to(net.device)
        y = y.to(net.device)
        if is_train:
            optimizer.zero_grad()
            Y_hat = net(X)
            loss = net.loss(Y_hat, y)
            loss.backward()
            with torch.no_grad():
                optimizer.step()
                y_hat = torch.argmax(Y_hat, dim=len(Y_hat.shape)-1)
        else:
            y_hat = net.predict(X)
        err_bits += torch.sum(~(y_hat == y))        
    return err_bits.cpu() / len(data_set)


def train(net, data_set, batch_size, lr, lr_gain, epochs, steady_epochs, momentum):
    steady_epoch = 0
    marker = Marker()
    for epoch in range(epochs):
        train_set, valid_set = split_data(data_set, [8, 2])
        if steady_epoch >= steady_epochs:
            lr *= lr_gain
            steady_epoch = 0
        optimizer = torch.optim.SGD(net.parameters(), lr, momentum)
        # 训练和验证
        ebr_train = run_epoch(net, train_set, batch_size, optimizer)
        ebr_valid = run_epoch(net, valid_set, batch_size)
        # 更新图像
        marker.update(ebr_train, ebr_valid)
        # 记一次lr稳定的epoch
        steady_epoch += 1



In [5]:
import torch
torch.arange(12*3*4).reshape([12,3,4]).split(2, dim=0)[1]

tensor([[[24, 25, 26, 27],
         [28, 29, 30, 31],
         [32, 33, 34, 35]],

        [[36, 37, 38, 39],
         [40, 41, 42, 43],
         [44, 45, 46, 47]]])