In [101]:
import torch.nn as nn
import torch
import numpy as np
import torch.nn.functional as F

## 1. 定义超参数

In [102]:
# Sequence / model sizes
time_steps, n_hidden = 5, 128  # characters per padded word; RNN hidden-state width
# Training schedule
EPOCH, epoch_print = 5000, 1000  # total epochs; the loss is logged every `epoch_print` epochs

## 2. 加载数据

In [103]:
# Toy parallel corpus: every entry is a [source word, target word] pair
datas = [
    ['man', 'women'],
    ['black', 'white'],
    ['king', 'queen'],
    ['girl', 'boy'],
    ['up', 'down'],
    ['high', 'low'],
]
# Vocabulary: the 26 lowercase letters followed by the special markers
# S (decoder start), E (end of target) and P (padding)
char_list = [chr(code) for code in range(ord('a'), ord('z') + 1)] + ['S', 'E', 'P']

In [104]:
# Lookup tables between characters and their integer class ids
char_to_idx = dict(zip(char_list, range(len(char_list))))
idx_to_char = dict(zip(char_to_idx.values(), char_to_idx.keys()))
# One class per vocabulary entry; the whole dataset is used as a single batch
n_classes, batch_size = len(char_to_idx), len(datas)

In [105]:
def make_batch(datas):
    """Turn (source, target) word pairs into training tensors.

    Each word is right-padded with 'P' up to ``time_steps`` characters. The
    decoder input is the padded target prefixed with the start marker 'S';
    the training target is the padded target followed by the end marker 'E'.

    The pairs in ``datas`` are only read: padding is applied to local copies,
    so calling this function never alters the caller's data.

    :param datas: iterable of ``[source_word, target_word]`` pairs
    :return: tuple ``(input_batch, output_batch, target_batch)`` of float tensors.
        For words of at most ``time_steps`` characters:
        input_batch:  (batch_size, time_steps, n_classes) one-hot encoder input
        output_batch: (batch_size, time_steps + 1, n_classes) one-hot decoder input
        target_batch: (batch_size, time_steps + 1) class indices stored as float;
            call ``.long()`` on it before handing it to a classification loss
    """
    input_batch, output_batch, target_batch = [], [], []
    for data in datas:
        # Pad copies of the words instead of writing the padding back into `data`
        source = data[0].ljust(time_steps, 'P')
        target = data[1].ljust(time_steps, 'P')

        # Characters -> class indices
        input_ids = [char_to_idx[char] for char in source]
        # The decoder input starts with the start marker S
        output_ids = [char_to_idx[char] for char in ('S' + target)]
        # The expected decoder output ends with the end marker E
        target_ids = [char_to_idx[char] for char in (target + 'E')]

        # F.one_hot returns a LongTensor of size (sequence_length, n_classes)
        input_batch.append(F.one_hot(torch.tensor(input_ids), n_classes))
        output_batch.append(F.one_hot(torch.tensor(output_ids), n_classes))
        target_batch.append(torch.LongTensor(target_ids))

    # Merge the per-pair tensors along a new leading batch dimension
    return (torch.stack(input_batch, dim=0).float(),
            torch.stack(output_batch, dim=0).float(),
            torch.stack(target_batch, dim=0).float())

In [106]:
# Encode the whole toy dataset once: encoder inputs, decoder inputs and targets
input_batch, output_batch, target_batch = make_batch(datas)

## 3. 模型

In [113]:
class SeqToSeq(nn.Module):
    """Character-level encoder-decoder built from two single-layer RNNs.

    The encoder reads the source sequence; its final hidden state initialises
    the decoder, whose per-step outputs are projected to class scores by a
    linear layer.
    """

    def __init__(self, num_classes=None, hidden_size=None):
        """
        :param num_classes: vocabulary size (input feature size and number of
            output classes); defaults to the notebook-level ``n_classes``
        :param hidden_size: RNN hidden-state width; defaults to the
            notebook-level ``n_hidden``
        """
        super().__init__()
        if num_classes is None:
            num_classes = n_classes
        if hidden_size is None:
            hidden_size = n_hidden

        # batch_first=True: inputs and outputs are laid out as (batch, steps, features).
        # No `dropout` argument: nn.RNN applies dropout only between stacked layers,
        # so on a single-layer RNN it has no effect and merely triggers a warning.
        self.encoder = nn.RNN(input_size=num_classes, hidden_size=hidden_size, batch_first=True)
        self.decoder = nn.RNN(input_size=num_classes, hidden_size=hidden_size, batch_first=True)

        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, enc_input, enc_hidden, dec_input):
        '''
        Encode ``enc_input`` and decode ``dec_input`` starting from the encoder's final state.

        :param enc_input: size: (batch_size, enc_steps, n_classes)
        :param enc_hidden: initial encoder state, size: (num_layers * num_directions, batch_size, n_hidden)
        :param dec_input: size: (batch_size, dec_steps, n_classes); during training the
            decoder input is one step longer than the encoder input because of the start marker S
        :return: class scores (logits), size: (batch_size, dec_steps, n_classes)
        '''
        # The per-step encoder outputs are discarded; only the final state is used.
        # enc_state size: (num_layers(=1) * num_directions(=1), batch_size, n_hidden)
        # (batch_first does not apply to hidden states)
        _, enc_state = self.encoder(enc_input, enc_hidden)
        # outputs size: (batch_size, dec_steps, num_directions(=1) * n_hidden)
        outputs, _ = self.decoder(dec_input, enc_state)
        outputs = self.fc(outputs)  # size: (batch_size, dec_steps, n_classes)
        return outputs

