In [13]:
# 필요한 라이브러리 설치
!pip install transformers[torch] accelerate -q
!pip install scikit-learn
!pip install pandas
!pip install numpy

import pandas as pd
import numpy as np
import re
import glob
import os
import json
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from transformers import AutoTokenizer
import torch
from torch.utils.data import Dataset

# --- 1. 데이터 로딩 및 전처리 (이전 코드와 동일) ---
data_folder_path = 'C:/Users/admin/Downloads/119.국가기록물_대상_초거대AI_학습을_위한_말뭉치_데이터/3.개방데이터/1.데이터/Training'
all_processed_data = []

def clean_text(text):
    text = str(text)
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r' +', ' ', text)
    return text.strip()

json_file_list = glob.glob(os.path.join(data_folder_path, '**', '*.json'), recursive=True)
for file_path in json_file_list:
    try:
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
            if 'data' in data and isinstance(data['data'], list):
                for item in data['data']:
                    text = None
                    is_spam = 0
                    if 'instruct_text' in item and 'labels' in item:
                        for label in item['labels']:
                            if 'level1_type' in label and label['level1_type'] == 7:
                                text = item['instruct_text']
                                is_spam = 1
                                if text:
                                    all_processed_data.append({'text': text, 'is_spam': is_spam})
                                break
                    if not is_spam and 'labels' in item and 'instructs' in item['labels'][0]:
                        for instruction in item['labels'][0]['instructs']:
                            if 'meta' in instruction:
                                for meta_item in instruction['meta']:
                                    if meta_item.get('category') == 'question':
                                        text = instruction.get('text')
                                        if text:
                                            all_processed_data.append({'text': text, 'is_spam': 0})
                                        break
                                if text:
                                    break
            elif 'Data' in data and isinstance(data['Data'], list):
                for item in data['Data']:
                    if 'corpus' in item:
                        text = item['corpus']
                        if text:
                            all_processed_data.append({'text': text, 'is_spam': 0})
    except Exception as e:
        continue

txt_file_list = glob.glob(os.path.join(data_folder_path, '**', '*.txt'), recursive=True)
for file_path in txt_file_list:
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            text_content = f.read()
            if text_content:
                all_processed_data.append({'text': text_content, 'is_spam': 0})
    except Exception as e:
        continue

combined_df = pd.DataFrame(all_processed_data)
combined_df['cleaned_text'] = combined_df['text'].apply(clean_text)
combined_df.drop_duplicates(subset=['cleaned_text'], inplace=True)
combined_df.reset_index(drop=True, inplace=True)

# --- 2. 데이터 분할 및 클래스 가중치 계산 ---
X = combined_df['cleaned_text']
y = combined_df['is_spam']

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float)

print("[데이터 분할 및 클래스 가중치]")
print(f"총 데이터 개수: {len(X)}")
print(f"훈련 데이터 개수: {len(X_train)}")
print(f"검증 데이터 개수: {len(X_val)}")
print(f"테스트 데이터 개수: {len(X_test)}")
print(f"훈련 데이터 라벨 분포: 정상({(y_train == 0).sum()}), 스팸({(y_train == 1).sum()})")
print(f"계산된 클래스 가중치 (Tensor): {class_weights_tensor}")

# --- 3. BERT 토크나이저 및 데이터셋 준비 ---
model_name = "beomi/kcbert-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)

class SpamDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts.tolist()
        self.labels = labels.tolist()
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]
        
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_token_type_ids=False,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

train_dataset = SpamDataset(X_train, y_train, tokenizer)
val_dataset = SpamDataset(X_val, y_val, tokenizer)
test_dataset = SpamDataset(X_test, y_test, tokenizer)

print("\nBERT 데이터셋 준비 완료!")

[데이터 분할 및 클래스 가중치]
총 데이터 개수: 40669
훈련 데이터 개수: 32535
검증 데이터 개수: 4067
테스트 데이터 개수: 4067
훈련 데이터 라벨 분포: 정상(31306), 스팸(1229)
계산된 클래스 가중치 (Tensor): tensor([ 0.5196, 13.2364])


tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config.json:   0%|          | 0.00/619 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]


BERT 데이터셋 준비 완료!


