1. 定义基本工具类

In [1]:
import numpy as np
import torch
import torch.nn.functional as F
import time
import random
from collections import Counter

RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def elapsed(sec):   # 计算时间函数
    if sec < 60:
        return str(sec) + 'sec'
    elif sec < (60*60):
        return str(sec/60) + 'min'
    else:
        return str(sec*(60*60)) + 'hr'
    
training_file = 'data/wordstest.txt' # 定义样本文件

def readalltxt(txt_files):   # 处理中文
    labels = []
    for txt_file in txt_files:
        target = get_ch_label(txt_file)
        labels.append(target)
    return labels

def get_ch_label(txt_file):     # 获取样本中的汉字
    labels = ""
    with open(txt_file, 'rb') as f:
        for label in f:
            labels = labels +label.decode('utf-8')
    return labels

# 将汉字转为向量，支持文件和内存对象里的汉字转换
def get_ch_label_v(txt_file, word_num_map, txt_label=None): 
    words_size = len(word_num_map)
    to_num = lambda word: word_num_map.get(word, words_size)
    if txt_file != None:
        txt_label = get_ch_label(txt_file)
    labels_vector = list(map(to_num, txt_file))
    return labels_vector

2. 样本预处理

In [2]:
training_data = get_ch_label(training_file)
print('Loaded training data...')

print('样本长度：', len(training_data))
counter = Counter(training_data)
words = sorted(counter)
words_size = len(words)
word_num_map = dict(zip(words, range(words_size)))

print('字表大小：',words_size)
wordlabel = get_ch_label_v(training_file, word_num_map)

Loaded training data...
样本长度： 200
字表大小： 101


3. 构建循环神经网络（RNN）模型

In [3]:
class GRURNN(torch.nn.Module):
    def __init__(self, word_size, embed_dim, hidden_dim, output_size, num_layers) -> None:
        super(GRURNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        self.embed = torch.nn.Embedding(word_size,embed_dim)
        self.gru = torch.nn.GRU(input_size=embed_dim, hidden_size=hidden_dim, 
                                num_layers=num_layers, bidirectional=True)
        self.fc = torch.nn.Linear(hidden_dim*2, output_size)

    def forward(self, features, hidden):
        embedded = self.embed(features.view(1,-1))
        output, hidden = self.gru(embedded.view(1,1,-1), hidden)
        output = self.attention(output)
        output = self.fc(output.view(1,-1))
        return output, hidden
    
    def init_zero_state(self):
        init_hidden = torch.zeros(self.num_layers*2, 1,
                                  self.hidden_dim).to(DEVICE)
        return init_hidden

4. 实例化模型，并训练模型

In [6]:
# 定义参数，训练模型
EMBEDDING_DIM = 10  # 定义词嵌入维度
HIDDEN_DIM = 20      # 定义隐藏层维度
NUM_LAYERS = 1      # 定义层数
# 实例化模型
model = GRURNN(words_size, EMBEDDING_DIM, HIDDEN_DIM, words_size, NUM_LAYERS)
model = model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
# 定义预测函数
def evaluate(model, prime_str, predict_len, temperature=0.8):
    hidden = model.init_zero_state().to(DEVICE)
    predicted = ''

    # 处理输入语义
    for p in range(len(prime_str) - 1):
        _, hidden = model(prime_str[p], hidden)     # 将输入文字和状态传入模型
        predicted += words[prime_str[p]]
    inp = prime_str[-1]     # 获得输入字符
    predicted += words[inp]
    # 按指定长度输出预测字符
    for p in range(predict_len):
        output, hidden = model(inp, hidden) # 将输入字符和状态传入模型
        # 从多项式分布中采样
        output_dist = output.data.view(-1).div(temperature).exp()
        inp = torch.multinomial(output_dist, 1)[0]  # 获取采样中的结果
        predicted += words[inp]         # 将索引转成汉字并保存到字符串中
    return predicted # 将输入字符和预测字符一起返回

# 定义参数训练模型
training_iters = 5000
display_step = 1000
n_input = 4
step = 0
offset = random.randint(0, n_input+1)
end_offset = n_input + 1

while step < training_iters:    # 按照迭代次数训练模型
    start_time = time.time()    # 计算起始时间
    # 随机取一个位置偏移
    if offset > (len(training_data) - end_offset):
        offset = random.randint(0, n_input+1)
    # 制作输入样本
    inwords = wordlabel[offset:offset+n_input]
    inwords = np.reshape(np.array(inwords), [n_input, -1, 1])
    # 制作标签样本
    out_onehot = wordlabel[offset+1:offset+n_input+1]
    hidden = model.init_zero_state()   # 将RNN初始状态清零
    optimizer.zero_grad()

    loss = 0
    inputs = torch.LongTensor(inwords).to(DEVICE)
    targets = torch.LongTensor(out_onehot).to(DEVICE)
    for c in range(n_input):    # 按照输入长度依次将样本输入模型进行预测
        outputs, hidden = model(inputs[c], hidden)
        loss += F.cross_entropy(outputs, targets[c].view(1))
    
    loss /= n_input
    loss.backward()
    optimizer.step()

    # 输出日志
    with torch.set_grad_enabled(False):
        if (step+1)%display_step == 0:
            print(f'Time elapsed: {(time.time() - start_time)/60:.4f} min')
            print(f'step {step+1} | Loss {loss.item():.2f}\n\n')
            with torch.no_grad():
                print(evaluate(model, inputs, 32), '\n')
            print(50*'=')
    step += 1
    offset += (n_input+1)
print('Finished!')



RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.