In [1]:
import torch
import torch.nn as nn

GRU（门控循环单元）是一种循环神经网络（RNN）的变体，它通过引入更新门（Update Gate）和重置门（Reset Gate）来控制信息的流动，从而解决了传统RNN中的梯度消失和梯度爆炸问题。GRU的设计使得它在处理序列数据时更加高效，尤其是在长序列数据上。

GRU主要包括四个部分：

重置门（Reset Gate）：重置门控制着上一时间步的信息在多大程度上影响当前时间步的候选隐藏状态。其计算公式为：
$$R^t = \sigma(X^t W^{xr} + H^{t-1} W^{hr} + b^r)$$

更新门（Update Gate）：更新门决定了上一时间步的隐藏状态在当前时间步的保留程度。其计算公式为：
$$Z^t = \sigma(X^t W^{xz} + H^{t-1} W^{hz} + b^z)$$

候选隐藏状态（Candidate Hidden State）：候选隐藏状态结合了当前输入和上一时间步的信息，计算公式为：
$$\tilde{H}^{t}=\tanh\left(X^{t} W^{x h}+\left(R^{t}\odot H^{t-1}\right) W^{h h}+b^{h}\right)$$

隐藏状态更新：最终的隐藏状态是上一时间步的隐藏状态和候选隐藏状态的加权和，计算公式为：
$$H^{t} = Z^{t} \odot H^{t-1} + (1 - Z^{t}) \odot \tilde{H}^{t}$$

