In [None]:
pip install imbalanced-learn

In [None]:
pip install joblib

In [None]:
pip install tdqm

In [None]:
import numpy as np
import pandas as pd
import os
import logging

# TensorFlow 및 TPU 설정
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.impute import SimpleImputer
import joblib
import matplotlib.pyplot as plt
import time
import seaborn as sns

from tqdm import tqdm

In [None]:
# Mount Google Drive so the /content/drive/MyDrive/... paths used below resolve.
from google.colab import drive
drive.mount('/content/drive')

######################

**메인 데이터 호출**
######################

In [None]:
# Load the main dataset from the mounted Google Drive folder (the path is on Drive, not Kaggle)
file_path1 = '/content/drive/MyDrive/Data/SOL60_INDICATOR3'
data = pd.read_csv(file_path1)

# Display the first few rows of the dataset to ensure it is loaded correctly
data.head()

In [None]:
data.columns

In [None]:
# Drop the unused 'Unnamed: 0.1' column; errors='ignore' keeps the cell safe to re-run
# after the column has already been removed.
data = data.drop(columns=['Unnamed: 0.1'], errors='ignore')

In [None]:
# Drop the unused 'Unnamed: 0' column; errors='ignore' keeps the cell safe to re-run
# after the column has already been removed.
data = data.drop(columns=['Unnamed: 0'], errors='ignore')

######################

**메인 데이터 처리**
######################

In [None]:
# Data preprocessing function
def preprocess_data(data):
    """Build the scaled feature matrix and the binary target from the raw frame.

    Args:
        data: DataFrame containing 'max_return_60min' and 'min_return_60min'
            plus the feature columns. It is not modified.

    Returns:
        (X_scaled, y): X_scaled is a numpy array of mean-imputed, min-max
        scaled features; y is an int Series named 'target' that is 1 where
        max_return_60min >= 1.1 and 0 otherwise.
    """
    # Derive the label without writing a 'target' column back into the caller's frame.
    y = (data['max_return_60min'] >= 1.1).astype(int).rename('target')

    # Everything except the label sources is a feature. 'open_time' is removed too,
    # matching the prediction-side cell that drops it before scaling. errors='ignore'
    # tolerates frames where 'target' / 'open_time' are absent.
    X = data.drop(
        columns=['max_return_60min', 'min_return_60min', 'target', 'open_time'],
        errors='ignore',
    )

    # Replace infinities with NaN so the imputer treats them as missing
    X = X.replace([np.inf, -np.inf], np.nan)

    # Fill NaN with the column mean
    imputer = SimpleImputer(strategy='mean')
    X_imputed = imputer.fit_transform(X)

    # Scale every feature to [0, 1]
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(X_imputed)

    return X_scaled, y

# Convert row-wise data into sliding-window sequences
def create_sequences(data, target, sequence_length):
    """Slice `data` into overlapping windows and pair each with a label.

    Window k covers rows k .. k + sequence_length - 1 and is labelled with
    the `target` value at the window's last row.

    Returns:
        (sequences, targets) as numpy arrays; both are empty when `data`
        has fewer than `sequence_length` rows.
    """
    window_starts = range(len(data) - sequence_length + 1)
    windows = [data[start:start + sequence_length] for start in window_starts]
    labels = [target[start + sequence_length - 1] for start in window_starts]
    return np.array(windows), np.array(labels)

# Run preprocessing to get the scaled feature matrix and the target series
X_scaled, y = preprocess_data(data)

# Number of consecutive rows in each model input window
sequence_length = 60

# Guard: at least one full window is required
if len(X_scaled) < sequence_length:
    raise ValueError(f"데이터 길이({len(X_scaled)})가 시퀀스 길이({sequence_length})보다 짧습니다.")

# Build the sliding-window sequences
y_array = y.values  # convert the pandas Series to a numpy array
X_seq, y_seq = create_sequences(X_scaled, y_array, sequence_length)

# Report the shapes of the generated sequence arrays
print(f"X_seq shape: {X_seq.shape}")
print(f"y_seq shape: {y_seq.shape}")

