# CNN 지도학습

체스 데이터로 CNN 모델을 지도학습합니다.

이 모델은 이후 강화학습의 정책 신경망으로 사용됩니다.

## 모델 구조
- **입력**: (18, 8, 8) 체스 보드 상태
- **Policy Head**: 4096개 액션에 대한 확률 분포
- **Value Head**: 포지션 평가값 (-1 ~ 1)

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
from pathlib import Path
from tqdm import tqdm

from parquet_dataset import (
    ParquetChessDataset,
    get_parquet_file_info, 
    split_files_by_ratio
)

# 재현성을 위한 설정
torch.manual_seed(42)
np.random.seed(42)

# CUDA 설정
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False  # 재현성을 위해 False, 성능을 위해 True로 변경 가능
    device = torch.device("cuda")
    print(f"✅ CUDA 사용 가능!")
    print(f"   GPU: {torch.cuda.get_device_name(0)}")
    print(f"   CUDA 버전: {torch.version.cuda}")
    print(f"   GPU 메모리: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
else:
    device = torch.device("cpu")
    print(f"⚠️  CUDA 사용 불가 - CPU 사용")

print(f"\n사용 장치: {device}")

⚠️  CUDA 사용 불가 - CPU 사용

사용 장치: cpu


## 모델 정의

In [3]:
class ChessCNN(nn.Module):
    """
    체스 CNN 모델
    
    Policy Head와 Value Head를 가진 구조로,
    지도학습 후 강화학습의 정책 신경망으로 사용됩니다.
    """
    
    def __init__(self, num_channels=256):
        super(ChessCNN, self).__init__()
        
        # 입력: (batch, 18, 8, 8)
        # 공통 CNN 백본
        self.conv_layers = nn.Sequential(
            # 첫 번째 블록
            nn.Conv2d(18, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            
            # 두 번째 블록
            nn.Conv2d(64, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            
            # 세 번째 블록
            nn.Conv2d(128, num_channels, 3, padding=1),
            nn.BatchNorm2d(num_channels),
            nn.ReLU(),
        )
        
        # Policy Head (4096개 액션)
        self.policy_head = nn.Sequential(
            nn.Conv2d(num_channels, 32, 1),  # 1x1 conv
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(32 * 8 * 8, 4096),  # 4096개 액션
        )
        
        # Value Head (1개 출력)
        self.value_head = nn.Sequential(
            nn.Conv2d(num_channels, 32, 1),  # 1x1 conv
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(32 * 8 * 8, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Tanh()  # [-1, 1] 범위
        )
    
    def forward(self, x, mask=None):
        """
        Args:
            x: (batch, 18, 8, 8) 입력 텐서
            mask: (batch, 4096) 합법 수 마스크 (선택사항)
        
        Returns:
            policy_logits: (batch, 4096) 정책 로짓
            value: (batch, 1) 가치 예측
        """
        # 공통 백본
        features = self.conv_layers(x)
        
        # Policy Head
        policy_logits = self.policy_head(features)
        
        # Mask 적용 (불법 수 제거)
        if mask is not None:
            policy_logits = policy_logits * mask + (1 - mask) * (-1e9)
        
        # Value Head
        value = self.value_head(features)
        
        return policy_logits, value


# 모델 생성 및 확인
model = ChessCNN(num_channels=256).to(device)
print(f"모델 파라미터 수: {sum(p.numel() for p in model.parameters()):,}")

# 샘플 입력으로 shape 확인
sample_input = torch.randn(2, 18, 8, 8).to(device)
sample_mask = torch.ones(2, 4096).to(device)
policy_logits, value = model(sample_input, sample_mask)
print(f"\n입력 shape: {sample_input.shape}")
print(f"Policy logits shape: {policy_logits.shape}")
print(f"Value shape: {value.shape}")

모델 파라미터 수: 9,314,433

입력 shape: torch.Size([2, 18, 8, 8])
Policy logits shape: torch.Size([2, 4096])
Value shape: torch.Size([2, 1])


## 데이터셋 설정

먼저 데이터 크기를 확인합니다.

In [4]:
# 데이터 경로
PARQUET_DIR = "data/parquet"
TRAIN_RATIO = 0.9  # 학습/검증 분할 비율

# ============================================
# Shuffle Buffer 기반 스트리밍 데이터셋
# ============================================
# 파일을 순차적으로 읽으면서 Shuffle Buffer로 데이터를 섞습니다.
# - I/O 효율: 파일 순차 읽기로 10-100배 향상
# - 학습 품질: Shuffle Buffer로 데이터 다양성 확보

# 1. 파일 정보 수집 (메타데이터만 읽기 - 빠름)
parquet_files, file_lengths = get_parquet_file_info(PARQUET_DIR)

# 2. 파일 단위로 train/val 분할
train_files, val_files, train_samples, val_samples = split_files_by_ratio(
    parquet_files, 
    file_lengths, 
    train_ratio=TRAIN_RATIO,
    shuffle=False  # 순서 유지 (재현성)
)

# 3. 스트리밍 데이터셋 생성
print("\n학습 데이터셋 생성 중...")
train_dataset = ParquetChessDataset(
    parquet_files=train_files,
    buffer_size=100000,  # 10만 샘플 버퍼 (파일 2개 분량, ~1GB 메모리)
    shuffle_files=True,  # 에폭마다 파일 순서 셔플
    seed=42
)

print("\n검증 데이터셋 생성 중...")
val_dataset = ParquetChessDataset(
    parquet_files=val_files,
    shuffle=False,  # 검증용이므로 셔플 없음
    shuffle_files=False,  # 파일 순서 유지
    seed=42
)

print(f"\n최종 데이터셋 크기:")
print(f"  학습 데이터: {train_dataset.estimated_length:,} (스트리밍)")
print(f"  검증 데이터: {val_dataset.estimated_length:,} (스트리밍)")

# DataLoader 생성
BATCH_SIZE = 100
NUM_WORKERS = 0  # Windows에서는 0 권장, Linux/Mac에서는 4-8 권장

# CUDA 사용 시 배치 크기 조정 가능 (GPU 메모리에 따라)
if torch.cuda.is_available():
    # GPU 메모리가 충분하면 배치 크기 증가 가능
    # BATCH_SIZE = 256  # GPU 메모리 충분시 증가 가능
    print(f"CUDA 사용 중 - 배치 크기: {BATCH_SIZE}")

# 학습 DataLoader (IterableDataset이므로 shuffle=False)
# 셔플은 ParquetChessDataset 내부에서 처리됨
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,  # IterableDataset은 shuffle=False 필수
    num_workers=NUM_WORKERS,
    pin_memory=torch.cuda.is_available(),
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=torch.cuda.is_available(),
)

# 배치 확인 (빠르게 로드됨!)
print("\n첫 배치 로딩 중...")
for states, policies, masks, values in train_loader:
    print(f"\n배치 shape 확인:")
    print(f"  States: {states.shape}")
    print(f"  Policies: {policies.shape}")
    print(f"  Masks: {masks.shape}")
    print(f"  Values: {values.shape}")
    break

print("\n데이터 로딩 준비 완료!")

Parquet 파일 정보 수집 중... (449개 파일)


파일 스캔: 100%|██████████| 449/449 [00:00<00:00, 2998.94it/s]

총 파일 수: 449, 총 샘플 수: 22,450,000

파일 단위 분할 완료:
  Train: 405개 파일, 20,250,000개 샘플 (90.2%)
  Val: 44개 파일, 2,200,000개 샘플 (9.8%)

학습 데이터셋 생성 중...
Parquet 파일 정보 수집 중... (405개 파일)





총 샘플 수: 20,250,000
모드: Shuffle Buffer (100,000 샘플, 0.49%)

검증 데이터셋 생성 중...
Parquet 파일 정보 수집 중... (44개 파일)
총 샘플 수: 2,200,000
모드: 순차 읽기 (검증용)

최종 데이터셋 크기:
  학습 데이터: 20,250,000 (스트리밍)
  검증 데이터: 2,200,000 (스트리밍)

첫 배치 로딩 중...

배치 shape 확인:
  States: torch.Size([100, 18, 8, 8])
  Policies: torch.Size([100])
  Masks: torch.Size([100, 4096])
  Values: torch.Size([100])

데이터 로딩 준비 완료!


## 학습 설정

In [5]:
# 학습 하이퍼파라미터
LEARNING_RATE = 0.001
NUM_EPOCHS = 10
POLICY_WEIGHT = 1.0  # Policy loss 가중치
VALUE_WEIGHT = 1.0   # Value loss 가중치

# Loss 함수
policy_loss_fn = nn.CrossEntropyLoss()
value_loss_fn = nn.MSELoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Learning Rate Scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

# 모델 저장 경로
MODEL_DIR = Path("models")
MODEL_DIR.mkdir(exist_ok=True)
BEST_MODEL_PATH = MODEL_DIR / "best_chess_cnn.pth"
LAST_MODEL_PATH = MODEL_DIR / "last_chess_cnn.pth"

print(f"학습 설정:")
print(f"  Learning Rate: {LEARNING_RATE}")
print(f"  Epochs: {NUM_EPOCHS}")
print(f"  Batch Size: {BATCH_SIZE}")
print(f"  Policy Weight: {POLICY_WEIGHT}")
print(f"  Value Weight: {VALUE_WEIGHT}")

TypeError: ReduceLROnPlateau.__init__() got an unexpected keyword argument 'verbose'

## 학습 함수

In [None]:
def train_epoch(model, dataloader, optimizer, policy_loss_fn, value_loss_fn, 
                policy_weight, value_weight, device):
    """한 에폭 학습"""
    model.train()
    total_policy_loss = 0.0
    total_value_loss = 0.0
    total_loss = 0.0
    num_batches = 0
    
    # CUDA 사용 시 비동기 전송
    non_blocking = torch.cuda.is_available()
    
    pbar = tqdm(dataloader, desc="학습 중")
    for states, policies, masks, values in pbar:
        # CUDA 사용 시 비동기 전송으로 속도 향상
        states = states.to(device, non_blocking=non_blocking)
        policies = policies.to(device, non_blocking=non_blocking)
        masks = masks.to(device, non_blocking=non_blocking)
        values = values.to(device, non_blocking=non_blocking)
        
        # Forward
        policy_logits, value_pred = model(states, masks)
        
        # Policy Loss
        policy_loss = policy_loss_fn(policy_logits, policies)
        
        # Value Loss
        value_loss = value_loss_fn(value_pred.squeeze(), values)
        
        # Total Loss
        loss = policy_weight * policy_loss + value_weight * value_loss
        
        # Backward
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient clipping
        optimizer.step()
        
        # 통계
        total_policy_loss += policy_loss.item()
        total_value_loss += value_loss.item()
        total_loss += loss.item()
        num_batches += 1
        
        # 진행 상황 업데이트
        pbar.set_postfix({
            'loss': f'{loss.item():.4f}',
            'policy': f'{policy_loss.item():.4f}',
            'value': f'{value_loss.item():.4f}'
        })
    
    return {
        'policy_loss': total_policy_loss / num_batches,
        'value_loss': total_value_loss / num_batches,
        'total_loss': total_loss / num_batches
    }


def validate(model, dataloader, policy_loss_fn, value_loss_fn, 
             policy_weight, value_weight, device):
    """검증"""
    model.eval()
    total_policy_loss = 0.0
    total_value_loss = 0.0
    total_loss = 0.0
    num_batches = 0
    
    correct_predictions = 0
    total_predictions = 0
    
    # CUDA 사용 시 비동기 전송
    non_blocking = torch.cuda.is_available()
    
    with torch.no_grad():
        for states, policies, masks, values in tqdm(dataloader, desc="검증 중"):
            # CUDA 사용 시 비동기 전송으로 속도 향상
            states = states.to(device, non_blocking=non_blocking)
            policies = policies.to(device, non_blocking=non_blocking)
            masks = masks.to(device, non_blocking=non_blocking)
            values = values.to(device, non_blocking=non_blocking)
            
            # Forward
            policy_logits, value_pred = model(states, masks)
            
            # Policy Loss
            policy_loss = policy_loss_fn(policy_logits, policies)
            
            # Value Loss
            value_loss = value_loss_fn(value_pred.squeeze(), values)
            
            # Total Loss
            loss = policy_weight * policy_loss + value_weight * value_loss
            
            # 정확도 계산
            pred_policies = policy_logits.argmax(dim=1)
            correct_predictions += (pred_policies == policies).sum().item()
            total_predictions += policies.size(0)
            
            # 통계
            total_policy_loss += policy_loss.item()
            total_value_loss += value_loss.item()
            total_loss += loss.item()
            num_batches += 1
    
    accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0.0
    
    return {
        'policy_loss': total_policy_loss / num_batches,
        'value_loss': total_value_loss / num_batches,
        'total_loss': total_loss / num_batches,
        'accuracy': accuracy
    }

## 학습 실행

In [None]:
# 학습 히스토리
train_history = {
    'policy_loss': [],
    'value_loss': [],
    'total_loss': []
}

val_history = {
    'policy_loss': [],
    'value_loss': [],
    'total_loss': [],
    'accuracy': []
}

best_val_loss = float('inf')

print("=" * 60)
print("학습 시작")
print("=" * 60)

for epoch in range(1, NUM_EPOCHS + 1):
    print(f"\nEpoch {epoch}/{NUM_EPOCHS}")
    print("-" * 60)
    
    # 에폭 설정 (파일 순서 셔플에 사용)
    train_dataset.set_epoch(epoch)
    
    # 학습
    train_metrics = train_epoch(
        model, train_loader, optimizer, policy_loss_fn, value_loss_fn,
        POLICY_WEIGHT, VALUE_WEIGHT, device
    )
    
    # 검증
    val_metrics = validate(
        model, val_loader, policy_loss_fn, value_loss_fn,
        POLICY_WEIGHT, VALUE_WEIGHT, device
    )
    
    # 히스토리 저장
    train_history['policy_loss'].append(train_metrics['policy_loss'])
    train_history['value_loss'].append(train_metrics['value_loss'])
    train_history['total_loss'].append(train_metrics['total_loss'])
    
    val_history['policy_loss'].append(val_metrics['policy_loss'])
    val_history['value_loss'].append(val_metrics['value_loss'])
    val_history['total_loss'].append(val_metrics['total_loss'])
    val_history['accuracy'].append(val_metrics['accuracy'])
    
    # 결과 출력
    print(f"\n학습 결과:")
    print(f"  Policy Loss: {train_metrics['policy_loss']:.4f}")
    print(f"  Value Loss: {train_metrics['value_loss']:.4f}")
    print(f"  Total Loss: {train_metrics['total_loss']:.4f}")
    
    print(f"\n검증 결과:")
    print(f"  Policy Loss: {val_metrics['policy_loss']:.4f}")
    print(f"  Value Loss: {val_metrics['value_loss']:.4f}")
    print(f"  Total Loss: {val_metrics['total_loss']:.4f}")
    print(f"  Accuracy: {val_metrics['accuracy']:.4f} ({val_metrics['accuracy']*100:.2f}%)")
    
    # Learning Rate Scheduler
    scheduler.step(val_metrics['total_loss'])
    
    # 최고 모델 저장
    if val_metrics['total_loss'] < best_val_loss:
        best_val_loss = val_metrics['total_loss']
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_metrics['total_loss'],
            'val_accuracy': val_metrics['accuracy'],
        }, BEST_MODEL_PATH)
        print(f"\n✅ 최고 모델 저장! (Loss: {best_val_loss:.4f})")
    
    # 마지막 모델 저장
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'val_loss': val_metrics['total_loss'],
        'val_accuracy': val_metrics['accuracy'],
    }, LAST_MODEL_PATH)

