# 실행 준비

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [2]:
import pandas as pd

In [None]:
# Colab variant: read the daily dataset from /content using the pyarrow CSV engine.
file_path = '/content/daily_all.csv'
df = pd.read_csv(file_path, engine='pyarrow')

In [None]:
# Alternative: read the same file name from the working directory with the
# default parser. Running this cell rebinds `df`.
df = pd.read_csv("daily_all.csv")

In [None]:
df

# 데이터셋 로드

- 피처 선택

In [None]:
company_name = "AMZN"  # ticker whose movement will be predicted

# Feature selection: closing prices for every ticker, but trading volume and
# sentiment only for the company being predicted.
tickers = ['TSLA', 'NVDA', 'MSFT', 'GOOG', 'AAPL', 'DIS', 'XOM', 'CRM', 'INTC', 'AMZN']
price_cols = [f'prccd_{ticker}' for ticker in tickers]
data = df[price_cols + [f'cshtrd_{company_name}', f'sent_{company_name}', 'datadate']].copy()
data.set_index('datadate', inplace=True)

# Up/down outcome of the target company (1: up, 0: down) -- not used by the GAT stage
# data.loc[:, 'y'] = (df[f'prccd_{company_name}'] > df[f'prccd_{company_name}'].shift(1)).astype(int)
# data = data[1:] # the first row has no previous day, so its direction is unknown

# A closing price of 0 is never valid and later turns the day-over-day return
# used for labelling into inf/NaN, so gaps in price columns are carried
# forward from the previous row (and backward for leading gaps) instead of
# being zero-filled. Rows are assumed to be in chronological order, as the
# graph construction further down also relies on.
data[price_cols] = data[price_cols].ffill().bfill()

# Everything still missing (mostly sentiment, which is sparse) becomes 0 so
# that no NaN reaches the GAT -- a single NaN input yields NaN embeddings.
data.fillna(0, inplace=True)

# Show the result
data

In [None]:
data.isna().sum()

# GAT

- 노드 : 각 시점
- 피처 : 10개 주식 종가 + 예측 기업 거래량 + 예측 기업 감정분석
- GAT의 역할 : 피처들의 관계(회사 간의 관계 등)를 파악해 **시점별** 임베딩 생성;  다른 시점과의 연관성을 반영    
(ex. 1~10일 전과 연결이 되어있는 상태에서, 1일 전 정보는 얼마나 중요하고 10일 전 정보는 얼마나 중요한지 판단)

In [2]:
# Install PyTorch Geometric
# NOTE(review): the extension wheels are pulled from the index pinned to
# torch-2.0.0+cu111; confirm this matches the torch/CUDA build of the runtime.
!pip install torch-geometric
!pip install torch-scatter torch-sparse torch-cluster torch-spline-conv -f https://data.pyg.org/whl/torch-2.0.0+cu111.html


In [1]:
# import 문
import torch
import torch.nn as nn
from torch_geometric.nn import GATv2Conv

In [None]:
# GAT layer definition
class GATLayer(nn.Module):
    """Thin wrapper around a single ``GATv2Conv`` layer.

    The attention computation happens inside ``GATv2Conv``. The layer is built
    with ``concat=False``, so the ``heads`` attention heads are averaged rather
    than concatenated.

    Args:
        in_features: number of input features per node.
        out_features: size of the embedding produced per node.
        heads: number of attention heads.
    """

    def __init__(self, in_features, out_features, heads=4):
        super().__init__()
        conv_options = dict(heads=heads, concat=False)
        self.gat = GATv2Conv(in_features, out_features, **conv_options)

    def forward(self, x, edge_index):
        """Run the graph attention layer on node features ``x`` over ``edge_index``."""
        node_embeddings = self.gat(x, edge_index)
        return node_embeddings

In [None]:
# Data preparation
node_features = data.values  # shape: [number of dates, number of features]
n_nodes = node_features.shape[0]  # one node per date

# Graph construction: every time step gets a directed edge to each of the
# next `max_lag` time steps (row positions only -- market holidays are not
# treated specially).
max_lag = 10
edge_list = [
    [src, src + lag]
    for src in range(n_nodes)
    for lag in range(1, max_lag + 1)
    if src + lag < n_nodes
]