In [None]:
# Data preprocessing function V2 (fixed feature list)
def preprocess_data(data):
    """Build the scaled feature matrix (fixed feature list) and the binary target.

    Args:
        data: DataFrame containing 'max_return_60min' and every column named
            in `feature_columns`. It is not modified.

    Returns:
        (X_scaled, y): X_scaled is a numpy array of mean-imputed, min-max
        scaled features in `feature_columns` order; y is an int Series named
        'target' that is 1 where max_return_60min >= 1.1 and 0 otherwise.

    Raises:
        KeyError: if a listed feature column is missing from `data`.
    """
    # Derive the label without writing a 'target' column back into the caller's frame.
    y = (data['max_return_60min'] >= 1.1).astype(int).rename('target')

    # Feature columns fed to the model
    feature_columns = ['ichimoku_conversion_9', 'ichimoku_conversion_200', 'supertrend_upper_14_2_10',
                       'supertrend_upper_10_3_20', 'bollinger_hband_200', 'volume_ma_100', 'ROC_30',
                       'open', 'high', 'supertrend_lower_10_3_20', 'obv', 'atr_50', 'volume_ma_200',
                       'Accumulation_Distribution_Line', 'bollinger_lband_20', 'lowerband', 'volume_ma_20',
                       'supertrend_lower_7_3_14', 'atr_14', 'disparity_index_100', 'price_ma_200',
                       'bollinger_lband_50', 'ichimoku_conversion_52', 'upperband', 'atr_20', 'price_ma_20',
                       'disparity_index_20', 'time', 'vwap', 'bollinger_lband_200', 'atr_10', 'MFI_40',
                       'volume_ma_10', 'supertrend_in_uptrend_7_3_14', 'Momentum_30', 'Momentum_20',
                       'supertrend_upper_20_4_50', 'bollinger_hband_100', 'MFI_50', 'CMO_50', 'close',
                       'Momentum_50', 'stoch_%k_21_5', 'supertrend_upper_7_3_14', 'bollinger_hband_50',
                       'Parabolic_SAR_0.02', 'bollinger_lband_100', 'stoch_%k_9_3', 'Williams_%R_30', 'CMO_40']

    # Select the features and replace infinities with NaN in one non-inplace step;
    # an in-place replace on a column selection raises SettingWithCopyWarning.
    X = data[feature_columns].replace([np.inf, -np.inf], np.nan)

    # Fill NaN with the column mean
    imputer = SimpleImputer(strategy='mean')
    X_imputed = imputer.fit_transform(X)

    # Scale every feature to [0, 1]
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(X_imputed)

    return X_scaled, y

# Convert row-wise data into sliding-window sequences
def create_sequences(data, target, sequence_length):
    """Return overlapping windows of `data` and the label at each window's last row.

    Both returned arrays are empty when `data` is shorter than `sequence_length`.
    """
    collected_windows, collected_labels = [], []
    final_start = len(data) - sequence_length
    begin = 0
    while begin <= final_start:
        stop = begin + sequence_length
        collected_windows.append(data[begin:stop])
        # The label belongs to the last row inside the window.
        collected_labels.append(target[stop - 1])
        begin += 1
    return np.array(collected_windows), np.array(collected_labels)

# Run preprocessing to get the scaled feature matrix and the target series
X_scaled, y = preprocess_data(data)

# Number of consecutive rows in each model input window
sequence_length = 60

# Guard: at least one full window is required
if len(X_scaled) < sequence_length:
    raise ValueError(f"데이터 길이({len(X_scaled)})가 시퀀스 길이({sequence_length})보다 짧습니다.")

# Build the sliding-window sequences
y_array = y.values  # convert the pandas Series to a numpy array
X_seq, y_seq = create_sequences(X_scaled, y_array, sequence_length)

# Report the shapes of the generated sequence arrays
print(f"X_seq shape: {X_seq.shape}")
print(f"y_seq shape: {y_seq.shape}")

######################

**테스트 데이터 호출**
######################

In [None]:
# Load the test dataset from the mounted Google Drive folder (the path is on Drive, not Kaggle).
# NOTE(review): this is the same path as file_path1, so the "test" rows come from the
# file the model is trained on — confirm whether a separate hold-out file was intended.
file_path2 = '/content/drive/MyDrive/Data/SOL60_INDICATOR3'
data_test_tmp = pd.read_csv(file_path2)

# Display the first few rows of the dataset to ensure it is loaded correctly
data_test_tmp.head()

In [None]:
data_test_tmp.columns

In [None]:
# Drop the unused 'Unnamed: 0.1' column; errors='ignore' keeps the cell safe to re-run
# after the column has already been removed.
data_test_tmp = data_test_tmp.drop(columns=['Unnamed: 0.1'], errors='ignore')

In [None]:
# Drop the unused 'Unnamed: 0' column; errors='ignore' keeps the cell safe to re-run
# after the column has already been removed.
data_test_tmp = data_test_tmp.drop(columns=['Unnamed: 0'], errors='ignore')

In [None]:
# Use the full test frame. Copy it so columns added to data_test later
# (time, predictions, ...) do not leak into data_test_tmp.
data_test = data_test_tmp.copy()

######################

**테스트 데이터 분리**
######################

In [None]:
# 12 equal parts => last 1 month
def extract_last_twelfth(df):
    """Return a copy of the last twelfth of `df`.

    The slice starts at 11 * (len(df) // 12) and runs to the end, so any
    remainder rows are included. A copy is returned so that columns assigned
    on the result do not raise SettingWithCopyWarning or touch `df`.
    """
    num_rows = len(df)
    twelfth_size = num_rows // 12
    start_index = 11 * twelfth_size
    return df.iloc[start_index:num_rows].copy()

# Keep only the last of twelve equal parts of the test frame
data_test = extract_last_twelfth(data_test_tmp)
data_test.head()

In [None]:
# 6 equal parts => last 2 months
def extract_last_sixth(df):
    """Return a copy of the last sixth of `df`.

    The slice starts at 5 * (len(df) // 6) and runs to the end, so any
    remainder rows are included. A copy is returned so that columns assigned
    on the result do not raise SettingWithCopyWarning or touch `df`.
    """
    num_rows = len(df)
    sixth_size = num_rows // 6
    start_index = 5 * sixth_size
    return df.iloc[start_index:num_rows].copy()

