In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch

In [2]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [3]:
# Hyperparameters

# Model size
vocab_size = 30      # also used as the input/output width by initialize_parameters
hidden_units = 128   # width of the LSTM hidden state and cell state

# Optimisation
learning_rate = 5e-3
# NOTE(review): beta1/beta2 look like Adam moment-decay rates, but nothing in
# the visible cells reads them -- confirm before relying on that.
beta1 = 0.9
beta2 = 0.99

In [4]:
# Activation Functions

# sigmoid function
def sigmoid(X):
    """Return the element-wise logistic sigmoid ``1 / (1 + exp(-X))``.

    Args:
        X: A scalar or NumPy array.

    Returns:
        A value with the same shape as ``X``.
    """
    # The original expression `1/1(...)` tried to call the integer 1 and
    # raised TypeError on every invocation.
    return 1 / (1 + np.exp(-X))

def tanh_activation(X):
    """Apply the hyperbolic tangent element-wise using ``np.tanh``."""
    activated = np.tanh(X)
    return activated

# softmax activation
def softmax(X):
    """Return the row-wise softmax of a 2-D array of scores.

    Args:
        X: Array-like of scores; normalisation runs along axis 1.

    Returns:
        A NumPy array with the same shape as ``X`` whose rows sum to 1.
    """
    logits = np.asarray(X)
    # Subtracting each row's maximum leaves the softmax unchanged
    # mathematically but keeps np.exp from overflowing to inf (which would
    # turn the result into NaN) for large scores.
    shifted = logits - np.max(logits, axis=1, keepdims=True)
    exp_shifted = np.exp(shifted)
    return exp_shifted / np.sum(exp_shifted, axis=1, keepdims=True)

# derivative of tanh
def tanh_derivative(X):
    """Return ``1 - X**2`` element-wise.

    NOTE(review): this equals d/dz tanh(z) only when ``X`` already holds the
    tanh output (X = tanh(z)) -- confirm against the backward-pass caller.
    """
    return 1 - (X ** 2)


In [5]:
# Initialise the LSTM state: a (hidden state, cell state) pair
def init_lstm_state(batch_size, hidden_units, device):
    """Create the all-zero initial state of the LSTM.

    Args:
        batch_size: Number of sequences processed in parallel.
        hidden_units: Width of the hidden and cell state.
        device: torch device on which to allocate the tensors.

    Returns:
        A tuple ``(hidden_state, cell_state)`` of two distinct zero tensors,
        each of shape ``(batch_size, hidden_units)``.
    """
    # torch.zeros takes `device=`; `ctx=` is the MXNet spelling and raises
    # TypeError in PyTorch.
    state_shape = (batch_size, hidden_units)
    return (torch.zeros(state_shape, device=device),
            torch.zeros(state_shape, device=device))

In [6]:
# Create every trainable tensor of the LSTM model
def initialize_parameters(vocab_size, hidden_units, device):
    """Build the parameter dictionary for the LSTM and its output layer.

    Weights are drawn from a standard normal scaled by 0.01; biases start at
    zero. Every tensor is allocated on ``device`` and marked as requiring
    gradients.

    Args:
        vocab_size: Used as both the input width and the output width.
        hidden_units: Width of the hidden/cell state.
        device: torch device for all tensors.

    Returns:
        dict with keys 'fgw', 'igw', 'ogw', 'cgw' (gate weights, shape
        ``(vocab_size + hidden_units, hidden_units)``), 'fgb', 'igb', 'ogb',
        'cgb' (gate biases, shape ``(1, hidden_units)``), 'how' (shape
        ``(hidden_units, vocab_size)``) and 'ob' (shape ``(1, vocab_size)``).
    """
    init_scale = 0.01
    num_inputs = num_outputs = vocab_size

    gate_weight_shape = (num_inputs + hidden_units, hidden_units)
    gate_bias_shape = (1, hidden_units)

    parameters = {}

    # Gate weights: forget, input, output, candidate cell (drawn in this order)
    for key in ('fgw', 'igw', 'ogw', 'cgw'):
        parameters[key] = torch.randn(size=gate_weight_shape, device=device) * init_scale

    # Gate biases
    for key in ('fgb', 'igb', 'ogb', 'cgb'):
        parameters[key] = torch.zeros(gate_bias_shape, device=device)

    # Output-layer parameters
    parameters['how'] = torch.randn(size=(hidden_units, num_outputs), device=device) * init_scale
    parameters['ob'] = torch.zeros((1, num_outputs), device=device)

    # Enable gradient tracking so back-propagation reaches every parameter
    for tensor in parameters.values():
        tensor.requires_grad_(True)

    return parameters



