# torch version

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from my_code.mysionna.channel.torch_version.utils import expand_to_rank
import numpy as np

import tensorflow as tf


from sionna.constants import GLOBAL_SEED_NUMBER 

class CustomOperations:
    
    class CustomXOR(Function):
        @staticmethod
        def forward(ctx, a, b):
            ctx.save_for_backward(a, b)
            if a.dtype in (torch.uint8, torch.int8, torch.int16, torch.int32, torch.int64):
                z = (a + b) % 2
            else:
                z = torch.abs(a - b)
            return z

        @staticmethod
        def backward(ctx, grad_output):
            return grad_output, grad_output
    # STEBinarizer（Straight-Through Estimator Binarizer）是一种在神经网络中用于处理二值化操作的技术。
    # STE代表Straight-Through Estimator，它是一种用于在反向传播中处理不可微操作的技术。
    class STEBinarizer(Function):
        @staticmethod
        def forward(ctx, x):
            ctx.save_for_backward(x)
            z = torch.where(x < 0.5, torch.tensor(0.0, device=x.device), torch.tensor(1.0, device=x.device))
            return z

        @staticmethod
        def backward(ctx, grad_output):
            return grad_output
    
    class SampleErrors(torch.nn.Module):
        def __init__(self, eps=1e-10, temperature=1.0):
            super().__init__()
            self._eps = eps
            self._temperature = temperature

        def forward(self, pb, shape):
#            torch.manual_seed(2023)
#            torch.cuda.manual_seed(2023)
#            u1 = torch.rand(shape, dtype=torch.float32)
#            u2 = torch.rand(shape, dtype=torch.float32)
            tf.random.set_seed(GLOBAL_SEED_NUMBER)
            u1_tf=tf.random.uniform(shape, minval=0, maxval=1, dtype=tf.float32)
            u2_tf=tf.random.uniform(shape, minval=0, maxval=1, dtype=tf.float32)
            u1_np=u1_tf.numpy()
            u2_np=u2_tf.numpy()
            u1=torch.from_numpy(u1_np)
            u2=torch.from_numpy(u2_np)

            u = torch.stack((u1, u2), dim=-1)

            # 采样Gumbel分布
            q = -torch.log(-torch.log(u + self._eps) + self._eps)
            p = torch.stack((pb, 1 - pb), dim=-1).unsqueeze(1).expand(shape[0], shape[1], 2)
            a = (torch.log(p + self._eps) + q) / self._temperature

            # 应用softmax
            e_cat = F.softmax(a, dim=-1)

            # 通过直通估计器对最终值进行二值化
            return CustomOperations.STEBinarizer.apply(e_cat[..., 0])

class BinaryMemorylessChannel(nn.Module):
    def __init__(self, return_llrs=False, bipolar_input=False, llr_max=100., dtype=torch.float32, **kwargs):
        super(BinaryMemorylessChannel,self).__init__(**kwargs)

        assert isinstance(return_llrs, bool), "return_llrs must be bool."
        self._return_llrs = return_llrs

        assert isinstance(bipolar_input, bool), "bipolar_input must be bool."
        self._bipolar_input = bipolar_input

        assert llr_max >= 0., "llr_max must be a positive scalar value."
        self.llr_max = llr_max
        self.dtype = dtype

        if self._return_llrs:
            assert dtype in (torch.float16, torch.float32, torch.float64), \
                "LLR outputs require non-integer dtypes."
        else:
            if self._bipolar_input:
                assert dtype in (torch.float16, torch.float32, torch.float64,
                                 torch.int8, torch.int16, torch.int32, torch.int64), \
                    "Only signed dtypes are supported for bipolar inputs."
            else:
                assert dtype in (torch.float16, torch.float32, torch.float64,
                                 torch.uint8, torch.int16, torch.int32, torch.int64,
                                 torch.int8, torch.int16, torch.int32, torch.int64), \
                    "Only real-valued dtypes are supported."

        self.check_input = True  # check input for consistency (i.e., binary)

        self._eps = 1e-9  # small additional term for numerical stability
        self.temperature = torch.tensor(0.1, dtype=torch.float32)  # for Gumble-softmax

    @property
    def llr_max(self):
        """Maximum value used for LLR calculations."""
        return self._llr_max

    @llr_max.setter
    def llr_max(self, value):
        """Maximum value used for LLR calculations."""
        assert value >= 0, 'llr_max cannot be negative.'
        self._llr_max = value

    @property
    def temperature(self):
        """Temperature for Gumble-softmax trick."""
        return self._temperature.item()

    @temperature.setter
    def temperature(self, value):
        """Temperature for Gumble-softmax trick."""
        assert value >= 0, 'temperature cannot be negative.'
        self._temperature = torch.tensor(value, dtype=torch.float32)
    #########################
    # Utility methods
    #########################

    def _check_inputs(self, x):
        """Check input x for consistency, i.e., verify
        that all values are binary of bipolar values."""
        x = x.float()
        if self.check_input:
            if self._bipolar_input:
                assert torch.all(torch.logical_or(x == -1, x == 1)), "Input must be bipolar {-1, 1}."
            else:
                assert torch.all(torch.logical_or(x == 0, x == 1)), "Input must be binary {0, 1}."
            # input datatype consistency should be only evaluated once
            self.check_input = False

    # 使用方法
    @staticmethod
    def custom_xor(a, b):
        return CustomOperations.CustomXOR.apply(a, b)       

    @staticmethod
    def _ste_binarizer(x):
        """Straight through binarizer to quantize bits to int values."""
        return CustomOperations.STEBinarizer.apply(x)

    def _sample_errors(self, pb, shape):
        """Samples binary error vector with given error probability e.
        This function is based on the Gumble-softmax "trick" to keep the
        sampling differentiable."""
