In [1]:
import pandas as pd
import numpy as np

from lib.data import get_all_csv_files, large_df, make_dataset, scale, inverse_scale

In [2]:
files = get_all_csv_files('./stock_market_data/')

In [3]:
df = large_df(files)

100%|██████████| 4280/4280 [01:25<00:00, 50.12it/s] 


In [4]:
df = df[['Low', 'Open', 'High', 'Close', 'Volume']].dropna(axis=0)

In [5]:
df = df[df['Low'] < 5e3]

In [23]:
import os, gzip, json, shutil, torch, pickle
from tqdm import tqdm
from torch.utils.data import Dataset

from typing import List

In [57]:
class DataProcessor(Dataset):
    def __init__(self, time_d: int = 10, output_params: int = 4):
        self.time_d = time_d
        self.output_params = output_params

        self.max_low = 5e3
        self.cols = ['Low', 'Open', 'High', 'Close', 'Volume']
        self.compressed = False

        self.X = None
        self.y = None

        self.scl = {
            'money': (0.0, 1.0),
            'volume': (0.0, 1.0),
        }

    def __len__(self):
        return 0 if type(self.X) == type(None) else len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

    @staticmethod
    def ticker_from_path(path: str):
        return os.path.basename(path)[:-4]

    @staticmethod
    def get_all_csv_files(dir: str):
        files = []
        a = os.listdir(dir)
        for b in a:
            if os.path.isdir(os.path.join(dir, b)):
                c = os.path.join(dir, b, 'csv')
                if os.path.isdir(c):
                    files += [os.path.join(c, f) for f in os.listdir(c) if f[-4:] == '.csv']

        return files

    @staticmethod
    def to_tensors(d):
        return torch.from_numpy(np.array(d))

    def find_ranges(self):
        money = self.X[:, :-1]
        volume = self.X[:, -1:]

        self.scl['money'] = (float(torch.min(money)), float(torch.max(money)))
        self.scl['volume'] = (float(torch.min(volume)), float(torch.max(volume)))

    def scale(self, data: torch.Tensor, new_min=0, new_max=1, with_volume: bool = True):
        money = data[:, :4] if with_volume else data
        money = (money - self.scl['money'][0]) / (self.scl['money'][1] - self.scl['money'][0]) * (new_max - new_min) + new_min
        
        if not with_volume:
            return money

        volume = (data[:, -1:] - self.scl['volume'][0]) / (self.scl['volume'][1] - self.scl['volume'][0]) * (new_max - new_min) + new_min
        return torch.hstack((money, volume))

    def inverse_scale(self, data: torch.Tensor, curr_max = 1, with_vol: bool = False):
        money = data[:, :-1] if with_vol else data
        money = money / curr_max * (self.scl['money'][1] - self.scl['money'][0]) + self.scl['money'][0]

        if not with_vol:
            return money
        
        volume = data[:, -1:] / curr_max * (self.scl['volume'][1] - self.scl['volume'][0]) + self.scl['volume'][0]
        return torch.hstack((money, volume))

    def make_dataset(self, d: List[List[float]]):
        d: torch.Tensor = self.to_tensors(d)
        Xd, yd = None, None

        for i in range(d.shape[0] - self.time_d - 1):
            idx = i + self.time_d
            row = d[idx + 1]
            row = row[:-1] # remove volume is it doesn't need to be predicted
            x = d[i:idx]
            if torch.isnan(torch.sum(x)) or torch.isnan(torch.sum(row)):
                continue # if the row (y) has nans, skip

            if type(Xd) == type(None):
                Xd = x
                yd = row
            else:
                Xd = torch.concat((Xd, x))
                yd = torch.concat((yd, row))
            
        return Xd, yd

    def make_datasets(self, data: dict):
        loop = tqdm(data, total=len(data), leave=False)
        for ticker in loop:
            # to_dataset
            dataset = self.make_dataset(data[ticker])

            # checks
            if type(dataset[0]) == type(None) or type(dataset[1]) == type(None):
                print(f"{ticker} is useless when using time_d {self.time_d}")
                continue

            # add to large dataset
            if type(self.X) == type(None):
                self.X = dataset[0]
                self.y = dataset[1]
            else:
                self.X = torch.concat((self.X, dataset[0]))
                self.y = torch.concat((self.y, dataset[1]))

        # scale
        self.find_ranges()
        self.X = self.scale(self.X)
        self.y = self.scale(self.y, with_volume=False)

    def large_df(self, path):
        files = self.get_all_csv_files(path)
        return pd.concat((pd.read_csv(f) for i, f in tqdm(enumerate(files), total=len(files), leave=False)), ignore_index=True)

    def filter_data(self, df: pd.DataFrame):
        df = df[self.cols].dropna(axis=0)
        return df[df['Low'] < self.max_low]

    def format_file(self, path: str):
        df = pd.read_csv(path)
        df = self.filter_data(df)
        return df.to_numpy().tolist()

    def from_csvs(self, path):
        files = self.get_all_csv_files(path)
        loop = tqdm(files, total=len(files), leave=False)
        return {self.ticker_from_path(path): self.format_file(path) for path in loop}

    def load_datasets(self, Xpath: str, ypath: str, scl: dict):
        assert os.path.isfile(Xpath), 'X file does not exist'
        assert os.path.isfile(ypath), 'y file does not exist'

        self.X: torch.Tensor = pickle.load(open(Xpath, 'rb'))
        self.y: torch.Tensor = pickle.load(open(ypath, 'rb'))

        assert self.X.shape[0] / self.time_d == self.X.shape[0] // self.time_d, 'time_d does not seem to be correct for this dataset'
        assert self.y.shape[0] / self.output_params == self.y.shape[0] // self.output_params , 'the output_params seem to be incorrect for this dataset'
        
        self.X = self.X.reshape((self.X.shape[0] // self.time_d, self.time_d, self.X.shape[-1]))
        self.y = self.y.reshape((self.y.shape[0] // self.output_params, self.output_params))

        self.cl = scl
        
    def save_data(self, data: dict, path: str):
        if os.path.isfile(path):
            raise Exception('File already exists')

        pickle.dump(data, open(path, 'wb'))

        if self.compressed:
            with open(path, 'rb') as f_in:
                with gzip.open(path+'.gz', 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

    def load_data(self, path: str):
        if not os.path.isfile(path):
            raise Exception('File does not exist')

        data = None
        if self.compressed:
            with gzip.open(path, 'rb') as f_in:
                with open(path.replace('.gz', ''), 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

        return pickle.load(open(path.replace('.gz', '') if self.compressed else path, 'rb'))

In [58]:
dp = DataProcessor()

In [31]:
data = dp.from_csvs('./stock_market_data/')

                                                   

In [32]:
len(data['AAPL'])

10479

In [34]:
dp.compressed = True
dp.save_data(data, './data/dp.bin')

In [45]:
data = dp.load_data('./data/dp.bin')

In [46]:
dp.make_datasets(data)

                                                   

KeyboardInterrupt: 

In [59]:
dp.load_datasets('./data/Xlarge.bin', './data/ylarge.bin')

In [60]:
dp.X.shape

torch.Size([24210020, 10, 5])

In [61]:
dp.scl

{'money': (0.0, 1.0), 'volume': (0.0, 1.0)}

In [38]:
import torch

def find_ranges(data: torch.Tensor):
    money = data[:, :-1]
    volume = data[:, -1:]

    money_range = (float(torch.min(money)), float(torch.max(money)))
    volume_range = (float(torch.min(volume)), float(torch.max(volume)))

    return money_range, volume_range

money_range, volume_range = find_ranges(torch.tensor(data.to_numpy()))
print(money_range, volume_range)

(0.0, 44500.0) (0.0, 7421640800.0)


In [40]:
def scale(data: torch.Tensor, new_min=0, new_max=1):
    global money_range
    global volume_range
    money = data[:, :4]
    volume = data[:, -1:]

    money = (money - money_range[0]) / (money_range[1] - money_range[0]) * (new_max - new_min) + new_min
    volume = (volume - volume_range[0]) / (volume_range[1] - volume_range[0]) * (new_max - new_min) + new_min
    return torch.hstack((money, volume))

In [45]:
def make_dataset(df: torch.Tensor, time_d: int = 10):
    Xd, yd = None, None

    for i in range(df.shape[0] - time_d - 1):
        idx = i + time_d
        row = df[idx + 1]
        row = row[:-1] # remove volume is it doesn't need to be predicted
        x = df[i:idx]
        if torch.isnan(torch.sum(x)) or torch.isnan(torch.sum(row)):
            continue # if the row (y) has nans, skip

        if type(Xd) == type(None):
            Xd = x
            yd = row
        else:
            Xd = torch.concat((Xd, x))
            yd = torch.concat((yd, row))
        
    return Xd, yd

In [54]:
from tqdm import tqdm

X, y = None, None

loop = tqdm(files, total=len(files))
for file in loop:
    tmp = pd.read_csv(file)
    tmp = tmp[['Low', 'Open', 'High', 'Close', 'Volume']].dropna()
    tmp = tmp[tmp['Low'] < 5e3]

    scaled = scale(torch.Tensor(tmp.to_numpy()))
    dataset = make_dataset(scaled)

    if type(dataset[0]) == type(None) or type(dataset[1]) == type(None):
        print(f"file {file} is useless")
        continue

    if type(X) == type(None):
        X = dataset[0]
        y = dataset[1]
    else:
        X = torch.concat((X, dataset[0]))
        y = torch.concat((y, dataset[1]))

 32%|███▏      | 1363/4280 [24:10<42:59,  1.13it/s]  

file ./stock_market_data/nasdaq\csv\CCUR.csv is useless


 36%|███▌      | 1551/4280 [27:57<1:08:53,  1.51s/it]

file ./stock_market_data/nasdaq\csv\DXM.csv is useless


 40%|███▉      | 1698/4280 [31:10<36:06,  1.19it/s]  

file ./stock_market_data/nasdaq\csv\FSFF.csv is useless


 63%|██████▎   | 2713/4280 [54:44<51:25,  1.97s/it]  

file ./stock_market_data/nyse\csv\ACG.csv is useless


 65%|██████▌   | 2796/4280 [56:57<48:14,  1.95s/it]  

file ./stock_market_data/nyse\csv\AVG.csv is useless


 73%|███████▎  | 3108/4280 [1:06:03<19:29,  1.00it/s]  

file ./stock_market_data/nyse\csv\FGL.csv is useless


 73%|███████▎  | 3127/4280 [1:06:32<19:06,  1.01it/s]

file ./stock_market_data/nyse\csv\FPT.csv is useless


100%|██████████| 4280/4280 [1:55:26<00:00,  1.62s/it]  


In [55]:
X.shape

torch.Size([242100200, 5])

In [56]:
import pickle

In [57]:
pickle.dump(X, open('./data/Xlarge.bin', 'wb'))

In [58]:
pickle.dump(y, open('./data/ylarge.bin', 'wb'))

In [16]:
import torch, pickle

In [17]:
x = pickle.load(open('./data/X.bin', 'rb'))

In [21]:
x.reshape((6906777, 10, 5)).shape

torch.Size([6906777, 10, 5])

In [13]:
c = torch.Tensor(2, 7, 5)

In [15]:
torch.cat((a, b))

tensor([[[0.5939, 0.6351, 0.9802, 0.4510, 0.4155],
         [0.1423, 0.9028, 0.1143, 0.7449, 0.7464],
         [0.3748, 0.6898, 0.1778, 0.4913, 0.3977],
         [0.8381, 0.5423, 0.1430, 0.4897, 0.9605],
         [0.5049, 0.6050, 0.2317, 0.6488, 0.9490],
         [0.9069, 0.8218, 0.7441, 0.8825, 0.8529],
         [0.0739, 0.9540, 0.8932, 0.2987, 0.5890]],

        [[0.1823, 0.8595, 0.1832, 0.9272, 0.9944],
         [0.7184, 0.2080, 0.2378, 0.2978, 0.4247],
         [0.2342, 0.6028, 0.7682, 0.6944, 0.3655],
         [0.6381, 0.3703, 0.3003, 0.3945, 0.7956],
         [0.4893, 0.5456, 0.0970, 0.7302, 0.4601],
         [0.1616, 0.1483, 0.7895, 0.0596, 0.5900],
         [0.3478, 0.8016, 0.8224, 0.1190, 0.4755]]])

In [11]:
c.shape

torch.Size([14, 5])