In [1]:
# %load sample.py
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random
import time
import math
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np
from os import system
from nltk.translate.bleu_score import SmoothingFunction, sentence_bleu

In [2]:
"""========================================================================================
The sample.py includes the following template functions:

1. Encoder, decoder
2. Training function
3. BLEU-4 score function

You have to modify them to complete the lab.
In addition, there are still other functions that you have to 
implement by yourself.

1. Your own dataloader (design it in your own way; it need not be a PyTorch DataLoader)
2. Output your results (BLEU-4 score, corrected words)
3. Plot loss/score
4. Load/save weights
========================================================================================"""

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#----------Hyper Parameters----------#
# Width of the character embedding vectors used by encoder and decoder.
embedding_size = 32
# Width of the recurrent hidden state.
hidden_size = 512
# Vocabulary size: the 26 lowercase letters plus the SOS and EOS markers.
vocab_size = 28
# Probability that a training sample is decoded with teacher forcing.
teacher_forcing_ratio = 0.5
# NOTE(review): LR is not referenced by the rest of the visible notebook;
# trainIters is called without learning_rate, so its default (0.001) applies.
LR = 0.05

In [3]:
################################
#Example inputs of compute_bleu
################################
#The target word
# Example pair for compute_bleu: the target word and a word produced by the model.
reference = 'variable'
output = 'varable'


def compute_bleu(output, reference):
    """Return the smoothed BLEU score of ``output`` against ``reference``.

    Both words are handed to ``sentence_bleu`` as plain strings, so every
    character acts as one token. References of exactly three characters are
    scored with three n-gram weights; every other length uses the four
    BLEU-4 weights. Smoothing is NLTK's ``method1``.
    """
    three_char_reference = len(reference) == 3
    weights = (0.33, 0.33, 0.33) if three_char_reference else (0.25, 0.25, 0.25, 0.25)
    smoother = SmoothingFunction().method1
    return sentence_bleu([reference], output, weights=weights, smoothing_function=smoother)


print(compute_bleu(output, reference))

0.5154486831107657


In [4]:
# Special markers occupy the first two vocabulary slots.
SOS_token = 0
EOS_token = 1
a2z = "abcdefghijklmnopqrstuvwxyz"

# index -> symbol: the two markers first, then the letters starting at index 2.
index2char = {0: "SOS", 1: "EOS"}
index2char.update(enumerate(a2z, 2))

# symbol -> index is the exact inverse of the table above.
char2index = {symbol: position for position, symbol in index2char.items()}

In [5]:
import json

# Each entry of train.json provides an "input" list of words and one "target" word.
with open("train.json") as f:
    line = json.load(f)

# data_list[k] and label_list[k] form one training pair; a target tensor is
# repeated (same object) once for every input word that maps to it.
data_list = []
label_list = []
for pair in line:
    # Target word as a tensor of character indices.
    # NOTE(review): no EOS is appended to targets, while inputs do get one --
    # confirm this asymmetry is intended.
    label_idx = torch.tensor([char2index[character] for character in pair["target"]])
    for word in pair["input"]:
        # Input word as character indices, terminated by the EOS index.
        word_idx = torch.tensor(
            [char2index[character] for character in word] + [char2index["EOS"]]
        )
        data_list.append(word_idx)
        label_list.append(label_idx)

In [6]:
def ToOneHot(input_tensor, num_classes=28):
    """Convert a 1-D tensor of class indices into one-hot rows.

    Args:
        input_tensor: 1-D tensor holding ``n`` class indices, each expected to
            lie in ``[0, num_classes)``.
        num_classes: width of every one-hot row. Defaults to 28, the value the
            function previously hard-coded.

    Returns:
        A float tensor of shape ``(n, 1, num_classes)`` allocated on the same
        device as ``input_tensor``. Row ``i`` is all zeros except for a 1 at
        position ``input_tensor[i]``.
    """
    # scatter_ needs int64 indices shaped like the slice being written.
    indices = input_tensor.long().reshape(-1, 1, 1)
    output_tensor = torch.zeros(
        indices.shape[0], 1, num_classes, device=input_tensor.device
    )
    # One vectorised write replaces the per-element Python loop.
    output_tensor.scatter_(2, indices, 1.0)
    return output_tensor

