In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib
# matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [2]:
data = pd.read_csv('/content/모델 테스트.csv')

for i in data.index[data['Volume'] == 0]:
    if i+1 in data.index:  # 9시의 거래량을 10시의 거래량의 두배로 채워줌.
        data.loc[i, 'Volume'] = 2 * data.loc[i+1, 'Volume']

# 이진 분류 타겟값 생성
data['target'] = None

for i in range(len(data)):
  if data.loc[i,'Close'] > data.loc[i,'Open']:
    data.at[i,'target'] = 1
  else:
    data.at[i,'target'] = 0


In [3]:
data["Datetime"] = pd.to_datetime(data["Datetime"]).dt.strftime('%Y-%m-%d %H:%M:%S')
data['Datetime'] = pd.to_datetime(data['Datetime'])
data.set_index('Datetime', inplace=True)

In [4]:
data

Unnamed: 0_level_0,Unnamed: 0,Open,High,Low,Close,Adj Close,Volume,target
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2022-04-06 09:00:00,0,68600.0,68800.0,68600.0,68700.0,68700.0,3693164,1
2022-04-06 10:00:00,1,68600.0,68800.0,68600.0,68700.0,68700.0,1846582,1
2022-04-06 11:00:00,2,68700.0,68700.0,68600.0,68600.0,68600.0,901392,0
2022-04-06 12:00:00,3,68700.0,68700.0,68500.0,68600.0,68600.0,2053045,0
2022-04-06 13:00:00,4,68600.0,68600.0,68500.0,68600.0,68600.0,924688,0
...,...,...,...,...,...,...,...,...
2024-04-16 10:00:00,31,80300.0,80400.0,79700.0,79900.0,79900.0,6284883,0
2024-04-16 11:00:00,32,79800.0,80000.0,79400.0,79900.0,79900.0,4427816,1
2024-04-16 12:00:00,33,79900.0,80000.0,79600.0,79900.0,79900.0,2234167,0
2024-04-16 13:00:00,34,79900.0,80000.0,79800.0,79800.0,79800.0,1713636,0


In [5]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset


# 새로운 특징 생성 함수
# rolling의 파라미터를 설정하여 Nan이 나오지 않게 가능.

def add_features(df):
    df['H-L'] = df['High'] - df['Low']
    df['O-C'] = df['Close'] - df['Open']
    df['2_HOURS_MA'] = df['Close'].rolling(window = 2, min_periods=1).mean()
    df['4_HOURS_MA'] = df['Close'].rolling(window = 4, min_periods=1).mean()
    df['6_HOURS_MA'] = df['Close'].rolling(window = 6, min_periods=1).mean()
    df['3_HOURS_STD_DEV'] = df['Close'].rolling(window = 3, min_periods=1).std()
    df['target'] = df['target']
    return df

data = add_features(data)
data.dropna(inplace=True)  # 이동 평균 및 표준편차로 인한 NA 값 제거

# 특징 선택 및 정규화
features = data[['H-L', 'O-C', '2_HOURS_MA','4_HOURS_MA','6_HOURS_MA','3_HOURS_STD_DEV', 'Volume', 'Close','target']].values
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_features = scaler.fit_transform(features)

# 시퀀스 데이터 생성 함수
def create_sequences(data, seq_length):
    xs, ys = [], []
    for i in range(len(data) - seq_length):
        x = data[i:(i + seq_length), :-1]
        y = data[i + seq_length, -1]  # 'target'는 마지막 열에 위치
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

seq_length = 10  # 시퀀스 길이

# 데이터 세트 크기 계산 및 분할
train_size = int(len(scaled_features) * 0.7)
val_size = int(len(scaled_features) * 0.2)
test_size = len(scaled_features) - train_size - val_size

# 시퀀스 생성을 고려한 데이터 분할
X_train, y_train = create_sequences(scaled_features[:train_size], seq_length)
X_val, y_val = create_sequences(scaled_features[train_size:train_size+val_size], seq_length)
X_test, y_test = create_sequences(scaled_features[train_size+val_size:], seq_length)

# 텐서로 변환
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# DataLoader 설정
train_data = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(dataset=train_data, batch_size=8, shuffle=False)

val_data = TensorDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(dataset=val_data, batch_size=8, shuffle=False)

test_data = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(dataset=test_data, batch_size=8, shuffle=False)


### GRU 모델

In [6]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import precision_score