#        torch.manual_seed(2023)
#        torch.cuda.manual_seed(2023)
#        u1 = torch.rand(shape)
#        u2 = torch.rand(shape)
        
        tf.random.set_seed(GLOBAL_SEED_NUMBER)
        u1_tf=tf.random.uniform(shape, minval=0, maxval=1, dtype=tf.float32)
        u2_tf=tf.random.uniform(shape, minval=0, maxval=1, dtype=tf.float32)
        u1_np=u1_tf.numpy()
        u2_np=u2_tf.numpy()
        u1=torch.from_numpy(u1_np)
        u2=torch.from_numpy(u2_np)

        u = torch.stack((u1, u2), dim=-1)

        # sample Gumble distribution
        q = -torch.log(-torch.log(u + self._eps) + self._eps)
        p = torch.stack((pb, 1 - pb), dim=-1)
        p = p.unsqueeze(0).expand(q.shape)
        a = (torch.log(p + self._eps) + q) / self.temperature

        # apply softmax
        e_cat = F.softmax(a, dim=-1)

        # binarize final values via straight-through estimator
        return self._ste_binarizer(e_cat[..., 0])  # only take the first class
    
    #########################
    # Keras layer functions
    #########################

    # 这段代码定义了一个 build 方法了，用于验证输入的形状是否正确
    # 它主要检查第二个输入（错误概率 pb）的形状，确保其最后一维的长度为 2

    def build(self, input_shapes):
        """Verify correct input shapes"""

        pb_shapes = input_shapes[1]
        # allow tuple of scalars as alternative input
        if isinstance(pb_shapes, (tuple, list)):
            if not len(pb_shapes) == 2:
                raise ValueError("Last dim of pb must be of length 2.")
        else:
            if len(pb_shapes) > 0:
                if not pb_shapes[-1] == 2:
                    raise ValueError("Last dim of pb must be of length 2.")
            else:
                raise ValueError("Last dim of pb must be of length 2.")
            
    def forward(self, inputs):
        """Apply discrete binary memoryless channel to inputs."""

        x, pb = inputs

        # allow pb to be a tuple of two scalars
        if isinstance(pb, (tuple, list)):
            pb0 = pb[0]
            pb1 = pb[1]
        else:
            pb0 = pb[...,0]
            pb1 = pb[...,1]
        
        # 假设pb0和pb1是PyTorch张量
        pb0 = pb0.float()  # 确保pb0是浮点数
        pb1 = pb1.float()  # 确保pb1是浮点数
        pb0 = torch.clamp(pb0, 0., 1.)  # 将pb0的值限制在0和1之间
        pb1 = torch.clamp(pb1, 0., 1.)  # 将pb1的值限制在0和1之间

        # check x for consistency (binary, bipolar)
        self._check_inputs(x)

        e0 = self._sample_errors(pb0,x.shape)
        e1 = self._sample_errors(pb1, x.shape)

        if self._bipolar_input:
            neutral_element = torch.tensor(-1, dtype=x.dtype)
        else:
            neutral_element = torch.tensor(0, dtype=x.dtype)    

        # mask e0 and e1 with input such that e0 only applies where x==0    
        e = torch.where(x == neutral_element, e0, e1)
        e = e.to(dtype=x.dtype)

        if self._bipolar_input:
            # flip signs for bipolar case
            y = x * (-2*e + 1)
        else:
            # XOR for binary case
            y = self.custom_xor(x, e)

        # if LLRs should be returned
        if self._return_llrs:
            if not self._bipolar_input:
                y = 2 * y - 1  # transform to bipolar
            # Remark: Sionna uses the logit definition log[p(x=1)/p(x=0)]
            # 计算LLRs的组成部分
            y0 = -(torch.log(pb1 + self._eps) - torch.log(1 - pb0 - self._eps))
            y1 = (torch.log(1 - pb1 - self._eps) - torch.log(pb0 + self._eps))

            # multiply by y to keep gradient
            # 使用torch.where实现条件选择
            y = torch.where(y == 1, y1, y0).to(dtype=y.dtype) * y

            # and clip output llrs
            # 将LLR的值限制在范围内
            y = torch.clamp(y, min=-self._llr_max, max=self._llr_max)        

        return y

class BinarySymmetricChannel(BinaryMemorylessChannel):
    def __init__(self,return_llrs=False,bipolar_input=False,llr_max=100.,dtype=torch.float32,**kwargs):
        #继承父类的__init__()
        super().__init__(return_llrs=return_llrs,
                         bipolar_input=bipolar_input,
                         llr_max=llr_max,
                         dtype=dtype,
                         **kwargs)
    #########################
    # Keras layer functions
    #########################

    def build(self,input_shapes):
        """"Verify correct input shapes"""
        pass # nothing to verify here
    def forward(self,inputs):
        """Apply discrete binary symmetric channel, i.e., randomly flip
        bits with probability pb."""
        """"应用离散二进制对称信道,即以pb概率随机翻转位"""

        x,pb = inputs

        # the BSC is implemented by calling the DMC with symmetric pb
        # BSC（二元对称信道）是通过使用对称的pb（比特翻转概率）调用DMC（离散记忆少信道）来实现的
        # 这里的“对称的pb”意味着信道的翻转概率p对于输入0和1是相同的，即信道以相同的概率将输入0翻转为1，或将输入1翻转为0。

        """"在二元对称信道(BSC)中,通常有两种状态:输入0被保持为0,或被翻转为1;输入1被保持为1,或被翻转为0。
        如果翻转概率p对于两种输入都是相同的,那么信道就是对称的。

        在实现上,可以构建一个离散记忆少信道(DMC),并设置其状态转移概率矩阵为对称的,以此来模拟BSC的行为。

        在数学上,如果用p表示翻转概率,那么BSC的状态转移概率可以表示为:
        从状态0(输入0)翻转到状态1的概率是p   从状态1(输入1)翻转到状态0的概率也是p  保持原始状态的概率是1-p"""
        pb = pb.to(x.dtype)
        pb = torch.stack((pb,pb), dim=-1)
        y = super().forward((x,pb))

        return y

class BinaryZChannel(BinaryMemorylessChannel):
    def __init__(self, return_llrs=False, bipolar_input=False,llr_max=100.,dtype=torch.float32, **kwargs):

        super().__init__(return_llrs=return_llrs,
                         bipolar_input=bipolar_input,
                         llr_max=llr_max,
                         dtype=dtype,
                         **kwargs)
    #########################
    # Keras layer functions
    #########################
    def build(self, input_shapes):
        """Verify correct input shapes"""
        pass # nothing to verify here

    def forward(self, inputs):
        """Apply discrete binary symmetric channel, i.e., randomly flip
        bits with probability pb."""

        x, pb = inputs

        # the Z is implemented by calling the DMC with p(1|0)=0
        pb = pb.to(x.dtype)
        pb = torch.stack((torch.zeros_like(pb), pb), dim=-1)
        y = super().forward((x, pb))

        return y        
  

class BinaryErasureChannel(BinaryMemorylessChannel):
    def __init__(self, return_llrs=False, bipolar_input=False, llr_max=100, dtype=torch.float32):
        super().__init__()
        self.return_llrs = return_llrs
        self.bipolar_input = bipolar_input
        self.llr_max = llr_max
        self.dtype = dtype

        assert dtype in (torch.float16, torch.float32, torch.float64,
                         torch.int8, torch.int16, torch.int32, torch.int64), \
               "Unsigned integers are currently not supported."
    #########################
    # Keras layer functions
    #########################

    def forward(self, inputs):

        x,pb = inputs


        # Example validation of input x
        if not self.bipolar_input:
            assert torch.all((x == 0) | (x == 1)), "Input x must be binary (0 or 1)."
        else:
            assert torch.all((x == -1) | (x == 1)), "Input x must be bipolar (-1 or 1)."

        # Example validation of pb
        # clip for numerical stability
        pb = pb.float().clamp(0., 1.)

        # sample erasure pattern
        e = self._sample_errors(pb, x.size())

        # if LLRs should be returned
        # remark: the Sionna logit definition is llr = log[p(x=1)/p(x=0)]
        if self.return_llrs:
            if not self.bipolar_input:
                x = 2 * x - 1
            x = x.to(torch.float32) * self.llr_max  # calculate llrs

            # erase positions by setting llrs to 0
            y = torch.where(e == 1, torch.tensor(0, dtype=torch.float32), x)
        else:
            if self.bipolar_input:
                erased_element = torch.tensor(0, dtype=x.dtype) 
            else:
                erased_element=torch.tensor(-1, dtype=x.dtype)
            y = torch.where(e == 0, x, erased_element)

        return y




2024-07-13 17:42:18.152895: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-13 17:42:18.152928: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-13 17:42:18.154659: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-13 17:42:18.163402: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# 测试 BinaryMemorylessChannel 用例
def test_binary_memoryless_channel():
    # 定义更复杂的输入数据
    x = torch.tensor([[0, 1, 0, 1], [1, 0, 1, 0], [0, 0, 1, 1], [1, 1, 0, 0]], dtype=torch.float32)
    pb = torch.tensor([[0.1, 0.9], [0.5, 0.5], [0.2, 0.8], [0.8, 0.2]], dtype=torch.float32)  # 各种不同的概率

    # 创建 BinaryMemorylessChannel 实例
    bmc = BinaryMemorylessChannel(return_llrs=True, bipolar_input=False, llr_max=100.)

    # 执行前向传播
    y = bmc((x, pb))

    # 打印输入和输出
    print("输入数据 x:")
    print(x)
    print("\n错误概率 pb:")
    print(pb)
    print("\n输出数据 y:")
    print(y)

# 运行测试用例
test_binary_memoryless_channel()


输入数据 x:
tensor([[0., 1., 0., 1.],
        [1., 0., 1., 0.],
        [0., 0., 1., 1.],
        [1., 1., 0., 0.]])

