In [2]:
from utils import get_stock_price_data
import pandas as pd
from sklearn.linear_model import LinearRegression
import numpy as np
from sklearn.metrics import classification_report
from tqdm import tqdm
from torchinfo import summary

In [3]:
time_span = 7
lag_cols = [f'close_t{i}' for i in range(-time_span, 1)]
X = np.array(range(1, len(lag_cols)+1)).reshape(-1, 1)
def compute_slope_history(row):
    y = row[lag_cols].values.astype(float)
    model = LinearRegression()
    model.fit(X, y)
    return model.coef_[0]

future_cols = [f'close_t{i}' for i in range(0, time_span+1)]
X2 = np.array(range(1, len(future_cols)+1)).reshape(-1, 1)
def compute_slope_future(row):
    y = row[future_cols].values.astype(float)
    model = LinearRegression()
    model.fit(X2, y)
    return model.coef_[0]

In [4]:
symbol = "VCB"
threshold = 0.05
df = get_stock_price_data(symbol)
df.drop(['open', 'high', 'low', 'volume'], axis=1, inplace=True)

for i in range(-time_span, time_span+1):
    df[f'close_t{i}'] = df['close'].shift(-i)
df.dropna(inplace=True)

coef_col_name = f'coef_close_t{-time_span}_to_close'
df[coef_col_name] = df.apply(compute_slope_history, axis=1)
df[f'predict'] = np.where(
    df[coef_col_name] >= threshold, 'up',
    np.where(df[coef_col_name] <= -threshold, 'down', 'sideways')
)

coef_col_name = f'coef_close_to_close_t{time_span}'
df[coef_col_name] = df.apply(compute_slope_future, axis=1)
df[f'ground_truth'] = np.where(
    df[coef_col_name] >= threshold, 'up',
    np.where(df[coef_col_name] <= -threshold, 'down', 'sideways')
)

In [5]:
df

Unnamed: 0,time,close,close_t-7,close_t-6,close_t-5,close_t-4,close_t-3,close_t-2,close_t-1,close_t0,...,close_t2,close_t3,close_t4,close_t5,close_t6,close_t7,coef_close_t-7_to_close,predict,coef_close_to_close_t7,ground_truth
7,2023-04-10,50.41,52.50,51.77,52.56,52.10,52.16,51.65,50.97,50.41,...,50.01,49.95,50.07,50.07,49.84,50.12,-0.253571,down,-0.027143,sideways
8,2023-04-11,49.95,51.77,52.56,52.10,52.16,51.65,50.97,50.41,49.95,...,49.95,50.07,50.07,49.84,50.12,50.01,-0.326071,down,0.007619,sideways
9,2023-04-12,50.01,52.56,52.10,52.16,51.65,50.97,50.41,49.95,50.01,...,50.07,50.07,49.84,50.12,50.01,49.56,-0.411071,down,-0.034881,sideways
10,2023-04-13,49.95,52.10,52.16,51.65,50.97,50.41,49.95,50.01,49.95,...,50.07,49.84,50.12,50.01,49.56,49.78,-0.374524,down,-0.043333,sideways
11,2023-04-14,50.07,52.16,51.65,50.97,50.41,49.95,50.01,49.95,50.07,...,49.84,50.12,50.01,49.56,49.78,50.01,-0.315119,down,-0.033571,sideways
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
487,2025-03-13,65.50,62.21,62.21,62.54,63.61,64.75,64.75,66.70,65.50,...,67.30,66.80,66.50,66.80,66.00,66.50,0.633929,up,0.061905,up
488,2025-03-14,66.00,62.21,62.54,63.61,64.75,64.75,66.70,65.50,66.00,...,66.80,66.50,66.80,66.00,66.50,66.20,0.602381,up,-0.055952,down
489,2025-03-17,67.30,62.54,63.61,64.75,64.75,66.70,65.50,66.00,67.30,...,66.50,66.80,66.00,66.50,66.20,65.80,0.588929,up,-0.170238,down
490,2025-03-18,66.80,63.61,64.75,64.75,66.70,65.50,66.00,67.30,66.80,...,66.80,66.00,66.50,66.20,65.80,65.50,0.447976,up,-0.165476,down


