# 파이토치를 활용한 회귀 모델 및 성능향상

> California Housing

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Global matplotlib configuration.
# 'Malgun Gothic' is requested so Korean text in plots can be rendered.
plt.rcParams['font.family'] = 'Malgun Gothic'
# Draw the minus sign as an ASCII hyphen instead of the Unicode minus glyph.
plt.rcParams['axes.unicode_minus'] = False
# Set the default figure size through rcParams; calling plt.figure() here
# would only create (and display) an empty figure.
plt.rcParams['figure.figsize'] = (10, 6)

In [None]:
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, RandomizedSearchCV, KFold
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

# PyTorch setup: use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

In [None]:
from pathlib import Path
# Location of the housing CSV, relative to the working directory.
DATA_PATH = Path("data") / "housing.csv"
df = pd.read_csv(DATA_PATH)
# Optional sanity checks: df.describe() for summary statistics and
# df.isnull().sum() for per-column missing-value counts.

In [None]:
# Pairwise correlations between the numeric columns only.
corr = df.corr(numeric_only=True)
# Correlation of every numeric column with the target, strongest positive first.
target_corr = corr.loc[:, "median_house_value"].sort_values(ascending=False)
# To inspect the ranking, print(target_corr).

In [None]:
# Visualization: four exploratory panels laid out in a single row.
fig, axes = plt.subplots(1, 4, figsize=(20, 5))

## Distribution of median income
axes[0].hist(df["median_income"], bins=50)
axes[0].set(
    title="Distribution of Median Income",
    xlabel="Median Income",
    ylabel="Count",
)

## Distribution of median house value
axes[1].hist(df["median_house_value"], bins=50)
axes[1].set(
    title="Distribution of Median House Value",
    xlabel="Median House Value",
    ylabel="Count",
)

## Median income vs. median house value
axes[2].scatter(df["median_income"], df["median_house_value"], alpha=0.2)
axes[2].set(
    title="Median Income vs Median House Value",
    xlabel="Median Income",
    ylabel="Median House Value",
)

## Geographical distribution: colour encodes house value, marker size
## encodes population / 100 (clipped to a minimum of 1).
sc = axes[3].scatter(
    df["longitude"],
    df["latitude"],
    c=df["median_house_value"],
    s=(df["population"] / 100).clip(lower=1),
    alpha=0.4,
)
axes[3].set(
    title="Geographical Distribution of House Values",
    xlabel="Longitude",
    ylabel="Latitude",
)

# Colour bar for the geographical panel
cbar = fig.colorbar(sc, ax=axes[3])
cbar.set_label("Median House Value")

fig.tight_layout()
plt.show()

In [None]:
# %%
# Separate the target column from the feature matrix.
y = df["median_house_value"]
X = df.drop(columns="median_house_value")

# Numeric feature columns are picked by dtype; the single categorical
# feature is listed explicitly.
cat_features = ["ocean_proximity"]
num_features = list(
    X.select_dtypes(include=["float64", "int64", "float32", "int32"]).columns
)

# Hold out 20% of the rows as the test set, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [None]:
# Basic preprocessing (done step by step, without a scikit-learn Pipeline).
# Every transformer is fitted on the training split only and then reused
# on the test split.
print("데이터 전처리 중...")

# Numeric features: fill missing values with the median, then standardize.
imputer = SimpleImputer(strategy="median")
X_train_num = imputer.fit_transform(X_train[num_features])
X_test_num = imputer.transform(X_test[num_features])

scaler = StandardScaler()
X_train_num_scaled = scaler.fit_transform(X_train_num)
X_test_num_scaled = scaler.transform(X_test_num)

# Categorical features: one-hot encode.
# handle_unknown="ignore" encodes a category that appears only in the test
# split as an all-zero row instead of raising during transform().
encoder = OneHotEncoder(handle_unknown="ignore")
X_train_cat = encoder.fit_transform(X_train[cat_features]).toarray()
X_test_cat = encoder.transform(X_test[cat_features]).toarray()

