In [1]:
# Necessary imports
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from datetime import datetime, timedelta
import shutil

  from pandas.core import (


Documentation overview for the functions and classes in this notebook:
1. data_generation: takes the path to a data file and the frequency of that dataset (only daily 'D' and hourly 'H' data are supported) and returns a DataFrame containing the parsed data. Additional features can be added inside this function if wanted.
2. charts is initialized mostly from default parameters. The important ones are data (the DataFrame we want to plot), freq (the frequency of the data), additional_cols (the names of all additional feature columns), other_cols_color (the RGB color for each additional column, in the same order as additional_cols), and overlap (whether consecutive plots may cover overlapping dates).
3. charts.generate is the main function that generates all the plots. It takes two main parameters: "benchmark" (if True, the plots contain only OHLC; if False, volume and all additional features specified at initialization are plotted as well) and "make_up_short" (if True, plots whose first label is -1 are randomly selected and duplicated so that short plots make up about 50% of the training dataset).

In [7]:
# Add in additional features here if needed
def data_generation(data_path, freq):
    """Load an OHLC(V) CSV file into a DataFrame with lower-case column names.

    Parameters
    ----------
    data_path : str or path-like
        CSV file whose first column is used as the index.
    freq : {'D', 'H'}
        'D' - the first column is parsed as dates by pandas.
        'H' - the first column is interpreted as integer nanoseconds since the
        Unix epoch, and the open/high/low/close columns are multiplied by 1e-9
        (presumably fixed-point prices - confirm against the data vendor's
        format).

    Returns
    -------
    pandas.DataFrame
        The parsed data with a datetime index.

    Raises
    ------
    ValueError
        If ``freq`` is neither 'D' nor 'H'.
    """
    if freq == 'D':
        data = pd.read_csv(data_path, index_col=0, parse_dates=True)
    elif freq == 'H':
        data = pd.read_csv(data_path, index_col=0)
    else:
        # Previously an unsupported frequency fell through and failed later with
        # an UnboundLocalError; fail early with a clear message instead.
        raise ValueError(f"Unsupported freq {freq!r}: only 'D' (daily) and 'H' (hourly) are supported")

    # Normalise headers first so the price columns are found regardless of the
    # capitalisation used in the file.
    data.columns = data.columns.str.lower()

    if freq == 'H':
        incr = 1e-9
        # Vectorised epoch-nanoseconds -> timestamp conversion.
        data.index = pd.to_datetime(data.index, unit='ns')
        for col in ['open', 'high', 'low', 'close']:
            data[col] = data[col].astype(float) * incr

    # Add features here

    return data

In [3]:
class charts():
    """Render sliding windows of OHLC(V) data as pixel images and save them as
    labelled train/test image sets.

    Each bar of a window occupies three pixel columns: the open tick, the
    high-low range, and the close tick. When not in benchmark mode the volume bar
    and every additional feature column are drawn as well.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain 'open', 'high', 'low', 'close' and 'volume' columns plus
        every name in ``additional_cols``. An optional 'symbol' column supplies
        the symbol used in file names (default 'SPY'). The frame passed in is
        not modified.
    window_size : int
        Bars per image, in units of ``freq``; 60, 20 and 5 are supported.
    additional_cols : list of str, optional
        Extra feature columns to draw on the price scale.
    other_cols_color : list, optional
        One RGB triple (0-255) per entry of ``additional_cols``.
    freq : str
        'D' for daily data; any other value is treated as hourly, for which
        every size is multiplied by 16.
    percent_train : float
        Fraction of the rows assigned to the training range.
    overlap : bool
        If True, consecutive windows start 12 rows apart; otherwise one full
        window apart.
    """

    # window_size -> (price band height, volume band height, image height) in pixels
    _LAYOUTS = {60: (76, 19, 96), 20: (51, 12, 64), 5: (25, 6, 32)}
    # Look-ahead distances (in rows) of the three direction labels.
    _HORIZONS = (5, 10, 15)
    # Rows between consecutive window starts when overlap=True.
    _OVERLAP_STRIDE = 12
    # Rows skipped between the end of the last training window and the first test window.
    _TEST_GAP = 15

    def __init__(self,
                 data,
                 window_size=60,
                 additional_cols=None,
                 other_cols_color=None,
                 freq='D',
                 percent_train=0.7,
                 overlap=True,
                 ):
        # None instead of shared mutable default lists; copies keep the caller's
        # lists independent of this instance.
        additional_cols = [] if additional_cols is None else list(additional_cols)
        other_cols_color = [] if other_cols_color is None else list(other_cols_color)
        assert len(additional_cols) == len(other_cols_color), \
            "additional_cols and other_cols_color must have the same length"

        self.data = data.copy()
        self.overlap = overlap

        self.window_size = window_size
        self.multiple = 1 if freq == 'D' else 16

        # Set the symbol for the data, default is SPY. The column is removed from
        # the private copy only, so the caller's frame is left untouched.
        self.symbol = 'SPY'
        if 'symbol' in self.data.columns:
            if len(self.data) > 0:
                self.symbol = self.data['symbol'].iloc[0]
            self.data = self.data.drop(columns='symbol')

        # Frequency of the data (only daily and hourly are supported)
        self.freq = freq

        # Set all column names
        self.feat_cols = ['open', 'high', 'low', 'close'] + additional_cols
        self.other_cols = additional_cols
        self.other_cols_color = other_cols_color
        self.cutoff = percent_train

    def _layout(self):
        """Return (price_px, volume_px, height_px) for this window size and frequency."""
        try:
            base = self._LAYOUTS[self.window_size]
        except KeyError:
            raise ValueError(
                f"Unsupported window_size {self.window_size!r}: "
                f"expected one of {sorted(self._LAYOUTS)}"
            ) from None
        return tuple(value * self.multiple for value in base)

    def plot(self, df, benchmark):
        """Render one window of bars as an image array.

        Parameters
        ----------
        df : pandas.DataFrame
            The rows of one window (at most ``window_size * multiple`` rows).
        benchmark : bool
            If True only OHLC is drawn; otherwise volume and the additional
            columns are drawn too.

        Returns
        -------
        numpy.ndarray
            Array of shape (height, 3 * window_size * multiple, 3) with values
            in [0, 1]; row 0 is the top of the image.

        Raises
        ------
        ValueError
            If ``window_size`` is not one of the supported sizes.
        """
        price_px, volume_px, height_px = self._layout()
        df = df.reset_index(drop=True).copy()

        high = df[['high'] + self.other_cols].values.max()
        low = df[['low'] + self.other_cols].values.min()
        span = high - low
        for col in self.feat_cols:
            if span == 0:
                # Flat window: everything sits on the bottom of the price band
                # instead of dividing by zero.
                scaled = pd.Series(0.0, index=df.index)
            else:
                scaled = (df[col] - low) / span * price_px
            # The price band is stacked above the volume band.
            df[col] = scaled.round(0).astype(int) + volume_px

        peak_volume = df['volume'].max()
        if peak_volume == 0:
            # No trading in the whole window: draw empty volume bars instead of
            # producing NaN from 0 / 0.
            df['volume'] = 0
        else:
            df['volume'] = (df['volume'] / peak_volume * volume_px).round(0).astype(int)

        canvas = np.zeros((height_px, 3 * self.window_size * self.multiple, 3))
        white = [255] * 3

        opens = df['open'].to_numpy()
        highs = df['high'].to_numpy()
        lows = df['low'].to_numpy()
        closes = df['close'].to_numpy()
        volumes = df['volume'].to_numpy()
        extras = [df[col].to_numpy() for col in self.other_cols]
        n_rows = len(df)

        for idx in range(n_rows):
            x = idx * 3
            canvas[opens[idx], x, :] = white
            canvas[lows[idx]:highs[idx] + 1, x + 1, :] = white
            canvas[closes[idx], x + 2, :] = white
            if not benchmark:
                canvas[:volumes[idx] + 1, x + 1, :] = white

                # Each feature is drawn as a connected line: the outer pixel
                # columns hold the midpoint towards the neighbouring bars.
                for color, series in zip(self.other_cols_color, extras):
                    here = series[idx]
                    before = series[idx - 1] if idx >= 1 else here
                    after = series[idx + 1] if idx <= n_rows - 2 else here

                    canvas[(here + before) // 2, x, :] = color
                    canvas[here, x + 1, :] = color
                    canvas[(here + after) // 2, x + 2, :] = color

        canvas = canvas[::-1, :, :]  # flip so that higher prices are at the top
        return canvas / 255.0

    def _directions(self, closes, end):
        """Return the +1/-1 direction label for every horizon of a window ending at ``end``.

        ``end`` is the first row after the window. Labels compare the close at
        ``end + horizon + 1`` with the close at ``end + 1``.
        NOTE(review): the anchor is ``end + 1``, not the last bar of the window
        (``end - 1``); this is kept from the original non-overlapping behaviour -
        confirm it is intended.
        """
        anchor = closes[end + 1]
        return [1 if closes[end + horizon + 1] > anchor else -1 for horizon in self._HORIZONS]

    def _save_image(self, image, path):
        """Write one rendered window to ``path`` without axes or padding."""
        plt.imshow(image)
        plt.axis('off')
        plt.savefig(path, bbox_inches='tight', pad_inches=0)
        plt.close()

    def generate(self, benchmark=False, make_up_short=False, clear_existing=True, seed=None):
        """Render every window and save the images into train/test folders.

        Images go to 'train' and 'test' ('train_benchmark' and 'test_benchmark'
        when ``benchmark`` is True), relative to the working directory. File
        names encode symbol, window size, frequency, the three direction labels
        and the index value of the first row after the window.

        Parameters
        ----------
        benchmark : bool
            If True draw OHLC only.
        make_up_short : bool
            If True, randomly duplicate training images whose first label is -1
            until they make up half of the training set.
        clear_existing : bool
            If True (default, the original behaviour) the output folders are
            deleted and recreated. Pass False to add to existing folders, e.g.
            when generating images for many symbols in a loop.
        seed : int, optional
            Seed for the random choice of duplicated images.
        """
        m = len(self.data)
        incr = self.window_size * self.multiple
        stride = self._OVERLAP_STRIDE if self.overlap else incr
        # A window is usable only if the furthest label row exists.
        lookahead = max(self._HORIZONS) + 1

        train_name = 'train' if not benchmark else 'train_benchmark'
        test_name = 'test' if not benchmark else 'test_benchmark'
        for folder in (train_name, test_name):
            if clear_existing and os.path.exists(folder):
                shutil.rmtree(folder)
            os.makedirs(folder, exist_ok=True)

        closes = self.data['close'].to_numpy()
        stamps = self.data.index.values
        prefix = f"{self.symbol}_{self.window_size}_{self.freq}"
        train_test_cutoff = int(self.cutoff * m)

        # ---- training windows --------------------------------------------
        # Windows are tracked by their start row. Labels and the file-name
        # timestamp are anchored at the window END for both overlapping and
        # non-overlapping strides (previously the overlapping case anchored them
        # 12 rows after the window start, i.e. inside the image, and stopped
        # after covering only a fraction of the training range).
        short_files = []  # (saved path, path stem) of every short-labelled image
        short_cnt, long_cnt = 0, 0
        train_end = 0
        start = 0
        while start < train_test_cutoff and start + incr + lookahead < m:
            end = start + incr
            labels = self._directions(closes, end)
            image = self.plot(self.data.iloc[start:end, :].copy(), benchmark)
            stem = f"{train_name}/{prefix}_{labels[0]}_{labels[1]}_{labels[2]}_{stamps[end]}"
            path = f"{stem}_version_1.png"
            self._save_image(image, path)
            if labels[0] == 1:
                long_cnt += 1
            else:
                short_cnt += 1
                short_files.append((path, stem))
            train_end = end
            start += stride

        # ---- balance short vs. long --------------------------------------
        if make_up_short and short_files and short_cnt < long_cnt:
            rng = np.random.default_rng(seed)
            # long_cnt - short_cnt extra copies bring shorts to exactly half of
            # the final set. Copies are numbered so that picking the same image
            # twice no longer overwrites the earlier copy, and they are made by
            # copying the saved file so no image has to be kept in memory.
            for k in range(long_cnt - short_cnt):
                src, stem = short_files[rng.integers(len(short_files))]
                shutil.copyfile(src, f"{stem}_duplicated_{k}.png")

        # ---- test windows -------------------------------------------------
        start = train_end + self._TEST_GAP
        while start + incr + lookahead < m:
            end = start + incr
            labels = self._directions(closes, end)
            image = self.plot(self.data.iloc[start:end, :].copy(), benchmark)
            self._save_image(
                image,
                f"{test_name}/{prefix}_{labels[0]}_{labels[1]}_{labels[2]}_{stamps[end]}_version_1.png",
            )
            start += stride

In [6]:
# Alternative input: an hourly OHLCV file (uncomment to use it instead of the daily file below).
# d = data_generation('data/XNAS-20240629-DM3T6SKQVU/xnas-itch-20180501-20240628.ohlcv-1h.csv', 'H')
# Load the daily file and build the chart generator with overlapping windows.
d = data_generation('yfinance_SPY.csv', freq='D')
ohlc = charts(data=d, freq='D', overlap=True)
# benchmark=False draws volume in addition to OHLC and writes to the 'train' and 'test' folders.
ohlc.generate(benchmark=False, make_up_short=False)

In [12]:
# Get the list of S&P 500 companies from Wikipedia
sp500_url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
table = pd.read_html(sp500_url)
sp500_df = table[0]
tickers = sp500_df['Symbol'].tolist()

# NOTE: by default charts.generate() deletes and recreates its output folders on
# every call, so after this loop only the images of the last successful ticker
# remain on disk.
error_count = 0
error_list = []
for ticker in tickers:
    print(f"Working at {ticker}")
    try:
        d = data_generation(f'data-SPYstock/{ticker}.csv', freq='D')
        ohlc = charts(data=d, freq='D', overlap=True)
        ohlc.generate(benchmark=True, make_up_short=False)
    except Exception as exc:
        # Catch Exception (not a bare except) so Ctrl-C still interrupts the run,
        # and report the reason so failures can be diagnosed.
        print(f"Error generating {ticker}: {type(exc).__name__}: {exc}")
        error_count += 1
        error_list.append(ticker)
    else:
        print(f"Generation complete for {ticker}")

Working at MMM
Generation complete for MMM
Working at AOS
Error generating AOS
Working at ABT
Error generating ABT
Working at ABBV
Generation complete for ABBV
Working at ACN
Generation complete for ACN
Working at ADBE
Error generating ADBE
Working at AMD
Error generating AMD
Working at AES
Generation complete for AES
Working at AFL
Error generating AFL
Working at A
Generation complete for A