# reshape(-1, 2) keeps the result a well-formed [2, 0] tensor when there are
# no edges; .contiguous() gives the transposed tensor the dense layout that
# PyTorch Geometric expects for edge_index.
edge_index = torch.tensor(edge_list, dtype=torch.long).reshape(-1, 2).t().contiguous()

In [None]:
edge_index # 출력 결과의 첫째행이 출발노드, 둘째행이 도착노드!

In [None]:
# Convert the node features to the float tensor the GAT layer takes as input
x = torch.tensor(node_features, dtype=torch.float)

In [None]:
# Build the GAT model and run it
# NOTE(review): no optimisation step for gat_model is visible in this notebook,
# so these embeddings appear to come from randomly initialised weights --
# confirm this is intended.
gat_model = GATLayer(in_features=data.shape[1], out_features=8) # result: an out_features-dimensional embedding per time step (node)
embeddings = gat_model(x, edge_index)  # shape: [number of dates, out_features]

In [None]:
embeddings

# csv로 저장

In [None]:
# Take the embeddings off the autograd graph and move them into NumPy
embeddings_np = embeddings.detach().cpu().numpy()

# One column per embedding dimension: emb_0, emb_1, ...
column_names = [f'emb_{i}' for i in range(embeddings_np.shape[1])]
df_embeddings = pd.DataFrame(embeddings_np, columns=column_names)

# Put the dates (the index of `data`, set via set_index('datadate')) in front
df_embeddings.insert(0, 'datadate', data.index.values)

# Inspect the result
print("Embeddings with datadate:")
print(df_embeddings.head())

In [None]:
# Save to CSV (one file per target company, without the positional index)
df_embeddings.to_csv(f'embeddings_{company_name}.csv', index=False)

In [None]:
!pip install optuna

### TCN 시작

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import optuna
# --- 유틸 함수 ---
def get_conv1d(in_channels, out_channels, kernel_size, stride, padding, dilation, groups, bias):
    """Create an ``nn.Conv1d`` with every hyper-parameter given explicitly."""
    conv_options = {
        'in_channels': in_channels,
        'out_channels': out_channels,
        'kernel_size': kernel_size,
        'stride': stride,
        'padding': padding,
        'dilation': dilation,
        'groups': groups,
        'bias': bias,
    }
    return nn.Conv1d(**conv_options)

def get_bn(channels):
    """Create a 1-D batch-norm layer over ``channels`` feature channels."""
    batch_norm = nn.BatchNorm1d(num_features=channels)
    return batch_norm

def conv_bn(in_channels, out_channels, kernel_size, stride, padding, groups, dilation=1, bias=False):
    """Build a ``conv`` -> ``bn`` block as an ``nn.Sequential``.

    When ``padding`` is ``None`` it defaults to ``kernel_size // 2``. The two
    sub-modules are registered under the names ``'conv'`` and ``'bn'``.
    """
    effective_padding = kernel_size // 2 if padding is None else padding
    named_layers = (
        ('conv', get_conv1d(in_channels, out_channels, kernel_size, stride,
                            effective_padding, dilation, groups, bias)),
        ('bn', get_bn(out_channels)),
    )
    block = nn.Sequential()
    for layer_name, layer in named_layers:
        block.add_module(layer_name, layer)
    return block

