In [5]:
import torch
import torch.nn as nn

In [6]:
from torchtext.datasets import IMDB
from torch.utils.data.dataset import random_split

train_dataset = IMDB(split='train')
test_dataset = IMDB(split='test')
torch.manual_seed(1)
train_dataset, valid_dataset = random_split(list(train_dataset), [20000, 5000])

In [7]:
import re
from collections import Counter, OrderedDict

token_counts = Counter()


def tokenizer(text):
    text = re.sub('<[^>]*>', '', text)
    emoticons = re.findall('(?::|;|=)(?:-)?(?:\)|\(|D|P)', text.lower())
    text = re.sub('[\W]+', ' ', text.lower()) + \
           ' '.join(emoticons).replace('-', '')
    tokenized = text.split()
    return tokenized


for label, line in train_dataset:
    tokens = tokenizer(line)
    token_counts.update(tokens)

print('어휘 사전 크기 : ', len(token_counts))

어휘 사전 크기: 69023


In [8]:
from torchtext.vocab import vocab

sorted_by_freq_tuples = sorted(token_counts.items(), key=lambda x: x[1], reverse=True)
ordered_dict = OrderedDict(sorted_by_freq_tuples)

vocab = vocab(ordered_dict)

vocab.insert_token("<pad>", 0)
vocab.insert_token("<unk>", 1)
vocab.set_default_index(1)

print([vocab[token] for token in ['this', 'is', 'an', 'example']])

[11, 7, 35, 457]


In [9]:
if not torch.cuda.is_available():
    print("경고: 이 코드는 CPU에서 매우 느릴 수 있습니다.")

In [11]:
import torchtext
from torchtext import __version__ as torchtext_version
from pkg_resources import parse_version

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

text_pipeline = lambda x: [vocab[token] for token in tokenizer(x)]

if parse_version(torchtext.__version__) > parse_version("0.10"):
    label_pipeline = lambda x: 1. if x == 2 else 0.
else:
    label_pipeline = lambda x: 1. if x == 'pos' else 0.


def collate_batch(batch):
    label_list, text_list, lengths = [], [], []
    for _label, _text in batch:
        label_list.append(label_pipeline(_label))
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
        text_list.append(processed_text)
        lengths.append(processed_text.size(0))
    label_list = torch.tensor(label_list)
    lengths = torch.tensor(lengths)
    padded_text_list = nn.utils.rnn.pad_sequence(text_list, batch_first=True)
    return padded_text_list.to(device), label_list.to(device), lengths.to(device)

In [12]:
from torch.utils.data import DataLoader

dataloader = DataLoader(train_dataset, batch_size=4, shuffle=False, collate_fn=collate_batch)
text_batch, label_batch, length_batch = next(iter(dataloader))
print(text_batch)
print(label_batch)
print(length_batch)
print(text_batch.shape)

