In [1]:
import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

import os
import random
import yfinance as yf
from pathlib import Path
from datetime import datetime as dt

import plotly.graph_objects as go
import glob
import cv2

from torch.optim import Adam
from torchvision import transforms
import torch.nn.functional as F
from torch.utils import data as data_u
from torch.utils.data import DataLoader
from torch import nn, optim
import torchvision
import torch

### Creating the Training Data (creating the labels)

In [38]:
# read the stock names
stocks = pd.read_excel('Data/universe.xlsx')['symbol'][:50]

# convert pd Series of strings into one long string (that the format yf wants)
string_format_stocks = stocks.str.cat(sep=' ')

# fetch the data
prices = yf.download(string_format_stocks, start='2010-01-01', end=dt.today().strftime('%Y-%m-%d'))

[*********************100%***********************]  50 of 50 completed


In [39]:
prices.reset_index(inplace=True)
prices.head()

  prices.reset_index(inplace=True)


Unnamed: 0_level_0,Date,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,...,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume
Unnamed: 0_level_1,Unnamed: 1_level_1,ABBV,ACN,AEP,AIZ,ALLE,AMAT,AMP,AMZN,AVB,...,PYPL,RE,ROL,ROST,UNH,URI,V,VRSK,WRK,XOM
0,2010-01-04,,32.970905,21.340599,23.350733,,11.356113,29.852716,6.695,54.045589,...,,444800,839363,15743600,12199500,1692500,20180000,390000,,27809100
1,2010-01-05,,33.174686,21.096291,24.047535,,11.268759,30.548885,6.7345,53.639587,...,,327200,701663,9369600,11180700,1459200,25833600,430000,,30174700
2,2010-01-06,,33.527359,21.310055,23.977858,,11.244936,30.975569,6.6125,53.133747,...,,266400,841388,13144800,9761100,1072900,16254000,848900,,35044700
3,2010-01-07,,33.496002,21.493298,24.272064,,11.125816,31.207605,6.5,53.24025,...,,354600,536119,23984800,11789800,2052800,27841200,426600,,27192100
4,2010-01-08,,33.362778,21.749826,24.28755,,11.554646,31.319889,6.676,52.840878,...,,276900,330581,15926000,7228700,1399000,11907200,253200,,24891800


In [40]:
# extract the dates to use in the sliding window
dates = prices['Date']

# create windows of 15 days that jump every 5 days (5 days overlap)
windows = sliding_window_view(dates, window_shape = 15)[::5]

In [41]:
# helper function to assign reccomendation given return
def assign_recommendation(ret):
    if ret < -0.1:
        recommendation = 'strong sell'
    elif ret < -0.02:
        recommendation = 'sell'
    elif ret < 0.05:
        recommendation = 'neutral'
    elif ret < 0.1:
        recommendation = 'buy'
    else:
        recommendation = 'strong buy'
    return recommendation

In [42]:
# create an array to store the associated following week returns for each window
counter = 0

# specify the test ratio
test_ratio = 0.2

# loop through each stock and then through the windows
for stock in stocks:
    # extract the current stock data
    current_stock = prices.iloc[:, (prices.columns.get_level_values(1)==stock) | (prices.columns.get_level_values(0)=='Date')]

    # drop the multiindex column names (stock name is uneccessary)
    current_stock = current_stock.droplevel(level=1, axis=1)
    
    # some stocks dont have historical data from year 2010, so drop na
    current_stock.dropna(inplace=True)
        
    for i in range(len(windows)-1):

        # slice the dataframe
        window_data = current_stock.loc[prices['Date'].isin(windows[i])]
        
        if len(window_data) == 15:
            
            # allocate test_ratio of images to the test folder, others to the train folder
            test = random.random() < test_ratio
            train_or_test = 'test' if test else 'train'

            # make the figure
            fig = go.Figure(data=[go.Candlestick(x=window_data['Date'], open=window_data['Open'], high=window_data['High'],
                                                 low=window_data['Low'], close=window_data['Close'])])

            # remove uneccessary stuff from the figure
            fig.update_yaxes(showticklabels=False)
            fig.update_xaxes(showticklabels=False)
            fig.update_layout(xaxis_rangeslider_visible=False)
            
            # find the associated next week return
            next_window_data = current_stock.loc[current_stock['Date'].isin(windows[i+1])]
            next_window_data.reset_index(inplace=True, drop=True) # to have the index always from 0 to 9
            following_week_return = next_window_data['Adj Close'].pct_change(periods=5)[5].round(3)
            recommendation = assign_recommendation(following_week_return)
            
            
            path = f'pattern_images/{train_or_test}/{recommendation}/'

            if not os.path.exists(path):
                os.makedirs(path)
                
            # save the figure
            fig.write_image(path + f'fig {counter}.png')


            # increase the counter
            counter += 1
            

### Train CNN on the Data

In [204]:
# num_images = len(glob.glob('pattern_images/*'))

# data = []
# for i in range(num_images):
#     image = cv2.imread(f'pattern_images/fig {i}.png')
#     image = cv2.resize(image, (150, 150))
#     associated_return = associated_returns[i]
#     data.append([image, associated_return])

In [45]:
# Transforms
transformer = transforms.Compose([
    transforms.Resize((150,150)),  # resize the image
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5],
                         [0.5, 0.5, 0.5])
])

In [46]:
# Dataloader

train_path = 'pattern_images/train'
test_path = 'pattern_images/test'

train_loader = DataLoader(
    torchvision.datasets.ImageFolder(train_path, transform=transformer),
    batch_size = 250, shuffle=True
)

test_loader = DataLoader(
    torchvision.datasets.ImageFolder(test_path, transform=transformer),
    batch_size = 250, shuffle=True
)

