In [56]:
import os
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, TensorDataset, DataLoader
from torch.optim.adam import Adam

from pytorch_lightning import Trainer
from pytorch_lightning import LightningModule, LightningDataModule
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.callbacks import LearningRateMonitor

## Multiple Time Periods

What if we took more than just the most recent pricing details into account when predicting the next close?
We will build a `torch.utils.data.Dataset` that groups our data into windows of multiple time periods per sample, then test our model's ability to predict the `NextClose`.

In [57]:
# Load the processed hourly ETH pricing CSV and preview its first rows.
df = pd.read_csv('../data/processed/eth_hourly.csv')
df.head()

Unnamed: 0,Open,High,Low,CurrentClose,Volume_USD,NextClose
0,733.12,736.48,731.19,733.04,4246576.84,734.64
1,733.04,735.99,731.7,734.64,2044880.32,731.32
2,734.64,734.65,722.0,731.32,7891317.14,728.44
3,731.32,732.0,728.44,728.44,2111099.12,735.21
4,728.44,739.3,725.52,735.21,7197617.75,732.1


In [58]:
class PricingDataModule(LightningDataModule):
    """Serve sliding windows of pricing rows as train/validation/test loaders.

    Every sample pairs ``time_periods_to_batch`` consecutive CSV rows (all
    columns except the last) with one label: the last-column value of the row
    that immediately follows the window. Samples are split in file order,
    without shuffling, into three contiguous segments.

    Args:
        path_to_csv: Path of the CSV file to read in :meth:`setup`.
        batch_size: Batch size used by all three dataloaders.
        time_periods_to_batch: Number of consecutive rows per sample window.
        train_size: Fraction of the samples placed in the first segment.
        test_size: Fraction of the samples placed in the second segment.

    Note:
        The attribute names and the loaders are crossed, and this is kept for
        backward compatibility: ``val_dataloader`` serves ``test_dataset``
        (the ``test_size`` segment) while ``test_dataloader`` serves
        ``val_dataset`` (whatever remains after the first two segments).
    """

    def __init__(
        self,
        path_to_csv: str,
        batch_size: int,
        time_periods_to_batch: int = 6,
        train_size: float = 0.7,
        test_size: float = 0.2
    ):
        super().__init__()

        self.path_to_csv = path_to_csv
        self.batch_size = batch_size
        self.time_periods_to_batch = time_periods_to_batch

        assert train_size + test_size <= 1, f"sum of train and test are greater than 1: train_size: {train_size}\ntest_size: {test_size}"
        self.train_size = train_size
        self.test_size = test_size

    def prepare_data(self):
        """Nothing to download or preprocess; only announces the hook call."""
        print('In prepare_data')

    def setup(self, stage=None):
        """Read the CSV, build the windowed samples and create the datasets.

        Args:
            stage: Accepted so the method matches the Lightning ``setup``
                hook; it is ignored and all three datasets are always built.

        Raises:
            ValueError: If the CSV has too few rows to form a single sample.
        """
        dataframe = pd.read_csv(self.path_to_csv)
        pricing_data = dataframe.values
        window = self.time_periods_to_batch

        self.number_of_rows = pricing_data.shape[0]
        self.num_samples = self.number_of_rows - window - 1
        if self.num_samples <= 0:
            raise ValueError(
                f"{self.path_to_csv} has {self.number_of_rows} rows, which is not enough "
                f"to build a sample with time_periods_to_batch={window}"
            )

        train_samples = int(self.train_size * self.num_samples)
        test_samples = int(self.test_size * self.num_samples)
        remainder_start = train_samples + test_samples

        # Sample i uses rows [i, i + window) as features (last column dropped).
        feature_windows = np.stack(
            [pricing_data[start:start + window, :-1] for start in range(self.num_samples)]
        )
        # The label is the last column of row i + window, the row after the window.
        # NOTE(review): if the last column already holds each row's *next* close,
        # this label lies two steps past the window's final row - confirm intended.
        labels = pricing_data[window:window + self.num_samples, -1]

        features = torch.from_numpy(feature_windows.astype(np.float32))
        targets = torch.from_numpy(labels.astype(np.float32))

        self.train_dataset = TensorDataset(features[:train_samples], targets[:train_samples])
        self.test_dataset = TensorDataset(
            features[train_samples:remainder_start],
            targets[train_samples:remainder_start],
        )
        # Always defined: the slice is simply empty when nothing is left over,
        # instead of leaving the variables unbound.
        self.val_dataset = TensorDataset(features[remainder_start:], targets[remainder_start:])

    def train_dataloader(self):
        """Shuffled loader over the first (training) segment."""
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        """Ordered loader over ``test_dataset``, the segment right after training."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)

    def test_dataloader(self):
        """Ordered loader over ``val_dataset``, the final leftover segment."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False)

