In [1]:
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch

class RNN(nn.Module):
    def __init__(self, vocab_size, dw, dh, output):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
        self.rnn = nn.RNN(dw, dh, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(dh*2, output, bias=True)
        self.fc2 = nn.Softmax(dim=1)
        nn.init.xavier_normal_(self.rnn.weight_ih_l0)
        nn.init.xavier_normal_(self.rnn.weight_hh_l0)
        nn.init.xavier_normal_(self.rnn.weight_ih_l0_reverse)
        nn.init.xavier_normal_(self.rnn.weight_hh_l0_reverse)
        nn.init.xavier_normal_(self.fc1.weight)
    def forward(self, x):
        x = self.embed(x)
        _, x = self.rnn(x)
        rnn_out = torch.cat([x[-2,:,:], x[-1,:,:]], dim=1)
        x = self.fc1(rnn_out)
        x = self.fc2(x)
        return x

def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
    loss = 0.0
    total = 0
    correct = 0
    model = model.to(device)
    with torch.no_grad():
        for X, Y in dataloader:
            X = X.to(device)
            Y = Y.to(device)
            Y_pred = model(X)
            if criterion != None:
                loss += criterion(Y_pred, Y).item()
            pred = torch.argmax(Y_pred, dim=-1)
            total += len(Y)
            correct += (pred == Y).sum().item()

    return loss / len(dataset), correct / total

def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None):
    dataset_train = TensorDataset(X_train, y_train)
    dataset_test = TensorDataset(X_test, y_test)
    model = model.to(device)
    dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
    dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)
    for ep in range(num_epochs):
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)
        if ep%30==0:
            lr = lr * 0.1
        model.train()
        for X, Y in dataloader_train:
            X = X.to(device)
            Y = Y.to(device)
            optimizer.zero_grad()
            Y_pred = model(X)
            loss = criterion(Y_pred, Y)
            loss.backward()
            optimizer.step()
        model.eval()

        loss_train, acc_train = calculate_loss_and_accuracy(model, dataset_train, device, criterion=criterion)
        loss_test, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)

        print(f'epoch: {ep + 1}, loss_train: {loss_train:.4f}, accuracy_train: {acc_train:.4f}, loss_Test: {loss_test:.4f}, accuracy_Test: {acc_test:.4f}')
        TensorboardWriter(model, X_train, Y_train, ep, loss_train, "Train", device)
        TensorboardWriter(model, X_test, Y_test, ep, loss_test, "Test", device)

def TensorboardWriter(model, X, Y, epoch, loss, name, device):
    writer = SummaryWriter(log_dir="logs")
    model = model.to(device)
    X = X.to(device)
    Y_pred = model(X)
    result = torch.max(Y_pred.data, dim=1).indices
    result = result.cpu().data.numpy()
    Y_pred = np.array([np.argmax(y) for y in Y_pred.cpu().data.numpy()])
    Y = np.array([np.argmax(y) for y in Y.cpu().data.numpy()])
    result = torch.tensor(result)
    Y = torch.tensor(Y)
    Y_pred = torch.tensor(Y_pred)
    accuracy = result.eq(Y).sum().numpy()/len(Y_pred)
    writer.add_scalar("Loss/{}_Loss".format(name), loss, epoch)
    writer.add_scalar("Accuracy/{}_Accuracy".format(name), accuracy, epoch)
    writer.close()

def CountVocab(name):
    f = open("{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    max_num = []
    for line in lines:
        line_t = line.split("\t")[2].replace("\n", "").split(" ")
        max_num.extend(map(int, line_t))
    vocab_max = max(max_num)+1
    return vocab_max

def GetCodeLow(name):
    f = open("{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    num_list = []
    code_list = []
    pad_list = []
    for line in lines:
        line_s = line.split("\t")
        code_list.append(int(line_s[0]))
        num = line_s[2].replace("\n", "").split(" ")
        num = list(map(int, num))
        num_list.append(num)
        num_tensor = torch.tensor(num)
        pad_list.append(num_tensor)

    max_vocab = CountVocab("train")
    mlen = max([len(x) for x in num_list])
    pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
    pad_list = torch.tensor(pad_list)
    code_list = torch.tensor(code_list)
    return pad_list, code_list

X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 100
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
HIDDEN_SIZE = 50
lr = 1e-2
device = "cuda:0"
model = RNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device)


FileNotFoundError: [Errno 2] No such file or directory: 'train_code.txt'