print("\n" + "=" * 60)
print("학습 완료!")
print("=" * 60)
print(f"최고 검증 Loss: {best_val_loss:.4f}")
print(f"최고 모델: {BEST_MODEL_PATH}")

## 학습 곡선 시각화

In [None]:
# Loss 곡선
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Total Loss
axes[0, 0].plot(train_history['total_loss'], label='Train', marker='o')
axes[0, 0].plot(val_history['total_loss'], label='Validation', marker='s')
axes[0, 0].set_title('Total Loss')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('Loss')
axes[0, 0].legend()
axes[0, 0].grid(True)

# Policy Loss
axes[0, 1].plot(train_history['policy_loss'], label='Train', marker='o')
axes[0, 1].plot(val_history['policy_loss'], label='Validation', marker='s')
axes[0, 1].set_title('Policy Loss')
axes[0, 1].set_xlabel('Epoch')
axes[0, 1].set_ylabel('Loss')
axes[0, 1].legend()
axes[0, 1].grid(True)

# Value Loss
axes[1, 0].plot(train_history['value_loss'], label='Train', marker='o')
axes[1, 0].plot(val_history['value_loss'], label='Validation', marker='s')
axes[1, 0].set_title('Value Loss')
axes[1, 0].set_xlabel('Epoch')
axes[1, 0].set_ylabel('Loss')
axes[1, 0].legend()
axes[1, 0].grid(True)