In [7]:
#Encoder
class EncoderRNN(nn.Module):
    """Character-level GRU encoder.

    Args:
        vocab_size: number of distinct symbol indices the embedding accepts.
        embedding_size: width of each embedding vector.
        hidden_size: width of the recurrent hidden state.
    """

    def __init__(self, vocab_size, embedding_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.embedding_size = embedding_size
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.gru = nn.GRU(embedding_size, hidden_size)
        # Not used by forward(); kept so the module's parameter names and
        # initialisation order stay exactly as before.
        self.rnn = nn.RNN(embedding_size, hidden_size)

    def forward(self, input, hidden):
        """Encode a sequence of symbol indices.

        Args:
            input: tensor of symbol indices; after embedding it is reshaped to
                ``(seq_len, 1, embedding_size)``, i.e. one sequence, batch of 1.
            hidden: initial GRU hidden state, e.g. from ``initHidden()``.

        Returns:
            ``(output, hidden)`` exactly as returned by the GRU.
        """
        embedded = self.embedding(input).view(-1, 1, self.embedding_size)
        output, hidden = self.gru(embedded, hidden)
        return output, hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)``.

        The tensor is created with the dtype and device of the module's own
        weights, so it matches wherever the module has been moved instead of
        depending on a module-level ``device`` variable.
        """
        return self.embedding.weight.new_zeros(1, 1, self.hidden_size)

In [8]:
#Decoder
class DecoderRNN(nn.Module):
    """Character-level GRU decoder that emits log-probabilities over symbols.

    Args:
        embedding_size: width of each embedding vector.
        hidden_size: width of the recurrent hidden state.
        output_size: number of output symbols (also the embedding table size).
    """

    def __init__(self, embedding_size, hidden_size, output_size):
        super(DecoderRNN, self).__init__()
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        # Stored on the instance but not read by any method of this class.
        self.teacher_forcing_ratio = 0.5

        self.embedding = nn.Embedding(output_size, embedding_size)
        self.gru = nn.GRU(embedding_size, hidden_size)
        # Not used by forward(); kept so the module's parameter names and
        # initialisation order stay exactly as before.
        self.rnn = nn.RNN(embedding_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.log_softmax = nn.LogSoftmax(dim=2)

    def forward(self, input, hidden):
        """Run the decoder over ``input`` starting from ``hidden``.

        Args:
            input: tensor of symbol indices; after embedding it is reshaped to
                ``(seq_len, 1, embedding_size)``.
            hidden: GRU hidden state to start from.

        Returns:
            ``(output, hidden)`` where ``output`` holds log-softmax scores over
            the last dimension (``output_size`` entries) and ``hidden`` is the
            updated GRU state.
        """
        output = self.embedding(input).view(-1, 1, self.embedding_size)
        output, hidden = self.gru(output, hidden)
        output = self.log_softmax(self.out(output))
        return output, hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)``.

        The tensor is created with the dtype and device of the module's own
        weights, so it matches wherever the module has been moved instead of
        depending on a module-level ``device`` variable.
        """
        return self.embedding.weight.new_zeros(1, 1, self.hidden_size)

