참고 url : https://github.com/abhimishra91/transformers-tutorials/blob/master/transformers_summarization_wandb.ipynb

In [None]:
# 버전에 맞는 패키지 설치
!pip install transformers==4.12.5
!pip install sentencepiece==0.1.91

In [None]:
# 모델 학습에 필요한 모듈 불러오기
from transformers import T5Config, T5Tokenizer, T5ForConditionalGeneration
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm


# 모델 패스 지정
model_folder = './etri_et5'

model = T5ForConditionalGeneration.from_pretrained(model_folder)
tokenizer = T5Tokenizer.from_pretrained(model_folder)

In [None]:
# gpu 환경 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# gpu 환경 적용
model.to(device)

In [None]:
# model에 넣기 위한 형식으로 dataset 변경
class CustomDataset:

    def __init__(self, dataframe, tokenizer, source_len, summ_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.source_len = source_len
        self.summ_len = summ_len
        self.text = self.data.text
        self.ctext = self.data.ctext

    def __len__(self):
        return len(self.text)

    def __getitem__(self, index):
        ctext = str(self.ctext[index])
        ctext = ' '.join(ctext.split())

        text = str(self.text[index])
        text = ' '.join(text.split())

        source = self.tokenizer.batch_encode_plus([ctext], max_length= self.source_len, pad_to_max_length=True,return_tensors='pt')
        target = self.tokenizer.batch_encode_plus([text], max_length= self.summ_len, pad_to_max_length=True,return_tensors='pt')

        # batch_encode_plus: 인코딩된 sequences 쌍의 추가 정보를 포함하여 사전의 index 반환
        """
        [tokenizer.batch_encode_plus]
        encode_plus => single,   
        batch_encode_plus => a list of pairs of sequences
        
        < example >
            input_text = ['첫번째 문장', '두번째 문장']
            
            [input]
                tokenizer.encode_plus(input_text)
            [output]
                {'input_ids': [2, 2, 1], 
                 'attention_mask': [1, 1, 1]}

            [input]
                tokenizer.batch_encode_plus(input_text)
            [output]
                {'input_ids': [[21437, 19862, 1], [17521, 19862, 1]], 
                 'attention_mask': [[1, 1, 1], [1, 1, 1]]}
           """

        # input_ids: 문장을 토크나이즈해서 인덱스값으로 변환
        # attention_mask: 패딩된 부분에 대해 학습에 영향을 받지 않기 위해 처리해주는 입력값
        source_ids = source['input_ids'].squeeze()
        source_mask = source['attention_mask'].squeeze()
        target_ids = target['input_ids'].squeeze()
        target_mask = target['attention_mask'].squeeze()
        # Tensor.squeeze() => 차원이 1인 차원 제거

        return {
            'source_ids': source_ids.to(dtype=torch.long), 
            'source_mask': source_mask.to(dtype=torch.long), 
            'target_ids': target_ids.to(dtype=torch.long),
            'target_ids_y': target_ids.to(dtype=torch.long)
            }
            # torch.long => 64비트의 부호있는 정수(64-bit integer (signed))
            

In [None]:
# model fine-tuning task를 위한 사용자 함수
def train(epoch, tokenizer, model, device, loader, optimizer):
    model.train()
    for _,data in tqdm(enumerate(loader, 0)):
        y = data['target_ids'].to(device, dtype = torch.long)
        y_ids = y[:, :-1].contiguous()
        lm_labels = y[:, 1:].clone().detach()
        lm_labels[y[:, 1:] == tokenizer.pad_token_id] = -100
        ids = data['source_ids'].to(device, dtype = torch.long)
        mask = data['source_mask'].to(device, dtype = torch.long)

        outputs = model(input_ids = ids, attention_mask = mask, decoder_input_ids=y_ids, labels=lm_labels)
        loss = outputs[0]
        
        if _%10 == 0:
            pass
            
        if _%500==0:
            print(f'Epoch: {epoch}, Loss:  {loss.item()}')
        
        optimizer.zero_grad()

        """
        gradient를 0으로 설정
        이 작업을 하지 않을 시 학습중 backward를 해줄 때 계속 반영되어 예기치않은 방향으로 학습할 수 있음
        
        """
        loss.backward()  # 역전파 단계
        optimizer.step() # update hyper-parameters to optimizer
       

In [None]:
# fine-tuning model이 test 진행을 하기 위한 사용자 함수
def test(epoch, tokenizer, model, device, loader):
    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():
        for _, data in enumerate(loader, 0):
            y = data['target_ids'].to(device, dtype = torch.long)
            ids = data['source_ids'].to(device, dtype = torch.long)
            mask = data['source_mask'].to(device, dtype = torch.long)

            # train과 다르게 generate할 수 있는 코드가 포함됨.
            generated_ids = model.generate(
                input_ids = ids,
                attention_mask = mask, 
                max_length=150, 
                num_beams=2,
                repetition_penalty=2.5, 
                length_penalty=1.0, 
                early_stopping=True
                )
            preds = [tokenizer.decode(g, skip_special_tokens=True, clean_up_tokenization_spaces=True) for g in generated_ids]
            target = [tokenizer.decode(t, skip_special_tokens=True, clean_up_tokenization_spaces=True)for t in y]
            if _%100==0:
                print(f'Completed {_}')

            predictions.extend(preds)
            actuals.extend(target)
    return predictions, actuals

hyper-parameters

In [None]:
# hyper parameter 설정
config = T5Config()
config.MAX_LEN = 1024
config.SUMMARY_LEN = 150 
config.TRAIN_BATCH_SIZE = 2    # input batch size for training (default: 64)
config.TEST_BATCH_SIZE = 2    # input batch size for testing (default: 1000)
config.TRAIN_EPOCHS = 12        # number of epochs to train (default: 10)
config.TEST_EPOCHS = 1 
config.LEARNING_RATE = 1e-4    # learning rate (default: 0.01)
config.SEED = 42               # random seed (default: 42)

In [None]:
# hyper parameter train, valid에 적용
train_params = {
        'batch_size': config.TRAIN_BATCH_SIZE,
        'shuffle': True,
        'num_workers': 0
        }

test_params = {
        'batch_size': config.TEST_BATCH_SIZE,
        'shuffle': False,
        'num_workers': 0
        }

optimizer = torch.optim.Adam(params =  model.parameters(), lr=config.LEARNING_RATE)

In [None]:
# train, test dataset 설정
import pandas as pd
train_dataset = pd.read_csv('/content/drive/MyDrive/3차 프로젝트/dataset/train.csv')[['document','label']]
test_dataset = pd.read_csv('/content/drive/MyDrive/3차 프로젝트/dataset/test.csv')[['document','label']]

train

In [None]:
# fine-tuning 진행
# 모델이 학습하기 위한 columns name 변경 (ctext: 원문, text: 요약문)
train_dataset.columns = ['ctext','text']

# t5는 prefix 모델로써, 진행할 task를 입력할 시 해당 task 진행
# summarize 진행 할 예정으로 summarize 추가
train_dataset.ctext = 'summarize: ' + train_dataset.ctext

# 위에서 정의한 model에 맞는 dataset 형식 변경하는 함수 적용
training_set = CustomDataset(train_dataset, tokenizer, config.MAX_LEN, config.SUMMARY_LEN)

# DataLoader : 일일히 모든 데이터를 나눠서 forward, backward 작업을 해야하는데 그 과정을 대신 해주는 과정
# 학습 시 minibatch, epoch마다 데이터를 다시 섞어 과적합을 막음.
training_loader = DataLoader(training_set, **train_params) # **var -> {key:value} 형식으로 반환

for epoch in range(config.TRAIN_EPOCHS):
    train(epoch, tokenizer, model, device, training_loader, optimizer)

test

In [None]:
test_dataset.columns = ['ctext','text']
test_dataset.ctext = 'summarize: ' + test_dataset.ctext

test_set = CustomDataset(test_dataset, tokenizer, config.MAX_LEN, config.SUMMARY_LEN)

test_loader = DataLoader(test_set, **test_params)

for epoch in range(config.TEST_EPOCHS):
    predictions, actuals = test(epoch, tokenizer, model, device, test_loader)
    final_df = pd.DataFrame({'Generated Text':predictions,'Actual Text':actuals})

final_df

In [None]:
# model, tokenizer 저장
tokenizer.save_pretrained('./fine-tuned/')
model.save_pretrained('./fine-tuned/')