# Define the GRU model
class StockPredictor(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(StockPredictor, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # GRU Layer
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True)
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Initialize hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)

        # Forward propagate the GRU
        out, _ = self.gru(x, h0)

        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out

# Hyperparameters
input_dim = X_train.shape[2]  # Number of features
hidden_dim = 32
num_layers = 2
output_dim = 1
num_epochs = 50
learning_rate = 0.001

# Initialize the GRU model
model = StockPredictor(input_dim, hidden_dim, num_layers, output_dim)

# Loss and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = Adam(model.parameters(), lr=learning_rate)

# Training the model
def train_model(model, train_loader, val_loader, num_epochs):
    for epoch in range(num_epochs):
        model.train()
        for i, (features, labels) in enumerate(train_loader):
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs.squeeze(), labels)
            loss.backward()
            optimizer.step()

        # Validation loss
        model.eval()
        with torch.no_grad():
            val_loss = 0
            for features, labels in val_loader:
                outputs = model(features)
                loss = criterion(outputs.squeeze(), labels)
                val_loss += loss.item()
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Val Loss: {val_loss/len(val_loader):.4f}')

# Evaluate the model and calculate precision
def evaluate_model(model, test_loader):
    model.eval()
    predictions = []
    actuals = []
    prediction_details = []
    with torch.no_grad():
        for features, labels in test_loader:
            outputs = model(features)
            probabilities = torch.sigmoid(outputs.squeeze()).tolist()
            predicted = (torch.sigmoid(outputs.squeeze()) > 0.45).float()
            predictions.extend(predicted.tolist())
            actuals.extend(labels.tolist())
            prediction_details.extend(f"{int(pred)} ({prob:.4f})" for pred, prob in zip(predicted, probabilities))

    # Calculate precision
    precision = precision_score(actuals, predictions)
    print(f'Precision: {precision:.2f}')

    # 예측값 , 실제값 , 예측 확률
    return predictions, actuals, prediction_details

# Train the model
train_model(model, train_loader, val_loader, num_epochs)

# Evaluate the model and retrieve predictions, actual labels, and detailed predictions
predictions, actuals, prediction_details = evaluate_model(model, test_loader)
print("Detailed Predictions:", prediction_details)
print("Actuals:", actuals)


Epoch [1/50], Loss: 0.4313, Val Loss: 0.6678
Epoch [2/50], Loss: 0.4340, Val Loss: 0.6676
Epoch [3/50], Loss: 0.4353, Val Loss: 0.6676
Epoch [4/50], Loss: 0.4360, Val Loss: 0.6676
Epoch [5/50], Loss: 0.4364, Val Loss: 0.6676
Epoch [6/50], Loss: 0.4367, Val Loss: 0.6676
Epoch [7/50], Loss: 0.4368, Val Loss: 0.6676
Epoch [8/50], Loss: 0.4368, Val Loss: 0.6677
Epoch [9/50], Loss: 0.4367, Val Loss: 0.6677
Epoch [10/50], Loss: 0.4366, Val Loss: 0.6678
Epoch [11/50], Loss: 0.4364, Val Loss: 0.6679
Epoch [12/50], Loss: 0.4362, Val Loss: 0.6679
Epoch [13/50], Loss: 0.4360, Val Loss: 0.6679
Epoch [14/50], Loss: 0.4354, Val Loss: 0.6680
Epoch [15/50], Loss: 0.4337, Val Loss: 0.6681
Epoch [16/50], Loss: 0.4308, Val Loss: 0.6682
Epoch [17/50], Loss: 0.4290, Val Loss: 0.6682
Epoch [18/50], Loss: 0.4301, Val Loss: 0.6679
Epoch [19/50], Loss: 0.4325, Val Loss: 0.6674
Epoch [20/50], Loss: 0.4354, Val Loss: 0.6667
Epoch [21/50], Loss: 0.4390, Val Loss: 0.6663
Epoch [22/50], Loss: 0.4400, Val Loss: 0.66

### 모델결과 확인

In [7]:
check = pd.DataFrame({})

check['Detailed Prediction'] = prediction_details
check['Actuals'] = actuals

filtered_df = check[check['Detailed Prediction'].str.startswith('1 ')]
filtered_df

Unnamed: 0,Detailed Prediction,Actuals
3,1 (0.4547),1.0
75,1 (0.4791),0.0
77,1 (0.4544),0.0
111,1 (0.4811),0.0
117,1 (0.4503),0.0
159,1 (0.4751),0.0
177,1 (0.4554),1.0
180,1 (0.7923),1.0
181,1 (0.8039),1.0
182,1 (0.7368),0.0


