# California Housing Regression (PyTorch)
캘리포니아 주택 가격 데이터셋으로 회귀 모델을 학습한다. 데이터 누수 방지, 재현성, 학습 곡선 시각화, 다중 지표 평가를 포함한다.

## 개요
- **데이터**: `sklearn.datasets.fetch_california_housing`
- **입력 특징**(8개): MedInc, HouseAge, AveRooms, AveBedrms, Population, AveOccup, Latitude, Longitude
- **목표**: MedHouseVal (단위: 10만 달러)
- **주의**: 스케일러는 **훈련 세트에만** 적합한다. 이후 검증/테스트 세트에는 변환만 적용해 누수를 막는다.

In [None]:
# 1) 기본 설정
import math
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_absolute_error
import matplotlib.pyplot as plt

def set_seed(seed: int = 42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
set_seed(42)
print(f"사용 디바이스: {DEVICE}")
DEVICE

## 데이터 로드와 전처리
- 훈련/검증 분할 후 **훈련 세트 기준으로만** `StandardScaler.fit` 실행.
- 텐서 변환 시 `float32` 유지.

In [None]:
housing = fetch_california_housing(as_frame=True)
X = housing.data.values.astype(np.float32)
y = housing.target.values.astype(np.float32).reshape(-1, 1)
feature_names = list(housing.feature_names)

print(f"데이터셋 크기: {X.shape}")
print(f"특성 이름: {feature_names}")
print(f"타겟 범위: {y.min():.2f} ~ {y.max():.2f}")

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# 데이터 누수 방지: 훈련 세트에만 fit, 검증 세트에는 transform만 적용
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train).astype(np.float32)
X_val   = scaler.transform(X_val).astype(np.float32)

# PyTorch 텐서로 변환
X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.float32)
X_val_t   = torch.tensor(X_val,   dtype=torch.float32)
y_val_t   = torch.tensor(y_val,   dtype=torch.float32)

# 데이터셋과 데이터로더 생성
train_ds = TensorDataset(X_train_t, y_train_t)
val_ds   = TensorDataset(X_val_t,   y_val_t)

BATCH_SIZE = 64
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False)

print(f"\n훈련 데이터: {X_train_t.shape}")
print(f"검증 데이터: {X_val_t.shape}")
print(f"특성 개수: {len(feature_names)}")
print(f"배치 개수 - 훈련: {len(train_loader)}, 검증: {len(val_loader)}")

## 모델 정의
- 선형층 + ReLU + 드롭아웃.
- He 초기화(Kaiming)로 학습 안정화.

In [None]:
class HousingRegressor(nn.Module):
    def __init__(self, input_size=8, hidden_sizes=(128, 64, 32), p_drop=0.1):
        super().__init__()
        layers = []
        prev = input_size
        
        # 은닉층 구성
        for h in hidden_sizes:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.Dropout(p_drop)]
            prev = h
        
        # 출력층
        layers += [nn.Linear(prev, 1)]
        self.net = nn.Sequential(*layers)
        
        # 가중치 초기화
        self.apply(self._init_weights)

    @staticmethod
    def _init_weights(m):
        if isinstance(m, nn.Linear):
            nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
            if m.bias is not None:
                nn.init.zeros_(m.bias)

    def forward(self, x):
        return self.net(x)

# 모델 생성
model = HousingRegressor(input_size=X_train_t.shape[1]).to(DEVICE)
print(f"모델 구조:\n{model}")

# 모델 파라미터 수 계산
total_params = sum(p.numel() for p in model.parameters())
print(f"\n총 파라미터 수: {total_params:,}")

## 학습 루프
- 손실: MSE
- 옵티마이저: Adam + weight decay(간단한 L2)
- 스케줄러: `ReduceLROnPlateau`
- 얼리 스탑: `patience` 에폭 동안 검증 손실 개선 없으면 중단.

In [None]:
# 하이퍼파라미터 설정
EPOCHS = 200
LR = 1e-3
WEIGHT_DECAY = 1e-4
PATIENCE = 20

# 손실함수, 옵티마이저, 스케줄러 설정
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)

