In [157]:
from unittest import case

import torch
from joblib.externals.loky import cpu_count
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from torchmetrics.functional import accuracy
import torch.multiprocessing as mp


has_mps = torch.backends.mps.is_built()
device = "mps" if has_mps else "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")



Using device: cuda


In [158]:
import numpy


In [159]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import random_split, DataLoader
from tqdm import tqdm
import os
import pytorch_lightning as pl
import json
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset

from sklearn.preprocessing import MinMaxScaler
import joblib
import pickle


In [160]:
from decimal import Decimal
from dataclasses import dataclass, field
import json
import dataclasses


@dataclass
class PreProcessed:
    train_index_start:int
    val_index_start:int
    test_index_start:int
    train_index_end:int
    val_index_end:int
    test_index_end:int
    train_set_length:int
    val_set_length:int
    test_set_length:int
    file:str

    def to_dict(self):
        return {
            'train_index_start':train_index_start,
            'val_index_start':val_index_start,
            'test_index_start':test_index_start,
            'train_index_end':train_index_end,
            'val_index_end':val_index_end,
            'test_index_end':test_index_end,
            'train_set_length':train_set_length,
            'val_set_length':val_set_length,
            'test_set_length':test_set_length,
            'file':file
        }
    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__ = d


class PreProcessedEncoder(json.JSONEncoder):
        def default(self, o):
            if dataclasses.is_dataclass(o):
                return dataclasses.asdict(o)
            if isinstance(o, Decimal):
                return str(o)
            return super().default(o)


In [161]:
a = [1,2,3,4]
max(a)
json.dumps([PreProcessed(1,2,3,4,5,6,7,8,9,"aaaa")], cls=PreProcessedEncoder)


'[{"train_index_start": 1, "val_index_start": 2, "test_index_start": 3, "train_index_end": 4, "val_index_end": 5, "test_index_end": 6, "train_set_length": 7, "val_set_length": 8, "test_set_length": 9, "file": "aaaa"}]'

In [162]:
def convert_to_sequences(memory, days_prediction, data_sequence):
    x = []
    y = []
    for i in range(len(data_sequence) - memory - max(days_prediction)):
        intermediate_y = []
        window = data_sequence[i:i+memory]
        for k in range(memory):
            prediction = []
            for j in range(len(days_prediction)) :
                after_days = days_prediction[j]
                prediction.append(data_sequence[i + k  + after_days - 1,[8,9]])
            after_window = np.hstack(prediction)
            intermediate_y.append(after_window)
        x.append(window)
        y.append(intermediate_y)
    return np.asarray(x), np.asarray(y)
    

In [163]:
def populate_scaler(csv_directory, cache_directory):

    jumbo_df = pd.DataFrame()
    for f in tqdm(os.listdir(csv_directory)):
        file = os.path.join(csv_directory, f)
        df = pd.read_csv(file, sep=',', index_col=False)
        jumbo_df = pd.concat([jumbo_df, df])
    scaler = MinMaxScaler()
    scaler.fit(jumbo_df)
    joblib.dump(scaler, os.path.join(cache_directory, "scaler.gz"))