# Concatenate the numeric and categorical feature blocks column-wise.
X_train_processed = np.hstack([X_train_num_scaled, X_train_cat])
X_test_processed = np.hstack([X_test_num_scaled, X_test_cat])

# Standardize the target as well (very important!). The scaler is kept so
# predictions can be mapped back with y_scaler.inverse_transform().
y_scaler = StandardScaler()
y_train_scaled = y_scaler.fit_transform(y_train.values.reshape(-1, 1)).flatten()
y_test_scaled = y_scaler.transform(y_test.values.reshape(-1, 1)).flatten()

# NumPy -> float32 tensors, created once and placed directly on `device`.
X_train_tensor = torch.tensor(X_train_processed, dtype=torch.float32, device=device)
X_test_tensor = torch.tensor(X_test_processed, dtype=torch.float32, device=device)
y_train_tensor = torch.tensor(y_train_scaled, dtype=torch.float32, device=device)
y_test_tensor = torch.tensor(y_test_scaled, dtype=torch.float32, device=device)

print(f"X_train_tensor 형태: {X_train_tensor.shape}")
print(f"y_train_tensor 형태: {y_train_tensor.shape}")

In [None]:
# PyTorch model definition
class LinearRegressionModel(nn.Module):
    """Linear regression: a single affine layer mapping features to one output."""

    def __init__(self, input_dim):
        """Create the model.

        Args:
            input_dim: Number of input features per sample.
        """
        super().__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        """Return one prediction per sample.

        Args:
            x: Input tensor whose last dimension has size ``input_dim``.

        Returns:
            The layer output with its trailing size-1 dimension removed,
            e.g. ``(batch, input_dim)`` -> ``(batch,)``.
        """
        # Squeeze only the last dimension: a bare squeeze() would also drop
        # the batch dimension when the batch holds a single sample, giving a
        # 0-d tensor that no longer matches a (1,) target.
        return self.linear(x).squeeze(-1)

