In [7]:
import numpy as np
import pandas as pd
import os


In [8]:
# 상수 정의
WINDOW_SIZE_20 = 20
WINDOW_SIZE_60 = 60
INPUT_FILE = '../data/feature_database_0801_final_ver.xlsx'
OUTPUT_FILE = '../data/feature_database_0801_final_ver_20_60.xlsx'

COMMON_FEATURES_COUNT = 28

# 키워드 리스트 및 개별 피처 매핑
keywords_features = {
    "auto": ["US_auto", "export_auto"],
    "construct": ["매매가격지수", "아파트매매거래량"],
    "capital_market": ["증권 배당 수익률", "코스피+코스닥 거래량"],
    "chemicals": ["lc_price", "naphtha_price"],
    "equipment": ["기계수주경상금액", "BSI 제조업경기실사지수"],
    "transport": ["발틱운임", "scfi"],
    "semi": ["SOX", "메모리 수출금액"],
    "bank": ["bank_dividend", "call_rate"],
    "steel": ["철광석 가격", "미국 철강 코일 선물"],
    "telecom": ["통신배당", "IT산업별/월별 수출 현황 중 통신기기 소계"],
    "staples": ["미 소맥 선물 ", "미 대두 선물 "],
    "discretionary": ["소비자심리지수", "항공 여객"],
    "kospi": []
}


In [9]:
def calculate_trend_slope(data, window_size):
    """단일 데이터 윈도우에 대한 트렌드 기울기를 계산합니다."""
    if len(data) < window_size:
        raise ValueError("데이터 길이가 구간 크기보다 작습니다.")
    
    window_data = data.astype(float)
    X = np.arange(window_size)
    
    nan_indices = np.isnan(window_data)
    if np.any(nan_indices):
        window_data = window_data[~nan_indices]
        X = X[~nan_indices]
    
    coefficients = np.polyfit(X, window_data, 1)
    slope = coefficients[0]
    
    abs_mean = np.mean(np.abs(window_data))
    if abs_mean == 0:
        return None
    else:
        return slope / abs_mean

def calculate_trend_slope_daily(data, window_size):
    """전체 데이터셋에 대해 일별로 트렌드 기울기를 계산합니다."""
    result = {}
    column_names = data.columns.tolist()
    
    for column in column_names:
        trend_slope_result = []
        for i in range(len(data) - window_size + 1):
            window_data = data[column].iloc[i:i+window_size]
            trend_slope = calculate_trend_slope(window_data, window_size)
            trend_slope_result.append(trend_slope)
        result[column] = trend_slope_result
    
    return result

In [10]:
def load_and_preprocess_data(file_path):
    """데이터를 로드하고 전처리합니다."""
    try:
        df = pd.read_excel(file_path, sheet_name='Sheet1', index_col=0)
        df.index = pd.to_datetime(df.index).strftime('%Y-%m-%d')
        df.index = pd.to_datetime(df.index)
        df = df.dropna(axis=0)
        return df
    except Exception as e:
        print(f"데이터 로드 중 오류 발생: {e}")
        return None

def calculate_and_merge_slopes(df, window_size):
    """주어진 윈도우 크기로 트렌드 기울기를 계산하고 원본 데이터와 병합합니다."""
    result = calculate_trend_slope_daily(df, window_size)
    new_df = df.iloc[window_size-1:]
    result_df = pd.DataFrame(result)
    result_df.index = new_df.index
    return result_df

def save_results(writer, sheet_name, df):
    """결과를 Excel 파일로 저장합니다."""
    try:
        df.index = df.index.strftime('%Y-%m-%d')
        df.to_excel(writer, sheet_name=sheet_name)
        print(f"{sheet_name} 시트에 결과가 저장되었습니다.")
    except Exception as e:
        print(f"파일 저장 중 오류 발생: {e}")

In [12]:
def main():
    # 데이터 로드 및 전처리
    df = load_and_preprocess_data(INPUT_FILE)
    if df is None:
        return

    # 공통 피처와 개별 피처 분리
    common_features = df.iloc[:, :COMMON_FEATURES_COUNT]
    
    with pd.ExcelWriter(OUTPUT_FILE) as writer:
        for keyword, features in keywords_features.items():
            if features:
                specific_features = df[features]
                
                # 20일 및 60일 윈도우에 대한 트렌드 기울기 계산
                result_df_20_common = calculate_and_merge_slopes(common_features, WINDOW_SIZE_20)
                result_df_60_common = calculate_and_merge_slopes(common_features, WINDOW_SIZE_60)
                result_df_20 = calculate_and_merge_slopes(specific_features, WINDOW_SIZE_20)
                result_df_60 = calculate_and_merge_slopes(specific_features, WINDOW_SIZE_60)

                # 결과 병합
                merge_inner = pd.concat([common_features, specific_features, result_df_20_common.add_suffix('_20'), result_df_60_common.add_suffix('_60'), result_df_20.add_suffix('_20'), result_df_60.add_suffix('_60')], axis=1)
            else:
                # kospi의 경우 공통 피처만 포함
                result_df_20_common = calculate_and_merge_slopes(common_features, WINDOW_SIZE_20)
                result_df_60_common = calculate_and_merge_slopes(common_features, WINDOW_SIZE_60)
                
                merge_inner = pd.concat([common_features, result_df_20_common.add_suffix('_20'), result_df_60_common.add_suffix('_60')], axis=1)
                
            # NaN 값을 포함하는 행 제거
            merge_inner = merge_inner.dropna()
            
            # 인덱스를 datetime 형식으로 변환
            merge_inner.index = pd.to_datetime(merge_inner.index)
            
            # 결과 저장
            save_results(writer, keyword, merge_inner)

if __name__ == "__main__":
    main()

auto 시트에 결과가 저장되었습니다.
construct 시트에 결과가 저장되었습니다.
capital_market 시트에 결과가 저장되었습니다.
chemicals 시트에 결과가 저장되었습니다.
equipment 시트에 결과가 저장되었습니다.
transport 시트에 결과가 저장되었습니다.
semi 시트에 결과가 저장되었습니다.
bank 시트에 결과가 저장되었습니다.
steel 시트에 결과가 저장되었습니다.
telecom 시트에 결과가 저장되었습니다.
staples 시트에 결과가 저장되었습니다.
discretionary 시트에 결과가 저장되었습니다.
kospi 시트에 결과가 저장되었습니다.