tensor([[   35,  1739,     7,   449,   721,     6,   301,     4,   787,     9,
             4,    18,    44,     2,  1705,  2460,   186,    25,     7,    24,
           100,  1874,  1739,    25,     7, 34415,  3568,  1103,  7517,   787,
             5,     2,  4991, 12401,    36,     7,   148,   111,   939,     6,
         11598,     2,   172,   135,    62,    25,  3199,  1602,     3,   928,
          1500,     9,     6,  4601,     2,   155,    36,    14,   274,     4,
         42945,     9,  4991,     3,    14, 10296,    34,  3568,     8,    51,
           148,    30,     2,    58,    16,    11,  1893,   125,     6,   420,
          1214,    27, 14542,   940,    11,     7,    29,   951,    18,    17,
         15994,   459,    34,  2480, 15211,  3713,     2,   840,  3200,     9,
          3568,    13,   107,     9,   175,    94,    25,    51, 10297,  1796,
            27,   712,    16,     2,   220,    17,     4,    54,   722,   238,
           395,     2,   787,    32,    27,  5236,  

In [13]:
batch_size = 32

train_dl = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_batch)
valid_dl = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)
test_dl = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

In [15]:
embedding = nn.Embedding(num_embeddings=10, embedding_dim=3, padding_idx=0)
text_encoded_input = torch.LongTensor([[1, 2, 4, 5], [4, 3, 2, 0]])
print(embedding(text_encoded_input))

tensor([[[ 0.7039, -0.8321, -0.4651],
         [-0.3203,  2.2408,  0.5566],
         [-0.4643,  0.3046,  0.7046],
         [-0.7106, -0.2959,  0.8356]],

        [[-0.4643,  0.3046,  0.7046],
         [ 0.0946, -0.3531,  0.9124],
         [-0.3203,  2.2408,  0.5566],
         [ 0.0000,  0.0000,  0.0000]]], grad_fn=<EmbeddingBackward0>)


In [16]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=2, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        _, hidden = self.rnn(x)
        out = hidden[-1, :, :]
        out = self.fc(out)
        return out


model = RNN(64, 32)
print(model)
model(torch.randn(5, 3, 64))

RNN(
  (rnn): RNN(64, 32, num_layers=2, batch_first=True)
  (fc): Linear(in_features=32, out_features=1, bias=True)
)


tensor([[ 0.3183],
        [ 0.1230],
        [ 0.1772],
        [-0.1052],
        [-0.1259]], grad_fn=<AddmmBackward0>)

In [17]:
class RNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size, fc_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, batch_first=True)
        self.fc1 = nn.Linear(rnn_hidden_size, fc_hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(fc_hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, text, lengths):
        out = self.embedding(text)
        out = nn.utils.rnn.pack_padded_sequence(out, lengths.cpu().numpy(), enforce_sorted=False, batch_first=True)
        out, (hidden, cell) = self.rnn(out)
        out = hidden[-1, :, :]
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out


vocab_size = len(vocab)
embed_dim = 20
rnn_hidden_size = 64
fc_hidden_size = 64
torch.manual_seed(1)
model = RNN(vocab_size, embed_dim, rnn_hidden_size, fc_hidden_size)
model = model.to(device)

In [23]:
def train(dataloader):
    model.train()
    total_acc, total_loss = 0, 0
    for text_batch, label_batch, lengths in dataloader:
        optimizer.zero_grad()
        pred = model(text_batch, lengths)[:, 0]
        loss = loss_fn(pred, label_batch)
        loss.backward()
        optimizer.step()
        total_acc += ((pred >= 0.5).float() == label_batch).float().sum().item()
        total_loss += loss.item() * label_batch.size(0)
    return total_acc / len(dataloader.dataset), total_loss / len(dataloader.dataset)


def evaluate(dataloader):
    model.eval()
    total_acc, total_loss = 0, 0
    with torch.no_grad():
        for text_batch, label_batch, lengths in dataloader:
            pred = model(text_batch, lengths)[:, 0]
            loss = loss_fn(pred, label_batch)
            total_acc += ((pred >= 0.5).float() == label_batch).float().sum().item()
            total_loss += loss.item() * label_batch.size(0)
    return total_acc / len(list(dataloader.dataset)), total_loss / len(list(dataloader.dataset))

In [24]:
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
torch.manual_seed(1)
for epoch in range(num_epochs):
    acc_train, loss_train = train(train_dl)
    acc_valid, loss_valid = evaluate(valid_dl)
    print(f'에포크 {epoch} 정확도 : {acc_train:.4f} 검증 정확도 : {acc_valid:.4f}')

에포크 0 정확도: 0.9563 검증 정확도: 0.8582
에포크 1 정확도: 0.9711 검증 정확도: 0.8672
에포크 2 정확도: 0.9760 검증 정확도: 0.8714
에포크 3 정확도: 0.9816 검증 정확도: 0.8692
에포크 4 정확도: 0.9819 검증 정확도: 0.8666
에포크 5 정확도: 0.9831 검증 정확도: 0.8640
에포크 6 정확도: 0.9885 검증 정확도: 0.8612
에포크 7 정확도: 0.9913 검증 정확도: 0.8576
에포크 8 정확도: 0.9943 검증 정확도: 0.8570
에포크 9 정확도: 0.9940 검증 정확도: 0.8586


In [25]:
acc_test, _ = evaluate(test_dl)
print(f'테스트 정확도 : {acc_test:.4f}')

테스트 정확도: 0.8494


In [26]:
class RNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size, fc_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(rnn_hidden_size * 2, fc_hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(fc_hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, text, lengths):
        out = self.embedding(text)
        out = nn.utils.rnn.pack_padded_sequence(out, lengths.cpu().numpy(), enforce_sorted=False, batch_first=True)
        _, (hidden, cell) = self.rnn(out)
        out = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out


torch.manual_seed(1)
model = RNN(vocab_size, embed_dim, rnn_hidden_size, fc_hidden_size)
model = model.to(device)

In [27]:
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)
num_epochs = 10
torch.manual_seed(1)
for epoch in range(num_epochs):
    acc_train, loss_train = train(train_dl)
    acc_valid, loss_valid = evaluate(valid_dl)
    print(f'에포크 {epoch} 정확도 : {acc_train:.4f} 검증 정확도 : {acc_valid:.4f}')

에포크 0 정확도: 0.6254 검증 정확도: 0.7030
에포크 1 정확도: 0.7802 검증 정확도: 0.7444
에포크 2 정확도: 0.8495 검증 정확도: 0.8462
에포크 3 정확도: 0.8912 검증 정확도: 0.8384
에포크 4 정확도: 0.9315 검증 정확도: 0.8546
에포크 5 정확도: 0.9522 검증 정확도: 0.8546
에포크 6 정확도: 0.9700 검증 정확도: 0.8700
에포크 7 정확도: 0.9792 검증 정확도: 0.8448
에포크 8 정확도: 0.9857 검증 정확도: 0.8668
에포크 9 정확도: 0.9943 검증 정확도: 0.8664


In [28]:
test_dataset = IMDB(split='test')
test_dl = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)

