In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from itertools import chain

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import f1_score

import optuna
from optuna.trial import TrialState
from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_contour
from optuna.visualization import plot_edf
from optuna.visualization import plot_intermediate_values
from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_parallel_coordinate
from optuna.visualization import plot_param_importances
from optuna.visualization import plot_slice

import os

Data

In [None]:
#pip install optuna

In [None]:
train = np.load('/content/baseline_train_smote.npz')
validation = np.load('/content/baseline_validation.npz')

In [None]:
DEVICE = torch.device("cuda:0")
BATCHSIZE = 128
CLASSES = 2
DIR = os.getcwd()
EPOCHS = 10
N_TRAIN_EXAMPLES = BATCHSIZE * 30
N_VALID_EXAMPLES = BATCHSIZE * 10

In [None]:
# train
X_train = torch.tensor(train["x"], dtype=torch.float32).to(DEVICE)
y_train = torch.tensor(train["y"], dtype=torch.float32).reshape(-1,1).to(DEVICE)
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)

# validation
X_val = torch.tensor(validation["x"], dtype=torch.float32).to(DEVICE)
y_val = torch.tensor(validation["y"], dtype=torch.float32).reshape(-1,1).to(DEVICE)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)

## Optuna setup

Followed tutorial on their website. 

In [None]:
# Data loader 
def get_data():
  train_loader = torch.utils.data.DataLoader(train_dataset, 
                                           batch_size=BATCHSIZE, 
                                           shuffle=True) 
  val_loader = torch.utils.data.DataLoader(val_dataset, 
                                           batch_size=BATCHSIZE, 
                                           shuffle=True)
  return train_loader, val_loader 

In [None]:
def define_model(trial):
  # Optimize number of layers, hidden units, dropout ratio
  n_layers = trial.suggest_int("n_layers", 1, 5)
  layers = []

  in_features = 106
  for i in range(n_layers):
    out_features = trial.suggest_int("n_units_l{}".format(i), 4, 100)
    layers.append(nn.Linear(in_features, out_features))
    layers.append(nn.ReLU())
    p = trial.suggest_float("dropout_l{}".format(i), 0.4, 0.8)
    layers.append(nn.Dropout(p))

    in_features = out_features 
  layers.append(nn.Linear(in_features, 1))
  layers.append(nn.Sigmoid())

  return nn.Sequential(*layers)

In [None]:
def objective(trial):
    # Generate the model.
    model = define_model(trial).to(DEVICE)

    # Generate the optimizers.
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)

    # Get the dataset.
    train_loader, valid_loader = get_data()

    # Training of the model.
    for epoch in range(EPOCHS):
        pred_labels = []
        true_labels = []
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            # Limiting training data for faster epochs.
            if batch_idx * BATCHSIZE >= N_TRAIN_EXAMPLES:
                break

            data, target = data.view(data.size(0), -1).to(DEVICE), target.to(DEVICE)

            optimizer.zero_grad()
            output = model(data)
            loss = F.binary_cross_entropy(output, target)
            loss.backward()
            optimizer.step()

        # Validation of the model.
        model.eval()
        f1 = 0
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(valid_loader):
                # Limiting validation data.
                if batch_idx * BATCHSIZE >= N_VALID_EXAMPLES:
                  break
                data, target = data.view(data.size(0), -1).to(DEVICE), target.to(DEVICE)
                output = model(data)

                # Get the index of the max log-probability.
                pred = output.round() #output.argmax(dim=1, keepdim=True)
                pred_labels.append(list(chain.from_iterable(pred.tolist())))
                true_labels.append(list(chain.from_iterable(target.tolist())))


        f1 = f1_score(true_labels, pred_labels, average="weighted")

        trial.report(f1, epoch)

        # Handle pruning based on the intermediate value.
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return f1 #f1_score

In [None]:
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100, timeout=600)

In [None]:
plot_optimization_history(study)

In [None]:
plot_intermediate_values(study)

In [None]:
plot_contour(study)

In [None]:
plot_param_importances(study)

In [None]:
plot_slice(study)

In [None]:
print(study.best_trial.value) 

In [None]:
study.best_trial