### only 변동성 돌파 전략

In [8]:
almac_m = pd.read_csv('/content/almac_m.csv')

almac_m["Datetime"] = pd.to_datetime(almac_m["Datetime"]).dt.strftime('%Y-%m-%d %H:%M:%S')
almac_m['Datetime'] = pd.to_datetime(almac_m['Datetime'])
almac_m.set_index('Datetime', inplace=True)

In [29]:
# 변동성 돌파 전략을 적용하는 함수 정의
def apply_volatility_breakout_strategy(df):
    df['Hour'] = df.index.hour
    df['Date'] = df.index.date

    results = []  # 결과 저장 리스트
    k = 0.6  # 변동폭 계수 -> 자유롭게 설정

    grouped = df.groupby(['Date', 'Hour'])

    for (date, hour), group in grouped:
        if group.empty:
            continue

        # 이전 시간대 데이터를 기반으로 목표 매수 가격 설정 , 9시 이후부터 변동성 돌파전략에 의한 매수가능.
        if hour > 9:
            # 전 시간대의 정보를 가져오는 코드
            previous_group = grouped.get_group((date, hour - 1))
            if not previous_group.empty:
                high_price = previous_group['High'].max()
                low_price = previous_group['Low'].min()
                target_price = group['Open'][0] + (high_price - low_price) * k

                # 목표 가격 돌파 여부 확인 , High가 target_price보다 크면 변동성 돌파전략에 의한 조건이 만족 ,
                buy_prices = group[group['High'] > target_price]

                if not buy_prices.empty:
                    buy_price = buy_prices.iloc[0]['Open']  # 첫 돌파 가격에서 매수 , 1분 안에 정확히 언제 매수한지는 알 수 없으므로, open가에 산다고 가정
                    sell_price = group.iloc[-1]['Close']  # 해당 시간 마지막 종가에서 매도
                    results.append((sell_price - buy_price) / buy_price * 100)  # 수익률 계산

    return results


# 전략 적용
trade_results = apply_volatility_breakout_strategy(almac_m)
average_return = sum(trade_results) / len(trade_results) if trade_results else 0

print(f"Average Return: {average_return}")
print("Individual trade returns:", trade_results)


Average Return: 0.2254817720813425
Individual trade returns: [0.6527415143603132, 0.0, 0.0, -0.40431266846361186, -0.136986301369863, 0.0, 0.4026845637583893, -0.13333333333333333, -0.33185840707964603, 2.2058823529411766]


변동성 돌파 전략 + 기존 모델 결합

In [46]:
almac_h = pd.read_csv('/content/almac_h.csv')

almac_h["Datetime"] = pd.to_datetime(almac_h["Datetime"]).dt.strftime('%Y-%m-%d %H:%M:%S')
almac_h['Datetime'] = pd.to_datetime(almac_h['Datetime'])
almac_h.set_index('Datetime', inplace=True)

In [47]:
# 이 부분은 자신의 모델 sequence 길이에 따라 인덱스를 조정해줘야함.
# 나는 sequence 길이가 10이기 때문에 이렇게 설정
# 10개를 입력하면 4월24일부터 예측이 실행됨.

almac_h = almac_h[20:]

In [62]:
# hynix_h를 바탕으로 모델가격 예측
# 특성 추가
def add_features(df):
    df = df.copy()
    df['H-L'] = df['High'] - df['Low']
    df['O-C'] = df['Close'] - df['Open']
    df['2_HOURS_MA'] = df['Close'].rolling(window=2, min_periods=1).mean()
    df['4_HOURS_MA'] = df['Close'].rolling(window=4, min_periods=1).mean()
    df['6_HOURS_MA'] = df['Close'].rolling(window=6, min_periods=1).mean()
    df['3_HOURS_STD_DEV'] = df['Close'].rolling(window=3, min_periods=1).std()
    return df

almac_value = add_features(almac_h)

# 특징 선택 및 정규화
features = almac_value[['H-L', 'O-C', '2_HOURS_MA', '4_HOURS_MA', '6_HOURS_MA', '3_HOURS_STD_DEV', 'Volume', 'Close']]
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_features = scaler.fit_transform(features)

# 시퀀스 데이터 생성
def create_sequences(data, seq_length):
    xs = []
    for i in range(len(data) - seq_length):
        x = data[i:(i + seq_length)]
        xs.append(x)
    return np.array(xs)

