In [1]:
%matplotlib inline
%config InlineBackend.figure_formats={'png','retina'}
import numpy as np
np.set_printoptions(precision=3, suppress=True)
import pandas as pd
import json
import time

import optuna
import torch
import torch.nn as nn

from model import model_utils
from model import fusion_model
from model import schedular

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Optuna version: {optuna.__version__}')
print(f'Torch version: {torch.__version__}')
print(f'Device: {DEVICE}')

Optuna version: 4.3.0
Torch version: 2.6.0
Device: cuda


## Hyperparameter search
+ Adjusting hyperparameters will take a lot of time
+ The hyperparameters used for CycPeptMP are the results of 150 trials of search

In [2]:
# Use auxiliary loss during training
USE_AUXILIARY = True

def create_model(trial):
    """
    Hyperparameters search for the Fusion model.
    """
    activation_name           = trial.suggest_categorical("activation_name", ['ReLU', 'LeakyReLU', 'SiLU', 'GELU'])
    dim_linear                = trial.suggest_categorical("dim_linear", [64, 128, 256, 512])
    dim_out                   = trial.suggest_categorical("dim_out", [16, 32, 64])
    # Trans
    Trans_activation          = activation_name
    Trans_dropout_rate        = trial.suggest_float("Trans_dropout_rate", 0.0, 0.3, step=0.05)
    Trans_n_encoders          = trial.suggest_int("Trans_n_encoders", 1, 6, 1)
    Trans_head_num            = trial.suggest_categorical("Trans_head_num", [4, 8, 16, 32])
    Trans_model_dim           = trial.suggest_categorical("Trans_model_dim", [32, 64, 128, 256])
    Trans_dim_feedforward     = trial.suggest_categorical("Trans_dim_feedforward", [64, 128, 256, 512])
    Trans_gamma_g             = trial.suggest_float("Trans_gamma_g", 0.1, 0.9, step=0.1)
    Trans_gamma_c             = 1.0 - Trans_gamma_g
    Trans_n_linears           = trial.suggest_int("Trans_n_linears", 1, 2, 1)
    Trans_dim_linear          = dim_linear
    Trans_dim_out             = dim_out
    # CNN
    CNN_type                  = trial.suggest_categorical("CNN_type", ['AugCNN', 'AugCyclicConv'])
    CNN_num_conv              = trial.suggest_int("CNN_num_conv", 1, 6, 1)
    CNN_conv_units            = [int(trial.suggest_categorical("conv_units" + str(i), [32, 64, 128, 256])) for i in range(CNN_num_conv)]
    if CNN_type == 'AugCyclicConv':
        CNN_padding = 0
    elif CNN_type == 'AugCNN':
        CNN_padding = 1
    CNN_num_linear            = trial.suggest_int("CNN_num_linear", 1, 2, 1)
    CNN_linear_units          = [dim_linear]*CNN_num_linear
    CNN_activation_name       = activation_name
    CNN_pooling_name          = trial.suggest_categorical("CNN_pooling_name", ['max', 'ave'])
    CNN_dim_out               = dim_out
    # MLP
    MLP_num_mlp               = trial.suggest_int("MLP_num_mlp", 1, 6, 1)
    MLP_dim_mlp               = trial.suggest_categorical("MLP_dim_mlp", [64, 128, 256, 512])
    MLP_dim_linear            = dim_linear
    MLP_activation_name       = activation_name
    MLP_dropout_rate          = trial.suggest_float("MLP_dropout_rate", 0.0, 0.3, step=0.05)
    MLP_dim_out               = dim_out
    # concat
    Fusion_num_concat         = trial.suggest_int("Fusion_num_concat", 1, 3, 1)
    Fusion_concat_units       = [dim_linear]*Fusion_num_concat

    model = fusion_model.FusionModel(
        DEVICE, USE_AUXILIARY,
        # Transformer
        Trans_activation, Trans_dropout_rate,
        Trans_n_encoders, Trans_head_num, Trans_model_dim, Trans_dim_feedforward,
        Trans_gamma_g, Trans_gamma_c,
        Trans_n_linears, Trans_dim_linear, Trans_dim_out,
        # CNN
        CNN_type, CNN_num_conv, CNN_conv_units, CNN_padding,
        CNN_activation_name, CNN_pooling_name,
        CNN_num_linear, CNN_linear_units, CNN_dim_out,
        # MLP
        MLP_num_mlp, MLP_dim_mlp,
        MLP_activation_name, MLP_dropout_rate,
        MLP_dim_linear, MLP_dim_out,
        # Fusion
        Fusion_num_concat, Fusion_concat_units,
    )

    return model


