<a href="https://colab.research.google.com/github/jylee2930/Basic_BIgDataAnalysis/blob/main/KNN%2BLSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
import warnings
# Silence all warnings so library deprecation noise does not clutter the
# notebook output.
warnings.filterwarnings('ignore')

# Matplotlib setup. The style must be applied BEFORE any rcParams override:
# plt.style.use('default') resets rcParams to their default values, so an
# override made earlier would be silently discarded.
plt.style.use('default')
# NOTE(review): the original comment described this as a Korean (Hangul) font
# setting, but DejaVu Sans is matplotlib's stock font and presumably has no
# Hangul glyphs. All plot text in this file is English, so this is fine; choose
# a CJK-capable font if Korean labels are ever added.
plt.rcParams['font.family'] = 'DejaVu Sans'

class TemperaturePredictor:
    """Hybrid daily mean-temperature predictor blending a KNN regressor and an LSTM.

    The KNN model is fitted on per-day tabular features (calendar encodings,
    same-day min/max temperature, moving averages). The LSTM is fitted on
    sliding windows holding the previous ``sequence_length`` days of
    mean/min/max temperature. ``predict_hybrid`` combines both outputs with a
    weighted sum.
    """

    # Columns the input CSV must provide.
    REQUIRED_COLS = ('date', 'mean_tmp', 'min_tmp', 'max_tmp')

    # Raw temperature columns: used as LSTM inputs at each time step and as
    # the base columns for the moving-average features.
    TEMPERATURE_COLS = ['mean_tmp', 'min_tmp', 'max_tmp']

    # Window sizes (in rows) of the moving-average features.
    MA_WINDOWS = (3, 7, 14)

    # Feature vector of the KNN model.
    # NOTE(review): 'mean_tmp_ma*' and 'temp_change' are computed from the
    # same-day 'mean_tmp', which is also the KNN target, so these features
    # leak target information. Left unchanged to keep model behavior; confirm
    # whether lagged (shifted) features were intended.
    KNN_FEATURE_COLS = [
        'month', 'dayofyear', 'dayofweek',
        'month_sin', 'month_cos', 'dayofyear_sin', 'dayofyear_cos',
        'min_tmp', 'max_tmp', 'temp_range',
        'mean_tmp_ma3', 'mean_tmp_ma7', 'mean_tmp_ma14',
        'min_tmp_ma3', 'max_tmp_ma3', 'temp_change'
    ]

    def __init__(self, sequence_length=30, k_neighbors=5):
        """Create an untrained predictor.

        Args:
            sequence_length: Number of past rows in each LSTM input window.
            k_neighbors: Number of neighbors used by the KNN regressor.

        Raises:
            ValueError: If either argument is smaller than 1.
        """
        if sequence_length < 1:
            raise ValueError("sequence_length must be at least 1")
        if k_neighbors < 1:
            raise ValueError("k_neighbors must be at least 1")

        self.sequence_length = sequence_length
        self.k_neighbors = k_neighbors
        self.knn_model = None
        self.lstm_model = None
        self.scaler_features = StandardScaler()
        self.scaler_target = MinMaxScaler()

    def load_and_preprocess_data(self, filepath):
        """Load the temperature CSV and derive the features used by both models.

        Args:
            filepath: Path or file-like object accepted by ``pandas.read_csv``.
                The data must contain the columns in ``REQUIRED_COLS``.

        Returns:
            A DataFrame sorted by date, free of missing values, with calendar,
            cyclic, moving-average and temperature-difference columns added.

        Raises:
            KeyError: If a required column is missing.
        """
        print("데이터 로딩 및 전처리 시작...")

        df = pd.read_csv(filepath)

        missing = [col for col in self.REQUIRED_COLS if col not in df.columns]
        if missing:
            raise KeyError(f"missing required column(s): {missing}")

        # Strip tab characters from the date strings before parsing them.
        df['date'] = df['date'].str.replace('\t', '', regex=False)
        df['date'] = pd.to_datetime(df['date'])

        df = df.dropna()
        # Rolling means and diff() below are only meaningful on chronologically
        # ordered rows; a stable sort keeps the file order of same-date rows.
        df = df.sort_values('date', kind='mergesort').reset_index(drop=True)

        # Calendar features.
        df['year'] = df['date'].dt.year
        df['month'] = df['date'].dt.month
        df['day'] = df['date'].dt.day
        df['dayofyear'] = df['date'].dt.dayofyear
        df['dayofweek'] = df['date'].dt.dayofweek

        # Cyclic (sin/cos) encodings so that December/January and day 365/day 1
        # end up close together. With a fixed period of 365, day 366 of a leap
        # year wraps slightly past one full cycle.
        df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
        df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
        df['dayofyear_sin'] = np.sin(2 * np.pi * df['dayofyear'] / 365)
        df['dayofyear_cos'] = np.cos(2 * np.pi * df['dayofyear'] / 365)

        # Moving averages; each window includes the current row.
        for window in self.MA_WINDOWS:
            for col in self.TEMPERATURE_COLS:
                df[f'{col}_ma{window}'] = df[col].rolling(window=window).mean()

        # Temperature difference features.
        df['temp_range'] = df['max_tmp'] - df['min_tmp']
        df['temp_change'] = df['mean_tmp'].diff()

        # The rolling windows and diff() leave NaNs in the leading rows.
        df = df.dropna().reset_index(drop=True)

        print(f"전처리 완료: {len(df)}개 데이터 포인트")
        return df

    def create_features_for_knn(self, df):
        """Return the KNN feature matrix (rows x ``KNN_FEATURE_COLS``) as an array."""
        return df[self.KNN_FEATURE_COLS].values

    def create_sequences_for_lstm(self, data, target_col='mean_tmp'):
        """Build sliding-window samples for the LSTM.

        Sample ``k`` holds rows ``k .. k + sequence_length - 1`` (by position)
        of ``TEMPERATURE_COLS``; its target is ``target_col`` at row
        ``k + sequence_length``.

        Args:
            data: DataFrame containing ``TEMPERATURE_COLS`` and ``target_col``.
            target_col: Column to predict.

        Returns:
            ``(sequences, targets)`` with shapes
            ``(n, sequence_length, n_features)`` and ``(n,)`` where
            ``n = max(len(data) - sequence_length, 0)``. When ``n`` is 0 the
            arrays are empty but keep those ranks.
        """
        window = self.sequence_length
        inputs = data[self.TEMPERATURE_COLS].to_numpy()
        n_samples = len(data) - window

        if n_samples <= 0:
            empty_sequences = np.empty((0, window, inputs.shape[1]), dtype=inputs.dtype)
            return empty_sequences, np.empty((0,), dtype=float)

        # Row k of the index matrix is [k, k+1, ..., k+window-1]; indexing with
        # it gathers every window in one vectorized step.
        row_index = np.arange(n_samples)[:, None] + np.arange(window)[None, :]
        sequences = inputs[row_index]
        targets = data[target_col].to_numpy()[window:].copy()

        return sequences, targets

    def split_data(self, df, test_ratio=0.2):
        """Split chronologically: the leading rows train, the trailing rows test.

        Args:
            df: Preprocessed DataFrame in time order.
            test_ratio: Fraction of rows (0..1) assigned to the test set.

        Returns:
            ``(train_df, test_df)`` as independent copies.

        Raises:
            ValueError: If ``test_ratio`` lies outside ``[0, 1]``.
        """
        if not 0 <= test_ratio <= 1:
            raise ValueError("test_ratio must be between 0 and 1")

        split_idx = int(len(df) * (1 - test_ratio))

        train_df = df.iloc[:split_idx].copy()
        test_df = df.iloc[split_idx:].copy()

        print(f"훈련 데이터: {len(train_df)}개")
        print(f"테스트 데이터: {len(test_df)}개")

        return train_df, test_df

    def train_knn_model(self, train_df):
        """Fit the feature scaler and the distance-weighted KNN regressor."""
        print("KNN 모델 훈련 중...")

        X_train = self.create_features_for_knn(train_df)
        y_train = train_df['mean_tmp'].values

        # Standardize features so no single column dominates the distance.
        X_train_scaled = self.scaler_features.fit_transform(X_train)

        self.knn_model = KNeighborsRegressor(n_neighbors=self.k_neighbors, weights='distance')
        self.knn_model.fit(X_train_scaled, y_train)

        print("KNN 모델 훈련 완료")

    def train_lstm_model(self, train_df, epochs=50, batch_size=32):
        """Build and fit the two-layer LSTM on sliding windows of ``train_df``.

        The target is min-max scaled with ``scaler_target``; the input windows
        are passed to the network unscaled.

        Args:
            train_df: Training rows in time order.
            epochs: Number of training epochs.
            batch_size: Mini-batch size.

        Returns:
            The Keras history object returned by ``fit``.

        Raises:
            ValueError: If ``train_df`` has too few rows to form one window.
        """
        print("LSTM 모델 훈련 중...")

        X_train, y_train = self.create_sequences_for_lstm(train_df)
        if len(X_train) == 0:
            raise ValueError(
                f"train_df needs more than {self.sequence_length} rows to build "
                f"LSTM sequences, got {len(train_df)}"
            )

        y_train_scaled = self.scaler_target.fit_transform(y_train.reshape(-1, 1)).flatten()

        n_features = len(self.TEMPERATURE_COLS)
        self.lstm_model = Sequential([
            LSTM(64, return_sequences=True, input_shape=(self.sequence_length, n_features)),
            Dropout(0.2),
            LSTM(32, return_sequences=False),
            Dropout(0.2),
            Dense(16, activation='relu'),
            Dense(1)
        ])

        self.lstm_model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])

        history = self.lstm_model.fit(
            X_train, y_train_scaled,
            epochs=epochs, batch_size=batch_size,
            validation_split=0.2, verbose=1
        )

        print("LSTM 모델 훈련 완료")
        return history

    def predict_knn(self, test_df):
        """Predict mean temperature for every row of ``test_df`` with the KNN model.

        Raises:
            RuntimeError: If ``train_knn_model`` has not been called yet.
        """
        if self.knn_model is None:
            raise RuntimeError("KNN model is not trained; call train_knn_model() first")

        X_test = self.create_features_for_knn(test_df)
        X_test_scaled = self.scaler_features.transform(X_test)
        return self.knn_model.predict(X_test_scaled)

    def predict_lstm(self, test_df):
        """Predict mean temperature with the LSTM, in original (unscaled) units.

        The first ``sequence_length`` rows of ``test_df`` only serve as
        context, so the result has ``len(test_df) - sequence_length`` entries.

        Raises:
            RuntimeError: If ``train_lstm_model`` has not been called yet.
            ValueError: If ``test_df`` has too few rows to form one window.
        """
        if self.lstm_model is None:
            raise RuntimeError("LSTM model is not trained; call train_lstm_model() first")

        X_test, _ = self.create_sequences_for_lstm(test_df)
        if len(X_test) == 0:
            raise ValueError(
                f"test_df needs more than {self.sequence_length} rows to build "
                f"LSTM sequences, got {len(test_df)}"
            )

        y_pred_scaled = self.lstm_model.predict(X_test, verbose=0)
        return self.scaler_target.inverse_transform(y_pred_scaled).flatten()

    def predict_hybrid(self, test_df, knn_weight=0.3, lstm_weight=0.7):
        """Predict with both models and blend them as a weighted sum.

        Args:
            test_df: Test rows in time order.
            knn_weight: Weight of the KNN prediction.
            lstm_weight: Weight of the LSTM prediction. The weights are used
                as given; they are not normalized.

        Returns:
            Dict with keys ``'knn'``, ``'lstm'``, ``'hybrid'`` and ``'actual'``,
            each an array aligned to the rows after the first
            ``sequence_length`` rows of ``test_df``.
        """
        print("혼합 모델 예측 중...")

        knn_pred = self.predict_knn(test_df)
        lstm_pred = self.predict_lstm(test_df)

        # The LSTM has no prediction for the first sequence_length rows, so
        # drop them from the KNN output and the ground truth to align all three.
        knn_pred_aligned = knn_pred[self.sequence_length:]
        test_actual = test_df['mean_tmp'].values[self.sequence_length:]

        hybrid_pred = knn_weight * knn_pred_aligned + lstm_weight * lstm_pred

        return {
            'knn': knn_pred_aligned,
            'lstm': lstm_pred,
            'hybrid': hybrid_pred,
            'actual': test_actual
        }

    def evaluate_model(self, predictions, model_name):
        """Print and return MSE, MAE, RMSE and R² for one model.

        Args:
            predictions: Dict as returned by ``predict_hybrid``.
            model_name: Display name; its lower-cased form selects the
                prediction array (e.g. ``'KNN'`` -> ``predictions['knn']``).

        Returns:
            Dict with keys ``'MSE'``, ``'MAE'``, ``'RMSE'`` and ``'R2'``.
        """
        actual = predictions['actual']
        pred = predictions[model_name.lower()]

        mse = mean_squared_error(actual, pred)
        mae = mean_absolute_error(actual, pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(actual, pred)

        print(f"\n{model_name} 모델 성능:")
        print(f"  MSE: {mse:.4f}")
        print(f"  MAE: {mae:.4f}")
        print(f"  RMSE: {rmse:.4f}")
        print(f"  R²: {r2:.4f}")

        return {'MSE': mse, 'MAE': mae, 'RMSE': rmse, 'R2': r2}

    def plot_predictions(self, predictions, n_days=100):
        """Show a 2x2 figure comparing the three models on the first ``n_days`` points.

        Panels: prediction curves, hybrid actual-vs-predicted scatter, error
        histograms, and absolute error over time.
        """
        actual = predictions['actual'][:n_days]
        knn_pred = predictions['knn'][:n_days]
        lstm_pred = predictions['lstm'][:n_days]
        hybrid_pred = predictions['hybrid'][:n_days]

        plt.figure(figsize=(15, 10))

        # Panel 1: all prediction curves against the ground truth.
        plt.subplot(2, 2, 1)
        plt.plot(actual, label='Actual', linewidth=2, color='black')
        plt.plot(knn_pred, label='KNN', alpha=0.7, color='blue')
        plt.plot(lstm_pred, label='LSTM', alpha=0.7, color='red')
        plt.plot(hybrid_pred, label='Hybrid', alpha=0.8, color='green', linewidth=2)
        # Report the number of points actually drawn, which can be smaller
        # than n_days when fewer predictions are available.
        plt.title(f'Temperature Prediction Comparison (First {len(actual)} days)')
        plt.xlabel('Days')
        plt.ylabel('Temperature (°C)')
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Panel 2: hybrid scatter with the y = x reference line.
        plt.subplot(2, 2, 2)
        plt.scatter(actual, hybrid_pred, alpha=0.6, color='green')
        plt.plot([actual.min(), actual.max()], [actual.min(), actual.max()], 'r--', lw=2)
        plt.title('Hybrid Model: Actual vs Predicted')
        plt.xlabel('Actual Temperature (°C)')
        plt.ylabel('Predicted Temperature (°C)')
        plt.grid(True, alpha=0.3)

        # Signed errors, shared by the two remaining panels.
        errors = {
            'KNN': actual - knn_pred,
            'LSTM': actual - lstm_pred,
            'Hybrid': actual - hybrid_pred
        }

        # Panel 3: error distribution per model.
        plt.subplot(2, 2, 3)
        for model, error in errors.items():
            plt.hist(error, bins=30, alpha=0.6, label=f'{model} (std={np.std(error):.2f})')

        plt.title('Prediction Error Distribution')
        plt.xlabel('Error (°C)')
        plt.ylabel('Frequency')
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Panel 4: per-day absolute error; the hybrid curve is emphasized.
        plt.subplot(2, 2, 4)
        days = np.arange(len(actual))
        plt.plot(days, np.abs(errors['KNN']), label='KNN MAE', alpha=0.7)
        plt.plot(days, np.abs(errors['LSTM']), label='LSTM MAE', alpha=0.7)
        plt.plot(days, np.abs(errors['Hybrid']), label='Hybrid MAE', alpha=0.8, linewidth=2)
        plt.title('Absolute Error Over Time')
        plt.xlabel('Days')
        plt.ylabel('Absolute Error (°C)')
        plt.legend()
        plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def plot_training_history(self, history):
        """Plot training vs. validation loss and MAE per epoch for the LSTM.

        Args:
            history: Object whose ``history`` dict holds the ``'loss'``,
                ``'val_loss'``, ``'mae'`` and ``'val_mae'`` series, as returned
                by ``train_lstm_model``.
        """
        plt.figure(figsize=(12, 4))

        panels = [('loss', 'Loss'), ('mae', 'MAE')]
        for position, (key, label) in enumerate(panels, start=1):
            plt.subplot(1, 2, position)
            plt.plot(history.history[key], label=f'Training {label}')
            plt.plot(history.history[f'val_{key}'], label=f'Validation {label}')
            plt.title(f'LSTM Model {label}')
            plt.xlabel('Epoch')
            plt.ylabel(label)
            plt.legend()
            plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()



In [None]:
# Driver cell: train both models on the Wonju temperature data, evaluate them
# on the held-out tail of the series, and plot the results.

print("=== KNN+LSTM 혼합 기온 예측 모델 ===\n")

# Build the predictor, then load and chronologically split the data.
predictor = TemperaturePredictor(sequence_length=30, k_neighbors=7)
df = predictor.load_and_preprocess_data('wonju_temp.csv')
train_df, test_df = predictor.split_data(df, test_ratio=0.2)

# Fit both models and show the LSTM learning curves.
predictor.train_knn_model(train_df)
history = predictor.train_lstm_model(train_df, epochs=30, batch_size=32)
predictor.plot_training_history(history)

# Blend the two models on the test set.
predictions = predictor.predict_hybrid(test_df, knn_weight=0.3, lstm_weight=0.7)

# Evaluate each model in turn; the generator is consumed in order, so the
# reports are printed as KNN, LSTM, Hybrid.
print("\n=== 모델 성능 비교 ===")
knn_metrics, lstm_metrics, hybrid_metrics = (
    predictor.evaluate_model(predictions, label)
    for label in ('KNN', 'LSTM', 'Hybrid')
)

# Relative RMSE reduction of the hybrid model against each single model.
print(f"\n=== 성능 개선도 ===")
hybrid_rmse = hybrid_metrics['RMSE']
for baseline_name, baseline_metrics in (('KNN', knn_metrics), ('LSTM', lstm_metrics)):
    gain_pct = (baseline_metrics['RMSE'] - hybrid_rmse) / baseline_metrics['RMSE'] * 100
    print(f"{baseline_name} 대비 Hybrid RMSE 개선: {gain_pct:.2f}%")

# Visual comparison over the first 200 aligned test days.
predictor.plot_predictions(predictions, n_days=200)


