In [6]:
# 1. 필수 라이브러리 설치 (터미널이 아닌 주피터 셀에서 실행 시 !)
!pip install efficientnet_pytorch albumentations pandas matplotlib scikit-learn

import os
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from efficientnet_pytorch import EfficientNet
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
from tqdm import tqdm

# GPU 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))

[0mUsing device: cuda
NVIDIA A100-SXM4-80GB MIG 3g.40gb


In [10]:
from sklearn.model_selection import train_test_split
import glob

# 데이터셋 클래스 수정 (파일 리스트를 직접 받을 수 있게 변경)
class DeepDetectDataset(Dataset):
    def __init__(self, file_paths=None, labels=None, root_dir=None, split='test', transform=None):
        """
        1. file_paths & labels가 있으면: 그걸 그대로 씀 (Train/Val 나눌 때 사용)
        2. 없으면: root_dir/split 폴더에서 직접 읽음 (Test 셋 읽을 때 사용)
        """
        self.transform = transform
        
        if file_paths is not None and labels is not None:
            # 리스트를 직접 받은 경우 (Train / Val 분할 시)
            self.image_paths = file_paths
            self.labels = labels
        else:
            # 폴더에서 직접 읽는 경우 (Test 셋)
            self.image_paths = []
            self.labels = []
            # 대소문자 이슈 방지를 위해 소문자로도 체크
            for label, class_name in enumerate(['Real', 'Fake']):
                class_dir = os.path.join(root_dir, split, class_name)
                if not os.path.exists(class_dir): 
                    class_dir = os.path.join(root_dir, split, class_name.lower())
                
                # 해당 폴더의 모든 이미지 파일 읽기
                if os.path.exists(class_dir):
                    files = [os.path.join(class_dir, f) for f in os.listdir(class_dir) 
                             if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
                    self.image_paths.extend(files)
                    self.labels.extend([label] * len(files))
                else:
                    print(f"Warning: {class_dir} not found!")

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        try:
            image = cv2.imread(img_path)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        except:
            print(f"Error loading image: {img_path}")
            # 에러 발생 시 검은 이미지 반환 (학습 중단 방지)
            return torch.zeros((3, 224, 224)), torch.tensor(self.labels[idx], dtype=torch.float32)

        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']
        
        return image, torch.tensor(self.labels[idx], dtype=torch.float32)

# 논문 Listing 1에 기반한 증강 파이프라인 [cite: 91-101]
train_transform = A.Compose([
    A.Resize(224, 224, interpolation=3), # 3 = Bicubic [cite: 93]
    A.HorizontalFlip(p=0.5),             # [cite: 93]
    
    # 주파수 강건성 증강 (Domain-Specific)
    A.OneOf([
        A.GaussianBlur(blur_limait=(3, 7), p=0.5),           # 
        A.ImageCompression(quality_lower=60, p=0.5),        # 
    ], p=0.5),
    
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), # [cite: 100]
    ToTensorV2()
])

val_transform = A.Compose([
    A.Resize(224, 224, interpolation=3),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2()
])

In [11]:
# 1. Train 폴더의 모든 파일 경로와 라벨 읽어오기
all_train_paths = []
all_train_labels = []
DATA_DIR = './data' # 본인의 데이터 경로

# Real / Fake 폴더 순회
for label, class_name in enumerate(['Real', 'Fake']): # 0: Real, 1: Fake
    # 대소문자 호환성 체크 (Real/real, Fake/fake)
    class_dir = os.path.join(DATA_DIR, 'train', class_name)
    if not os.path.exists(class_dir):
        class_dir = os.path.join(DATA_DIR, 'train', class_name.lower())
    
    if os.path.exists(class_dir):
        files = [os.path.join(class_dir, f) for f in os.listdir(class_dir)]
        all_train_paths.extend(files)
        all_train_labels.extend([label] * len(files))
        print(f"Loaded {len(files)} images from {class_name}")
    else:
        print(f"CRITICAL ERROR: Folder not found: {class_dir}")

# 2. Train / Val 분할 (8:2 비율)
train_paths, val_paths, train_labels, val_labels = train_test_split(
    all_train_paths, all_train_labels, test_size=0.2, random_state=42, stratify=all_train_labels
)

print(f"Total Train: {len(train_paths)}, Total Val: {len(val_paths)}")

# 3. 데이터셋 및 로더 생성
# Train에는 강력한 증강(Augmentation) 적용
train_dataset = DeepDetectDataset(file_paths=train_paths, labels=train_labels, transform=train_transform)

# Val에는 기본 변환(Resize+Normalize)만 적용 -> 이게 논문 성능의 핵심입니다!
val_dataset = DeepDetectDataset(file_paths=val_paths, labels=val_labels, transform=val_transform)

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)

Loaded 48815 images from Real
Loaded 41594 images from Fake
Total Train: 72327, Total Val: 18082


In [13]:
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# 1. 데이터 경로 설정 (train 폴더 속에 Real, Fake가 있는 구조)
DATA_DIR = './data' 

