In [84]:
import torch
import torch.nn as nn
import numpy as np

# 单细胞LSTM

In [85]:
# Build a single LSTM cell with input size 20 and hidden size 100.
model = nn.LSTMCell(20, 100)

# Hidden-to-hidden weight matrix.
# Its shape is [4*hidden_size, hidden_size]: the weights of the four gates
# (input / forget / cell / output) are stacked along the first axis.
print(model.weight_hh.shape)  # prints: torch.Size([400, 100])

# Input-to-hidden weight matrix.
# Its shape is [4*hidden_size, input_size], again with the four gates stacked.
print(model.weight_ih.shape)  # prints: torch.Size([400, 20])

torch.Size([400, 100])
torch.Size([400, 20])


In [86]:
def sigmoid(x):
    """Numerically stable logistic function 1 / (1 + exp(-x)), applied elementwise.

    The textbook form evaluates exp(-x), which overflows (and emits a
    RuntimeWarning) for large negative x. Here exp is only evaluated on
    non-positive arguments, so it can at worst underflow to 0.

    Args:
        x: Python scalar, NumPy scalar or array-like of real numbers.

    Returns:
        Values in [0, 1] with the same shape as ``x``. Floating-point input
        keeps its dtype; a scalar input yields a NumPy scalar.
    """
    x = np.asarray(x)
    # exp(-|x|) lies in (0, 1], so it can never overflow.
    decay = np.exp(-np.abs(x))
    # x >= 0: 1 / (1 + exp(-x))
    # x <  0: exp(x) / (1 + exp(x))   (the same function, rewritten)
    out = np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))
    # Indexing with () turns a 0-d result back into a scalar and leaves
    # real arrays untouched.
    return out[()]

# Hand-written LSTM cell (one sample, no batch dimension)
def lstm_cell(x, h, c, W_hh, W_ih, b):
    """Advance an LSTM by one time step for a single sample.

    Args:
        x: input vector for this step.
        h: previous hidden state.
        c: previous cell state.
        W_hh: hidden-to-hidden weights, applied as ``W_hh @ h``.
        W_ih: input-to-hidden weights, applied as ``W_ih @ x``.
        b: bias added to the summed pre-activations.

    Returns:
        Tuple ``(h_out, c_out)`` with the new hidden and cell state.
    """
    # Pre-activations of all four gates in one shot: W_ih @ x + W_hh @ h + b.
    pre_act = W_ih @ x + W_hh @ h + b

    # Cut into four equal chunks, in the order i, f, g, o.
    in_pre, forget_pre, cand_pre, out_pre = np.split(pre_act, 4)

    # Gates use sigmoid; the candidate cell value uses tanh.
    in_gate = sigmoid(in_pre)
    forget_gate = sigmoid(forget_pre)
    candidate = np.tanh(cand_pre)
    out_gate = sigmoid(out_pre)

    # New cell state: keep part of the old state, add part of the candidate.
    c_out = forget_gate * c + in_gate * candidate

    # New hidden state: gated, squashed view of the cell state.
    h_out = out_gate * np.tanh(c_out)

    return h_out, c_out

In [87]:
# Random test input: 1 sample with 20 features
x = np.random.randn(1, 20).astype(np.float32)
h0 = np.random.randn(1, 100).astype(np.float32)  # initial hidden state
c0 = np.random.randn(1, 100).astype(np.float32)  # initial cell state

# Reference output computed with PyTorch's LSTMCell
h_pytorch, c_pytorch = model(
    torch.tensor(x), 
    (torch.tensor(h0), torch.tensor(c0)))

In [88]:
x[0] # note the difference from x: x has shape (1, 20), x[0] is the 1-D vector of 20 features

array([-0.5220228 ,  0.37693432, -1.7825887 , -1.09794   ,  0.24591659,
        0.6271068 , -0.6358606 ,  0.6282339 ,  0.4833358 ,  0.40714845,
        0.43824404, -0.23232095, -0.67724615,  0.80638236, -0.6009789 ,
       -0.05727648,  0.26365104, -1.0171974 , -0.5647117 ,  0.66235757],
      dtype=float32)

In [89]:
# Run the hand-written LSTM cell on the same inputs
h_custom, c_custom = lstm_cell(
    x[0], h0[0], c0[0],
    model.weight_hh.detach().numpy(),     # take the PyTorch weights and convert them to NumPy
    model.weight_ih.detach().numpy(),
    (model.bias_hh + model.bias_ih).detach().numpy()  # the two bias vectors are summed into one
)

# Compare the hand-written result with PyTorch (L2 norm of the difference)
print(np.linalg.norm(h_pytorch.detach().numpy() - h_custom))  # hidden-state error
print(np.linalg.norm(c_pytorch.detach().numpy() - c_custom))  # cell-state error

2.2367168e-07
3.9906277e-07


# 全序列LSTM
- 为了处理长序列的拆分，函数返回所有的hidden state以及最后一个cell state

In [90]:
model = nn.LSTM(20,100,num_layers=1)  # single-layer LSTM: input size 20, hidden size 100

# Test data: a 50-step sequence with 20 features per step
X_seq = np.random.randn(50, 20).astype(np.float32)
h0 = np.random.randn(1, 100).astype(np.float32)  # initial hidden state
c0 = np.random.randn(1, 100).astype(np.float32)  # initial cell state

In [91]:
# Run the LSTM cell over a whole sequence (one sample, no batch dimension)
def lstm(X, h, c, W_hh, W_ih, b):
    """Unroll ``lstm_cell`` over every time step of ``X``.

    Args:
        X: input sequence; the first axis is time.
        h: initial hidden state (1-D).
        c: initial cell state.
        W_hh, W_ih, b: parameters forwarded unchanged to ``lstm_cell``.

    Returns:
        Tuple ``(H, c)``: ``H`` holds the hidden state of every step, laid out
        as [sequence length, hidden size]; ``c`` is the cell state after the
        last step.
    """
    num_steps = X.shape[0]
    # One output row per time step (allocated with NumPy's default float dtype).
    H = np.zeros((num_steps, h.shape[0]))
    for t, x_t in enumerate(X):
        # Feed the state from the previous step back into the cell.
        h, c = lstm_cell(x_t, h, c, W_hh, W_ih, b)
        H[t, :] = h
    return H, c

# Run the hand-written LSTM over the sequence
H_custom, cn_custom = lstm(
    X_seq, h0[0], c0[0],
    model.weight_hh_l0.detach().numpy(),
    model.weight_ih_l0.detach().numpy(),
    (model.bias_hh_l0 + model.bias_ih_l0).detach().numpy()
)

H_pytorch, (hn_pytorch, cn_pytorch) = model(torch.tensor(X_seq)[:,None,:],
                                            (torch.tensor(h0)[:,None,:],torch.tensor(c0)[:,None,:]))
# [:,None,:] inserts a size-1 axis as the second dimension, which serves as
# the batch dimension in the input layout passed to the PyTorch LSTM here

# Compare the two results
print(np.linalg.norm(H_custom - H_pytorch[:,0,:].detach().numpy()))  # error


1.54967600491765e-06


# 添加批处理的LSTM
- 内存连续的矩阵乘法效率最高，而LSTM是按时间步遍历输入的。如果按习惯将batch放在第一个维度，即[batch_size, timesteps, input_size]，则每个时间步访问的[:, t, :]在内存中不连续，因此需要将时间维t放在第一个维度，即[timesteps, batch_size, input_size]

In [92]:
# Batched LSTM cell
def lstm_cell(x, h, c, W_hh, W_ih, b):
    """Advance an LSTM by one time step for a whole batch.

    Unlike the single-sample version, the weights are applied on the right
    (``x @ W_ih``, ``h @ W_hh``), so callers pass them transposed.

    Args:
        x: inputs for this step, [batch_size, input_size].
        h: previous hidden states, one row per batch element.
        c: previous cell states, one row per batch element.
        W_hh: hidden-to-hidden weights, applied as ``h @ W_hh``.
        W_ih: input-to-hidden weights, applied as ``x @ W_ih``.
        b: 1-D bias, broadcast across the batch.

    Returns:
        Tuple ``(h_out, c_out)`` with the new hidden and cell states.
    """
    # All four gate pre-activations at once; b[None, :] broadcasts over rows.
    pre_act = x @ W_ih + h @ W_hh + b[None, :]

    # Split column-wise into four equal chunks, in the order i, f, g, o.
    in_pre, forget_pre, cand_pre, out_pre = np.split(pre_act, 4, axis=1)

    # Gates use sigmoid; the candidate cell value uses tanh.
    in_gate = sigmoid(in_pre)
    forget_gate = sigmoid(forget_pre)
    candidate = np.tanh(cand_pre)
    out_gate = sigmoid(out_pre)

    c_out = forget_gate * c + in_gate * candidate
    h_out = out_gate * np.tanh(c_out)
    return h_out, c_out

# Batched LSTM over a whole sequence
def lstm(X, h, c, W_hh, W_ih, b):
    """Unroll the batched ``lstm_cell`` over every time step of ``X``.

    Args:
        X: input sequences; axis 0 is time, axis 1 is the batch.
        h: initial hidden states (2-D, one row per batch element).
        c: initial cell states.
        W_hh, W_ih, b: parameters forwarded unchanged to ``lstm_cell``.

    Returns:
        Tuple ``(H, c)``: ``H`` holds every step's hidden states in [T, B, N]
        layout; ``c`` is the cell state after the last step.
    """
    num_steps, batch_size = X.shape[0], X.shape[1]
    H = np.zeros((num_steps, batch_size, h.shape[1]))  # output layout [T, B, N]
    for t, x_t in enumerate(X):
        h, c = lstm_cell(x_t, h, c, W_hh, W_ih, b)
        H[t, :, :] = h  # hidden states of the whole batch at step t
    return H, c

In [93]:
# Batched test data: 50 time steps, batch of 128, 20 features per step
X_seq = np.random.randn(50, 128, 20).astype(np.float32)
h0 = np.random.randn(1, 128, 100).astype(np.float32)  # initial hidden state
c0 = np.random.randn(1, 128, 100).astype(np.float32)  # initial cell state

In [94]:
# Run the hand-written batched LSTM over the sequences
H_custom, cn_custom = lstm(
    X_seq, h0[0], c0[0],
    model.weight_hh_l0.detach().numpy().T, # transposed because the batched cell computes h @ W rather than W @ h
    model.weight_ih_l0.detach().numpy().T,
    (model.bias_hh_l0 + model.bias_ih_l0).detach().numpy()
)

# Reference result from PyTorch's LSTM
H_pytorch, (hn_pytorch, cn_pytorch) = model(torch.tensor(X_seq),
                                            (torch.tensor(h0),torch.tensor(c0)))

print(np.linalg.norm(H_custom - H_pytorch.detach().numpy()))  # error

1.1522489051983313e-05


In [95]:
# Shape of the PyTorch output sequence
H_pytorch.shape

torch.Size([50, 128, 100])

# 训练LSTM

In [96]:
def train_lstm(X, h0, c0, Y, W_hh, W_ih, b, opt):
    """Run one training step of a single-layer LSTM over the sequence ``X``.

    ``lstm`` and ``loss`` are looked up in the notebook namespace.
    NOTE(review): ``loss`` is not defined in the visible cells, and the
    ``lstm`` defined in this notebook is NumPy-based; ``l.backward()``
    presumes autograd-capable versions of both -- confirm.

    Args:
        X: input sequence passed to ``lstm``.
        h0: initial hidden state.
        c0: initial cell state.
        Y: targets passed to ``loss`` together with the hidden states.
        W_hh, W_ih, b: LSTM parameters forwarded to ``lstm``.
        opt: optimizer providing ``zero_grad()`` and ``step()``.
    """
    # Clear gradients left from the previous step; backward() adds to them,
    # so without this every update would use the sum of all earlier gradients.
    opt.zero_grad()
    H, _ = lstm(X, h0, c0, W_hh, W_ih, b)  # final cell state is not needed here
    l = loss(H, Y)
    l.backward()
    opt.step()

- 对于多层、多时间步的LSTM或RNN：先在一层内沿时间维算完整条序列，再把该层的全部输出作为下一层的输入，逐层迭代

In [97]:
def train_deep_lstm(X, h0, c0, Y, W_hh, W_ih, b, opt):
    """Run one training step of a stacked (multi-layer) LSTM.

    Each layer processes the full sequence before the next layer starts; the
    hidden states of layer ``i`` are the input sequence of layer ``i + 1``.

    ``lstm`` and ``loss`` are looked up in the notebook namespace.
    NOTE(review): ``loss`` is not defined in the visible cells, and
    ``l.backward()`` presumes autograd-capable ``lstm``/``loss`` -- confirm.

    Args:
        X: input sequence for the first layer.
        h0, c0: per-layer initial hidden / cell states, indexed by layer.
        Y: targets passed to ``loss`` with the last layer's hidden states.
        W_hh, W_ih, b: per-layer parameters, indexed by layer.
        opt: optimizer providing ``zero_grad()`` and ``step()``.
    """
    # Clear gradients left from the previous step; backward() adds to them.
    opt.zero_grad()
    H = X
    depth = len(W_hh)
    for i in range(depth):  # iterate over layers
        # The final cell state of each layer is not needed in this variant.
        H, _ = lstm(H, h0[i], c0[i], W_hh[i], W_ih[i], b[i])
    l = loss(H, Y)
    l.backward()
    opt.step()

- 当时间步非常长时，需要截断计算图，拆分成许多小部分作正向反向传播更新参数，之间仅传递末尾h/c作为下个部分的h0/c0，故函数要返回末尾hidden/cell
- 就是对多层多时间步进行或横向或纵向的计算图细分

In [98]:
def train_deep_lstm(X, h0, c0, Y, W_hh, W_ih, b, opt):
    """Train a stacked LSTM on one block of a long sequence (truncated BPTT).

    After each layer has processed the block, its last hidden state and final
    cell state are stored as that layer's initial state for the next block.
    They are detached first, so the next block's backward pass stops at the
    block boundary.

    ``lstm`` and ``loss`` are looked up in the notebook namespace.
    NOTE(review): ``loss`` is not defined in the visible cells, and
    ``.detach()`` / ``l.backward()`` presume autograd-capable tensors coming
    out of ``lstm`` -- confirm.

    Args:
        X: input block for the first layer; axis 0 is time.
        h0, c0: per-layer initial hidden / cell states, indexed by layer.
            Both are updated in place (presumably per-layer lists -- confirm).
        Y: targets passed to ``loss`` with the last layer's hidden states.
        W_hh, W_ih, b: per-layer parameters, indexed by layer.
        opt: optimizer providing ``zero_grad()`` and ``step()``.

    Returns:
        Tuple ``(h0, c0)`` holding the states to start the next block from.
    """
    # Clear gradients left from the previous block; backward() adds to them.
    opt.zero_grad()
    H = X
    depth = len(W_hh)
    for i in range(depth):  # iterate over layers
        H, cn = lstm(H, h0[i], c0[i], W_hh[i], W_ih[i], b[i])
        # detach() cuts the autograd graph at the block boundary; clone()
        # gives the carried state its own storage.
        h0[i] = H[-1].detach().clone()
        c0[i] = cn.detach().clone()

    l = loss(H, Y)
    l.backward()
    opt.step()
    return h0, c0

# # 训练过程
# # 初始化
# h0, c0 = np.zeros() 
# sequence_len, BLOCK_SIZE = ...

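# for i in range(sequence_len // BLOCK_SIZE):
#     start = i * BLOCK_SIZE  # consecutive, non-overlapping blocks (X[i:i+BLOCK_SIZE] would only shift by one step)
#     h0, c0 = train_deep_lstm(X[start:start+BLOCK_SIZE], h0, c0, Y[start:start+BLOCK_SIZE], W_hh, W_ih, b, opt)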