## 1. YOLO를 이용한 사람탐지 전이학습

In [None]:
## YOLO11s
from ultralytics import YOLO

# 모델 로드
model = YOLO('yolo11s.pt')

# 학습 실행 - 3000장에 최적화된 간단한 설정
results = model.train(
    data='helmet_yolo_dataset/data.yaml',
    epochs=150,          # 3000장이니까 조금 더 길게
    imgsz=640,
    batch=8,            # GPU 메모리에 따라 8~16 조정
    patience=30,         # 30 에포크 개선 없으면 조기 종료
    save_period=10,      # 10 에포크마다 저장
    name='helmet_detection'  # 결과 폴더 이름
)

In [None]:
## YOLO12s
from ultralytics import YOLO

# 모델 로드
model = YOLO('yolo11s.pt')

# 학습 실행 - 3000장에 최적화된 간단한 설정
results = model.train(
    data='helmet_yolo_dataset/data.yaml',
    epochs=150,          # 3000장이니까 조금 더 길게
    imgsz=640,
    batch=8,            # GPU 메모리에 따라 8~16 조정
    patience=30,         # 30 에포크 개선 없으면 조기 종료
    save_period=10,      # 10 에포크마다 저장
    name='helmet_detection'  # 결과 폴더 이름
)

## 2. 객체 이미지 전처리 방식 개선

In [None]:
## 상반신crop + 컬러
from ultralytics import YOLO
import torch

DATA_DIR = r"C:\Users\main\Documents\project_helmet\DMC_Backup_cctv\crop_데이터\dataset_cls"

model = YOLO("yolo11s-cls.pt")  # (인식 안 되면 'yolov8s-cls.pt'로 대체)

results = model.train(
    data=DATA_DIR,      # train/, val/ 포함된 루트
    epochs=100,
    imgsz=224,          # 분류는 224/256 권장
    batch=16,           # VRAM 보고 조절
    device=0 if torch.cuda.is_available() else "cpu",
    workers=0,          # Windows 권장
    name="helmet_cls_yolo11s",
    exist_ok=True,
    patience=20,
)

In [None]:
## 상반신crop + Grayscale

## 3. Vision Transfomer(ViT) 전이 학습

In [None]:
import os
import cv2
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import (
    ViTForImageClassification,
    ViTImageProcessor,
    TrainingArguments,
    Trainer,
    TrainerCallback,
    get_cosine_schedule_with_warmup
)
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import torch.nn.functional as F
from tqdm import tqdm
import json
import random

class HelmetDataset(Dataset):
    """헬멧 데이터셋 클래스 (데이터 증강 포함)"""

    def __init__(self, images_dir, labels_dir, processor, augment=False):
        self.images_dir = images_dir
        self.labels_dir = labels_dir
        self.processor = processor
        self.augment = augment  # 데이터 증강 여부

        # 이미지 파일 목록 가져오기
        self.image_files = [f for f in os.listdir(images_dir) if f.endswith('.jpg')]

        # 라벨 파일 존재 확인
        valid_files = []
        for img_file in self.image_files:
            label_file = img_file.replace('.jpg', '.txt')
            label_path = os.path.join(labels_dir, label_file)
            if os.path.exists(label_path):
                valid_files.append(img_file)

        self.image_files = valid_files
        print(f"유효한 데이터: {len(self.image_files)}개")
        if self.augment:
            print("데이터 증강 활성화: 좌우 반전 (50% 확률)")

    def __len__(self):
        return len(self.image_files)

    def apply_horizontal_flip(self, image):
        """좌우 반전 적용 (50% 확률)"""
        if random.random() < 0.5:
            return cv2.flip(image, 1)  # 1은 좌우 반전
        return image

    def __getitem__(self, idx):
        # 이미지 로드
        img_file = self.image_files[idx]
        img_path = os.path.join(self.images_dir, img_file)

        # OpenCV로 이미지 읽기
        image = cv2.imread(img_path)
        if image is None:
            # 한글 경로 대응
            with open(img_path, 'rb') as f:
                img_array = np.frombuffer(f.read(), np.uint8)
            image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)

        # BGR -> RGB 변환
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # 데이터 증강 적용 (학습시만)
        if self.augment:
            image = self.apply_horizontal_flip(image)

        # 라벨 로드
        label_file = img_file.replace('.jpg', '.txt')
        label_path = os.path.join(self.labels_dir, label_file)

        with open(label_path, 'r') as f:
            label = int(f.read().strip())

        # ViTImageProcessor 적용
        processed = self.processor(images=image, return_tensors="pt")
        pixel_values = processed['pixel_values'].squeeze(0)

        return {
            'pixel_values': pixel_values,
            'labels': torch.tensor(label, dtype=torch.long)
        }