# Keep only the last of six equal parts of the test frame
data_test = extract_last_sixth(data_test_tmp)
data_test.head()

In [None]:
# 4 equal parts => last 3 months
def extract_last_fourth(df):
    """Return a copy of the last quarter of `df`.

    The slice starts at 3 * (len(df) // 4) and runs to the end, so any
    remainder rows are included. A copy is returned so that columns assigned
    on the result do not raise SettingWithCopyWarning or touch `df`.
    """
    num_rows = len(df)
    fourth_size = num_rows // 4
    start_index = 3 * fourth_size
    return df.iloc[start_index:num_rows].copy()

# Keep only the last of four equal parts of the test frame
data_test = extract_last_fourth(data_test_tmp)
data_test.head()

In [None]:
# 3 equal parts => last 4 months
def extract_last_third(df):
    """Return a copy of the last third of `df`.

    The slice starts at 2 * (len(df) // 3) and runs to the end, so any
    remainder rows are included. A copy is returned so that columns assigned
    on the result do not raise SettingWithCopyWarning or touch `df`.
    """
    num_rows = len(df)
    third_size = num_rows // 3
    start_index = 2 * third_size
    return df.iloc[start_index:num_rows].copy()

# Keep only the last of three equal parts of the test frame
data_test = extract_last_third(data_test_tmp)
data_test.head()

In [None]:
# 12 equal parts => last 5 months
def extract_last_five_parts(df):
    """Return a copy of the last five of twelve equal parts of `df`.

    The slice starts at 7 * (len(df) // 12), i.e. after the first seven
    parts, and runs to the end, so any remainder rows are included. A copy
    is returned so that columns assigned on the result do not raise
    SettingWithCopyWarning or touch `df`.
    """
    num_rows = len(df)
    part_size = num_rows // 12  # size of one twelfth
    start_index = part_size * 7  # skip the first seven parts, keep the last five
    return df.iloc[start_index:num_rows].copy()

# Keep the last five of twelve equal parts of the test frame (not a half, as the old comment said)
data_test = extract_last_five_parts(data_test_tmp)
data_test.head()

In [None]:
# 2 equal parts => last 6 months
def extract_last_half(df):
    """Return a copy of the second half of `df`.

    The slice starts at len(df) // 2 and runs to the end. A copy is returned
    so that columns assigned on the result do not raise
    SettingWithCopyWarning or touch `df`.
    """
    num_rows = len(df)
    start_index = num_rows // 2
    return df.iloc[start_index:num_rows].copy()

# Keep only the second half of the test frame
data_test = extract_last_half(data_test_tmp)
data_test.head()

In [None]:
# 12 equal parts => last 7 months
def extract_last_seven_parts(df):
    """Return a copy of the last seven of twelve equal parts of `df`.

    Previously this cell redefined `extract_last_five_parts` with a different
    start index, silently shadowing the five-part version; the function now
    has its own name. The slice starts at 5 * (len(df) // 12) and runs to the
    end, so any remainder rows are included.
    """
    num_rows = len(df)
    part_size = num_rows // 12  # size of one twelfth
    start_index = part_size * 5  # skip the first five parts, keep the last seven
    return df.iloc[start_index:num_rows].copy()

# Keep the last seven of twelve equal parts of the test frame
data_test = extract_last_seven_parts(data_test_tmp)
data_test.head()

######################

**테스트 데이터 처리**
######################

In [None]:
# Parse open_time into datetimes unless it already is one.
# pd.api.types.is_datetime64_any_dtype also accepts tz-aware columns, for which
# np.issubdtype raises TypeError. assign() builds a new frame, so nothing is written
# onto a slice of data_test_tmp and the cell can be re-run safely.
if not pd.api.types.is_datetime64_any_dtype(data_test['open_time']):
    data_test = data_test.assign(open_time=pd.to_datetime(data_test['open_time']))

# Minutes since midnight of each row's open_time
data_test = data_test.assign(
    time=data_test['open_time'].dt.hour * 60 + data_test['open_time'].dt.minute
)

In [None]:
# Drop the timestamp and the two return columns; everything else is kept as a prediction feature
data_test_predict = data_test.drop(columns=['open_time', 'max_return_60min', 'min_return_60min'])

In [None]:
# Column selection v2: keep only the model's feature columns
# Feature list (same entries and order as feature_columns in preprocess_data V2)
features_to_keep = ['ichimoku_conversion_9', 'ichimoku_conversion_200', 'supertrend_upper_14_2_10',
                    'supertrend_upper_10_3_20', 'bollinger_hband_200', 'volume_ma_100', 'ROC_30',
                    'open', 'high', 'supertrend_lower_10_3_20', 'obv', 'atr_50', 'volume_ma_200',
                    'Accumulation_Distribution_Line', 'bollinger_lband_20', 'lowerband', 'volume_ma_20',
                    'supertrend_lower_7_3_14', 'atr_14', 'disparity_index_100', 'price_ma_200',
                    'bollinger_lband_50', 'ichimoku_conversion_52', 'upperband', 'atr_20', 'price_ma_20',
                    'disparity_index_20', 'time', 'vwap', 'bollinger_lband_200', 'atr_10', 'MFI_40',
                    'volume_ma_10', 'supertrend_in_uptrend_7_3_14', 'Momentum_30', 'Momentum_20',
                    'supertrend_upper_20_4_50', 'bollinger_hband_100', 'MFI_50', 'CMO_50', 'close',
                    'Momentum_50', 'stoch_%k_21_5', 'supertrend_upper_7_3_14', 'bollinger_hband_50',
                    'Parabolic_SAR_0.02', 'bollinger_lband_100', 'stoch_%k_9_3', 'Williams_%R_30', 'CMO_40']

