**라이브러리 임포트**

In [1]:
import os
import json
import random
import shutil
import cv2

from google.colab import drive

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.datasets import ImageFolder
from torchvision import transforms, models, datasets
from torch.utils.data import DataLoader, Subset

**데이터셋 unzip**

In [2]:
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd /content/drive/MyDrive/CAB/CAB_dataset/JPEGImages
!unzip -qq "/content/drive/MyDrive/CAB/CAB_dataset/JPEGImages/images_bbox.zip"

/content/drive/MyDrive/CAB/CAB_dataset/JPEGImages


In [None]:
%cd /content/drive/MyDrive/CAB/CAB_dataset/Annotations
!unzip -qq "/content/drive/MyDrive/CAB/CAB_dataset/Annotations/label_bbox.zip"

/content/drive/MyDrive/CAB/CAB_dataset/Annotations


**데이터 라벨링**

파일 경로 변환
- /content/drive/MyDrive/CAB/CAB_dataset/JPEGImages 하위 디렉토리에 있는 이미지를 /content/drive/MyDrive/CAB/CAB_dataset/JPEGImages로 옮긴다.
- /content/drive/MyDrive/CAB/CAB_dataset/Annotations 하위 디렉토리에 있는 이미지를 /content/drive/MyDrive/CAB/CAB_dataset/Annotations로 옮긴다.

In [None]:
def move_file(base_dir):
  for root, dirs, files in os.walk(base_dir):
    for file in files:
        if file.lower().endswith(('.jpg', 'json')):
            source_path = os.path.join(root, file)
            destination_path = os.path.join(base_dir, file)

            shutil.move(source_path, destination_path)

jpeg_dir = "/content/drive/MyDrive/CAB/CAB_dataset/JPEGImages"
annot_dir = "/content/drive/MyDrive/CAB/CAB_dataset/Annotations"

move_file(jpeg_dir)
move_file(annot_dir)

입과 눈 사진을 저장할 폴더를 만들어 준다.

In [None]:
data_root = "/content/drive/MyDrive/CAB/CAB_dataset/"
eyes_mouth = os.path.join(data_root, "EyesMouth")
os.makedirs(eyes_mouth, exist_ok=False)

In [None]:
class_mapping = {
      "Face": 0,
      "Leye": 1,
      "Reye": 2,
      "Mouth": 3,
      "Cigar": 4,
      "Phone": 5
}

이미지에서 양쪽 눈과 입 사진을 잘라서 저장한다.
이때 파일명은 기존 파일명_클래스 이름_Open 혹은 기존 파일명_클래스 이름_Close로 한다.

In [None]:
for image in os.listdir(jpeg_dir):
  if image.endswith('jpg'):
    name = os.path.splitext(image)[0]

    img_path = os.path.join(jpeg_dir, image)
    annot_path = os.path.join(annot_dir, name + '.json')

    with open(annot_path, 'r', encoding='utf-8') as f:
      data = json.load(f)

    for obj, bbox in data["ObjectInfo"]["BoundingBox"].items():
      if bbox["isVisible"] and class_mapping[obj] in [1, 2, 3]:

        x1, y1, x2, y2 = bbox["Position"]

        Is_open = "Open" if bbox["Opened"] else "Close"

        img = cv2.imread(img_path)
        crop_img = img[y1:y2, x1:x2]

        img_dst = os.path.join(eyes_mouth, f"{name}_{obj}_{Is_open}.jpg")
        cv2.imwrite(img_dst, crop_img)

파일명을 참고하여 이미지를 open 혹은 close 폴더로 옮긴다.



```python
# /content/drive/MyDrive/CAB/CAB_dataset/fs/
# ├── open/
# │   └── 파일명_클래스 이름_Open.jpg
# ├── close/
#      └── 파일명_클래스 이름_Close.jpg
```

In [None]:
data_root = "/content/drive/MyDrive/CAB/CAB_dataset/"

fs_root = os.path.join(data_root, "fs")
open_root = os.path.join(fs_root, "open")
close_root = os.path.join(fs_root, "close")

os.makedirs(open_root, exist_ok=False)
os.makedirs(close_root, exist_ok=False)

