In [1]:
import os
import random

import pandas as pd
import numpy as np

import optuna
from optuna.samplers import TPESampler
from optuna.pruners import HyperbandPruner

import rdkit
from rdkit.Chem import AllChem, rdFingerprintGenerator
from rdkit import Chem, DataStructs

from sklearn.model_selection import train_test_split, StratifiedKFold, KFold
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder

import matplotlib.pyplot as plt

import catboost as cat

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Single seed shared by the CatBoost models, the K-fold splitter and the Optuna sampler.
RANDOM_SEED = 777

In [3]:
def feature_engineer(train_path="../data/train.csv", test_path="../data/test.csv"):
  """Load the train/test CSVs and turn them into numeric feature matrices.

  Each molecule is described by seven descriptor columns read from the CSV,
  two atom counts computed with RDKit after adding explicit hydrogens, and a
  Morgan fingerprint produced by RDKit's generator with its default settings.

  Training rows containing any missing value are dropped, and training rows
  whose canonical isomeric SMILES repeats an earlier row are removed (the
  first occurrence is kept). Test rows are never dropped; a missing test
  "AlogP" is filled with that row's "LogD".

  Args:
    train_path: CSV with "SMILES", the descriptor columns and the "HLM" target.
    test_path: CSV with "SMILES" and the descriptor columns.

  Returns:
    (train_features, target, test_features): 2-D feature array for training,
    the "HLM" values of the retained training rows, and the 2-D feature array
    for the test rows in their original order.

  Raises:
    ValueError: if RDKit cannot parse one of the SMILES strings.
  """
  # Column order defines the layout of the leading block of every feature row,
  # so train and test must share this single list.
  descriptor_cols = [
    "AlogP", "Molecular_Weight", "Num_H_Acceptors", "Num_H_Donors",
    "Num_RotatableBonds", "LogD", "Molecular_PolarSurfaceArea",
    "num_of_atoms", "num_of_heavy_atoms",
  ]

  def parse_smiles(smiles):
    # MolFromSmiles reports failure by returning None; stop here with the
    # offending string instead of failing later inside another RDKit call.
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
      raise ValueError(f"RDKit could not parse SMILES: {smiles!r}")
    return mol

  def add_molecule_columns(df):
    # Hydrogens are made explicit first, so num_of_atoms counts them too and
    # the fingerprints below are computed on the hydrogen-expanded molecule.
    df["mol"] = df["mol"].apply(lambda m: Chem.AddHs(m))
    df["num_of_atoms"] = df["mol"].apply(lambda m: m.GetNumAtoms())
    df["num_of_heavy_atoms"] = df["mol"].apply(lambda m: m.GetNumHeavyAtoms())

  def build_matrix(df, fp_generator):
    # Descriptor columns first, fingerprint bits after.
    fingerprints = np.stack([fp_generator.GetFingerprintAsNumPy(m) for m in df["mol"]])
    return np.concatenate((df[descriptor_cols].values, fingerprints), axis=1)

  train_df = pd.read_csv(train_path).dropna()
  test_df = pd.read_csv(test_path)
  # Test rows cannot be dropped, so a missing AlogP borrows the row's LogD.
  test_df["AlogP"] = test_df["AlogP"].fillna(test_df["LogD"])

  train_df["mol"] = train_df["SMILES"].apply(parse_smiles)
  test_df["mol"] = test_df["SMILES"].apply(parse_smiles)

  # Canonical SMILES is taken from the freshly parsed molecule (before explicit
  # hydrogens are added) and is used only to detect duplicate training rows.
  train_df["canon_smiles"] = train_df["mol"].apply(
    lambda m: Chem.MolToSmiles(m, isomericSmiles=True, canonical=True)
  )

  add_molecule_columns(train_df)
  add_molecule_columns(test_df)

  # Remove duplicates: keep the first row for each canonical SMILES.
  train_df = train_df.drop_duplicates(subset="canon_smiles", keep="first")

  fmgen = rdFingerprintGenerator.GetMorganGenerator()
  train_features = build_matrix(train_df, fmgen)
  test_features = build_matrix(test_df, fmgen)
  target = train_df["HLM"].values

  return train_features, target, test_features
  

In [4]:
# Build the feature matrices once; the tuning and final-training cells read these globals.
train_x, train_y, test_x = feature_engineer()

27
3496
3469


In [None]:
def create_model(param):
  """Return an unfitted CatBoostRegressor configured from `param`.

  `param` is a dict of CatBoost keyword arguments. The notebook-wide seed and
  silent logging are always applied on top, so `param` must not contain
  `random_state` or `verbose` itself (Python raises TypeError on the clash).
  """
  regressor = cat.CatBoostRegressor(**param, verbose=False, random_state=RANDOM_SEED)
  return regressor