# 2. Train 폴더의 모든 이미지 경로 긁어오기
all_train_paths = []
all_train_labels = []

# train 폴더 내의 Real(0), Fake(1) 폴더를 순회
for label, class_name in enumerate(['Real', 'Fake']):
    # 대소문자 문제 방지 (Real/real, Fake/fake 모두 체크)
    class_dir = os.path.join(DATA_DIR, 'train', class_name)
    if not os.path.exists(class_dir):
        class_dir = os.path.join(DATA_DIR, 'train', class_name.lower())
    
    if os.path.exists(class_dir):
        # 해당 폴더의 모든 파일 리스트 가져오기
        files = [os.path.join(class_dir, f) for f in os.listdir(class_dir) 
                 if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        all_train_paths.extend(files)
        all_train_labels.extend([label] * len(files)) # 0 또는 1 라벨링
        print(f"'{class_name}' 폴더에서 {len(files)}장 로드 완료")
    else:
        print(f"경고: {class_dir} 경로를 찾을 수 없습니다. 폴더명을 확인해주세요.")

# 3. 데이터가 정상적으로 로드되었는지 확인
if len(all_train_paths) == 0:
    raise ValueError("이미지를 하나도 못 찾았습니다! 경로 구조를 다시 확인해주세요.")

# 4. Train : Validation = 8 : 2 로 분할 (Stratified Split)
train_paths, val_paths, train_labels, val_labels = train_test_split(
    all_train_paths, all_train_labels, 
    test_size=0.2, 
    random_state=42, 
    stratify=all_train_labels # Real/Fake 비율 맞춰서 자르기
)

print(f"최종 학습 데이터: {len(train_paths)}장")
print(f"최종 검증 데이터: {len(val_paths)}장")

# 5. 데이터셋 생성 (여기서 에러가 안 나게 file_paths를 명시적으로 넣어줍니다)
# Train용: 증강(Augmentation) 적용
train_dataset = DeepDetectDataset(file_paths=train_paths, labels=train_labels, transform=train_transform)

# Val용: 증강 없이 리사이징만 적용 (검증의 정확성을 위해)
val_dataset = DeepDetectDataset(file_paths=val_paths, labels=val_labels, transform=val_transform)

# 6. 데이터 로더 생성
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False, num_workers=4)

print("데이터 준비 완료! 이제 학습 코드를 실행하세요.")

'Real' 폴더에서 48815장 로드 완료
'Fake' 폴더에서 41594장 로드 완료
최종 학습 데이터: 72327장
최종 검증 데이터: 18082장
데이터 준비 완료! 이제 학습 코드를 실행하세요.


In [None]:
def get_advanced_model():
    # Pre-trained EfficientNet-B4 로드 [cite: 172]
    model = EfficientNet.from_pretrained('efficientnet-b4')
    
    # 출력 피처 수 확인 (EfficientNet-B4는 1792) [cite: 174]
    num_ftrs = model._fc.in_features 
    
    # Binary Classification Head 수정 [cite: 178]
    model._fc = nn.Sequential(
        nn.Dropout(p=0.4),            # [cite: 179]
        nn.Linear(num_ftrs, 1),       # [cite: 180]
        nn.Sigmoid()                  # [cite: 181]
    )
    return model

model = get_advanced_model().to(device)
print("Model loaded successfully.")

In [None]:
from tqdm import tqdm

# 손실함수 & 최적화함수 설정
criterion = nn.BCELoss() # 이진 분류 손실함수
optimizer = optim.Adam(model.parameters(), lr=1e-4) # 학습률 0.0001

def train_model(model, train_loader, val_loader, epochs=5):
    best_acc = 0.0
    print(f"학습 시작! (총 Epochs: {epochs})")
    
    for epoch in range(epochs):
        # --- 학습 (Train) ---
        model.train()
        running_loss = 0.0
        
        # tqdm으로 진행률 바 표시
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")
        for images, labels in progress_bar:
            images, labels = images.to(device), labels.to(device).unsqueeze(1)
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            # 진행바에 현재 Loss 표시
            progress_bar.set_postfix({'loss': running_loss / (progress_bar.n + 1)})
            
        # --- 검증 (Validation) ---
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device).unsqueeze(1)
                outputs = model(images)
                predicted = (outputs > 0.5).float() # 0.5보다 크면 Fake(1), 작으면 Real(0)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        epoch_acc = 100 * correct / total
        epoch_loss = running_loss / len(train_loader)
        
        print(f"\n[결과] Epoch {epoch+1} - Loss: {epoch_loss:.4f}, Val Acc: {epoch_acc:.2f}%")
        
        # 성능이 좋으면 모델 저장
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            torch.save(model.state_dict(), 'best_model.pth')
            print(f"--> 최고 성능 갱신! 모델 저장됨 ({best_acc:.2f}%)")

# 학습 실행 (일단 3 Epoch만 돌려보세요)
train_model(model, train_loader, val_loader, epochs=3)

학습 시작! (총 Epochs: 3)


Epoch 1/3:  26%|████▍            | 586/2261 [02:08<06:05,  4.58it/s, loss=0.259]