In [21]:
import torch
import torch.nn as nn
from transformers import AutoModelForSequenceClassification, Trainer, TrainingArguments
import numpy as np
import joblib
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from torch.utils.data import DataLoader

# 1단계에서 정의된 변수들을 사용합니다.
# (train_dataset, val_dataset, test_dataset, class_weights_tensor)

# --- 1. BERT 모델 및 학습 설정 ---
model_name = "beomi/kcbert-base"
num_labels = 2

class WeightedCELoss(nn.Module):
    def __init__(self, weight):
        super().__init__()
        self.register_buffer('weight', weight)
        self.loss_fn = nn.CrossEntropyLoss(weight=self.weight)
    
    def forward(self, logits, labels):
        return self.loss_fn(logits, labels)

model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
model.classifier = nn.Linear(model.config.hidden_size, num_labels)
model.classifier.bias.data.zero_()
model.classifier.weight.data.normal_(mean=0.0, std=model.config.initializer_range)

# --- 2. TrainingArguments 정의 ---
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    weight_decay=0.01,
    learning_rate=2e-5,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    report_to="none",
)

# --- 3. Trainer 클래스 정의 및 학습 (수정된 부분) ---
class CustomTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs): # **kwargs 추가
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        loss = WeightedCELoss(class_weights_tensor.to(logits.device))(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

print("[BERT 모델 학습 시작]")
trainer.train()

# --- 4. 모델 성능 평가 (테스트 데이터) ---
test_loader = DataLoader(test_dataset, batch_size=16)
predictions = []
true_labels = []

model.eval()
for batch in test_loader:
    with torch.no_grad():
        inputs = {k: v.to(trainer.args.device) for k, v in batch.items()}
        labels = inputs.pop('labels')
        outputs = model(**inputs)
        
        logits = outputs.logits
        probs = torch.softmax(logits, dim=1)[:, 1]
        
        predictions.extend(probs.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

y_pred_proba = np.array(predictions)
y_pred = (y_pred_proba > 0.5).astype("int32")
y_true = np.array(true_labels)

# 평가지표 계산
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
roc_auc = roc_auc_score(y_true, y_pred_proba)

# 혼동 행렬 계산
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
fpr = fp / (fp + tn)
fnr = fn / (fn + tp)

print("\n" + "="*50)
print("              [최종 BERT 모델 성능 평가]")
print("="*50)
print(f"Accuracy (정확도): {accuracy:.4f}")
print(f"Precision (정밀도): {precision:.4f}")
print(f"Recall (재현율): {recall:.4f}")
print(f"F1-Score: {f1:.4f}")
print(f"ROC-AUC: {roc_auc:.4f}")
print("-" * 50)
print(f"정상 메일 오탐률: {fpr:.4f} ({fpr*100:.2f}%)")
print(f"스팸 미탐률: {fnr:.4f} ({fnr*100:.2f}%)")
print("-" * 50)
print(f"**목표 기준**")
print(f" - F1: >= 0.92")
print(f" - ROC-AUC: >= 0.97")
print(f" - 정상 메일 오탐률: <= 2.5%")
print(f" - 스팸 미탐률: <= 5%")
print("="*50)

# 모델과 토크나이저 저장
model.save_pretrained('./final_bert_model')
tokenizer.save_pretrained('./final_bert_model')
print("\n최종 BERT 모델과 토크나이저를 'final_bert_model' 폴더에 저장했습니다.")

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at beomi/kcbert-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


[BERT 모델 학습 시작]


Epoch,Training Loss,Validation Loss
1,0.0711,0.026637
2,0.0421,0.017515
3,0.0164,0.025456





              [최종 BERT 모델 성능 평가]
Accuracy (정확도): 0.9943
Precision (정밀도): 0.9012
Recall (재현율): 0.9542
F1-Score: 0.9270
ROC-AUC: 0.9993
--------------------------------------------------
정상 메일 오탐률: 0.0041 (0.41%)
스팸 미탐률: 0.0458 (4.58%)
--------------------------------------------------
**목표 기준**
 - F1: >= 0.92
 - ROC-AUC: >= 0.97
 - 정상 메일 오탐률: <= 2.5%
 - 스팸 미탐률: <= 5%

최종 BERT 모델과 토크나이저를 'final_bert_model' 폴더에 저장했습니다.


In [22]:
import joblib
import re
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# 저장된 모델과 토크나이저 불러오기
try:
    tokenizer = AutoTokenizer.from_pretrained('./final_bert_model')
    model = AutoModelForSequenceClassification.from_pretrained('./final_bert_model')
    print("모델과 토크나이저를 성공적으로 불러왔습니다.")
except Exception as e:
    print("저장된 모델 파일이 없거나 오류가 발생했습니다. 학습 코드를 먼저 실행하여 모델을 저장해주세요.")
    exit()

# 모델을 평가 모드로 설정
model.eval()

# 텍스트 전처리 함수
def clean_text(text):
    text = str(text)
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r' +', ' ', text)
    return text.strip()

# 새로운 문장 예측 함수
def predict_spam(new_text):
    cleaned_text = clean_text(new_text)
    
    # 텍스트를 토크나이저로 변환
    encoding = tokenizer.encode_plus(
        cleaned_text,
        add_special_tokens=True,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    )
    
    # 모델 예측
    with torch.no_grad():
        outputs = model(**encoding)
        logits = outputs.logits
        probabilities = torch.softmax(logits, dim=1)
        
    prediction = torch.argmax(probabilities, dim=1).item()
    
    if prediction == 1:
        result = "스팸(Spam)"
    else:
        result = "정상(Normal)"

    print(f"\n입력 문장: '{new_text}'")
    print(f"분류 결과: {result} (정상: {probabilities[0, 0].item():.4f}, 스팸: {probabilities[0, 1].item():.4f})")

# 다양한 스팸 및 정상 문장 테스트
print("\n[새로운 문장 테스트 결과]\n")
predict_spam("안녕하세요. 내일 2시에 커피 한 잔 하실래요?")
predict_spam("광고: 지금 바로 접속하시면 엄청난 혜택이 쏟아집니다. 무료로 확인해보세요!")
predict_spam("일단 고객 정보가 별로 없어서 프로모션 광고 메일을 보낼 수가 없는데 우리 회사의 주 고객층 사람들 메일 주소를 알아낼 방법이 있나요?")
predict_spam("재택 알바! 하루 30분 투자로 월 200만원 보장! 지금 신청하세요!")
predict_spam("긴급 안내: 고객님께서는 300만원 당첨되셨습니다. 확인을 위해 링크를 클릭하세요.")
predict_spam("안녕하세요. 최근 문의하신 건에 대한 답변입니다. 자세한 내용은 첨부파일 확인 부탁드립니다.")

모델과 토크나이저를 성공적으로 불러왔습니다.

[새로운 문장 테스트 결과]


입력 문장: '안녕하세요. 내일 2시에 커피 한 잔 하실래요?'
분류 결과: 정상(Normal) (정상: 1.0000, 스팸: 0.0000)

입력 문장: '광고: 지금 바로 접속하시면 엄청난 혜택이 쏟아집니다. 무료로 확인해보세요!'
분류 결과: 스팸(Spam) (정상: 0.0047, 스팸: 0.9953)

입력 문장: '일단 고객 정보가 별로 없어서 프로모션 광고 메일을 보낼 수가 없는데 우리 회사의 주 고객층 사람들 메일 주소를 알아낼 방법이 있나요?'
분류 결과: 정상(Normal) (정상: 0.9971, 스팸: 0.0029)

입력 문장: '재택 알바! 하루 30분 투자로 월 200만원 보장! 지금 신청하세요!'
분류 결과: 정상(Normal) (정상: 1.0000, 스팸: 0.0000)

입력 문장: '긴급 안내: 고객님께서는 300만원 당첨되셨습니다. 확인을 위해 링크를 클릭하세요.'
분류 결과: 정상(Normal) (정상: 1.0000, 스팸: 0.0000)

입력 문장: '안녕하세요. 최근 문의하신 건에 대한 답변입니다. 자세한 내용은 첨부파일 확인 부탁드립니다.'
분류 결과: 정상(Normal) (정상: 0.9999, 스팸: 0.0001)