# --- RevIN ---
class RevIN(nn.Module):
    """Reversible instance normalisation over the time axis.

    ``forward(x, 'norm')`` stores per-sample statistics of ``x`` and returns
    the normalised tensor; ``forward(y, 'denorm')`` undoes the normalisation
    using the statistics stored by the most recent ``'norm'`` call.

    Args:
        num_features: size of the last (feature) dimension of the input.
        eps: value added to the variance for numerical stability.
        affine: if True, learn a per-feature scale and bias applied after
            normalisation.
        subtract_last: if True, centre on the last time step of each sample
            instead of on the mean.
    """

    def __init__(self, num_features: int, eps=1e-5, affine=True, subtract_last=False):
        super().__init__()
        self.num_features = num_features
        self.eps = eps
        self.affine = affine
        self.subtract_last = subtract_last
        if self.affine:
            self._init_params()

    def _init_params(self):
        """Create the learnable per-feature scale (ones) and bias (zeros)."""
        self.affine_weight = nn.Parameter(torch.ones(self.num_features))
        self.affine_bias = nn.Parameter(torch.zeros(self.num_features))

    def forward(self, x, mode: str):
        """Normalise (``mode='norm'``) or de-normalise (``mode='denorm'``) ``x``.

        Raises:
            ValueError: if ``mode`` is neither ``'norm'`` nor ``'denorm'``.
        """
        if mode == 'norm':
            self._get_statistics(x)
            return self._normalize(x)
        if mode == 'denorm':
            return self._denormalize(x)
        # An unrecognised mode used to fall through and return x untouched,
        # which hides typos such as 'normalize'.
        raise ValueError(f"mode must be 'norm' or 'denorm', got {mode!r}")

    def _get_statistics(self, x):
        """Store the centring value and standard deviation of ``x``."""
        # Reduce over every axis between batch and feature (the time axis for
        # a [batch, time, feature] input).
        dim2reduce = tuple(range(1, x.ndim - 1))
        if self.subtract_last:
            # Keep the time axis with the slice so the value broadcasts
            # against x; an extra unsqueeze would make it 4-D.
            self.last = x[:, -1:, :]
        else:
            self.mean = torch.mean(x, dim=dim2reduce, keepdim=True).detach()
        self.stdev = torch.sqrt(torch.var(x, dim=dim2reduce, keepdim=True, unbiased=False) + self.eps).detach()

    def _center(self):
        """Return whichever centring statistic the current mode stored."""
        return self.last if self.subtract_last else self.mean

    def _normalize(self, x):
        x = (x - self._center()) / self.stdev
        if self.affine:
            x = x * self.affine_weight[None, None, :] + self.affine_bias[None, None, :]
        return x

    def _denormalize(self, x):
        if self.affine:
            x = x - self.affine_bias[None, None, :]
            # Tiny offset guards against a learned scale of exactly zero.
            x = x / (self.affine_weight[None, None, :] + self.eps * self.eps)
        x = x * self.stdev + self._center()
        return x

# --- 시계열 분해 ---
class moving_avg(nn.Module):
    """Moving average along the time axis with edge-replication padding.

    The input is expected as ``[batch, time, channels]``. The first and last
    time steps are repeated so that, with ``stride=1``, the output has the same
    number of time steps as the input for any ``kernel_size`` (odd or even).

    Args:
        kernel_size: width of the averaging window.
        stride: stride of the underlying ``nn.AvgPool1d``.
    """

    def __init__(self, kernel_size, stride):
        super().__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        # A window of width k needs k - 1 extra steps in total. Splitting them
        # as floor/ceil is identical to the symmetric split for odd k and adds
        # the one missing step at the end for even k.
        total_pad = self.kernel_size - 1
        pad_front = total_pad // 2
        pad_end = total_pad - pad_front
        front = x[:, 0:1, :].repeat(1, pad_front, 1)
        end = x[:, -1:, :].repeat(1, pad_end, 1)
        padded = torch.cat([front, x, end], dim=1)
        # AvgPool1d pools over the last axis, so move time there and back.
        smoothed = self.avg(padded.permute(0, 2, 1))
        return smoothed.permute(0, 2, 1)

class series_decomp(nn.Module):
    """Split a series into a residual part and a moving-average (trend) part."""

    def __init__(self, kernel_size):
        super().__init__()
        # stride is fixed to 1 for the smoothing window
        self.moving_avg = moving_avg(kernel_size, stride=1)

    def forward(self, x):
        """Return ``(x - trend, trend)`` where ``trend`` is the moving average of ``x``."""
        trend = self.moving_avg(x)
        residual = x - trend
        return residual, trend

