In [None]:
# 0. 기본 설정

from google.colab import drive
drive.mount('/content/drive')

import os
import zipfile

# 기본 경로 설정
base_path = '/content/drive/MyDrive'
extract_root = os.path.join(base_path, 'emotions')

# 감정별 zip 파일 리스트

# emotion_zip_files = ['happy.zip', 'fear.zip', 'sadness.zip', 'anger.zip']

# for zip_name in emotion_zip_files:
#     zip_path = os.path.join(base_path, zip_name)
#     emotion_name = zip_name.replace('.zip', '')
#     extract_path = os.path.join(extract_root, emotion_name)
#     # 압축 해제
#     if not os.path.exists(extract_path) or len(os.listdir(extract_path)) == 0:
#         os.makedirs(extract_path, exist_ok=True)
#         with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#             zip_ref.extractall(extract_path)
#         print(" 압축 해제 완료!")
#     else:
#         print("이미 압축이 해제되어 있어 건너뜁니다.")

In [None]:

# 1. 필요한 라이브러리 임포트
import random
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from datetime import datetime
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True


In [None]:

# 2. 환경 세팅 (GPU 사용 가능하면 GPU로)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

In [None]:
# 3. 이미지 데이터셋 경로 (본인 환경에 맞게 수정)
image_dir = '/content/drive/MyDrive/emotions'

In [None]:
# 4. 데이터프레임 생성 (파일명, 경로, 라벨)
data = []
for label in os.listdir(image_dir):
    label_path = os.path.join(image_dir, label)
    if os.path.isdir(label_path):
        for fname in os.listdir(label_path):
            if fname.lower().endswith(('.jpg', '.jpeg', '.png')):
                full_path = os.path.join(label_path, fname)
                try:
                    img = Image.open(full_path)
                    img.verify()
                    data.append({'image': full_path, 'label': label})
                except Exception as e:
                    print(f"Invalid image skipped: {full_path}")


df = pd.DataFrame(data)
print(f"총 이미지 수: {df.shape[0]}")
print(df['label'].value_counts())

In [None]:
# 5. 감정별로 stratified split 하기 (train 75%, val 15%, test 10%)
train_list = []
val_list = []
test_list = []

for label in df['label'].unique():
    temp_df = df[df['label'] == label]
    train_df, temp_df2 = train_test_split(temp_df, test_size=0.25, random_state=42, shuffle=True)
    val_df, test_df = train_test_split(temp_df2, test_size=0.4, random_state=42, shuffle=True)
    # 0.25 * 0.4 = 0.10 test

    train_list.append(train_df)
    val_list.append(val_df)
    test_list.append(test_df)

train_df = pd.concat(train_list).reset_index(drop=True)
val_df = pd.concat(val_list).reset_index(drop=True)
test_df = pd.concat(test_list).reset_index(drop=True)

# 라벨 인덱스 생성
label_to_idx = {label: idx for idx, label in enumerate(sorted(df['label'].unique()))}
idx_to_label = {v: k for k, v in label_to_idx.items()}

train_df['label_idx'] = train_df['label'].map(label_to_idx)
val_df['label_idx'] = val_df['label'].map(label_to_idx)
test_df['label_idx'] = test_df['label'].map(label_to_idx)

print("Label to idx mapping:", label_to_idx)
print(f"Train size: {len(train_df)}, Val size: {len(val_df)}, Test size: {len(test_df)}")

In [None]:
# 6. Dataset 클래스 정의
class EmotionDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        row = self.dataframe.iloc[idx]
        image = Image.open(row['image']).convert('RGB')
        if self.transform:
            image = self.transform(image)
        label = row['label_idx']
        return image, label

In [None]:
# 7. 전처리 정의
# 데이터 전처리, 증식 train 랜덤 포함, val&test 랜덤 미포함
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(5), # 랜덤 10 -> 5 로 조정
    transforms.ColorJitter(brightness=0.1, contrast=0.1), # 0.2 -> 0.1 로 약하게 조정
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

val_test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

train_dataset = EmotionDataset(train_df, transform=train_transform)
val_dataset = EmotionDataset(val_df, transform=val_test_transform)
test_dataset = EmotionDataset(test_df, transform=val_test_transform)
label_counts = train_df['label_idx'].value_counts().to_dict()

# 각 샘플의 weight 계산 (클래스가 적을수록 높은 weight)
weights = train_df['label_idx'].map(lambda x: 1.0 / label_counts[x]).values

# WeightedRandomSampler 정의
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)

train_loader = DataLoader(train_dataset, batch_size=64, sampler=sampler, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2)

In [None]:
# 8. ResNet18을 미리 학습된 모델에서 불러와 fine-tuning
model = models.resnet18(pretrained=True)
in_features = model.fc.in_features
model.fc = nn.Sequential(
    nn.Dropout(0.3),
    nn.Linear(in_features, len(label_to_idx))
)
# layer4와 fc만 학습되도록 설정
for name, param in model.named_parameters():
    if 'layer4' in name or 'fc' in name:
        param.requires_grad = True
    else:
        param.requires_grad = False

model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)  

In [None]:
# 9. 학습 함수 정의
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0
    correct = 0
    total = 0

    for images, labels in tqdm(dataloader):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    return running_loss / total, correct / total

In [None]:
# 10. 검증 함수 정의
def eval_model(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in tqdm(dataloader):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * images.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    return running_loss / total, correct / total

In [None]:
# 11. 학습 루프
num_epochs = 20
best_val_acc = 0
# best_val_loss = float('inf')  # val_loss 최소값 초기화
# 손실도로 early stop하기
patience = 5  # early stop 기준
early_stop_counter = 0
# 히스토리 저장할 리스트
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

for epoch in range(num_epochs):

    print(f"\nEpoch {epoch+1}/{num_epochs}")

    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

    val_loss, val_acc = eval_model(model, val_loader, criterion, device)
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # 히스토리 저장
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)

    if val_acc > best_val_acc:
         best_val_acc = val_acc
         torch.save(model.state_dict(), 'best_emotion_model.pth')
         early_stop_counter = 0 # 성능 좋아졌으니까 성능 초기화
    else:
         early_stop_counter += 1
         print(f"⏸No improvement. Early stop counter: {early_stop_counter}/{patience}")
         if early_stop_counter >= patience:
             print("Early stopping triggered!")
             break

In [None]:
# 12. 테스트 데이터 정확도 측정
model.load_state_dict(torch.load('best_emotion_model.pth'))
test_loss, test_acc = eval_model(model, test_loader, criterion, device)
print(f"\nTest Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

In [None]:
# 13. 끝
print(f"\n학습 완료! {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")