In [None]:
# Training function
def train_model(
    model, X_train, y_train, X_val, y_val, epochs=1000, lr=0.001, patience=50
):
    """Train ``model`` full-batch with Adam on MSE loss, using early stopping.

    Each epoch performs one optimizer step on the whole training set and then
    evaluates the validation loss. Training stops once the validation loss has
    failed to improve for ``patience`` consecutive epochs. On return, the
    model's parameters are restored to those of the epoch with the lowest
    validation loss.

    Args:
        model: The ``nn.Module`` to train; it is updated in place.
        X_train: Training inputs.
        y_train: Training targets, shaped like ``model(X_train)``.
        X_val: Validation inputs.
        y_val: Validation targets, shaped like ``model(X_val)``.
        epochs: Maximum number of epochs.
        lr: Learning rate for Adam.
        patience: Number of consecutive non-improving epochs tolerated
            before stopping.

    Returns:
        A ``(train_losses, val_losses)`` tuple of lists of floats with one
        entry per executed epoch. The training loss is the value computed
        before that epoch's optimizer step; the validation loss is computed
        after it.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_losses = []
    val_losses = []

    best_val_loss = float("inf")
    best_state = None  # copy of the parameters at the best validation loss
    patience_counter = 0

    for epoch in range(epochs):
        # Training step
        model.train()
        optimizer.zero_grad()

        train_pred = model(X_train)
        train_loss = criterion(train_pred, y_train)

        train_loss.backward()
        optimizer.step()

        # Validation step
        model.eval()
        with torch.no_grad():
            val_pred = model(X_val)
            val_loss = criterion(val_pred, y_val)

        # Convert to Python floats once so no tensors are kept for bookkeeping.
        train_loss_value = train_loss.item()
        val_loss_value = val_loss.item()
        train_losses.append(train_loss_value)
        val_losses.append(val_loss_value)

        # Early stopping
        if val_loss_value < best_val_loss:
            best_val_loss = val_loss_value
            # Clone the tensors: state_dict() returns references that later
            # optimizer steps would overwrite.
            best_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

        # Progress report
        if epoch % 100 == 0:
            print(
                f"Epoch {epoch}: Train Loss = {train_loss_value:.4f}, Val Loss = {val_loss_value:.4f}"
            )

    # Roll back to the best checkpoint; otherwise the model would be left at
    # the last epoch, up to `patience` steps past the best one.
    if best_state is not None:
        model.load_state_dict(best_state)

    return train_losses, val_losses

In [None]:
# Split the training tensors into training / validation subsets (80% / 20%).
n_train = int(0.8 * len(X_train_tensor))

# Use a seeded generator so the shuffle, and therefore the split, is
# reproducible across runs (same seed as the train/test split).
split_generator = torch.Generator().manual_seed(42)
indices = torch.randperm(len(X_train_tensor), generator=split_generator)
# Keep the index tensor on the same device as the data it indexes.
indices = indices.to(X_train_tensor.device)
train_indices = indices[:n_train]
val_indices = indices[n_train:]

X_sub_train = X_train_tensor[train_indices]
y_sub_train = y_train_tensor[train_indices]
X_sub_val = X_train_tensor[val_indices]
y_sub_val = y_train_tensor[val_indices]

print(f"훈련 데이터: {X_sub_train.shape}")
print(f"검증 데이터: {X_sub_val.shape}")

In [None]:
# Model training
# Instantiate the model before training: one input unit per preprocessed
# feature column, with parameters on the same device as the tensors.
model = LinearRegressionModel(X_sub_train.shape[1]).to(device)

print("모델 훈련 시작...")
train_losses, val_losses = train_model(
    model, X_sub_train, y_sub_train, X_sub_val, y_sub_val
)

In [None]:
# Evaluate performance on the held-out test set.
from sklearn.metrics import mean_squared_error, r2_score

model.eval()
with torch.no_grad():
    test_pred = model(X_test_tensor)
    # MSE on the standardized target scale.
    test_loss = nn.functional.mse_loss(test_pred, y_test_tensor)

# Map predictions back to the original target scale before scoring.
test_pred_column = test_pred.cpu().numpy().reshape(-1, 1)
test_pred_original = y_scaler.inverse_transform(test_pred_column).flatten()
y_test_original = y_test.values

r2 = r2_score(y_test_original, test_pred_original)
mse = mean_squared_error(y_test_original, test_pred_original)
rmse = np.sqrt(mse)

print("\n=== 최종 성능 ===")
print(f"Test R^2 Score: {r2:.4f}")
print(f"Test RMSE: {rmse:.2f}")
print(f"Test MSE: {mse:.2f}")

In [None]:
# Plot the loss curves (left) and actual-vs-predicted values (right).
plt.figure(figsize=(12, 4))

# Left panel: training and validation loss per epoch.
loss_ax = plt.subplot(1, 2, 1)
loss_ax.plot(train_losses, label="Train Loss")
loss_ax.plot(val_losses, label="Validation Loss")
loss_ax.set_title("Training and Validation Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.legend()
loss_ax.grid(True)

# Right panel: predictions against ground truth, with the y = x reference
# line drawn across the range of the actual values.
fit_ax = plt.subplot(1, 2, 2)
fit_ax.scatter(y_test_original, test_pred_original, alpha=0.5)
value_range = [y_test_original.min(), y_test_original.max()]
fit_ax.plot(value_range, value_range, "r--", lw=2)
fit_ax.set_xlabel("Actual Values")
fit_ax.set_ylabel("Predicted Values")
fit_ax.set_title(f"Actual vs Predicted ($R^2$ = {r2:.3f})")
fit_ax.grid(True)

plt.tight_layout()
plt.show()