In [1]:
import os
import torch
import pickle

from torch import nn
from torch import optim
from torchtext import transforms
from torch.utils.data import DataLoader
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

from utils import data_download

In [2]:
def save_pkl(data, fname):
    with open(fname, "wb") as f:
        pickle.dump(data, f)


def load_pkl(fname):
    with open(fname, "rb") as f:
        data = pickle.load(f)
    return data

In [3]:
class Multi30k:
    UNK, UNK_IDX = "<unk>", 0
    PAD, PAD_IDX = "<pad>", 1
    SOS, SOS_IDX = "<sos>", 2
    EOS, EOS_IDX = "<eos>", 3
    SPECIALS = {UNK : UNK_IDX, PAD : PAD_IDX, SOS : SOS_IDX, EOS : EOS_IDX}

    def __init__(self, data_dir, target_language, max_seq_len, min_freq):
        self.data_dir = f"{data_dir}/Multi30k"
        self.cache_dir = f"{self.data_dir}/caches"

        if not os.path.isdir(self.data_dir):
            data_download(self.data_dir)

        self.target_language = target_language
        self.max_seq_len = max_seq_len
        self.min_freq = min_freq

        self.target_tokenizer = self.build_tokenizer(self.target_language)

        self.train, self.valid, self.test = None, None, None
        self.build_dataset()

        self.vocab = None
        self.build_vocab()

        self.vocab_transform = None
        self.build_transform()


    def build_tokenizer(self, language):
        spacy_lang_dict = {'en': "en_core_web_sm", 'de': "de_core_news_sm"}
        assert language in spacy_lang_dict.keys()

        return get_tokenizer("spacy", spacy_lang_dict[language])


    def build_dataset(self):
        if not os.path.isdir(self.cache_dir):
            os.makedirs(self.cache_dir)

        train_pkl = f"{self.cache_dir}/train.pkl"
        if os.path.exists(train_pkl):
            self.train = load_pkl(train_pkl)

        else:
            with open(f"{self.data_dir}/train.en") as f:
                self.train = [text.rstrip() for text in f]
        
            save_pkl(self.train, train_pkl)

        val_pkl = f"{self.cache_dir}/val.pkl"
        if os.path.exists(val_pkl):
            self.val = load_pkl(val_pkl)
            
        else:
            with open(f"{self.data_dir}/val.en") as f:
                self.val = [text.rstrip() for text in f]
        
            save_pkl(self.val, val_pkl)

        test_pkl = f"{self.cache_dir}/test.pkl"
        if os.path.exists(test_pkl):
            self.test = load_pkl(test_pkl)
            
        else:
            with open(f"{self.data_dir}/test.en") as f:
                self.test = [text.rstrip() for text in f]
        
            save_pkl(self.test, test_pkl)


    def build_vocab(self):
        assert self.train is not None

        def yield_tokens():
            for text in self.train:
                yield [str(token) for token in self.target_tokenizer(text)]

        vocab_file = f"{self.cache_dir}/vocab_{self.target_language}.pkl"
        if os.path.exists(vocab_file):
            vocab = load_pkl(vocab_file)
        else:
            vocab = build_vocab_from_iterator(yield_tokens(), min_freq=self.min_freq, specials=self.SPECIALS.keys())
            vocab.set_default_index(self.UNK_IDX)
            save_pkl(vocab, vocab_file)

        self.vocab = vocab


    def build_transform(self):
        def get_transform(self, vocab):
            return transforms.Sequential(transforms.VocabTransform(vocab),
                                         transforms.Truncate(self.max_seq_len),
                                        #  transforms.AddToken(token=self.SOS_IDX, begin=True),
                                        #  transforms.AddToken(token=self.EOS_IDX, begin=False),
                                         transforms.ToTensor(padding_value=self.PAD_IDX))

        self.vocab_transform = get_transform(self, self.vocab)

    def one_hot_encoding(self, tensor):
        batch_size, sequence_length = tensor.size()
        one_hot_tensor = torch.zeros(batch_size, sequence_length, len(self.vocab), dtype=torch.int64)
        for i in range(batch_size):
            for j in range(sequence_length):
                one_hot_tensor[i, j, tensor[i, j]] = 1

        return one_hot_tensor        


    def collate_fn(self, batch):
        trg = [self.target_tokenizer(data) for data in batch]
        # trg_transform = self.vocab_transform(trg)
        
        x_data = [t[:-1] for t in trg]
        y_data = [t[1:] for t in trg]

        X = self.vocab_transform(x_data)
        Y = self.vocab_transform(y_data)

        X = self.one_hot_encoding(X)
        Y = self.one_hot_encoding(Y)
        
        return batch, X, Y


    def get_iter(self, **kwargs):
        if self.vocab_transform is None:
            self.build_transform()

        train_iter = DataLoader(self.train, collate_fn=self.collate_fn, **kwargs)
        valid_iter = DataLoader(self.valid, collate_fn=self.collate_fn, **kwargs)
        test_iter = DataLoader(self.test, collate_fn=self.collate_fn, **kwargs)

        return train_iter, valid_iter, test_iter

