In [8]:
import json
import os
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm import tqdm


In [9]:
# Central run configuration; every tunable value used by the notebook lives here.
CONFIG = dict(
    # Training images and their per-image JSON label files.
    image_dir="sleepy/dataset/Trainning/image_trainning/all_image_trainning",
    label_dir="sleepy/dataset/Trainning/label_trainning/all_label_trainning",
    # Held-out test images and labels.
    test_image_dir="sleepy/dataset/Test/image_test/all_image_test",
    test_label_dir="sleepy/dataset/Test/label_test/all_label_test",
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    # Side length (pixels) every eye crop is resized to.
    img_size=90,
    batch_size=32,
    epochs=5,
    learning_rate=0.001,
    # Upper bound on the number of training image files that are scanned.
    max_samples=15000,
    # Where the trained weights (state_dict) are written.
    model_path="eye_state_classifier.pth",
)


In [10]:
def str2bool(val: str) -> bool:
    """Return True only when ``val`` spells "true".

    The value is converted with ``str()``, surrounding whitespace is
    stripped and the comparison ignores case, so ``True``, ``"True"`` and
    ``" TRUE "`` all yield True. Anything else yields False.
    """
    normalized = str(val).strip().lower()
    return normalized == "true"

def crop_and_resize_eye(frame: np.ndarray, bbox: tuple, size: int = 90) -> np.ndarray | None:
    if bbox is None:
        return None
    x, y, w, h = bbox
    x1 = max(x - int(w * 0.5), 0)
    y1 = max(y - int(h * 0.5), 0)
    x2 = min(x + w + int(w * 0.5), frame.shape[1])
    y2 = min(y + h + int(h * 0.5), frame.shape[0])
    eye_crop = frame[y1:y2, x1:x2]
    if eye_crop.size == 0:
        return None
    return cv2.resize(eye_crop, (size, size))

In [11]:

class EyeDataset(Dataset):
    """Dataset of single-eye crops labelled open (1) or closed (0).

    Each image file in ``image_dir`` is paired with a JSON file of the same
    stem in ``label_dir``. The JSON is read at
    ``ObjectInfo -> BoundingBox -> Leye / Reye``, where every eye entry
    supplies ``isVisible``, ``Opened`` and ``Position``. ``Position`` is
    unpacked as ``(x, y, w, h)``.
    NOTE(review): confirm against the dataset spec that ``Position`` really
    is x/y/width/height and not two corner points.

    An image contributes samples only when both eyes are marked visible;
    each eye with a positive width and height becomes one sample
    ``(image_path, bbox, label)`` in ``self.samples``.

    ``__getitem__`` returns ``(None, None)`` for samples that cannot be
    loaded, so the DataLoader needs a collate function that drops them.
    """

    # File extensions (lower-case) that are treated as images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, image_dir, label_dir, transform=None, max_samples=None):
        """Scan the label files and build the sample index.

        Args:
            image_dir: Directory containing the image files.
            label_dir: Directory containing one ``<stem>.json`` per image.
            transform: Optional callable applied to each cropped PIL image.
            max_samples: If truthy, only the first ``max_samples`` image
                files (in sorted order) are scanned.
        """
        self.image_dir = Path(image_dir)
        self.label_dir = Path(label_dir)
        self.transform = transform
        self.samples = []

        image_files = sorted(
            name for name in os.listdir(self.image_dir)
            if name.lower().endswith(self.IMAGE_EXTENSIONS)
        )
        if max_samples:
            image_files = image_files[:max_samples]

        print("🔍 유효한 눈 데이터를 수집 중...")
        for img_name in tqdm(image_files):
            img_path = self.image_dir / img_name
            label_path = self.label_dir / (img_name.rsplit('.', 1)[0] + '.json')
            if not label_path.exists():
                continue

            try:
                with open(label_path, 'r', encoding='utf-8') as f:
                    label_data = json.load(f)
            except (OSError, ValueError):
                # Unreadable file, bad encoding or malformed JSON
                # (JSONDecodeError and UnicodeDecodeError are ValueErrors).
                continue

            eyes = self._extract_eyes(label_data)
            if eyes is None:
                continue

            for bbox, opened in eyes:
                if bbox[2] > 0 and bbox[3] > 0:
                    self.samples.append((str(img_path), bbox, int(opened)))

        print(f"✅ 총 {len(self.samples)}개의 눈 이미지 수집 완료")

    @staticmethod
    def _extract_eyes(label_data):
        """Return ``[(bbox, opened), ...]`` for the left and right eye.

        Returns ``None`` when the label does not have the expected structure,
        when either eye is not marked visible, or when a ``Position`` cannot
        be converted to exactly four integers.
        """
        if not isinstance(label_data, dict):
            return None
        # ``or {}`` also covers keys that are present but null.
        object_info = label_data.get('ObjectInfo') or {}
        if not isinstance(object_info, dict):
            return None
        bounding_boxes = object_info.get('BoundingBox') or {}
        if not isinstance(bounding_boxes, dict):
            return None

        eyes = []
        for eye_key in ('Leye', 'Reye'):
            eye_info = bounding_boxes.get(eye_key) or {}
            if not isinstance(eye_info, dict) or not eye_info.get('isVisible', False):
                return None
            try:
                bbox = tuple(map(int, map(float, eye_info.get('Position', [0, 0, 0, 0]))))
            except (ValueError, TypeError, OverflowError):
                return None
            # A box that is not exactly (x, y, w, h) would otherwise fail
            # later with IndexError or an unpacking error.
            if len(bbox) != 4:
                return None
            eyes.append((bbox, str2bool(eye_info.get('Opened', 'false'))))
        return eyes

    def __len__(self):
        """Number of eye samples collected."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load, crop and transform the eye at ``idx``.

        Returns:
            ``(eye_image, label)``, or ``(None, None)`` when the box is
            degenerate or the image file is missing or unreadable.
        """
        img_path, bbox, label = self.samples[idx]

        x, y, w, h = bbox
        # Reject degenerate boxes before touching the disk.
        if w <= 0 or h <= 0:
            return None, None

        try:
            # The context manager closes the underlying file handle;
            # convert() and crop() return images that no longer depend on it.
            with Image.open(img_path) as source:
                eye_img = source.convert("RGB").crop((x, y, x + w, y + h))
        except FileNotFoundError:
            print(f"경고: 파일을 찾을 수 없습니다: {img_path}")
            return None, None
        except OSError as err:
            # Corrupt or truncated images (PIL.UnidentifiedImageError is an
            # OSError) are skipped the same way as missing files.
            print(f"경고: 이미지를 읽을 수 없습니다: {img_path} ({err})")
            return None, None

        if self.transform:
            eye_img = self.transform(eye_img)

        return eye_img, label


