# Single-classifier Optuna optimization

In [37]:
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from src.common import init_logger,setup_seed
from sklearn.naive_bayes import GaussianNB
from lightgbm import LGBMClassifier
from sklearn.metrics import f1_score
from sklearn.metrics import roc_curve, auc
import warnings
import json
import itertools
import optuna
warnings.filterwarnings("ignore")

## Optuna configuration parameters

In [38]:
def load_config(path):
    """Read a JSON configuration file and return the parsed object.

    Args:
        path: Location of the JSON file.

    Returns:
        The value produced by ``json.load`` for the file's top-level element.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Pin the encoding: without it, open() falls back to the platform locale,
    # so the same config file could parse differently between machines.
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


# Not referenced by any of the visible cells; kept as an empty string.
CV_RESULT = ""
# Name of the log file handed to init_logger below.
LOGFILE = "01model_optuna_f1.log"

# init_logger returns the logger together with its file handler; the handler
# is closed explicitly later in the notebook.
logger, file_handler = init_logger(LOGFILE)

def my_svm(**params):
    """Build an ``SVC`` from ``params`` with ``probability=True`` always set."""
    return SVC(**params, probability=True)

def my_lgb(**params):
    """Build an ``LGBMClassifier`` from ``params`` with ``verbose=-1`` always set."""
    return LGBMClassifier(**params, verbose=-1)

# Registry: short model name -> callable that builds the estimator from
# keyword hyperparameters. Insertion order is the order models are tuned in.
model_dict = dict(
    LR=LogisticRegression,
    SVM=my_svm,
    NB=GaussianNB,
    KNN=KNeighborsClassifier,
    RF=RandomForestClassifier,
    XGBoost=XGBClassifier,
    LightGBM=my_lgb,
)

# Column names of the three feature groups; the main cell selects and
# concatenates these groups to build each experiment's feature matrix.
clinical_features = [
    'Gender', 'ALT', 'AST', 'Globulin', 'DBIL',
    'IBIL', 'AFP', 'DNA load', 'HBsAg', 'HBeAg_COI',
]
specific_features = [
    'SFU',
    'HBsAg1_T', 'HBsAg2_T',
    'HBpol1_T', 'HBpol2_T',
    'HBx1_T', 'HBx2_T',
    'HBeAg1_T', 'HBeAg2_T',
]
treat_features = [
    'ThSched',
    'ADV', 'ETV', 'PEG_IFN', 'TAF',
    'TDF', 'TFV', 'TMF', 'UnusedD',
]


## sklearn traditional machine learning

In [39]:
import os

# setup_seed is a project helper from src.common; presumably it seeds the
# global RNGs -- confirm against its definition.
setup_seed(42)

# Per-model hyperparameter search spaces; ml_optimize_model indexes this by
# model name and reads each entry's "type" and "args".
config = load_config('../config/ml_config.json')

# Optuna storage URL. OPTUNA_STORAGE (the variable the optuna CLI also reads)
# takes precedence so real database credentials do not have to live in the
# notebook; the literal below is only the fallback.
storage_name = os.environ.get(
    "OPTUNA_STORAGE",
    "postgresql://postgres:xxx@127.0.0.1/hep_f1",
)

# Shared 5-fold stratified splitter with a fixed shuffle seed.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

def ml_optimize_model(X, y, model_name, feature_group, n_trials=50):
    """Tune one classifier from ``model_dict`` with Optuna, scoring by mean CV F1.

    Every trial draws hyperparameters according to ``config[model_name]``,
    fits a fresh model on each ``skf`` fold and returns the mean of the
    per-fold F1 scores. A trial whose sampling, fitting or scoring raises is
    logged and pruned instead of aborting the study.

    Args:
        X: Feature table, sliced positionally with ``.iloc``.
        y: Labels aligned with ``X``, sliced positionally with ``.iloc``.
        model_name: Key into both ``model_dict`` and ``config``.
        feature_group: Label joined with ``model_name`` to form the study name.
        n_trials: Number of Optuna trials to run.

    Returns:
        Tuple ``(best_params, best_value, study)``.

    Raises:
        KeyError: If ``model_name`` is missing from ``model_dict`` or ``config``.
        optuna.exceptions.DuplicatedStudyError: If the storage already holds a
            study with the same name (``load_if_exists`` is not set).
    """
    # Resolve both lookups before the study exists so that an unknown model
    # fails immediately rather than being swallowed (and pruned) in every trial.
    model_constructor = model_dict[model_name]
    search_space = config[model_name]
    study_name = f"{feature_group}_{model_name}"

    def objective(trial):
        try:
            # Each config entry names a `trial.suggest_*` method and its
            # positional arguments.
            params = {
                param_name: getattr(trial, param_info["type"])(param_name, *param_info["args"])
                for param_name, param_info in search_space.items()
            }

            f1_scores = []
            for train_index, test_index in skf.split(X, y):
                model = model_constructor(**params)
                model.fit(X.iloc[train_index], y.iloc[train_index])
                y_pred = model.predict(X.iloc[test_index])
                f1_scores.append(f1_score(y.iloc[test_index], y_pred))

            return np.mean(f1_scores)
        except Exception as exc:
            # Keep the study going, but leave a trace of why the trial failed
            # so a broken search space is not mistaken for a poor one.
            logger.warning("%s trial %s failed and was pruned: %r", study_name, trial.number, exc)
            # Report -inf as the trial's intermediate value before pruning it.
            trial.report(float('-inf'), step=0)
            raise optuna.exceptions.TrialPruned() from exc

    study = optuna.create_study(study_name=study_name, storage=storage_name, direction='maximize')
    study.optimize(objective, n_trials=n_trials, n_jobs=-1)
    return study.best_params, study.best_value, study



# deep learning

In [41]:
import torch
import torch.nn as nn
import torch.optim as optim
# Absolute import, consistent with `from src.common import ...` in the first
# cell: a notebook cell has no parent package, so a relative `..src` import
# cannot be resolved there.
from src.deep_model import FCN, CNN, train_evaluate

# 5-fold stratified splitter with a fixed shuffle seed, used by the
# deep-learning objective below.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def dl_optimize_model(X, y, model_name, feature_group, n_trials=50):
    """Tune an ``FCN`` or ``CNN`` network with Optuna, scoring by mean CV F1.

    Every trial samples a learning rate, depth, per-layer widths (plus a
    kernel size for the CNN), an activation and an optimizer, trains a fresh
    network on each ``skf`` fold through ``train_evaluate`` and returns the
    mean of the values it yields.

    Args:
        X: Feature table, sliced positionally with ``.iloc``; ``X.shape[1]``
            sets the network's input width.
        y: Labels aligned with ``X``, sliced positionally with ``.iloc``.
        model_name: ``"FCN"`` or ``"CNN"``.
        feature_group: Label joined with ``model_name`` to form the study name.
        n_trials: Number of Optuna trials to run.

    Returns:
        Tuple ``(best_params, best_value, study)``.

    Raises:
        NotImplementedError: If ``model_name`` is neither ``"FCN"`` nor ``"CNN"``.
    """
    # Reject unknown architectures before a study is created in the storage,
    # instead of letting every trial return None.
    if model_name not in ("FCN", "CNN"):
        raise NotImplementedError(f"Unsupported deep model: {model_name!r}")

    is_fcn = model_name == "FCN"

    def objective(trial):
        # `suggest_float(..., log=True)` is the supported spelling of the
        # deprecated `suggest_loguniform`.
        lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
        n_layers = trial.suggest_int("n_layers", 1, 5)
        if is_fcn:
            hidden_layers = [
                trial.suggest_categorical(f"n_units_l{i}", [16, 32, 64, 128, 256])
                for i in range(n_layers)
            ]
        else:
            n_filters = [
                trial.suggest_categorical(f"n_filters_l{i}", [16, 32, 64, 128])
                for i in range(n_layers)
            ]
            kernel_size = trial.suggest_categorical("kernel_size", [3, 5, 7])
        activation_func = trial.suggest_categorical('activation_func', ['relu', 'tanh', 'sigmoid'])
        optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'SGD', 'AdamW'])

        def build_model():
            """Instantiate the architecture selected by ``model_name``."""
            if is_fcn:
                return FCN(
                    input_dim=X.shape[1],
                    output_dim=1,
                    hidden_layers=hidden_layers,
                    activation_func=activation_func,
                )
            return CNN(
                n_features=X.shape[1],
                output_dim=1,
                n_layers=n_layers,
                n_filters=n_filters,
                kernel_size=kernel_size,
                activation_func=activation_func,
            )

        f1_list = []
        for train_index, test_index in skf.split(X, y):
            # Re-seed before each fold so every fold starts from the same seed.
            setup_seed(42)
            X_train, X_val = X.iloc[train_index], X.iloc[test_index]
            y_train, y_val = y.iloc[train_index], y.iloc[test_index]
            model = build_model().to(device)
            optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)
            criterion = nn.BCEWithLogitsLoss()

            f1 = train_evaluate(model, criterion, optimizer, X_train, y_train, X_val, y_val)
            f1_list.append(f1)

        return np.mean(f1_list)

    study = optuna.create_study(study_name=feature_group+"_"+model_name, storage=storage_name, direction='maximize')
    study.optimize(objective, n_trials=n_trials, n_jobs=-1)
    return study.best_params, study.best_value, study


# main

In [None]:
setup_seed(42)
data = pd.read_csv('')
y = data['Label']
model_list = []

# Short group label -> list of column names belonging to that group.
feature_dict = dict(
    CIF=clinical_features,
    STCF=specific_features,
    TPF=treat_features,
)

# Every non-empty subset of the groups, ordered by size: singles, pairs, triple.
feature_names = list(feature_dict)
combinations_1, combinations_2, combinations_3 = (
    list(itertools.combinations(feature_names, size)) for size in (1, 2, 3)
)
all_combinations = [*combinations_1, *combinations_2, *combinations_3]
all_combinations

for combo in all_combinations:
    # Flatten the selected groups into a single column list, in group order.
    combined_features = [column for group in combo for column in feature_dict[group]]
    feature_group = ' + '.join(combo)
    logger.info("#"*50)
    logger.info(feature_group)
    X = data[combined_features]

    # Classical models first, then the two deep models; both optimizers share
    # the same call signature and return shape.
    for optimize, model_names in (
        (ml_optimize_model, model_dict),
        (dl_optimize_model, ['FCN','CNN']),
    ):
        for model_name in model_names:
            best_params, best_score, study = optimize(X, y, model_name, feature_group, n_trials=200)
            logger.info(f"{model_name} Best parameters: {best_params}")
            logger.info(f"{model_name} Best score: {best_score}")

In [44]:
# Close the file handler that init_logger(LOGFILE) returned earlier in the notebook.
file_handler.close()

In [None]:

# IPython shell escape: serve optuna-dashboard on port 8083, bound to all
# interfaces, against the hep_f1 database used as the study storage.
!optuna-dashboard --host 0.0.0.0 --port 8083 postgresql://postgres:123...@127.0.0.1/hep_f1