# --- 커스텀 커널 ---
class ReparamLargeKernelConv(nn.Module):
    """Large-kernel 1-D convolution with an optional parallel small-kernel branch.

    With ``small_kernel_merged=True`` a single biased ``nn.Conv1d``
    (``lkb_reparam``) is created. Otherwise a conv+BN block (``lkb_origin``) is
    created and, when ``small_kernel`` is not ``None``, a second conv+BN block
    (``small_conv``) whose output is added to it.

    Both branches pad by ``kernel // 2`` on each side, so every output step
    also depends on inputs that come after it in the sequence.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, groups, small_kernel, small_kernel_merged=False):
        super().__init__()
        self.kernel_size = kernel_size
        self.small_kernel = small_kernel
        large_pad = kernel_size // 2

        if small_kernel_merged:
            self.lkb_reparam = nn.Conv1d(in_channels, out_channels, kernel_size,
                                         stride=stride, padding=large_pad,
                                         groups=groups, bias=True)
            return

        self.lkb_origin = conv_bn(in_channels, out_channels, kernel_size, stride, large_pad, groups)
        if small_kernel is None:
            return
        small_pad = small_kernel // 2
        self.small_conv = conv_bn(in_channels, out_channels, small_kernel, stride, small_pad, groups)

    def forward(self, x):
        """Apply the merged convolution if present, else the sum of the branches."""
        merged = getattr(self, 'lkb_reparam', None)
        if merged is not None:
            return merged(x)

        out = self.lkb_origin(x)
        small_branch = getattr(self, 'small_conv', None)
        if small_branch is not None:
            out += small_branch(x)
        return out

# --- 출력층 ---
class Flatten_Head(nn.Module):
    """Output head mapping each time step's ``d_model`` channels to one value."""

    def __init__(self, d_model):
        super().__init__()
        self.linear = nn.Linear(d_model, 1)

    def forward(self, x):
        """Map ``[B, C, T]`` features to ``[B, T]`` outputs."""
        channels_last = x.permute(0, 2, 1)      # [B, C, T] -> [B, T, C]
        per_step = self.linear(channels_last)   # [B, T, C] -> [B, T, 1]
        return per_step.squeeze(-1)             # [B, T, 1] -> [B, T]

# --- ModernTCN 모델 ---
class ModernTCN(nn.Module):
    """Stack of large-kernel convolution stages followed by a per-step linear head.

    The input is ``[B, T, C]`` and the output is ``[B, T]`` (one value per time
    step). ``configs`` must provide ``enc_in``, ``dims``, ``large_size``,
    ``small_size``, ``small_kernel_merged``, ``revin``, ``affine``,
    ``decomposition`` and ``kernel_size``. ``dropout`` and ``head_dropout`` are
    read when present and default to 0.0 otherwise.
    """

    def __init__(self, configs):
        super().__init__()
        self.revin = RevIN(configs.enc_in, affine=configs.affine) if configs.revin else None
        self.decomp = series_decomp(configs.kernel_size) if configs.decomposition else None

        self.conv_layers = nn.ModuleList()
        self.norm_layers = nn.ModuleList()

        c_in = configs.enc_in
        for i, dim in enumerate(configs.dims):
            conv = ReparamLargeKernelConv(c_in, dim,
                                          kernel_size=configs.large_size[i],
                                          stride=1,
                                          groups=1,
                                          small_kernel=configs.small_size[i],
                                          small_kernel_merged=configs.small_kernel_merged)
            self.conv_layers.append(conv)
            self.norm_layers.append(nn.BatchNorm1d(dim))
            c_in = dim

        # The configs carry dropout rates (and the hyper-parameter search
        # tunes them); without these layers those settings had no effect.
        self.dropout = nn.Dropout(getattr(configs, 'dropout', 0.0))
        self.head_dropout = nn.Dropout(getattr(configs, 'head_dropout', 0.0))

        self.head = Flatten_Head(configs.dims[-1])

    def forward(self, x):  # x: [B, T, C]
        # Explicit None checks: the optional stages are modules, not flags.
        if self.revin is not None:
            x = self.revin(x, 'norm')
        if self.decomp is not None:
            # Only the residual component is used; the trend is discarded.
            x, _ = self.decomp(x)
        x = x.permute(0, 2, 1)  # [B, C, T]
        for conv, norm in zip(self.conv_layers, self.norm_layers):
            x = F.relu(norm(conv(x)))
            x = self.dropout(x)
        out = self.head(self.head_dropout(x))  # [B, T]
        return out

