<a href="https://colab.research.google.com/github/ingabLee/Transformers_book/blob/main/Transformer_09_13.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!pip install transformers==4.30.0
#!pip install torchtext==0.15.2
#!pip install portalocker==2.7.0
#!pip install accelerate -U
#!pip install torch==2.6.0

#!python --version

!pip3 uninstall --yes torch torchaudio torchvision torchtext torchdata
!pip3 install torch torchaudio torchvision torchtext==0.17.0 torchdata
!pip3 install portalocker
!pip3 install accelerate
!pip3 install transformers

In [None]:
import numpy as np
import torch
import torchtext

print(torch.__version__)
print(torchtext.__version__)

a = np.array([0,0,1])
b = torch.Tensor(a)
print(a)
print(b)

c = b.tolist()
print(c)


In [None]:
from torchtext.datasets import IMDB

train_iter = IMDB(split='train')
test_iter = IMDB(split='test')

# 출력 결과를 고정하기 위해
import random
random.seed(6)

# train_iter를 리스트 타입으로 변경
train_list = list(train_iter)
test_list = list(test_iter)

# 각기 1000개씩 random sampling
train_list_small = random.sample(train_list, 1000)
test_list_small = random.sample(test_list, 1000)

# 각 변수에 담긴 인덱스 0의 내용 화면 출력
print(train_list_small[0])
print(test_list_small[0])

In [None]:
# train_text, train_label 컨테이너 생성
# 아래 반복문에서 생성된 결과를 담는다.
train_texts = []
train_labels = []

#for 반복문
# train_list_small에 담긴 튜플쌍 원소를 변수명 label, test부여하여 순서대로 축출
for label, text in train_list_small:
  # IMDB 데이터의 기존 레이블 2를 1로 변경, 기존 레이블 1을 0으로 변경.
  train_labels.append(1 if label == 2 else 0)
  train_texts.append(text)

# test_list_small도 동일하게 처리
test_texts = []
test_labels = []
for label, text in test_list_small:
  test_labels.append(1 if label==2 else 0)
  test_texts.append(text)

# 각 변수에 담긴 첫번째 원소 출력
print(train_texts[0])
print(train_labels[0])
print(test_texts[0])
print(test_labels[0])

In [None]:
from sklearn.model_selection import train_test_split
train_texts, val_texts, train_labels, val_labels = train_test_split(train_texts, train_labels, test_size=.2,random_state=3)

print(len(train_texts))
print(len(train_labels))
print(len(val_texts))
print(len(val_labels))

In [None]:
# distilbert-base-uncased 모델에서 tokenizer 불러오기
from transformers import DistilBertTokenizerFast
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

In [None]:
# tokenizer execute
# truncation=True -> 모델의 default max_length를 넘는 입력부분은 더이상 받지않고 절단한다는 의미
train_encodings = tokenizer(train_texts, truncation=True, padding=True)
val_encodings = tokenizer(val_texts, truncation=True, padding=True)
test_encodings = tokenizer(test_texts, truncation=True, padding=True)

# 0번째 입력문의 5번째 토큰까지 input_ids출력
print(train_encodings["input_ids"][0][:5])

#위의 결과를 다시 디코딩해서 출력
print(tokenizer.decode(train_encodings["input_ids"][0][:5]))

In [None]:
import torch

# Dataset 클래스를 상속하는 IMDB class정의
class IMdbDataset(torch.utils.data.Dataset):

  # contructor
  def __init__(self, encodings, labels):
    self.encodings = encodings
    self.labels = labels

  #
  def __getitem__(self, idx):
    # self.encoding에 담긴 키(key)와 키값(value)를 items()로 축출
    # 이 값을 key와 val 변수에 담아 새로운 키와 키값(torch.tensor(val[idx]))을 갖는 딕셔너리 생성

    # 딕셔너리{"key1 : value1", ...} 형태를 가지는 데이터 구조.
    # val[idx]에 담긴 데이터를 torch.tensor()을 통해 텐셔화.
    item = {key:torch.tensor(val[idx]) for key, val in self.encodings.items() }

    # tensor화
    item['labels'] = torch.tensor(self.labels[idx])
    return item

  #
  def __len__(self):
    return len(self.labels)

train_dataset = IMdbDataset(train_encodings, train_labels)
val_dataset = IMdbDataset(val_encodings, val_labels)
test_dataset = IMdbDataset(test_encodings, test_labels)

for i in train_dataset:
  print(i)
  break

In [None]:
from transformers import DistilBertForSequenceClassification

# load distilbert model.  (need transformers==4.30.0)
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased')
model

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir='./result',    # 출력 폴더 설정
    num_train_epochs=8,       # training epochs count
    per_device_train_batch_size=16, # train mini-batch size
    per_device_eval_batch_size=64,  # eval min-batch size
    warmup_steps=500,   # learning-rate scheduling warmup step
    weight_decay=0.01,  # 가중치 감소 강도
    logging_dir='./logs',   #로그 폴더 경로
    logging_steps=10
)

In [None]:
#import torch