def create_optimizer(trial, model):
    """
    Hyperparameters search for the optimizer.
    """
    optimizer_name = trial.suggest_categorical("optimizer_name", ['AdamW', 'NAdam', 'RAdam'])
    weight_decay = trial.suggest_categorical("weight_decay", [5e-6, 1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 5e-3, 1e-2, 5e-2, 1e-1])

    # lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.01,
    if optimizer_name == 'AdamW':
        optimizer = torch.optim.AdamW(model.parameters(), weight_decay=weight_decay)
    # lr=0.002, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, momentum_decay=0.004, *, foreach=None, differentiable=False
    elif optimizer_name == 'NAdam':
        optimizer = torch.optim.NAdam(model.parameters(), weight_decay=weight_decay)
    # lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, *, foreach=None, differentiable=False
    elif optimizer_name == 'RAdam':
        optimizer = torch.optim.RAdam(model.parameters(), weight_decay=weight_decay)

    return optimizer

In [3]:
MODEL_TYPE = 'Fusion'
REPLICA_NUM = 60 # Augmentation times

EPOCH_NUM = 50
PATIENCE = 5 # Stop early when validation loss does not decrease for five consecutive epochs

CV = 3

gamma_layer  = 0.05 # Weight of auxiliary layer loss
gamma_subout = 0.10 # Weight of auxiliary sub-model loss

# OPTIMIZE
# seed = 2024
# model_utils.set_seed(seed)


def objective(trial):

    time_start_trial = time.time()

    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256])

    loss_trial = 0

    for cv in range(CV):
        folder_path = 'model/input/'
        dataset_train = model_utils.load_dataset(folder_path, MODEL_TYPE, REPLICA_NUM, f'Train_cv{cv}')
        dataset_valid = model_utils.load_dataset(folder_path, MODEL_TYPE, REPLICA_NUM, f'Valid_cv{cv}')

        dataloader_train = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
        dataloader_valid = torch.utils.data.DataLoader(dataset_valid, batch_size=batch_size, shuffle=False)

        model = create_model(trial)
        model = nn.DataParallel(model)
        model.to(DEVICE)

        criterion = nn.MSELoss()
        optimizer = create_optimizer(trial, model)
        # OPTIMIZE: Adjusting the learning rate
        init_lr=0.0001
        scheduler = schedular.NoamLR(optimizer=optimizer,
                                     warmup_epochs=[0.2*EPOCH_NUM],
                                     total_epochs=[EPOCH_NUM],
                                     steps_per_epoch=len(dataset_train) // batch_size,
                                     init_lr=[init_lr],
                                     max_lr=[init_lr*10],
                                     final_lr=[init_lr/10])

        model_path = f'weight/{MODEL_TYPE}_optuna/{MODEL_TYPE}-{REPLICA_NUM}_t{trial.number}_cv{cv}.cpt'

        loss_train_list, loss_valid_list = model_utils.train_loop(model_path, DEVICE, PATIENCE, EPOCH_NUM,
                                                                  dataloader_train, dataloader_valid, model, criterion,
                                                                  optimizer, scheduler,
                                                                  verbose=True,
                                                                  use_auxiliary=USE_AUXILIARY, gamma_layer=gamma_layer, gamma_subout=gamma_subout)
        # Save complete loss after early stopping
        if DEVICE == 'cuda':
            checkpoint = torch.load(model_path, weights_only=False)
        else:
            checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
        checkpoint['loss_train_list'] = loss_train_list
        checkpoint['loss_valid_list'] = loss_valid_list
        torch.save(checkpoint, model_path)

        loss_trial += min(loss_valid_list)


    time_end_trial = time.time()
    print(f'Execution time of trial {trial.number:03d}: {(time_end_trial-time_start_trial):.0f}')
    print("------------------------------------------------------------------------")

    return loss_trial / CV


In [None]:
study = optuna.create_study(direction='minimize', study_name=f'{MODEL_TYPE}-{REPLICA_NUM}',
                            load_if_exists=True,
                            storage=f'sqlite:///weight/{MODEL_TYPE}_optuna/{MODEL_TYPE}-{REPLICA_NUM}.db')
study.optimize(objective, 10)
study.trials_dataframe().to_csv(f'weight/{MODEL_TYPE}_optuna/study_history_{MODEL_TYPE}-{REPLICA_NUM}.csv')

[I 2025-07-10 00:26:58,201] Using an existing study with name 'Fusion-60' instead of creating a new one.
  Trans_n_encoders          = trial.suggest_int("Trans_n_encoders", 1, 6, 1)
  Trans_n_linears           = trial.suggest_int("Trans_n_linears", 1, 2, 1)
  CNN_num_conv              = trial.suggest_int("CNN_num_conv", 1, 6, 1)
  CNN_num_linear            = trial.suggest_int("CNN_num_linear", 1, 2, 1)
  MLP_num_mlp               = trial.suggest_int("MLP_num_mlp", 1, 6, 1)
  Fusion_num_concat         = trial.suggest_int("Fusion_num_concat", 1, 3, 1)


