<a href="https://colab.research.google.com/github/jejae3372/Colab_AI/blob/main/Pre_trained_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install torchtext==0.3.1

Collecting torchtext==0.3.1
  Downloading torchtext-0.3.1-py3-none-any.whl (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.4/62.4 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torchtext
  Attempting uninstall: torchtext
    Found existing installation: torchtext 0.16.0
    Uninstalling torchtext-0.16.0:
      Successfully uninstalled torchtext-0.16.0
Successfully installed torchtext-0.3.1


In [3]:
import re
import sys
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

from torchtext import data
from torchtext import datasets

In [5]:
# Review text field: split on whitespace, laid out batch-first, and
# left-padded with '[PAD]' to a fixed length of 500 token ids per review.
TEXT = data.Field(
    tokenize=str.split,
    fix_length=500,
    pad_first=True,
    batch_first=True,
    pad_token='[PAD]',
    unk_token='[UNK]',
)

# Sentiment label, stored as a float tensor.
LABEL = data.LabelField(dtype=torch.float)

# Load the IMDB train / test splits using the two fields above.
train_data, test_data = datasets.IMDB.splits(
    text_field=TEXT,
    label_field=LABEL,
)


downloading aclImdb_v1.tar.gz


aclImdb_v1.tar.gz: 100%|██████████| 84.1M/84.1M [00:04<00:00, 18.2MB/s]


In [6]:
# Patterns are compiled once and written as raw strings so that no backslash
# sequence depends on Python's handling of unknown string escapes.
_HTML_TAG_RE = re.compile(r'<[^>]*>')
# Same character set the original pattern matched. Note that the apostrophe
# and the backslash are NOT part of it, so contractions such as "don't"
# survive unchanged.
_PUNCTUATION_RE = re.compile(r'[!"#$%&()*+,\-./:;<=>?@\[\]^_`{|}~]')
_WHITESPACE_RE = re.compile(r'\s+')


def PreProcessingText(input_sentence):
  """Normalise a raw review string for whitespace tokenisation.

  Steps, in order: lower-case the text, replace HTML-like tags (`<...>`)
  with a space, replace the punctuation characters listed in
  `_PUNCTUATION_RE` with a space, then collapse every run of whitespace
  into a single space.

  Args:
    input_sentence (str): Raw text.

  Returns:
    str: The normalised text. Always a string: an empty input yields `''`
    (previously `None` was returned implicitly, which made the caller's
    `.split()` raise `AttributeError`). Leading / trailing single spaces
    are kept; callers that `.split()` the result are unaffected by them.
  """
  cleaned = input_sentence.lower()
  cleaned = _HTML_TAG_RE.sub(' ', cleaned)
  cleaned = _PUNCTUATION_RE.sub(' ', cleaned)
  cleaned = _WHITESPACE_RE.sub(' ', cleaned)
  return cleaned

# Clean every review of both splits in place: join the stored tokens back
# into one string, normalise it, and split it on whitespace again.
for dataset in (train_data, test_data):
  for example in dataset.examples:
    raw_text = ' '.join(example.text)
    example.text = PreProcessingText(raw_text).split()


In [13]:
# Embedding settings; more keys are added to this dict by later cells.
model_config = dict(emb_type='glove', emb_dim=300)

# Text vocabulary from the training reviews (tokens must occur at least
# twice, no size cap), with pretrained GloVe vectors of the configured width.
TEXT.build_vocab(
    train_data,
    vectors=f"glove.6B.{model_config['emb_dim']}d",
    min_freq=2,
    max_size=None,
)

# Label vocabulary from the training labels.
LABEL.build_vocab(train_data)

# Remember the vocabulary size so the model can size its embedding table.
model_config['vocab_size'] = len(TEXT.vocab)

.vector_cache/glove.6B.zip: 862MB [02:39, 5.40MB/s]                           
100%|█████████▉| 399999/400000 [00:37<00:00, 10572.92it/s]


In [14]:
# Hold out 20% of the training reviews for validation.
#
# `random.seed()` returns None, so it must not be passed as `random_state`
# directly (the original call only appeared to work because seeding happened
# as a side effect of evaluating the argument). Seed first, then hand the
# resulting generator state to `split` explicitly.
#
# NOTE: `train_data` is rebound here, so re-running this cell alone would
# split the already reduced training set again; re-run from the data-loading
# cell instead. The name `vaild_data` (sic) is kept because later cells use it.
random.seed(0)
train_data, vaild_data = train_data.split(split_ratio = 0.8,
                                          random_state = random.getstate())

In [16]:
model_config['batch_size'] = 30

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One iterator per split, all with the same batch size and target device.
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, vaild_data, test_data),
    batch_size = model_config['batch_size'],
    device = device
)

