# 🔥 지역난방 열수요 예측: 시즌별-브랜치별 XGBoost 모델

## 📋 모델링 전략
- **시즌 분할**: Heating Season vs Non-Heating Season
- **브랜치별 개별 모델**: 각 branch_id마다 전용 모델
- **XGBoost**: 모든 모델에 XGBoost 사용
- **하이퍼파라미터 최적화**: Optuna TPE 사용
- **총 모델 수**: 38개 (2시즌 × 19브랜치)

In [1]:
# Google Colab 환경 확인 및 패키지 설치
import sys
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("🔥 Google Colab 환경에서 실행 중...")
    !pip install xgboost optuna
    from google.colab import files, drive
    print("✅ 패키지 설치 완료!")
else:
    print("💻 로컬 환경에서 실행 중...")

💻 로컬 환경에서 실행 중...


In [2]:
# 라이브러리 import
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
from tqdm.auto import tqdm
import pickle
import json

# 머신러닝
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split, TimeSeriesSplit
import xgboost as xgb

# Optuna
import optuna
from optuna.integration import XGBoostPruningCallback
from optuna.samplers import TPESampler

plt.rcParams['figure.figsize'] = (12, 6)
print("📚 라이브러리 로드 완료!")

📚 라이브러리 로드 완료!


In [3]:
# # 데이터 분할: 2021-2022년 (훈련용) vs 2023년 (테스트용)
# print("📊 데이터 분할 시작...")

# # train_heat.csv 파일 읽기
# train_heat = pd.read_csv('train_heat.csv')
# print(f"원본 데이터 크기: {train_heat.shape}")

# # 데이터 확인
# print("\n📋 데이터 구조 확인:")
# print(train_heat.columns.tolist())
# print("\n첫 5행:")
# print(train_heat.head())

# # 임시 datetime 컬럼 생성 (원본 tm 컬럼은 그대로 유지)
# train_heat['temp_datetime'] = pd.to_datetime(train_heat['train_heat.tm'], format='%Y%m%d%H')

# print(f"\n📅 날짜 범위: {train_heat['temp_datetime'].min()} ~ {train_heat['temp_datetime'].max()}")

# # 연도별 데이터 개수 확인
# year_counts = train_heat['temp_datetime'].dt.year.value_counts().sort_index()
# print(f"\n📊 연도별 데이터 개수:")
# for year, count in year_counts.items():
#     print(f"  {year}년: {count:,}행")

# # 연도별로 데이터 분할 (임시 datetime 컬럼 사용)
# train_data_2122 = train_heat[train_heat['temp_datetime'].dt.year.isin([2021, 2022])].copy()
# test_data_23 = train_heat[train_heat['temp_datetime'].dt.year == 2023].copy()

# # 임시 datetime 컬럼 제거
# train_data_2122.drop('temp_datetime', axis=1, inplace=True)
# test_data_23.drop('temp_datetime', axis=1, inplace=True)

# print(f"\n✂️ 분할 결과:")
# print(f"2021-2022년 데이터 크기: {train_data_2122.shape}")
# print(f"2023년 데이터 크기: {test_data_23.shape}")

# # 파일로 저장
# train_data_2122.to_csv('train_data_2122.csv', index=False)
# test_data_23.to_csv('test_data_23.csv', index=False)

# print("\n✅ 데이터 분할 및 저장 완료!")
# print(f"- train_data_2122.csv: {train_data_2122.shape[0]:,}행")
# print(f"- test_data_23.csv: {test_data_23.shape[0]:,}행")


In [4]:
# 데이터 파일 로드
if IN_COLAB:
    print("📁 파일 업로드 방법 선택:")
    print("1. 직접 업로드")
    print("2. Google Drive")

    method = input("선택 (1 또는 2): ")

    if method == "1":
        uploaded = files.upload()
        files_list = list(uploaded.keys())
        train_path = [f for f in files_list if 'train' in f.lower()][0]
        test_path = [f for f in files_list if 'test' in f.lower()][0]
    else:
        drive.mount('/content/drive')
        train_path = "/content/drive/MyDrive/train_heat.csv"
        test_path = "/content/drive/MyDrive/test_heat.csv"
else:
    train_path = 'dataset/train_data_2122.csv'
    test_path = 'dataset/test_data_23.csv'

print(f"✅ 파일 경로 설정 완료")

✅ 파일 경로 설정 완료


## 1️⃣ 데이터 로드 및 전처리

In [5]:
def load_and_preprocess(train_path, test_path):
    print("📊 데이터 로드 및 전처리...")

    # 데이터 로드
    train_df = pd.read_csv(train_path)
    test_df = pd.read_csv(test_path)

    def process_df(df):
        # 컬럼명 정리
        if 'Unnamed: 0' in df.columns:
            df = df.drop(columns=['Unnamed: 0'])
        df.columns = [col.replace('train_heat.', '') for col in df.columns]

        # 시간 변수
        df['datetime'] = pd.to_datetime(df['tm'], format='%Y%m%d%H')
        # df['year'] = df['datetime'].dt.year
        df['month'] = df['datetime'].dt.month
        # df['day'] = df['datetime'].dt.day
        df['hour'] = df['datetime'].dt.hour
        df['dayofweek'] = df['datetime'].dt.dayofweek

        # 결측치 처리
        missing_cols = ['ta', 'wd', 'ws', 'rn_day', 'rn_hr1', 'hm', 'si', 'ta_chi']
        if 'heat_demand' in df.columns:
            missing_cols.append('heat_demand')

        for col in missing_cols:
            if col in df.columns:
                df[col] = df[col].replace(-99, np.nan)

        # wd -9.9 결측치 처리
        df.loc[df['wd'] == -9.9, 'wd'] = np.nan

        # 일사량 야간 처리
        if 'si' in df.columns:
            night_mask = (df['hour'] < 8) | (df['hour'] > 18)
            df.loc[night_mask & df['si'].isna(), 'si'] = 0

        # 지사별 보간
        df = df.sort_values(['branch_id', 'datetime'])
        numeric_cols = df.select_dtypes(include=[np.number]).columns

        for branch in df['branch_id'].unique():
            mask = df['branch_id'] == branch
            df.loc[mask, numeric_cols] = df.loc[mask, numeric_cols].interpolate().fillna(method='ffill').fillna(method='bfill')

        return df

    train_df = process_df(train_df)
    test_df = process_df(test_df)

    print(f"   훈련: {train_df.shape}, 테스트: {test_df.shape}")
    print(f"   기간: {train_df['datetime'].min()} ~ {test_df['datetime'].max()}")
    print(f"   브랜치: {sorted(train_df['branch_id'].unique())}")

    return train_df, test_df

train_df, test_df = load_and_preprocess(train_path, test_path)

📊 데이터 로드 및 전처리...
   훈련: (332861, 15), 테스트: (166440, 15)
   기간: 2021-01-01 01:00:00 ~ 2023-12-31 23:00:00
   브랜치: ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S']


## 2️⃣ 파생변수 생성

