# 텍스트 감성분석
-  Kcbert 기반 전이학습
- 긍정(1) 부정(0) 분류기 개발과 추론
- 네이버 영화 리뷰 데이터(nsmc): https://huggingface.co/datasets/nsmc

### 페키지 설치와 불러오기

In [None]:
!pip install accelerate -U

In [None]:
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset, random_split
from transformers import BertTokenizer, BertConfig, BertForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import numpy as np

## 학습 데이터셋 구조 설정

In [None]:
# 데이터셋 클래스 정의
class NSMCDataset(Dataset):
    def __init__(self, file_path, tokenizer, max_length):
        self.data = pd.read_excel(file_path)[['id','text','label']]  # 파일 불러오기
        self.data.dropna(inplace=True)
        self.data = self.data[self.data['text'].apply(lambda x: isinstance(x, str) and x.strip() != '')]
        self.tokenizer = tokenizer  # 토크나이저 초기화
        self.max_length = max_length  # 입력 시퀀스의 최대 길이 설정

    def __len__(self):
        return len(self.data)  # 데이터셋의 길이 반환

    def __getitem__(self, idx):
        text = self.data.iloc[idx, 1]  # 데이터프레임에서 텍스트 추출
        label = self.data.iloc[idx, 2]  # 데이터프레임에서 라벨 추출
        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            return_token_type_ids=True,
            truncation=True
        )
        input_ids = torch.tensor(inputs['input_ids'], dtype=torch.long)  # 토큰 ID 텐서로 변환
        attention_mask = torch.tensor(inputs['attention_mask'], dtype=torch.long)  # 어텐션 마스크 텐서로 변환
        token_type_ids = torch.tensor(inputs['token_type_ids'], dtype=torch.long)  # 토큰 타입 ID 텐서로 변환
        label = torch.tensor(label, dtype=torch.long)  # 라벨 텐서로 변환
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'token_type_ids': token_type_ids,
            'labels': label
        }  # 딕셔너리 형태로 반환

In [None]:
# 실습데이터: 데이터의 일부만 가지고 연습. 실전에서는 전체 데이터 사용
data=pd.read_excel("/content/drive/MyDrive/2024_1_class/nsmc_ratings.xlsx")
label_0 = data[data['label'] == 0].sample(n=5000, random_state=42)
label_1 = data[data['label'] == 1].sample(n=5000, random_state=42)
small_data = pd.concat([label_0, label_1]).reset_index(drop=True)
small_data.to_excel("/content/drive/MyDrive/2024_1_class/nsmc_ratings_small.xlsx")

In [None]:
# huggingface에 데이터셋이 있을 경우: https://huggingface.co/datasets/nsmc
! pip install datasets

from datasets import load_dataset
dataset_hf = load_dataset('nsmc')  # 로드
dataset_hf=pd.concat([pd.DataFrame(dataset_hf['train']), pd.DataFrame(dataset_hf['train'])])
dataset_hf=dataset_hf.drop_duplicates()

## 데이터셋 불러오기와 분리: 훈련(train)-검증(validation)-평가(test)

In [None]:
# 데이터셋 설정 및 분리
tokenizer = BertTokenizer.from_pretrained('beomi/kcbert-base')  # 토크나이저 로드
dataset = NSMCDataset("/content/drive/MyDrive/2024_1_class/nsmc_ratings_small.xlsx", tokenizer, max_length=64)  # 데이터셋 불러오기
train_size = int(0.8 * len(dataset))  # 훈련 데이터 크기
val_size = int(0.1 * len(dataset))  # 검증 데이터 크기
test_size = len(dataset) - train_size - val_size  # 테스트 데이터 크기
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])  # 데이터셋 분리

# 데이터 로더 설정
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)  # 훈련 데이터 로더
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)  # 검증 데이터 로더
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)  # 테스트 데이터 로더

## 베이스 모델 설정

In [None]:
# BERT 모델 로드 및 드롭아웃 설정
config = BertConfig.from_pretrained("beomi/kcbert-base", num_labels=2)
config.hidden_dropout_prob = 0.1  # 드롭아웃 확률 설정
config.attention_probs_dropout_prob = 0.1  # 어텐션 드롭아웃 확률 설정

model = BertForSequenceClassification.from_pretrained("beomi/kcbert-base", config=config)

# 모델을 GPU로 이동
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

## 학습: 훈련(train)-검증(validation)*이탤릭체 텍스트* 데이터 활용