# --- Config 클래스 ---
class Configs:
    """Default hyper-parameter bundle for ``ModernTCN``.

    Only the number of input features is supplied by the caller; every other
    field is set to a fixed default.
    """

    def __init__(self, enc_in):
        self.enc_in = enc_in
        defaults = {
            # per-stage channel widths and kernel sizes
            'dims': [8, 16, 32],
            'large_size': [5, 5, 3],
            'small_size': [5, 3, 3],
            'small_kernel_merged': False,
            # regularisation
            'dropout': 0.1,
            'head_dropout': 0.2,
            # input normalisation / decomposition
            'revin': True,
            'affine': True,
            'decomposition': True,
            'kernel_size': 25,
        }
        for field, value in defaults.items():
            setattr(self, field, value)


### 학습 및 OPTUNA

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score # Import f1_score
def train_model(model, X_train, y_train, X_val, y_val, epochs=30, lr=1e-3, pos_weight=None):
    """Train ``model`` full-batch with Adam and BCE-with-logits loss.

    Each epoch performs one optimisation step on the whole training sequence
    and then evaluates on the validation sequence at a 0.5 probability
    threshold.

    Args:
        model: module whose output, after ``squeeze(0)``, has the same shape
            as the label tensors.
        X_train, X_val: model inputs.
        y_train, y_val: 0/1 labels.
        epochs: number of optimisation steps.
        lr: Adam learning rate.
        pos_weight: optional positive-class weight for the loss.

    Returns:
        ``(train_losses, val_losses, val_accs)``, one entry per epoch.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    train_losses, val_losses, val_accs = [], [], []

    # The inputs are data, not something this optimiser updates. If they still
    # carry an autograd graph from an upstream model, backpropagating through
    # it on every epoch fails after the first one, so cut that graph here.
    X_train = X_train.detach()
    X_val = X_val.detach()
    train_targets = y_train.float()
    val_targets = y_val.float()

    for epoch in range(epochs):
        # 1. Training step
        model.train()
        optimizer.zero_grad()
        train_output = model(X_train).squeeze(0)  # -> [T]
        # The output must stay attached to the model's graph; detaching it
        # here would leave every parameter without a gradient, so
        # optimizer.step() would never change the model.
        loss = criterion(train_output, train_targets)
        loss.backward()
        optimizer.step()

        train_losses.append(loss.item())

        # 2. Validation step
        model.eval()
        with torch.no_grad():
            val_output = model(X_val).squeeze(0)
            val_loss = criterion(val_output, val_targets).item()
            pred = (torch.sigmoid(val_output) > 0.5).int()
            acc = (pred == y_val).float().mean().item()

        val_losses.append(val_loss)
        val_accs.append(acc)

        print(f"[{epoch+1}/{epochs}] Train Loss: {loss.item():.4f}, Val Loss: {val_loss:.4f}, Val Acc: {acc:.4f}")

    return train_losses, val_losses, val_accs

In [None]:
# Build the model config from the Optuna trial
def objective(trial):
    """Optuna objective: train a ``ModernTCN`` briefly and return validation F1.

    Reads the module-level ``X_train``, ``y_train``, ``X_val`` and ``y_val``
    tensors, so they must be defined before the study is run.

    Args:
        trial: the Optuna trial used to sample hyper-parameters.

    Returns:
        F1 score of the 0.5-threshold predictions on the validation split.
    """
    dims = [
        trial.suggest_categorical("dim1", [8, 16, 32, 64]),
        trial.suggest_categorical("dim2", [16, 32, 64, 128]),
        trial.suggest_categorical("dim3", [32, 64, 128, 256])
    ]
    large_size = [
        trial.suggest_categorical("k1", [3, 5, 7, 9, 11]),
        trial.suggest_categorical("k2", [3, 5, 7, 9]),
        trial.suggest_categorical("k3", [3, 5, 7])
    ]
    small_size = [
        trial.suggest_categorical("s1", [1, 3, 5]),
        trial.suggest_categorical("s2", [1, 3]),
        trial.suggest_categorical("s3", [1, 3])
    ]
    dropout = trial.suggest_float("dropout", 0.0, 0.3)
    head_dropout = trial.suggest_float("head_dropout", 0.0, 0.3)
    kernel_size = trial.suggest_categorical("kernel_size", [5, 11, 15, 25, 31])
    decomposition = trial.suggest_categorical("decomposition", [True, False])
    revin = trial.suggest_categorical("revin", [True, False])
    affine = trial.suggest_categorical("affine", [True, False])

    class TrialConfig:
        """Config object carrying this trial's sampled hyper-parameters."""

        def __init__(self):
            self.enc_in = X_train.shape[2]
            self.dims = dims
            self.large_size = large_size
            self.small_size = small_size
            self.small_kernel_merged = False
            self.dropout = dropout
            self.head_dropout = head_dropout
            self.revin = revin
            self.affine = affine
            self.decomposition = decomposition
            self.kernel_size = kernel_size

    model = ModernTCN(TrialConfig())

    # Class-imbalance correction: weight positives by (#negatives / #positives).
    # The positive count is clamped to 1 so a split without positives does not
    # produce an infinite weight (and a NaN loss).
    n_pos = (y_train == 1).sum().clamp(min=1)
    n_neg = (y_train == 0).sum()
    pos_weight = (n_neg / n_pos).reshape(1).to(dtype=torch.float32, device=y_train.device)

    # Train the model (the loss uses pos_weight)
    train_model(model, X_train, y_train, X_val, y_val, epochs=15, pos_weight=pos_weight)

    model.eval()
    with torch.no_grad():
        pred = model(X_val).squeeze(0)
        probs = torch.sigmoid(pred).cpu().numpy()
        preds = (probs > 0.5).astype(int)

    y_true = y_val.cpu().numpy()
    return f1_score(y_true, preds)