In [7]:
# single LSTM cell (one time step)
def lstm_cell(batch_dataset, prev_hidden_state, prev_cell_state, parameters):
    """Advance the LSTM by one time step.

    The computation uses torch operations so that it works with the
    ``requires_grad=True`` tensors produced by ``initialize_parameters``;
    NumPy functions cannot consume such tensors and would also break the
    autograd graph.

    Args:
        batch_dataset: Input for this step, shape ``(batch, input_units)``.
        prev_hidden_state: Previous hidden state, shape ``(batch, hidden_units)``.
        prev_cell_state: Previous cell state, shape ``(batch, hidden_units)``.
        parameters: dict of torch tensors with keys 'fgw', 'igw', 'ogw', 'cgw'
            (shape ``(input_units + hidden_units, hidden_units)``) and
            'fgb', 'igb', 'ogb', 'cgb' (broadcastable biases).

    Returns:
        ``(lstm_activations, hidden_state, cell_state)`` where
        ``lstm_activations`` maps 'F', 'I', 'O', 'C_tilda' to the gate
        activations of this step.
    """
    # get parameters
    fgw, igw, ogw, cgw = (parameters[key] for key in ('fgw', 'igw', 'ogw', 'cgw'))
    fgb, igb, ogb, cgb = (parameters[key] for key in ('fgb', 'igb', 'ogb', 'cgb'))

    # Accept NumPy arrays as well as tensors; tensors that already match the
    # parameters' dtype/device pass through unchanged.
    def _as_param_like(value):
        return torch.as_tensor(value, dtype=fgw.dtype, device=fgw.device)

    batch_dataset = _as_param_like(batch_dataset)
    prev_hidden_state = _as_param_like(prev_hidden_state)
    prev_cell_state = _as_param_like(prev_cell_state)

    # concatenate the input with the previous hidden state along the feature axis
    concat_dataset = torch.cat((batch_dataset, prev_hidden_state), dim=1)

    # forget / input / output gate activations
    forget_gate = torch.sigmoid(concat_dataset @ fgw + fgb)
    input_gate = torch.sigmoid(concat_dataset @ igw + igb)
    output_gate = torch.sigmoid(concat_dataset @ ogw + ogb)

    # candidate cell state
    c_tilda = torch.tanh(concat_dataset @ cgw + cgb)

    # update cell state and hidden state
    cell_state = forget_gate * prev_cell_state + input_gate * c_tilda
    hidden_state = output_gate * torch.tanh(cell_state)

    # gate activations, kept for use in back propagation
    lstm_activations = {
        'F': forget_gate,
        'I': input_gate,
        'O': output_gate,
        'C_tilda': c_tilda
    }

    return lstm_activations, hidden_state, cell_state

In [8]:
# Output layer
# Only the hidden state is passed to the output layer; the memory cell does not
# take part in the output computation directly and is purely internal state.
def output_cell(hidden_state, parameters):
    """Project the hidden state to output scores.

    Args:
        hidden_state: Hidden state, shape ``(batch, hidden_units)``.
        parameters: dict providing 'how' (hidden-to-output weights) and
            'ob' (output bias).

    Returns:
        ``hidden_state @ how + ob`` -- unnormalised scores.
    """
    # get hidden to output parameters
    how = parameters['how']
    ob = parameters['ob']
    # The bias was previously fetched but never added. The `@` operator is
    # used (rather than np.matmul) so that torch tensors requiring gradients
    # work and stay on the autograd graph; NumPy arrays behave as before.
    output = hidden_state @ how + ob
    # To obtain probabilities, normalise with softmax:
    # output = softmax(output)
    return output

In [9]:
def lstm(inputs, initail_state, parameters):
    """Run the LSTM over a sequence, one time step at a time.

    Args:
        inputs: Iterable of per-step inputs; documented shape
            ``(num_steps, batch_size, vocab_size)``.
        initail_state: Tuple ``(hidden_state, cell_state)`` to start from.
        parameters: Parameter dict forwarded to ``lstm_cell`` and ``output_cell``.

    Returns:
        ``(outputs, (hidden_state, cell_state))`` where ``outputs`` is a list
        holding one ``output_cell`` result per time step and the tuple is the
        state after the last step.

    NOTE(review): a training loop that expects a single tensor (rather than a
    list) would need the per-step outputs concatenated -- confirm with caller.
    """
    h, c = initail_state
    per_step_outputs = []

    for step_input in inputs:
        # the gate activations (first element) are not needed here
        step_result = lstm_cell(step_input, h, c, parameters)
        h, c = step_result[1], step_result[2]
        per_step_outputs.append(output_cell(h, parameters))

    final_state = (h, c)
    return per_step_outputs, final_state


In [10]:
# Define an RNN wrapper class used to train the LSTM
import torch.nn.functional as F