In [None]:
# 훈련 인자 설정
training_args = TrainingArguments(
    output_dir='./results/',  # 결과 저장 폴더
    num_train_epochs=1,  # 에포크 수
    per_device_train_batch_size=32,  # 훈련 배치 크기
    per_device_eval_batch_size=32,  # 평가 배치 크기
    warmup_steps=100,  # 학습 초기 100단계 동안 학습률을 선형적으로 증가시키고, 이후에는 설정된 학습률 유지
    weight_decay=0.01,  # 모델 가중치를 0.01의 비율로 감소시켜 모델이 과적합되지 않도록 도와줌
    logging_dir='./logs/',  # 로그 저장 폴더
    logging_steps=10,  # 로그 기록 단계
    evaluation_strategy="epoch",  # 에포크 단위로 평가
    save_strategy="epoch",  # 에포크 단위로 모델 저장
    #evaluation_strategy="steps",  # 스텝 단위로 평가
    #eval_steps=100,  # 100 스텝마다 평가
    #save_strategy="steps",  # 스텝 단위로 모델 저장
    #save_steps=100,  # 100 스텝마다 저장
    load_best_model_at_end=True,  # 최적 모델 로드
    metric_for_best_model="accuracy",  # 최적 모델 기준
    greater_is_better=True,  # 높은 값이 더 나은지 여부
    save_total_limit=3,  # 최대 체크포인트 저장 수
    learning_rate=5e-5,  # 학습률
    optim="adamw_torch"  # 옵티마이저
)

# 평가 메트릭 정의
def compute_metrics(pred):
    labels = pred.label_ids  # 실제 라벨
    preds = pred.predictions.argmax(-1)  # 예측값
    acc = accuracy_score(labels, preds)  # 정확도 계산
    f1 = f1_score(labels, preds, average='weighted')  # F1 스코어 계산
    return {
        'accuracy': acc,
        'f1': f1,
    }

In [None]:
### 모델 훈련

# 트레이너 초기화
trainer = Trainer(
    model=model,  # 모델 설정
    args=training_args,  # 훈련 인자 설정
    train_dataset=train_dataset,  # 훈련 데이터셋
    eval_dataset=val_dataset,  # 검증 데이터셋
    compute_metrics=compute_metrics,  # 평가 메트릭 설정
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],  # 조기 종료 콜백 추가
)

# 모델 훈련
trainer.train()

## 평가(test)

In [None]:
### 모델 평가
predictions, labels, _ = trainer.predict(test_dataset)  # 예측 수행
preds = np.argmax(predictions, axis=1)  # 예측값 추출
acc = accuracy_score(labels, preds)  # 정확도 계산
f1 = f1_score(labels, preds, average='weighted')  # F1 스코어 계산
cm = confusion_matrix(labels, preds)  # 혼동 행렬 계산
print(f"Accuracy: {acc}")
print(f"F1 Score: {f1}")
print("Confusion Matrix:")
print(cm)

# 모델과 토크나이저를 저장할 디렉토리
save_directory = '/content/drive/MyDrive/2024_1_class/nsmc_best_model/'

# 모델과 토크나이저 저장
model.save_pretrained(save_directory)
tokenizer.save_pretrained(save_directory)

## 추론(inference): 모델 로드와 분석 데이터 적용


In [None]:
# 추론 설정
import torch
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import DataLoader, Dataset
from safetensors.torch import save_file, load_file

# 데이터셋 정의
class TextDataset(Dataset):
    def __init__(self, texts, tokenizer, max_length):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        inputs = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors="pt"
        )
        return inputs['input_ids'].squeeze(), inputs['attention_mask'].squeeze()

# 미니배치 추론 함수
def batch_inference(model, tokenizer, texts, batch_size=32, max_length=64):
    dataset = TextDataset(texts, tokenizer, max_length)
    dataloader = DataLoader(dataset, batch_size=batch_size)

    model.eval()
    predictions = []

    with torch.no_grad():
        for input_ids, attention_mask in dataloader:
            input_ids = input_ids.to(model.device)
            attention_mask = attention_mask.to(model.device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            batch_predictions = torch.argmax(logits, dim=-1)
            predictions.extend(batch_predictions.cpu().numpy())

    return predictions

# KC BERT 모델과 토크나이저 로드
model_name = "beomi/kcbert-base"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name)

# safetensors 형식으로 저장된 모델 가중치 로드
state_dict = load_file('/content/drive/MyDrive/2024_1_class/nsmc_best_model/model.safetensors')

# KC BERT 모델 구조 초기화 및 가중치 로드
loaded_model = BertForSequenceClassification.from_pretrained(model_name)
loaded_model.load_state_dict(state_dict)

In [None]:
# 추론 실습

texts = ["이 영화 정말 재미있어요!", "이 영화는 실망 그 자체입니다.", "훌륭한 인물이 되어 사회에 기여할 겁니다.", "잘못한 일에 대해 반성하고 사과하세요."]

# 모델을 GPU로 이동 (가능한 경우)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loaded_model.to(device)

# 미니배치 추론 수행
predicted_labels = batch_inference(loaded_model, tokenizer, texts, batch_size=2, max_length=128)

for text, label in zip(texts, predicted_labels):
    print(f"Text: {text} => Predicted label: {label}")