# Keep only the feature columns. Copy, because the next cell cleans this frame and an
# in-place edit on a bare column selection raises SettingWithCopyWarning.
data_test_predict = data_test[features_to_keep].copy()

In [None]:
# Replace infinities with NaN (non-inplace: data_test_predict may be a selection of data_test)
data_test_predict = data_test_predict.replace([np.inf, -np.inf], np.nan)

# The imputer and scaler must carry the statistics the model was trained with.
# preprocess_data() fits its own on the training frame `data` and discards them, so
# they are refitted here on the same training columns instead of on the test slice
# (fitting on the test slice gives different means / min / max than training).
# NOTE(review): assumes `data` has every column of data_test_predict — confirm.
train_features = data[data_test_predict.columns].replace([np.inf, -np.inf], np.nan)

# Fill NaN with the training column means
imputer = SimpleImputer(strategy='mean')
imputer.fit(train_features)
data_test_predict_imputed = imputer.transform(data_test_predict)

# Scale with the training min / max
scaler = MinMaxScaler()
scaler.fit(imputer.transform(train_features))
data_test_predict_scaled = scaler.transform(data_test_predict_imputed)

# Convert prediction data into sliding-window sequences for the model
def create_sequences_for_prediction(data, sequence_length):
    """Return every window of `sequence_length` consecutive rows of `data`.

    The result is a numpy array of the windows in row order; it is empty
    when `data` has fewer than `sequence_length` rows.
    """
    window_count = len(data) - sequence_length + 1
    return np.array([data[first:first + sequence_length] for first in range(window_count)])

# Number of consecutive rows in each model input window
sequence_length = 60

# Build the sliding-window sequences used for prediction
X_test_seq = create_sequences_for_prediction(data_test_predict_scaled, sequence_length)

######################

**Transformer 신규 학습**
######################

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Split into training and validation sets chronologically (first 80% / last 20%).
# Consecutive windows in X_seq share all but one row, so a shuffled split puts
# near-duplicates of every validation window into training and inflates the
# validation metrics; shuffle=False keeps the validation block later in time.
X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.2, shuffle=False)

# Convert to tensors; targets get a trailing dimension to match the model's (batch, 1) output
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# Build the data loaders (training batches are still shuffled within the training block)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# TransformerEncoder model definition
class TransformerEncoderModel(nn.Module):
    """Linear embedding -> Transformer encoder stack -> linear head on the last time step.

    The encoder is built with batch_first=True and d_model equal to `input_dim`.
    `forward` returns the head's raw output (no sigmoid applied).
    """

    def __init__(self, input_dim, nhead, num_layers, dim_feedforward, output_dim):
        super().__init__()
        # Attribute names are the state_dict prefixes used by saved checkpoints.
        self.embedding = nn.Linear(input_dim, input_dim)
        single_layer = nn.TransformerEncoderLayer(
            d_model=input_dim,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(single_layer, num_layers=num_layers)
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, src):
        encoded = self.transformer_encoder(self.embedding(src))
        # Only the final time step feeds the output head.
        last_step = encoded[:, -1, :]
        return self.fc(last_step)

# Model hyperparameters
input_dim = X_train.shape[2]
nhead = 2
num_layers = 2
dim_feedforward = 64
output_dim = 1

model = TransformerEncoderModel(input_dim, nhead, num_layers, dim_feedforward, output_dim)

# Loss and optimizer (the model outputs logits, hence BCEWithLogitsLoss)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning-rate decay: multiply by 0.1 every 10 epochs
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Early-stopping state
patience = 5
best_val_loss = float('inf')
best_state_dict = None  # snapshot of the weights with the lowest validation loss
patience_counter = 0

# Per-epoch training and validation losses
train_losses = []
val_losses = []

# Training loop
num_epochs = 20  # maximum number of epochs
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # Validation pass
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()

    avg_val_loss = val_loss / len(test_loader)
    val_losses.append(avg_val_loss)

    end_time = time.time()
    epoch_duration = end_time - start_time

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Duration: {epoch_duration:.2f}s')

    # Keep the best weights and check the early-stopping condition
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0
        # Clone the tensors: state_dict() returns references that later steps overwrite.
        best_state_dict = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print("조기 종료 조건 충족. 학습을 중지합니다.")
        break

    # Advance the learning-rate schedule
    scheduler.step()

# Restore the best weights; without this the model that is evaluated and saved
# afterwards is the last epoch, up to `patience` epochs past the best one.
if best_state_dict is not None:
    model.load_state_dict(best_state_dict)