In [None]:
for file in os.listdir(eyes_mouth):
  if file.endswith('.jpg') and 'Open' in file:
    img_src = os.path.join(eyes_mouth, file)
    img_dst = os.path.join(open_root, file)

  elif file.endswith('.jpg') and 'Close' in file:
    img_src = os.path.join(eyes_mouth, file)
    img_dst = os.path.join(close_root, file)

  shutil.copy2(img_src, img_dst)


**데이터 undersampling**

다음으로, 모든 Image의 절대 경로가 적힌 리스트를 만든다.

In [4]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5), (0.5))
])

dataset = ImageFolder(root=fs_root, transform=transform)

class_to_idx = dataset.class_to_idx
print(f'클래스별 인덱스: {class_to_idx}')

class_counts = [0] * len(class_to_idx)
for _, target in dataset.samples:
    class_counts[target] += 1

print(f"클래스별 샘플 수: {class_counts}")
print(f"데이터셋 크기: {len(dataset)}")


클래스별 인덱스: {'close': 0, 'open': 1}
클래스별 샘플 수: [8661, 15364]
데이터셋 크기: 24025


데이터 불균형 문제와 제공된 GPU RAM, 학습 속도 등을 고려하여 클래스별 샘플 수를 줄여준다.

여기서는 실험 결과 클래스별 샘플 수를 2500개로 하는 것이 이상적이었다.

In [5]:
close_samples = [(sample, target) for sample, target in dataset.samples if target == 0]
open_samples = [(sample, target) for sample, target in dataset.samples if target == 1]

open_samples = open_samples[:2500]
close_samples = close_samples[:2500]

dataset.samples = close_samples + open_samples

**train/val/test loader 생성**

전체 이미지를 6:2:2 비율로 나누어 train set, validation set, test set을 생성한다.

ImageFolder는 instance를 직접 섞는 걸 허용하지 않는다. 따라서 무작위로 섞인 인덱스를 이용하여 훈련/검증/테스트 데이터셋을 만든다.


In [6]:
dataset_indices = list(range(len(dataset)))

random.shuffle(dataset_indices)

train_size = int(0.6 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = val_size

train_dataset = Subset(dataset, dataset_indices[:train_size])
val_dataset = Subset(dataset, dataset_indices[train_size:train_size+val_size])
test_dataset = Subset(dataset, dataset_indices[train_size+val_size:])

In [7]:
print("length of train dataset: ", len(train_dataset))
print("length of val dataset: ", len(val_dataset))
print("length of test dataset: ", len(test_dataset))

length of train dataset:  3000
length of val dataset:  1000
length of test dataset:  1000


다음으로, 훈련/검증/테스트 데이터로더를 생성한다.

- num_workers: 2개의 프로세서가 병렬로 데이터를 불러와 이력 데이터가 더 빨리 준비될 수 있도록 한다.
- pin_memory=True: CPU에서 GPU로 데이터를 전송할 때 발생하는 복사 작업을 빠르게 할 수 있도록 도와준다.

In [8]:
# 데이터 로더 생성
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True,  num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=True,  num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True, num_workers=2, pin_memory=True)

**분류 모델 생성**

분류 모델로는 이전 프로젝트에서 우수한 성능을 보였던 ResNet50 모델을 선택하였다. 최적화 기법으로는 Adam optimizer를 사용하였으며, 학습률(lr)은 0.0001로 설정하였다. 또한, 배치 사이즈(batch size)는 32로 초기화하였다.

(지난 프로젝트 결과, ResNet50 모델에서 <Adam optimizer/lr = 0.0001/batch size = 64> 조합일 때 가장 최상의 결과를 얻었다. 이번 프로젝트에서도 최상의 결과를 얻고자 비슷한 조합을 사용했으며, 제공된 GPU RAM을 고려하여 batch size만 32로 수정하였다.)

In [9]:
resnet50 = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 189MB/s]


In [10]:
class MyResNet50(nn.Module):
    def __init__(self, pretrained_model):
        super(MyResNet50, self).__init__()
        self.backbone = pretrained_model

        self.dropout = nn.Dropout(0.3)
        self.extra_layer = nn.Linear(1000, 2) # open/close 판단
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.backbone(x)
        x = self.dropout(x)
        x = self.softmax(self.extra_layer(x))
        return x

In [11]:
myresnet50 = MyResNet50(resnet50)

