Import basic modules

In [None]:
import os
import torch
import numpy as np
from pathlib import Path
import time
from datetime import datetime, timedelta
from tqdm import tqdm
import importlib

# Utils import (모듈화)
from utils import create_dataloaders, CDMetrics, get_loss_fn



CUDA(GPU) 확인

In [35]:
# Single-GPU setup.
# NOTE: CUDA_VISIBLE_DEVICES must be set before CUDA is first initialised in
# this process for the restriction to take effect.
GPU_ID = 0
os.environ['CUDA_VISIBLE_DEVICES'] = str(GPU_ID)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if DEVICE.type == 'cuda':
    print(f"Using GPU: {GPU_ID}")
else:
    # Do not claim a GPU is in use when we have fallen back to the CPU.
    print("CUDA not available; running on CPU")
print(f"Device: {DEVICE}")

# Multi-GPU setup (disabled).
# If re-enabled, this must run after BATCH_SIZE has been defined, because it
# rescales the batch size by the number of GPUs.
# # GPU_IDS = [0, 1, 2, 3]  # list of GPUs to use
# os.environ['CUDA_VISIBLE_DEVICES'] = ','.join(map(str, GPU_IDS))
# DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# USE_MULTI_GPU = len(GPU_IDS) > 1 and torch.cuda.device_count() > 1
# print(f"Using GPUs: {GPU_IDS}")
# print(f"Available GPU count: {torch.cuda.device_count()}")
# if USE_MULTI_GPU:
#     BATCH_SIZE = BATCH_SIZE * len(GPU_IDS)  # scale batch size for multi-GPU
#     print(f"Adjusted batch size for multi-GPU: {BATCH_SIZE}")
# Seed setup (reproducibility) is handled in its own cell below.

Using GPU: 0
Device: cuda


데이터셋 & 모델 리스트

In [36]:
DATASET_ROOT = "./dataset"  # root folder of the (symlinked) datasets

# Datasets available for experiments.
DATASET_LIST = [
    'LEVIR-CD+',
    'WHU-CD',
    'CLCD',
    'CaBuAr-CD',
    'S2Looking-CD',
    'SEN1Floods11-CD',
]

# Models available for experiments.
# Every active entry ends with a comma: without it, uncommenting one of the
# entries below would silently concatenate two adjacent string literals
# (e.g. 'USSFC-NetChangeMamba') instead of adding a new list element.
MODEL_LIST = [
    'A2Net',
    'Changer',
    'Change3D',
    'STRobustNet',
    'USSFC-Net',
    # 'ChangeMamba',
    # 'CDMamba',
    # 'ChangeCLIP',
    # 'EATDER',
]

시드 설정

In [37]:
import random  # imported here because this is the only cell that needs it

# Seed every random number generator this notebook can reach so that runs are
# repeatable.
SEED = 42
# Python's built-in RNG was previously left unseeded; any code drawing from
# `random` (possibly the augmentation pipeline - TODO confirm) would
# otherwise differ from run to run.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    # Seed all visible CUDA devices, not only the current one.
    torch.cuda.manual_seed_all(SEED)

##### 고정 하이퍼파라미터 설정 (이미지 & 학습)

In [38]:
# Image settings
IMG_SIZE = 256 
IN_CHANNELS = 3  # RGB
OUT_CHANNELS = 1  # Binary change detection

# Training settings
BATCH_SIZE = 16
NUM_WORKERS = 4
MAX_ITERATIONS = 100000  # fixed iteration budget, independent of dataset size


##### 모델 & 데이터셋 설정

In [None]:
# Dataset to run the experiment on
test_dataset = 'LEVIR-CD+'  # LEVIR-CD+
# test_dataset = DATASET_LIST[0]  # LEVIR-CD+

# Model to run the experiment with
test_model = 'A2Net'

# True: minimal ("base") implementation, False: full implementation
base = True