In [None]:
def train_model(param, X, y, X_test, trial=None, n_splits=5):
    """Cross-validate a CatBoost regressor and predict the test set per fold.

    For every fold of a shuffled K-fold split (seeded with RANDOM_SEED) a new
    model is built with `create_model(param)`, fitted with early stopping, and
    scored on the held-out fold with mean squared error.

    Args:
        param: dict of CatBoost keyword arguments forwarded to `create_model`.
        X: training feature array, indexable by integer index arrays.
        y: training target array, indexable the same way as `X`.
        X_test: feature array to predict with each fold's model.
        trial: optional Optuna trial. When given, each fold score is reported
            with the fold index as the step, and the trial may be pruned.
        n_splits: number of cross-validation folds (default 5).

    Returns:
        (val_scores, y_tests, models): per-fold validation MSE values,
        per-fold predictions for `X_test`, and the fitted models.

    Raises:
        optuna.TrialPruned: if `trial` is given and the pruner stops the trial.
    """
    kfold = KFold(n_splits=n_splits, shuffle=True, random_state=RANDOM_SEED)

    val_scores = []
    y_tests = []
    models = []

    for idx, (train_idx, val_idx) in enumerate(kfold.split(X, y)):
        X_train, y_train = X[train_idx], y[train_idx]
        X_val, y_val = X[val_idx], y[val_idx]

        model = create_model(param)
        model.fit(
            X_train,
            y_train,
            eval_set=[(X_train, y_train), (X_val, y_val)],
            early_stopping_rounds=50,
            verbose=500,
        )

        y_hat_val = model.predict(X_val)
        # Plain MSE. The `squared` keyword is not passed: MSE is the default,
        # and newer scikit-learn releases no longer accept that argument.
        score = mean_squared_error(y_val, y_hat_val)
        val_scores.append(score)
        print(f'Fold: {idx+1}/{n_splits} score = {score:.5f}')

        y_tests.append(model.predict(X_test))
        models.append(model)

        if trial is not None:
            # One report per completed fold lets the pruner stop weak trials
            # before the remaining folds are trained.
            trial.report(score, idx)

            if trial.should_prune():
                raise optuna.TrialPruned()

    return val_scores, y_tests, models

In [None]:
def objective_cat(trial):
    """Optuna objective: mean cross-validated score of one sampled configuration.

    Samples six CatBoost hyper-parameters from `trial`, runs `train_model` on
    the global `train_x` / `train_y` / `test_x` arrays with the trial attached
    (so it can be pruned between folds), and returns the average fold score.
    """
    # The suggest_* calls stay in this order: a seeded sampler draws values
    # sequentially, so reordering them would change the sampled configurations.
    search_space = {}
    search_space["iterations"] = trial.suggest_int("iterations", 2000, 20000)
    search_space["learning_rate"] = trial.suggest_float("learning_rate", 0.001, 0.3)
    search_space["depth"] = trial.suggest_int("depth", 1, 10)
    search_space["subsample"] = trial.suggest_float("subsample", 0.1, 1.0)
    search_space["min_data_in_leaf"] = trial.suggest_int("min_data_in_leaf", 10, 100)
    search_space["colsample_bylevel"] = trial.suggest_float("colsample_bylevel", 0.05, 1.0)

    # Only the fold scores matter here; predictions and models are discarded.
    fold_scores, _fold_predictions, _fold_models = train_model(
        search_space, train_x, train_y, test_x, trial
    )

    return sum(fold_scores) / len(fold_scores)

In [None]:
# Seeded TPE sampler so the sequence of suggested configurations is repeatable.
tpe_sampler = TPESampler(seed=RANDOM_SEED)

# NOTE(review): train_model reports one score per fold at steps 0..4 (5 folds),
# while max_resource is 8 here - confirm whether it should match the fold count.
hyperband_pruner = HyperbandPruner(min_resource=1, max_resource=8, reduction_factor=3)

study = optuna.create_study(
    study_name='cat_tuning',
    direction='minimize',
    sampler=tpe_sampler,
    pruner=hyperband_pruner,
)

# Run 70 trials of the CatBoost objective.
study.optimize(objective_cat, n_trials=70)

In [None]:
# Best trial of the study; the study was created with direction='minimize'.
trial = study.best_trial

In [None]:
# Show the hyper-parameter values recorded for that trial.
print(trial.params)

In [None]:
# Hyper-parameters for the final models. They are written out literally here
# rather than read from `trial.params`, so this cell does not depend on the
# tuning cells having been run in the current kernel.
best_param = dict(
  iterations=5384,
  learning_rate=0.016291040637706457,
  depth=10,
  subsample=0.9609604742509466,
  min_data_in_leaf=96,
  colsample_bylevel=0.36816810171615777,
)

# No trial is passed, so every fold is trained and nothing is pruned.
val_scores, y_tests, models = train_model(best_param, train_x, train_y, test_x)

In [None]:
# Every fold model is written under model/; create the folder up front so a
# fresh checkout does not fail on a missing directory.
os.makedirs("model", exist_ok=True)

for idx, model in enumerate(models):
  model.save_model(f"model/catboost_HLM_fold_{idx}.model")

In [None]:
# Fill the sample submission's "HLM" column with the element-wise mean of the
# per-fold test predictions, then write the submission file.
submit = pd.read_csv("../data/sample_submission.csv").assign(
    HLM=np.mean(y_tests, axis=0)
)
submit.to_csv("catboost_optuna.csv", index=False)