In [1]:
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [2]:
pip install pycaret

Collecting pycaret
  Downloading pycaret-3.3.2-py3-none-any.whl.metadata (17 kB)
Collecting pandas<2.2.0 (from pycaret)
  Downloading pandas-2.1.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting scipy<=1.11.4,>=1.6.1 (from pycaret)
  Downloading scipy-1.11.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m1.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting joblib<1.4,>=1.2.0 (from pycaret)
  Downloading joblib-1.3.2-py3-none-any.whl.metadata (5.4 kB)
Collecting pyod>=1.1.3 (from pycaret)
  Downloading pyod-2.0.3.tar.gz (169 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m169.6/169.6 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting category-encoders>=2.4.0 (from pycaret)
  Downloading category_encoders-2.8.0-py3-none-any.whl.metadata (7.9 kB)
Collectin

In [None]:
from pycaret.regression import *

In [None]:
import pandas as pd
import numpy as np
import itertools
import copy
import matplotlib.pyplot as plt
import matplotlib as mpl

# Library Settings
mpl.rcParams['font.family'] = 'Malgun Gothic'  # 윈도우에서 사용되는 한글 폰트
mpl.rcParams['axes.unicode_minus'] = False  # 마이너스 기호가 깨지지 않도록 설정

In [None]:
from sklearn.linear_model import LassoCV, RidgeCV, LarsCV
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler
from collections import Counter
from sklearn.ensemble import VotingRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.feature_selection import mutual_info_regression

def split_data(my_data):
    """데이터를 학습 및 검증 세트로 나누는 함수"""
    for df_index in list(itertools.combinations(my_data['Year'].unique(), 2)):
        val_df_index = list(df_index)
        train_df_index = list(set(my_data['Year'].values) - set(val_df_index))
        val = my_data[my_data['Year'].isin(val_df_index)]
        train = my_data[my_data['Year'].isin(train_df_index)]
        yield val, train

def get_feature_importance(x, y, random_state=42,top_k=10):
    """
    다양한 모델(Lasso, Ridge, LARS, RandomForest)을 사용해 Feature Importance를 계산하고
    공통적으로 중요한 피처를 반환하는 함수입니다.

    Parameters:
        X (pd.DataFrame): 독립 변수 데이터프레임
        y (pd.Series): 종속 변수 (타겟)
        random_state (int): 난수 시드

    Returns:
        selected_features (list): 공통적으로 중요한 피처 목록
    """
    # 피처 목록
    feature_lists = []

    # 1. Lasso 모델
    lasso = LassoCV(cv=5, random_state=random_state).fit(x, y)
    lasso_importance = np.abs(lasso.coef_)
    lasso_features = x.columns[lasso_importance > 0].tolist()
    feature_lists.extend(lasso_features)
    print("Lasso Selected Features:", lasso_features)

    # 2. Ridge 모델
    ridge = RidgeCV(cv=5).fit(x, y)
    ridge_importance = np.abs(ridge.coef_)
    ridge_features = x.columns[ridge_importance > np.mean(ridge_importance)].tolist()
    feature_lists.extend(ridge_features)
    print("Ridge Selected Features:", ridge_features)

    # 3. LARS 모델
    lars = LarsCV(cv=5).fit(x, y)
    lars_importance = np.abs(lars.coef_)
    lars_features = x.columns[lars_importance > 0].tolist()
    feature_lists.extend(lars_features)
    print("LARS Selected Features:", lars_features)

    # 피처 등장 횟수 계산
    feature_counts = Counter(feature_lists)

    # 세 가지 이상 모델에서 등장한 피처만 선택
    common_features = [feature for feature, count in feature_counts.items() if count >= 2][:top_k]

    return common_features

def scale_data(data, type = "ss", scaler=None, is_fit=True):
    if is_fit:
        """스케일링 함수 선정"""
        if type == "ss":
            scaler = StandardScaler()
        elif type == "ms":
            scaler = MinMaxScaler()
        elif type == "rs":
            scaler = RobustScaler()
        else:
            print("Wrong Name of Scaler")
        X_scaled = scaler.fit_transform(data)
    else:
        scaler = scaler
        X_scaled = scaler.transform(data)

    # 스케일링된 데이터를 DataFrame으로 변환
    X_scaled_df = pd.DataFrame(X_scaled, columns=data.columns)

    return X_scaled_df, scaler

def prepare_data_for_prediction(data_seen, data_unseen):
    """
    주어진 데이터를 전처리하여 NaN을 제거하고, 예측에 사용할 데이터 준비.
    """
    test_data_1 = pd.DataFrame(data_unseen.loc[TEST_YEARS[0]]).T
    test_data_2 = pd.DataFrame(data_unseen.loc[TEST_YEARS[1]]).T
    #test_data_3 = pd.DataFrame(data_unseen.loc[TEST_YEARS[2]]).T
    data_2023 = pd.DataFrame(data_unseen.loc[2023]).T

    # NaN 제거
    test_data_1.drop(data_seen.columns[data_seen.isna().any()], axis=1, inplace=True)
    test_data_2.drop(data_seen.columns[data_seen.isna().any()], axis=1, inplace=True)
    #test_data_3.drop(data_seen.columns[data_seen.isna().any()], axis=1, inplace=True)
    data_2023.drop(data_seen.columns[data_seen.isna().any()], axis=1, inplace=True)

    data_unseen = data_unseen.drop(data_seen.columns[data_seen.isna().any()], axis=1)
    data_seen = data_seen.drop(data_seen.columns[data_seen.isna().any()], axis=1)

    data_seen.to_csv(f"data_seen_{CROP}{MODEL_NUM}.csv",encoding='utf-8-sig',index=None)
    data_unseen.to_csv(f"data_unseen_{CROP}{MODEL_NUM}.csv",encoding='utf-8-sig',index=None)
    data_2023.to_csv(f"data_2023_{CROP}{MODEL_NUM}.csv",encoding='utf-8-sig',index=None)

    return data_seen, data_unseen, test_data_1, test_data_2, data_2023 # test_data_3

def calculate_error_rate(actual, predicted):
    """평균 절대 오차율 계산"""
    return 100 * mean_absolute_error(actual, predicted) / np.mean(actual)


def create_voting_regressor(base_models_params, random_seed=None):
    estimators = []
    for name, model in base_models_params:
        if hasattr(model, 'random_state') and model.random_state is None:
            model.random_state = random_seed  # 랜덤 시드 설정
        estimators.append((name, model))  # VotingRegressor에 필요한 형식
    return VotingRegressor(estimators=estimators)

def train_and_predict(model, train_data, val_data, test_data, target_col):
    """모델 학습 및 예측"""
    # 모델의 `estimators` 속성을 확인
    if hasattr(model, 'estimators') and (not model.estimators or not isinstance(model.estimators, list)):
        raise ValueError("The 'estimators' attribute of the model is invalid. It should be a non-empty list of (string, estimator) tuples.")

    model.fit(train_data.drop(columns=[target_col, 'Year']), train_data[target_col])
    predictions_train = model.predict(train_data.drop(columns=[target_col, 'Year']))
    predictions_val = model.predict(val_data.drop(columns=[target_col, 'Year']))
    predictions_test = model.predict(test_data.drop(columns=[target_col, 'Year']))
    return predictions_train, predictions_val, predictions_test

def analyze_results(result_df, test_years, top_k=3):
    result_df['val_years_str'] = result_df['val_years'].astype(str)

    """결과 데이터프레임을 분석하여 최종 평균값 계산"""
    grouped_results = (
        result_df.groupby("val_years_str", group_keys=False)
        .apply(lambda x: x.nsmallest(top_k, "val"))
    )
    return {
        "train": grouped_results["train"].mean(),
        "val": grouped_results["val"].mean(),
        f"final_preds_{test_years[0]}": grouped_results[f"test_preds_{test_years[0]}"].mean(),
        f"final_preds_{test_years[1]}": grouped_results[f"test_preds_{test_years[1]}"].mean(),
        f"final_preds_{LIVE_YEAR}": grouped_results[f"live_preds_{LIVE_YEAR}"].mean(),
        "live_err": grouped_results["live_err"].mean(),
    }

def train_and_optimize_models(data_seen, model_num = 20, top_k = None, my_model_name = "my_model"):
    """
    주어진 학습 데이터로 모델을 설정하고 최적화하여 가장 좋은 모델을 반환.
    """
    data_seen = copy.deepcopy(data_seen)
    data_seen = data_seen.reset_index(drop=True)
    model_performance = []
    model_list = []

    for id in range(11, 11+model_num):  # 앙상블 모델 20개
        #id = random.randint(1,100)
        reg_test = setup(data=data_seen,
                         target=TARGET_COL,
                         use_gpu=True,
                         fold=3,
                         train_size=0.80,
                         session_id=id)

        # 모델 비교 및 최적화
        best_models = compare_models(
            sort='rmse',
            n_select=1,
        )

        model_name = best_models.__class__.__name__
        print(model_name)

        # 모델의 RMSE 추출
        model_rmse = pull()["RMSE"].iloc[0]
        model_performance.append((best_models, model_rmse))
        model_list.append(best_models)

    # RMSE 기준으로 정렬
    sorted_model_performance = sorted(model_performance, key=lambda x: x[1])

    # 정렬된 모델 리스트 (성능이 좋은 순서대로)
    model_list_sorted = [model[0] for model in sorted_model_performance]

    if top_k is None: # 별도로 갯수 옵션을 안줄시, 모델리스트의 모든 모델을 앙상블에 활용
        top_k = len(model_list)

    # 모델 블렌딩
    result = blend_models(model_list_sorted[:top_k])  #  전체사용
    result = finalize_model(result)  # 최종 모델로 고정

    if my_model_name:
        save_model(model=result,
            model_name=my_model_name,
            verbose=False)

    return result

def predict_with_models(model, data_seen, data_unseen):
    """
    주어진 모델들을 사용하여 예측하고, 예측 결과를 반환.
    """
    data_seen = copy.deepcopy(data_seen).reset_index(drop=True)
    data_unseen = copy.deepcopy(data_unseen).reset_index(drop=True)

    predictions_seen = predict_model(model, data=data_seen)
    predictions_seen["error_rate"] = 100 * (predictions_seen["prediction_label"] - predictions_seen[TARGET_COL]) / predictions_seen[TARGET_COL]
    seen_avg_error_rate = np.mean(np.abs(np.array(predictions_seen["error_rate"].iloc[:])))

    predictions_unseen = predict_model(model, data=data_unseen)
    predictions_unseen["error_rate"] = 100 * (predictions_unseen["prediction_label"] - predictions_unseen[TARGET_COL]) / predictions_unseen[TARGET_COL]
    unseen_avg_error_rate = np.mean(np.abs(np.array(predictions_unseen["error_rate"].iloc[:])))

    return predictions_seen, predictions_unseen, seen_avg_error_rate, unseen_avg_error_rate

def split_x_y(data, test_years):
    X = data.drop(columns=data.columns[data.columns.str.contains('next')],axis=1)  # 독립 변수만 모음
    X.dropna(axis=1, inplace=True) # 모든년도에서 쓸 수 있는 피처만 남기기
    y = data[TARGET_COL]  # 종속 변수

    X_real = X[X['year']==LIVE_YEAR] # 라이브 데이터는 따로 빼둔다
    y_real = y.loc[X_real.index]

    X = X[X['year']!=LIVE_YEAR]
    y = y.loc[X.index]

    X_train, X_test  = X[~X['year'].isin(test_years)], X[X['year'].isin(test_years)]
    y_train, y_test  = y[X_train.index], y[X_test.index]

    # 인덱스 에러 방지
    X_real.reset_index(drop=True,inplace=True)
    y_real.reset_index(drop=True,inplace=True)

    return X_train, X_test, y_train, y_test, X_real, y_real

def get_mi_feature_importance(x, y,num_of_features):
    x=x.reset_index(drop=True)
    y=y.reset_index(drop=True)

    # Mutual Information 계산
    mi_scores = mutual_info_regression(x, y)

    # 중요 Feature만 선택
    feature_importances = pd.Series(mi_scores, index=x.columns)
    selected_features = feature_importances[feature_importances > feature_importances.mean()].index  # 임계값 설정

    # 모델별 중요도 시각화
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    for idx, importance in enumerate(feature_importances.items()):
        sorted_idx = np.argsort(importance)[::-1]  # 중요도 내림차순 정렬
        axes[idx].barh(x.columns[sorted_idx], importance[sorted_idx], color='skyblue')
        axes[idx].set_title(f'Mutual Information Feature Importance')
        axes[idx].set_xlabel('Importance')
        axes[idx].invert_yaxis()  # 중요도 높은 순서로 표시

    plt.tight_layout()
    plt.show()

    if num_of_features:
        selected_features = selected_features[:num_of_features]

    # pd.DataFrame(selected_features).to_csv("selected_features.csv",index=None,encoding='utf-8-sig')
    return selected_features

In [None]:
def my_data_load(num_of_features, test_years, group_idx):
    # 데이터 로딩
    bef = pd.read_csv("배추_transpose_2024.csv",encoding='utf-8-sig')
    aft = pd.read_csv("combined_weather_data(preprocessed_weather).csv",encoding='utf-8-sig')

    # drop 2025
    bef_from_2006 = bef[(bef['year']<2024)&(bef['year']>=2006)]
    aft_from_2006 = aft[(aft['year']<2024)&(aft['year']>=2006)]

    aft_from_2006 = aft_from_2006.interpolate(method='values')
    # data_origin = preprocess_data(data_origin) # 일부 열이름 변경 (통일)

    X_train, X_test, y_train, y_test, X_real, y_real = split_x_y(aft_from_2006, test_years) # bef_from_2006

    X_scaled_train, scaler = scale_data(X_train, is_fit=True)
    X_scaled_test, scaler = scale_data(X_test, is_fit=False, scaler = scaler)
    X_scaled_live, scaler = scale_data(X_real, is_fit=False, scaler = scaler)

    # Feature Importance 함수 호출 (2024는 y값이 없어서 제외)
    final_features = get_feature_importance(X_scaled_train, y_train, num_of_features)
    print("\nFinal Selected Features1:", final_features)

    # selected_features_2 = get_mi_feature_importance(X_scaled_train, y_train, num_of_features)
    # print("\nFinal Selected Features2:", selected_features_2)

    #final_features = list(set(selected_features_1)|set(selected_features_2))
    #print("common final feats : ", final_features)

    pd.DataFrame(final_features).to_csv(f"{MODEL_PATH}/selected_features{MODEL_NUM}({group_idx}).csv",index=None,encoding='utf-8-sig')

    # final_features = ['11월_김치수입','해당년도 11월 배추 경락가','해당년도 12월 배추 경락가','diff_해당년도 5월 배추 경락가_하순-해당년도 5월 배추 경락가_상순','diff_해당년도 7월 배추 경락가_하순-해당년도 7월 배추 경락가_중순','diff_해당년도 8월 배추 경락가_중순-해당년도 8월 배추 경락가_상순','diff_해당년도 9월 배추 경락가_상순-해당년도 8월 배추 경락가_상순','diff_해당년도 9월 배추 경락가_하순-해당년도 7월 배추 경락가_하순','diff_해당년도 10월 배추 경락가_하순-해당년도 10월 배추 경락가_상순','diff_해당년도 10월 배추 경락가_상순-해당년도 6월 배추 경락가_상순']

    X_scaled_train.index, X_scaled_test.index = X_train.index , X_test.index
    X_train, X_test, X_scaled_live = X_scaled_train[final_features], X_scaled_test[final_features], X_scaled_live[final_features]

    return X_train, X_test, y_train, y_test, X_scaled_live, y_real

import joblib

def main(test_years, group_idx):
    num_of_features = 10
    X_train, X_test, y_train, y_test, X_live, y_fakelive = my_data_load(num_of_features, test_years, group_idx)

    train_data = pd.concat([X_train,y_train],axis=1)
    test_data = pd.concat([X_test,y_test],axis=1)
    live_data = pd.concat([X_live, y_fakelive],axis=1)

    live_data = live_data.dropna(subset=['year']) # 2025등 널값제거
    train_for_live = pd.concat([train_data,test_data],axis=0)

    print(train_data.columns)

    # # 모델 학습 및 최적화
    model_name = MODEL_PATH + f"_{group_idx}"
    base_model = train_and_optimize_models(train_data, 15, 5, model_name) # 데이터, 만들어낼모델수(15), 앙상블할 top-k모델수(5), 모델이름

    # # 모델 로딩 및 기본 설정
    base_model = joblib.load(model_name + ".pkl")

    # 예측 및 결과
    predictions_train, predictions_test, train_avg_error_rate, test_avg_error_rate = predict_with_models(base_model, train_data, test_data)

    print(f"TEST YEARS: {test_years}")
    print(f"Train Average Error Rate: {train_avg_error_rate:.2f}%")
    print(f"Test({test_years}) Average Error Rate: {test_avg_error_rate:.2f}%")

    predictions_live_train, predictions_live, train_for_live_avg_error_rate, live_avg_error_rate = predict_with_models(base_model, train_for_live, live_data)
    print(f"LIVE YEAR: {LIVE_YEAR}")
    print(f"Train for live Average Error Rate: {train_for_live_avg_error_rate:.2f}%")
    print(f"Live({LIVE_YEAR}) Average Error Rate: {live_avg_error_rate:.2f}%")
    print(f"predicted Live({LIVE_YEAR}) : {predictions_live}")

In [None]:
# import time

# MODEL_NUM = 1
# MODEL_PATH = '/content/' # colab : /content/

# LIVE_YEAR = 2023
# test_years = [2021,2022]

# TARGET_COL = 'next_3_price'

# start_time = time.time()
# main(test_years, 7)
# print(f"Execution Time: {time.time() - start_time:.2f} seconds")

# 데이터 처리

## 외부 공실률 통합 및 정제

In [4]:
import numpy as np
import re

# 정규표현식 패턴
pattern = r"[^공실률가-힣]*([가-힣]+)[^공실률가-힣]*"

def preprocess_vcrate_df(df):
  preprocessed_df = df.copy()
  preprocessed_df = pd.melt(df, id_vars = ['자료시점','공실률종류'])
  preprocessed_df['년도'] = preprocessed_df['자료시점'].apply(lambda x : int(x[:4]) if '분기' in x else np.nan)
  preprocessed_df['분기'] = preprocessed_df['자료시점'].apply(lambda x : int(x[-3:-2]) if '분기' in x else np.nan)
  preprocessed_df = preprocessed_df.dropna(subset=['년도'],axis=0)
  preprocessed_df['지역'] = preprocessed_df['variable'].apply(lambda x : re.findall(pattern, x)[0] if len(re.findall(pattern, x))>0 else x)
  preprocessed_df = preprocessed_df[(preprocessed_df['variable']!='No')&(preprocessed_df['variable']!='전국')] # 불필요한 칼럼 제거
  return preprocessed_df

In [5]:
import zipfile
import pandas as pd
import os

center = pd.read_csv("center.csv")

# zip 파일 경로 설정
zip_path = '/content/vcrate_outfiles.zip'

all_data = []

# zip 파일을 열기
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    # zip 파일 내의 모든 파일 리스트
    file_list = zip_ref.namelist()

    # CSV 파일만 필터링
    csv_files = [f for f in file_list if f.endswith('.csv')]

    # 여러 CSV 파일을 읽어서 하나의 데이터프레임으로 결합
    file_names = [str(file).split("_")[-1].split(".")[0] for file in csv_files]

    for idx, file in enumerate(csv_files):
      df = pd.read_csv(zip_ref.open(file), encoding='euc-kr')
      df['공실률종류'] = file_names[idx]
      all_data.append(preprocess_vcrate_df(df))

all_data = pd.concat(all_data, ignore_index=True)
# 데이터 확인
all_data

FileNotFoundError: [Errno 2] No such file or directory: 'center.csv'

In [None]:
all_data.to_csv("전국지역별_년도별분기별_건물종류별_공실률.csv",encoding='euc-kr',index=None)

## 각종 외부 경제 지표

In [43]:
def preprocess_economy_df(df):
    import numpy as np
    import re
    import pandas as pd

    pattern = r'실적|전망'
    preprocessed_df = df.copy()
    preprocessed_df = pd.melt(df, id_vars=['시점', '업종코드별','BSI코드별'])

    preprocessed_df['년도'] = preprocessed_df['시점'].apply(lambda x: int(x[:4]) if '월' in x else np.nan)
    preprocessed_df['월'] = preprocessed_df['시점'].apply(lambda x: int(x[5:7]) if '월' in x else np.nan)
    preprocessed_df = preprocessed_df.dropna(subset=['년도'])

    preprocessed_df['종류'] = preprocessed_df['variable'].apply(lambda x: re.findall(pattern, x)[0] if re.findall(pattern, x) else x)

    # BSI 종류 정보만 보존
    preprocessed_df['BSI코드별'] = preprocessed_df['BSI코드별'].apply(lambda x: ' '.join(re.findall(r'[가-힣]+', x)))
    # 불필요 칼럼 제거
    preprocessed_df.drop(columns=['variable'], inplace=True)

    # 🚀 변경: stack() 없이 수치형 값만 필터링
    preprocessed_df['value'] = pd.to_numeric(preprocessed_df['value'], errors='coerce')
    df_numeric = preprocessed_df.dropna(subset=['value'])

    return df_numeric

In [44]:
# zip 파일 경로 설정
zip_path_economy = '/content/업종별_기업경기실사지수.zip'

all_ecos = []

# zip 파일을 열기
with zipfile.ZipFile(zip_path_economy, 'r') as zip_ref:
    # zip 파일 내의 모든 파일 리스트
    file_list = zip_ref.namelist()

    # CSV 파일만 필터링
    csv_files = [f for f in file_list if f.endswith('.csv')]

    # 여러 CSV 파일을 읽어서 하나의 데이터프레임으로 결합
    # file_names = [str(file).split("_")[-1].split(".")[0] for file in csv_files]

    for idx, file in enumerate(csv_files):
      df = pd.read_csv(zip_ref.open(file), encoding='euc-kr')
      all_ecos.append(preprocess_economy_df(df))

all_ecos = pd.concat(all_ecos, ignore_index=True)
# 데이터 확인
all_ecos

Unnamed: 0,시점,업종코드별,BSI코드별,value,년도,월,종류
0,2003.01 월,전 산 업,업황실적,82.0,2003,1,실적
1,2003.02 월,전 산 업,업황실적,73.0,2003,2,실적
2,2003.03 월,전 산 업,업황실적,71.0,2003,3,실적
3,2003.04 월,전 산 업,업황실적,76.0,2003,4,실적
4,2003.05 월,전 산 업,업황실적,74.0,2003,5,실적
...,...,...,...,...,...,...,...
272955,2024.10 월,내수기업,인력사정전망,89.0,2024,10,전망
272956,2024.11 월,내수기업,인력사정전망,90.0,2024,11,전망
272957,2024.12 월,내수기업,인력사정전망,91.0,2024,12,전망
272958,2025.01 월,내수기업,인력사정전망,93.0,2025,1,전망


In [45]:
all_ecos.to_csv("년도별월별_전국_업종별_기업경기지수.csv",encoding='euc-kr',index=None)