seq_length = 10
X_test = create_sequences(scaled_features, seq_length)

# 텐서로 변환
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)

# DataLoader 설정
test_data = TensorDataset(X_test_tensor)
test_loader = DataLoader(dataset=test_data, batch_size=1, shuffle=False)


In [73]:
def evaluate_model(model, loader):
    model.eval()
    predictions = []
    with torch.no_grad():
        for features in loader:
            outputs = model(features[0])  # DataLoader의 tuple 중 첫번째 요소가 features
            predicted = (torch.sigmoid(outputs) > 0.35).float()  # 예측 결과
            # 예측 결과의 배열을 반복하면서 각 원소를 int로 변환하여 리스트에 추가
            predictions.extend([int(pred.item()) for pred in predicted])
    return predictions

# 모델 평가
predictions = evaluate_model(model, test_loader)

# 결과 출력
print("Predictions:", predictions)


Predictions: [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1]


In [74]:
# 이게 총 42개가 나오는지 확인해봐야함 ( 6 시간 x 7일 ) -> 야후 파이낸스 라이브러리 한계 때문에 빠진 데이터도 존재해서 잘 확인해야함 . 5/1일은 법정 공휴일이라서 7일
len(predictions)

42

In [75]:
# 임시 데이터프레임을 생성해야함. -> 모델의 예측값을 보관하기 위한 데이터프레임
temp = almac_m.groupby(['Date', 'Hour'])['Open'].sum().reset_index()
temp['prediction'] = predictions

predictions_df = temp[['Date','Hour','prediction']]

predictions_df_copy = predictions_df.copy()
predictions_df_copy['Date'] = pd.to_datetime(predictions_df_copy['Date'])
predictions_df_copy.set_index(['Date', 'Hour'], inplace=True)

In [77]:
predictions_df_copy

Unnamed: 0_level_0,Unnamed: 1_level_0,prediction
Date,Hour,Unnamed: 2_level_1
2024-04-24,9,0
2024-04-24,10,0
2024-04-24,11,0
2024-04-24,12,0
2024-04-24,13,0
2024-04-24,14,1
2024-04-25,9,1
2024-04-25,10,1
2024-04-25,11,1
2024-04-25,12,1


In [84]:
import pandas as pd

# predictions_df를 파라미터로 추가하여 모델의 예측값까지 고려

def apply_volatility_breakout_strategy(df, predictions_df):
    df['Hour'] = df.index.hour
    df['Date'] = df.index.date

    results = []  # 결과 저장 리스트
    k = 0.3  # 변동폭 계수

    grouped = df.groupby(['Date', 'Hour'])

    for (date, hour), group in grouped:
        if group.empty:
            continue

        # 이전 시간대 데이터를 기반으로 목표 매수 가격 설정
        if hour > 9:
            previous_group = grouped.get_group((date, hour - 1))
            if not previous_group.empty:
                high_price = previous_group['High'].max()
                low_price = previous_group['Low'].min()
                target_price = group['Open'][0] + (high_price - low_price) * k

                # 목표 가격 돌파 여부 확인 및 모델의 예측 값 확인
                # 마찬가지로 1분 단위 데이터라 정확히 언제 target_price를 넘었는지는 알 수 없어서 High가 넘은 행만 추출.
                buy_prices = group[group['High'] > target_price]
                prediction = predictions_df.loc[(pd.Timestamp(date), hour), 'prediction']

                if not buy_prices.empty and prediction == 1:
                    buy_price = buy_prices.iloc[0]['Open']  # 첫 돌파 가격에서 매수
                    sell_price = group.iloc[-1]['Close']  # 해당 시간 마지막 종가에서 매도
                    results.append((sell_price - buy_price) / buy_price * 100)  # 수익률 계산

    return results

# 전략 적용
trade_results = apply_volatility_breakout_strategy(almac_m, predictions_df_copy)
average_return = sum(trade_results) / len(trade_results) if trade_results else 0

print(f"Average Return: {round(average_return,3)} %")
print("Individual trade returns:", trade_results)


Average Return: 0.351 %
Individual trade returns: [0.38961038961038963, 0.13280212483399734, -0.40431266846361186, 0.2751031636863824, 0.136986301369863, 0.13477088948787064, 0.2691790040376851, -0.535475234270415, 0.5376344086021506, -0.3978779840848806, -0.2652519893899204, 0.13280212483399734, -0.13386880856760375, 0.0, 2.5974025974025974, 2.7455121436114043]