In [88]:
class ConvModel(LightningModule):
    """Two stacked 1-D convolutions followed by a linear head, trained with L1 loss.

    The input is expected as ``(batch, in_channels, length)``. Each convolution
    uses ``kernel_size=3`` without padding and therefore shortens the length
    dimension by 2; the linear head needs that dimension to have collapsed to
    exactly 1, i.e. an input length of 5.

    Args:
        in_channels: Channel count of the input fed to the first convolution.
        out_channels: Channel count produced by both convolutions and consumed
            by the linear head.
        learning_rate: Step size handed to Adam in :meth:`configure_optimizers`.
    """

    def __init__(self, in_channels: int = 6, out_channels: int = 6, learning_rate: float = 3e-5):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.learning_rate = learning_rate

        self.conv1 = nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=3)
        # conv2 consumes conv1's output, so its input width is out_channels
        # (identical to the previous behaviour whenever in_channels == out_channels).
        self.conv2 = nn.Conv1d(in_channels=out_channels, out_channels=out_channels, kernel_size=3)
        self.linear1 = nn.Linear(in_features=out_channels, out_features=1)
        self.loss_fn = nn.L1Loss()

    def forward(self, x):
        """Return one prediction per sample, shaped ``(batch, 1)``.

        Raises:
            ValueError: If the length dimension is not 1 after both convolutions.
        """
        # A pooling window of 1 leaves the tensor unchanged; kept as a tuning hook.
        x = F.gelu(F.max_pool1d(self.conv1(x), 1))
        x = F.gelu(F.max_pool1d(self.conv2(x), 1))
        if x.size(-1) != 1:
            raise ValueError(
                "ConvModel needs the length dimension to be 1 after both convolutions "
                f"(input length 5); got {x.size(-1)}"
            )
        x = x.squeeze(-1)
        x = self.linear1(x)
        return x

    def _shared_pass(self, batch):
        """Run the model on a ``(features, targets)`` batch and return the L1 loss."""
        feature, target = batch
        model_prediction = self(feature)
        # Targets arrive as a flat vector; give them the (batch, 1) prediction shape.
        loss = self.loss_fn(model_prediction, target.reshape(-1, 1))
        return loss

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss (per step and per epoch)."""
        loss = self._shared_pass(batch)
        self.log('train_loss', loss, on_step=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the epoch-level validation loss and the batch size."""
        loss = self._shared_pass(batch)
        self.log('validation_loss', loss, on_step=False, on_epoch=True)
        self.log('batch_size', batch[0].shape[0], on_epoch=True)
        return loss

    def test_step(self, batch, batch_idx):
        """Compute and log the epoch-level test loss."""
        loss = self._shared_pass(batch)
        self.log('test_loss', loss, on_step=False, on_epoch=True)
        return loss

    def configure_optimizers(self):
        """Optimise all parameters with Adam at ``self.learning_rate``."""
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

In [83]:
# Run configuration shared by the model, the trainer and the experiment logger.
hparams = dict(
    time_periods_to_batch=24,
    batch_size=32,
    epochs=100,
)