# Run the Optuna search (maximises the value returned by `objective`).
# NOTE: `objective` reads X_train / y_train / X_val / y_val, which are created
# in the data-split cell further down -- run that cell before this one.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)

# Print the best hyper-parameters and keep them for the final model
print("✅ Best Trial:")
print(study.best_trial.params)
best_params = study.best_trial.params

### 시각화 코드

In [None]:
def visualize_training(train_losses, val_losses, val_accs):
    """Plot the loss curves (left) and validation accuracy (right) per epoch."""
    plt.figure(figsize=(12,4))

    # Left panel: training and validation loss
    plt.subplot(1,2,1)
    for curve, curve_label in ((train_losses, 'Train Loss'), (val_losses, 'Val Loss')):
        plt.plot(curve, label=curve_label)
    plt.legend()
    plt.title("Loss over Epochs")

    # Right panel: validation accuracy
    plt.subplot(1,2,2)
    plt.plot(val_accs, label='Val Accuracy')
    plt.legend()
    plt.title("Validation Accuracy")

    plt.show()

def visualize_prediction(pred_probs, true_labels):
    """Overlay the predicted probabilities on the true labels."""
    plt.figure(figsize=(10,4))
    series_to_draw = (
        (true_labels, {'label': 'True'}),
        (pred_probs, {'label': 'Pred (sigmoid)', 'alpha': 0.7}),
    )
    for values, plot_options in series_to_draw:
        plt.plot(values, **plot_options)
    plt.legend()
    plt.title("Prediction vs True")
    plt.show()

def visualize_cumulative_return(pred_probs, true_labels, prices):
    """Plot cumulative growth of the signal-following strategy vs. buy-and-hold.

    A position is held on a day when the predicted probability exceeds 0.5; the
    position for step ``t`` earns the return from ``prices[t]`` to
    ``prices[t + 1]``. ``true_labels`` is accepted but not used.
    """
    positions = (pred_probs > 0.5).astype(int)
    daily_returns = prices[1:] / prices[:-1] - 1

    # The last signal has no following price, hence positions[:-1].
    strategy_growth = (daily_returns * positions[:-1] + 1).cumprod()
    market_growth = (daily_returns + 1).cumprod()

    plt.plot(strategy_growth, label='Strategy')
    plt.plot(market_growth, label='Market (buy & hold)')
    plt.legend()
    plt.title("Cumulative Return")
    plt.show()


In [None]:
import numpy as np
import random
def set_seed(seed=42):
    """Seed Python, NumPy and PyTorch RNGs and request deterministic kernels."""
    # Every RNG source gets the same seed.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Determinism switches for cuDNN and for PyTorch ops in general.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.use_deterministic_algorithms(True)
set_seed(42)

# 1. Labels: 1 when the next day's close is more than 0.3% above today's, else 0
close_prices = data[f'prccd_{company_name}'].values
returns = (close_prices[1:] / close_prices[:-1]) - 1
labels = np.where(returns > 0.003, 1, 0)
labels = torch.tensor(labels, dtype=torch.float32)