In [164]:
populate_scaler("/data/datasets/stockPredictor/outputs", "/data/datasets/stockPredictor/cache")


  0%|          | 0/75 [00:00<?, ?it/s][A
 15%|█▍        | 11/75 [00:00<00:00, 107.47it/s][A
 29%|██▉       | 22/75 [00:00<00:00, 85.96it/s] [A
 41%|████▏     | 31/75 [00:00<00:00, 76.87it/s][A
 52%|█████▏    | 39/75 [00:00<00:00, 68.15it/s][A
 61%|██████▏   | 46/75 [00:00<00:00, 64.99it/s][A
 71%|███████   | 53/75 [00:00<00:00, 60.65it/s][A
 80%|████████  | 60/75 [00:00<00:00, 55.03it/s][A
 88%|████████▊ | 66/75 [00:01<00:00, 52.89it/s][A
100%|██████████| 75/75 [00:01<00:00, 59.64it/s][A


In [165]:
def load_data(directory, cache_directory, memory, train_perc, val_perc, device, forecast_days = [1, 7, 15]):
    rv = []
    cache_json_file = os.path.join(cache_directory, "preprocessed.json")
    if os.path.exists(cache_json_file):
        with open(cache_json_file, "r") as f :
            for o in json.load(f):
                v = PreProcessed(**o)
                rv.append(PreProcessed(**o))
        return rv;
    train_input_dataset = []
    train_output_dataset = []
    val_input_dataset = []
    val_output_dataset = []
    test_input_dataset = []
    test_output_dataset = []
    count = 1
    train_data_size = 0
    val_data_size = 0
    test_data_size = 0
    train_index_start = 0
    val_index_start = 0
    test_index_start = 0
    for f in tqdm(os.listdir(directory)):
        file = os.path.join(directory, f)
        df = pd.read_csv(file, sep=',', index_col=False)
        input,output = convert_to_sequences(memory, forecast_days, df.to_numpy())
        train_size = int(len(input) * train_perc);
        val_size = int(len(input) * val_perc);
        test_size = int(len(input) - train_size - val_size)
        train_input_dataset_single, val_input_dataset_single, test_input_dataset_single = random_split(input, [train_size, val_size, test_size])
        train_output_dataset_single, val_output_dataset_single, test_output_dataset_single = random_split(output, [train_size, val_size, test_size])
        train_input_dataset = np.vstack([train_input_dataset, train_input_dataset_single]) if (len(train_input_dataset) != 0) else train_input_dataset_single 
        train_output_dataset = np.vstack([train_output_dataset, train_output_dataset_single]) if (len(train_output_dataset) != 0) else train_output_dataset_single 
        val_input_dataset = np.vstack([val_input_dataset, val_input_dataset_single]) if (len(val_input_dataset) != 0) else val_input_dataset_single 
        val_output_dataset = np.vstack([val_output_dataset, val_output_dataset_single]) if (len(val_output_dataset) != 0) else val_output_dataset_single 
        test_input_dataset = np.vstack([test_input_dataset, test_input_dataset_single]) if (len(test_input_dataset) != 0) else test_input_dataset_single 
        test_output_dataset = np.vstack([test_output_dataset, test_output_dataset_single]) if (len(test_output_dataset) != 0) else test_output_dataset_single
        train_data_size = len(train_input_dataset)
        val_data_size = len(val_input_dataset)
        test_data_size = len(test_input_dataset)
        if (train_data_size > 20000):
            print("Writing file of size ", train_data_size)
            outfile = os.path.join(cache_directory, 'preprocessed-{}.npz'.format(count))
            np.savez_compressed(outfile, train_input=np.asarray(train_input_dataset), train_output=np.asarray(train_output_dataset), val_input=np.asarray(val_input_dataset), val_output=np.asarray(val_output_dataset), test_input=np.asarray(test_input_dataset), test_output=np.asarray(test_output_dataset)) 
            rv.append(PreProcessed(train_index_start, val_index_start, test_index_start, train_index_start + train_data_size - 1, val_index_start + val_data_size - 1, test_index_start + test_data_size - 1, train_data_size, val_data_size, test_data_size, outfile))
            train_index_start = train_index_start + train_data_size
            val_index_start = val_index_start + val_data_size
            test_index_start = test_index_start + test_data_size
            train_data_size = 0
            val_data_size = 0
            test_data_size = 0
            count = count + 1
            train_input_dataset = []
            train_output_dataset = []
            val_input_dataset = []
            val_output_dataset = []
            test_input_dataset = []
            test_output_dataset = []
    with open(cache_json_file, "w") as f :
        json.dump(rv, f,cls=PreProcessedEncoder)
    return rv

In [166]:
a = 4
i = 0 if a else 45
print(i)

0


In [167]:
from enum import Enum


class DatasetType(Enum):
    TRAIN = 1,
    VALIDATE = 2,
    TEST = 3

In [168]:
def findBucketIndex(idx, dataset_type:DatasetType, sequence):
    match dataset_type:
        case DatasetType.TRAIN:
            for i in range(0, len(sequence)):
                if idx >= sequence[i].train_index_start and idx <= sequence[i].train_index_end:
                    return i
            return len(sequence) - 1
        case DatasetType.VALIDATE: 
            for i in range(0, len(sequence)):
                if idx >= sequence[i].val_index_start and idx <= sequence[i].val_index_end:
                    return i
            return len(sequence) - 1
        case DatasetType.TEST: 
            for i in range(0, len(sequence)):
                if idx >= sequence[i].test_index_start and idx <= sequence[i].test_index_end:
                    return i
            return len(sequence) - 1

In [169]:
from torch.utils.data import Dataset


class StockPredictorDataset(Dataset) :
    def __init__(self, base_directory, dataset_type:DatasetType, sequences: list[PreProcessed],device) :
        self.base_directory = base_directory
        self.sequences = sequences
        self.dataset_type = dataset_type
        self.length = 0
        self.current_bucket_index = 0
        self.device = device
        for p in sequences:
            match self.dataset_type:
                case DatasetType.TRAIN:
                    self.length = max(p.train_index_end, self.length)
                case DatasetType.VALIDATE:
                    self.length = max(p.val_index_end, self.length)
                case DatasetType.TEST:
                    self.length = max(p.test_index_end, self.length)
        self.length += 1

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__ = d

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        current_dataset_bucket = self.sequences[self.current_bucket_index]
        current_bucket_data = np.load(self.current_dataset_bucket.file)
        match self.dataset_type:
            case DatasetType.TRAIN:
                if (idx < current_dataset_bucket.train_index_start or idx > current_dataset_bucket.train_index_end):
                    self.current_bucket_index = findBucketIndex(idx, DatasetType.TRAIN, self.sequences)
                    current_dataset_bucket = self.sequences[self.current_bucket_index]
                    current_bucket_data = np.load(current_dataset_bucket.file)
                index_in_bucket = idx - current_dataset_bucket.train_index_start;
                train_ip = torch.from_numpy(current_bucket_data['train_input'][index_in_bucket]).float().to(self.device)
                train_op = torch.from_numpy(current_bucket_data['train_output'][index_in_bucket]).float().to(self.device)
                return train_ip,train_op, 
            case DatasetType.VALIDATE:
                if (idx < current_dataset_bucket.val_index_start or idx > current_dataset_bucket.val_index_end):
                    self.current_bucket_index = findBucketIndex(idx, DatasetType.VALIDATE, self.sequences)
                    current_dataset_bucket = self.sequences[self.current_bucket_index]
                    current_bucket_data = np.load(current_dataset_bucket.file)
                index_in_bucket = idx - current_dataset_bucket.val_index_start;
                val_ip = torch.from_numpy(current_bucket_data['val_input'][index_in_bucket]).float().to(self.device)
                val_op = torch.from_numpy(current_bucket_data['val_output'][index_in_bucket]).float().to(self.device)
                return val_ip,val_op, 
            case DatasetType.TEST:
                if (idx < current_dataset_bucket.test_index_start or idx > current_dataset_bucket.test_index_end):
                    self.current_bucket_index = findBucketIndex(idx, DatasetType.TEST, self.sequences)
                    current_dataset_bucket = self.sequences[self.current_bucket_index]
                    current_bucket_data = np.load(current_dataset_bucket.file)
                index_in_bucket = idx - current_dataset_bucket.test_index_start;
                test_ip = torch.from_numpy(current_bucket_data['test_input'][index_in_bucket]).float().to(self.device)
                test_op =torch.from_numpy(current_bucket_data['test_output'][index_in_bucket]).float().to(self.device)
                return test_ip,test_op,

In [170]:
class StockPreductorDataModule(pl.LightningDataModule):
    def __init__(self, base_directory, device, train_batch_size: int = 512, val_batch_size:int = 64, test_batch_size:int = 32):
        super().__init__()
        self.base_directory = base_directory
        self.sequences = load_data(os.path.join(base_directory, 'outputs'), os.path.join(base_directory, 'cache'), 300, 0.70, 0.15, device)
        self.train_batch_size = train_batch_size
        self.val_batch_size = val_batch_size
        self.test_batch_size = test_batch_size
        self.device = device;

    def __getstate__(self):
        self.sequences = None
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__ = d
        self.sequences = load_data(os.path.join(self.base_directory, 'outputs'), os.path.join(self.base_directory, 'cache'), 300, 0.70, 0.15, device)

    def setup(self, stage=None):
        self.train_dataset = StockPredictorDataset(self.base_directory, DatasetType.TRAIN, self.sequences, self.device)
        self.val_dataset = StockPredictorDataset(self.base_directory, DatasetType.VALIDATE, self.sequences, self.device)
        self.test_dataset = StockPredictorDataset(self.base_directory, DatasetType.TEST, self.sequences, self.device)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.train_batch_size, shuffle=True, num_workers=19, persistent_workers=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.val_batch_size, shuffle=False, num_workers=19, persistent_workers=True)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.test_batch_size, shuffle=False, num_workers=1, persistent_workers=True)

In [171]:
class PositionalEncoder(nn.Module):
    """
    PyTorch Lightning module for positional encoding, suitable for transformer models.
    """
    def __init__(self, d_model:int, dropout=0.1, max_len: int = 5000):
        super(PositionalEncoder, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, d_model)
        Returns:
            Tensor with positional encoding added.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


In [172]:
class TransformerModel(nn.Module):
    """
    PyTorch Lightning module for a generic Transformer model.
    """
    def __init__(self, input_dim=31, model_dim=512, num_heads=32, num_layers=3, output_dim=6, dropout=0.1):
        super().__init__()
        self.model_dim = model_dim
        self.embedding = nn.Linear(input_dim, model_dim)
        self.pos_encoder = PositionalEncoder(model_dim, dropout)
        encoder_layer = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.criterion = nn.MSELoss()
        self.decoder = nn.Linear(model_dim, output_dim)

    def forward(self, x):
        # x: (batch, seq_len, input_dim)
        x = self.embedding(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.decoder(x)
        return x


In [173]:
class StockPredictor(pl.LightningModule):
    def __init__(self, device, learning_rate: float = 1e-4):
        super().__init__()
        self.model = TransformerModel().to(device)
        self.learning_rate = learning_rate
        self.criterion = nn.MSELoss()

    def forward(self, x, outputs=None):
        output = self.model(x)
        loss = 0
        if outputs is not None:
            loss = self.criterion(output, outputs)
        return output, loss

    def training_step(self, batch, batch_idx):
        outputs, loss = self.common_step(batch, batch_idx)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        outputs, loss = self.common_step(batch, batch_idx)
        self.log('val_loss', loss)
        return loss

    def test_step(self, batch, batch_idx):
        outputs, loss = self.common_step(batch, batch_idx)
        self.log('test_loss', loss)
        return loss

    def common_step(self, batch, batch_idx):
        inputs, targets = batch
        print('Input Shape = ', inputs.shape, ' target shape = ' , targets.shape)
        outputs, loss = self(inputs, targets)
        predictions = torch.argmax(outputs, dim=1)
        return outputs, loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)
        return {'optimizer': optimizer, 'lr_scheduler': scheduler, 'monitor': 'val_loss'}

In [174]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs


The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Reusing TensorBoard on port 6006 (pid 304033), started 0:18:52 ago. (Use '!kill 304033' to kill it.)

In [175]:
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

checkpoint_callback = ModelCheckpoint(dirpath = "checkpoints", filename="best-checkpoint", save_top_k=1, verbose=True, monitor="val_loss", mode="min")
logger = TensorBoardLogger('lightning_logs', name="stock_predictor")

trainer = pl.Trainer(logger=logger, callbacks=[checkpoint_callback], max_epochs=500)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [176]:
mp.set_start_method('spawn', force=True)
model = StockPredictor(device).to(device)
data_module = StockPreductorDataModule(base_directory="/data/datasets/stockPredictor", device=device, train_batch_size=1024, val_batch_size=64, test_batch_size=32)
data_module.setup()
model.train()
trainer.fit(model, data_module)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type             | Params | Mode 
-------------------------------------------------------
0 | model     | TransformerModel | 9.5 M  | train
1 | criterion | MSELoss          | 0      | train
-------------------------------------------------------
9.5 M     Trainable params
0         Non-trainable params
9.5 M     Total params
37.906    Total estimated model params size (MB)
39        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/vavasthi/anaconda3/lib/python3.11/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vavasthi/anaconda3/lib/python3.11/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'StockPredictorDataset' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/vavasthi/anaconda3/lib/python3.11/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vavasthi/anaconda3/lib/python3.11/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can'

RuntimeError: DataLoader worker (pid(s) 309257, 309258, 309259, 309260, 309261, 309262, 309263, 309264, 309265, 309266, 309267, 309268, 309269, 309270, 309271, 309272, 309273, 309274, 309275) exited unexpectedly