# The validation iterator used to be bound only to the misspelled name
# `vaild_iterator`, while the training loop reads `valid_iterator`
# (NameError). Both names are defined so either spelling keeps working.
vaild_iterator = valid_iterator

In [17]:
# Sanity check: pull one training batch and show the batch summary, the
# padded token-id matrix and the label vector, one after another.
sample_for_check = next(iter(train_iterator))
print(sample_for_check,
      sample_for_check.text,
      sample_for_check.label,
      sep = '\n')


[torchtext.data.batch.Batch of size 30]
	[.text]:[torch.LongTensor of size 30x500]
	[.label]:[torch.FloatTensor of size 30]
tensor([[   1,    1,    1,  ..., 1164,    6,  383],
        [   1,    1,    1,  ...,   15,    2, 4762],
        [   1,    1,    1,  ...,  103,   11,  120],
        ...,
        [   1,    1,    1,  ...,    4, 1609,  371],
        [ 463,  109,  176,  ...,    9,    7,  469],
        [   1,    1,    1,  ..., 2414, 1468, 2344]])
tensor([0., 1., 0., 1., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 1., 1., 0., 0.,
        0., 1., 1., 1., 0., 0., 1., 1., 1., 1., 1., 1.])


In [18]:
class SentenceClassification(nn.Module):
  """Recurrent sentence classifier: embedding -> RNN/LSTM/GRU -> dropout -> linear.

  All settings are passed as keyword arguments (typically `**model_config`):

    emb_type (str): 'glove' or 'fasttext' initialises the embedding table
      from `TEXT.vocab.vectors`; any other value uses a randomly
      initialised table.
    vocab_size (int): Number of rows of the embedding table.
    emb_dim (int): Embedding width, also the recurrent input size.
    hidden_dim (int): Hidden size of the recurrent layer (per direction).
    bidirectional (bool): Whether the recurrent layer runs in both directions.
    batch_first (bool): Whether inputs are laid out (batch, seq) rather
      than (seq, batch).
    dropout (float): Probability for the dropout before the linear layer;
      also forwarded to the recurrent layers.
    model_type (str): Which recurrent layer `forward` uses:
      'RNN', 'LSTM' or 'GRU'.
    output_dim (int): Number of output logits per sentence.

  Extra keys are ignored. The three recurrent layers are all created (only
  the selected one is used in `forward`) so that the attribute names and
  state-dict keys stay the same as before.
  """

  def __init__(self, **model_config):
    super(SentenceClassification, self).__init__()

    # FIX: the original test `emb_type == 'glove' or 'fasttext'` is always
    # truthy (a non-empty string), so the random-initialisation branch could
    # never run. Use a membership test instead.
    if model_config['emb_type'] in ('glove', 'fasttext'):
      pretrained = TEXT.vocab.vectors
      expected_shape = (model_config['vocab_size'], model_config['emb_dim'])
      if tuple(pretrained.shape) != expected_shape:
        raise ValueError(
            f'Pretrained vectors have shape {tuple(pretrained.shape)}, '
            f'expected {expected_shape}')
      # Clone before wrapping: the embedding parameter would otherwise share
      # storage with TEXT.vocab.vectors, so fine-tuning a model that stays on
      # the CPU would overwrite the vocabulary's vectors for any model built
      # afterwards. freeze=False keeps the table trainable, as before.
      self.emb = nn.Embedding.from_pretrained(pretrained.clone(), freeze = False)
    else:
      self.emb = nn.Embedding(model_config['vocab_size'],
                              model_config['emb_dim'])

    self.bidirectional = model_config['bidirectional']
    self.num_direction = 2 if model_config['bidirectional'] else 1
    self.model_type = model_config['model_type']
    self.batch_first = model_config['batch_first']

    # Identical constructor arguments for the three recurrent variants.
    recurrent_kwargs = dict(input_size = model_config['emb_dim'],
                            hidden_size = model_config['hidden_dim'],
                            dropout = model_config['dropout'],
                            bidirectional = model_config['bidirectional'],
                            batch_first = model_config['batch_first'])
    # Created in the original order so parameter initialisation is unchanged.
    self.RNN = nn.RNN(**recurrent_kwargs)
    self.LSTM = nn.LSTM(**recurrent_kwargs)
    self.GRU = nn.GRU(**recurrent_kwargs)

    # A bidirectional layer concatenates both directions, doubling the width.
    self.fc = nn.Linear(model_config['hidden_dim'] * self.num_direction,
                        model_config['output_dim'])
    self.drop = nn.Dropout(model_config['dropout'])

  def forward(self, x):
    """Compute one row of logits per sentence.

    Args:
      x: Integer tensor of token ids, (batch, seq) when `batch_first` is
        true, otherwise (seq, batch).

    Returns:
      Tensor of shape (batch, output_dim) holding raw (pre-sigmoid) logits.

    Raises:
      NameError: If `model_type` is not one of 'RNN', 'LSTM', 'GRU'.
    """
    emb = self.emb(x)

    if self.model_type == 'RNN':
      output, hidden = self.RNN(emb)
    elif self.model_type == 'LSTM':
      output, (hidden, cell) = self.LSTM(emb)
    elif self.model_type == 'GRU':
      output, hidden = self.GRU(emb)
    else :
      raise NameError('Select model_type in [RNN, LSTM, GRU]')

    # Features at the final time step. The time axis is 1 for batch-first
    # layouts and 0 otherwise (the original always indexed axis 1, which
    # picked the last *sample* when batch_first was False).
    # NOTE(review): for a bidirectional layer the backward half of this
    # vector has only processed the final token; kept as in the original.
    last_output = output[:, -1, :] if self.batch_first else output[-1]

    return self.fc(self.drop(last_output))

