In [31]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import numpy as np
import random
from sklearn.model_selection import train_test_split
# Run on the GPU when CUDA is available; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [32]:
def create_argsort(n_samples, seq_len):
    """Task 1 (argsort): random integer rows paired with their sorting permutation.

    Every one of the ``n_samples`` rows contains ``seq_len`` integers drawn
    from ``[0, seq_len)``; the matching label row is ``np.argsort`` of it.

    Returns:
        ``(data, labels)``, two integer arrays of shape ``(n_samples, seq_len)``.
    """
    shape = (n_samples, seq_len)
    data = np.random.randint(seq_len, size=shape)
    return data, np.argsort(data)

def generate_single_seq(length=30, min_len=5, max_len=10):
    """Build one zero-padded sequence with a high-valued run in the middle.

    The sequence is three concatenated segments, each with a random number of
    elements in ``[min_len, max_len]``: values in 1..5, then values in 6..10,
    then values in 1..5 again. It is right-padded with 0 up to ``length``.

    Args:
        length: total length of the returned sequence.
        min_len: minimum number of elements per segment.
        max_len: maximum number of elements per segment.

    Returns:
        ``(seq, start, end)`` where ``start`` is the index of the first
        element of the 6..10 segment and ``end`` the index of its last
        element (inclusive).

    Raises:
        ValueError: if ``length`` cannot hold three segments of ``max_len``
            elements. Without this check the padding count goes negative and
            the sequence silently comes back longer than ``length``.
    """
    if length < 3 * max_len:
        raise ValueError(
            'length={} is too small for three segments of up to {} elements'.format(length, max_len)
        )
    seq_before = [random.randint(1, 5) for _ in range(random.randint(min_len, max_len))]
    seq_during = [random.randint(6, 10) for _ in range(random.randint(min_len, max_len))]
    seq_after = [random.randint(1, 5) for _ in range(random.randint(min_len, max_len))]
    seq = seq_before + seq_during + seq_after
    seq += [0] * (length - len(seq))
    start = len(seq_before)
    end = start + len(seq_during) - 1
    return seq, start, end

def generate_set_seq(N, seq_len):
    """Task 2 (boundary detection): build ``N`` padded sequences with their boundaries.

    Each sample comes from ``generate_single_seq(seq_len)``.

    Returns:
        ``(data, labels)`` where ``data`` stacks the sequences row by row and
        ``labels[k]`` is the ``(start, end)`` index pair reported for row ``k``.
    """
    samples = [generate_single_seq(seq_len) for _ in range(N)]
    data = np.array([seq for seq, _, _ in samples])
    starts = [ind_start for _, ind_start, _ in samples]
    ends = [ind_end for _, _, ind_end in samples]
    # Two rows (starts, ends) transposed into one (start, end) pair per sample.
    labels = np.vstack((starts, ends)).T
    return data, labels

In [33]:
# --- Model / optimisation hyper-parameters ---
embedding_dim = 100
hidden_dim = 100
batch_size = 64

n_samples = 10000

# Seed every RNG this notebook draws from (Python `random`, NumPy, PyTorch) so
# that data generation and weight initialisation are repeatable after a
# "Restart Kernel -> Run All".
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Which experiment to run: 1 = argsort, 2 = boundary detection.
TASK = 2

if TASK == 1:
    seq_len = 10
    input_dim = 10      # input tokens are integers in [0, 10)
    output_dim = 10     # one class per input position
    max_trg_len = 10    # one predicted position per input element
    data, labels = create_argsort(n_samples, seq_len)
elif TASK == 2:
    seq_len = 30
    input_dim = 11      # token ids 0..10, where 0 is the padding value
    output_dim = 30     # one class per input position
    max_trg_len = 2     # the (start, end) index pair
    data, labels = generate_set_seq(n_samples, seq_len)
else:
    raise ValueError('TASK must be 1 or 2, got {!r}'.format(TASK))

X_train, X_test, Y_train, Y_test = train_test_split(data, labels, test_size=0.1, random_state=SEED)