In [84]:
# Build the data module from the shared hyperparameters. The window length is
# read from `hparams` rather than from a separate `time_periods` variable,
# which is not defined anywhere in this notebook.
pricing_dm = PricingDataModule(
    path_to_csv='../data/processed/eth_hourly.csv',
    batch_size=hparams['batch_size'],
    time_periods_to_batch=hparams['time_periods_to_batch']
)
pricing_dm.setup()

# Each time period in the window is fed to the convolution as one channel.
conv_model = ConvModel(
    in_channels=hparams['time_periods_to_batch'],
    out_channels=hparams['time_periods_to_batch']
)

lr_monitor = LearningRateMonitor(logging_interval='step')
wandb_logger = WandbLogger(project='ETH-Price', log_model=True)

# NOTE(review): auto_lr_find / auto_scale_batch_size presumably only take
# effect when the tuner is run (trainer.tune) - confirm for the installed
# Lightning version.
trainer = Trainer(
    logger=wandb_logger,
    max_epochs=hparams['epochs'],
    auto_lr_find=True,
    auto_scale_batch_size=True,
    callbacks=[lr_monitor]
)
# log gradients and model topology
wandb_logger.watch(conv_model)
wandb_logger.log_hyperparams(hparams)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores


In [85]:
# Train the model using the train/validation loaders supplied by the data module.
trainer.fit(conv_model, pricing_dm)


  | Name    | Type   | Params
-----------------------------------
0 | conv1   | Conv1d | 1.8 K 
1 | conv2   | Conv1d | 1.8 K 
2 | linear1 | Linear | 6.0 K 
3 | linear2 | Linear | 5.8 K 
4 | linear3 | Linear | 25    
5 | loss_fn | L1Loss | 0     
-----------------------------------
15.3 K    Trainable params
0         Non-trainable params
15.3 K    Total params
0.061     Total estimated model params size (MB)


In prepare_data


Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]



In [86]:
# Evaluate the trained model on the data module's test loader; the logged
# metrics are returned as a list of dicts (displayed below).
trainer.test(conv_model, pricing_dm.test_dataloader(), )

Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': 964.4672241210938}
--------------------------------------------------------------------------------


[{'test_loss': 964.4672241210938}]

In [87]:
# Finish the Weights & Biases run behind the logger now that testing is done.
wandb_logger.experiment.finish()

VBox(children=(Label(value=' 0.00MB of 0.00MB uploaded (0.00MB deduped)\r'), FloatProgress(value=1.0, max=1.0)…

0,1
train_loss_step,236.26668
epoch,99.0
trainer/global_step,53100.0
_runtime,272.0
_timestamp,1623933998.0
_step,1262.0
train_loss_epoch,224.04512
validation_loss,307.54071
batch_size,31.948
test_loss,964.46722


0,1
train_loss_step,▁▆▃▃▄▄▄▃▄▄▆▃▄▆▅▄▄▄▃▃▄▃▄▄▄▃▃▆▃▃█▇▄▅▄▄▆▄▅▇
epoch,▁▁▁▁▂▂▂▂▂▃▃▃▃▃▃▄▄▄▄▄▅▅▅▅▅▅▆▆▆▆▆▇▇▇▇▇▇███
trainer/global_step,▁▁▁▁▂▂▂▂▂▃▃▃▃▃▃▄▄▄▄▄▅▅▅▅▅▅▆▆▆▆▆▇▇▇▇▇▇███
_runtime,▁▁▁▁▂▂▂▂▂▃▃▃▃▃▃▄▄▄▄▄▅▅▅▅▅▅▆▆▆▆▆▇▇▇▇▇▇███
_timestamp,▁▁▁▁▂▂▂▂▂▃▃▃▃▃▃▄▄▄▄▄▅▅▅▅▅▅▆▆▆▆▆▇▇▇▇▇▇███
_step,▁▁▁▁▂▂▂▂▂▃▃▃▃▃▃▄▄▄▄▄▅▅▅▅▅▅▆▆▆▆▆▇▇▇▇▇▇███
train_loss_epoch,█▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
validation_loss,█▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
batch_size,▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
test_loss,▁