# Plot training and validation loss per epoch
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Evaluate on the validation loader
model.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        output = model(X_batch)
        # reshape(-1) always yields a 1-D tensor; .squeeze() collapses a final batch of
        # size 1 to a scalar, whose .tolist() is a float that extend() cannot iterate.
        y_true.extend(y_batch.reshape(-1).tolist())
        y_pred.extend(torch.sigmoid(output).reshape(-1).tolist())

# Threshold the probabilities at 0.5 and compute binary-classification metrics
y_pred = (np.array(y_pred) > 0.5).astype(int)
y_true = np.array(y_true).astype(int)

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')

In [None]:
# Destination path for the trained weights (on the mounted Drive)
model_path = '/content/drive/MyDrive/Data/Model/SOL60_SMALLL_INDICATOR3_Transformer.pth'

# Save only the state_dict (weights), not the model object
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

######################

**Transformer 추가 학습**
######################

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# TransformerEncoder model definition (must match the architecture of the checkpoint being loaded)
class TransformerEncoderModel(nn.Module):
    """Embeds each time step, runs a Transformer encoder, and maps the last step to the output.

    `forward` returns raw logits of shape (batch, output_dim).
    """

    def __init__(self, input_dim, nhead, num_layers, dim_feedforward, output_dim):
        super().__init__()
        layer_kwargs = dict(d_model=input_dim, nhead=nhead,
                            dim_feedforward=dim_feedforward, batch_first=True)
        # Attribute names double as state_dict key prefixes, so they stay as they are.
        self.embedding = nn.Linear(input_dim, input_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**layer_kwargs), num_layers=num_layers
        )
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, src):
        hidden = self.embedding(src)
        hidden = self.transformer_encoder(hidden)
        # Use the representation of the final time step only.
        return self.fc(hidden[:, -1, :])

# Input dimension = number of features per time step.
# Read it from X_seq: X_train is only created by the split cell further down in this
# section, so relying on it here only works with state left over from another section.
input_dim = X_seq.shape[2]
print(f"Input dimension: {input_dim}")

# Model hyperparameters (must match the checkpoint being loaded)
nhead = 2
num_layers = 2
dim_feedforward = 64
output_dim = 1

model = TransformerEncoderModel(input_dim, nhead, num_layers, dim_feedforward, output_dim)

# Load the checkpoint. map_location='cpu' lets weights saved on a GPU load on a
# CPU-only runtime; the model is never moved off the CPU in this notebook.
model_path = '/kaggle/input/lstm_indiactor3/pytorch/tcn_transformer/6/SOL60_SMALLL_INDICATOR3_Transformer_v3.pth'
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.train()  # switch to training mode for further training
print(f"Model loaded from {model_path}")

In [None]:
# Split into training and validation sets chronologically (first 80% / last 20%).
# Consecutive windows in X_seq share all but one row, so a shuffled split puts
# near-duplicates of every validation window into training and inflates the
# validation metrics; shuffle=False keeps the validation block later in time.
X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.2, shuffle=False)

# Convert to tensors; targets get a trailing dimension to match the model's (batch, 1) output
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

# Build the data loaders (training batches are still shuffled within the training block)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# Loss and optimizer (the model outputs logits, hence BCEWithLogitsLoss)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Early-stopping state
patience = 5
best_loss = float('inf')
best_state_dict = None  # snapshot of the weights with the lowest validation loss
patience_counter = 0

# Per-epoch training and validation losses
train_losses = []
val_losses = []