In [4]:
EPOCHS = 100
BATCH_SIZE = 4
MAX_SEQ_LEN = 256
MIN_FREQ = 1
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
NUM_WORKERS = min([os.cpu_count(), BATCH_SIZE if BATCH_SIZE > 1 else 0, 8])

DATASET = Multi30k("/home/pervinco/Datasets/test", "en", MAX_SEQ_LEN, MIN_FREQ)
train_iter, valid_iter, test_iter = DATASET.get_iter(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)

In [5]:
for idx, (sentence, X, Y) in enumerate(train_iter):
    print(X.shape)
    print(Y.shape)

    break

torch.Size([4, 14, 6191])
torch.Size([4, 14, 6191])


In [6]:
INPUT_DIM = len(DATASET.vocab)
OUTPUT_DIM = len(DATASET.vocab)
print(INPUT_DIM, OUTPUT_DIM)

EMBED_DIM = 256
HIDDEN_DIM = 512

6191 6191


In [7]:
class RNNLanguageModel(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, output_dim, pad_idx):
        super().__init__()
        
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=pad_idx)
        self.rnn = nn.RNN(emb_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, src):
        embedded = self.embedding(src) ## src shape: [src_len, batch_size]
        output, _ = self.rnn(embedded) ## embedded shape: [src_len, batch_size, emb_dim]
        
        return self.fc(output) ## output shape: [src_len, batch_size, hidden_dim]


model = RNNLanguageModel(INPUT_DIM, EMBED_DIM, HIDDEN_DIM, OUTPUT_DIM, DATASET.PAD_IDX)
model = model.to(DEVICE)
print(model)

RNNLanguageModel(
  (embedding): Embedding(6191, 256, padding_idx=1)
  (rnn): RNN(256, 512)
  (fc): Linear(in_features=512, out_features=6191, bias=True)
)


In [8]:
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=DATASET.PAD_IDX)

In [11]:
def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for idx, (_, X, Y) in enumerate(iterator):
        X, Y = X.to(DEVICE), Y.to(DEVICE)
        print(X.shape, Y.shape)
        optimizer.zero_grad()
        output = model(X)

        print(output.shape)
        break

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    for idx, (sentence, X, Y) in enumerate(iterator):
        X, Y = X.to(DEVICE), Y.to(DEVICE)
        output = model(X)

        break

In [12]:
for epoch in range(EPOCHS):
    train_loss = train(model, train_iter, optimizer, criterion)
    print(train_loss)

torch.Size([4, 14, 6191]) torch.Size([4, 14, 6191])


AssertionError: RNN: Expected input to be 2-D or 3-D but received 4-D tensor

In [None]:
import torch.nn as nn

class RNNPredictor(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNNPredictor, self).__init__()

        # input_size: vocab_size
        # hidden_size: RNN의 hidden layer의 크기
        # output_size: 예측된 단어의 확률 분포를 위한 크기 (vocab_size와 동일)
        
        self.hidden_size = hidden_size

        # RNN layer
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        
        # Fully connected layer to get the next word probabilities
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x, hidden=None):
        # Passing in the input and hidden state into the RNN
        r_out, hidden = self.rnn(x, hidden)
        
        # Reshaping the RNN output to fit into the fully connected layer
        r_out = r_out.contiguous().view(-1, self.hidden_size)
        
        # Getting the next word probabilities
        output = self.fc(r_out)
        
        # Reshaping to batch_size, seq_len, vocab_size
        output = output.view(x.size(0), x.size(1), -1)
        return output, hidden

    def init_hidden(self, batch_size):
        # Initializes hidden state
        return torch.zeros(1, batch_size, self.hidden_size)


In [None]:
class VanillaRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(VanillaRNN, self).__init__()

        self.hidden_size = hidden_size
        self.input_size = input_size

        # 입력과 hidden state를 동시에 처리할 수 있는 가중치
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)

    def forward(self, input_seq, hidden):
        batch_size, seq_len, _ = input_seq.size()
        outputs = []

        for t in range(seq_len):
            input_t = input_seq[:, t, :]  # 현재 시점의 입력
            combined = torch.cat((input_t, hidden), dim=1)  # 입력과 hidden state를 결합
            hidden = torch.tanh(self.i2h(combined))
            output = self.h2o(hidden)
            outputs.append(output)

        return torch.stack(outputs, dim=1), hidden

    def init_hidden(self, batch_size):
        return torch.zeros(batch_size, self.hidden_size)


In [None]:
import torch
from torch import nn

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNN, self).__init__()

        self.hidden_size = hidden_size
        self.input_size = input_size

        # 입력과 hidden state를 동시에 처리할 수 있는 가중치
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)

    def forward(self, input_seq, hidden):
        batch_size, seq_len, _ = input_seq.size()
        outputs = []

        for t in range(seq_len):
            input_t = input_seq[:, t, :]  # 현재 시점의 입력
            combined = torch.cat((input_t, hidden), dim=1)  # 입력과 hidden state를 결합
            hidden = torch.tanh(self.i2h(combined))
            output = self.h2o(hidden)
            outputs.append(output)

        return torch.stack(outputs, dim=1), hidden

    def init_hidden_state(self, batch_size):
        return torch.zeros(batch_size, self.hidden_size)