<a href="https://colab.research.google.com/github/zeroxy/colab_pub/blob/main/bert_mbti.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

https://www.kaggle.com/datasets/zeyadkhalid/mbti-personality-types-500-dataset

데이터셋부터 먼저 다운로드하여 준비해 둡시다.

In [None]:
!pip install fastparquet transformers[torch]

In [4]:
import pandas as pd
from pprint import pprint
from sklearn.model_selection import train_test_split

def get_count_by_type(data, character = 'I'):
    """Count the rows whose 'type' value contains the given letter.

    Args:
        data: DataFrame with a string column named 'type'.
        character: Substring to look for (default 'I'). It is matched
            literally, not as a regular expression.

    Returns:
        Number of rows whose 'type' contains `character`.
    """
    # regex=False keeps characters such as '.' from acting as wildcards;
    # na=False makes missing values count as "no match" instead of producing
    # a mask with NaN entries that cannot be used for boolean indexing.
    mask = data['type'].str.contains(character, regex=False, na=False)
    return data.loc[mask, 'type'].count()

# Destination files for the train/test splits written at the end of this cell.
train_data_path = 'drive/MyDrive/Colab Notebooks/mbti_data/MBTI_500_train.parquet'
test_data_path = 'drive/MyDrive/Colab Notebooks/mbti_data/MBTI_500_test.parquet'


# Alternative source when only the CSV export is available:
# df = pd.read_csv('sample_data/MBTI 500.csv')
df = pd.read_parquet('/content/drive/MyDrive/Colab Notebooks/mbti_data/MBTI 500.parquet', engine='fastparquet')

# Show which columns exist and how many values each holds.
# DataFrame.info() prints its report itself and returns None, so it is called
# directly; wrapping it in pprint() only adds a stray "None" line.
df.info()
print("===="*8)

# Summary statistics (count, number of unique values, most frequent value, ...).
pprint( df.describe() )
print("===="*8)

# Number of posts per MBTI type, most frequent first.
pprint( df.groupby("type").count().sort_values('posts', ascending=False).T )
print("===="*8)

# Row counts for each side of the four MBTI letter pairs.
for a, b in [('I','E'), ('N','S'), ('T','F'), ('P','J')]:
    print(f'{a} : {get_count_by_type(df, a)}, {b} : {get_count_by_type(df, b)}, ')

# Map each MBTI type to an integer code; with sort=True the codes follow the
# alphabetical order of the type names. factorize() already returns exactly
# (codes, uniques), so no slicing is needed.
df['type_cd'], origin_labels = pd.factorize(df['type'], sort=True)
id_to_label = list(origin_labels)
for i , x in enumerate(id_to_label):
    print(f'{i:2} : {x}')
print("===="*8)

# Split into train/test at a 92:8 ratio.
# random_state makes the split reproducible across runs, and stratify keeps
# the class proportions the same in both parts so that rare types are not
# lost from the small test split by chance.
train, test = train_test_split(
    df,
    test_size = 0.08,
    random_state = 42,
    stratify = df['type_cd'],
)

# Store the splits as parquet (compresses better than CSV) for later use.
train.to_parquet(train_data_path, engine='fastparquet', compression='gzip')
test.to_parquet(test_data_path, engine='fastparquet', compression='gzip')
print("===="*8)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 106067 entries, 0 to 106066
Data columns (total 2 columns):
 #   Column  Non-Null Count   Dtype 
---  ------  --------------   ----- 
 0   posts   106067 non-null  object
 1   type    106067 non-null  object
dtypes: object(2)
memory usage: 1.6+ MB
None
                                                    posts    type
count                                              106067  106067
unique                                             106067      16
top     know intj tool use interaction people excuse a...    INTP
freq                                                    1   24961
type    INTP   INTJ   INFJ   INFP   ENTP  ENFP  ISTP  ENTJ  ESTP  ENFJ  ISTJ  \
posts  24961  22427  14963  12134  11725  6167  3424  2955  1986  1534  1243   

type   ISFP  ISFJ  ESTJ  ESFP  ESFJ  
posts   875   650   482   360   181  
I : 80677, E : 25390, 
N : 96866, S : 9201, 
T : 69203, F : 36864, 
P : 61632, J : 44435, 
 0 : ENFJ
 1 : ENFP
 2 : ENTJ
 3 : ENTP

In [None]:
#############################################
import pandas as pd
import numpy as np

from sklearn.metrics import top_k_accuracy_score
import torch
from torch.utils.data import Dataset, DataLoader

from transformers import BertForSequenceClassification, BertTokenizer
from transformers import TrainingArguments, Trainer

# Parquet files holding the train / test splits.
train_data_path = 'drive/MyDrive/Colab Notebooks/mbti_data/MBTI_500_train.parquet'
test_data_path = 'drive/MyDrive/Colab Notebooks/mbti_data/MBTI_500_test.parquet'

# Class index -> MBTI type name, in alphabetical order; the list position is
# used as the class id when printing a prediction.
id_to_label = [
    'ENFJ', 'ENFP', 'ENTJ', 'ENTP',
    'ESFJ', 'ESFP', 'ESTJ', 'ESTP',
    'INFJ', 'INFP', 'INTJ', 'INTP',
    'ISFJ', 'ISFP', 'ISTJ', 'ISTP',
]

def compute_metrics(pred):
    """Compute top-1 accuracy for an evaluation pass.

    Args:
        pred: Object exposing `label_ids` (true class ids, one per sample) and
            `predictions` (per-class scores, one row per sample). If
            `predictions` is a tuple, its first element is taken as the scores.

    Returns:
        dict with the single key 'accuracy'; more metrics can be added to it.
    """
    labels = np.asarray(pred.label_ids)
    scores = pred.predictions
    if isinstance(scores, tuple):
        scores = scores[0]
    scores = np.asarray(scores)
    # Compare the highest-scoring class with the true label directly. This
    # does not depend on every class being present among the evaluated labels,
    # which top_k_accuracy_score requires unless `labels=` is supplied.
    acc = float((scores.argmax(axis=-1) == labels).mean())
    return {'accuracy': acc}