# Validate both selections up front so a typo fails here with a clear message
# rather than later as a missing directory or import error.
if test_dataset not in DATASET_LIST:
    raise ValueError(f"Dataset {test_dataset} not in DATASET_LIST. Choose from: {DATASET_LIST}")
print(f"Test dataset: {test_dataset}")
if test_model not in MODEL_LIST:
    raise ValueError(f"Model {test_model} not in MODEL_LIST. Choose from: {MODEL_LIST}")
print(f"Test model: {test_model}")
print(f"Using base version: {base}")

Test dataset: LEVIR-CD+
Test model: Change3D


모델 동적 import

In [None]:

# %% 모델 동적 import
import importlib

def get_model_class(model_name, use_base=False):
    """Dynamically import and return the class implementing ``model_name``.

    Args:
        model_name: Key into the internal model mapping (e.g. ``'A2Net'``).
        use_base: If True, load the minimal "base" implementation; otherwise
            try the full implementation and fall back to the base one when
            the full one is unavailable.

    Returns:
        The model class object (not an instance).

    Raises:
        ValueError: ``model_name`` has no entry in the mapping.
        ImportError: The base implementation's module cannot be imported or
            does not define the expected class.
    """
    # Model mapping: (module path, class name) for the base and full versions.
    model_mapping = {
        'A2Net': {
            'base': ('models.a2net_base', 'A2NetBase'),
            'full': ('models.a2net', 'A2Net')  # to be implemented later
        },
        'Change3D': {
            'base': ('models.change3d_base', 'Change3DBase'),
            'full': ('models.change3d', 'Change3D')
        },
        # additional models go here...
    }

    if model_name not in model_mapping:
        raise ValueError(f"Model {model_name} not implemented")

    # Choose between the base and full variant.
    version = 'base' if use_base else 'full'
    module_path, class_name = model_mapping[model_name][version]

    model_class = None
    cause = None
    try:
        module = importlib.import_module(module_path)
    except ImportError as exc:
        cause = exc
    else:
        # A module that imports fine but does not define the class yet is
        # treated as "not found" too, instead of leaking an AttributeError.
        model_class = getattr(module, class_name, None)

    if model_class is not None:
        return model_class

    if use_base:
        # Chain the original failure so the real reason stays visible.
        raise ImportError(
            f"{model_name} base version not found at {module_path}"
        ) from cause

    print(f"Full version not found, falling back to base version")
    return get_model_class(model_name, use_base=True)

# Resolve the class for the selected model (base or full variant, per `base`).
ModelClass = get_model_class(test_model, use_base=base)
print(f"Loaded model: {ModelClass.__name__}")


In [40]:
from configs import get_model_config

# Fetch the hyperparameter dictionary registered for the selected model.
model_config = get_model_config(test_model)

# Expose each setting as a notebook-level variable for the cells below.
optimizer, learning_rate, weight_decay = (
    model_config[key] for key in ('optimizer', 'learning_rate', 'weight_decay')
)
betas, eps = (model_config[key] for key in ('betas', 'eps'))
scheduler, momentum = (model_config[key] for key in ('scheduler', 'momentum'))

# Build the summary; betas and momentum are listed only when they are truthy.
report = [
    f"Model configurations:",
    f"  Optimizer: {optimizer}",
    f"  Learning rate: {learning_rate}",
    f"  Weight decay: {weight_decay}",
]
if betas:
    report.append(f"  Betas: {betas}")
if momentum:
    report.append(f"  Momentum: {momentum}")
report.append(f"  Scheduler: {scheduler}")
print("\n".join(report))


Model configurations:
  Optimizer: adam
  Learning rate: 0.0002
  Weight decay: 0.0001
  Betas: (0.9, 0.999)
  Scheduler: poly


옵티마이저 생성 함수