In [114]:
# Network, loss and optimiser used by the training loop
model = SeqToSeq()
criterion = nn.CrossEntropyLoss()
lr = 1e-3  # Adam step size
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)



## 4. 训练

In [115]:

# The initial encoder state and the integer class targets are the same for
# every epoch, so build them once instead of on each iteration.
hidden = torch.zeros(1, batch_size, n_hidden)
targets = target_batch.long()  # CrossEntropyLoss expects integer class indices

for epoch in range(EPOCH):
    # outputs size: (batch_size, time_steps + 1, n_classes)
    outputs = model(input_batch, hidden, output_batch)

    # CrossEntropyLoss takes the class dimension second: (batch, n_classes, steps).
    # Its default 'mean' reduction averages over every character of every word;
    # multiplying by the number of words keeps the loss on the same scale as
    # summing the per-word mean losses one word at a time.
    loss = criterion(outputs.transpose(1, 2), targets) * outputs.size(0)

    if (epoch + 1) % epoch_print == 0:
        print('Epoch = %d Loss = %.6f' % (epoch + 1, loss.item()))

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Epoch = 1000 Loss = 0.003916
Epoch = 2000 Loss = 0.001048
Epoch = 3000 Loss = 0.000442
Epoch = 4000 Loss = 0.000220
Epoch = 5000 Loss = 0.000118


In [122]:
def make_test_batch(word):
    """Build the one-hot encoder and decoder inputs for translating one word.

    The word is right-padded with 'P' up to ``time_steps`` characters. No target
    is known at test time, so the decoder input is the start marker 'S' followed
    by ``time_steps`` padding characters.

    :param word: source word made of characters present in ``char_to_idx``
    :return: tuple ``(input_batch, output_batch)`` of float tensors, each with a
        leading batch dimension of size 1
    """
    padded_word = word + 'P' * (time_steps - len(word))
    decoder_seed = 'S' + 'P' * time_steps

    def encode(text):
        # Characters -> class indices -> one-hot rows
        ids = torch.tensor([char_to_idx[char] for char in text])
        # unsqueeze(0) adds the batch dimension (batch_size = 1)
        return F.one_hot(ids, n_classes).unsqueeze(0).float()

    return encode(padded_word), encode(decoder_seed)

In [148]:
def translate(word):
    """Translate a single word with the trained ``model``.

    The word is encoded by ``make_test_batch``, run through the model once, and
    the highest-scoring character at each decoder step is taken. Padding ('P')
    and end ('E') markers are removed from the result.

    :param word: source word to translate
    :return: predicted target word as a string
    """
    input_batch, output_batch = make_test_batch(word)
    # A single word, so batch_size = 1
    hidden = torch.zeros(1, 1, n_hidden)

    # Inference only: skip autograd bookkeeping
    with torch.no_grad():
        # size: (batch_size=1, decoder_steps, n_classes)
        outputs = model(input_batch, hidden, output_batch)

    # Drop only the batch dimension, then take the best class index at every step
    predicts = torch.argmax(outputs.squeeze(0), dim=1).tolist()

    predict_word = ''.join(idx_to_char[idx] for idx in predicts)
    # Strip the E and P markers
    return predict_word.replace('P', '').replace('E', '')

In [149]:
print('test')
# 'man', 'king' and 'black' are training source words; 'mans' and 'upp' are not in the dataset
for word in ('man', 'mans', 'king', 'black', 'upp'):
    print(word + ' ->', translate(word))

test
man -> women
mans -> women
king -> queen
black -> white
upp -> down