def get_test_func(model, tokenizer):
    """Build a helper that prints the model's MBTI prediction for one text.

    Args:
        model: Classification model whose output exposes 'logits'.
        tokenizer: Tokenizer used to split and encode the text.

    Returns:
        A function `test_mbti(target_text)` that prints the text, its tokens,
        the raw logits with the predicted class index, and the matching label
        from `id_to_label`.
    """
    def test_mbti(target_text, model=model, tokenizer=tokenizer):
        print(target_text)
        print(tokenizer.tokenize(target_text))
        sample_test_input = get_token_by_tokenizer(tokenizer, target_text)
        # The model may have been moved to another device (e.g. by a training
        # run), so the inputs are sent to wherever its parameters live.
        device = next(model.parameters()).device
        sample_test_input = {k: v.to(device) for k, v in sample_test_input.items()}
        was_training = model.training
        model.eval()
        try:
            # Inference only: no autograd graph is needed.
            with torch.no_grad():
                result = model( **sample_test_input)['logits'].detach().to('cpu')
        finally:
            # Put the model back in the mode it was in before the call.
            model.train(was_training)
        result_idx = result.argmax(-1).item()
        print(result, result_idx)
        print(id_to_label[result_idx])
    return test_mbti

def get_token_by_tokenizer(tokenizer, target_text):
    """Encode one text with `tokenizer.encode_plus` using fixed settings.

    Args:
        tokenizer: Object providing an `encode_plus` method.
        target_text: Text to encode.

    Returns:
        Whatever `tokenizer.encode_plus` returns for the options below.
    """
    encode_options = {
        'add_special_tokens': True,     # add tokens such as '[CLS]' and '[SEP]'
        'max_length': 512,              # maximum token length
        'padding': 'max_length',        # pad every sample up to max_length
        'truncation': True,             # cut inputs longer than max_length
        'return_attention_mask': True,
        'return_tensors': 'pt',
    }
    return tokenizer.encode_plus(target_text, **encode_options)

class MBTIDataset(Dataset):
    """Dataset backed by a parquet file with 'posts' and 'type_cd' columns.

    Each item is tokenized on access with `get_token_by_tokenizer`.
    """

    def __init__(self, parquet_path, tokenizer):
        """Load the parquet file at `parquet_path` and keep the tokenizer."""
        self.df = pd.read_parquet(parquet_path, engine='fastparquet')
        self.tokenizer = tokenizer

    def __len__(self):
        """Return the number of rows in the loaded frame."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return input ids, attention mask and label for the row at `idx`."""
        row = self.df.iloc[idx]
        encoded = get_token_by_tokenizer(self.tokenizer, row['posts'])
        # Only one text is encoded per call, so take the first entry of each
        # returned tensor.
        input_ids = encoded['input_ids'][0]
        attention_mask = encoded['attention_mask'][0]
        return dict(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=row['type_cd'],
        )

# Base checkpoint to start from, and the number of output classes
# (there are 16 MBTI types).
bert_name = 'bert-base-uncased'
class_nums = 16

tokenizer = BertTokenizer.from_pretrained(bert_name)
model = BertForSequenceClassification.from_pretrained(bert_name, num_labels=class_nums)

# Keep a helper around for printing a prediction on a single sentence.
test_function = get_test_func(model, tokenizer)

# Build both datasets from their parquet files with the same tokenizer.
train_dataset, test_dataset = (
    MBTIDataset(path, tokenizer) for path in (train_data_path, test_data_path)
)
print(f'train data rows : {len(train_dataset):8}')
print(f'test data rows  : {len(test_dataset):8}')

# Settings passed to TrainingArguments, collected in one mapping.
_training_config = {
    'output_dir': 'mbti_test',
    'num_train_epochs': 8.0,             # train for 8 epochs
    'evaluation_strategy': 'epoch',      # measure accuracy after every epoch
    'per_device_train_batch_size': 32,   # 32 samples per batch (training)
    'per_device_eval_batch_size': 32,    # 32 samples per batch (evaluation)
    'gradient_accumulation_steps': 8,    # group 8 batches per update
    'gradient_checkpointing': True,      # a bit more compute, less GPU memory
    'fp16': True,
    'save_strategy': 'epoch',
    'save_total_limit': 3,
    'dataloader_num_workers': 4,         # load data with 4 workers
}
training_args = TrainingArguments(**_training_config)

########################################

# Sample sentence reused for the before/after prediction checks.
target_text = "I'm ironman"

# Baseline prediction before any fine-tuning has happened.
print('학습 전')
test_function(target_text)

########################################
# Freeze every parameter whose name contains 'bert' so that only the remaining
# parameters (the classification head) are updated during training.
# NOTE: the attribute must be spelled `requires_grad`; assigning to a
# misspelled name is silently accepted and freezes nothing.
for n, p in model.named_parameters():
    if 'bert' in n:
        p.requires_grad = False

# Train with the arguments above, evaluating on the held-out split.
trainer = Trainer(
    model = model,
    args = training_args,
    train_dataset = train_dataset,
    eval_dataset = test_dataset,
    compute_metrics = compute_metrics
)
trainer.train()
trainer.save_model('mbti_ckpt_train_last_layer')

########################################

# Repeat the sample prediction after the training run; the printed banner
# reads "after training only the last layer".
print('마지막 레이어만 학습 후')
test_function(target_text)

########################################