class RNNModelScratch:
    """Minimal recurrent-model wrapper built from plain functions.

    Bundles a parameter factory, an initial-state factory and a forward
    function so the model can be called like ``model(X, state)``.
    """

    def __init__(self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn):
        """Store the configuration and create the parameters.

        Args:
            vocab_size: Number of one-hot classes per token.
            num_hiddens: Hidden-state width.
            device: Device passed to ``get_params``.
            get_params: Callable ``(vocab_size, num_hiddens, device) -> params``.
            init_state: Callable ``(batch_size, num_hiddens, device) -> state``.
            forward_fn: Callable ``(inputs, state, params) -> result``.
        """
        self.vocab_size = vocab_size
        self.num_hiddens = num_hiddens
        # Use the constructor argument, not the notebook-level `hidden_units`
        # global; otherwise the parameters and the state created by
        # begin_state can disagree on the hidden width.
        self.params = get_params(vocab_size, num_hiddens, device)
        self.init_state, self.forward_fn = init_state, forward_fn

    def __call__(self, X, state):
        """One-hot encode the transposed index tensor ``X`` and run ``forward_fn``.

        ``X`` is transposed first, so a ``(batch, steps)`` index tensor
        becomes a float32 tensor of shape ``(steps, batch, vocab_size)``.
        """
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        return self.forward_fn(X, state, self.params)

    def begin_state(self, batch_size, device):
        """Return ``init_state(batch_size, num_hiddens, device)``."""
        return self.init_state(batch_size, self.num_hiddens, device)

In [11]:
# Assemble the from-scratch LSTM model from the notebook's helper functions.
model = RNNModelScratch(
    vocab_size=vocab_size,
    num_hiddens=hidden_units,
    device=device,
    get_params=initialize_parameters,
    init_state=init_lstm_state,
    forward_fn=lstm,
)
model

<__main__.RNNModelScratch at 0x161427a00>

In [13]:
# Display the model's parameter dictionary (parameter name -> tensor).
model.params

{'fgw': tensor([[-0.0008,  0.0131, -0.0069,  ...,  0.0121, -0.0083,  0.0055],
         [-0.0076,  0.0060,  0.0019,  ..., -0.0173,  0.0004,  0.0165],
         [ 0.0054,  0.0005, -0.0215,  ...,  0.0070, -0.0085,  0.0161],
         ...,
         [-0.0133,  0.0039,  0.0162,  ...,  0.0087, -0.0113, -0.0034],
         [ 0.0032,  0.0027,  0.0012,  ...,  0.0015,  0.0125,  0.0067],
         [ 0.0176, -0.0077, -0.0093,  ...,  0.0106,  0.0220, -0.0006]],
        requires_grad=True),
 'igw': tensor([[-0.0099, -0.0066,  0.0038,  ...,  0.0047, -0.0006,  0.0182],
         [-0.0023, -0.0048,  0.0153,  ..., -0.0240,  0.0119,  0.0001],
         [-0.0085, -0.0020,  0.0001,  ...,  0.0026,  0.0206, -0.0091],
         ...,
         [ 0.0029,  0.0095,  0.0106,  ...,  0.0017, -0.0030, -0.0011],
         [ 0.0133, -0.0033,  0.0187,  ..., -0.0050,  0.0088,  0.0128],
         [ 0.0067, -0.0166,  0.0203,  ..., -0.0119, -0.0224,  0.0087]],
        requires_grad=True),
 'ogw': tensor([[-0.0009,  0.0058,  0.0118,  .

In [12]:
from d2l import torch as d2l

# Mini-batch size and number of time steps per training sequence.
batch_size, num_steps = 32, 35
# NOTE(review): the recorded output of this cell is
# "AttributeError: module 'd2l.torch' has no attribute 'load_data_time_machine'",
# i.e. the installed d2l package does not expose this helper -- confirm which
# d2l version this notebook targets.
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

# NOTE(review): `model` was constructed in an earlier cell, so reassigning
# vocab_size and defining num_hiddens here does not change that model, and
# num_hiddens is not passed to anything in this cell -- confirm intended sizes.
vocab_size, num_hiddens = len(vocab), 256
num_epochs, lr = 500, 1

d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)

AttributeError: module 'd2l.torch' has no attribute 'load_data_time_machine'

In [63]:
# Get the corresponding embeddings for the batch dataset
def get_embeddings(batch_dataset, embeddings):
    """Return the matrix product ``batch_dataset x embeddings``.

    NOTE(review): presumably ``batch_dataset`` holds one-hot rows, so the
    product picks out rows of ``embeddings`` -- confirm against the caller.
    """
    return np.matmul(batch_dataset, embeddings)


![forward propagation](../assets/forward-propagation.png)


In [64]:
# forward propagation
def forward_propagation(batches, parameters, embeddings):
    # get batch size
    batch_size = batches[0].shape[0]

    # 存储缓存信息
    lstm_cache = dict()
    hidden_cache = dict()
    cell_cache = dict()
    output_cache = dict()
    embedding_cache = dict()

    # 初始化hidden_state(h0), cell_state(c0)  偏置项
    h0 = np.zeros([batch_size, hidden_units], dtype=np.float32)
    c0 = np.zeros([batch_size, hidden_units], dtype=np.float32)

    # 存储初始的hidden_state(h0), cell_state(c0)
    hidden_cache['h0'] = h0
    cell_cache['c0'] = c0

    for i in range(len(batches) - 1):
        

SyntaxError: incomplete input (2557073817.py, line 22)