# 基于深度学习用于高阶调制的极化码译码器

In [310]:
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense, Lambda
from keras import backend as K
import matplotlib.pyplot as plt
%matplotlib inline

# Parameters

In [311]:
k = 6                       # 信息位
N = 32                      # 码长
E = 1                       # 发送功率
modulate = 'BPSK'           # 调制方式('4QAM' or 'BPSK' or '4ASK')
train_SNR_Eb = 1            # 信噪比
nb_epoch = 2**16            # 训练轮数
design = [128, 64, 32]      # 每个隐含层的节点数
batch_size = 256            # 训练样本的批数
optimizer = 'adam'          # 优化器，调整每个节点权重的方法
loss = 'mse'                # 损失函数

train_SNR_Es = train_SNR_Eb + 10*np.log10(k/N)          #符号信噪比，指的是每个符号的能量与噪声功率谱密度之比
train_sigma = np.sqrt(1/(2*10**(train_SNR_Es/10)))      #噪声功率

# Define NN model

In [312]:
# 添加噪声
def addNoise(x, sigma):
    w = K.random_normal(K.shape(x), mean=0.0, stddev=sigma)
    return x + w

# 误码率
def ber(y_true, y_pred):
    return K.mean(K.not_equal(y_true, K.round(y_pred)))

# 输出张量形状
def return_output_shape(input_shape):
    return input_shape

# 构造模型
def compose_model(layers):
    model = Sequential()
    for layer in layers:
        model.add(layer)
    return model

# 统计误比特数
def errors(y_true, y_pred):
    return K.sum(K.cast(K.not_equal(y_true, K.round(y_pred)),dtype = 'int32'))

In [313]:
if modulate == '4ASK':
    inputShape = N>>1
else:
    inputShape = N

# 定义噪声层
noise_layers = [Lambda(addNoise, arguments={'sigma':train_sigma},
                       input_shape=(inputShape,), output_shape=return_output_shape, name="noise")]
noise = compose_model(noise_layers)
noise.compile(optimizer=optimizer, loss=loss)

# 定义解码层
decoder_layers = [Dense(design[0], activation='relu', input_shape=(inputShape,))]
for i in range(1,len(design)):
    decoder_layers.append(Dense(design[i], activation='relu'))
decoder_layers.append(Dense(k, activation='sigmoid'))
decoder = compose_model(decoder_layers)
decoder.compile(optimizer=optimizer, loss=loss, metrics=[errors])

# 定义训练模型
model_layers = noise_layers+decoder_layers
model = compose_model(model_layers)
model.compile(optimizer=optimizer, loss=loss, metrics=[ber])

# Data Generation

In [314]:
# 半加器
def half_adder(a,b):
    s = a ^ b
    c = a & b
    return s,c

# 全加器
def full_adder(a,b,c):
    s = (a ^ b) ^ c
    c = (a & b) | (c & (a ^ b))
    return s,c

# a + b，数据类型为bool
def add_bool(a,b):
    if len(a) != len(b):
        raise ValueError('arrays with different length')
    k = len(a)
    s = np.zeros(k,dtype=bool)
    c = False
    for i in reversed(range(0,k)):
        s[i], c = full_adder(a[i],b[i],c)    
    if c:
        warnings.warn("Addition overflow!")
    return s

# a + 1，数据类型为bool
def inc_bool(a):
    k = len(a)
    increment = np.hstack((np.zeros(k-1,dtype=bool), np.ones(1,dtype=bool)))
    a = add_bool(a,increment)
    return a

# 比特翻转
def bitrevorder(x):
    m = np.amax(x)
    n = np.ceil(np.log2(m)).astype(int)
    for i in range(0,len(x)):
        x[i] = int('{:0{n}b}'.format(x[i],n=n)[::-1],2)  
    return x