optimizer = optim.Adam(myresnet50.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

**훈련/검증/테스트 함수 정의**

In [12]:
def train(model):
  model.train()

  for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)

    optimizer.zero_grad()

    output = model(data)

    loss = criterion(output, target)
    loss.backward()
    optimizer.step()

In [13]:
def val(model, epoch):
  model.eval()

  val_loss = 0
  correct = 0

  with torch.no_grad():
    for data, target in val_loader:
      data, target = data.to(device), target.to(device)

      output = model(data)

      val_loss += criterion(output, target).item()
      pred = output.argmax(dim=1, keepdim=True)
      correct += pred.eq(target.view_as(pred)).sum().item()

  val_loss /= len(val_loader.dataset)
  accuracy = 100. * correct / len(val_loader.dataset)

  print(f"Epoch: {epoch}, Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({accuracy:.2f}%)")

  return val_loss

In [14]:
def test(model):
  model.eval()

  test_loss = 0
  correct = 0

  with torch.no_grad():
    for data, target in test_loader:
      data, target = data.to(device), target.to(device)

      output = model(data)

      test_loss += criterion(output, target).item()
      pred = output.argmax(dim=1, keepdim=True)
      correct += pred.eq(target.view_as(pred)).sum().item()

  test_loss /= len(test_loader.dataset)
  accuracy = 100. * correct / len(test_loader.dataset)

  print(f"Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)")


**EarlyStopping 정의**

overfitting을 방지하기 위해 earlystopping을 사용한다.

In [15]:
class EarlyStopping:
    def __init__(self, patience=10, verbose=False, counter=0, best_loss=float('inf')):
        self.patience = patience
        self.verbose = verbose
        self.counter = counter
        self.best_loss = best_loss
        self.early_stop = False

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss:
            self.counter = 0
            self.best_loss = val_loss
            torch.save(model.state_dict(), '/content/drive/MyDrive/CAB/CAB_dataset/model/best_resnet_model.pth')
        else:
            self.counter += 1
            if self.counter >= self.patience:
              self.early_stop = True

In [16]:
num_epochs = 50

In [17]:
myresnet50.to(device)

early_stopping = EarlyStopping(verbose=True)

for epoch in range(1, num_epochs + 1):
  train(myresnet50)
  val_loss = val(myresnet50, epoch)

  early_stopping(val_loss, myresnet50)

  if early_stopping.early_stop:
    print("************************************************************\nEarly stop!")
    myresnet50.load_state_dict(torch.load('/content/drive/MyDrive/CAB/CAB_dataset/model/best_resnet_model.pth', map_location=device))
    test(myresnet50)
    break

Epoch: 1, Average loss: 0.0127, Accuracy: 913/1000 (91.30%)
Epoch: 2, Average loss: 0.0123, Accuracy: 931/1000 (93.10%)
Epoch: 3, Average loss: 0.0111, Accuracy: 965/1000 (96.50%)
Epoch: 4, Average loss: 0.0115, Accuracy: 949/1000 (94.90%)
Epoch: 5, Average loss: 0.0113, Accuracy: 957/1000 (95.70%)
Epoch: 6, Average loss: 0.0112, Accuracy: 962/1000 (96.20%)
Epoch: 7, Average loss: 0.0112, Accuracy: 961/1000 (96.10%)
Epoch: 8, Average loss: 0.0115, Accuracy: 948/1000 (94.80%)
Epoch: 9, Average loss: 0.0111, Accuracy: 967/1000 (96.70%)
Epoch: 10, Average loss: 0.0116, Accuracy: 948/1000 (94.80%)
Epoch: 11, Average loss: 0.0109, Accuracy: 974/1000 (97.40%)
Epoch: 12, Average loss: 0.0112, Accuracy: 960/1000 (96.00%)
Epoch: 13, Average loss: 0.0111, Accuracy: 966/1000 (96.60%)
Epoch: 14, Average loss: 0.0111, Accuracy: 968/1000 (96.80%)
Epoch: 15, Average loss: 0.0109, Accuracy: 972/1000 (97.20%)
Epoch: 16, Average loss: 0.0109, Accuracy: 972/1000 (97.20%)
Epoch: 17, Average loss: 0.0111, 

  myresnet50.load_state_dict(torch.load('/content/drive/MyDrive/CAB/CAB_dataset/model/best_resnet_model.pth', map_location=device))


Average loss: 0.0106, Accuracy: 980/1000 (98.00%)
