In [2]:

# 1. 라이브러리 임포트

%pip install yfinance  # (Colab 등에서 필요 시)
%pip install tensorflow
%pip install matplot
import yfinance as yf
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from math import sqrt

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

import random




[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting matplot
  Downloading matplot-0.1.9-py2.py3-none-any.whl.metadata (241 bytes)
Collecting pyloco>=0.0.134 (from matplot)
  Downloading pyloco-0.0.139-py2.py3-none-any.whl.metadata (1.1 kB)
Collecting matplotlib>=3.1.1 (from matplot)
  Downloading matplotlib-3.10.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (11 kB)
Collecting contourpy>=1.0.1 (from ma

In [3]:
###################################
# 2. (샘플) FinBERT 감정 점수 생성 함수
###################################
def get_finbert_sentiment_scores(dates):
    """
    실제로는 FinBERT를 통해 금융 텍스트(뉴스, 공시 등)를 분석해야 합니다.
    여기서는 예시로 날짜별 임의의 연속값(0~1)을 반환합니다.
    FinBERT에서는 보통 (neg, neu, pos) 확률을 얻은 뒤,
    sentiment_score = pos_prob - neg_prob 등을 계산하게 됩니다.
    """
    # dates: 날짜 리스트 (pd.DatetimeIndex)
    sentiment_values = []
    for d in dates:
        # 실제론 날짜별 텍스트를 수집 -> FinBERT 추론 -> (pos, neg, neu) -> (pos - neg) 등
        score = random.random()  # 0~1 사이 난수 (샘플)
        sentiment_values.append(score)

    return pd.Series(sentiment_values, index=dates, name='Sentiment')



In [None]:
###################################
# 3. 데이터 수집 (주가 + 임의 감정 점수)
###################################
ticker = 'AAPL'  # 애플 예시
start_date = '2015-01-01'
end_date = '2025-01-01'

df = yf.download(ticker, start=start_date, end=end_date)
# df: ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
df.dropna(inplace=True)

# 날짜별 가짜 FinBERT 감정 점수(0~1) 생성 (실제로는 FinBERT를 통해 얻어야 함)
df['Sentiment'] = get_finbert_sentiment_scores(df.index)



[*********************100%***********************]  1 of 1 completed


In [6]:
###################################
# 4. 멀티피처 (Close, Volume, Sentiment) + 정규화
###################################
feature_cols = ['Close', 'Volume', 'Sentiment']
data = df[feature_cols].copy()

scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)

# DataFrame 형태로 다시 저장(편의용)
scaled_df = pd.DataFrame(scaled_data, columns=feature_cols, index=df.index)



In [7]:
###################################
# 5. 시계열 윈도우 생성 함수
###################################
def create_sequences_multi_features(scaled_df, window_size=30, target_col='Close'):
    """
    window_size일 동안의 [Close,Volume,Sentiment]를 보고 
    다음 날(target_col)의 값을 예측하는 구조.
    """
    X_list = []
    y_list = []
    dates = scaled_df.index
    date_list = []

    target_idx = scaled_df.columns.get_loc(target_col)

    for i in range(len(scaled_df) - window_size):
        X_window = scaled_df.iloc[i : i + window_size].values
        y_value = scaled_df.iloc[i + window_size, target_idx]
        
        X_list.append(X_window)
        y_list.append(y_value)
        date_list.append(dates[i + window_size])  # 이 샘플의 '정답' 날짜(예: 예측 시점)

    X_arr = np.array(X_list)
    y_arr = np.array(y_list)
    return X_arr, y_arr, date_list

window_size = 30
X_all, y_all, date_all = create_sequences_multi_features(scaled_df, window_size=window_size, target_col='Close')

print("X_all shape:", X_all.shape)  # 예: (N, 30, 3)
print("y_all shape:", y_all.shape)  # 예: (N,)



X_all shape: (1228, 30, 3)
y_all shape: (1228,)


In [8]:
###################################
# 6. Walk-Forward Validation 설정
###################################
def walk_forward_validation(X, y, dates, train_ratio=0.8, n_splits=3):
    """
    단순히 예시로, 전체 기간의 80%까지를 기본 Train 범위로,
    이후를 세 등분(예: n_splits=3) 하여 rolling으로 테스트.
    
    실제론 더 정교한 방식(rolling window, expanding window 등)을 구현할 수 있습니다.
    """
    n_total = len(X)
    n_initial_train = int(n_total * train_ratio)

    # 초반 80%를 기본 훈련 세트로 하고, 나머지 20%를 n_splits 구간으로 나눔
    X_train_initial = X[:n_initial_train]
    y_train_initial = y[:n_initial_train]
    dates_train_initial = dates[:n_initial_train]

    X_test_part = X[n_initial_train:]
    y_test_part = y[n_initial_train:]
    dates_test_part = dates[n_initial_train:]
    
    # n_splits로 나눔
    chunk_size = len(X_test_part) // n_splits

    results = []
    start_idx = 0
    
    # 최근까지 누적(Expanding window) 시연
    for split_idx in range(n_splits):
        end_idx = start_idx + chunk_size
        if split_idx == n_splits - 1:  # 마지막 구간 처리
            end_idx = len(X_test_part)

        # 이 구간까지를 테스트로 사용
        X_test_current = X_test_part[start_idx:end_idx]
        y_test_current = y_test_part[start_idx:end_idx]
        dates_test_current = dates_test_part[start_idx:end_idx]

        # Train = 기존 initial train + 지금까지의 test 일부(Expanding)
        X_train_current = np.concatenate([X_train_initial, X_test_part[:start_idx]], axis=0)
        y_train_current = np.concatenate([y_train_initial, y_test_part[:start_idx]], axis=0)

        # 모델 학습
        model = build_lstm_model(input_shape=(window_size, 3))
        history = model.fit(
            X_train_current, y_train_current,
            epochs=10, batch_size=32, verbose=0
        )

        # 예측
        preds = model.predict(X_test_current)
        mse = mean_squared_error(y_test_current, preds)
        rmse = sqrt(mse)

        results.append({
            'split': split_idx,
            'test_range': (dates_test_current[0], dates_test_current[-1]),
            'rmse': rmse
        })

        start_idx = end_idx  # 다음 구간으로 넘어감

    return results



In [9]:
###################################
# 7. LSTM(2레이어) 모델 구성 함수
###################################
def build_lstm_model(input_shape):
    """
    2레이어 LSTM(64, 32) + Dropout(0.2) + Dense(1)
    """
    model = Sequential()
    model.add(LSTM(64, return_sequences=True, input_shape=input_shape))
    model.add(Dropout(0.2))
    model.add(LSTM(32, return_sequences=False))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='linear'))

    model.compile(optimizer='adam', loss='mean_squared_error')
    return model