def run_epoch(dataloader, train: bool):
    """한 에포크 실행"""
    if train:
        model.train()
    else:
        model.eval()
    
    epoch_loss = 0.0
    total = 0
    
    with torch.set_grad_enabled(train):
        for xb, yb in dataloader:
            xb = xb.to(DEVICE)
            yb = yb.to(DEVICE)
            
            if train:
                optimizer.zero_grad()
            
            preds = model(xb)
            loss = criterion(preds, yb)
            
            if train:
                loss.backward()
                optimizer.step()
            
            epoch_loss += loss.item() * xb.size(0)
            total += xb.size(0)
    
    return epoch_loss / total

print("설정 완료!")
print(f"에포크: {EPOCHS}, 학습률: {LR}, Weight Decay: {WEIGHT_DECAY}, Patience: {PATIENCE}")

In [None]:
# 학습 실행
train_losses = []
val_losses = []
best_val_loss = float('inf')
patience_counter = 0

for epoch in range(EPOCHS):
    # 훈련
    train_loss = run_epoch(train_loader, train=True)
    train_losses.append(train_loss)
    
    # 검증
    val_loss = run_epoch(val_loader, train=False)
    val_losses.append(val_loss)
    
    # 스케줄러 업데이트
    scheduler.step(val_loss)
    
    # 얼리 스토핑 체크
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        # 최적 모델 저장
        torch.save(model.state_dict(), 'best_housing_model.pth')
    else:
        patience_counter += 1
    
    # 10 에포크마다 진행상황 출력
    if (epoch + 1) % 10 == 0:
        current_lr = optimizer.param_groups[0]['lr']
        print(f'에포크 [{epoch+1}/{EPOCHS}] - '
              f'훈련 손실: {train_loss:.6f}, '
              f'검증 손실: {val_loss:.6f}, '
              f'학습률: {current_lr:.2e}, '
              f'Patience: {patience_counter}/{PATIENCE}')
    
    # 얼리 스토핑
    if patience_counter >= PATIENCE:
        print(f"\n얼리 스토핑! 에포크 {epoch+1}에서 훈련 중단")
        break

print(f"\n훈련 완료! 최적 검증 손실: {best_val_loss:.6f}")

## 학습 곡선 시각화

In [None]:
# 학습 곡선 플롯
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='훈련 손실', alpha=0.8)
plt.plot(val_losses, label='검증 손실', alpha=0.8)
plt.xlabel('에포크')
plt.ylabel('MSE 손실')
plt.title('학습 곡선')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.plot(np.log10(train_losses), label='훈련 손실 (log)', alpha=0.8)
plt.plot(np.log10(val_losses), label='검증 손실 (log)', alpha=0.8)
plt.xlabel('에포크')
plt.ylabel('log10(MSE 손실)')
plt.title('학습 곡선 (로그 스케일)')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"최종 훈련 손실: {train_losses[-1]:.6f}")
print(f"최종 검증 손실: {val_losses[-1]:.6f}")
print(f"최적 검증 손실: {best_val_loss:.6f}")

## 모델 평가
다양한 지표로 모델 성능을 평가합니다.

In [None]:
# 최적 모델 로드
model.load_state_dict(torch.load('best_housing_model.pth'))
model.eval()

# 예측 수행
with torch.no_grad():
    # 훈련 세트 예측
    train_preds = model(X_train_t.to(DEVICE)).cpu().numpy()
    # 검증 세트 예측
    val_preds = model(X_val_t.to(DEVICE)).cpu().numpy()

# 평가 지표 계산
def calculate_metrics(y_true, y_pred, dataset_name):
    mse = np.mean((y_true - y_pred) ** 2)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    
    print(f"\n=== {dataset_name} 세트 평가 ===")
    print(f"MSE:  {mse:.6f}")
    print(f"RMSE: {rmse:.6f}")
    print(f"MAE:  {mae:.6f}")
    print(f"R²:   {r2:.6f}")
    
    return mse, rmse, mae, r2

# 훈련 세트 평가
train_metrics = calculate_metrics(y_train, train_preds, "훈련")

# 검증 세트 평가
val_metrics = calculate_metrics(y_val, val_preds, "검증")