In [47]:
# Categories
path = Path(train_path)
categories = sorted([category.name.split('/')[-1] for category in path.iterdir()])
print(categories)

['buy', 'neutral', 'sell', 'strong buy', 'strong sell']


In [48]:
class Network(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        
        # output size after convolution filter: ((w-f+2p)/s)+1
        
        # input shape = (10, 3, 150, 150)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=3, stride=1, padding=1)
        
        # input shape = (10, 12, 150, 150)
        self.bn1 = nn.BatchNorm2d(num_features=12)
        
        # input shape = (10, 12, 150, 150)
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=20, kernel_size=3, stride=1, padding=1)
        
        # input shape = (10, 12, 150, 150)
        self.conv3 = nn.Conv2d(in_channels=20, out_channels=32, kernel_size=3, stride=1, padding=1)
        
        # input shape = (10, 12, 150, 150)
        self.pool = nn.MaxPool2d(kernel_size=2)
        
        # input shape = (10, 32, 75, 75)
        self.fc1 = nn.Linear(in_features=32 * 75 * 75, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=60)
        self.fc3 = nn.Linear(in_features=60, out_features=num_classes)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool(F.relu(self.conv3(x)))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


model = Network(num_classes=len(categories))

In [49]:
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)

In [50]:
train_count = len(glob.glob(train_path+'/*/*.png'))
test_count = len(glob.glob(test_path+'/*/*.png'))
print('number of training images:', train_count)
print('number of testing images:', test_count)

number of training images: 23211
number of testing images: 5989


In [60]:
def init_weights(m):
    """
        Initialize weights of the model to random normal
    """
    if type(m) == nn.Linear:
        torch.nn.init.normal_(m.weight, std=0.01)

def evaluate_accuracy(data_iter, net, device=torch.device('cpu')):
    """
        Evaluate accuracy of a model on the given data set
    """
    net.eval()  # Switch to evaluation mode for Dropout, BatchNorm etc layers.
    acc_sum, n = torch.tensor([0], dtype=torch.float32, device=device), 0
    for X, y in data_iter:
        # Copy the data to device.
        X, y = X.to(device), y.to(device)
        with torch.no_grad():
            y = y.long()
            acc_sum += torch.sum((torch.argmax(net(X), dim=1) == y))
            n += y.shape[0]
    return acc_sum.item()/n

def train(num_epochs=5, previous_epochs=0):
    """
        Train the model
    """
    
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n = 0.0, 0.0, 0
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)          # get model predictions
            loss = loss_fn(outputs, labels)  # calculate the loss
            loss.backward()                  # backward propagation
            optimizer.step()                 # update weights and biases

            labels = labels.type(torch.float32)
            train_l_sum += loss.item()
            train_acc_sum += torch.sum((torch.argmax(outputs, dim=1).type(torch.FloatTensor) == labels).detach()).float()
            n += list(labels.size())[0]

        # print statistics
        test_acc = evaluate_accuracy(test_loader, model)
        print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f' % (epoch + previous_epochs + 1, train_l_sum / n, train_acc_sum / n, test_acc))

    print('Finished Training')
    
def overall_test_accuracy():
    """
        Calculate the overall test accuracy
    """
    
    correct = 0
    total = 0
    # since we're not training, we don't need to calculate the gradients for our outputs
    with torch.no_grad():
        for data in test_loader:
            images, labels = data
            # calculate outputs by running images through the network
            outputs = model(images)
            # the class with the highest energy is what we choose as prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f'Accuracy of the network on the test images: {100 * correct // total} %')
    
def per_class_category():
    """
        Calculate the accuracy per class
    """

    # prepare to count predictions for each class
    correct_pred = {classname: 0 for classname in categories}
    total_pred = {classname: 0 for classname in categories}

    # again no gradients needed
    with torch.no_grad():
        for data in test_loader:
            images, labels = data
            outputs = model(images)
            _, predictions = torch.max(outputs, 1)
            # collect the correct predictions for each class
            for label, prediction in zip(labels, predictions):
                if label == prediction:
                    correct_pred[categories[label]] += 1
                total_pred[categories[label]] += 1


    # print accuracy for each class
    for classname, correct_count in correct_pred.items():
        try:
            accuracy = 100 * float(correct_count) / total_pred[classname]
        except ZeroDivisionError:
            accuracy = 0

        print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')

In [66]:
# initialize weights to random numbers
model.apply(init_weights)

Network(
  (conv1): Conv2d(3, 12, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(12, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(12, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(20, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=180000, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=60, bias=True)
  (fc3): Linear(in_features=60, out_features=5, bias=True)
)

In [None]:
# train the model
num_epochs = 8
train(num_epochs)

epoch 1, loss 0.0024, train acc 0.773, test acc 0.814
epoch 2, loss 0.0016, train acc 0.836, test acc 0.821
epoch 3, loss 0.0013, train acc 0.861, test acc 0.819
epoch 4, loss 0.0011, train acc 0.894, test acc 0.821
epoch 5, loss 0.0008, train acc 0.926, test acc 0.817
epoch 6, loss 0.0005, train acc 0.957, test acc 0.817
epoch 7, loss 0.0003, train acc 0.976, test acc 0.802


In [15]:
# save the trained model
torch.save(model.state_dict(), 'model_weights.pth')

In [67]:
# check the accuract of the model
overall_test_accuracy()
per_class_category()

Accuracy of the network on the test images: 6 %
Accuracy for class: buy   is 100.0 %
Accuracy for class: neutral is 0.0 %
Accuracy for class: sell  is 0.0 %
Accuracy for class: strong buy is 0.0 %
Accuracy for class: strong sell is 0.0 %