In [6]:
X = df[['close_t-7', 'close_t-6', 'close_t-5', 'close_t-4', 'close_t-3', 'close_t-2', 'close_t-1', 'close_t0']].values
for i in range(len(X)):
    X[i] = X[i]/X[i][0]
y_label = df['ground_truth'].values
y = []
for s in y_label:
    if s == 'sideways':
        y.append([0.0, 1.0, 0.0])
    elif s == 'up':
        y.append([1.0, 0.0, 0.0])
    elif s == 'down':
        y.append([0.0, 0.0, 1.0])
y = np.array(y)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

In [7]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X).float()
        self.y = torch.tensor(y).float()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = TimeSeriesDataset(X_train, y_train)
test_dataset = TimeSeriesDataset(X_test, y_test)

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [30]:
class SimpleNN(nn.Module):
    def __init__(self, input_size=8, output_size=3):
        super(SimpleNN, self).__init__()
        hidden_size = 128
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.GELU(),
            nn.Linear(hidden_size, hidden_size),
            nn.GELU(),
            nn.Linear(hidden_size, hidden_size),
            nn.GELU(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        return self.layers(x)


In [31]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SimpleNN().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [32]:
summary(model)

Layer (type:depth-idx)                   Param #
SimpleNN                                 --
├─Sequential: 1-1                        --
│    └─Linear: 2-1                       1,152
│    └─GELU: 2-2                         --
│    └─Linear: 2-3                       16,512
│    └─GELU: 2-4                         --
│    └─Linear: 2-5                       16,512
│    └─GELU: 2-6                         --
│    └─Linear: 2-7                       387
Total params: 34,563
Trainable params: 34,563
Non-trainable params: 0

In [33]:
epochs = 1000
for epoch in tqdm(range(epochs)):
    model.train()
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

100%|██████████| 1000/1000 [00:25<00:00, 39.12it/s]


In [34]:
model.eval()
test_loss = 0
correct_test = 0
train_loss = 0
correct_train = 0
label_indices_list = []
pred_indices_list = []
with torch.inference_mode():
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        train_loss += loss.item()
        label_indices = torch.argmax(targets, dim=1)
        pred_indices = torch.argmax(outputs, dim=1)
        correct_train += (pred_indices == label_indices).int().sum()

    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        test_loss += loss.item()
        label_indices = torch.argmax(targets, dim=1)
        pred_indices = torch.argmax(outputs, dim=1)
        label_indices_list.append(label_indices)
        pred_indices_list.append(pred_indices)
        correct_test += (pred_indices == label_indices).int().sum()

    

print(f'Epoch: {epoch + 1:02d}, Train Loss: {train_loss:.4f}, Train Acc: {correct_train / len(train_loader.dataset):.4f}')
print(f'Epoch: {epoch + 1:02d}, Test Loss: {test_loss:.4f}, Test Acc: {correct_test / len(test_loader.dataset):.4f}')

Epoch: 1000, Train Loss: 13.6829, Train Acc: 0.4253
Epoch: 1000, Test Loss: 4.0873, Test Acc: 0.3711


In [35]:
label_list = []
for x in label_indices_list:
    for y in x:
        if y.item() == 0:
            label_list.append('up')
        elif y.item() == 1:
            label_list.append('sideways')
        elif y.item() == 2:
            label_list.append('down')

In [36]:
pred_list = []
for x in pred_indices_list:
    for y in x:
        if y.item() == 0:
            pred_list.append('up')
        elif y.item() == 1:
            pred_list.append('sideways')
        elif y.item() == 2:
            pred_list.append('down')

In [38]:
print(classification_report(label_list, pred_list))

              precision    recall  f1-score   support

        down       0.35      0.85      0.49        34
    sideways       0.00      0.00      0.00        25
          up       0.54      0.18      0.27        38

    accuracy                           0.37        97
   macro avg       0.29      0.35      0.26        97
weighted avg       0.33      0.37      0.28        97



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