# Additional training
num_epochs = 100  # maximum number of epochs
for epoch in range(num_epochs):
    # Training pass
    model.train()
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # Validation pass
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()

    avg_val_loss = val_loss / len(test_loader)
    val_losses.append(avg_val_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')

    # Keep the best weights and check the early-stopping condition
    if avg_val_loss < best_loss:
        best_loss = avg_val_loss
        patience_counter = 0
        # Clone the tensors: state_dict() returns references that later steps overwrite.
        best_state_dict = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print("조기 종료 조건 충족. 학습을 중지합니다.")
        break

# Restore the best weights; without this the model that is evaluated and saved
# afterwards is the last epoch, up to `patience` epochs past the best one.
if best_state_dict is not None:
    model.load_state_dict(best_state_dict)

# Plot training and validation loss per epoch
import matplotlib.pyplot as plt

plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Evaluate on the validation loader
model.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        output = model(X_batch)
        # reshape(-1) always yields a 1-D tensor; .squeeze() collapses a final batch of
        # size 1 to a scalar, whose .tolist() is a float that extend() cannot iterate.
        y_true.extend(y_batch.reshape(-1).tolist())
        y_pred.extend(torch.sigmoid(output).reshape(-1).tolist())

# Threshold the probabilities at 0.5 and compute binary-classification metrics
y_pred = (np.array(y_pred) > 0.5).astype(int)
y_true = np.array(y_true).astype(int)

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')

In [None]:
# Destination path for the further-trained weights (on the mounted Drive)
model_path = '/content/drive/MyDrive/Data/Model/SOL60_SMALLL_INDICATOR3_Transformer_v2.pth'

# Save only the state_dict (weights), not the model object
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

######################

**Transformer 모델 로드**
######################

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Model definition (needed to rebuild the architecture before loading weights)
class TransformerEncoderModel(nn.Module):
    """Per-step linear embedding, Transformer encoder, and a linear head on the last step.

    `forward` returns raw logits of shape (batch, output_dim).
    """

    def __init__(self, input_dim, nhead, num_layers, dim_feedforward, output_dim):
        super().__init__()
        # Attribute names are the key prefixes of the saved state_dict.
        self.embedding = nn.Linear(input_dim, input_dim)
        block = nn.TransformerEncoderLayer(
            d_model=input_dim, nhead=nhead,
            dim_feedforward=dim_feedforward, batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(block, num_layers=num_layers)
        self.fc = nn.Linear(input_dim, output_dim)

    def forward(self, src):
        features = self.transformer_encoder(self.embedding(src))
        final_step = features[:, -1, :]  # representation of the last time step
        logits = self.fc(final_step)
        return logits

# Input dimension = number of features per time step of the prediction sequences
input_dim = X_test_seq.shape[2]
print(f"Input dimension: {input_dim}")

# Model hyperparameters (must match the checkpoint being loaded)
nhead = 2
num_layers = 2
dim_feedforward = 64
output_dim = 1

model = TransformerEncoderModel(input_dim, nhead, num_layers, dim_feedforward, output_dim)

# Load the checkpoint. map_location='cpu' lets weights saved on a GPU load on a
# CPU-only runtime; the model is never moved off the CPU in this notebook.
model_path = '/kaggle/input/lstm_indiactor3/pytorch/tcn_transformer/5/SOL60_SMALLL_INDICATOR3_Transformer_v2.pth'
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.eval()
print(f"Model loaded from {model_path}")

In [None]:
# Prediction windows as a float32 tensor (the prediction cell below builds this tensor again)
X_test_tensor = torch.tensor(X_test_seq, dtype=torch.float32)

######################

**Transformer 모델 테스트**
######################

In [None]:
# Halves
def extract_last_half(df):
    """Return a copy of the second half of `df` (rows from len(df) // 2 to the end).

    A copy is returned so that columns assigned on the result do not raise
    SettingWithCopyWarning.
    """
    start_index = len(df) // 2
    return df.iloc[start_index:].copy()

# Keep only the second half of the test frame.
# NOTE(review): this re-slices data_test itself, so every run shrinks it again, and
# X_test_seq (built from the earlier data_test) no longer lines up with it.
data_test = extract_last_half(data_test)
data_test.head()

In [None]:
# Thirds
def extract_last_third(df):
    """Return a copy of the last third of `df` (rows from 2 * (len(df) // 3) to the end).

    A copy is returned so that columns assigned on the result do not raise
    SettingWithCopyWarning.
    """
    start_index = 2 * (len(df) // 3)
    return df.iloc[start_index:].copy()

# Keep only the last third of the test frame.
# NOTE(review): this re-slices data_test itself, so every run shrinks it again, and
# X_test_seq (built from the earlier data_test) no longer lines up with it.
data_test = extract_last_third(data_test)
data_test.head()

In [None]:
# Split into quarters and take the last one
def extract_last_quarter(df):
    """Return a copy of the last quarter of `df` (rows from 3 * (len(df) // 4) to the end).

    A copy is returned so that columns assigned on the result do not raise
    SettingWithCopyWarning.
    """
    start_index = 3 * (len(df) // 4)
    return df.iloc[start_index:].copy()

# Keep only the last quarter of the test frame.
# NOTE(review): this re-slices data_test itself, so every run shrinks it again, and
# X_test_seq (built from the earlier data_test) no longer lines up with it.
data_test = extract_last_quarter(data_test)
data_test.head()

In [None]:
# Split into fifths and take the last one
def extract_last_fifth(df):
    """Return a copy of the last fifth of `df` (rows from 4 * (len(df) // 5) to the end).

    This cell used to define the function as `extract_last_sixth`, shadowing
    the real sixth-based helper with fifth-based logic; it now has its own name.
    """
    start_index = 4 * (len(df) // 5)
    return df.iloc[start_index:].copy()

# Keep only the last fifth of the test frame.
# NOTE(review): this re-slices data_test itself, so every run shrinks it again, and
# X_test_seq (built from the earlier data_test) no longer lines up with it.
data_test = extract_last_fifth(data_test)
data_test.head()

In [None]:
# Split into sixths and take the last one
def extract_last_sixth(df):
    """Return a copy of the last sixth of `df` (rows from 5 * (len(df) // 6) to the end).

    A copy is returned so that columns assigned on the result do not raise
    SettingWithCopyWarning.
    """
    start_index = 5 * (len(df) // 6)
    return df.iloc[start_index:].copy()

# Keep only the last sixth of the test frame.
# NOTE(review): this re-slices data_test itself, so every run shrinks it again, and
# X_test_seq (built from the earlier data_test) no longer lines up with it.
data_test = extract_last_sixth(data_test)
data_test.head()

In [None]:
# 12 equal parts => last 1 month
def extract_last_twelfth(df):
    """Return a copy of the last twelfth of `df` (rows from 11 * (len(df) // 12) to the end).

    A copy is returned so that columns assigned on the result do not raise
    SettingWithCopyWarning.
    """
    start_index = 11 * (len(df) // 12)
    return df.iloc[start_index:].copy()

# Keep only the last twelfth of the test frame.
# NOTE(review): this re-slices data_test itself, so every run shrinks it again, and
# X_test_seq (built from the earlier data_test) no longer lines up with it.
data_test = extract_last_twelfth(data_test)
data_test.head()

In [None]:
# Split into sevenths and take the last one
def extract_last_seventh(df):
    """Return a copy of the last seventh of `df` (rows from 6 * (len(df) // 7) to the end).

    This cell used to define the function as `extract_last_sixth`, shadowing
    the real sixth-based helper with seventh-based logic; it now has its own name.
    """
    start_index = 6 * (len(df) // 7)
    return df.iloc[start_index:].copy()

# Keep only the last seventh of the test frame.
# NOTE(review): this re-slices data_test itself, so every run shrinks it again, and
# X_test_seq (built from the earlier data_test) no longer lines up with it.
data_test = extract_last_seventh(data_test)
data_test.head()

In [None]:
# Window length is taken from the prediction sequences themselves
sequence_length = X_test_seq.shape[1]
X_test_tensor = torch.tensor(X_test_seq, dtype=torch.float32)

# Run inference in eval mode. reshape(-1) keeps a 1-D array even for a single window,
# where .squeeze() would collapse it to a scalar.
model.eval()
with torch.no_grad():
    predictions = torch.sigmoid(model(X_test_tensor)).reshape(-1).numpy()

# Convert probabilities to 0 / 1 at a 0.5 threshold
predictions = (predictions > 0.5).astype(int)

# Each window is scored at its last row, so there must be exactly one prediction per
# row from position sequence_length - 1 onward. If data_test was re-sliced after
# X_test_seq was built, fail with a clear message instead of a shape error or a
# silent misalignment.
expected_predictions = len(data_test) - sequence_length + 1
if len(predictions) != expected_predictions:
    raise ValueError(
        f"predictions ({len(predictions)}) do not line up with data_test "
        f"({len(data_test)} rows, expected {expected_predictions}); "
        "rebuild X_test_seq from the current data_test"
    )

# Attach the predictions; the first sequence_length - 1 rows have no full window and
# stay NaN. Work on a copy so the write does not land on a slice of another frame.
data_test = data_test.copy()
data_test['prediction_Transformer'] = np.nan
data_test.iloc[sequence_length - 1:, data_test.columns.get_loc('prediction_Transformer')] = predictions

# Inspect the result
print(data_test[['open_time', 'prediction_Transformer']].head())

In [None]:
# Keep only the rows that received a prediction
data_test_non_nan = data_test.dropna(subset=['prediction_Transformer'])

# Sort by max_return_60min in descending order
data_test_sorted = data_test_non_nan.sort_values(by='max_return_60min', ascending=False)

# Display the sorted frame
data_test_sorted

In [None]:
# False negatives: max_return_60min reached 1.1 but the model predicted 0.
# The filter previously tested prediction == 1, which counted true positives under
# a name and message that describe misses.
count_max_return_ge_1_prediction_0 = len(data_test[(data_test['max_return_60min'] >= 1.1) & (data_test['prediction_Transformer'] == 0)])

# False positives: max_return_60min stayed below 1.1 but the model predicted 1.
# The filter previously tested prediction == 0, which counted true negatives.
count_max_return_lt_1_prediction_1 = len(data_test[(data_test['max_return_60min'] < 1.1) & (data_test['prediction_Transformer'] == 1)])

# Percentages are relative to every row of data_test, including rows without a prediction.
# The messages now state the 1.1 threshold the filters actually use.
print(f"max_return_60min이 1.1 이상인데 prediction이 0인 데이터의 비율: {count_max_return_ge_1_prediction_0/len(data_test)*100}")
print(f"max_return_60min이 1.1 미만인데 prediction이 1인 데이터의 비율: {count_max_return_lt_1_prediction_1/len(data_test)*100}")

######################

**Transformer 모델 시간대 테스트**
######################

In [None]:
# Select rows where max_return_60min reached 1.1 but the model predicted 0
filtered_data = data_test[(data_test['max_return_60min'] >= 1.1) & (data_test['prediction_Transformer'] == 0)]

# Histogram of min_return_60min for those rows
plt.figure(figsize=(10, 6))
plt.hist(filtered_data['min_return_60min'], bins=20, color='skyblue', edgecolor='black')
plt.title('Distribution of min_return_60min for max_return_60min >= 1.1 and prediction_Transformer == 0')
plt.xlabel('min_return_60min')
plt.ylabel('Frequency')
plt.grid(True)
plt.show()

In [None]:
# For each row, find when the highest high and lowest low occur in the next `window` rows
def find_high_low_times(df, window=60):
    """Add the open_time of the max 'high' and min 'low' within each forward window.

    For row position i the window is rows i .. i + window - 1 (shorter near
    the end of the frame). Ties resolve to the first occurrence, as with
    idxmax / idxmin.

    Args:
        df: DataFrame with 'high', 'low' and 'open_time' columns. It is not
            modified, so passing a slice of another frame is safe.
        window: number of rows in each forward-looking window.

    Returns:
        A copy of `df` with 'max_high_time' and 'min_low_time' columns added.
    """
    # Work on a copy: assigning columns to the argument mutates the caller's frame
    # and raises SettingWithCopyWarning when that frame is a slice.
    result = df.copy()
    open_times = result['open_time']

    max_high_times = []
    min_low_times = []

    for i in range(len(result)):
        # Rows from the current one up to `window` rows ahead
        subset = result.iloc[i:i + window]

        # Index labels of the extreme values, mapped to their open_time
        max_high_times.append(open_times.at[subset['high'].idxmax()])
        min_low_times.append(open_times.at[subset['low'].idxmin()])

    result['max_high_time'] = max_high_times
    result['min_low_time'] = min_low_times

    return result

# Add the max_high_time / min_low_time columns to the test frame and display it
data_test = find_high_low_times(data_test)
data_test

In [None]:
# Add a column flagging whether the high came before the low
def add_condition_column(df):
    """Return the correctly-predicted positive rows with a 0/1 'condition' column.

    Side effect: 'max_high_time' and 'min_low_time' of `df` are converted to
    datetime in place. The returned frame is a copy of the rows where
    max_return_60min >= 1.1 and prediction_Transformer == 1; 'condition' is 1
    where max_high_time is strictly earlier than min_low_time, else 0.
    """
    # Make sure both time columns are datetimes before comparing them
    for time_column in ('max_high_time', 'min_low_time'):
        df[time_column] = pd.to_datetime(df[time_column])

    # Rows with max_return_60min >= 1.1 that the model predicted as 1
    is_hit = (df['max_return_60min'] >= 1.1) & (df['prediction_Transformer'] == 1)
    filtered_df = df[is_hit].copy()

    # Default to 0, then flag the rows where the high precedes the low
    high_before_low = filtered_df['max_high_time'] < filtered_df['min_low_time']
    filtered_df['condition'] = 0
    filtered_df.loc[filtered_df.index[high_before_low], 'condition'] = 1

    return filtered_df

# Convert the time columns to datetime (add_condition_column performs the same conversion again)
data_test['max_high_time'] = pd.to_datetime(data_test['max_high_time'])
data_test['min_low_time'] = pd.to_datetime(data_test['min_low_time'])

# Build the filtered frame with the condition column
filtered_data_test = add_condition_column(data_test)

# Inspect the result
filtered_data_test

In [None]:
# Rows where the high did not come strictly before the low (condition == 0).
# NOTE(review): the variable is named condition_1 but the filter keeps condition == 0 — confirm which is intended.
# .copy() gives an independent frame, so columns added in later cells do not raise SettingWithCopyWarning.
condition_1_data = filtered_data_test[filtered_data_test['condition'] == 0].copy()
condition_1_data

In [None]:
# Minutes between max_high_time and min_low_time (positive when the high is later).
# assign() returns a new frame, so nothing is written onto a filtered slice.
condition_1_data = condition_1_data.assign(
    time_difference_minutes=(condition_1_data['max_high_time'] - condition_1_data['min_low_time']).dt.total_seconds() / 60
)
condition_1_data

In [None]:
# Minutes between each row's open_time and its min_low_time.
# The column name and the original comment both describe min_low_time - open_time, but
# the expression used max_high_time; it now uses min_low_time to match.
# assign() returns a new frame, so nothing is written onto a filtered slice.
condition_1_data = condition_1_data.assign(
    min_low_open_difference_minutes=(condition_1_data['min_low_time'] - condition_1_data['open_time']).dt.total_seconds() / 60
)
condition_1_data

In [None]:
# Max, min, mode and mean of min_low_open_difference_minutes
time_difference_max = condition_1_data['min_low_open_difference_minutes'].max()
time_difference_min = condition_1_data['min_low_open_difference_minutes'].min()
# mode() returns an empty Series when there are no rows, so indexing [0] would raise;
# report NaN in that case. With several modes the first (smallest) is used.
time_difference_modes = condition_1_data['min_low_open_difference_minutes'].mode()
time_difference_mode = time_difference_modes.iloc[0] if not time_difference_modes.empty else np.nan
time_difference_mean = condition_1_data['min_low_open_difference_minutes'].mean()

# Collect the results
time_difference_stats = {
    'max': time_difference_max,
    'min': time_difference_min,
    'mode': time_difference_mode,  # most frequent value
    'mean': time_difference_mean   # average
}

time_difference_stats

In [None]:
# Keep the rows whose min_return_60min is -1 or lower
negative_return_data = condition_1_data[condition_1_data['min_return_60min'] <= -1]

# Max, min, mode and mean of time_difference_minutes for those rows
time_difference_max = negative_return_data['time_difference_minutes'].max()
time_difference_min = negative_return_data['time_difference_minutes'].min()
# mode() returns an empty Series when no row passes the filter, so indexing [0] would
# raise; report NaN in that case. With several modes the first (smallest) is used.
time_difference_modes = negative_return_data['time_difference_minutes'].mode()
time_difference_mode = time_difference_modes.iloc[0] if not time_difference_modes.empty else np.nan
time_difference_mean = negative_return_data['time_difference_minutes'].mean()

# Collect the results
time_difference_stats = {
    'max': time_difference_max,
    'min': time_difference_min,
    'mode': time_difference_mode,  # most frequent value
    'mean': time_difference_mean   # average
}

time_difference_stats