# Accuracy
axes[1, 1].plot(val_history['accuracy'], label='Validation', marker='s', color='green')
axes[1, 1].set_title('Policy Accuracy')
axes[1, 1].set_xlabel('Epoch')
axes[1, 1].set_ylabel('Accuracy')
axes[1, 1].legend()
axes[1, 1].grid(True)

plt.tight_layout()
plt.savefig(MODEL_DIR / 'training_curves.png', dpi=150)
plt.show()

print("학습 곡선 저장 완료:", MODEL_DIR / 'training_curves.png')

## 모델 평가

In [None]:
# 최고 모델 로드
checkpoint = torch.load(BEST_MODEL_PATH)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

print(f"로드된 모델 정보:")
print(f"  Epoch: {checkpoint['epoch']}")
print(f"  Validation Loss: {checkpoint['val_loss']:.4f}")
print(f"  Validation Accuracy: {checkpoint['val_accuracy']:.4f} ({checkpoint['val_accuracy']*100:.2f}%)")

# 최종 검증
final_metrics = validate(
    model, val_loader, policy_loss_fn, value_loss_fn,
    POLICY_WEIGHT, VALUE_WEIGHT, device
)

print(f"\n최종 검증 결과:")
print(f"  Policy Loss: {final_metrics['policy_loss']:.4f}")
print(f"  Value Loss: {final_metrics['value_loss']:.4f}")
print(f"  Total Loss: {final_metrics['total_loss']:.4f}")
print(f"  Accuracy: {final_metrics['accuracy']:.4f} ({final_metrics['accuracy']*100:.2f}%)")