In [12]:
class EyeCNN(nn.Module):
    """Small CNN that maps an RGB eye crop to a single sigmoid probability.

    The network is four ``Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2)``
    stages (16, 32, 64, 128 channels) followed by
    ``Flatten -> Linear(128) -> ReLU -> Linear(1) -> Sigmoid``.

    Args:
        img_size: Side length of the square input images. Each pooling stage
            halves the side (rounding down), e.g. 90 -> 45 -> 22 -> 11 -> 5,
            and the first linear layer is sized accordingly. The default of
            90 gives the previous fixed size of ``128 * 5 * 5``.

    Raises:
        ValueError: If ``img_size`` is too small to survive four poolings.
    """

    def __init__(self, img_size: int = 90):
        super().__init__()
        channels = (3, 16, 32, 64, 128)

        # Layers are created in the same order and land at the same
        # Sequential indices as before, so existing state_dicts still load.
        stages = []
        feature_side = img_size
        for in_ch, out_ch in zip(channels[:-1], channels[1:]):
            stages += [nn.Conv2d(in_ch, out_ch, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2)]
            feature_side //= 2  # MaxPool2d(2) floors odd sizes
        if feature_side < 1:
            raise ValueError(
                f"img_size={img_size} is too small for {len(channels) - 1} pooling stages"
            )

        self.conv = nn.Sequential(*stages)
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(channels[-1] * feature_side * feature_side, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return probabilities of shape ``(batch, 1)`` for input ``x``.

        ``x`` must be ``(batch, 3, img_size, img_size)``; the flattening is
        done by the ``nn.Flatten`` layer at the start of ``self.fc``.
        """
        return self.fc(self.conv(x))


In [None]:
def train_model(model, train_loader, criterion, optimizer, device, epochs):
    """Train ``model`` for ``epochs`` passes and return the mean loss per epoch.

    Args:
        model: Module whose output has shape ``(batch, 1)``.
        train_loader: Iterable of ``(images, labels)`` batches. Batches whose
            ``images`` tensor has no elements are skipped.
        criterion: Loss called as ``criterion(outputs, labels)`` with float
            labels of shape ``(batch, 1)``.
        optimizer: Optimizer over the model parameters.
        device: Device the batches are moved to.
        epochs: Number of passes over ``train_loader``.

    Returns:
        One average batch loss per epoch in which at least one batch was
        trained on.
    """
    model.train()
    losses = []
    print("\n=== 🚀 학습 시작 ===")
    for epoch in range(epochs):
        total_loss = 0.0
        batches_trained = 0
        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            if images.nelement() == 0:
                continue
            images = images.to(device)
            labels = labels.float().unsqueeze(1).to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            batches_trained += 1

        # Average over the batches that were actually trained on; dividing by
        # len(train_loader) would count skipped batches and understate the loss.
        if batches_trained > 0:
            avg_loss = total_loss / batches_trained
            print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f}")
            losses.append(avg_loss)
        else:
            print(f"Epoch {epoch+1}/{epochs} | 학습할 데이터가 없습니다.")

    return losses

def evaluate_model(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect labels and predictions.

    Outputs above 0.5 are predicted as class 1, everything else as class 0.
    Batches whose ``images`` tensor has no elements are skipped.

    Returns:
        ``(y_true, y_pred)`` as flat NumPy arrays, or two empty arrays when
        no batch produced a prediction.
    """
    model.eval()
    predicted_batches = []
    label_batches = []
    print("\n=== 📊 테스트 평가 시작 ===")
    with torch.no_grad():
        for batch_images, batch_labels in tqdm(test_loader, desc="평가 중"):
            if batch_images.nelement() != 0:
                scores = model(batch_images.to(device))
                predicted_batches.append((scores > 0.5).int().cpu())
                label_batches.append(batch_labels.cpu())

    if len(predicted_batches) == 0:
        print("❗ 예측 결과가 없습니다. 테스트 데이터셋을 확인해주세요.")
        return np.array([]), np.array([])

    # Flatten the (batch, 1) predictions and the (batch,) labels to 1-D.
    y_true = torch.cat(label_batches).view(-1).numpy()
    y_pred = torch.cat(predicted_batches).view(-1).numpy()
    return y_true, y_pred



🔍 유효한 눈 데이터를 수집 중...


100%|██████████| 13446/13446 [00:01<00:00, 7139.48it/s]


✅ 총 6960개의 눈 이미지 수집 완료
🔍 유효한 눈 데이터를 수집 중...


100%|██████████| 510/510 [00:00<00:00, 5994.36it/s]


✅ 총 20개의 눈 이미지 수집 완료

학습 샘플 수: 6960
테스트 샘플 수: 20

=== 🚀 학습 시작 ===


Epoch 1/5: 100%|██████████| 218/218 [00:58<00:00,  3.74it/s]


Epoch 1/5 | Loss: 0.3125


Epoch 2/5:  64%|██████▍   | 140/218 [00:39<00:23,  3.37it/s]

In [None]:
def main():
    """Train the eye-state classifier, save it, evaluate it and plot the loss.

    All paths and hyper-parameters are read from ``CONFIG``. The trained
    weights are written to ``CONFIG['model_path']``.
    """
    # Resize every crop to a fixed square and scale pixel values to [-1, 1].
    transform = transforms.Compose([
        transforms.Resize((CONFIG['img_size'], CONFIG['img_size'])),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    # Datasets and data loaders.
    train_dataset = EyeDataset(CONFIG['image_dir'], CONFIG['label_dir'], transform=transform, max_samples=CONFIG['max_samples'])
    test_dataset = EyeDataset(CONFIG['test_image_dir'], CONFIG['test_label_dir'], transform=transform)

    def collate_fn(batch):
        """Drop samples the dataset returned as (None, None) before batching."""
        batch = list(filter(lambda x: x[0] is not None, batch))
        if not batch:
            # Empty tensors signal "nothing usable in this batch" to the loops.
            return torch.Tensor(), torch.Tensor()
        return torch.utils.data.dataloader.default_collate(batch)

    train_loader = DataLoader(train_dataset, batch_size=CONFIG['batch_size'], shuffle=True, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=CONFIG['batch_size'], shuffle=False, collate_fn=collate_fn)

    print(f"\n학습 샘플 수: {len(train_dataset)}")
    print(f"테스트 샘플 수: {len(test_dataset)}")

    # Model, loss and optimizer. The model ends in a sigmoid, hence BCELoss.
    model = EyeCNN().to(CONFIG['device'])
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=CONFIG['learning_rate'])

    # Training.
    losses = train_model(model, train_loader, criterion, optimizer, CONFIG['device'], CONFIG['epochs'])

    # Persist the trained weights.
    torch.save(model.state_dict(), CONFIG['model_path'])
    print(f"\n✅ 모델이 '{CONFIG['model_path']}'에 저장되었습니다.")

    # Evaluation.
    # (Optional) the saved weights could be reloaded before evaluating:
    # model.load_state_dict(torch.load(CONFIG['model_path']))
    y_true, y_pred = evaluate_model(model, test_loader, CONFIG['device'])

    # Report.
    if len(y_true) > 0:
        # Pin the label set explicitly. With a small test set only one class
        # may appear, and classification_report raises ValueError when the
        # number of observed classes differs from len(target_names).
        class_labels = [0, 1]
        print("\n[ 최종 평가 결과 ]")
        print(classification_report(
            y_true,
            y_pred,
            labels=class_labels,
            target_names=["Closed (0)", "Opened (1)"],
            zero_division=0,
        ))

        print("Confusion Matrix:")
        # Same label set so the matrix is always 2x2.
        print(confusion_matrix(y_true, y_pred, labels=class_labels))

        print(f"\nAccuracy: {accuracy_score(y_true, y_pred):.4f}")

    # Training-loss curve.
    if losses:
        plt.figure(figsize=(10, 5))
        plt.plot(losses, label='Train Loss')
        plt.title("Training Loss per Epoch")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.grid(True)
        plt.legend()
        plt.show()

if __name__ == '__main__':
    main()