In [4]:
import os
import sys

from datetime import datetime
from pathlib import Path
from typing import Callable, Optional, Tuple

import cv2
import matplotlib.pyplot as plt
import mplfinance as mpf
import numpy as np
import pandas as pd
import torch

from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [5]:
# All data directories hang off a "data" folder that sits next to the
# notebook's working directory.
data_path = Path.cwd().parent.joinpath("data")
stock_data_path = data_path.joinpath("stock_data", "daily_stock_data")
image_file_path = data_path.joinpath("stock_chart_image")

In [40]:
# Load every chart image into memory as (X, Y) = (pixels, integer label).
X = []
Y = []
zero_count = 0  # only used by the optional class-0 cap below (currently disabled)
# Iterate in sorted order: os.listdir returns entries in arbitrary order, which
# would make the sample order differ from run to run.
for img_path in tqdm(sorted(os.listdir(image_file_path))):
    if img_path == ".ipynb_checkpoints":
        continue
    # Skip entries whose first "_"-separated field is not purely alphabetic.
    if not img_path.split("_")[0].isalpha():
        continue
    img = cv2.imread(str(image_file_path / img_path), cv2.IMREAD_COLOR)
    if img is None:
        # cv2.imread reports an unreadable or corrupt file by returning None
        # instead of raising; without this guard cv2.resize would abort the
        # whole (multi-minute) load.
        tqdm.write(f"Skipping unreadable image: {img_path}")
        continue
    img = cv2.resize(img, (224, 224))
    # The label is the first character of the last "_"-separated field,
    # e.g. "<prefix>_<index>_<label>.jpg".
    label = int(img_path.split("_")[-1][0])
    # Optional cap on the number of class-0 samples (disabled):
    # if label == 0:
    #     if zero_count >= 3000:
    #         continue
    #     zero_count += 1
    X.append(img)
    Y.append(label)

X = np.asarray(X)
Y = np.asarray(Y)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 24341/24341 [02:11<00:00, 184.91it/s]


In [37]:
# Stratified split so both subsets keep the class proportions of Y. The seed is
# fixed so that re-running the notebook yields the same train/test membership.
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, stratify=Y, random_state=42
)

In [41]:
import pickle

# Persist the full image array and its label vector to the working directory,
# one pickle file each.
for _pkl_name, _pkl_obj in (("X_data.pkl", X), ("Y_data.pkl", Y)):
    with open(_pkl_name, "wb") as f:
        pickle.dump(_pkl_obj, f)

In [10]:
import torch
import torchvision.transforms