In [29]:
acc_test, _ = evaluate(test_dl)
print(f'테스트 정확도 : {acc_test:.4f}')

테스트 정확도: 0.8480


In [None]:
with open('1268-0.txt', 'r', encoding="utf8") as fp:
    text = fp.read()

start_indx = text.find('THE MYSTERIOUS ISLAND')
end_indx = text.find('End of the Project Gutenberg')

text = text[start_indx:end_indx]
char_set = set(text)
print('전체 길이 : ', len(text))
print('고유한 문자 : ', len(char_set))

In [None]:
import numpy as np

chars_sorted = sorted(char_set)
char2int = {ch: i for i, ch in enumerate(chars_sorted)}
char_array = np.array(chars_sorted)

text_encoded = np.array(
    [char2int[ch] for ch in text],
    dtype=np.int32)

print('인코딩된 텍스트 크기 : ', text_encoded.shape)

print(text[:15], '     == 인코딩 ==> ', text_encoded[:15])
print(text_encoded[15:21], ' == 디코딩  ==> ', ''.join(char_array[text_encoded[15:21]]))

In [None]:
for ex in text_encoded[:5]:
    print('{} -> {}'.format(ex, char_array[ex]))

In [None]:
seq_length = 40
chunk_size = seq_length + 1

text_chunks = [text_encoded[i:i + chunk_size]
               for i in range(len(text_encoded) - chunk_size + 1)]
for seq in text_chunks[:1]:
    input_seq = seq[:seq_length]
    target = seq[seq_length]
    print(input_seq, ' -> ', target)
    print(repr(''.join(char_array[input_seq])),
          ' -> ', repr(''.join(char_array[target])))

In [None]:
import torch
from torch.utils.data import Dataset


class TextDataset(Dataset):
    def __init__(self, text_chunks):
        self.text_chunks = text_chunks

    def __len__(self):
        return len(self.text_chunks)

    def __getitem__(self, idx):
        text_chunk = self.text_chunks[idx]
        return text_chunk[:-1].long(), text_chunk[1:].long()


seq_dataset = TextDataset(torch.tensor(text_chunks))

In [None]:
for i, (seq, target) in enumerate(seq_dataset):
    print('입력 (x) : ', repr(''.join(char_array[seq])))
    print('타깃 (y) : ', repr(''.join(char_array[target])))
    print()
    if i == 1:
        break

In [None]:
device = torch.device("cuda:0")