# AWGN信道构造极化码
def polar_design_awgn(N, k, design_snr_dB):
    S = 10**(design_snr_dB/10)
    z0 = np.zeros(N)
    z0[0] = np.exp(-S)
    for j in range(1,int(np.log2(N))+1):
        u = 2**j
        for t in range(0,int(u/2)):
            T = z0[t]
            z0[t] = 2*T - T**2     # upper channel
            z0[int(u/2)+t] = T**2  # lower channel
    # 对巴氏参数排序
    idx = np.argsort(z0)
    # 选择最好的k个信道
    idx = np.sort(bitrevorder(idx[0:k]))
    A = np.zeros(N, dtype=bool)
    A[idx] = True
    return A

# 迭代生成Polar码
def polar_transform_iter(u):
    N = len(u)
    n = 1
    x = np.copy(u)
    stages = np.log2(N).astype(int)
    for s in range(0,stages):
        i = 0
        while i < N:
            for j in range(0,n):
                idx = i+j
                x[idx] = x[idx] ^ x[idx+n]
            i=i+2*n
        n=2*n
    return x

In [315]:
# 生成所有可能的信息序列
d = np.zeros((2**k,k),dtype=bool)
for i in range(1,2**k):
    d[i]= inc_bool(d[i-1])

# 生成所有可能的码字
A = polar_design_awgn(N, k, design_snr_dB=1)  # logical vector indicating the nonfrozen bit locations
x = np.zeros((2**k, N),dtype=bool)
if modulate == '4ASK':
    s = np.zeros((2**k, N>>1),dtype=float)
else:
    s = np.zeros((2**k, N),dtype=float)
u = np.zeros((2**k, N),dtype=bool)
u[:,A] = d

#调制
if modulate == '4ASK':
    for i in range(0,2**k):
        x[i] = polar_transform_iter(u[i])
        for j in range(0,N>>1):
            if x[i][2*j] == 0 & x[i][2*j+1] == 0:
                s[i][j] = -3*np.sqrt(E/5)
            elif x[i][2*j] == 1 & x[i][2*j+1] == 0:
                s[i][j] = -1*np.sqrt(E/5)
            elif x[i][2*j] == 1 & x[i][2*j+1] == 1:
                s[i][j] =  1*np.sqrt(E/5)
            else:
                s[i][j] =  3*np.sqrt(E/5)

elif modulate == '4QAM':
    for i in range(0,2**k):
        x[i] = polar_transform_iter(u[i])
        for j in range(0,N>>1):
            if x[i][2*j] == 0 & x[i][2*j+1] == 0:
                s[i][2*j] = np.sqrt(E)/np.sqrt(2)
                s[i][2*j+1] = np.sqrt(E)/np.sqrt(2)
            elif x[i][2*j] == 1 & x[i][2*j+1] == 0:
                s[i][2*j] = -np.sqrt(E)/np.sqrt(2)
                s[i][2*j+1] = np.sqrt(E)/np.sqrt(2)
            elif x[i][2*j] == 1 & x[i][2*j+1] == 1:
                s[i][2*j] = -np.sqrt(E)/np.sqrt(2)
                s[i][2*j+1] = -np.sqrt(E)/np.sqrt(2)
            else:
                s[i][2*j] = np.sqrt(E)/np.sqrt(2)
                s[i][2*j+1] = -np.sqrt(E)/np.sqrt(2)
else:
    for i in range(0,2**k):
        x[i] = polar_transform_iter(u[i])
    s = (-2*x + 1)*E




# Train Neural Network

In [316]:
model.summary()
history = model.fit(s, d, batch_size=batch_size, epochs=nb_epoch, verbose=0, shuffle=True)

Model: "sequential_105"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
noise (Lambda)               (None, 16)                0         
_________________________________________________________________
dense_137 (Dense)            (None, 128)               2176      
_________________________________________________________________
dense_138 (Dense)            (None, 64)                8256      
_________________________________________________________________
dense_139 (Dense)            (None, 32)                2080      
_________________________________________________________________
dense_140 (Dense)            (None, 8)                 264       
Total params: 12,776
Trainable params: 12,776
Non-trainable params: 0
_________________________________________________________________


# Test NN

In [317]:
test_batch = 1000  
num_words = 100000
SNR_dB_start_Eb = 0
SNR_dB_stop_Eb = 5
SNR_points = 20