def compute_metrics(eval_pred):
    """평가 메트릭 계산"""
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)

    accuracy = accuracy_score(labels, predictions)
    return {'accuracy': accuracy}

class LoggingCallback(TrainerCallback):
    """학습 로그를 JSON 파일로 저장하는 콜백"""

    def __init__(self, log_file="training_log.json"):
        self.log_file = log_file
        self.logs = []

    def on_log(self, args, state, control, model=None, logs=None, **kwargs):
        """로그 이벤트마다 호출"""
        if logs:
            log_entry = {
                'step': state.global_step,
                'epoch': state.epoch,
                **logs
            }
            self.logs.append(log_entry)

            # 로그를 파일에 저장
            with open(self.log_file, 'w') as f:
                json.dump(self.logs, f, indent=2)

def train_helmet_classifier():
    """개선된 헬멧 분류기 학습"""

    # GPU 설정
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"사용 디바이스: {device}")

    # 데이터 경로 설정
    base_dir = "C:/helmet_data"  # 경로 수정 필요시 여기서 변경
    train_images_dir = os.path.join(base_dir, "bbox_train_dataset", "images")
    train_labels_dir = os.path.join(base_dir, "bbox_train_dataset", "labels")
    test_images_dir = os.path.join(base_dir, "bbox_test_dataset", "images")
    test_labels_dir = os.path.join(base_dir, "bbox_test_dataset", "labels")

    # 경로 존재 확인
    for path in [train_images_dir, train_labels_dir, test_images_dir, test_labels_dir]:
        if not os.path.exists(path):
            print(f"경로가 존재하지 않습니다: {path}")
            return

    # ViT 프로세서 및 모델 로드
    print("ViT 모델 로딩중...")
    model_name = "google/vit-base-patch16-224-in21k"
    processor = ViTImageProcessor.from_pretrained(model_name)
    model = ViTForImageClassification.from_pretrained(
        model_name,
        num_labels=2,  # 이진분류
        id2label={0: "helmet", 1: "no_helmet"},
        label2id={"helmet": 0, "no_helmet": 1}
    )

    # 모델을 GPU로 이동
    model.to(device)

    # 데이터셋 생성 (학습용은 증강 활성화, 테스트용은 비활성화)
    print("데이터셋 준비중...")
    train_dataset = HelmetDataset(train_images_dir, train_labels_dir, processor, augment=True)
    test_dataset = HelmetDataset(test_images_dir, test_labels_dir, processor, augment=False)

    # 총 스텝 수 계산 (warmup용)
    per_device_train_batch_size = 8
    gradient_accumulation_steps = 2
    num_train_epochs = 10
    effective_batch_size = per_device_train_batch_size * gradient_accumulation_steps
    total_steps = (len(train_dataset) // effective_batch_size) * num_train_epochs
    warmup_steps = int(total_steps * 0.06)  # 전체의 6%

    print(f"총 스텝 수: {total_steps}")
    print(f"Warmup 스텝 수: {warmup_steps}")

    # 개선된 학습 설정
    training_args = TrainingArguments(
        output_dir='./bbox_classifier_results_improved',
        num_train_epochs=num_train_epochs,

        # 개선된 배치 설정
        per_device_train_batch_size=per_device_train_batch_size,  # 4 → 8
        per_device_eval_batch_size=16,  # 평가용은 더 크게
        gradient_accumulation_steps=gradient_accumulation_steps,  # 4 → 2

        # 개선된 학습률 및 스케줄러 설정
        learning_rate=4e-5,  # 5e-5 → 4e-5
        lr_scheduler_type="cosine",  # 코사인 스케줄러 추가
        warmup_steps=warmup_steps,

        # 개선된 정규화
        weight_decay=0.03,  # 0.01 → 0.03
        label_smoothing_factor=0.05,  # Label smoothing 추가
        max_grad_norm=1.0,  # Gradient clipping 추가

        # 로깅 설정
        logging_dir='./bbox_logs_improved',
        logging_strategy="steps",
        logging_steps=50,
        logging_first_step=True,

        # 평가 설정
        eval_strategy="steps",
        eval_steps=200,

        # 저장 설정
        save_strategy="steps",
        save_steps=400,
        save_total_limit=3,  # 최근 3개만 보존 (디스크 공간 절약)

        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        greater_is_better=True,

        # 최적화 설정
        fp16=True,
        gradient_checkpointing=True,
        dataloader_pin_memory=False,
        dataloader_num_workers=0,
        remove_unused_columns=False,

        # 리포팅 설정
        report_to=["tensorboard"],
        run_name="bbox_classifier_improved_v1",
    )

    # 로깅 콜백 생성
    logging_callback = LoggingCallback("./bbox_training_progress_improved.json")

    # Trainer 생성
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        compute_metrics=compute_metrics,
        callbacks=[logging_callback],
    )

    # 학습 시작
    print("\n" + "="*60)
    print("개선된 헬멧 분류기 학습 시작!")
    print("="*60)
    print(f"학습 데이터: {len(train_dataset)}개 (증강 포함)")
    print(f"테스트 데이터: {len(test_dataset)}개")
    print(f"Effective Batch Size: {effective_batch_size}")
    print(f"총 에포크: {num_train_epochs}")
    print(f"총 스텝 수: {total_steps}")
    print("\n개선사항:")
    print("배치 사이즈: 4 → 8")
    print("학습률: 5e-5 → 4e-5 + 코사인 스케줄러")
    print("Weight decay: 0.01 → 0.03")
    print("Label smoothing: 0.05")
    print("Gradient clipping: 1.0")
    print("="*60)

    trainer.train()

    # 최종 평가
    print("\n최종 평가 중...")
    eval_results = trainer.evaluate()
    print(f"최종 정확도: {eval_results['eval_accuracy']:.4f}")

    # 모델 저장
    model_save_path = "./bbox_classifier_improved_final"
    trainer.save_model(model_save_path)
    processor.save_pretrained(model_save_path)
    print(f"모델 저장 완료: {model_save_path}")

    # trainer_state.json도 수동으로 저장
    trainer_state_path = "./bbox_trainer_state_improved_final.json"
    with open(trainer_state_path, 'w') as f:
        json.dump(trainer.state.__dict__, f, indent=2, default=str)
    print(f"학습 상태 저장 완료: {trainer_state_path}")

    # 상세 평가 (혼동 행렬 등)
    print("\n상세 평가 중...")
    detailed_evaluation(model, test_dataset, device)