## 샘플 예측 확인

In [None]:
# 몇 개 샘플로 예측 확인
model.eval()
non_blocking = torch.cuda.is_available()

with torch.no_grad():
    for states, policies, masks, values in val_loader:
        states = states.to(device, non_blocking=non_blocking)
        masks = masks.to(device, non_blocking=non_blocking)
        
        policy_logits, value_pred = model(states, masks)
        policy_probs = torch.softmax(policy_logits, dim=1)
        pred_policies = policy_logits.argmax(dim=1)
        
        print("샘플 예측:")
        for i in range(min(5, len(states))):
            print(f"\n샘플 {i+1}:")
            print(f"  실제 Policy: {policies[i].item()}")
            print(f"  예측 Policy: {pred_policies[i].item()}")
            print(f"  정확도: {'✅' if pred_policies[i] == policies[i] else '❌'}")
            print(f"  실제 Value: {values[i].item():.4f}")
            print(f"  예측 Value: {value_pred[i].item():.4f}")
            print(f"  예측 확률 (top-1): {policy_probs[i].max().item():.4f}")
        
        break

## 모델 저장 (강화학습용)

모델이 강화학습에서 사용할 수 있도록 저장합니다.

In [None]:
# 강화학습용 모델 저장 (state_dict만)
RL_MODEL_PATH = MODEL_DIR / "chess_cnn_rl.pth"
torch.save(model.state_dict(), RL_MODEL_PATH)

print(f"강화학습용 모델 저장 완료: {RL_MODEL_PATH}")
print(f"\n강화학습에서 사용 방법:")
print(f"  model = ChessCNN(num_channels=256)")
print(f"  model.load_state_dict(torch.load('{RL_MODEL_PATH}'))")
print(f"  model.eval()")