In [318]:
SNR_dB_start_Es = SNR_dB_start_Eb + 10*np.log10(k/N)
SNR_dB_stop_Es = SNR_dB_stop_Eb + 10*np.log10(k/N)

sigma_start = np.sqrt(1/(2*10**(SNR_dB_start_Es/10)))
sigma_stop = np.sqrt(1/(2*10**(SNR_dB_stop_Es/10)))

sigmas = np.linspace(sigma_start, sigma_stop , SNR_points)

nb_errors = np.zeros(len(sigmas),dtype=int)
nb_bits = np.zeros(len(sigmas),dtype=int)

for i in range(0,len(sigmas)):

    for ii in range(0,np.round(num_words/test_batch).astype(int)):
        
        # 信源
        np.random.seed(0)
        d_test = np.random.randint(0,2,size=(test_batch,k)) 

        # 编码
        x_test = np.zeros((test_batch, N),dtype=bool)
        if modulate == '4ASK':
            s_test = np.zeros((test_batch, N>>1),dtype=float)
        else:
            s_test = np.zeros((test_batch, N),dtype=float)
        u_test = np.zeros((test_batch, N),dtype=bool)
        u_test[:,A] = d_test

        for iii in range(0,test_batch):
            x_test[iii] = polar_transform_iter(u_test[iii])

        # 调制
        if modulate == '4QAM':
            for iii in range(0,test_batch):
                for j in range(0,N>>1):
                    if x_test[iii][2*j] == 0 & x_test[iii][2*j+1] == 0:
                        s_test[iii][2*j] = np.sqrt(E)/np.sqrt(2)
                        s_test[iii][2*j+1] = np.sqrt(E)/np.sqrt(2)
                    elif x_test[iii][2*j] == 1 & x_test[iii][2*j+1] == 0:
                        s_test[iii][2*j] = -np.sqrt(E)/np.sqrt(2)
                        s_test[iii][2*j+1] = np.sqrt(E)/np.sqrt(2)
                    elif x_test[iii][2*j] == 1 & x_test[iii][2*j+1] == 1:
                        s_test[iii][2*j] = -np.sqrt(E)/np.sqrt(2)
                        s_test[iii][2*j+1] = -np.sqrt(E)/np.sqrt(2)
                    else:
                        s_test[iii][2*j] = np.sqrt(E)/np.sqrt(2)
                        s_test[iii][2*j+1] = -np.sqrt(E)/np.sqrt(2)

        elif modulate == '4ASK':
            for iii in range(0,test_batch):
                for j in range(0,N>>1):
                    if x_test[iii][2*j] == 0 & x_test[iii][2*j+1] == 0:
                        s_test[iii][j] = -3*np.sqrt(E/5)
                    elif x_test[iii][2*j] == 1 & x_test[iii][2*j+1] == 0:
                        s_test[iii][j] = -1*np.sqrt(E/5)
                    elif x_test[iii][2*j] == 1 & x_test[iii][2*j+1] == 1:
                        s_test[iii][j] = 1*np.sqrt(E/5)
                    else:
                        s_test[iii][j] = 3*np.sqrt(E/5)
        else:
            s_test = (-2*x_test + 1)*E

        # 信道 (AWGN)
        y_test = s_test + sigmas[i]*np.random.standard_normal(s_test.shape)

        # 译码
        nb_errors[i] += decoder.evaluate(y_test, d_test, batch_size=test_batch, verbose=0)[1]
        nb_bits[i] += d_test.size

# Plot Bit-Error-Rate

In [None]:
legend = []
# np.savetxt('C:/Users/Yu/PycharmProjects/pythonProject1/n16_k8_bpsk_E1.txt', np.vstack((10*np.log10(1/(2*sigmas**2)) - 10*np.log10(k/N), nb_errors/nb_bits)).T)
plt.plot(10*np.log10(1/(2*sigmas**2)) - 10*np.log10(k/N), nb_errors/nb_bits)
legend.append('NN')
plt.legend(legend, loc=3)
plt.yscale('log')
plt.xlabel('$E_b/N_0$')
plt.ylabel('BER')    
plt.grid(True)
plt.show()