def detailed_evaluation(model, test_dataset, device):
    """상세 평가 수행"""
    model.eval()

    dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="평가 중"):
            pixel_values = batch['pixel_values'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(pixel_values=pixel_values)
            predictions = torch.argmax(outputs.logits, dim=-1)

            all_predictions.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # 분류 리포트
    print("\n=== 분류 리포트 ===")
    print(classification_report(all_labels, all_predictions,
                              target_names=['헬멧 착용', '헬멧 미착용']))

    # 혼동 행렬
    print("\n=== 혼동 행렬 ===")
    cm = confusion_matrix(all_labels, all_predictions)
    print("실제\\예측   헬멧착용  헬멧미착용")
    print(f"헬멧착용     {cm[0,0]:6d}    {cm[0,1]:6d}")
    print(f"헬멧미착용   {cm[1,0]:6d}    {cm[1,1]:6d}")

    # 정확도 계산
    accuracy = accuracy_score(all_labels, all_predictions)
    print(f"\n최종 정확도: {accuracy:.4f} ({accuracy*100:.2f}%)")

def predict_single_image(image_path, model_path="./bbox_classifier_improved_final"):
    """단일 이미지 예측 (개선된 모델용)"""
    # 모델 로드
    processor = ViTImageProcessor.from_pretrained(model_path)
    model = ViTForImageClassification.from_pretrained(model_path)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    # 이미지 로드 및 전처리
    image = cv2.imread(image_path)
    if image is None:
        with open(image_path, 'rb') as f:
            img_array = np.frombuffer(f.read(), np.uint8)
        image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # 예측
    inputs = processor(images=image, return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = model(**inputs)
        predictions = F.softmax(outputs.logits, dim=-1)
        predicted_class = torch.argmax(predictions, dim=-1).item()
        confidence = predictions[0][predicted_class].item()

    result = "헬멧 착용" if predicted_class == 0 else "헬멧 미착용"
    print(f"예측 결과: {result} (신뢰도: {confidence:.4f})")

    return predicted_class, confidence

def view_training_logs():
    """개선된 학습 로그 확인"""
    try:
        with open("./bbox_training_progress_improved.json", "r") as f:
            logs = json.load(f)

        print("개선된 학습 로그 요약:")
        print(f"총 로그 엔트리: {len(logs)}개")

        if logs:
            first_log = logs[0]
            last_log = logs[-1]
            print(f"첫 번째 로그: Step {first_log['step']}, Epoch {first_log['epoch']}")
            print(f"마지막 로그: Step {last_log['step']}, Epoch {last_log['epoch']}")

            # eval_accuracy가 있는 로그들만 필터링
            eval_logs = [log for log in logs if 'eval_accuracy' in log]
            if eval_logs:
                print(f"평가 로그: {len(eval_logs)}개")
                print("정확도 변화:")
                for log in eval_logs[:5]:  # 처음 5개만 출력
                    print(f"  Step {log['step']}: {log['eval_accuracy']:.4f}")

                # 최고 정확도 찾기
                best_accuracy = max(eval_logs, key=lambda x: x['eval_accuracy'])
                print(f"최고 정확도: {best_accuracy['eval_accuracy']:.4f} (Step {best_accuracy['step']})")

    except FileNotFoundError:
        print("bbox_training_progress_improved.json 파일을 찾을 수 없습니다.")

def compare_with_original():
    """원본 결과와 개선된 결과 비교"""
    print("\n" + "="*60)
    print("성능 비교 (원본 vs 개선)")
    print("="*60)

    # 원본 로그
    try:
        with open("./bbox_training_progress.json", "r") as f:
            original_logs = json.load(f)
        original_eval_logs = [log for log in original_logs if 'eval_accuracy' in log]
        if original_eval_logs:
            original_best = max(original_eval_logs, key=lambda x: x['eval_accuracy'])
            print(f"원본 최고 정확도: {original_best['eval_accuracy']:.4f}")
        else:
            print("원본 평가 로그를 찾을 수 없습니다.")
    except FileNotFoundError:
        print("원본 학습 로그를 찾을 수 없습니다.")

    # 개선된 로그
    try:
        with open("./bbox_training_progress_improved.json", "r") as f:
            improved_logs = json.load(f)
        improved_eval_logs = [log for log in improved_logs if 'eval_accuracy' in log]
        if improved_eval_logs:
            improved_best = max(improved_eval_logs, key=lambda x: x['eval_accuracy'])
            print(f"개선 최고 정확도: {improved_best['eval_accuracy']:.4f}")

            # 개선 효과 계산
            if 'original_best' in locals():
                improvement = improved_best['eval_accuracy'] - original_best['eval_accuracy']
                print(f"개선 효과: +{improvement:.4f} ({improvement*100:.2f}%p)")
        else:
            print("개선된 평가 로그를 찾을 수 없습니다.")
    except FileNotFoundError:
        print("개선된 학습 로그를 찾을 수 없습니다.")

if __name__ == "__main__":
    # 개선된 학습 실행
    train_helmet_classifier()

    # 학습 완료 후 로그 확인
    print("\n" + "="*50)
    view_training_logs()

    # 원본과 비교
    compare_with_original()