# 2. GAT embeddings -> TCN input
# - detach(): the TCN treats the embeddings as fixed inputs, so they must not
#   drag the GAT's autograd graph into TCN training.
# - the last row is dropped because it has no next-day label.
# - `embeddings` itself is left untouched so re-running this cell does not
#   drop another row each time.
tcn_input = embeddings.detach()[:-1].unsqueeze(0)  # [1, T, C]

# 3. Make inputs and labels the same length
min_len = min(tcn_input.shape[1], labels.shape[0])
tcn_input = tcn_input[:, :min_len, :]
labels = labels[:min_len]

# 4. Chronological train/validation split (first 80% / last 20%)
seq_len = tcn_input.shape[1]
split = int(seq_len * 0.8)
X_train = tcn_input[:, :split, :]
X_val   = tcn_input[:, split:, :]
y_train = labels[:split]
y_val   = labels[split:]

class BestConfig:
    """Model config assembled from the best Optuna trial.

    Reads the module-level ``best_params`` dict and takes the input width from
    the module-level ``X_train`` tensor.
    """

    def __init__(self):
        chosen = best_params

        self.enc_in = X_train.shape[2]

        # Per-stage widths and kernel sizes are stored under numbered keys.
        self.dims = [chosen[key] for key in ('dim1', 'dim2', 'dim3')]
        self.large_size = [chosen[key] for key in ('k1', 'k2', 'k3')]
        self.small_size = [chosen[key] for key in ('s1', 's2', 's3')]
        self.small_kernel_merged = False

        # The remaining fields are copied over under their own names.
        self.dropout = chosen['dropout']
        self.head_dropout = chosen['head_dropout']
        self.revin = chosen['revin']
        self.affine = chosen['affine']
        self.decomposition = chosen['decomposition']
        self.kernel_size = chosen['kernel_size']

# 4. Build the model from the best trial's config and train it
model = ModernTCN(BestConfig())

# NOTE(review): the Optuna objective trains with pos_weight, but this final
# run does not pass one -- confirm whether that difference is intended.
train_losses, val_losses, val_accs = train_model(model,
X_train, y_train, X_val, y_val, epochs=100,lr=1e-2)

# 5. Predict on the validation split (visualised further below)
model.eval()
with torch.no_grad():
    pred_logits = model(X_val).squeeze(0)  # [1, T] -> [T]
    pred_probs = torch.sigmoid(pred_logits).cpu().numpy()
    pred_labels = (pred_probs > 0.5).astype(int)


from sklearn.metrics import precision_recall_curve, f1_score

"""
# 1. 실제 라벨
true_labels = y_val.cpu().numpy()

# 2. 다양한 threshold에 대해 f1-score 측정
precisions, recalls, thresholds = precision_recall_curve(true_labels, pred_probs)

f1s = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)  # f1-score 계산
best_idx = np.argmax(f1s)
best_threshold = thresholds[best_idx]

print(f"✅ Best threshold by F1-score: {best_threshold:.4f}, F1: {f1s[best_idx]:.4f}")

# 3. 최적 threshold로 예측 라벨 생성
pred_labels = (pred_probs > best_threshold).astype(int)
# 6. 누적 수익률 (선택)
# future_prices = close_prices[split+1:]  # 실제 수익률 계산용
# visualize_cumulative_return(pred_probs, y_val.cpu().numpy(), future_prices)
"""

# Plot the training curves and the validation predictions against the labels
visualize_training(train_losses, val_losses, val_accs)
visualize_prediction(pred_probs, y_val.cpu().numpy())


In [None]:
# Accuracy-based evaluation (predictions thresholded at 0.5)
from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_val.cpu(), pred_labels))
print(confusion_matrix(y_val.cpu(), pred_labels))


In [None]:
# F1-score-based evaluation
# NOTE(review): the F1 threshold search above is wrapped in a string literal
# and does not run, so pred_labels is still the 0.5-threshold prediction and
# this cell prints the same report as the previous one.
from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_val.cpu(), pred_labels))
print(confusion_matrix(y_val.cpu(), pred_labels))


In [None]:
# Class balance: how many samples carry each label value
unique, counts = np.unique(labels.numpy(), return_counts=True)
print(dict(zip(unique, counts)))