In [None]:
def get_optimizer(model, config):
    """Build the optimizer named by ``config['optimizer']`` for ``model``.

    Supported names are ``'adam'``, ``'adamw'`` and ``'sgd'``. The Adam
    variants read ``learning_rate``, ``betas``, ``eps`` and ``weight_decay``
    from ``config``; SGD reads ``learning_rate``, ``momentum`` and
    ``weight_decay``.

    Raises:
        ValueError: ``config['optimizer']`` is not one of the supported names.
    """
    kind = config['optimizer']

    # Adam and AdamW take exactly the same arguments, so they share a branch.
    if kind in ('adam', 'adamw'):
        adam_family = torch.optim.Adam if kind == 'adam' else torch.optim.AdamW
        return adam_family(
            model.parameters(),
            lr=config['learning_rate'],
            betas=config['betas'],
            eps=config['eps'],
            weight_decay=config['weight_decay'],
        )

    if kind == 'sgd':
        return torch.optim.SGD(
            model.parameters(),
            lr=config['learning_rate'],
            momentum=config['momentum'],
            weight_decay=config['weight_decay'],
        )

    raise ValueError(f"Unknown optimizer: {config['optimizer']}")

실험 폴더 설정

In [41]:
# Layout: experiments/<dataset>/<model>/checkpoints
test_path = f"experiments/{test_dataset}"
test_dir = Path(test_path)
model_dir = Path(test_path, test_model)
checkpoint_dir = model_dir / "checkpoints"

# Create the dataset- and model-level folders (with parents), then the
# checkpoint folder inside the model folder.
for directory in (test_dir, model_dir):
    directory.mkdir(parents=True, exist_ok=True)
checkpoint_dir.mkdir(exist_ok=True)

print(f"Experiment directory: {model_dir}")

Experiment directory: experiments/WHU-CD/Change3D


Iteration 계산 
MAX_ITERATIONS = 100000을 기준으로 epoch 수 계산


In [44]:
# Locate the selected dataset, count the images in each split, and derive the
# number of epochs that fits inside the fixed MAX_ITERATIONS budget.
dataset_path = Path(DATASET_ROOT) / test_dataset
print(dataset_path)
if not dataset_path.exists():
    print(f"Dataset not found: {dataset_path}")
    raise FileNotFoundError(f"Dataset {test_dataset} not found at {dataset_path}")

print(f"Dataset found: {dataset_path}")
splits = ['train', 'val', 'test']
dataset_info = {}

for split in splits:
    split_path = dataset_path / split
    if split_path.exists():
        # Every entry under <split>/t1 is counted as one sample.
        img_count = len(list((split_path / 't1').glob('*')))
        dataset_info[split] = img_count
        print(f"  {split}: {img_count} images")