In [19]:
# Binary cross-entropy that takes raw logits (the sigmoid is applied inside
# the loss), moved to the selected device.
loss_fn = nn.BCEWithLogitsLoss().to(device)

def binary_accuracy(preds, y):
    """Return the fraction of predictions that match the targets.

    Args:
        preds: Tensor of raw logits.
        y: Tensor of 0. / 1. targets, compared element-wise with the
            thresholded predictions.

    Returns:
        Tensor holding the number of matches divided by the length of the
        first dimension.
    """
    # sigmoid -> probability, round -> hard 0/1 decision
    predicted_class = preds.sigmoid().round()
    hits = predicted_class.eq(y).float()
    return hits.sum() / len(hits)

In [None]:
def train(model, iterator, optimizer, loss_fn, idx_epoch, **model_params):
    """Run one training epoch and report progress on stdout.

    Args:
        model: Module mapping `batch.text` to logits of shape (batch, 1).
        iterator: Sized iterable of batches exposing `.text` and `.label`.
        optimizer: Optimizer over the model's parameters.
        loss_fn: Callable `(predictions, labels) -> loss tensor`.
        idx_epoch: Epoch index, used only in the progress line.
        **model_params: Must contain `batch_size` (used only for the
            progress line); other keys are ignored.

    Returns:
        Tuple `(mean_loss, mean_accuracy)`, each averaged over the batches.
    """
    epoch_loss = 0
    epoch_acc = 0

    model.train()
    batch_size = model_params['batch_size']
    n_batches = len(iterator)

    for idx, batch in enumerate(iterator):

        # Initializing
        optimizer.zero_grad()

        # Forward
        # FIX: squeeze only the logit dimension. A bare .squeeze() also drops
        # the batch dimension of a one-sample batch, producing a 0-d tensor
        # whose shape no longer matches the labels (the loss then raises).
        # This also matches what `evaluate` does.
        predictions = model(batch.text).squeeze(1)
        loss = loss_fn(predictions, batch.label)
        acc = binary_accuracy(predictions, batch.label)

        # "\r" rewrites the same console line; the sample counts are nominal
        # (batch index times batch size), not exact for a short final batch.
        sys.stdout.write(
                    "\r" + f"[Train] Epoch : {idx_epoch:^3}"
                    f"[{(idx + 1) * batch_size} / {n_batches * batch_size} ({100. * (idx + 1) / n_batches :.4}%)]"
                    f"  Loss: {loss.item():.4}"
                    f"  Acc : {acc.item():.4}"
                    )
        # Without a newline the line may sit in the buffer; push it out now.
        sys.stdout.flush()

        # Backward
        loss.backward()
        optimizer.step()

        # Update Epoch Performance
        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / n_batches, epoch_acc / n_batches