## 예측 결과 시각화

In [None]:
# 예측 vs 실제 값 시각화
plt.figure(figsize=(15, 5))

# 훈련 세트
plt.subplot(1, 3, 1)
plt.scatter(y_train, train_preds, alpha=0.5, s=1)
plt.plot([y_train.min(), y_train.max()], [y_train.min(), y_train.max()], 'r--', lw=2)
plt.xlabel('실제 값')
plt.ylabel('예측 값')
plt.title(f'훈련 세트 (R² = {train_metrics[3]:.4f})')
plt.grid(True, alpha=0.3)

# 검증 세트
plt.subplot(1, 3, 2)
plt.scatter(y_val, val_preds, alpha=0.5, s=1)
plt.plot([y_val.min(), y_val.max()], [y_val.min(), y_val.max()], 'r--', lw=2)
plt.xlabel('실제 값')
plt.ylabel('예측 값')
plt.title(f'검증 세트 (R² = {val_metrics[3]:.4f})')
plt.grid(True, alpha=0.3)

# 잔차 플롯 (검증 세트)
plt.subplot(1, 3, 3)
residuals = y_val - val_preds
plt.scatter(val_preds, residuals, alpha=0.5, s=1)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel('예측 값')
plt.ylabel('잔차 (실제 - 예측)')
plt.title('잔차 플롯 (검증 세트)')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 특성 중요도 분석 (선택사항)
간단한 순열 중요도로 특성의 중요도를 추정해봅니다.

In [None]:
# 순열 중요도 계산 (간단한 버전)
def permutation_importance(model, X, y, feature_names, n_repeats=5):
    model.eval()
    X_tensor = torch.tensor(X, dtype=torch.float32).to(DEVICE)
    
    # 기준 점수 계산
    with torch.no_grad():
        baseline_preds = model(X_tensor).cpu().numpy()
    baseline_score = r2_score(y, baseline_preds)
    
    importances = []
    
    for i, feature_name in enumerate(feature_names):
        scores = []
        
        for _ in range(n_repeats):
            # 특성 순열
            X_perm = X.copy()
            np.random.shuffle(X_perm[:, i])
            X_perm_tensor = torch.tensor(X_perm, dtype=torch.float32).to(DEVICE)
            
            # 순열된 특성으로 예측
            with torch.no_grad():
                perm_preds = model(X_perm_tensor).cpu().numpy()
            perm_score = r2_score(y, perm_preds)
            
            # 중요도 = 기준 점수 - 순열 점수
            scores.append(baseline_score - perm_score)
        
        importances.append(np.mean(scores))
    
    return np.array(importances)

# 검증 세트에서 특성 중요도 계산
print("특성 중요도 계산 중... (시간이 조금 걸릴 수 있습니다)")
importance_scores = permutation_importance(model, X_val, y_val, feature_names)

# 중요도 시각화
plt.figure(figsize=(10, 6))
sorted_idx = np.argsort(importance_scores)[::-1]
plt.bar(range(len(feature_names)), importance_scores[sorted_idx])
plt.xticks(range(len(feature_names)), [feature_names[i] for i in sorted_idx], rotation=45)
plt.xlabel('특성')
plt.ylabel('순열 중요도')
plt.title('특성 중요도 (순열 중요도 기반)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# 중요도 출력
print("\n특성 중요도 순위:")
for i, idx in enumerate(sorted_idx):
    print(f"{i+1:2d}. {feature_names[idx]:12s}: {importance_scores[idx]:.6f}")

## 결과 요약

### 모델 성능
- **아키텍처**: 8 → 128 → 64 → 32 → 1 (ReLU + Dropout)
- **정규화**: Weight Decay + Dropout
- **최적화**: Adam + ReduceLROnPlateau
- **얼리 스토핑**: 검증 손실 기준

### 데이터 누수 방지
- StandardScaler는 훈련 세트에만 fit
- 검증 세트에는 transform만 적용
- 재현성을 위한 시드 고정

### 평가 지표
- MSE, RMSE, MAE, R² 점수로 다각도 평가
- 훈련/검증 성능 비교로 과적합 확인
- 잔차 분석으로 모델 편향 확인