In [None]:
from torch.utils.data import DataLoader

batch_size = 64
torch.manual_seed(1)
seq_dl = DataLoader(seq_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

In [None]:
import torch.nn as nn


class RNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn_hidden_size = rnn_hidden_size
        self.rnn = nn.LSTM(embed_dim, rnn_hidden_size,
                           batch_first=True)
        self.fc = nn.Linear(rnn_hidden_size, vocab_size)

    def forward(self, x, hidden, cell):
        out = self.embedding(x).unsqueeze(1)
        out, (hidden, cell) = self.rnn(out, (hidden, cell))
        out = self.fc(out).reshape(out.size(0), -1)
        return out, hidden, cell

    def init_hidden(self, batch_size):
        hidden = torch.zeros(1, batch_size, self.rnn_hidden_size)
        cell = torch.zeros(1, batch_size, self.rnn_hidden_size)
        return hidden.to(device), cell.to(device)


vocab_size = len(char_array)
embed_dim = 256
rnn_hidden_size = 512

torch.manual_seed(1)
model = RNN(vocab_size, embed_dim, rnn_hidden_size)
model = model.to(device)
model

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
num_epochs = 10000
torch.manual_seed(1)
for epoch in range(num_epochs):
    hidden, cell = model.init_hidden(batch_size)
    seq_batch, target_batch = next(iter(seq_dl))
    seq_batch = seq_batch.to(device)
    target_batch = target_batch.to(device)
    optimizer.zero_grad()
    loss = 0
    for c in range(seq_length):
        pred, hidden, cell = model(seq_batch[:, c], hidden, cell)
        loss += loss_fn(pred, target_batch[:, c])
    loss.backward()
    optimizer.step()
    loss = loss.item() / seq_length
    if epoch % 500 == 0:
        print(f'에포크 {epoch} 손실: {loss:.4f}')

In [None]:
from torch.distributions.categorical import Categorical

torch.manual_seed(1)
logits = torch.tensor([[1.0, 1.0, 1.0]])
print('확률 : ', nn.functional.softmax(logits, dim=1).numpy()[0])

m = Categorical(logits=logits)
samples = m.sample((10,))
print(samples.numpy())

In [None]:
torch.manual_seed(1)
logits = torch.tensor([[1.0, 1.0, 3.0]])
print('확률 : ', nn.functional.softmax(logits, dim=1).numpy()[0])

m = Categorical(logits=logits)
samples = m.sample((10,))
print(samples.numpy())

In [None]:
def sample(model, starting_str,
           len_generated_text=500,
           scale_factor=1.0):
    encoded_input = torch.tensor([char2int[s] for s in starting_str])
    encoded_input = torch.reshape(encoded_input, (1, -1))

    generated_str = starting_str

    model.eval()
    hidden, cell = model.init_hidden(1)
    hidden = hidden.to('cpu')
    cell = cell.to('cpu')
    for c in range(len(starting_str) - 1):
        _, hidden, cell = model(encoded_input[:, c].view(1), hidden, cell)

    last_char = encoded_input[:, -1]
    for i in range(len_generated_text):
        logits, hidden, cell = model(last_char.view(1), hidden, cell)
        logits = torch.squeeze(logits, 0)
        scaled_logits = logits * scale_factor
        m = Categorical(logits=scaled_logits)
        last_char = m.sample()
        generated_str += str(char_array[last_char])

    return generated_str


torch.manual_seed(1)
model.to('cpu')
print(sample(model, starting_str='The island'))

In [None]:
logits = torch.tensor([[1.0, 1.0, 3.0]])
print('스케일 조정 전의 확률 : ', nn.functional.softmax(logits, dim=1).numpy()[0])
print('0.5배 조정 후 확률 : ', nn.functional.softmax(0.5 * logits, dim=1).numpy()[0])
print('0.1배 조정 후 확률 : ', nn.functional.softmax(0.1 * logits, dim=1).numpy()[0])

In [None]:
torch.manual_seed(1)
print(sample(model, starting_str='The island', scale_factor=2.0))

In [None]:
torch.manual_seed(1)
print(sample(model, starting_str='The island', scale_factor=0.5))