In [None]:
def evaluate(model, iterator, loss_fn):
    """Score the model on every batch of `iterator` without updating it.

    Args:
        model: Module mapping `batch.text` to logits of shape (batch, 1).
        iterator: Sized iterable of batches exposing `.text` and `.label`.
        loss_fn: Callable `(predictions, labels) -> loss tensor`.

    Returns:
        Tuple `(mean_loss, mean_accuracy)`, each averaged over the batches.
    """
    # Switch to evaluation mode before scoring.
    model.eval()

    batch_losses = []
    batch_accs = []

    # Gradients are not needed for scoring.
    with torch.no_grad():
        for batch in iterator:
            logits = model(batch.text).squeeze(1)
            batch_losses.append(loss_fn(logits, batch.label).item())
            batch_accs.append(binary_accuracy(logits, batch.label).item())

    n_batches = len(iterator)
    return sum(batch_losses) / n_batches, sum(batch_accs) / n_batches

In [None]:

# Settings for this run: a bidirectional vanilla RNN with a 128-unit hidden
# state, a single output logit and no dropout.
model_config.update(dict(batch_first = True,
                         model_type = 'RNN',
                         bidirectional = True,
                         hidden_dim = 128,
                         output_dim = 1,
                         dropout = 0))
model = SentenceClassification(**model_config).to(device)
optimizer = torch.optim.Adam(model.parameters())
loss_fn = nn.BCEWithLogitsLoss().to(device)
N_EPOCH = 5

# Lowest validation loss so far; decides when a checkpoint is written.
best_valid_loss = float('inf')
model_name = f"{'bi-' if model_config['bidirectional'] else ''}{model_config['model_type']}_{model_config['emb_type']}"

print('---------------------------------')
print(f'Model name : {model_name}')
print('---------------------------------')

for epoch in range(N_EPOCH):
    train_loss, train_acc = train(model, train_iterator, optimizer, loss_fn, epoch, **model_config)
    # FIX: the validation iterator is bound to the name `vaild_iterator`
    # (sic) in the cell that builds the iterators; the previous spelling
    # `valid_iterator` was never defined and raised NameError here.
    valid_loss, valid_acc = evaluate(model, vaild_iterator, loss_fn)
    # The training progress line ends without a newline; finish it first.
    print('')
    # Keep only the weights with the best validation loss seen so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), f'./{model_name}.pt')
        print(f'\t Saved at {epoch}-epoch')

    print(f'\t Epoch : {epoch} | Train Loss : {train_loss:.4} | Train Acc : {train_acc:.4}')
    print(f'\t Epoch : {epoch} | Valid Loss : {valid_loss:.4} | Valid Acc : {valid_acc:.4}')