# Per-sample preprocessing: convert to a tensor, then normalise every channel
# with mean 0.5 and std 0.5, i.e. (value - 0.5) / 0.5.
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing an array of samples with an array of labels.

    Args:
        X_data: Samples, indexed along the first axis.
        Y_data: Labels; ``Y_data[i]`` is the label of ``X_data[i]``.
        transform: Optional callable applied to a sample each time it is
            accessed. When ``None`` the sample is returned unchanged.

    Raises:
        ValueError: If ``X_data`` and ``Y_data`` differ in length.
    """

    def __init__(self,
                 X_data: np.ndarray,
                 Y_data: np.ndarray,
                 transform: Optional[Callable] = None):
        super().__init__()

        # Fail fast: a length mismatch would otherwise surface much later as an
        # IndexError inside a DataLoader, or silently drop trailing samples.
        if len(X_data) != len(Y_data):
            raise ValueError(
                f"X_data and Y_data must have the same length, "
                f"got {len(X_data)} and {len(Y_data)}"
            )

        self.X_data = X_data
        self.Y_data = Y_data
        self.len = len(Y_data)
        self.transform = transform

    def __getitem__(self, idx: int) -> Tuple:
        """Return ``(sample, label)`` for position ``idx``, with the transform applied to the sample."""
        sample = self.X_data[idx]
        if self.transform is not None:
            sample = self.transform(sample)
        return sample, self.Y_data[idx]

    def __len__(self) -> int:
        """Number of (sample, label) pairs."""
        return self.len

In [11]:
# Wrap each split in a Dataset; both share the same per-sample transform.
trainset, testset = (
    Dataset(images, labels, transform)
    for images, labels in ((X_train, Y_train), (X_test, Y_test))
)

In [12]:
batch_size = 256

# Only the training loader reshuffles its samples; the test loader keeps a
# fixed order.
trainloader = torch.utils.data.DataLoader(
    dataset=trainset,
    batch_size=batch_size,
    shuffle=True,
)
testloader = torch.utils.data.DataLoader(
    dataset=testset,
    batch_size=batch_size,
    shuffle=False,
)

In [42]:
import torch.nn as nn
import torch.nn.functional as F

class BaseModel(nn.Module):
    """Classification base class bundling the per-batch and per-epoch helpers
    used by the training loop. Subclasses only need to implement ``forward``.
    """

    def training_step(self, batch):
        """Return the cross-entropy loss of one ``(images, labels)`` batch."""
        inputs, targets = batch
        logits = self(inputs)
        return F.cross_entropy(logits, targets)

    def validation_step(self, batch):
        """Return the detached loss and the accuracy of one validation batch."""
        inputs, targets = batch
        logits = self(inputs)
        batch_loss = F.cross_entropy(logits, targets)
        batch_acc = accuracy(logits, targets)
        return {'val_loss': batch_loss.detach(), 'val_acc': batch_acc}

    def validation_epoch_end(self, outputs):
        """Average the per-batch metrics into plain Python floats."""
        def _epoch_mean(key):
            # Stack the per-batch scalars and take their unweighted mean.
            return torch.stack([step[key] for step in outputs]).mean().item()

        return {'val_loss': _epoch_mean('val_loss'), 'val_acc': _epoch_mean('val_acc')}

    def epoch_end(self, epoch, result):
        """Print a one-line summary of the metrics collected for ``epoch``."""
        summary = "Epoch [{}], train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}"
        print(summary.format(
            epoch, result['train_loss'], result['val_loss'], result['val_acc']))

In [43]:
class VGG16(BaseModel):
    """VGG16-style CNN (13 conv layers + 3 fully connected) with a 3-class head.

    ``fc1`` takes 512 * 2 * 2 = 2048 features, which is the feature-map size a
    64x64 input reaches after the five 2x2 max-pools. Other input sizes are
    reduced to that 2x2 grid by adaptive average pooling, so the classifier
    accepts them as well.

    Layer attribute names are part of the checkpoint format (state_dict keys)
    and must not be renamed.
    """

    def __init__(self):
        super().__init__()
        self.conv1_1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.conv1_2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)

        self.conv2_1 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.conv2_2 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1)

        self.conv3_1 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.conv3_2 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)
        self.conv3_3 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)

        self.conv4_1 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
        self.conv4_2 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.conv4_3 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)

        self.conv5_1 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.conv5_2 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.conv5_3 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)

        # Parameter-free, so one instance is shared by all five stages.
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(2048, 4096)
        self.fc2 = nn.Linear(4096, 4096)
        self.fc3 = nn.Linear(4096, 3)

    def forward(self, x):
        """Map a ``(batch, 3, H, W)`` image tensor to ``(batch, 3)`` class logits."""
        conv_stages = (
            (self.conv1_1, self.conv1_2),
            (self.conv2_1, self.conv2_2),
            (self.conv3_1, self.conv3_2, self.conv3_3),
            (self.conv4_1, self.conv4_2, self.conv4_3),
            (self.conv5_1, self.conv5_2, self.conv5_3),
        )
        for stage in conv_stages:
            for conv in stage:
                x = F.relu(conv(x))
            x = self.maxpool(x)

        # Force a 2x2 spatial grid so the flattened size always matches fc1.
        # For 64x64 inputs the map is already 2x2 and this is an exact identity.
        x = F.adaptive_avg_pool2d(x, (2, 2))
        x = torch.flatten(x, 1)

        # Dropout combats overfitting. It must follow the module's train/eval
        # mode: F.dropout defaults to training=True, which would keep randomly
        # zeroing activations during evaluation and inference.
        x = F.relu(self.fc1(x))
        x = F.dropout(x, 0.5, training=self.training)
        x = F.relu(self.fc2(x))
        x = F.dropout(x, 0.5, training=self.training)
        x = self.fc3(x)
        return x

In [19]:
def accuracy(outputs, labels):
    """Fraction of rows in ``outputs`` whose highest-scoring column equals the label.

    Returns the value as a 0-dim tensor built from a Python float.
    """
    preds = outputs.max(dim=1).indices
    n_correct = (preds == labels).sum().item()
    return torch.tensor(n_correct / len(preds))

  
def evaluate(model, val_loader):
    """Run ``model`` over ``val_loader`` with gradients disabled and return the
    aggregated metrics produced by ``model.validation_epoch_end``.
    """
    with torch.no_grad():
        model.eval()
        step_outputs = []
        for batch in val_loader:
            step_outputs.append(model.validation_step(batch))
        return model.validation_epoch_end(step_outputs)

  
def fit(epochs, lr, model, train_loader, val_loader, opt_func = torch.optim.SGD):
    """Train ``model`` for ``epochs`` epochs and evaluate it after each one.

    Args:
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate handed to the optimizer.
        model: Module providing ``training_step`` and ``epoch_end``, plus the
            validation hooks that ``evaluate`` calls.
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches.
        opt_func: Optimizer factory called as ``opt_func(model.parameters(), lr)``.

    Returns:
        A list with one dict per epoch holding the validation metrics returned
        by ``evaluate`` plus the mean ``'train_loss'``.
    """
    history = []
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):

        model.train()
        train_losses = []
        for batch in tqdm(train_loader):
            # Clear gradients before the backward pass. Clearing only after
            # step() lets gradients left behind by an interrupted run be added
            # to the first update of the next call.
            optimizer.zero_grad()
            loss = model.training_step(batch)
            loss.backward()
            optimizer.step()
            # Keep only the value: storing the live tensor would hold on to
            # its autograd history for the rest of the epoch.
            train_losses.append(loss.detach())

        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        model.epoch_end(epoch, result)
        history.append(result)

    return history

In [70]:
# Instantiate the VGG16 classifier defined above (3 output logits).
model = VGG16()

In [73]:
# Restore weights saved with torch.save(model.state_dict(), "model.pth").
# map_location="cpu" lets a checkpoint written from a GPU load on a machine
# without one; load_state_dict then copies the values into the model's own
# parameters.
# NOTE: torch.load unpickles the file, so only load checkpoints you trust.
model.load_state_dict(torch.load("model.pth", map_location="cpu"))

<All keys matched successfully>

In [69]:
# Save only the parameters (state_dict), not the module object itself, to
# "model.pth" in the working directory; an existing file is overwritten.
torch.save(model.state_dict(), "model.pth")

In [67]:
# Shape smoke test: each of the five 2x2 max-pools halves a 64x64 input, ending
# at 2x2 with 512 channels, i.e. 512 * 2 * 2 = 2048 features, which is the
# input size fc1 is built for. The result is one row of 3 logits.
dummy = torch.randn((1, 3, 64, 64))
model(dummy)

tensor([[-0.1898,  0.0886,  0.1357]], grad_fn=<AddmmBackward0>)

In [55]:
# Training run configuration. Note that the test loader doubles as the
# validation set reported after every epoch.
num_epochs = 10
opt_func = torch.optim.Adam
lr = 0.001
history = fit(
    epochs=num_epochs,
    lr=lr,
    model=model,
    train_loader=trainloader,
    val_loader=testloader,
    opt_func=opt_func,
)

 16%|████████████████████████████████████████████▌                                                                                                                                                                                                                                       | 5/31 [05:33<28:55, 66.73s/it]


KeyboardInterrupt: 

In [68]:
def save_chart_image(df:pd.DataFrame, 
                     save_path: Path,
                     file_prefix: str,
                     window=20):
    """Render one candlestick chart per sliding window of ``df`` and save it as a JPEG.

    For every start offset ``i`` in ``range(len(df) - window - 5)`` the rows
    ``df[i: i + window]`` are drawn with mplfinance (candles, a volume panel and
    the ``ma_5`` / ``ma_20`` / ``ma_60`` columns as overlay lines) and written to
    ``save_path / f"{file_prefix}_{i}_{label}.jpg"``.

    Args:
        df: Data passed to ``mpf.plot``; it must also provide the columns
            ``ma_5``, ``ma_20`` and ``ma_60``.
        save_path: Directory the image files are written into.
        file_prefix: First component of every file name; also shown in the
            progress-bar description.
        window: Number of consecutive rows drawn in each chart.

    NOTE(review): ``label_data`` is not defined in the visible part of the
    notebook; its contract should be confirmed against its definition.
    """
    # White-background variant of the built-in "yahoo" style.
    customstyle = mpf.make_mpf_style(base_mpf_style='yahoo', facecolor='w')

    # Line/candle widths forwarded to mpf.plot via update_width_config.
    width_config={"candle_linewidth":1.5, 
                  "candle_width":0.9, 
                  "volume_width": 0.6, 
                  "line_width": 1}
                     
    # The final `window + 5` offsets are not used as window starts.
    for i in tqdm(range(len(df) - window - 5), desc=f"Data {file_prefix}"):
        target_df = df[i: i + window]
        # Label is requested for the last row of the window (i + window - 1).
        # Presumably 5 is a look-ahead horizon and 0.04 a threshold; confirm
        # against label_data.
        label = label_data(df, i + window - 1, 5, 0.04)
        
        # Moving-average overlays taken from precomputed columns of the window.
        ma_5 = mpf.make_addplot(target_df.ma_5)
        ma_20 = mpf.make_addplot(target_df.ma_20)
        ma_60 = mpf.make_addplot(target_df.ma_60)
        # ma_120 = mpf.make_addplot(target_df.ma_120)
        # ma_240 = mpf.make_addplot(target_df.ma_240)
        
        # The label is encoded as the last "_"-separated field of the file name.
        file_path = save_path / f"{file_prefix}_{i}_{label}.jpg"

        # NOTE(review): returnfig is not requested, so `fig` is presumably None;
        # it is not used afterwards in the visible code. Confirm.
        fig = mpf.plot(target_df, type="candle",
                       style=customstyle, 
                         addplot=[ma_5, ma_20, ma_60],
                         update_width_config=width_config, 
                         figsize=(10, 10),
                         fontscale=0,
                         axisoff=True,
                         volume=True,
                         tight_layout=True,
                         savefig=file_path)