# Epoch count derived from MAX_ITERATIONS
if 'train' in dataset_info:
    train_samples = dataset_info['train']
    if train_samples == 0:
        raise ValueError(f"No training images found under {dataset_path / 'train' / 't1'}")
    # Floor division counts only full batches (assumes the final partial batch
    # is not an iteration - TODO confirm against the dataloader). Clamp to 1
    # so a split smaller than BATCH_SIZE does not cause a division by zero.
    iterations_per_epoch = max(1, train_samples // BATCH_SIZE)
    EPOCHS = MAX_ITERATIONS // iterations_per_epoch

    print(f"\nTraining iterations info:")
    print(f"  Train samples: {train_samples}")
    print(f"  Iterations per epoch: {iterations_per_epoch}")
    print(f"  Total epochs: {EPOCHS}")
    print(f"  Total iterations: {EPOCHS * iterations_per_epoch}")

dataset/WHU-CD
Dataset found: dataset/WHU-CD
  train: 4233 images
  val: 747 images
  test: 2700 images

Training iterations info:
  Train samples: 4233
  Iterations per epoch: 264
  Total epochs: 378
  Total iterations: 99792


모델 학습 및 검증

In [None]:

# %% Model-related imports
from models.a2net_base import A2NetBase  # kept from the original cell; the model itself is now built from ModelClass
from utils.dataset import create_dataloaders
from utils.metrics import CDMetrics
from utils.losses import BCEDiceLoss

# %% Build the dataloaders
train_loader, val_loader, test_loader = create_dataloaders(
    root_dir=DATASET_ROOT,
    dataset_name=test_dataset,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    img_size=IMG_SIZE,
    augment=True
)

print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")

# %% Model, loss function, optimizer
# Instantiate the class selected via `test_model` / `base` instead of a
# hard-coded A2NetBase; otherwise a different selection would still train
# A2Net while saving under the other model's experiment directory.
# NOTE(review): assumes every model class accepts `num_classes` - confirm.
model = ModelClass(num_classes=OUT_CHANNELS).to(DEVICE)

# Loss function
criterion = BCEDiceLoss()

# Optimizer built from the model's config. Using get_optimizer covers
# adam/adamw/sgd (previously `opt` was left undefined for anything other than
# 'adam') and also applies the configured `eps`.
opt = get_optimizer(model, model_config)
# %% 간단한 학습 루프
from tqdm import tqdm

def train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader`` and return the mean batch loss.

    Each batch is a mapping with ``'img1'``, ``'img2'`` and ``'label'``
    entries, all of which are moved to ``device``. The model is called as
    ``model(img1, img2)`` and its output is scored against the label with
    ``criterion``.

    Returns:
        Sum of the per-batch loss values divided by ``len(loader)``.
    """
    model.train()
    batch_losses = []

    for sample in tqdm(loader, desc='Training'):
        before, after, target = (
            sample[key].to(device) for key in ('img1', 'img2', 'label')
        )

        # Forward pass
        batch_loss = criterion(model(before, after), target)

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(loader)

def validate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Each batch is a mapping with ``'img1'``, ``'img2'`` and ``'label'``
    entries, all of which are moved to ``device``. Every output/label pair is
    fed to a fresh ``CDMetrics`` accumulator.

    Returns:
        A ``(mean_loss, results)`` tuple: the summed per-batch loss divided by
        ``len(loader)``, and whatever ``CDMetrics.get_metrics()`` returns.
    """
    model.eval()
    running_loss = 0
    tracker = CDMetrics()

    with torch.no_grad():
        for sample in tqdm(loader, desc='Validation'):
            before, after, target = (
                sample[key].to(device) for key in ('img1', 'img2', 'label')
            )

            prediction = model(before, after)
            running_loss += criterion(prediction, target).item()
            tracker.update(prediction, target)

    summary = tracker.get_metrics()
    return running_loss / len(loader), summary

# %% Run training
num_epochs = 10  # kept small for a quick test run

# Initialise the best score explicitly rather than relying on `epoch == 0`
# short-circuiting the comparison. Starting from -inf also means a NaN F1 on
# the first epoch is never recorded as "best" (every later `f1 > nan` check
# would be False, so no further checkpoint would ever be saved).
best_f1 = float('-inf')

for epoch in range(num_epochs):
    print(f'\nEpoch {epoch+1}/{num_epochs}')

    # Train
    train_loss = train_one_epoch(model, train_loader, criterion, opt, DEVICE)
    print(f'Train Loss: {train_loss:.4f}')

    # Validate
    val_loss, val_metrics = validate(model, val_loader, criterion, DEVICE)
    print(f'Val Loss: {val_loss:.4f}')
    print(f"Val F1: {val_metrics['f1']:.4f}, IoU: {val_metrics['iou']:.4f}")

    # Save a checkpoint whenever the validation F1 improves
    if val_metrics['f1'] > best_f1:
        best_f1 = val_metrics['f1']
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': opt.state_dict(),
            'best_f1': best_f1,
        }, checkpoint_dir / 'best_model.pth')
        print(f'Model saved! (F1: {best_f1:.4f})')