In [1]:
import nltk
from nltk.corpus import movie_reviews

from sklearn.model_selection import train_test_split

import numpy as np
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import AdamW

import evaluate

from transformers import BertModel, BertTokenizerFast

In [2]:
result_path = str(Path.cwd().parent / 'exercisebook_large_data' / 'Transformers' / 'results')
result_path

'c:\\Coding\\Local\\exercisebook_large_data\\Transformers\\results'

In [3]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [4]:
fileids = movie_reviews.fileids()
reviews = [movie_reviews.raw(fileid) for fileid in fileids]
categories = [movie_reviews.categories(fileid)[0] for fileid in fileids] 

label_dict = {'pos':1, 'neg':0}
y = [label_dict[c] for c in categories]

In [5]:
X_train_val, X_test, y_train_val, y_test = train_test_split(reviews, y, test_size=0.2, random_state=7)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.3, random_state=0)

In [6]:
model_name = 'bert-base-uncased'
tokenizer = BertTokenizerFast.from_pretrained(model_name)
bert_model = BertModel.from_pretrained(model_name)

In [7]:
train_input = tokenizer(X_train, truncation=True, padding=True, return_tensors='pt')
val_input = tokenizer(X_val, truncation=True, padding=True, return_tensors='pt')
test_input = tokenizer(X_test, truncation=True, padding=True, return_tensors='pt')
print(list(train_input.keys()))

['input_ids', 'token_type_ids', 'attention_mask']


In [8]:
class OurDataset(torch.utils.data.Dataset):
    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = labels
    
    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.inputs.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item
    
    def __len__(self):
        return len(self.labels)

In [9]:
train_dataset = OurDataset(train_input, y_train)
val_dataset = OurDataset(val_input, y_val)
test_dataset = OurDataset(test_input, y_test)

In [10]:
metric = evaluate.load('accuracy')

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    pred = np.argmax(logits, axis=-1)
    return metric.compute(predictions=pred, references=labels)

In [11]:
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16)
val_loader = DataLoader(val_dataset, shuffle=True, batch_size=16)
test_loader = DataLoader(test_dataset, shuffle=True, batch_size=16)

In [12]:
class MyModel(nn.Module):
    def __init__(self, pretrained_model, token_size, num_labels):
        super(MyModel, self).__init__()
        self.token_size = token_size
        self.num_labels = num_labels
        self.pretrained_model = pretrained_model
        
        self.classifier = nn.Linear(self.token_size, self.num_labels)
    
    def forward(self, inputs):
        outputs = self.pretrained_model(**inputs)
        bert_clf_token = outputs.last_hidden_state[:, 0, :]
        return self.classifier(bert_clf_token)

In [13]:
model = MyModel(bert_model, num_labels=2, token_size=bert_model.config.hidden_size)
model = model.to(device)

In [14]:
optim = AdamW(model.parameters(), lr=1e-3)
loss_function = nn.CrossEntropyLoss()
num_epochs = 5

In [15]:
model.train()

for epoch in range(num_epochs):
    total_epoch_loss = 0

    for step, batch in enumerate(train_loader):
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
        labels = batch['labels'].to(device)
        outputs = model(inputs)

        loss = loss_function(outputs, F.one_hot(labels, num_classes=2).float())

        if (step+1) % 100 == 0:
            print(f"Epoch {epoch+1}, batch {step+1}, loss {loss:.4f}")
        
        optim.zero_grad()
        loss.backward()
        optim.step()

        total_epoch_loss += float(loss)

    avg_epoch_loss = total_epoch_loss / len(train_loader)
    print(f"Avg loss for epoch {epoch+1}: {avg_epoch_loss:.4f}")

  item = {key: torch.tensor(val[idx]) for key, val in self.inputs.items()}
Consider using tensor.detach() first. (Triggered internally at C:\actions-runner\_work\pytorch\pytorch\pytorch\torch\csrc\autograd\generated\python_variable_methods.cpp:836.)
  total_epoch_loss += float(loss)


Avg loss for epoch 1: 0.8781
Avg loss for epoch 2: 0.7066
Avg loss for epoch 3: 0.7194
Avg loss for epoch 4: 0.6954
Avg loss for epoch 5: 0.6956


In [16]:
model.eval()

for batch in test_loader:
    inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
    labels = batch['labels'].to(device)

    with torch.no_grad():
        outputs = model(inputs)
    
    predictions = torch.argmax(outputs, dim=-1)
    metric.add_batch(predictions=predictions, references=labels)

metric.compute()

  item = {key: torch.tensor(val[idx]) for key, val in self.inputs.items()}


{'accuracy': 0.495}

In [17]:
del model
torch.cuda.empty_cache()

In [None]:
# 음.. accuracy가 낮게 나오는데 어차피 쓸 일 없으려나? Trainer 위주로 활용 예정