In [9]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping

# 데이터 로드 함수
def load_data(pattern, num):
    path = f'algo_dataset/{pattern}/{pattern}_{num}.csv'
    df = pd.read_csv(path)
    return df

# 슬라이딩 윈도우 생성 함수
def create_sliding_window(data, window_size):
    sliding_windows = []
    for i in range(len(data) - window_size + 1):
        sliding_windows.append(data[i:i+window_size])
    return np.array(sliding_windows)

# 패턴 리스트
patterns = ['ascending_triangle', 'descending_triangle', 'ascending_wedge', 'descending_wedge', 'double_top', 'double_bottom']
window_size = 70  # 윈도우 크기 설정
data = []

# 모든 패턴 데이터를 불러오고 슬라이딩 윈도우 적용
for pattern in patterns:
    for i in range(1000):  # 각 패턴에 1000개의 데이터가 있음
        df = load_data(pattern, i+1)
        scaled_data = MinMaxScaler().fit_transform(df[['Open', 'High', 'Low', 'Close', 'Volume']].values)
        windows = create_sliding_window(scaled_data, window_size)
        data.extend([(window, pattern) for window in windows])

# 데이터를 학습용, 테스트용으로 분리합니다.
train_data, test_data = train_test_split(data, test_size=0.2, shuffle=True)

# 데이터 전처리 함수
def prepare_data(data):
    X, y = [], []
    for window, label in data:
        X.append(window)
        y.append(patterns.index(label))  # 패턴 이름을 숫자로 변환

    X = np.array(X)
    y = tf.keras.utils.to_categorical(y, num_classes=6)  # 6개의 클래스에 대한 one-hot encoding

    return X, y

# 학습 데이터 및 테스트 데이터 전처리
X_train, y_train = prepare_data(train_data)
X_test, y_test = prepare_data(test_data)

# 모델 생성 함수
def create_model():
    model = Sequential()

    # CNN Layers
    model.add(Conv1D(filters=128, kernel_size=3, activation='relu', input_shape=(window_size, 5)))  # window_size는 OHLCV 데이터의 길이
    model.add(MaxPooling1D(pool_size=2))
    model.add(Dropout(0.3))

    # LSTM Layer
    model.add(LSTM(128, return_sequences=False))
    model.add(Dropout(0.3))

    # Fully Connected Layer
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.3))
    model.add(Dense(6, activation='softmax'))  # 6개의 패턴에 대한 확률 출력

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model

# 모델 생성
model = create_model()

# Early stopping을 사용해 과적합 방지
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# 모델 학습
model.fit(X_train, y_train, 
          epochs=50, 
          batch_size=64, 
          validation_data=(X_test, y_test), 
          callbacks=[early_stopping])