batch   100/ 2907 of epoch  0 completed
batch   200/ 2907 of epoch  0 completed
batch   300/ 2907 of epoch  0 completed
batch   400/ 2907 of epoch  0 completed
batch   500/ 2907 of epoch  0 completed
batch   600/ 2907 of epoch  0 completed
batch   700/ 2907 of epoch  0 completed
batch   800/ 2907 of epoch  0 completed
batch   900/ 2907 of epoch  0 completed
batch  1000/ 2907 of epoch  0 completed
batch  1100/ 2907 of epoch  0 completed
batch  1200/ 2907 of epoch  0 completed
batch  1300/ 2907 of epoch  0 completed
batch  1400/ 2907 of epoch  0 completed
batch  1500/ 2907 of epoch  0 completed
batch  1600/ 2907 of epoch  0 completed
batch  1700/ 2907 of epoch  0 completed
batch  1800/ 2907 of epoch  0 completed
batch  1900/ 2907 of epoch  0 completed
batch  2000/ 2907 of epoch  0 completed
batch  2100/ 2907 of epoch  0 completed
batch  2200/ 2907 of epoch  0 completed
batch  2300/ 2907 of epoch  0 completed
batch  2400/ 2907 of epoch  0 completed
batch  2500/ 2907 of epoch  0 completed


### History visualization

In [None]:
study = optuna.load_study(study_name=f'{MODEL_TYPE}-{REPLICA_NUM}',
                          storage=f'sqlite:///weight/{MODEL_TYPE}_optuna/{MODEL_TYPE}-{REPLICA_NUM}.db')

In [None]:
optuna.visualization.plot_param_importances(study)

In [None]:
optuna.visualization.plot_optimization_history(study)

## Use determined hyperparameters for re-training

In [None]:
MODEL_TYPE = 'Fusion'
REPLICA_NUM = 60 # Augmentation times

EPOCH_NUM = 50
PATIENCE = 5 # Stop early when validation loss does not decrease for five consecutive epochs

CV = 3

gamma_layer  = 0.05 # Weight of auxiliary layer loss
gamma_subout = 0.10 # Weight of auxiliary sub-model loss

# Use auxiliary loss during training
USE_AUXILIARY = True

# OPTIMIZE
# seed = 2024
# model_utils.set_seed(seed)

In [None]:
# The results of the hyperparameters search
study_history = pd.read_csv(f'weight/{MODEL_TYPE}_optuna/study_history_{MODEL_TYPE}-{REPLICA_NUM}.csv').iloc[:,1:]
study_history = study_history[study_history['state'] == 'COMPLETE']
best_trial = study_history.sort_values('value').iloc[0]

# # Use CycPeptMP hyperparameters
# config_path = 'config/CycPeptMP.json'
# config = json.load(open(config_path,'r'))
# best_trial = config['model']

In [None]:
batch_size = int(best_trial['params_batch_size'])

for cv in range(CV):
    folder_path = 'model/input/'
    dataset_train = model_utils.load_dataset(folder_path, MODEL_TYPE, REPLICA_NUM, f'Train_{cv}')
    dataset_valid = model_utils.load_dataset(folder_path, MODEL_TYPE, REPLICA_NUM, f'Valid_{cv}')

    dataloader_train = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
    dataloader_valid = torch.utils.data.DataLoader(dataset_valid, batch_size=batch_size, shuffle=False)

    model = model_utils.create_model(best_trial, DEVICE, USE_AUXILIARY)
    model = nn.DataParallel(model)
    model.to(DEVICE)

    criterion = nn.MSELoss()
    optimizer = model_utils.create_optimizer(best_trial, model)

    # OPTIMIZE: Adjusting the learning rate
    init_lr=0.0001
    scheduler = schedular.NoamLR(optimizer=optimizer,
                                 warmup_epochs=[0.2*EPOCH_NUM],
                                 total_epochs=[EPOCH_NUM],
                                 steps_per_epoch=len(dataset_train) // batch_size,
                                 init_lr=[init_lr],
                                 max_lr=[init_lr*10],
                                 final_lr=[init_lr/10])
    # OPTIMIZE
    model_path = f'weight/{MODEL_TYPE}_retrain/{MODEL_TYPE}-{REPLICA_NUM}_cv{cv}.cpt'

    loss_train_list, loss_valid_list = model_utils.train_loop(model_path, DEVICE, PATIENCE, EPOCH_NUM,
                                                              dataloader_train, dataloader_valid, model, criterion,
                                                              optimizer, scheduler,
                                                              verbose=True,
                                                              use_auxiliary=USE_AUXILIARY, gamma_layer=gamma_layer, gamma_subout=gamma_subout)
    # Save complete loss after early stopping
    if DEVICE == 'cuda':
        checkpoint = torch.load(model_path)
    else:
        checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
    checkpoint['loss_train_list'] = loss_train_list
    checkpoint['loss_valid_list'] = loss_valid_list
    torch.save(checkpoint, model_path)