# gpu사용이 가능한 경우 tensor타입를 gpu로 전달
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [None]:
# fine-tunning이전에 세입력 문장 극성 판별
# tokenizing
input_tokens = tokenizer(["I feel fantasic",
                          "My life is going something wrong.",
                          "I have not figured out what the chosen title has to do with the movie."],
                         truncation=True, padding=True)

# input text tokenizing result에 담긴 input_ids를 모델에 투입
# 그리고 모델 출력 결과를 gpu로 전송하며 값은 변수 outputs에 저장
outputs = model(torch.tensor(input_tokens['input_ids']).to(device))

# create label dictionary
label_dict = {0:'positvie', 1:'negative'}
print(outputs['logits'])


# outputs변수에 담긴 logits값을 행단위, 즉, 입력문장 단위로 가장 큰값의 위치(인덱스) 추출
# 결과값(인덱스)을 cpu로 넘기고 넘파이 타입으로 변경후, 인덱스에 매칭되는 레이블 출력

#print([label_dict[i] for i in torch.argmax(outputs['logits'], axis=1).cpu().numpy()])
v = torch.argmax(outputs['logits'], axis=1)
print(v)
#print(v.cpu().numpy())
#print(torch.__version__)

w = v.tolist()
print([label_dict[i] for i in w])

In [None]:
# maybe 5minute

from transformers import Trainer
trainer = Trainer(
    model=model,      # pretrain model instance
    args = training_args, # transformer.Arguments define hyper parameter
    train_dataset = train_dataset,  # train data set
    eval_dataset = val_dataset    # evaluation data set
)
trainer.train()

In [None]:
# after fine-tunning. then try again three text

input_tokens = tokenizer(["I feel fantastic",
                          "My life is going something wrong.",
                          "I have not figured out what the chosen title has to do with the movie."],
                         truncation=True, padding=True)

# input text tokenizing result에 담긴 input_ids를 모델에 투입
# 그리고 모델 출력 결과를 gpu로 전송하며 값은 변수 outputs에 저장
outputs = model(torch.tensor(input_tokens['input_ids']).to(device))

# create label dictionary
label_dict = {1:'positvie', 0:'negative'}
print(outputs['logits'])


# outputs변수에 담긴 logits값을 행단위, 즉, 입력문장 단위로 가장 큰값의 위치(인덱스) 추출
# 결과값(인덱스)을 cpu로 넘기고 넘파이 타입으로 변경후, 인덱스에 매칭되는 레이블 출력

#print([label_dict[i] for i in torch.argmax(outputs['logits'], axis=1).cpu().numpy()])
v = torch.argmax(outputs['logits'], axis=1).tolist()
print([label_dict[i] for i in v])

In [None]:
# make function
def test_inference(model, tokenizer):
  input_tokens = tokenizer(["I feel fantastic",
                          "My life is going something wrong.",
                          "I have not figured out what the chosen title has to do with the movie."],
                         truncation=True, padding=True)

  # input text tokenizing result에 담긴 input_ids를 모델에 투입
  # 그리고 모델 출력 결과를 gpu로 전송하며 값은 변수 outputs에 저장
  outputs = model(torch.tensor(input_tokens['input_ids']).to(device))

  # create label dictionary
  label_dict = {1:'positvie', 0:'negative'}

  # outputs변수에 담긴 logits값을 행단위, 즉, 입력문장 단위로 가장 큰값의 위치(인덱스) 추출
  # 결과값(인덱스)을 cpu로 넘기고 넘파이 타입으로 변경후, 인덱스에 매칭되는 레이블 출력

  v = torch.argmax(outputs['logits'], axis=1).tolist()
  return [label_dict[i] for i in v]

In [None]:
# maybe 5minute take time
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import DistilBertForSequenceClassification
from transformers import DistilBertTokenizerFast

# 1) load pretrain model and tokenizer
# add to code ( output of model "to(device)" ) 가능한 경우 결과를 gpu에 전달
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased')
model.to(device)

#print result of before fine tunning
print(test_inference(model, tokenizer))

# 2) Dataloader instance
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# 3) define optimize
optim = AdamW(model.parameters(), lr=5e-5)

# 4) 모델을 학습(train)모드로 전환
# 학습모드 전환은 dropout, batch-normalization에 영향을 미침
model.train()

losses=[]

# 5) for loop epochs count
for epoch in range(8):
  print(f'epoch:{epoch}')
  for batch in train_loader:
    # 6) init optimize function gradient
    optim.zero_grad()

    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['labels'].to(device)

    # 7) inference model
    outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

    # 8) calc loss
    loss = outputs[0]
    losses.append(loss)

    # 9) backward error
    loss.backward()

    # 10) update weight
    optim.step()

# change mode evaluate model
model.eval()

# run test_inference in eval mode.
print(test_inference(model, tokenizer))

In [None]:
print(test_inference(model, tokenizer))
print(losses)
type(losses)

In [None]:
# pytorch에서 item은 tensor값을 python number로 추출하여 cpu에 전달
new_losses = [i.item() for i in losses]

# display first 5 items
new_losses[:5]
type(new_losses)

In [None]:
new_losses[-5:]

In [None]:
import matplotlib.pyplot as plt
plt.plot(new_losses)
plt.show()