In [34]:
class Seq2seq(nn.Module):
    """GRU encoder with a GRUCell decoder and no attention.

    All tensors are sequence-first. ``inputs`` holds token ids of shape
    ``(L, B)``; ``forward`` and ``predict`` both return log-probabilities of
    shape ``(T, B, output_dim)``.

    Work buffers are created from the encoder state, so they follow the
    device and dtype of the model and its inputs rather than a module-level
    ``device`` global.
    """

    def __init__(self, input_dim, output_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.encoder_embedding = nn.Embedding(input_dim, embedding_dim)
        self.decoder_embedding = nn.Embedding(output_dim, embedding_dim)
        self.encoder = nn.GRU(embedding_dim, hidden_dim)
        self.decoder = nn.GRUCell(embedding_dim, hidden_dim)
        self.fc = nn.Linear(embedding_dim + hidden_dim, output_dim)

    def _step(self, decoder_input, hidden):
        """Advance the decoder one step; return ``(log_probs (B, output_dim), hidden (B, H))``."""
        hidden = self.decoder(decoder_input, hidden)
        features = torch.cat((decoder_input, hidden), 1)
        return F.log_softmax(F.relu(self.fc(features)), 1), hidden

    def _decode(self, inputs, n_steps, teacher=None):
        """Encode ``inputs`` and run ``n_steps`` decoder steps.

        ``teacher`` is an optional ``(T, B, E)`` tensor of embedded targets.
        When given, step ``i + 1`` is fed ``teacher[i]`` (teacher forcing);
        otherwise it is fed the embedding of the step-``i`` argmax (greedy).
        """
        batch_size = inputs.size(1)
        # (L, B) -> (L, B, E) -> final hidden state (1, B, H) -> (B, H)
        _, hidden = self.encoder(self.encoder_embedding(inputs))
        hidden = hidden.squeeze(0)
        decoder_outputs = hidden.new_zeros((n_steps, batch_size, self.output_dim))
        # The first decoder input is an all-zero embedding.
        decoder_input = hidden.new_zeros((batch_size, self.embedding_dim))
        for i in range(n_steps):
            output, hidden = self._step(decoder_input, hidden)
            decoder_outputs[i] = output
            if teacher is not None:
                decoder_input = teacher[i]
            else:
                decoder_input = self.decoder_embedding(output.argmax(1))
        return decoder_outputs

    def forward(self, inputs, targets):
        """Teacher-forced decoding.

        Args:
            inputs: ``(L, B)`` source token ids.
            targets: ``(T, B)`` target ids; each must be a valid index into
                the decoder embedding (``< output_dim``).

        Returns:
            ``(T, B, output_dim)`` log-probabilities.
        """
        return self._decode(inputs, targets.size(0), self.decoder_embedding(targets))

    def predict(self, inputs, max_trg_len):
        """Greedy decoding for ``max_trg_len`` steps; returns ``(max_trg_len, B, output_dim)`` log-probabilities."""
        return self._decode(inputs, max_trg_len)

In [35]:
def attention(query, key, value):
    """Dot-product attention of one query per batch element over L positions.

    Shapes: ``query`` is (B, H), ``key`` and ``value`` are (B, L, H).

    Returns:
        ``(context, weights)``: the weighted sum of ``value`` and the softmax
        weights over the L positions.
    """
    n_positions = key.size(1)
    # Tile the query along the position axis: (B, H) -> (B, L, H).
    tiled_query = query.unsqueeze(1).repeat(1, n_positions, 1)
    score = (tiled_query * key).sum(-1)
    # (B, L) -> (B, 1, L) so that matmul reduces over the position axis.
    attn = F.softmax(score, -1).unsqueeze(1)
    context = torch.matmul(attn, value)
    return context.squeeze(1), attn.squeeze(1)

In [36]:
class Seq2seqAttn(nn.Module):
    """GRU encoder with a GRUCell decoder that attends over the encoder outputs.

    All tensors are sequence-first. ``inputs`` holds token ids of shape
    ``(L, B)``; ``forward`` and ``predict`` both return log-probabilities of
    shape ``(T, B, output_dim)``.

    Work buffers are created from the encoder state, so they follow the
    device and dtype of the model and its inputs rather than a module-level
    ``device`` global.
    """

    def __init__(self, input_dim, output_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.encoder_embedding = nn.Embedding(input_dim, embedding_dim)
        self.decoder_embedding = nn.Embedding(output_dim, embedding_dim)
        self.encoder = nn.GRU(embedding_dim, hidden_dim)
        self.decoder = nn.GRUCell(embedding_dim, hidden_dim)
        # The classifier sees [decoder input ; hidden state ; attention context].
        self.fc = nn.Linear(embedding_dim + hidden_dim * 2, output_dim)

    def _step(self, decoder_input, hidden, memory):
        """Advance the decoder one step against ``memory`` (B, L, H).

        Returns ``(log_probs (B, output_dim), hidden (B, H))``.
        """
        hidden = self.decoder(decoder_input, hidden)
        context, _ = attention(hidden, memory, memory)
        features = torch.cat((decoder_input, hidden, context), 1)
        return F.log_softmax(F.relu(self.fc(features)), 1), hidden

    def _decode(self, inputs, n_steps, teacher=None):
        """Encode ``inputs`` and run ``n_steps`` attentive decoder steps.

        ``teacher`` is an optional ``(T, B, E)`` tensor of embedded targets.
        When given, step ``i + 1`` is fed ``teacher[i]`` (teacher forcing);
        otherwise it is fed the embedding of the step-``i`` argmax (greedy).
        """
        batch_size = inputs.size(1)
        # (L, B) -> (L, B, E) -> outputs (L, B, H), final hidden (1, B, H)
        encoder_outputs, hidden = self.encoder(self.encoder_embedding(inputs))
        hidden = hidden.squeeze(0)  # (B, H)
        # Batch-first view of the encoder outputs; identical for every step.
        memory = encoder_outputs.transpose(0, 1)
        decoder_outputs = hidden.new_zeros((n_steps, batch_size, self.output_dim))
        # The first decoder input is an all-zero embedding.
        decoder_input = hidden.new_zeros((batch_size, self.embedding_dim))
        for i in range(n_steps):
            output, hidden = self._step(decoder_input, hidden, memory)
            decoder_outputs[i] = output
            if teacher is not None:
                decoder_input = teacher[i]
            else:
                decoder_input = self.decoder_embedding(output.argmax(1))
        return decoder_outputs

    def forward(self, inputs, targets):
        """Teacher-forced decoding.

        Args:
            inputs: ``(L, B)`` source token ids.
            targets: ``(T, B)`` target ids; each must be a valid index into
                the decoder embedding (``< output_dim``).

        Returns:
            ``(T, B, output_dim)`` log-probabilities.
        """
        return self._decode(inputs, targets.size(0), self.decoder_embedding(targets))

    def predict(self, inputs, max_trg_len):
        """Greedy decoding for ``max_trg_len`` steps; returns ``(max_trg_len, B, output_dim)`` log-probabilities."""
        return self._decode(inputs, max_trg_len)

In [37]:
class PtrNet(nn.Module):
    """Pointer-style network: each decoder step outputs a distribution over input positions.

    The score of position ``j`` at a decoder step is
    ``v(relu(W1(encoder_outputs[j]) + W2(hidden)))``, normalised with a
    log-softmax over the ``L`` input positions.

    All tensors are sequence-first. ``inputs`` holds token ids of shape
    ``(L, B)``; ``forward`` and ``predict`` return log-probabilities of shape
    ``(T, B, L)``. ``output_dim`` is the size of the decoder embedding table,
    so every pointed-to index fed back to the decoder must be ``< output_dim``.

    Work buffers are created from the encoder state, so they follow the
    device and dtype of the model and its inputs rather than a module-level
    ``device`` global.
    """

    def __init__(self, input_dim, output_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim

        self.encoder_embedding = nn.Embedding(input_dim, embedding_dim)
        self.decoder_embedding = nn.Embedding(output_dim, embedding_dim)
        self.encoder = nn.GRU(embedding_dim, hidden_dim)
        self.decoder = nn.GRUCell(embedding_dim, hidden_dim)

        self.W1 = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.W2 = nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.v = nn.Linear(hidden_dim, 1, bias=False)

    def _decode(self, inputs, n_steps, teacher=None):
        """Encode ``inputs`` and run ``n_steps`` pointer steps.

        ``teacher`` is an optional ``(T, B, E)`` tensor of embedded targets.
        When given, step ``i + 1`` is fed ``teacher[i]`` (teacher forcing);
        otherwise it is fed the embedding of the step-``i`` argmax (greedy).
        """
        seq_len, batch_size = inputs.size(0), inputs.size(1)
        # (L, B) -> (L, B, E) -> outputs (L, B, H), final hidden (1, B, H)
        encoder_outputs, hidden = self.encoder(self.encoder_embedding(inputs))
        hidden = hidden.squeeze(0)  # (B, H)
        # The encoder-side projection is the same at every step, so compute it once.
        projection1 = self.W1(encoder_outputs)  # (L, B, H)
        # One score per input position, hence a last dimension of L (not output_dim).
        decoder_outputs = hidden.new_zeros((n_steps, batch_size, seq_len))
        # The first decoder input is an all-zero embedding.
        decoder_input = hidden.new_zeros((batch_size, self.embedding_dim))
        for i in range(n_steps):
            hidden = self.decoder(decoder_input, hidden)
            projection2 = self.W2(hidden)  # (B, H), broadcast over the L axis below
            scores = self.v(F.relu(projection1 + projection2)).squeeze(-1).transpose(0, 1)  # (B, L)
            output = F.log_softmax(scores, -1)
            decoder_outputs[i] = output
            if teacher is not None:
                decoder_input = teacher[i]
            else:
                decoder_input = self.decoder_embedding(output.argmax(1))
        return decoder_outputs

    def forward(self, inputs, targets):
        """Teacher-forced decoding.

        Args:
            inputs: ``(L, B)`` source token ids.
            targets: ``(T, B)`` target positions; each must be a valid index
                into the decoder embedding (``< output_dim``).

        Returns:
            ``(T, B, L)`` log-probabilities over input positions.
        """
        return self._decode(inputs, targets.size(0), self.decoder_embedding(targets))

    def predict(self, inputs, max_trg_len):
        """Greedy decoding for ``max_trg_len`` steps; returns ``(max_trg_len, B, L)`` log-probabilities."""
        return self._decode(inputs, max_trg_len)

In [38]:
def train(model, X, Y, batch_size, n_epochs):
    """Train ``model`` with Adam and NLL loss, printing the summed loss per epoch.

    Args:
        model: module whose ``model(inputs, targets)`` returns
            log-probabilities of shape ``(T, B, C)`` for sequence-first
            ``inputs`` (L, B) and ``targets`` (T, B).
        X: integer array of shape ``(n_samples, L)``.
        Y: integer array of shape ``(n_samples, T)``.
        batch_size: number of samples per optimisation step.
        n_epochs: index of the last epoch. Epochs are numbered
            ``0..n_epochs`` inclusive, so ``n_epochs + 1`` passes are made.
    """
    model.train()
    optimizer = optim.Adam(model.parameters())
    criterion = nn.NLLLoss().to(device)
    n_samples = X.shape[0]
    for epoch in range(n_epochs + 1):
        epoch_loss = 0
        for i in range(0, n_samples, batch_size):
            # Batch-first arrays -> sequence-first LongTensors on the target device.
            batch_X = torch.as_tensor(X[i:i + batch_size], dtype=torch.long).transpose(0, 1).to(device)
            batch_Y = torch.as_tensor(Y[i:i + batch_size], dtype=torch.long).transpose(0, 1).to(device)

            log_probs = model(batch_X, batch_Y)
            # Flatten to (T*B, C), taking the class count from the model output
            # rather than assuming it equals the input sequence length.
            outputs = log_probs.reshape(-1, log_probs.size(-1))
            # batch_Y is a transposed (non-contiguous) view, so .view(-1) would
            # raise; reshape copies when it has to.
            targets = batch_Y.reshape(-1)

            loss = criterion(outputs, targets)
            epoch_loss += loss.item()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print('epoch: {} | total loss: {:.4f}'.format(epoch, epoch_loss))

In [39]:
def rand_evaluate(model, seq_len=None):
    """Task 1 only: print one random sequence, its true argsort and the model's guess.

    Output lines are prefixed ``>`` (input), ``=`` (expected) and ``<``
    (predicted).

    Args:
        model: object exposing ``predict(inputs, max_trg_len)`` that returns
            ``(T, B, C)`` scores for sequence-first ``inputs`` (L, B).
        seq_len: length of the random sequence, which is also the exclusive
            upper bound of its values. Defaults to the module-level
            ``max_trg_len``.
    """
    if seq_len is None:
        seq_len = max_trg_len
    data = np.random.randint(seq_len, size=(1, seq_len))
    print('>', data.flatten())
    print('=', np.argsort(data).flatten())
    # (1, L) -> (L, 1): sequence-first with a batch of one.
    inputs = torch.from_numpy(data.reshape(seq_len, 1)).long().to(device)
    with torch.no_grad():
        outputs = model.predict(inputs, seq_len).squeeze(1)
    print('<', torch.max(outputs, 1)[1].cpu().numpy())

def accuracy(model, X, Y):
    """Print the exact-match accuracy of greedy decoding on ``(X, Y)``.

    A sample counts as correct only when every predicted index equals its
    label.

    Args:
        model: object exposing ``predict(inputs, max_trg_len)`` that returns
            ``(T, B, C)`` scores for sequence-first ``inputs`` (L, B).
        X: integer array of shape ``(n_samples, L)``.
        Y: integer array of shape ``(n_samples, T)``; ``T`` sets the number of
            decoding steps.
    """
    Y = np.asarray(Y)
    batch_size = X.shape[0]
    inputs = torch.as_tensor(X, dtype=torch.long).transpose(0, 1).to(device)
    # Evaluation needs no gradients; skip building the autograd graph.
    with torch.no_grad():
        probs = model.predict(inputs, Y.shape[1])
    # (T, B, C) -> argmax over classes -> (T, B) -> (B, T) to line up with Y.
    predicted = probs.argmax(2).t().cpu().numpy()
    correct_count = int((predicted == Y).all(axis=1).sum())
    print('Acc: {:.2f}% ({}/{})'.format(correct_count / batch_size * 100, correct_count, batch_size))

In [40]:
# Alternative architectures; swap one in for PtrNet below to compare:
# model = Seq2seq(input_dim, output_dim, embedding_dim, hidden_dim).to(device)
# model = Seq2seqAttn(input_dim, output_dim, embedding_dim, hidden_dim).to(device)
model = PtrNet(input_dim, output_dim, embedding_dim, hidden_dim)
model = model.to(device)
train(model, X_train, Y_train, batch_size, n_epochs=20)

epoch: 0 | total loss: 86.9745
epoch: 1 | total loss: 0.3416
epoch: 2 | total loss: 0.0915
epoch: 3 | total loss: 0.0412
epoch: 4 | total loss: 0.0231
epoch: 5 | total loss: 0.0147
epoch: 6 | total loss: 0.0101
epoch: 7 | total loss: 0.0073
epoch: 8 | total loss: 0.0055
epoch: 9 | total loss: 0.0043
epoch: 10 | total loss: 0.0034
epoch: 11 | total loss: 0.0028
epoch: 12 | total loss: 0.0023
epoch: 13 | total loss: 0.0019
epoch: 14 | total loss: 0.0016
epoch: 15 | total loss: 0.0014
epoch: 16 | total loss: 0.0012
epoch: 17 | total loss: 0.0010
epoch: 18 | total loss: 0.0009
epoch: 19 | total loss: 0.0008
epoch: 20 | total loss: 0.0007


In [41]:
# Exact-match accuracy of the trained model on the held-out split.
accuracy(model=model, X=X_test, Y=Y_test)

Acc: 100.00% (1000/1000)