In [40]:
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, loss_fn):
    """Run one optimisation step on a single (input word, target word) pair.

    The encoder consumes the whole input sequence; its final hidden state
    seeds the decoder, which is then unrolled for ``len(target_tensor)``
    steps starting from the SOS symbol. A single random draw per call decides
    whether the decoder is fed the ground-truth symbol (teacher forcing) or
    its own greedy prediction at every step.

    Args:
        input_tensor: 1-D tensor of input symbol indices.
        target_tensor: 1-D tensor of target symbol indices.
        encoder, decoder: the two networks; both are switched to train mode.
        encoder_optimizer, decoder_optimizer: optimizers stepped once each.
        loss_fn: callable receiving the stacked decoder outputs of shape
            ``(target_length, n_symbols)`` and a float one-hot target of the
            same shape.

    Returns:
        The loss of this sample as a Python float. An empty target yields
        ``0.0`` and leaves the parameters untouched.
    """
    encoder.train()
    decoder.train()
    input_tensor = input_tensor.to(device)
    target_tensor = target_tensor.to(device)

    target_length = len(target_tensor)
    if target_length == 0:
        # Nothing to predict: a mean over zero steps would be NaN.
        return 0.0

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    #----------sequence to sequence part for encoder----------#
    # Only the final hidden state is handed to the decoder.
    _, encoder_hidden = encoder(input_tensor, encoder.initHidden())

    decoder_input = torch.tensor([SOS_token], device=device)
    decoder_hidden = encoder_hidden
    use_teacher_forcing = random.random() < teacher_forcing_ratio

    #----------sequence to sequence part for decoder----------#
    step_outputs = []
    for di in range(target_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
        step_outputs.append(decoder_output.view(1, -1))

        if use_teacher_forcing:
            # Teacher forcing: feed the ground-truth symbol as the next input.
            decoder_input = target_tensor[di].view(1)
        else:
            # Free running: feed back the decoder's own greedy prediction.
            decoder_input = decoder_output.argmax().detach().view(1)

    # Stack the per-step outputs into (target_length, n_symbols); the symbol
    # count is taken from the decoder output rather than hard-coded.
    decoder_outputs = torch.cat(step_outputs, dim=0)
    target_1hot = F.one_hot(
        target_tensor.long(), num_classes=decoder_outputs.size(-1)
    ).to(decoder_outputs.dtype)

    loss = loss_fn(decoder_outputs, target_1hot)
    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item()

In [10]:
def asMinutes(s):
    """Format a duration of ``s`` seconds as ``'<minutes>m <seconds>s'``.

    Minutes are the floor of ``s / 60``; both fields are rendered with ``%d``,
    so a fractional remainder is truncated.
    """
    whole_minutes = math.floor(s / 60)
    leftover_seconds = s - whole_minutes * 60
    return '%dm %ds' % (whole_minutes, leftover_seconds)


def timeSince(since, percent):
    """Describe elapsed time and the estimated time remaining.

    Args:
        since: start timestamp as returned by ``time.time()``.
        percent: fraction of the work completed so far (e.g. 0.25).

    Returns:
        A string ``'<elapsed> (- <remaining>)'`` with both parts formatted by
        ``asMinutes``. When ``percent`` is zero or negative no estimate can be
        extrapolated, so the remaining part is rendered as ``'?'`` instead of
        dividing by zero.
    """
    elapsed = time.time() - since
    if percent <= 0:
        return '%s (- %s)' % (asMinutes(elapsed), '?')
    # Linear extrapolation: estimated total duration minus the time already spent.
    remaining = elapsed / percent - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

In [58]:
def trainIters(data_list, label_list, encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.001):
    """Train the encoder/decoder pair for ``n_iters`` single-sample steps.

    Samples are visited in list order. When ``n_iters`` exceeds the number of
    samples the index wraps around, so several passes over the data can be
    requested with one call.

    Args:
        data_list: sequence of input tensors.
        label_list: sequence of target tensors aligned with ``data_list``.
        encoder, decoder: the networks to optimise (each gets its own Adam).
        n_iters: number of training steps to run.
        print_every: the mean loss of the last ``print_every`` steps is
            printed each time that many steps have completed.
        plot_every: the mean loss of the last ``plot_every`` steps is recorded
            each time that many steps have completed.
        learning_rate: learning rate for both Adam optimizers.

    Returns:
        list of float: the recorded ``plot_every`` window averages, in order.

    Raises:
        ValueError: if steps are requested but ``data_list`` is empty.
    """
    n_samples = len(data_list)
    if n_iters > 0 and n_samples == 0:
        raise ValueError("trainIters needs at least one training sample")

    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    for i in range(n_iters):
        sample = i % n_samples  # wrap around for multi-pass training
        loss = train(data_list[sample], label_list[sample], encoder,
                     decoder, encoder_optimizer, decoder_optimizer, loss_fn)
        print_loss_total += loss
        plot_loss_total += loss

        if (i + 1) % print_every == 0:
            print(print_loss_total / print_every)
            print_loss_total = 0

        if (i + 1) % plot_every == 0:
            plot_losses.append(plot_loss_total / plot_every)
            plot_loss_total = 0

    return plot_losses
	

In [41]:
# Build both networks from the shared hyper-parameters and move them to `device`.
encoder1 = EncoderRNN(
    vocab_size=vocab_size,
    embedding_size=embedding_size,
    hidden_size=hidden_size,
).to(device)
decoder1 = DecoderRNN(
    embedding_size=embedding_size,
    hidden_size=hidden_size,
    output_size=vocab_size,
).to(device)

In [74]:
# One step per training pair, visiting the pairs in list order.
# The trailing ';' keeps the notebook from echoing whatever the call returns.
trainIters(data_list, label_list, encoder1, decoder1, n_iters=len(data_list));

1.1290841481238603
1.003202915839851
1.0657880315911026
1.0850331896897405
1.0999686969965696
1.1536850261930376
1.217569311897736
1.077885737478733
1.0311937897484749
1.1574456442221999
1.1529032539837063
0.972330510918051


In [75]:
# Greedily decode one training sample with the trained networks.
sample_index = 50
with torch.no_grad():
    init_hid = encoder1.initHidden()
    out, hid = encoder1(data_list[sample_index].to(device), init_hid)
    dec_hid = hid
    dec_in = torch.tensor([SOS_token], device=device)
    generated_word = []
    # Decode at most as many symbols as the reference word contains.
    for i in range(len(label_list[sample_index])):
        dec_out, dec_hid = decoder1(dec_in, dec_hid)
        predicted = dec_out.argmax(dim=2)  # most likely symbol, shape (1, 1)
        print(predicted)
        dec_in = predicted.view(1,)
        # Stop on EOS before recording it, so the marker never ends up in the word.
        if dec_in.item() == EOS_token:
            break
        generated_word.append(index2char[dec_in.item()])
    print(generated_word)

tensor([[20]], device='cuda:0')
tensor([[17]], device='cuda:0')
tensor([[17]], device='cuda:0')
tensor([[6]], device='cuda:0')
tensor([[21]], device='cuda:0')
tensor([[21]], device='cuda:0')
tensor([[6]], device='cuda:0')
tensor([[19]], device='cuda:0')
tensor([[19]], device='cuda:0')
['s', 'p', 'p', 'e', 't', 't', 'e', 'r', 'r']


In [72]:
# Spell out the reference word of sample 50, one character per list entry.
a = [index2char[char_idx] for char_idx in label_list[50].tolist()]
print(a)

['s', 'e', 'p', 't', 'e', 'm', 'b', 'e', 'r']