错误概率 pb:
tensor([[0.1000, 0.9000],
        [0.5000, 0.5000],
        [0.2000, 0.8000],
        [0.8000, 0.2000]])

输出数据 y:
tensor([[0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.]])


  self._temperature = torch.tensor(value, dtype=torch.float32)
2024-07-13 17:42:23.022631: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1929] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 1050 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3080, pci bus id: 0000:09:00.0, compute capability: 8.6


In [3]:
# 测试 BinarySymmetricChannel 类
# 定义输入数据
x = torch.tensor([[0, 1, 0, 1], [1, 0, 1, 0]], dtype=torch.float32)
pb = torch.tensor(0.37, dtype=torch.float32)  # 10% 概率翻转

# 创建 BinarySymmetricChannel 实例
bsc = BinarySymmetricChannel()

# 执行前向传播
y = bsc.forward((x, pb))

# 打印输入和输出
print("输入数据 x:")
print(x)
print("\n翻转概率 pb:")
print(pb)
print("\n输出数据 y:")
print(y)



输入数据 x:
tensor([[0., 1., 0., 1.],
        [1., 0., 1., 0.]])

翻转概率 pb:
tensor(0.3700)

输出数据 y:
tensor([[0., 0., 0., 0.],
        [1., 1., 0., 0.]])


  self._temperature = torch.tensor(value, dtype=torch.float32)


In [4]:
# 测试 BinaryZChannel 类
# 定义更复杂的输入数据
x = torch.tensor([[0, 1, 0, 1], [1, 0, 1, 0], [0, 0, 1, 1], [1, 1, 0, 0]], dtype=torch.float32)
pb = torch.tensor([0.1, 0.3, 0.2, 0.8], dtype=torch.float32)  # 各种不同的概率

# 创建 BinaryZChannel 实例
bzc = BinaryZChannel()

# 执行前向传播
y = bzc.forward((x, pb))

# 打印输入和输出
print("输入数据 x:")
print(x)
print("\n翻转概率 pb:")
print(pb)
print("\n输出数据 y:")
print(y)

输入数据 x:
tensor([[0., 1., 0., 1.],
        [1., 0., 1., 0.],
        [0., 0., 1., 1.],
        [1., 1., 0., 0.]])

翻转概率 pb:
tensor([0.1000, 0.3000, 0.2000, 0.8000])

输出数据 y:
tensor([[0., 0., 0., 0.],
        [1., 0., 0., 0.],
        [0., 0., 1., 0.],
        [1., 1., 0., 0.]])


  self._temperature = torch.tensor(value, dtype=torch.float32)


In [5]:
# 测试 BinaryErasureChannel 类
# Usage example
input_data = torch.tensor([-1, 1, -1, 1, 1], dtype=torch.float32)
pb = torch.tensor(0.48)
channel = BinaryErasureChannel(return_llrs=False, bipolar_input=True)

output = channel((input_data, pb))

# 打印输入和输出
print("输入数据 input_data:")
print(input_data.numpy())
print("\n翻转概率 pb:")
print(pb.numpy())
print("\n输出数据 output:")
print(output.numpy())

输入数据 input_data:
[-1.  1. -1.  1.  1.]

翻转概率 pb:
0.48

输出数据 output:
[-1.  0. -1.  0.  1.]


  self._temperature = torch.tensor(value, dtype=torch.float32)


# BinaryMemorylessChannel类

`BinaryMemorylessChannel`类是一个离散的二进制无记忆通道，具有（可能的）不对称比特翻转概率。

输入的比特以概率 \( p_\text{b,0} \) 和 \( p_\text{b,1} \) 分别翻转。

这个层支持二进制输入（\( x \in \{0, 1\} \)）和双极性输入（\( x \in \{-1, 1\} \)）。

如果激活，该通道直接返回对数似然比（LLRs），定义如下：

\[
\ell =
\begin{cases}
    \operatorname{log} \frac{p_{b,1}}{1-p_{b,0}}, \qquad \text{如果} \, y=0 \\
    \operatorname{log} \frac{1-p_{b,1}}{p_{b,0}}, \qquad \text{如果} \, y=1 \\
\end{cases}
\]

错误概率 \( p_\text{b}\) 可以是标量或张量（可广播到输入的形状）。这允许每个位位置有不同的擦除概率。在任何情况下，它的最后一个维度的长度必须为2，并被解释为 \( p_\text{b,0} \) 和 \( p_\text{b,1} \)。

这个类继承自Keras的`Layer`类，可以在Keras模型中用作层。

## 参数

- `return_llrs`：布尔类型，默认值为 `False`。如果设置为 `True`，则该层返回基于 `pb` 的对数似然比（LLRs）而不是二进制值。

- `bipolar_input`：布尔类型，默认值为 `False`。如果设置为 `True`，则期望的输入是 \( \{-1,1\} \) 而不是 \( \{0,1\} \)。

- `llr_max`：`tf.float`类型，默认值为100。定义LLRs的裁剪值。

- `dtype`：`tf.DType`类型，定义内部计算和输出的类型。默认值为 `tf.float32`。

## 输入

- `(x, pb)`：
  - `x`：形状为 [...,n] 的 `tf.float32` 类型的输入序列，包含二进制值 \( \{0,1\} \) 或 \( \{-1,1\} \)。
  - `pb`：形状为 [...,2] 的 `tf.float32` 类型的错误概率。可以是两个标量的元组或任何可以广播到 `x` 形状的形状。它有一个额外的最后维度，解释为 \( p_\text{b,0} \) 和 \( p_\text{b,1} \)。

## 输出

- 形状为 [...,n] 的 `tf.float32` 类型的输出序列，长度与输入 `x` 相同。如果 `return_llrs` 为 `False`，则输出是三元的，其中 `-1` 和 `0` 分别表示二进制和双极性输入的擦除。

### 详细解释 `Gumbel-Softmax` 机制

`Gumbel-Softmax` 是一种用于从离散分布中采样的近似方法，特别是在需要梯度信息的情况下。这在机器学习和深度学习中非常有用，例如在生成模型或离散动作空间的强化学习中。

#### Gumbel 分布

首先，`Gumbel-Softmax` 的基础是 Gumbel 分布。Gumbel 分布是一种极值分布，通常用于建模最大值或最小值的分布。其概率密度函数 (PDF) 为：

$$
f(x) = e^{-(x + e^{-x})}
$$

Gumbel 分布的一个重要性质是其用于最大值采样时的重参数化技巧。

#### Gumbel-Max 采样

给定一个类别分布 \(\pi = (\pi_1, \pi_2, \ldots, \pi_k)\)，我们可以使用 Gumbel-Max 采样方法来从中采样一个类别：

$$
\text{sample} = \arg\max_i \left( \log(\pi_i) + g_i \right)
$$

其中 $(g_i)$是从 Gumbel(0,1) 分布中采样的随机变量。

#### Gumbel-Softmax

然而，Gumbel-Max 采样是离散的，不适合梯度优化。因此，Gumbel-Softmax 引入了一个温度参数 $(\tau)$，并将采样过程近似为：

$$
y_i = \frac{\exp\left((\log(\pi_i) + g_i) / \tau \right)}{\sum_{j=1}^k \exp\left((\log(\pi_j) + g_j) / \tau \right)}
$$

当 $(\tau \to 0)$ 时，Gumbel-Softmax 分布趋近于离散分布。当 $(\tau \to \infty)$ 时，分布变得更加均匀。通过调整 $(\tau)$，我们可以控制采样的离散程度。

### `Gumbel-Softmax` 在 `BinaryMemorylessChannel` 中的应用