In [2]:
# 自定义的GRU层
class CostomGRU_layer(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(CostomGRU_layer, self).__init__()
        # 初始化参数
        self.W_xz = nn.Parameter(torch.randn(input_size, hidden_size))  # 更新门的输入到隐藏层的权重
        self.W_hz = nn.Parameter(torch.randn(hidden_size, hidden_size))  # 更新门的隐藏层到隐藏层的权重

        self.W_xr = nn.Parameter(torch.randn(input_size, hidden_size))  # 重置门的输入到隐藏层的权重
        self.W_hr = nn.Parameter(torch.randn(hidden_size, hidden_size))  # 重置门的隐藏层到隐藏层的权重

        self.W_xh = nn.Parameter(torch.randn(input_size, hidden_size))  # 候选隐藏状态的输入到隐藏层的权重
        self.W_hh = nn.Parameter(torch.randn(hidden_size, hidden_size))  # 候选隐藏状态的隐藏层到隐藏层的权重

        self.hb_z = nn.Parameter(torch.zeros(hidden_size))  # 更新门的偏置
        self.hb_r = nn.Parameter(torch.zeros(hidden_size))  # 重置门的偏置
        self.hb_h = nn.Parameter(torch.zeros(hidden_size))  # 候选隐藏状态的偏置
        
        self.xb_z = nn.Parameter(torch.zeros(hidden_size))  
        self.xb_r = nn.Parameter(torch.zeros(hidden_size))  
        self.xb_h = nn.Parameter(torch.zeros(hidden_size))  

    def forward(self, x, h):
        # 前向传播
        z = torch.sigmoid((torch.matmul(x, self.W_xz) + self.xb_z) + (torch.matmul(h, self.W_hz) + self.hb_z))  # 更新门
        r = torch.sigmoid((torch.matmul(x, self.W_xr) + self.xb_r) + (torch.matmul(h, self.W_hr) + self.hb_r))  # 重置门
        h_tilda = torch.tanh((torch.matmul(x, self.W_xh) + self.xb_h) + r * (torch.matmul(h, self.W_hh) + self.hb_h))  # 候选隐藏状态
        h = z * h + (1 - z) * h_tilda  # 更新隐藏状态
        return h

In [3]:
# 自定义的GRU模型
class CostomGRU(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(CostomGRU, self).__init__()
        self.input_size = input_size  # 输入特征的维度
        self.hidden_size = hidden_size  # 隐藏层的维度
        # 初始化自定义的GRU层
        self.gru = CostomGRU_layer(self.input_size, self.hidden_size)

    def forward(self, X, h0=None):
        # x.shape = (batch_size, seq_length, input_size)
        # h0.shape = (1, batch_size, hidden_size)
        # output.shape = (batch_size, seq_length, hidden_size)

        # 获取批次大小
        batch_size = X.shape[1]
        # 获取序列长度
        self.seq_length = X.shape[0]
        
        # 如果没有提供初始隐藏状态，则初始化为零张量
        if h0 is None:
            prev_h = torch.zeros([batch_size, self.hidden_size]).to(device)
        else:
            prev_h = torch.squeeze(h0, 0) 

        # 初始化输出张量
        output = torch.zeros([self.seq_length, batch_size, self.hidden_size]).to(device)

        # 循环处理序列中的每个时间步
        for i in range(self.seq_length):
            # 通过GRU层处理当前时间步的数据，并更新隐藏状态
            prev_h = self.gru(X[i], prev_h)
            # 将当前时间步的输出存储在输出张量中
            output[i] = prev_h

        # 返回最终的输出和隐藏状态
        return output, torch.unsqueeze(prev_h, 0)

In [4]:
batch_size = 16
seq_length = 30
input_size = 32
hidden_size = 64
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
x = torch.rand(seq_length, batch_size, input_size).to(device)  # 创建一个随机输入张量，形状为(seq_length, batch_size, input_size)
model = CostomGRU(input_size, hidden_size).to(device)  # 实例化自定义的GRU模型
output = model(x)  # 进行前向传播

# 打印输出张量的形状
print(output[0].shape)  
print(output[1].shape)  

torch.Size([30, 16, 64])
torch.Size([1, 16, 64])


In [6]:
"""测试：
    将nn.GRU中的4个随机初始化的可学习参数进行保存，并替换掉
      CostomGRU中CostomGRU_layer随机初始化的可学习参数，并通过torch.allclose
      判断输出是否相等，若相等则证明MyGRU的实现与官方的nn.GRU是一致的
"""

# 初始化nn.GRU
gru = nn.GRU(input_size=input_size, hidden_size=hidden_size).to(device)
weight_ih_l0 = gru.weight_ih_l0.T
weight_hh_l0 = gru.weight_hh_l0.T
bias_ih_l0 = gru.bias_ih_l0
bias_hh_l0 = gru.bias_hh_l0

# 初始化CostomGRU
costom_gru = CostomGRU(input_size=input_size, hidden_size=hidden_size).to(device)

# 替换CostomGRU中的参数
costom_gru.gru.W_xr = nn.Parameter(weight_ih_l0[:, :costom_gru.gru.W_xr.size(1)])  # 更新门的输入权重
costom_gru.gru.W_hr = nn.Parameter(weight_hh_l0[:, :costom_gru.gru.W_hr.size(1)])  # 更新门的隐藏权重

costom_gru.gru.W_xz = nn.Parameter(weight_ih_l0[:, costom_gru.gru.W_xr.size(1):costom_gru.gru.W_xr.size(1) + costom_gru.gru.W_xz.size(1)])  # 重置门的输入权重
costom_gru.gru.W_hz = nn.Parameter(weight_hh_l0[:, costom_gru.gru.W_hr.size(1):costom_gru.gru.W_hr.size(1) + costom_gru.gru.W_hz.size(1)])  # 重置门的隐藏权重

costom_gru.gru.W_xh = nn.Parameter(weight_ih_l0[:, costom_gru.gru.W_xr.size(1) + costom_gru.gru.W_xz.size(1):])  # 候选隐藏状态的输入权重
costom_gru.gru.W_hh = nn.Parameter(weight_hh_l0[:, costom_gru.gru.W_hr.size(1) + costom_gru.gru.W_hz.size(1):])  # 候选隐藏状态的隐藏权重

costom_gru.gru.hb_r = nn.Parameter(bias_hh_l0[:costom_gru.gru.hb_r.size(0)])  # 更新门的偏置
costom_gru.gru.hb_z = nn.Parameter(bias_hh_l0[costom_gru.gru.hb_r.size(0):costom_gru.gru.hb_z.size(0) + costom_gru.gru.hb_r.size(0)])  # 重置门的偏置
costom_gru.gru.hb_h = nn.Parameter(bias_hh_l0[costom_gru.gru.hb_z.size(0) + costom_gru.gru.hb_r.size(0):])  # 候选隐藏状态的偏置

costom_gru.gru.xb_r = nn.Parameter(bias_ih_l0[:costom_gru.gru.xb_r.size(0)])
costom_gru.gru.xb_z = nn.Parameter(bias_ih_l0[costom_gru.gru.xb_r.size(0):costom_gru.gru.xb_z.size(0) + costom_gru.gru.xb_r.size(0)])
costom_gru.gru.xb_h = nn.Parameter(bias_ih_l0[costom_gru.gru.xb_z.size(0) + costom_gru.gru.xb_r.size(0):])

# 初始化输入数据
x = torch.rand(seq_length, batch_size, input_size).to(device)

# 获取CostomGRU和nn.GRU的输出
output1, h1 = costom_gru(x)
output2, h2 = gru(x)


# 使用torch.allclose比较输出是否相等
print("output1 == output2 ? {}".format(torch.allclose(output1.to('cpu'), output2.to('cpu'), atol=1e-6)))
print("h1 == h2 ? {}".format(torch.allclose(h1.to('cpu'), h2.to('cpu'), atol=1e-6)))

output1 == output2 ? True
h1 == h2 ? True