In [10]:
###################################
# 8. Walk-Forward Validation 실행
###################################
results = walk_forward_validation(X_all, y_all, date_all, train_ratio=0.8, n_splits=3)



  super().__init__(**kwargs)


[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step


  super().__init__(**kwargs)


[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step


  super().__init__(**kwargs)


[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 59ms/step


In [12]:
###################################
# 9. 결과 분석
###################################
for r in results:
    print(f"Split {r['split']} | Test Range: {r['test_range'][0]} ~ {r['test_range'][1]} | RMSE: {r['rmse']:.4f}")

avg_rmse = np.mean([r['rmse'] for r in results])
print(f"Average RMSE across splits: {avg_rmse:.4f}")



Split 0 | Test Range: 2019-01-10 00:00:00 ~ 2019-05-08 00:00:00 | RMSE: 0.0331
Split 1 | Test Range: 2019-05-09 00:00:00 ~ 2019-09-04 00:00:00 | RMSE: 0.0374
Split 2 | Test Range: 2019-09-05 00:00:00 ~ 2019-12-31 00:00:00 | RMSE: 0.0478
Average RMSE across splits: 0.0394


In [13]:
###################################
# 10. 최종 모델 학습 (원하는 구간)
###################################
# 예: 전체 80%는 Train, 나머지 20% Test로 마지막 한 번만 평가하는 단순 접근
simple_split = int(len(X_all)*0.8)
X_train_simple, X_test_simple = X_all[:simple_split], X_all[simple_split:]
y_train_simple, y_test_simple = y_all[:simple_split], y_all[simple_split:]

model_final = build_lstm_model((window_size, 3))
history_final = model_final.fit(X_train_simple, y_train_simple, epochs=10, batch_size=32, verbose=1)

preds_simple = model_final.predict(X_test_simple)
rmse_simple = sqrt(mean_squared_error(y_test_simple, preds_simple))

print(f"[Simple final split] RMSE: {rmse_simple:.4f}")

Epoch 1/10


  super().__init__(**kwargs)


[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 11ms/step - loss: 0.0348
Epoch 2/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 0.0022
Epoch 3/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - loss: 0.0022
Epoch 4/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - loss: 0.0021
Epoch 5/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.0019
Epoch 6/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.0019
Epoch 7/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.0019
Epoch 8/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.0018
Epoch 9/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - loss: 0.0018
Epoch 10/10
[1m31/31[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - loss: 0.0018
[1m8/8[0