# 최종 성능 평가
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Final test accuracy: {accuracy:.4f}")


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/50
[1m5250/5250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m184s[0m 35ms/step - accuracy: 0.5958 - loss: 0.8629 - val_accuracy: 0.7918 - val_loss: 0.4520
Epoch 2/50
[1m5250/5250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 33ms/step - accuracy: 0.7910 - loss: 0.4711 - val_accuracy: 0.8284 - val_loss: 0.3883
Epoch 3/50
[1m5250/5250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m169s[0m 32ms/step - accuracy: 0.8150 - loss: 0.4214 - val_accuracy: 0.8405 - val_loss: 0.3673
Epoch 4/50
[1m5250/5250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m206s[0m 39ms/step - accuracy: 0.8297 - loss: 0.3909 - val_accuracy: 0.8480 - val_loss: 0.3490
Epoch 5/50
[1m5250/5250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m205s[0m 39ms/step - accuracy: 0.8402 - loss: 0.3700 - val_accuracy: 0.8488 - val_loss: 0.3531
Epoch 6/50
[1m5250/5250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m188s[0m 36ms/step - accuracy: 0.8494 - loss: 0.3526 - val_accuracy: 0.8580 - val_loss: 0.323

In [19]:
import os

# 모델 저장 디렉토리 생성
save_dir = './save_model'
if not os.path.exists(save_dir):
    os.makedirs(save_dir)

# 모델 저장 경로
model_save_path = os.path.join(save_dir, 'chart_pattern_model.h5')

# 모델 저장
model.save(model_save_path)
print(f"Model saved at {model_save_path}")



Model saved at ./save_model/chart_pattern_model.h5


In [4]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# 예측을 위한 함수
def predict_pattern(model, ohlcv_data, window_size=70):
    """
    외부에서 입력받은 OHLCV 데이터를 기반으로 6개의 차트 패턴에 대한 확률을 예측하는 함수
    :param model: 학습된 CNN-LSTM 모델
    :param ohlcv_data: 새로운 OHLCV 데이터 (numpy 배열 형태로)
    :param window_size: 모델이 사용한 윈도우 크기 (슬라이딩 윈도우 크기)
    :return: 각 패턴에 속할 확률
    """
    # OHLCV 데이터를 정규화 (외부 데이터)
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(ohlcv_data)

    # 입력 데이터를 슬라이딩 윈도우로 변환
    if len(scaled_data) < window_size:
        raise ValueError(f"Input data length should be at least {window_size}")

    input_data = np.array([scaled_data[-window_size:]])  # 최신 데이터로 슬라이딩 윈도우 생성
    
    # 모델 예측 (확률 반환)
    predictions = model.predict(input_data)

    # 6개의 클래스에 대한 확률 출력
    return predictions[0]  # 예측된 확률

In [5]:
# DataFrame 분할 함수
def split_df(df, split_window):
    df_length = len(df)

    if df_length < split_window:
        print(f'{split_window} 기준으로 분할할 수 없습니다! 현재 길이: {df_length}')
    
    final_df_list = []

    for i in range(len(df) - split_window):
        df_start_point = i
        df_end_point = i + split_window
        splited_df = df.iloc[df_start_point:df_end_point]
        final_df_list.append(splited_df)

    return final_df_list    

In [13]:
from pykrx import stock

# 삼성전자 주식 데이터 불러오기 (열 이름 변경)
df = stock.get_market_ohlcv("20240101", "20240701", "005930").drop('등락률', axis=1)

# 열 이름을 영어로 변경
df.rename(columns={'시가': 'Open', '고가': 'High', '저가': 'Low', '종가': 'Close', '거래량': 'Volume'}, inplace=True)

In [14]:
df_list = split_df(df, 70)

In [18]:
for i in range(len(df_list)):
    predicted_probabilities = predict_pattern(model, df_list[i])

    # 각 패턴에 속할 확률을 출력
    patterns = ['ascending_triangle', 'descending_triangle', 'ascending_wedge', 'descending_wedge', 'double_top', 'double_bottom']
    for pattern, prob in zip(patterns, predicted_probabilities):
        print(f"Pattern: {pattern}, Probability: {prob:.4f}")


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
Pattern: ascending_triangle, Probability: 0.0019
Pattern: descending_triangle, Probability: 0.0000
Pattern: ascending_wedge, Probability: 0.0646
Pattern: descending_wedge, Probability: 0.0000
Pattern: double_top, Probability: 0.9218
Pattern: double_bottom, Probability: 0.0118
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
Pattern: ascending_triangle, Probability: 0.0011
Pattern: descending_triangle, Probability: 0.0000
Pattern: ascending_wedge, Probability: 0.0586
Pattern: descending_wedge, Probability: 0.0000
Pattern: double_top, Probability: 0.9146
Pattern: double_bottom, Probability: 0.0258
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
Pattern: ascending_triangle, Probability: 0.0092
Pattern: descending_triangle, Probability: 0.0000
Pattern: ascending_wedge, Probability: 0.2599
Pattern: descending_wedge, Probability: 0.0001
Pattern: double_top, Probability: 0.65