In [6]:
def create_features(df):
    """HDD, wind_chill, 순환형 인코딩, 범주형 변수 생성"""
    df = df.copy()

    # ⭐ HDD (수치형)
    if 'ta' in df.columns:
        df['HDD_18'] = np.maximum(18 - df['ta'], 0)
        df['HDD_20'] = np.maximum(20 - df['ta'], 0)

    # ⭐ wind_chill (수치형)
    if 'ta' in df.columns and 'ws' in df.columns:
        df['wind_chill'] = np.where(
            (df['ta'] <= 10) & (df['ws'] > 0),
            13.12 + 0.6215 * df['ta'] - 11.37 * (df['ws'] ** 0.16) + 0.3965 * df['ta'] * (df['ws'] ** 0.16),
            df['ta']
        )

    # ⭐ heating_season (범주형)
    df['heating_season'] = df['month'].isin([10, 11, 12, 1, 2, 3, 4]).astype(int)

    # 시간대 범주형
    df['is_weekend'] = (df['dayofweek'] >= 5).astype(int)
    df['is_peak_morning'] = ((df['hour'] >= 7) & (df['hour'] <= 9)).astype(int)
    df['is_peak_evening'] = ((df['hour'] >= 18) & (df['hour'] <= 22)).astype(int)
    df['is_night'] = ((df['hour'] >= 23) | (df['hour'] <= 5)).astype(int)

    # 피크시간 통합
    df['peak_time_category'] = 0
    df.loc[df['is_peak_morning'] == 1, 'peak_time_category'] = 1
    df.loc[df['is_peak_evening'] == 1, 'peak_time_category'] = 2
    df.loc[df['is_night'] == 1, 'peak_time_category'] = 3

    # ⭐ 기온 범주 (범주형)
    if 'ta' in df.columns:
        df['temp_category'] = pd.cut(df['ta'],
                                   bins=[-np.inf, 0, 10, 20, 30, np.inf],
                                   labels=[0, 1, 2, 3, 4]).astype(int)

    # ⭐ 강수 강도 (범주형)
    if 'rn_day' in df.columns:
        df['rain_intensity'] = pd.cut(df['rn_day'],
                                   bins=[-1, 0, 1, 5, 10, np.inf],
                                   labels=[0, 1, 2, 3, 4]).astype(int)

    # ⭐ 순환형 인코딩 (시간 cos, sin)
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
    df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
    df['dayofweek_sin'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
    df['dayofweek_cos'] = np.cos(2 * np.pi * df['dayofweek'] / 7)

    # 시간 기반 파생변수
    df['hour_squared'] = df['hour'] ** 2
    df['month_day_interaction'] = df['month'] * df['datetime'].dt.day
    
    # 기온 관련 파생변수
    if 'ta' in df.columns:
        df['ta_squared'] = df['ta'] ** 2
        df['ta_cubed'] = df['ta'] ** 3
    
    # 습도와 기온 상호작용
    if 'hm' in df.columns and 'ta' in df.columns:
        df['hm_ta_interaction'] = df['hm'] * df['ta']

    return df

# 파생변수 생성
train_df = create_features(train_df)
test_df = create_features(test_df)

print(f"✅ 파생변수 생성 완료: {train_df.shape[1]}개 컬럼")

✅ 파생변수 생성 완료: 37개 컬럼


## 3️⃣ 시즌별-브랜치별 데이터 분할

In [7]:
# 시즌별-브랜치별 데이터 분할
def split_by_season_and_branch(df):
    data_splits = {}
    
    branches = sorted(df['branch_id'].unique())
    seasons = [0, 1]  # 0: 비난방시즌, 1: 난방시즌
    season_names = {0: '비난방', 1: '난방'}
    
    print(f"📊 데이터 분할 정보:")
    print(f"   브랜치: {len(branches)}개 - {branches}")
    print(f"   시즌: {len(seasons)}개 - {[season_names[s] for s in seasons]}")
    print(f"   총 조합: {len(branches) * len(seasons)}개")
    
    for season in seasons:
        for branch in branches:
            key = f"{season_names[season]}_{branch}"
            
            # 시즌과 브랜치로 필터링
            subset = df[(df['heating_season'] == season) & (df['branch_id'] == branch)].copy()
            
            if len(subset) > 0:
                data_splits[key] = subset
                print(f"   {key}: {len(subset):,}개 데이터")
            else:
                print(f"   {key}: 데이터 없음 ⚠️")
    
    return data_splits

# 훈련 및 테스트 데이터 분할
train_splits = split_by_season_and_branch(train_df)
test_splits = split_by_season_and_branch(test_df)

print(f"\n✅ 훈련 데이터: {len(train_splits)}개 분할")
print(f"✅ 테스트 데이터: {len(test_splits)}개 분할")

📊 데이터 분할 정보:
   브랜치: 19개 - ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S']
   시즌: 2개 - ['비난방', '난방']
   총 조합: 38개
   비난방_A: 7,344개 데이터
   비난방_B: 7,344개 데이터
   비난방_C: 7,344개 데이터
   비난방_D: 7,344개 데이터
   비난방_E: 7,344개 데이터
   비난방_F: 7,344개 데이터
   비난방_G: 7,344개 데이터
   비난방_H: 7,344개 데이터
   비난방_I: 7,344개 데이터
   비난방_J: 7,344개 데이터
   비난방_K: 7,344개 데이터
   비난방_L: 7,344개 데이터
   비난방_M: 7,344개 데이터
   비난방_N: 7,344개 데이터
   비난방_O: 7,344개 데이터
   비난방_P: 7,344개 데이터
   비난방_Q: 7,344개 데이터
   비난방_R: 7,344개 데이터
   비난방_S: 7,344개 데이터
   난방_A: 10,175개 데이터
   난방_B: 10,175개 데이터
   난방_C: 10,175개 데이터
   난방_D: 10,175개 데이터
   난방_E: 10,175개 데이터
   난방_F: 10,175개 데이터
   난방_G: 10,175개 데이터
   난방_H: 10,175개 데이터
   난방_I: 10,175개 데이터
   난방_J: 10,175개 데이터
   난방_K: 10,175개 데이터
   난방_L: 10,175개 데이터
   난방_M: 10,175개 데이터
   난방_N: 10,175개 데이터
   난방_O: 10,175개 데이터
   난방_P: 10,175개 데이터
   난방_Q: 10,175개 데이터
   난방_R: 10,175개 데이터
   난방_S: 10,175개 데이터
📊 데이터 분할 정보:
   브랜치: 19개 - ['A', 'B', 'C

### 📌 Validation시 월별 추출

In [8]:
def get_temporal_split_indices(df, test_size=0.2, min_val_samples=10):
    """시간 기반 분할 인덱스 반환"""
    
    month_counts = df['month'].value_counts()
    valid_months = month_counts[month_counts >= min_val_samples * 2].index
    
    if len(valid_months) < 3:
        # 폴백: 기존 방식
        split_idx = int(len(df) * (1 - test_size))
        return df.index[:split_idx], df.index[split_idx:]
    
    train_indices = []
    val_indices = []
    
    for month in valid_months:
        month_data = df[df['month'] == month].sort_values('datetime')
        val_size = max(min_val_samples, int(len(month_data) * test_size))
        
        train_indices.extend(month_data.iloc[:-val_size].index)
        val_indices.extend(month_data.iloc[-val_size:].index)
    
    print(f"      📅 월별 분할: {len(valid_months)}개월, 검증 {len(val_indices)}개")
    
    return train_indices, val_indices

## 4️⃣ XGBoost 모델 클래스 정의

In [9]:
# XGBoost 버전 확인
import xgboost as xgb
print(f"XGBoost 버전: {xgb.__version__}")

# 버전 호환 XGBoost 모델 클래스
class OptimizedXGBoostModel:
    def __init__(self, model_name):
        self.model_name = model_name
        self.model = None
        self.best_params = None
        self.feature_cols = None
        self.study = None
        self.best_score = None
        
        # XGBoost 버전 확인
        self.xgb_version = xgb.__version__
        self.use_callbacks = self._check_callbacks_support()
        
    def _check_callbacks_support(self):
        """XGBoost 버전에 따른 callbacks 지원 여부 확인"""
        try:
            # 버전 1.4.0 이상에서 callbacks 지원
            version_parts = self.xgb_version.split('.')
            major = int(version_parts[0])
            minor = int(version_parts[1]) if len(version_parts) > 1 else 0
            
            if major > 1 or (major == 1 and minor >= 4):
                return True
            else:
                return False
        except:
            return False
        
    def define_feature_columns(self, df):
        """특성 컬럼 정의"""
        exclude_cols = [
            'tm', 'datetime', 'year', 'heat_demand', 'branch_id'
        ]
        
        self.feature_cols = [col for col in df.columns 
                           if col not in exclude_cols and df[col].dtype in ['int64', 'float64']]
        
        print(f"   📋 {self.model_name}: {len(self.feature_cols)}개 특성 사용")
        return self.feature_cols
    
    def objective(self, trial, X_train, y_train, X_val, y_val):
        """Optuna 목적 함수"""
        # 하이퍼파라미터 탐색 공간 정의 (early stopping 제거하고 n_estimators로 제어)
        params = {
            'objective': 'reg:squarederror',
            'booster': 'gbtree',
            'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
            'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.3, 1.0),
            'subsample': trial.suggest_float('subsample', 0.4, 1.0),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'n_estimators': trial.suggest_int('n_estimators', 50, 500),  # 범위 축소로 안정성 확보
            'max_depth': trial.suggest_int('max_depth', 3, 8),
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 8),
            'random_state': 42,
            'n_jobs': -1,
            'verbosity': 0
        }
        
        # 모델 생성 및 간단한 훈련 (early stopping 없이)
        model = xgb.XGBRegressor(**params)
        model.fit(X_train, y_train, verbose=False)
        
        # 검증 예측
        y_pred = model.predict(X_val)
        rmse = np.sqrt(mean_squared_error(y_val, y_pred))
        
        return rmse
    
    def fit(self, df, target_col='heat_demand', n_trials=30):
        """모델 훈련 (Optuna 최적화 포함)"""
        print(f"\n🔍 {self.model_name} 모델 훈련 시작...")
        print(f"   🔧 XGBoost 버전: {self.xgb_version}, Callbacks 지원: {self.use_callbacks}")
        
        if len(df) < 10:
            print(f"   ⚠️ 데이터 부족 ({len(df)}개) - 기본 모델 사용")
            self._fit_basic_model(df, target_col)
            return
        
        # 🔥 1. 분할 인덱스 먼저 구하기 (datetime 있을 때)
        train_indices, val_indices = get_temporal_split_indices(df, test_size=0.2)
        
        # 🔥 2. 특성 컬럼 정의 (datetime 제거)
        self.define_feature_columns(df)
        
        # 🔥 3. X, y 생성 (datetime 없음)
        X = df[self.feature_cols].copy()
        y = df[target_col].copy()
        
        # 🔥 4. 인덱스로 분할
        X_train = X.loc[train_indices]
        y_train = y.loc[train_indices]
        X_val = X.loc[val_indices]
        y_val = y.loc[val_indices]
        
        print(f"      📊 훈련: {len(X_train):,}개, 검증: {len(X_val):,}개")
        
        if len(X_train) < 5 or len(X_val) < 2:
            print(f"   ⚠️ 분할 후 데이터 부족 - 기본 모델 사용")
            self._fit_basic_model(df, target_col)
            return
        
        # Optuna 최적화
        print(f"   🎯 하이퍼파라미터 최적화: ", end="", flush=True)
        
        try:
            # 모든 로그 끄기
            optuna.logging.set_verbosity(optuna.logging.ERROR)
            
            self.study = optuna.create_study(
                direction='minimize',
                sampler=TPESampler(seed=42),
                study_name=f"xgb_{self.model_name}"
            )
            
            # 진행상황 표시를 위한 콜백
            def progress_callback(study, trial):
                print(f"\r   🎯 하이퍼파라미터 최적화: {len(study.trials)}/{n_trials} (Best RMSE: {study.best_value:.4f})", end="", flush=True)
            
            # 최적화 실행
            self.study.optimize(
                lambda trial: self.objective(trial, X_train, y_train, X_val, y_val),
                n_trials=n_trials,
                callbacks=[progress_callback]
            )
            
            print()  # 줄바꿈
            
            # 안전한 best_value 접근
            if len(self.study.trials) > 0 and self.study.best_trial is not None:
                self.best_score = self.study.best_value
                self.best_params = self.study.best_params.copy()
                
                # 기본 파라미터 추가
                self.best_params.update({
                    'objective': 'reg:squarederror',
                    'random_state': 42,
                    'n_jobs': -1,
                    'verbosity': 0
                })
                
                # 전체 데이터로 최종 훈련 (early stopping 없이 간단하게)
                self.model = xgb.XGBRegressor(**self.best_params)
                self.model.fit(X, y, verbose=False)
                
                # 성능 정보
                val_pred = self.model.predict(X_val)
                val_rmse = np.sqrt(mean_squared_error(y_val, val_pred))
                
                print(f"   📈 최적화 완료: Best RMSE = {self.best_score:.4f}")
                print(f"   📊 검증 RMSE = {val_rmse:.4f}")
                print(f"   🏆 최적 파라미터: lr={self.best_params.get('learning_rate', 0.1):.3f}, n_est={self.best_params.get('n_estimators', 100)}")
                
            else:
                print(f"   ⚠️ 최적화 실패: 유효한 trial 없음 - 기본 모델 사용")
                self._fit_basic_model(df, target_col)
                
        except Exception as e:
            print(f"\n   ⚠️ 최적화 실패: {str(e)[:30]}... - 기본 모델 사용")
            self._fit_basic_model(df, target_col)
    
    def _fit_basic_model(self, df, target_col):
        """기본 모델 훈련 (최적화 실패 시 사용)"""
        if not hasattr(self, 'feature_cols') or self.feature_cols is None:
            self.define_feature_columns(df)
        
        X = df[self.feature_cols].copy()
        y = df[target_col].copy()
        
        
        # 기본 XGBoost 모델 (가장 안전한 파라미터)
        self.model = xgb.XGBRegressor(
            n_estimators=200,
            learning_rate=0.1,
            max_depth=6,
            min_child_weight=1,
            subsample=0.8,
            colsample_bytree=0.8,
            random_state=42,
            n_jobs=-1,
            verbosity=0
        )
        
        # 안전한 기본 훈련 (early stopping 없이)
        self.model.fit(X, y, verbose=False)
        self.best_score = None
        self.best_params = None
        print(f"   🔧 기본 모델 훈련 완료")
    
    def predict(self, df):
        """예측"""
        if self.model is None:
            return np.full(len(df), 0)
        
        # 동일한 특성 컬럼 사용
        X = df[self.feature_cols].copy()
        # X = X.fillna(0)
        
        # 예측
        predictions = self.model.predict(X)
        
        # 음수 값 처리
        predictions = np.maximum(predictions, 0)
        
        return predictions

print("✅ 버전 호환 XGBoost 모델 클래스 정의 완료")

XGBoost 버전: 3.0.2
✅ 버전 호환 XGBoost 모델 클래스 정의 완료


## 5️⃣ 38개 모델 훈련

In [10]:
# 38개 모델 훈련
print("🚀 38개 XGBoost 모델 훈련 시작!")
print("=" * 60)

models = {}
training_results = {}
n_trials_per_model = 20  # 시간 단축을 위해 20으로 조정

start_time = datetime.now()

# 성공/실패 카운터
success_count = 0
basic_count = 0
failed_count = 0

# 모든 훈련 데이터 분할에 대해 모델 훈련
for i, (model_key, train_data) in enumerate(train_splits.items(), 1):
    print(f"\n[{i:2d}/{len(train_splits)}] 🔥 {model_key}")
    print(f"         📊 데이터: {len(train_data):,}개")
    
    # 모델 생성 및 훈련
    model = OptimizedXGBoostModel(model_key)
    
    try:
        model_start = datetime.now()
        model.fit(train_data, n_trials=n_trials_per_model)
        model_time = (datetime.now() - model_start).total_seconds()
        
        models[model_key] = model
        
        # 결과 분류
        if model.best_score is not None:
            success_count += 1
            status = "✅ 최적화"
            score_text = f"RMSE: {model.best_score:.3f}"
        else:
            basic_count += 1
            status = "🔧 기본모델"
            score_text = "기본파라미터"
        
        # 안전한 결과 저장
        training_results[model_key] = {
            'data_size': len(train_data),
            'training_time': model_time,
            'best_score': model.best_score,
            'best_params': model.best_params,
            'optimization_success': model.best_score is not None
        }
        
        print(f"         {status} | {score_text} | ⏱️ {model_time:.1f}초")
        
    except Exception as e:
        print(f"         ❌ 훈련 실패: {str(e)[:30]}...")
        failed_count += 1
        
        # 완전 기본 모델로 대체
        try:
            basic_model = OptimizedXGBoostModel(model_key)
            basic_model._fit_basic_model(train_data, 'heat_demand')
            models[model_key] = basic_model
            
            training_results[model_key] = {
                'data_size': len(train_data),
                'training_time': 0,
                'best_score': None,
                'best_params': None,
                'optimization_success': False,
                'error': str(e)[:50]
            }
            print(f"         🔧 기본 모델로 대체 완료")
            
        except Exception as e2:
            print(f"         ❌ 기본 모델 생성도 실패")
            # 더미 모델 저장
            dummy_model = OptimizedXGBoostModel(model_key)
            dummy_model.model = None
            models[model_key] = dummy_model
            
            training_results[model_key] = {
                'data_size': len(train_data),
                'training_time': 0,
                'best_score': None,
                'best_params': None,
                'optimization_success': False,
                'error': f"Complete failure: {str(e2)[:30]}"
            }

total_time = (datetime.now() - start_time).total_seconds()

print(f"\n" + "=" * 60)
print(f"🎉 모든 모델 훈련 완료!")
print(f"⏱️  총 소요 시간: {total_time/60:.1f}분 (평균 {total_time/len(train_splits):.1f}초/모델)")
print(f"📊 훈련 결과:")
print(f"   ✅ 최적화 성공: {success_count}개")
print(f"   🔧 기본 모델: {basic_count}개") 
print(f"   ❌ 완전 실패: {failed_count}개")
print(f"   📈 전체 성공률: {(success_count + basic_count)/len(train_splits)*100:.1f}%")

# 최고 성능 모델 찾기
successful_models = {k: v for k, v in training_results.items() 
                    if v.get('optimization_success', False)}

if successful_models:
    best_model = min(successful_models.items(), key=lambda x: x[1]['best_score'])
    print(f"🏆 최고 성능: {best_model[0]} (RMSE: {best_model[1]['best_score']:.4f})")
    
    # 시즌별 평균 성능
    season_scores = {'난방': [], '비난방': []}
    for model_name, result in successful_models.items():
        season = model_name.split('_')[0]
        if season in season_scores:
            season_scores[season].append(result['best_score'])
    
    print(f"📈 시즌별 평균 RMSE:")
    for season, scores in season_scores.items():
        if scores:
            print(f"   {season}시즌: {np.mean(scores):.4f} ({len(scores)}개 모델)")

print("=" * 60)

🚀 38개 XGBoost 모델 훈련 시작!

[ 1/38] 🔥 비난방_A
         📊 데이터: 7,344개

🔍 비난방_A 모델 훈련 시작...
   🔧 XGBoost 버전: 3.0.2, Callbacks 지원: True
      📅 월별 분할: 5개월, 검증 1467개
   📋 비난방_A: 21개 특성 사용
      📊 훈련: 5,877개, 검증: 1,467개
   🎯 하이퍼파라미터 최적화: 20/20 (Best RMSE: 8.5144)
   📈 최적화 완료: Best RMSE = 8.5144
   📊 검증 RMSE = 7.3287
   🏆 최적 파라미터: lr=0.073, n_est=245
         ✅ 최적화 | RMSE: 8.514 | ⏱️ 10.8초

[ 2/38] 🔥 비난방_B
         📊 데이터: 7,344개

🔍 비난방_B 모델 훈련 시작...
   🔧 XGBoost 버전: 3.0.2, Callbacks 지원: True
      📅 월별 분할: 5개월, 검증 1467개
   📋 비난방_B: 21개 특성 사용
      📊 훈련: 5,877개, 검증: 1,467개
   🎯 하이퍼파라미터 최적화: 20/20 (Best RMSE: 14.8406)
   📈 최적화 완료: Best RMSE = 14.8406
   📊 검증 RMSE = 10.4886
   🏆 최적 파라미터: lr=0.010, n_est=208
         ✅ 최적화 | RMSE: 14.841 | ⏱️ 13.7초

[ 3/38] 🔥 비난방_C
         📊 데이터: 7,344개

🔍 비난방_C 모델 훈련 시작...
   🔧 XGBoost 버전: 3.0.2, Callbacks 지원: True
      📅 월별 분할: 5개월, 검증 1467개
   📋 비난방_C: 21개 특성 사용
      📊 훈련: 5,877개, 검증: 1,467개
   🎯 하이퍼파라미터 최적화: 20/20 (Best RMSE: 14.7540)
   📈 최적화 완료: Best RMSE = 

## 6️⃣ 훈련 결과 분석

In [15]:
# 훈련 결과 분석
print("\n📊 훈련 결과 분석")
print("=" * 60)

# 결과 정리
results_df = pd.DataFrame(training_results).T
results_df['season'] = results_df.index.str.split('_').str[0]
results_df['branch'] = results_df.index.str.split('_').str[1]

# 성공/실패 통계
successful_models = results_df[results_df['best_score'].notna()]
failed_models = results_df[results_df['best_score'].isna()]

print(f"✅ 성공: {len(successful_models)}개 모델")
print(f"❌ 실패: {len(failed_models)}개 모델")

if len(successful_models) > 0:
    print(f"\n🏆 최적화 성능 통계:")
    print(f"   평균 RMSE: {successful_models['best_score'].mean():.4f}")
    print(f"   최소 RMSE: {successful_models['best_score'].min():.4f}")
    print(f"   최대 RMSE: {successful_models['best_score'].max():.4f}")
    print(f"   표준편차: {successful_models['best_score'].std():.4f}")

    # 시즌별 성능
    print(f"\n📈 시즌별 평균 RMSE:")
    season_performance = successful_models.groupby('season')['best_score'].agg(['mean', 'count'])
    for season, row in season_performance.iterrows():
        print(f"   {season}시즌: {row['mean']:.4f} ({int(row['count'])}개 모델)")

    # # 상위 5개 모델
    # print(f"\n🥇 성능 상위 5개 모델:")
    # top_models = successful_models.nsmallest(5, 'best_score')
    # for idx, (model_name, row) in enumerate(top_models.iterrows(), 1):
    #     print(f"   {idx}. {model_name}: RMSE = {row['best_score']:.4f}")

# 실패한 모델이 있으면 정보 출력
if len(failed_models) > 0:
    print(f"\n⚠️ 실패한 모델들:")
    for model_name, row in failed_models.iterrows():
        error_msg = row.get('error', '알 수 없는 오류')
        print(f"   {model_name}: {error_msg}")

# 훈련 시간 통계
avg_time = results_df['training_time'].mean()
total_time_min = results_df['training_time'].sum() / 60
print(f"\n⏱️ 훈련 시간 통계:")
print(f"   평균 모델당: {avg_time:.1f}초")
print(f"   총 훈련 시간: {total_time_min:.1f}분")


📊 훈련 결과 분석
✅ 성공: 38개 모델
❌ 실패: 0개 모델

🏆 최적화 성능 통계:
   평균 RMSE: 12.7222
   최소 RMSE: 1.4992
   최대 RMSE: 46.0968
   표준편차: 9.9239

📈 시즌별 평균 RMSE:
   난방시즌: 17.5438 (19개 모델)
   비난방시즌: 7.9005 (19개 모델)

⏱️ 훈련 시간 통계:
   평균 모델당: 11.6초
   총 훈련 시간: 7.4분


### 📍 NaN 및 dtype 문제 해결 함수 별도 정의

In [13]:
# =============================================================================
# NaN 및 데이터 타입 문제 해결
# =============================================================================

# 1. RMSE 평가 시 NaN 문제 해결
def safe_rmse_evaluation(test_df, result_df):
    """안전한 RMSE 평가 (NaN 처리 포함)"""
    print(f"\n📊 안전한 RMSE 성능 평가")
    print("=" * 60)
    
    if 'heat_demand' not in test_df.columns:
        print("⚠️ 테스트 데이터에 heat_demand 컬럼이 없습니다.")
        return None
    
    # 원본 데이터
    y_true = test_df['heat_demand'].values
    y_pred = result_df['pred_heat_demand'].values
    
    print(f"📋 원본 데이터 상태:")
    print(f"   실제값 NaN: {np.isnan(y_true).sum():,}개")
    print(f"   예측값 NaN: {np.isnan(y_pred).sum():,}개")
    print(f"   실제값 inf: {np.isinf(y_true).sum():,}개")
    print(f"   예측값 inf: {np.isinf(y_pred).sum():,}개")
    
    # 유효한 값들만 필터링
    valid_mask = (~np.isnan(y_true)) & (~np.isnan(y_pred)) & (~np.isinf(y_true)) & (~np.isinf(y_pred))
    
    if np.sum(valid_mask) == 0:
        print("❌ 유효한 데이터가 없습니다!")
        return None
    
    y_true_clean = y_true[valid_mask]
    y_pred_clean = y_pred[valid_mask]
    
    print(f"✅ 정리 후 유효 데이터: {len(y_true_clean):,}개 ({len(y_true_clean)/len(y_true)*100:.1f}%)")
    
    # 전체 성능 계산
    overall_rmse = np.sqrt(mean_squared_error(y_true_clean, y_pred_clean))
    overall_mae = mean_absolute_error(y_true_clean, y_pred_clean)
    correlation = np.corrcoef(y_true_clean, y_pred_clean)[0, 1] if len(y_true_clean) > 1 else 0
    
    print(f"\n🏆 전체 성능:")
    print(f"   RMSE: {overall_rmse:.4f}")
    print(f"   MAE:  {overall_mae:.4f}")
    print(f"   상관계수: {correlation:.4f}")
    
    # 시즌별 RMSE
    print(f"\n📈 시즌별 RMSE 성능:")
    season_names = {0: '비난방시즌', 1: '난방시즌'}
    season_results = {}
    
    for season in [0, 1]:
        season_mask = (test_df['heating_season'] == season) & valid_mask
        if np.sum(season_mask) > 0:
            season_rmse = np.sqrt(mean_squared_error(y_true[season_mask], y_pred[season_mask]))
            season_mae = mean_absolute_error(y_true[season_mask], y_pred[season_mask])
            season_corr = np.corrcoef(y_true[season_mask], y_pred[season_mask])[0, 1] if np.sum(season_mask) > 1 else 0
            season_results[season] = {
                'rmse': season_rmse, 
                'mae': season_mae, 
                'corr': season_corr,
                'count': np.sum(season_mask)
            }
            
            print(f"   {season_names[season]:8s}: RMSE={season_rmse:7.4f} | MAE={season_mae:7.4f} | 상관={season_corr:6.3f} | {np.sum(season_mask):,}개")
    
    # 브랜치별 RMSE
    print(f"\n📊 브랜치별 RMSE 성능 (유효 데이터만):")
    branch_results = {}
    
    for branch in sorted(test_df['branch_id'].unique()):
        branch_mask = (test_df['branch_id'] == branch) & valid_mask
        if np.sum(branch_mask) > 1:  # 최소 2개 이상
            try:
                branch_rmse = np.sqrt(mean_squared_error(y_true[branch_mask], y_pred[branch_mask]))
                branch_mae = mean_absolute_error(y_true[branch_mask], y_pred[branch_mask])
                branch_results[branch] = {
                    'rmse': branch_rmse,
                    'mae': branch_mae, 
                    'count': np.sum(branch_mask)
                }
            except Exception as e:
                print(f"   ⚠️ 브랜치 {branch} 계산 실패: {str(e)[:30]}...")
                continue
    
    if branch_results:
        # RMSE 기준 정렬
        sorted_branches = sorted(branch_results.items(), key=lambda x: x[1]['rmse'])
        
        print(f"   🥇 RMSE 우수 브랜치 (Top 5):")
        for i, (branch, metrics) in enumerate(sorted_branches[:5], 1):
            print(f"      {i}. 브랜치 {branch}: RMSE={metrics['rmse']:7.4f} | MAE={metrics['mae']:7.4f} | {metrics['count']:,}개")
        
        print(f"   🥉 RMSE 개선 필요 브랜치 (Bottom 5):")
        for i, (branch, metrics) in enumerate(sorted_branches[-5:], 1):
            print(f"      {i}. 브랜치 {branch}: RMSE={metrics['rmse']:7.4f} | MAE={metrics['mae']:7.4f} | {metrics['count']:,}개")
    
    return {
        'overall_rmse': overall_rmse,
        'overall_mae': overall_mae,
        'correlation': correlation,
        'season_results': season_results,
        'branch_results': branch_results,
        'valid_count': len(y_true_clean),
        'total_count': len(y_true)
    }

# 2. 훈련 결과 분석 시 dtype 문제 해결
def safe_training_analysis(training_results):
    """안전한 훈련 결과 분석 (dtype 문제 해결)"""
    print(f"\n📊 훈련 결과 분석 (dtype 안전)")
    print("=" * 60)
    
    # DataFrame 생성 시 안전하게 처리
    results_list = []
    for model_name, result in training_results.items():
        # best_score를 안전하게 변환
        best_score = result.get('best_score')
        if best_score is not None:
            try:
                best_score = float(best_score)
            except (ValueError, TypeError):
                best_score = None
        
        # 안전한 딕셔너리 생성
        safe_result = {
            'model_name': model_name,
            'data_size': result.get('data_size', 0),
            'training_time': result.get('training_time', 0),
            'best_score': best_score,
            'optimization_success': result.get('optimization_success', False),
            'season': model_name.split('_')[0] if '_' in model_name else 'unknown',
            'branch': model_name.split('_')[1] if '_' in model_name and len(model_name.split('_')) > 1 else 'unknown'
        }
        results_list.append(safe_result)
    
    # DataFrame 생성
    results_df = pd.DataFrame(results_list)
    
    # 성공/실패 통계
    successful_models = results_df[results_df['optimization_success'] == True]
    failed_models = results_df[results_df['optimization_success'] == False]
    
    print(f"✅ 성공: {len(successful_models)}개 모델")
    print(f"❌ 실패: {len(failed_models)}개 모델")
    
    # 성공한 모델들의 성능 통계
    if len(successful_models) > 0:
        # best_score가 유효한 것들만 필터링
        valid_scores = successful_models[successful_models['best_score'].notna()]
        
        if len(valid_scores) > 0:
            print(f"\n🏆 최적화 성능 통계:")
            print(f"   평균 RMSE: {valid_scores['best_score'].mean():.4f}")
            print(f"   최소 RMSE: {valid_scores['best_score'].min():.4f}")
            print(f"   최대 RMSE: {valid_scores['best_score'].max():.4f}")
            print(f"   표준편차: {valid_scores['best_score'].std():.4f}")
            
            # 시즌별 성능
            print(f"\n📈 시즌별 평균 RMSE:")
            season_performance = valid_scores.groupby('season')['best_score'].agg(['mean', 'count'])
            for season, row in season_performance.iterrows():
                print(f"   {season}시즌: {row['mean']:.4f} ({int(row['count'])}개 모델)")
            
            # 상위 5개 모델 (안전하게)
            print(f"\n🥇 성능 상위 5개 모델:")
            try:
                # best_score를 숫자형으로 확실히 변환
                valid_scores_copy = valid_scores.copy()
                valid_scores_copy['best_score'] = pd.to_numeric(valid_scores_copy['best_score'], errors='coerce')
                
                # NaN 제거 후 정렬
                top_models = valid_scores_copy.dropna(subset=['best_score']).nsmallest(5, 'best_score')
                
                for idx, (_, row) in enumerate(top_models.iterrows(), 1):
                    print(f"   {idx}. {row['model_name']}: RMSE = {row['best_score']:.4f}")
                    
            except Exception as e:
                print(f"   ⚠️ 상위 모델 정렬 실패: {str(e)}")
                # 대안: 수동 정렬
                try:
                    scores_list = [(name, score) for name, score in zip(valid_scores['model_name'], valid_scores['best_score']) if pd.notna(score)]
                    scores_list.sort(key=lambda x: float(x[1]))
                    
                    for idx, (name, score) in enumerate(scores_list[:5], 1):
                        print(f"   {idx}. {name}: RMSE = {score:.4f}")
                except Exception as e2:
                    print(f"   ❌ 수동 정렬도 실패: {str(e2)}")
    
    # 실패한 모델 정보
    if len(failed_models) > 0:
        print(f"\n⚠️ 실패한 모델들:")
        error_summary = {}
        for _, row in failed_models.iterrows():
            season = row['season']
            if season not in error_summary:
                error_summary[season] = 0
            error_summary[season] += 1
        
        for season, count in error_summary.items():
            print(f"   {season}시즌: {count}개")
    
    # 훈련 시간 통계
    avg_time = results_df['training_time'].mean()
    total_time_min = results_df['training_time'].sum() / 60
    print(f"\n⏱️ 훈련 시간 통계:")
    print(f"   평균 모델당: {avg_time:.1f}초")
    print(f"   총 훈련 시간: {total_time_min:.1f}분")
    
    return results_df, successful_models, failed_models

# 3. 실행 함수
def run_safe_evaluation():
    """안전한 평가 실행"""
    # 훈련 결과 분석
    results_df, successful_models, failed_models = safe_training_analysis(training_results)
    
    # RMSE 평가 (NaN 안전)
    evaluation_results = safe_rmse_evaluation(test_df, result_df)
    
    return results_df, evaluation_results

print("✅ NaN 및 dtype 문제 해결 함수 준비 완료")
print("실행하려면: results_df, evaluation_results = run_safe_evaluation()")

# 추가: 예측값에서 NaN 제거 함수
def clean_predictions(predictions):
    """예측값 NaN 정리"""
    # NaN을 0으로 대체 (또는 평균값)
    cleaned = np.where(np.isnan(predictions), 0, predictions)
    
    # 음수값 제거
    cleaned = np.maximum(cleaned, 0)
    
    # inf 값 제거
    cleaned = np.where(np.isinf(cleaned), 0, cleaned)
    
    return cleaned

# 기존 result_df 정리
if 'result_df' in locals():
    print("🧹 기존 예측값 정리 중...")
    original_nan_count = result_df['pred_heat_demand'].isna().sum()
    
    # 예측값 정리
    result_df['pred_heat_demand'] = clean_predictions(result_df['pred_heat_demand'].values)
    
    print(f"   정리 전 NaN: {original_nan_count}개")
    print(f"   정리 후 NaN: {result_df['pred_heat_demand'].isna().sum()}개")
    print(f"   정리 후 범위: [{result_df['pred_heat_demand'].min():.2f}, {result_df['pred_heat_demand'].max():.2f}]")

✅ NaN 및 dtype 문제 해결 함수 준비 완료
실행하려면: results_df, evaluation_results = run_safe_evaluation()


### 📍 NaN 있는 경우 아래와 같이 결과 분석

In [None]:
# 새로운 안전한 훈련 결과 분석 사용
results_df, successful_models, failed_models = safe_training_analysis(training_results)

## 7️⃣ 테스트 예측

In [16]:
# 테스트 데이터 예측
print("🎯 테스트 데이터 예측 시작...")
print("=" * 40)

# 예측 결과를 저장할 딕셔너리
predictions = {}
prediction_stats = {}

# 각 모델별로 해당 테스트 데이터에 대해 예측
for model_key, model in models.items():
    if model_key in test_splits:
        test_data = test_splits[model_key]
        
        print(f"📊 {model_key}: {len(test_data):,}개 데이터 예측 중...")
        
        try:
            pred = model.predict(test_data)
            predictions[model_key] = {
                'data': test_data,
                'predictions': pred
            }
            
            # 예측 통계
            prediction_stats[model_key] = {
                'count': len(pred),
                'mean': np.mean(pred),
                'std': np.std(pred),
                'min': np.min(pred),
                'max': np.max(pred)
            }
            
            print(f"   ✅ 완료: 평균={np.mean(pred):.2f}, 범위=[{np.min(pred):.2f}, {np.max(pred):.2f}]")
            
        except Exception as e:
            print(f"   ❌ 예측 실패: {str(e)}")
            # 기본값으로 0 할당
            predictions[model_key] = {
                'data': test_data,
                'predictions': np.zeros(len(test_data))
            }
    else:
        print(f"⚠️ {model_key}: 대응하는 테스트 데이터 없음")

print(f"\n✅ 예측 완료: {len(predictions)}개 모델")

# 예측 통계 요약
if prediction_stats:
    stats_df = pd.DataFrame(prediction_stats).T
    print(f"\n📈 예측값 통계 요약:")
    print(f"   전체 예측 개수: {stats_df['count'].sum():,}개")
    print(f"   평균 예측값 범위: [{stats_df['mean'].min():.2f}, {stats_df['mean'].max():.2f}]")
    print(f"   최대 예측값: {stats_df['max'].max():.2f}")

🎯 테스트 데이터 예측 시작...
📊 비난방_A: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=32.56, 범위=[15.40, 88.01]
📊 비난방_B: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=78.18, 범위=[28.47, 220.26]
📊 비난방_C: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=94.22, 범위=[39.65, 222.22]
📊 비난방_D: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=56.01, 범위=[24.26, 121.57]
📊 비난방_E: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=46.52, 범위=[3.87, 147.74]
📊 비난방_F: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=37.73, 범위=[14.03, 64.06]
📊 비난방_G: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=75.82, 범위=[39.08, 190.67]
📊 비난방_H: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=41.67, 범위=[17.45, 116.63]
📊 비난방_I: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=36.73, 범위=[14.46, 65.54]
📊 비난방_J: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=51.58, 범위=[11.31, 115.81]
📊 비난방_K: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=43.16, 범위=[11.62, 99.23]
📊 비난방_L: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=9.37, 범위=[4.34, 27.61]
📊 비난방_M: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=14.97, 범위=[5.84, 41.30]
📊 비난방_N: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=26.64, 범위=[8.18, 92.44]
📊 비난방_O: 3,672개 데이터 예측 중...
   ✅ 완료: 평균=27.35, 범위=[11.05, 70.76]
📊 비난

## 8️⃣ 예측 결과 통합

In [17]:
# 예측 결과를 원본 test_df 순서에 맞게 통합
print("🔄 예측 결과 통합 중...")

# 결과를 저장할 배열 초기화
final_predictions = np.zeros(len(test_df))
prediction_counts = np.zeros(len(test_df))  # 각 인덱스별 예측 횟수 추적

# 각 예측 결과를 해당 인덱스에 할당
for model_key, pred_info in predictions.items():
    test_data = pred_info['data']
    pred_values = pred_info['predictions']
    
    # 원본 test_df에서 해당 데이터의 인덱스 찾기
    season, branch = model_key.split('_')
    season_num = 1 if season == '난방' else 0
    
    # 조건에 맞는 인덱스 찾기
    mask = (test_df['heating_season'] == season_num) & (test_df['branch_id'] == branch)
    indices = test_df[mask].index.tolist()
    
    print(f"📊 {model_key}: {len(indices)}개 인덱스에 할당")
    
    # 예측값 할당 (인덱스 개수와 예측값 개수가 맞는지 확인)
    if len(indices) == len(pred_values):
        for i, idx in enumerate(indices):
            final_predictions[idx] = pred_values[i]
            prediction_counts[idx] += 1
    else:
        print(f"   ⚠️ 크기 불일치: 인덱스 {len(indices)}개 vs 예측값 {len(pred_values)}개")
        # 크기가 다르면 최소 개수만큼만 할당
        min_len = min(len(indices), len(pred_values))
        for i in range(min_len):
            final_predictions[indices[i]] = pred_values[i]
            prediction_counts[indices[i]] += 1

# 예측되지 않은 데이터 확인
unassigned_count = np.sum(prediction_counts == 0)
if unassigned_count > 0:
    print(f"⚠️ 예측되지 않은 데이터: {unassigned_count}개 (0으로 유지)")

# 중복 예측 확인
duplicate_count = np.sum(prediction_counts > 1)
if duplicate_count > 0:
    print(f"⚠️ 중복 예측된 데이터: {duplicate_count}개")

print(f"\n✅ 예측 결과 통합 완료")
print(f"   📊 총 예측 개수: {len(final_predictions):,}개")
print(f"   📈 예측값 통계: 평균={np.mean(final_predictions):.2f}, 최대={np.max(final_predictions):.2f}")

🔄 예측 결과 통합 중...
📊 비난방_A: 3672개 인덱스에 할당
📊 비난방_B: 3672개 인덱스에 할당
📊 비난방_C: 3672개 인덱스에 할당
📊 비난방_D: 3672개 인덱스에 할당
📊 비난방_E: 3672개 인덱스에 할당
📊 비난방_F: 3672개 인덱스에 할당
📊 비난방_G: 3672개 인덱스에 할당
📊 비난방_H: 3672개 인덱스에 할당
📊 비난방_I: 3672개 인덱스에 할당
📊 비난방_J: 3672개 인덱스에 할당
📊 비난방_K: 3672개 인덱스에 할당
📊 비난방_L: 3672개 인덱스에 할당
📊 비난방_M: 3672개 인덱스에 할당
📊 비난방_N: 3672개 인덱스에 할당
📊 비난방_O: 3672개 인덱스에 할당
📊 비난방_P: 3672개 인덱스에 할당
📊 비난방_Q: 3672개 인덱스에 할당
📊 비난방_R: 3672개 인덱스에 할당
📊 비난방_S: 3672개 인덱스에 할당
📊 난방_A: 5088개 인덱스에 할당
📊 난방_B: 5088개 인덱스에 할당
📊 난방_C: 5088개 인덱스에 할당
📊 난방_D: 5088개 인덱스에 할당
📊 난방_E: 5088개 인덱스에 할당
📊 난방_F: 5088개 인덱스에 할당
📊 난방_G: 5088개 인덱스에 할당
📊 난방_H: 5088개 인덱스에 할당
📊 난방_I: 5088개 인덱스에 할당
📊 난방_J: 5088개 인덱스에 할당
📊 난방_K: 5088개 인덱스에 할당
📊 난방_L: 5088개 인덱스에 할당
📊 난방_M: 5088개 인덱스에 할당
📊 난방_N: 5088개 인덱스에 할당
📊 난방_O: 5088개 인덱스에 할당
📊 난방_P: 5088개 인덱스에 할당
📊 난방_Q: 5088개 인덱스에 할당
📊 난방_R: 5088개 인덱스에 할당
📊 난방_S: 5088개 인덱스에 할당

✅ 예측 결과 통합 완료
   📊 총 예측 개수: 166,440개
   📈 예측값 통계: 평균=96.16, 최대=953.80


## 9️⃣ 최종 결과 저장 및 평가

In [20]:
# 최종 결과를 test_df에 추가
print("💾 최종 결과 저장...")

# 원본 test_df 복사
result_df = test_df.copy()

# 예측 결과 추가 (음수값 제거)
result_df['pred_heat_demand'] = np.maximum(final_predictions, 0).round(1)

# CSV 파일 저장
output_filename = 'xgboost_branch_season_predictions.csv'
result_df.to_csv(output_filename, index=False)

print(f"📁 결과 파일 저장: {output_filename}")

# =============================================================================
# RMSE 중심 성능 평가 (실제값이 있는 경우)
# =============================================================================

if 'heat_demand' in test_df.columns:
    print(f"\n📊 RMSE 성능 평가")
    print("=" * 60)
    
    # 전체 RMSE
    y_true = test_df['heat_demand'].values
    y_pred = result_df['pred_heat_demand'].values
    
    # 음수나 NaN 값 제거
    valid_mask = ~(np.isnan(y_true) | np.isnan(y_pred))
    y_true_clean = y_true[valid_mask]
    y_pred_clean = y_pred[valid_mask]
    
    overall_rmse = np.sqrt(mean_squared_error(y_true_clean, y_pred_clean))
    overall_mae = mean_absolute_error(y_true_clean, y_pred_clean)
    correlation = np.corrcoef(y_true_clean, y_pred_clean)[0, 1]
    
    print(f"🏆 전체 성능:")
    print(f"   RMSE: {overall_rmse:.4f}")
    print(f"   MAE:  {overall_mae:.4f}")
    print(f"   상관계수: {correlation:.4f}")
    print(f"   유효 데이터: {len(y_true_clean):,}개")
    
    # 시즌별 RMSE (핵심!)
    print(f"\n📈 시즌별 RMSE 성능:")
    season_names = {0: '비난방시즌', 1: '난방시즌'}
    season_results = {}
    
    for season in [0, 1]:
        mask = (test_df['heating_season'] == season) & valid_mask
        if np.sum(mask) > 0:
            season_rmse = np.sqrt(mean_squared_error(y_true[mask], y_pred[mask]))
            season_mae = mean_absolute_error(y_true[mask], y_pred[mask])
            season_corr = np.corrcoef(y_true[mask], y_pred[mask])[0, 1] if np.sum(mask) > 1 else 0
            season_results[season] = {
                'rmse': season_rmse, 
                'mae': season_mae, 
                'corr': season_corr,
                'count': np.sum(mask)
            }
            
            print(f"   {season_names[season]:8s}: RMSE={season_rmse:7.4f} | MAE={season_mae:7.4f} | 상관={season_corr:6.3f} | {np.sum(mask):,}개")
    
    # 브랜치별 RMSE (상위/하위 분석)
    print(f"\n📊 브랜치별 RMSE 성능:")
    branch_results = {}
    
    for branch in sorted(test_df['branch_id'].unique()):
        mask = (test_df['branch_id'] == branch) & valid_mask
        if np.sum(mask) > 1:  # 최소 2개 이상의 데이터가 있어야 RMSE 계산 가능
            branch_rmse = np.sqrt(mean_squared_error(y_true[mask], y_pred[mask]))
            branch_mae = mean_absolute_error(y_true[mask], y_pred[mask])
            branch_results[branch] = {
                'rmse': branch_rmse,
                'mae': branch_mae, 
                'count': np.sum(mask)
            }
    
    if branch_results:
        # RMSE 기준 정렬
        sorted_branches = sorted(branch_results.items(), key=lambda x: x[1]['rmse'])
        
        print(f"   🥇 RMSE 우수 브랜치 (Top 5):")
        for i, (branch, metrics) in enumerate(sorted_branches[:5], 1):
            print(f"      {i}. 브랜치 {branch}: RMSE={metrics['rmse']:7.4f} | MAE={metrics['mae']:7.4f} | {metrics['count']:,}개")
        
        print(f"   🥉 RMSE 개선 필요 브랜치 (Bottom 5):")
        for i, (branch, metrics) in enumerate(sorted_branches[-5:], 1):
            print(f"      {i}. 브랜치 {branch}: RMSE={metrics['rmse']:7.4f} | MAE={metrics['mae']:7.4f} | {metrics['count']:,}개")
        
        # 브랜치별 성능 통계
        rmse_values = [v['rmse'] for v in branch_results.values()]
        print(f"\n   📈 브랜치별 RMSE 통계:")
        print(f"      평균: {np.mean(rmse_values):.4f}")
        print(f"      표준편차: {np.std(rmse_values):.4f}")
        print(f"      최소: {np.min(rmse_values):.4f}")
        print(f"      최대: {np.max(rmse_values):.4f}")
    
    # 시즌×브랜치 조합별 RMSE
    print(f"\n🔥 시즌×브랜치 조합별 RMSE (Ranking):")
    combo_results = []
    
    for season in [0, 1]:
        for branch in test_df['branch_id'].unique():
            mask = (test_df['heating_season'] == season) & (test_df['branch_id'] == branch) & valid_mask
            if np.sum(mask) > 1:
                combo_rmse = np.sqrt(mean_squared_error(y_true[mask], y_pred[mask]))
                combo_name = f"{season_names[season]}_{branch}"
                combo_results.append((combo_name, combo_rmse, np.sum(mask)))
    
    # RMSE 기준 정렬하여 표시
    combo_results.sort(key=lambda x: x[1])
    for i, (combo_name, rmse, count) in enumerate(combo_results, 1):
        print(f"   {i:2d}. {combo_name:15s}: RMSE={rmse:7.4f} | {count:,}개")
    
    # 훈련된 모델들의 최적화 성능과 실제 테스트 성능 비교
    print(f"\n🔍 모델 최적화 vs 실제 성능 비교:")
    optimization_rmses = [v['best_score'] for v in training_results.values() if v.get('best_score') is not None]
    
    if optimization_rmses:
        print(f"   훈련시 최적화 RMSE: 평균={np.mean(optimization_rmses):.4f}, 범위=[{np.min(optimization_rmses):.4f}, {np.max(optimization_rmses):.4f}]")
        print(f"   실제 테스트 RMSE: {overall_rmse:.4f}")
        print(f"   성능 차이: {abs(overall_rmse - np.mean(optimization_rmses)):.4f}")
    
else:
    print(f"\n⚠️ 테스트 데이터에 heat_demand 컬럼이 없어서 RMSE 평가를 수행할 수 없습니다.")
    print(f"   예측 결과만 저장되었습니다.")
    
    # 예측값 기본 통계
    print(f"\n📊 예측값 기본 통계:")
    print(f"   개수: {len(result_df):,}개")
    print(f"   평균: {result_df['pred_heat_demand'].mean():.2f}")
    print(f"   중앙값: {result_df['pred_heat_demand'].median():.2f}")
    print(f"   표준편차: {result_df['pred_heat_demand'].std():.2f}")
    print(f"   범위: [{result_df['pred_heat_demand'].min():.2f}, {result_df['pred_heat_demand'].max():.2f}]")
    
    # 시즌별 예측 통계
    print(f"\n📈 시즌별 예측 통계:")
    for season in [0, 1]:
        season_data = result_df[result_df['heating_season'] == season]['pred_heat_demand']
        if len(season_data) > 0:
            season_name = '비난방시즌' if season == 0 else '난방시즌'
            print(f"   {season_name}: 평균={season_data.mean():.2f}, 개수={len(season_data):,}개")

print("=" * 60)

💾 최종 결과 저장...
📁 결과 파일 저장: xgboost_branch_season_predictions.csv

📊 RMSE 성능 평가
🏆 전체 성능:
   RMSE: 21.0538
   MAE:  12.5191
   상관계수: 0.9834
   유효 데이터: 166,440개

📈 시즌별 RMSE 성능:
   비난방시즌   : RMSE=10.0041 | MAE= 6.5901 | 상관= 0.950 | 69,768개
   난방시즌    : RMSE=26.2857 | MAE=16.7981 | 상관= 0.982 | 96,672개

📊 브랜치별 RMSE 성능:
   🥇 RMSE 우수 브랜치 (Top 5):
      1. 브랜치 R: RMSE= 2.9103 | MAE= 2.1281 | 8,760개
      2. 브랜치 L: RMSE= 4.3791 | MAE= 3.0253 | 8,760개
      3. 브랜치 M: RMSE= 6.8395 | MAE= 4.7554 | 8,760개
      4. 브랜치 S: RMSE= 7.3588 | MAE= 5.8401 | 8,760개
      5. 브랜치 K: RMSE= 9.6604 | MAE= 7.3847 | 8,760개
   🥉 RMSE 개선 필요 브랜치 (Bottom 5):
      1. 브랜치 H: RMSE=28.2001 | MAE=19.7508 | 8,760개
      2. 브랜치 G: RMSE=31.4408 | MAE=22.6905 | 8,760개
      3. 브랜치 C: RMSE=33.1009 | MAE=24.0311 | 8,760개
      4. 브랜치 D: RMSE=35.1071 | MAE=24.7073 | 8,760개
      5. 브랜치 B: RMSE=48.0838 | MAE=33.8632 | 8,760개

   📈 브랜치별 RMSE 통계:
      평균: 17.4319
      표준편차: 11.8065
      최소: 2.9103
      최대: 48.0838

🔥 시즌×브랜치 조합별 R

### 📍 NaN 있는 경우 아래와 같이 성능평가 진행

In [None]:
# 새로운 안전한 RMSE 평가 사용
evaluation_results = safe_rmse_evaluation(test_df, result_df)

## 🔟 모델 정보 저장

In [None]:
# 모델 정보 및 결과 저장
print("💾 모델 정보 저장...")

# 훈련 결과 및 모델 정보를 JSON으로 저장
model_info = {
    'total_models': len(models),
    'successful_models': len([k for k, v in training_results.items() if v.get('best_score') is not None]),
    'training_results': training_results,
    'prediction_stats': prediction_stats if 'prediction_stats' in locals() else {},
    'feature_columns': models[list(models.keys())[0]].feature_cols if models else [],
    'optimization_trials_per_model': n_trials_per_model,
    'total_training_time_minutes': total_time_min if 'total_time_min' in locals() else 0
}

# RMSE 결과 추가 (있는 경우)
if 'heat_demand' in test_df.columns:
    model_info['evaluation_results'] = {
        'overall_rmse': overall_rmse,
        'overall_mae': overall_mae,
        'correlation': correlation
    }

# JSON 파일로 저장
with open('model_info_and_results.json', 'w', encoding='utf-8') as f:
    json.dump(model_info, f, indent=2, ensure_ascii=False, default=str)

print("📁 model_info_and_results.json 저장 완료")

# 간단한 요약 출력
print(f"\n📋 모델 정보 요약:")
print(f"   🔧 총 모델 수: {model_info['total_models']}개")
print(f"   ✅ 성공한 모델: {model_info['successful_models']}개")
print(f"   📊 사용 특성 수: {len(model_info['feature_columns'])}개")
print(f"   🎯 모델당 최적화 시도: {model_info['optimization_trials_per_model']}회")

# Google Drive 저장 (Colab 환경)
if IN_COLAB:
    save_drive = input("\nGoogle Drive에 결과 파일들을 저장하시겠습니까? (y/n): ").lower() == 'y'
    if save_drive:
        try:
            !cp {output_filename} /content/drive/MyDrive/
            !cp model_info_and_results.json /content/drive/MyDrive/
            print("✅ Google Drive 저장 완료!")
        except Exception as e:
            print(f"⚠️ Google Drive 저장 실패: {e}")

## 🎯 최종 요약

In [None]:
print("\n" + "="*80)
print("🔥 지역난방 열수요 예측: 시즌별-브랜치별 XGBoost 모델 - 최종 요약")
print("="*80)

print(f"\n🏗️ 모델 구성:")
print(f"   📊 XGBoost 개별 모델 (시즌별 × 브랜치별)")
print(f"   🎯 Optuna TPE 하이퍼파라미터 최적화")
print(f"   📋 총 모델 수: {len(models)}개")
print(f"   ✅ 성공적 훈련: {len([k for k, v in training_results.items() if v.get('best_score') is not None])}개")

print(f"\n📈 특성 엔지니어링:")
print(f"   ⭐ HDD, wind_chill, 온도 제곱/세제곱")
print(f"   🔄 순환형 인코딩 (시간, 월, 요일)")
print(f"   📋 범주형: heating_season, 피크시간, 기온범주, 강수강도")
print(f"   🧮 상호작용: 습도×기온, 월×일")
# print(f"   📏 StandardScaler 정규화")

print(f"\n🎯 모델링 전략:")
print(f"   ❄️ 난방시즌 (10,11,12,1,2,3,4월) 전용 모델")
print(f"   🌞 비난방시즌 (5,6,7,8,9월) 전용 모델")
print(f"   🏢 브랜치별 개별 모델 (각 지사의 특성 반영)")
print(f"   ⚡ XGBoost with Early Stopping")

print(f"\n🔍 최적화 설정:")
print(f"   📊 Optuna TPE Sampler")
print(f"   🎯 모델당 {n_trials_per_model}회 시도")
print(f"   📈 시계열 기반 Train/Validation 분할 (80:20)")
print(f"   🎪 XGBoost Pruning Callback 사용")

if 'heat_demand' in test_df.columns:
    print(f"\n🏆 최종 성능:")
    print(f"   📊 전체 RMSE: {overall_rmse:.4f}")
    print(f"   📏 전체 MAE: {overall_mae:.4f}")
    print(f"   📈 상관계수: {correlation:.4f}")

# 최고 성능 모델 정보
if successful_models is not None and len(successful_models) > 0:
    best_model_name = successful_models['best_score'].idxmin()
    best_score = successful_models.loc[best_model_name, 'best_score']
    print(f"\n🥇 최고 성능 모델: {best_model_name} (RMSE: {best_score:.4f})")

print(f"\n📁 출력 파일:")
print(f"   • {output_filename} - 예측 결과")
print(f"   • model_info_and_results.json - 모델 정보 및 훈련 결과")
print(f"   • 핵심 컬럼: pred_heat_demand (예측값)")

print(f"\n⏱️ 실행 시간:")
if 'total_time_min' in locals():
    print(f"   🚀 총 훈련 시간: {total_time_min:.1f}분")
    print(f"   ⚡ 평균 모델당: {total_time_min*60/len(models):.1f}초")

print(f"\n🎉 지역난방 열수요 예측 완료!")
print(f"🔬 혁신 포인트: 시즌×브랜치 세분화 + Optuna 자동 최적화")
print(f"📊 총 {len(models)}개 모델로 정밀한 지역별-시즌별 예측 구현")
print("="*80)