https://github.com/walsvid/CoordConv/blob/master/coordconv.py
https://medium.com/analytics-vidhya/encoding-time-series-as-images-b043becbdbf3
https://eng.uber.com/coordconv/
https://towardsdatascience.com/reading-charts-with-convolutional-neural-networks-cbaabdd5f478

# Prerequisites

Run the cells below if the named packages are not yet installed in your environment.

In [1]:
!pip install optuna



You should consider upgrading via the 'c:\users\yvesd\anaconda3\python.exe -m pip install --upgrade pip' command.


# Intro

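In this notebook we build a CNN classifier for the problem. The input data envisioned for this classifier consists of one image of the Gramian Angular Difference Field (GADF) for the past month, one image of the area plot of smoothed log prices for the past month, and a collection of numerical data on past-month returns & volatility. Note that the version implemented below (`OneImageNet`) consumes only the single image referenced by the `1_month_img` column of the overview files.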

In [2]:
import pandas as pd
from torch.utils.data import Dataset

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
import torchvision.transforms as transforms

from PIL import ImageFile
from PIL import Image
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [3]:
# Create a custom dataset (to be wrapped in DataLoaders) for the train, test, & validation data
class NumericalAndImageDataset(Dataset):
    """Dataset yielding ``(image, label)`` pairs described by an overview CSV.

    The overview file must provide a ``1_month_img`` column (path of the image
    to load) and a ``label_1m`` column (the label returned with that image).
    """

    def __init__(self, overview_file: str, transform=None):
        """
        Initialize this dataset
        :param overview_file: location of the overview file (parsed with ``pd.read_csv``)
        :param transform: optional callable applied to every loaded image
        """
        self.overview = pd.read_csv(overview_file)
        self.transform = transform

    def __len__(self):
        """
        :return: number of rows (samples) in the overview file
        """
        return len(self.overview.index)

    def __getitem__(self, idx):
        """
        Load one sample
        :param idx: positional row index into the overview table
        :return: tuple ``(image, label)``; the image has been passed through
                 ``transform`` when one was supplied
        """
        img_path_1m = self.overview["1_month_img"].iloc[idx]
        label = self.overview["label_1m"].iloc[idx]

        # Image.open is lazy and keeps the file handle around; the context
        # manager releases it deterministically, while convert() hands back an
        # independent, fully loaded RGB (3-channel) copy.
        with Image.open(img_path_1m) as raw_img:
            img_1m = raw_img.convert('RGB')

        # Explicit None check: a transform object is applied even if it
        # happens to evaluate as falsy.
        if self.transform is not None:
            img_1m = self.transform(img_1m)

        return img_1m, label

In [4]:
class OneImageNet(nn.Module):
    """Small CNN that classifies a single image.

    Two 3x3 convolutions (32 then 64 channels) followed by a 2x2 max-pool
    feed a two-layer fully connected head with dropout. With the default
    arguments the network is identical to the original fixed-size version
    (3-channel 30x30 input, 3 output classes).
    """

    def __init__(self, image_size=30, num_classes=3, in_channels=3):
        """
        Build the network
        :param image_size: input size, either an int (square image) or a
                           ``(height, width)`` pair
        :param num_classes: number of output scores per sample
        :param in_channels: number of channels of the input image
        :raises ValueError: if either spatial dimension is smaller than 2
        """
        super().__init__()

        if isinstance(image_size, int):
            height = width = image_size
        else:
            height, width = image_size
        if height < 2 or width < 2:
            raise ValueError(
                "image_size must be at least 2x2, got {}x{}".format(height, width)
            )

        self.image_1_features = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # The padded 3x3 convolutions keep the spatial size; the 2x2 pool with
        # stride 2 halves each dimension (rounding down), leaving 64 channels.
        flat_features = 64 * (height // 2) * (width // 2)

        self.image_1_flat = nn.Sequential(
            nn.Dropout(p=0.25),
            nn.Linear(flat_features, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, img_1):
        """
        Compute class scores for a batch of images
        :param img_1: batch of images; the first dimension is the batch and the
                      remaining size must match what the network was built for
        :return: tensor with one row of ``num_classes`` scores per sample
        """
        features = self.image_1_features(img_1)

        # Collapse everything but the batch dimension for the linear layers
        flattened = features.view(features.size(0), -1)

        return self.image_1_flat(flattened)

In [5]:
def test(model, test_loader, criterion):
    model.eval()
    running_loss = 0
    running_corrects = 0
    running_total = 0

    df_collector = []

    for img_1ms, labels in test_loader:
        outputs = model(img_1ms)
        loss = criterion(outputs, labels)
        _, preds = torch.max(outputs, 1)

        running_loss += loss.item() * img_1ms.size(0)
        running_corrects += torch.sum(preds == labels.data)

        act_vs_pred_temp = pd.DataFrame({
            "actual": labels.data.numpy(),
            "pred": preds.numpy()
        })

        df_collector.append(act_vs_pred_temp)

        running_total += len(img_1ms)

    total_loss = running_loss / running_total
    total_acc = running_corrects.double() / running_total

    act_vs_pred = pd.concat(df_collector)

    return total_loss, total_acc, act_vs_pred

In [6]:
def train(model, train_loader, criterion, optimizer, epochs):
    """
    Train a model for a fixed number of epochs
    :param model: network called as ``model(images)`` and returning one row of
                  class scores per sample
    :param train_loader: iterable yielding ``(images, labels)`` batches; it is
                         iterated once per epoch
    :param criterion: loss called as ``criterion(outputs, labels)``; its value is
                      weighted by the batch size, i.e. it is assumed to be a
                      per-batch mean
    :param optimizer: optimizer stepping the model's parameters
    :param epochs: number of passes over ``train_loader``
    :return: tuple ``(model, train_losses, train_accs)`` holding the trained
             model plus one entry per epoch: the sample-weighted average loss
             (float) and the accuracy (correct-count tensor divided by the
             number of samples)
    :raises ValueError: if the loader yields no samples during an epoch
    """
    train_losses = []
    train_accs = []

    for _ in range(epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0
        running_total = 0

        for img_1ms, labels in train_loader:
            # Clear gradients left over from the previous step
            optimizer.zero_grad()

            outputs = model(img_1ms)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Statistics use the outputs computed before this step's update
            _, preds = torch.max(outputs, 1)
            batch_size = img_1ms.size(0)
            running_loss += loss.item() * batch_size
            running_corrects += torch.sum(preds == labels)
            running_total += batch_size

        if running_total == 0:
            raise ValueError("train_loader yielded no samples to train on")

        train_losses.append(running_loss / running_total)
        train_accs.append(running_corrects / running_total)

    return model, train_losses, train_accs

In [7]:
def create_data_loaders(batch_size, data_dir="ModelData"):
    """
    Build the train, test and validation data loaders
    :param batch_size: number of samples per batch for all three loaders
    :param data_dir: directory holding ``obs_train.csv``, ``obs_test.csv`` and
                     ``obs_val.csv``
    :return: tuple ``(train_data_loader, test_data_loader, val_data_loader)``
    """
    # Images are only converted to tensors; no resizing or augmentation is
    # applied, so they reach the model at their stored size.
    train_transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    test_transform = transforms.Compose([
        transforms.ToTensor(),
    ])

    def _make_loader(csv_name, transform, shuffle):
        """Wrap one overview CSV in a dataset and a DataLoader."""
        dataset = NumericalAndImageDataset(
            overview_file="{}/{}".format(data_dir, csv_name),
            transform=transform
        )
        return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    # Only the training data is shuffled; the evaluation loaders keep the file
    # order so per-sample results line up with the rows of the overview CSV.
    train_data_loader = _make_loader("obs_train.csv", train_transform, shuffle=True)
    test_data_loader = _make_loader("obs_test.csv", test_transform, shuffle=False)
    val_data_loader = _make_loader("obs_val.csv", test_transform, shuffle=False)

    return train_data_loader, test_data_loader, val_data_loader

In [None]:
import optuna

def objective(trial):
    """
    Optuna objective: train a fresh ``OneImageNet`` and score it on validation data
    :param trial: Optuna trial used to sample the learning rate, the number of
                  epochs and the batch size
    :return: validation accuracy of the trained model as a Python float
    """
    params = {
        # suggest_float(..., log=True) replaces the deprecated suggest_loguniform
        "learning_rate": trial.suggest_float("learning_rate", 0.0001, 0.01, log=True),
        "epochs": trial.suggest_int("epochs", 50, 200),
        # NOTE(review): upper bound 2056 may have been meant as 2048 — confirm
        "batch_size": trial.suggest_int("batch_size", 64, 2056),
    }

    train_loader, _, val_loader = create_data_loaders(params["batch_size"])

    model = OneImageNet()

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"])  # Use adaptive momentum optimizer

    model, _, _ = train(model, train_loader, criterion, optimizer, params["epochs"])

    _, val_acc, _ = test(model, val_loader, criterion)

    # test() reports accuracy as a 0-dim tensor; hand Optuna a plain float
    return float(val_acc)

# Search for the hyperparameters that maximize the value returned by
# `objective`, sampling candidates with the TPE sampler over 5 trials.
study = optuna.create_study(
    sampler=optuna.samplers.TPESampler(),
    direction="maximize",
)
study.optimize(objective, n_trials=5)

[32m[I 2022-08-19 22:31:49,759][0m A new study created in memory with name: no-name-dfc5acbd-bc89-46e5-95fe-89160c789ef8[0m
[32m[I 2022-08-19 23:33:59,140][0m Trial 0 finished with value: 0.6375929682217715 and parameters: {'learning_rate': 0.00018278937444306029, 'epochs': 176, 'batch_size': 1300}. Best is trial 0 with value: 0.6375929682217715.[0m
[32m[I 2022-08-20 00:03:17,818][0m Trial 1 finished with value: 0.398016677935542 and parameters: {'learning_rate': 0.00015557871092569854, 'epochs': 75, 'batch_size': 1541}. Best is trial 0 with value: 0.6375929682217715.[0m


In [None]:
# Print every hyperparameter of the best trial, one "name: value" per line.
for param, value in study.best_trial.params.items():
    print(f"{param}: {value}")

In [None]:
# Render Optuna's hyperparameter-importance plot for the study.
optuna.visualization.plot_param_importances(study)

In [None]:
# Render Optuna's intermediate-values plot for the study.
# NOTE(review): `objective` never calls `trial.report(...)`, so there are
# presumably no intermediate values for this plot to show — confirm.
optuna.visualization.plot_intermediate_values(study)