在 `BinaryMemorylessChannel` 类的初始化过程中，设置 `self._temperature = tf.constant(0.1, tf.float32)` 是为了在某些计算中使用 `Gumbel-Softmax` 机制。具体来说，`Gumbel-Softmax` 可以用于模拟离散的二元信道，同时保持对梯度的支持。这对于神经网络的训练非常重要，因为它允许使用反向传播来优化参数。

### 总结

通过上述初始化和检查步骤，`BinaryMemorylessChannel` 类确保输入参数有效，并设置实例变量以支持后续的计算和操作。其中，`Gumbel-Softmax` 机制的引入，使得在处理离散信道采样时，能够有效地进行梯度优化，从而提高模型的性能和训练效率。

## 关于STE
[pytorch实现简单的straight-through estimator(STE)](https://segmentfault.com/a/1190000020993594)

现在深度学习中一般我们学习的参数都是连续的，因为这样在反向传播的时候才可以对梯度进行更新。但是有的时候我们也会遇到参数是离>散的情况，这样就没有办法进行反向传播了，比如二值神经网络。本文中讲解了如何用`pytorch`对二值化的参数进行梯度更新的`straight-through estimator`算法。
### Question
`STE`核心的思想就是我们的参数初始化的时候就是`float`这样的连续值，当我们`forward`的时候就将原来的连续的参数映射到{-1,, 1}带入到网络进行计算，这样就可以计算网络的输出。然后`backward`的时候直接对原来`float`的参数进行更新，而不是对二值化的参数更新。这样可以完成对整个网络的更新了。
首先我们对上面问题进行一下数学的讲解。

- 我们希望参数的范围是$r \in \mathbb{R}$
- 我们可以得到二值化的参数 $q = Sign(r)$, $Sign$函数可以参考`torch.sign`函数, 可以理解为取符号函数
- `backward`的过程中对$q$求梯度可得 $\frac{\partial loss}{\partial q}$
- 对于$\frac{\partial q}{\partial r} = 0$, 所以可以得出 $\frac{\partial loss}{\partial r} = 0$, 这样的话我们就无法完成对参>数的更新，因为每次`loss`对`r`梯度都是0
- 所以`backward`的过程我们需要修改$\frac{\partial q}{\partial r}$这部分才可以使梯度继续更新下去，所以对$\frac{\partial loss}{\partial r}$进行如下修改: $\frac{\partial q}{\partial r} = \frac{\partial loss}{\partial q} * 1\_{|r| \leq 1}$, 其中
$1\_{|r| \leq 1}$ 可以看作$Htanh(x) = Clip(x, -1, 1) = max(-1, min(1, x))$对$x$的求导过程, 也就是是说:
$$ \frac{\partial loss}{\partial r} =  \frac{\partial loss}{\partial q} \frac{\partial Htanh}{\partial r}$$

### Example
#### torch.sign
首先我们验证一下使用`torch.sign`会是参数的梯度基本上都是0:

In [31]:
input = torch.randn(4, requires_grad = True)
output = torch.sign(input)
loss = output.mean()
loss.backward()
print(input)
print(input.grad)

tensor([1.0786, 0.3029, 0.5720, 2.0079], requires_grad=True)
tensor([0., 0., 0., 0.])


#### demo
我们需要重写sign这个函数，就好像写一个激活函数一样。先看一下代码：

In [32]:
import torch

class LBSign(torch.autograd.Function):

    @staticmethod
    def forward(ctx, input):
        return torch.sign(input)

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output.clamp_(-1, 1)

接下来我们做一下测试

In [33]:
sign = LBSign.apply
params = torch.randn(4, requires_grad = True)                                                                           
output = sign(params)
loss = output.mean()
loss.backward()
print(params)
print(params.grad)

tensor([0.6866, 0.7378, 0.5175, 0.1240], requires_grad=True)
tensor([0.2500, 0.2500, 0.2500, 0.2500])


### explain
接下来我们对代码就行一下解释[pytorch文档链接](https://pytorch.org/docs/stable/autograd.html#function):

- forward中的参数ctx是保存的上下文信息，input是输入
- backward中的参数ctx是保存的上下文信息，grad_output可以理解成 $\frac{\partial loss}{\partial q}$这一步的梯度信息，

我们需要做的就是让$$grad_Output * \frac{\partial Htanh}{\partial r} $$

而不是让pytorch继续默认的$$ grad_Output * \frac{\partial q}{\partial r} $$

但是我们可以从上面的公式可以看出函数$Htanh$对$x$求导是1， 当$x \in [-1, 1]$，所以程序就可以化简成保留原来的梯度就行了，然后裁剪到其他范围的。


### reference
[torch.autograd.Function](https://pytorch.org/docs/stable/autograd.html#function)

[Binarized Neural Networks: Training Deep Neural Networks with Weights and Activations Constrained to +1 or -1](https://arxiv.org/abs/1602.02830)

[二值网络，围绕STE的那些事儿](https://zhuanlan.zhihu.com/p/72681647)

[Custom binarization layer with straight through estimator gives error](https://discuss.pytorch.org/t/custom-binarization-layer-with-straight-through-estimator-gives-error/4539)

[定义torch.autograd.Function的子类，自己定义某些操作，且定义反向求导函数](https://blog.csdn.net/tsq292978891/article/details/79364140)

# BinarySymmetricChannel
这段代码定义了一个名为 `BinarySymmetricChannel` 的类，它继承自 `BinaryMemorylessChannel` 类，并实现了一个离散的二元对称信道（Binary Symmetric Channel, BSC）。以下是代码的主要组成部分和功能分析：

### 类定义和初始化 (`__init__` 方法)
- `BinarySymmetricChannel` 类接受几个参数：
  - `return_llrs`: 布尔值，默认为 `False`。如果设置为 `True`，则信道返回对数似然比（LLRs）而不是基于 `pb` 的二元值。
  - `bipolar_input`: 布尔值，默认为 `False`。如果设置为 `True`，则预期的输入为 {-1,1} 而不是 {0,1}。
  - `llr_max`: 浮点型，默认为 100.0。定义了 LLRs 的裁剪值。
  - `dtype`: 张量数据类型，默认为 `tf.float32`，用于内部计算和输出数据类型。
- 调用父类 `BinaryMemorylessChannel` 的初始化方法，传递这些参数。

### 构建 (`build` 方法)
- `build` 方法用于验证输入形状是否正确。当前实现中，这个方法什么也不做（`pass`），表示没有特定的形状验证逻辑。

### 调用 (`call` 方法)
- `call` 方法是类的核心，实现了 BSC 的功能：
  - 接收输入 `inputs`，它是一个包含 `x` 和 `pb` 的元组。`x` 是输入序列，`pb` 是比特翻转概率。
  - 将 `pb` 转换为与 `x` 相同的数据类型，并将其堆叠为形状为 [-1, 2] 的张量，这表示 BSC 的翻转概率是对称的（即，翻转和不翻转的概率相同）。
  - 调用父类的 `call` 方法，传入 `(x, pb)`，以应用二元对称信道。

### 其他要点
- 类注释提供了关于 BSC 的详细数学定义和使用场景的说明。
- BSC 支持二元输入和双极输入。
- 如果启用，信道可以直接返回 LLRs，这是在信息论中用于表示输入为 0 或 1 的对数概率比。
- 比特翻转概率 `pb` 可以是标量或张量，允许每个比特位置具有不同的翻转概率。
- 这个类继承自 Keras 的 `Layer` 类，因此可以作为 Keras 模型中的层使用。

### 总结
`BinarySymmetricChannel` 类是一个用于模拟二元对称信道的神经网络层，它可以作为 Keras 模型的一部分。它提供了灵活性，以处理不同的概率分布和输入类型，并可以返回原始的二元输出或 LLRs，这在某些信号处理和通信系统中非常有用。


# BinaryZChannel
这段代码定义了一个名为 `BinaryZChannel` 的类，它继承自 `BinaryMemorylessChannel` 类，并实现了一个离散的二元 Z信道（Binary Z-Channel）。以下是代码的主要组成部分和功能分析：

### 类定义和初始化 (`__init__` 方法)
- `BinaryZChannel` 类接受几个参数：
  - `return_llrs`: 布尔值，默认为 `False`。如果设置为 `True`，则层返回对数似然比（LLRs）而不是基于 `pb` 的二元值。
  - `bipolar_input`: 布尔值，默认为 `False`。如果设为 `True`，则预期的输入为 {-1,1} 而不是 {0,1}。
  - `llr_max`: 浮点型，默认为 100.0。定义了 LLRs 的裁剪值。
  - `dtype`: 张量数据类型，默认为 `tf.float32`，用于内部计算和输出数据类型。
- 调用父类 `BinaryMemorylessChannel` 的构造函数，传递这些参数。

### 构建 (`build` 方法)
- `build` 方法用于验证输入形状是否正确。当前实现中，这个方法什么也不做（`pass`），表示没有特定的形状验证逻辑。

### 调用 (`call` 方法)
- `call` 方法是类的核心，实现了 Z 信道的功能：
  - 接收输入 `inputs`，它是一个包含 `x` 和 `pb` 的元组。`x` 是输入序列，`pb` 是错误概率。
  - 将 `pb` 转换为与 `x` 相同的数据类型，并将其堆叠为形状为 [-1, 2] 的张量，这表示 Z 信道的错误模型，其中第一个元素（`0`）总是被正确接收，而第二个元素（`1`）以概率 `pb` 发生错误。
  - 调用父类的 `call` 方法，传入 `(x, pb)`，以应用二元 Z 信道。

### 其他要点
- 类注释提供了关于 Z 信道的详细数学定义和使用场景的说明。
- Z 信道只对第二个输入元素（即 `1`）发生错误，第一个元素（`0`）总是被正确接收。
- 如果启用，信道可以直接返回 LLRs，这是在信息论中用于表示输入为 `0` 或 `1` 的对数概率比。
- 错误概率 `pb` 可以是标量或张量，允许每个比特位置具有不同的错误概率。
- 这个类继承自 Keras 的 `Layer` 类，因此可以作为 Keras 模型中的层使用。

### 总结
`BinaryZChannel` 类是一个用于模拟二元 Z 信道的神经网络层，它可以作为 Keras 模型的一部分。它提供了灵活性，以处理不同的概率分布和输入类型，并可以返回原始的二元输出或 LLRs，这在某些信号处理和通信系统中非常有用。


# Z信道
Z信道（Z-Channel）是一种理论上的通信信道模型，它具有一些特殊的错误模式。在Z信道中，只有当传输的信息是"1"时才会发生传输错误，而传输"0"总是被正确接收。这种信道模型通常用于信息论、编码理论和数字通信领域的教学和研究，以探讨错误检测和纠正算法的性能。

### Z信道的特点：

1. **"0"总是正确接收**：无论信道条件如何，传输的"0"总是能够被接收方正确识别。

2. **"1"可能发生错误**：传输的"1"有一定的概率被错误地接收为"0"，这个概率通常用`p`表示。

3. **错误概率对称**：在Z信道中，"1"被错误接收为"0"的概率与"0"被错误接收为"1"的概率相同，都是`p`。

4. **对数似然比（LLR）**：在某些应用中，Z信道可以直接返回LLRs，这有助于信道解码器更好地估计传输的信息。LLR是两个事件发生概率的对数比值。

### Z信道的数学定义：

假设有一个简单的Z信道模型，其状态转移可以表示为：

- 发送`0`，接收`0`的概率：`1`
- 发送`1`，接收`0`的概率：`p`
- 发送`1`，接收`1`的概率：`1-p`

### Z信道的应用：

- **信道编码**：研究如何设计编码方案来最小化Z信道中的错误。
- **信号检测**：探讨在Z信道中如何有效地检测和估计传输的信号。
- **性能分析**：评估不同信道编码和解码算法在Z信道条件下的性能。

Z信道由于其简化的错误模型，为理解和设计鲁棒的通信系统提供了一个有用的理论基础。尽管实际的通信信道可能比Z信道复杂得多，但Z信道仍然是一个重要的教学工具，帮助人们理解信道噪声和错误纠正的基本概念。


# BinaryErasureChannel
这段代码实现了一个二元擦除信道（Binary Erasure Channel），用于模拟在传输过程中可能发生的擦除错误。以下是对代码的解释和在 PyTorch 中的改写：

### 代码解释

1. **初始化方法 (`__init__`)**:
   - 初始化函数接受一系列参数，包括是否返回 LLRs (`return_llrs`)、是否使用极性输入 (`bipolar_input`)、LLRs 的最大值 (`llr_max`)、数据类型 (`dtype`) 等。
   - 在初始化过程中，通过断言检查数据类型 `dtype` 是否是支持的类型之一，排除了无符号整数。

2. **`build` 方法**:
   - 在 TensorFlow 的习惯中，`build` 方法通常用于验证输入形状或构建层。这里的 `build` 方法没有实际操作，因为 PyTorch 中的层通常不需要显式构建。

3. **`call` 方法**:
   - `call` 方法实现了如何应用二元擦除信道到输入数据。
   - 首先将概率 `pb` 强制转换为 `tf.float32` 类型，并使用 `tf.clip_by_value` 对其进行了数值稳定性的裁剪，保证在 [0, 1] 范围内。
   - 调用 `_check_inputs` 方法检查输入 `x` 是否符合预期（二元或极性）。
   - 使用 `_sample_errors` 方法根据擦除概率 `pb` 采样生成擦除模式 `e`。
   - 根据 `_return_llrs` 决定是否返回 LLRs。如果是，则计算 LLRs，并将擦除位置的 LLRs 设置为 0；否则，根据 `_bipolar_input` 决定擦除位置的输出值（0 或 -1）。

### 改写成 PyTorch

在 PyTorch 中，可以使用类似的逻辑来实现二元擦除信道。以下是将该代码改写成 PyTorch 的示例：


# tf version

In [68]:
#
# SPDX-FileCopyrightText: Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Layer for discrete channel models"""

import tensorflow as tf
from tensorflow.keras.layers import Layer
from sionna.utils import expand_to_rank

from sionna.constants import GLOBAL_SEED_NUMBER

class BinaryMemorylessChannel(Layer):
    # pylint: disable=line-too-long
    r"""BinaryMemorylessChannel(return_llrs=False, bipolar_input=False, llr_max=100., dtype=tf.float32, **kwargs)

    Discrete binary memory less channel with (possibly) asymmetric bit flipping
    probabilities.

    Inputs bits are flipped with probability :math:`p_\text{b,0}` and
    :math:`p_\text{b,1}`, respectively.

    ..  figure:: ../figures/BMC_channel.png
        :align: center

    This layer supports binary inputs (:math:`x \in \{0, 1\}`) and `bipolar`
    inputs (:math:`x \in \{-1, 1\}`).

    If activated, the channel directly returns log-likelihood ratios (LLRs)
    defined as

    .. math::
        \ell =
        \begin{cases}
            \operatorname{log} \frac{p_{b,1}}{1-p_{b,0}}, \qquad \text{if} \, y=0 \\
            \operatorname{log} \frac{1-p_{b,1}}{p_{b,0}}, \qquad \text{if} \, y=1 \\
        \end{cases}

    The error probability :math:`p_\text{b}` can be either scalar or a
    tensor (broadcastable to the shape of the input). This allows
    different erasure probabilities per bit position. In any case, its last
    dimension must be of length 2 and is interpreted as :math:`p_\text{b,0}` and
    :math:`p_\text{b,1}`.

    This class inherits from the Keras `Layer` class and can be used as layer in
    a Keras model.

    Parameters
    ----------

    return_llrs: bool
        Defaults to `False`. If `True`, the layer returns log-likelihood ratios
        instead of binary values based on ``pb``.

    bipolar_input : bool, False
        Defaults to `False`. If `True`, the expected input is given as
        :math:`\{-1,1\}` instead of :math:`\{0,1\}`.

    llr_max: tf.float
        Defaults to 100. Defines the clipping value of the LLRs.

    dtype : tf.DType
        Defines the datatype for internal calculations and the output
        dtype. Defaults to `tf.float32`.

    Input
    -----
    (x, pb) :
        Tuple:

    x : [...,n], tf.float32
        Input sequence to the channel consisting of binary values :math:`\{0,1\}
        ` or :math:`\{-1,1\}`, respectively.

    pb : [...,2], tf.float32
        Error probability. Can be a tuple of two scalars or of any
        shape that can be broadcasted to the shape of ``x``. It has an
        additional last dimension which is interpreted as :math:`p_\text{b,0}`
        and :math:`p_\text{b,1}`.

    Output
    -------
        : [...,n], tf.float32
            Output sequence of same length as the input ``x``. If
            ``return_llrs`` is `False`, the output is ternary where a `-1` and
            `0` indicate an erasure for the binary and bipolar input,
            respectively.
    """

    def __init__(self, return_llrs=False, bipolar_input=False, llr_max=100.,dtype=tf.float32, **kwargs):

        super().__init__(dtype=dtype,**kwargs)

        assert isinstance(return_llrs, bool), "return_llrs must be bool."
        self._return_llrs = return_llrs

        assert isinstance(bipolar_input, bool), "bipolar_input must be bool."
        self._bipolar_input = bipolar_input

        assert llr_max>=0., "llr_max must be a positive scalar value."
        self._llr_max = tf.cast(llr_max, dtype=self.dtype)

        if self._return_llrs:
            assert dtype in (tf.float16, tf.float32, tf.float64),\
                "LLR outputs require non-integer dtypes."
        else:
            if self._bipolar_input:
                assert dtype in (tf.float16, tf.float32, tf.float64,
                    tf.int8, tf.int16, tf.int32, tf.int64),\
                    "Only, signed dtypes are supported for bipolar inputs."
            else:
                assert dtype in (tf.float16, tf.float32, tf.float64,
                    tf.uint8, tf.uint16, tf.uint32, tf.uint64,
                    tf.int8, tf.int16, tf.int32, tf.int64),\
                    "Only, real-valued dtypes are supported."

        self._check_input = True # check input for consistency (i.e., binary)

        self._eps = 1e-9 # small additional term for numerical stability
        self._temperature = tf.constant(0.1, tf.float32) # for Gumble-softmax

    #########################################
    # Public methods and properties
    #########################################

    @property
    def llr_max(self):
        """Maximum value used for LLR calculations."""
        return self._llr_max

    @llr_max.setter
    def llr_max(self, value):
        """Maximum value used for LLR calculations."""
        assert value>=0, 'llr_max cannot be negative.'
        self._llr_max = tf.cast(value, dtype=tf.float32)

    @property
    def temperature(self):
        """Temperature for Gumble-softmax trick."""
        return self._temperature

    @temperature.setter
    def temperature(self, value):
        """Temperature for Gumble-softmax trick."""
        assert value>=0, 'temperature cannot be negative.'
        self._temperature = tf.cast(value, dtype=tf.float32)

    #########################
    # Utility methods
    #########################

    def _check_inputs(self, x):
        """Check input x for consistency, i.e., verify
        that all values are binary of bipolar values."""
        x = tf.cast(x, tf.float32)
        if self._check_input:
            if self._bipolar_input: # allow -1 and 1 for bipolar inputs
                values = (tf.constant(-1, x.dtype),tf.constant(1, x.dtype))
            else: # allow 0,1 for binary input
                values = (tf.constant(0, x.dtype),tf.constant(1, x.dtype))
            tf.debugging.assert_equal(
                tf.reduce_min(tf.cast(tf.logical_or(tf.equal(x, values[0]),
                                    tf.equal(x, values[1])), x.dtype)),
                tf.constant(1, x.dtype),
                "Input must be binary.")
            # input datatype consistency should be only evaluated once
            self._check_input = False

    @tf.custom_gradient
    def _custom_xor(self, a, b):
        """Straight through estimator for XOR."""
        def grad(upstream):
            """identity in backward direction"""
            return upstream, upstream
        # xor in forward path
        # use module for "exotic" dtypes
        if self.dtype in (tf.uint8, tf.uint16, tf.uint32, tf.uint64, tf.int8, tf.int16, tf.int32, tf.int64):
            z = tf.math.mod(a+b, tf.constant(2, self.dtype))
        else: # use abs for float dtypes
            z = tf.abs(a - b)

        return z, grad

    @tf.custom_gradient
    def _ste_binarizer(self, x):
        """Straight through binarizer to quantize bits to int values."""
        def grad(upstream):
            """identity in backward direction"""
            return upstream
        # hard-decide in forward path
        z = tf.where(x<.5, 0., 1.)
        return z, grad

    def _sample_errors(self, pb, shape):
        """Samples binary error vector with given error probability e.
        This function is based on the Gumble-softmax "trick" to keep the
        sampling differentiable."""

        # this implementation follows https://arxiv.org/pdf/1611.01144v5.pdf
        # and https://arxiv.org/pdf/1906.07748.pdf
        tf.random.set_seed(GLOBAL_SEED_NUMBER)
        u1 = tf.random.uniform(shape=shape,
                                minval=0.,
                                maxval=1.,
                                dtype=tf.float32)
        u2 = tf.random.uniform(shape=shape,
                                minval=0.,
                                maxval=1.,
                                dtype=tf.float32)       

        u = tf.stack((u1, u2), axis=-1)

        # sample Gumble distribution
        q = - tf.math.log(- tf.math.log(u + self._eps) + self._eps)
        p = tf.stack((pb,1-pb), axis=-1)
        p = expand_to_rank(p, tf.rank(q), axis=0)
        p = tf.broadcast_to(p, tf.shape(q))
        a = (tf.math.log(p + self._eps) + q) / self._temperature

        # apply softmax
        e_cat = tf.nn.softmax(a)

        # binarize final values via straight-through estimator
        return self._ste_binarizer(e_cat[...,0]) # only take first class

    #########################
    # Keras layer functions
    #########################

    def build(self, input_shapes):
        """Verify correct input shapes"""

        pb_shapes = input_shapes[1]
        # allow tuple of scalars as alternative input
        if isinstance(pb_shapes, (tuple, list)):
            if not len(pb_shapes)==2:
                raise ValueError("Last dim of pb must be of length 2.")
        else:
            if len(pb_shapes)>0:
                if not pb_shapes[-1]==2:
                    raise ValueError("Last dim of pb must be of length 2.")
            else:
                raise ValueError("Last dim of pb must be of length 2.")

    def call(self, inputs):
        """Apply discrete binary memoryless channel to inputs."""

        x, pb = inputs

        # allow pb to be a tuple of two scalars
        if isinstance(pb, (tuple, list)):
            pb0 = pb[0]
            pb1 = pb[1]
        else:
            pb0 = pb[...,0]
            pb1 = pb[...,1]

        # clip for numerical stability
        pb0 = tf.cast(pb0, tf.float32) # Gumble requires float dtypes
        pb1 = tf.cast(pb1, tf.float32) # Gumble requires float dtypes
        pb0 = tf.clip_by_value(pb0, 0., 1.)
        pb1 = tf.clip_by_value(pb1, 0., 1.)

        # check x for consistency (binary, bipolar)
        self._check_inputs(x)

        e0 = self._sample_errors(pb0, tf.shape(x))
        e1 = self._sample_errors(pb1, tf.shape(x))

        if self._bipolar_input:
            neutral_element = tf.constant(-1, dtype=x.dtype)
        else:
            neutral_element = tf.constant(0, dtype=x.dtype)

        # mask e0 and e1 with input such that e0 only applies where x==0
        e = tf.where(x==neutral_element, e0, e1)
        e = tf.cast(e, x.dtype)

        if self._bipolar_input:
            # flip signs for bipolar case
            y = x * (-2*e + 1)
        else:
            # XOR for binary case
            y = self._custom_xor(x, e)

        # if LLRs should be returned
        if self._return_llrs:
            if not self._bipolar_input:
                y = 2 * y - 1 # transform to bipolar

            # Remark: Sionna uses the logit definition log[p(x=1)/p(x=0)]
            y0 = - (tf.math.log(pb1 + self._eps)
                   - tf.math.log(1 - pb0 - self._eps))
            y1 = (tf.math.log(1 - pb1 - self._eps)
                  - tf.math.log(pb0 + self._eps))
            # multiply by y to keep gradient
            y = tf.cast(tf.where(y==1, y1, y0), dtype=y.dtype) * y
            # and clip output llrs
            y = tf.clip_by_value(y, -self._llr_max, self._llr_max)

        return y

class BinarySymmetricChannel(BinaryMemorylessChannel):
    # pylint: disable=line-too-long
    r"""BinarySymmetricChannel(return_llrs=False, bipolar_input=False, llr_max=100., dtype=tf.float32, **kwargs)

    Discrete binary symmetric channel which randomly flips bits with probability
    :math:`p_\text{b}`.

    ..  figure:: ../figures/BSC_channel.png
        :align: center

    This layer supports binary inputs (:math:`x \in \{0, 1\}`) and `bipolar`
    inputs (:math:`x \in \{-1, 1\}`).

    If activated, the channel directly returns log-likelihood ratios (LLRs)
    defined as

    .. math::
        \ell =
        \begin{cases}
            \operatorname{log} \frac{p_{b}}{1-p_{b}}, \qquad \text{if}\, y=0 \\
            \operatorname{log} \frac{1-p_{b}}{p_{b}}, \qquad \text{if}\, y=1 \\
        \end{cases}
    where :math:`y` denotes the binary output of the channel.

    The bit flipping probability :math:`p_\text{b}` can be either a scalar or  a
    tensor (broadcastable to the shape of the input). This allows
    different bit flipping probabilities per bit position.

    This class inherits from the Keras `Layer` class and can be used as layer in
    a Keras model.

    Parameters
    ----------

    return_llrs: bool
        Defaults to `False`. If `True`, the layer returns log-likelihood ratios
        instead of binary values based on ``pb``.

    bipolar_input : bool, False
        Defaults to `False`. If `True`, the expected input is given as {-1,1}
        instead of {0,1}.

    llr_max: tf.float
        Defaults to 100. Defines the clipping value of the LLRs.

    dtype : tf.DType
        Defines the datatype for internal calculations and the output
        dtype. Defaults to `tf.float32`.

    Input
    -----
    (x, pb) :
        Tuple:

    x : [...,n], tf.float32
        Input sequence to the channel.

    pb : tf.float32
        Bit flipping probability. Can be a scalar or of any shape that
        can be broadcasted to the shape of ``x``.

    Output
    -------
        : [...,n], tf.float32
            Output sequence of same length as the input ``x``. If
            ``return_llrs`` is `False`, the output is binary and otherwise
            soft-values are returned.
    """

    def __init__(self, return_llrs=False, bipolar_input=False, llr_max=100., dtype=tf.float32, **kwargs):

        super().__init__(return_llrs=return_llrs,
                         bipolar_input=bipolar_input,
                         llr_max=llr_max,
                         dtype=dtype,
                         **kwargs)

    #########################
    # Keras layer functions
    #########################

    def build(self, input_shapes):
        """Verify correct input shapes"""
        pass # nothing to verify here

    def call(self, inputs):
        """Apply discrete binary symmetric channel, i.e., randomly flip
        bits with probability pb."""

        x, pb = inputs

        # the BSC is implemented by calling the DMC with symmetric pb
        pb = tf.cast(pb, x.dtype)
        pb = tf.stack((pb, pb), axis=-1)
        y = super().call((x, pb))

        return y

class BinaryZChannel(BinaryMemorylessChannel):
    # pylint: disable=line-too-long
    r"""BinaryZChannel(return_llrs=False, bipolar_input=False, llr_max=100., dtype=tf.float32, **kwargs)

    Layer that implements the binary Z-channel.

    In the Z-channel, transmission errors only occur for the transmission of
    second input element (i.e., if a `1` is transmitted) with error probability
    probability :math:`p_\text{b}` but the first element is always correctly
    received.

    ..  figure:: ../figures/Z_channel.png
        :align: center


    This layer supports binary inputs (:math:`x \in \{0, 1\}`) and `bipolar`
    inputs (:math:`x \in \{-1, 1\}`).

    If activated, the channel directly returns log-likelihood ratios (LLRs)
    defined as

    .. math::
        \ell =
        \begin{cases}
            \operatorname{log} \left( p_b \right), \qquad \text{if} \, y=0 \\
            \infty, \qquad \qquad \text{if} \, y=1 \\
        \end{cases}
    assuming equal probable inputs :math:`P(X=0) = P(X=1) = 0.5`.

    The error probability :math:`p_\text{b}` can be either a scalar or a
    tensor (broadcastable to the shape of the input). This allows
    different error probabilities per bit position.

    This class inherits from the Keras `Layer` class and can be used as layer in
    a Keras model.

    Parameters
    ----------

    return_llrs: bool
        Defaults to `False`. If `True`, the layer returns log-likelihood ratios
        instead of binary values based on ``pb``.

    bipolar_input : bool, False
        Defaults to `False`. If True, the expected input is given as {-1,1}
        instead of {0,1}.

    llr_max: tf.float
        Defaults to 100. Defines the clipping value of the LLRs.

    dtype : tf.DType
        Defines the datatype for internal calculations and the output
        dtype. Defaults to `tf.float32`.

    Input
    -----
    (x, pb) :
        Tuple:

    x : [...,n], tf.float32
        Input sequence to the channel.

    pb : tf.float32
        Error probability. Can be a scalar or of any shape that can be
        broadcasted to the shape of ``x``.

    Output
    -------
        : [...,n], tf.float32
            Output sequence of same length as the input ``x``. If
            ``return_llrs`` is `False`, the output is binary and otherwise
            soft-values are returned.
    """

    def __init__(self, return_llrs=False, bipolar_input=False, llr_max=100.,dtype=tf.float32, **kwargs):

        super().__init__(return_llrs=return_llrs,
                         bipolar_input=bipolar_input,
                         llr_max=llr_max,
                         dtype=dtype,
                         **kwargs)

    #########################
    # Keras layer functions
    #########################

    def build(self, input_shapes):
        """Verify correct input shapes"""
        pass # nothing to verify here

    def call(self, inputs):
        """Apply discrete binary symmetric channel, i.e., randomly flip
        bits with probability pb."""

        x, pb = inputs

        # the Z is implemented by calling the DMC with p(1|0)=0
        pb = tf.cast(pb, x.dtype)
        pb = tf.stack((tf.zeros_like(pb), pb), axis=-1)
        y = super().call((x, pb))

        return y


class BinaryErasureChannel(BinaryMemorylessChannel):
    # pylint: disable=line-too-long
    r"""BinaryErasureChannel(return_llrs=False, bipolar_input=False, llr_max=100., dtype=tf.float32, **kwargs)

    Binary erasure channel (BEC) where a bit is either correctly received
    or erased.

    In the binary erasure channel, bits are always correctly received or erased
    with erasure probability :math:`p_\text{b}`.

    ..  figure:: ../figures/BEC_channel.png
        :align: center

    This layer supports binary inputs (:math:`x \in \{0, 1\}`) and `bipolar`
    inputs (:math:`x \in \{-1, 1\}`).

    If activated, the channel directly returns log-likelihood ratios (LLRs)
    defined as

    .. math::
        \ell =
        \begin{cases}
            -\infty, \qquad \text{if} \, y=0 \\
            0, \qquad \quad \,\, \text{if} \, y=? \\
            \infty, \qquad \quad \text{if} \, y=1 \\
        \end{cases}

    The erasure probability :math:`p_\text{b}` can be either a scalar or a
    tensor (broadcastable to the shape of the input). This allows
    different erasure probabilities per bit position.

    Please note that the output of the BEC is ternary. Hereby, `-1` indicates an
    erasure for the binary configuration and `0` for the bipolar mode,
    respectively.

    This class inherits from the Keras `Layer` class and can be used as layer in
    a Keras model.

    Parameters
    ----------

    return_llrs: bool
        Defaults to `False`. If `True`, the layer returns log-likelihood ratios
        instead of binary values based on ``pb``.

    bipolar_input : bool, False
        Defaults to `False`. If `True`, the expected input is given as {-1,1}
        instead of {0,1}.

    llr_max: tf.float
        Defaults to 100. Defines the clipping value of the LLRs.

    dtype : tf.DType
        Defines the datatype for internal calculations and the output
        dtype. Defaults to `tf.float32`.

    Input
    -----
    (x, pb) :
        Tuple:

    x : [...,n], tf.float32
        Input sequence to the channel.

    pb : tf.float32
        Erasure probability. Can be a scalar or of any shape that can be
        broadcasted to the shape of ``x``.

    Output
    -------
        : [...,n], tf.float32
            Output sequence of same length as the input ``x``. If
            ``return_llrs`` is `False`, the output is ternary where each `-1`
            and each `0` indicate an erasure for the binary and bipolar input,
            respectively.
    """

    def __init__(self, return_llrs=False, bipolar_input=False, llr_max=100.,dtype=tf.float32, **kwargs):

        super().__init__(return_llrs=return_llrs,
                         bipolar_input=bipolar_input,
                         llr_max=llr_max,
                         dtype=dtype,
                         **kwargs)

        # also exclude uints, as -1 indicator for erasures does not exist
        assert dtype in (tf.float16, tf.float32, tf.float64,
                tf.int8, tf.int16, tf.int32, tf.int64),\
                "Unsigned integers are currently not supported."

    #########################
    # Keras layer functions
    #########################

    def build(self, input_shapes):
        """Verify correct input shapes"""
        pass # nothing to verify here

    def call(self, inputs):
        """Apply erasure channel to inputs."""

        x, pb = inputs

        # clip for numerical stability
        pb = tf.cast(pb, tf.float32) # Gumble requires float dtypes
        pb = tf.clip_by_value(pb, 0., 1.)

        # check x for consistency (binary, bipolar)
        self._check_inputs(x)

        # sample erasure pattern
        e = self._sample_errors(pb, tf.shape(x))

        # if LLRs should be returned
        # remark: the Sionna logit definition is llr = log[p(x=1)/p(x=0)]
        if self._return_llrs:
            if not self._bipolar_input:
                x = 2 * x -1
            x *= tf.cast(self._llr_max, x.dtype) # calculate llrs

            # erase positions by setting llrs to 0
            y = tf.where(e==1, tf.constant(0, x.dtype), x)
        else: # ternary outputs
            # the erasure indicator depends on the operation mode
            if self._bipolar_input:
                erased_element = tf.constant(0, dtype=x.dtype)
            else:
                erased_element = tf.constant(-1, dtype=x.dtype)

            y = tf.where(e==0, x, erased_element)
        return y


In [69]:
# 测试 BinaryMemorylessChannel 函数
def test_binary_memoryless_channel():
    # Define more complex input data
    x = tf.constant([[0, 1, 0, 1], [1, 0, 1, 0], [0, 0, 1, 1], [1, 1, 0, 0]], dtype=np.float32)
    pb = tf.constant([[0.1, 0.9], [0.5, 0.5], [0.2, 0.8], [0.8, 0.2]], dtype=np.float32)

    # Create instance of BinaryMemorylessChannel
    bmc = BinaryMemorylessChannel(return_llrs=True, bipolar_input=False, llr_max=100.)

    # Perform forward pass
    y = bmc((x, pb))

    # Print input and output
    print("输入数据 x:")
    print(x)
    print("\n翻转概率 pb:")
    print(pb)
    print("\n输出数据 y:")
    print(y)

# Run the test function
test_binary_memoryless_channel()

输入数据 x:
tf.Tensor(
[[0. 1. 0. 1.]
 [1. 0. 1. 0.]
 [0. 0. 1. 1.]
 [1. 1. 0. 0.]], shape=(4, 4), dtype=float32)

翻转概率 pb:
tf.Tensor(
[[0.1 0.9]
 [0.5 0.5]
 [0.2 0.8]
 [0.8 0.2]], shape=(4, 2), dtype=float32)

输出数据 y:
tf.Tensor(
[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]], shape=(4, 4), dtype=float32)


In [30]:
# 测试 BinarySymmetricChannel 类
# 定义输入数据
x = tf.constant([[0, 1, 0, 1], [1, 0, 1, 0]], dtype=tf.float32)
pb = tf.constant(0.37, dtype=tf.float32)  # 100% 概率翻转

# 创建 BinarySymmetricChannel 实例
bsc = BinarySymmetricChannel()

# 执行前向传播
y = bsc((x, pb))

# 打印输入和输出
print("输入数据 x:")
print(x.numpy())
print("\n翻转概率 pb:")
print(pb.numpy())
print("\n输出数据 y:")
print(y.numpy())

输入数据 x:
[[0. 1. 0. 1.]
 [1. 0. 1. 0.]]

翻转概率 pb:
0.37

输出数据 y:
[[0. 0. 0. 0.]
 [1. 1. 0. 0.]]


In [54]:
# 测试 BinaryZChannel 类
# 定义更复杂的输入数据
x = tf.constant([[0, 1, 0, 1], [1, 0, 1, 0], [0, 0, 1, 1], [1, 1, 0, 0]], dtype=tf.float32)
pb = tf.constant([0.1, 0.3, 0.2, 0.8], dtype=tf.float32)  # 各种不同的概率

# 创建 BinaryZChannel 实例
bzc = BinaryZChannel()

# 执行前向传播
y = bzc((x, pb))

# 打印输入和输出
print("输入数据 x:")
print(x.numpy())
print("\n翻转概率 pb:")
print(pb.numpy())
print("\n输出数据 y:")
print(y.numpy())

输入数据 x:
[[0. 1. 0. 1.]
 [1. 0. 1. 0.]
 [0. 0. 1. 1.]
 [1. 1. 0. 0.]]

翻转概率 pb:
[0.1 0.3 0.2 0.8]

输出数据 y:
[[0. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 1. 0. 0.]]


In [32]:
# 测试BinaryErasureChannel类
# Usage example
input_data = tf.constant([-1, 1, -1, 1, 1], dtype=tf.float32)
pb = tf.constant(0.48)
channel = BinaryErasureChannel(return_llrs=False, bipolar_input=True)

output = channel((input_data, pb))
# 打印输入和输出
print("输入数据 input_data:")
print(input_data.numpy())
print("\n翻转概率 pb:")
print(pb.numpy())
print("\n输出数据 output:")
print(output.numpy())

输入数据 input_data:
[-1.  1. -1.  1.  1.]

翻转概率 pb:
0.48

输出数据 output:
[-1.  0. -1.  0.  1.]
