In [None]:
# lgbm optuna
import os
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem, DataStructs, Descriptors
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.model_selection import KFold
import lightgbm as lgb
import optuna


CFG = {
    'SEED': 42,
    'NBITS': 2048,
    'N_SPLITS': 5,
    'LEARNING_RATE': 0.01,
    'EARLY_STOPPING_ROUNDS': 100,
    'N_ESTIMATORS': 5000
}

OPTUNA_TRIALS = 100
N_SPLITS = CFG['N_SPLITS']
ESR = CFG['EARLY_STOPPING_ROUNDS']

def get_lgbm_params(seed):
    return dict(
        objective='regression',
        learning_rate=CFG['LEARNING_RATE'],
        num_leaves=64,
        feature_fraction=0.9,
        bagging_fraction=0.8,
        bagging_freq=1,
        min_child_samples=20,
        random_state=seed,
        verbose=-1
    )

def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

seed_everything(CFG['SEED'])

# ========== Utis ==========
def IC50_to_pIC50(ic50_nM):
    ic50_nM = np.clip(ic50_nM, 1e-12, None)
    return 9 - np.log10(ic50_nM)
def pIC50_to_IC50(pIC50): return 10**(9 - pIC50)

def bv_to_np(bitvect, nbits):
    arr = np.zeros((nbits,), dtype=np.uint8)
    DataStructs.ConvertToNumpyArray(bitvect, arr)
    return arr

# ========== Data ==========
def preprocess_data():
    chembl = pd.read_csv("./ChEMBL_ASK1(IC50).csv", sep=';')
    pubchem = pd.read_csv("./Pubchem_ASK1.csv")

    tsv_add = pd.read_csv("./1_11.tsv", sep='\t')
    tsv_add = tsv_add[['Ligand SMILES', 'IC50 (nM)']].rename(columns={
        'Ligand SMILES': 'smiles',
        'IC50 (nM)': 'ic50'
    })

    chembl.columns = chembl.columns.str.strip().str.replace('"','')
    chembl = chembl[['Smiles', 'Standard Value']].rename(columns={'Smiles': 'smiles', 'Standard Value': 'ic50'})
    if 'Standard Type' in chembl.columns and chembl['Standard Type'].nunique() > 1:
        chembl = chembl[chembl['Standard Type'] == 'IC50']

    pubchem = pubchem[['SMILES', 'Activity_Value']].rename(columns={'SMILES': 'smiles', 'Activity_Value': 'ic50'})

    chembl['ic50'] = pd.to_numeric(chembl['ic50'], errors='coerce')
    pubchem['ic50'] = pd.to_numeric(pubchem['ic50'], errors='coerce')
    tsv_add['ic50'] = pd.to_numeric(tsv_add['ic50'], errors='coerce')

    chembl['source'] = 'chembl'
    pubchem['source'] = 'pubchem'
    tsv_add['source'] = 'extra'

    df = pd.concat([chembl, pubchem, tsv_add], ignore_index=True)
    df = df.dropna(subset=['smiles', 'ic50'])
    df = df[df['ic50'] > 0]
    df = df.drop_duplicates(subset='smiles').reset_index(drop=True)


    df['pIC50'] = IC50_to_pIC50(df['ic50'])

    print(f"전처리 완료: 총 {len(df)}개 샘플")
    print("소스별 샘플 수:\n", df['source'].value_counts())
    return df
    

# ========= 2) Fingerprint 생성기 =========
def fp_morgan(smiles, radius=2, nBits=CFG['NBITS']):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    bv = AllChem.GetMorganFingerprintAsBitVect(mol, radius=radius, nBits=nBits)
    return bv_to_np(bv, nBits)

def smiles_to_morgan_fp(smiles, radius=2, nBits=CFG['NBITS']):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=nBits)
    arr = np.zeros((nBits,), dtype=np.uint8)     # 길이 반드시 nBits
    DataStructs.ConvertToNumpyArray(fp, arr)
    return arr

def calculate_rdkit_descriptors(smiles):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return np.full((len(Descriptors._descList),), np.nan, dtype=float)
    try:
        vals = [fn(mol) for _, fn in Descriptors._descList]
    except Exception:
        return np.full((len(Descriptors._descList),), np.nan, dtype=float)
    return np.array(vals, dtype=float)

def get_score(y_true_ic50, y_pred_ic50, y_true_pic50, y_pred_pic50):
    rmse = mean_squared_error(y_true_ic50, y_pred_ic50)
    nrmse = rmse / (np.max(y_true_ic50) - np.min(y_true_ic50))
    A = 1 - min(nrmse, 1)
    B = r2_score(y_true_pic50, y_pred_pic50)
    score = 0.4 * A + 0.6 * B
    return score


# ========== Morgan + RDkit 결합 피처 생성 ==========
def build_X_morgan_plus_rdkit(df, radius=2, nBits=CFG['NBITS']):
    # Morgan FP
    morgan_list, keep_idx = [], []
    for i, smi in enumerate(df['smiles']):
        x = fp_morgan(smi, radius, nBits=nBits)
        if x is not None:
            morgan_list.append(x); keep_idx.append(i)
    sub = df.iloc[keep_idx].copy()
    X_morgan = np.stack(morgan_list)

    rd_list = [calculate_rdkit_descriptors(smi) for smi in sub['smiles']]
    X_rd = np.vstack(rd_list)

    imputer = SimpleImputer(strategy='median')
    scaler = StandardScaler()
    X_rd_imputed = imputer.fit_transform(X_rd)
    X_rd_scaled  = scaler.fit_transform(X_rd_imputed)

    # 결합
    X = np.hstack([X_morgan.astype(np.float32), X_rd_scaled.astype(np.float32)])

    # 타깃
    y_pic50 = sub['pIC50'].values.astype(float)
    y_ic50 = sub['ic50'].values.astype(float)

    desc_mean = np.nanmean(X_rd, axis=0)
    
    return X, y_pic50, y_ic50, sub, imputer, scaler, desc_mean

def comp_metric_sklearn(y_true, y_pred):
    y_true_ic50 = pIC50_to_IC50(y_true)
    y_pred_ic50 = pIC50_to_IC50(y_pred)

    score = get_score(y_true_ic50, y_pred_ic50, y_true, y_pred)
    return 'comp_score', score, True  # True: 높을수록 좋음


def objective(trial: optuna.Trial) -> float:
    # 탐색할 하이퍼파라미터 공간
    params = dict(
        objective='regression',
        learning_rate=trial.suggest_float('learning_rate', 1e-3, 3e-1, log=True),
        num_leaves=trial.suggest_int('num_leaves', 31, 255),
        feature_fraction=trial.suggest_float('feature_fraction', 0.6, 1.0),
        bagging_fraction=trial.suggest_float('bagging_fraction', 0.6, 1.0),
        bagging_freq=trial.suggest_int('bagging_freq', 0, 5),
        min_child_samples=trial.suggest_int('min_child_samples', 5, 60),
        reg_alpha=trial.suggest_float('reg_alpha', 0.0, 1.0),
        reg_lambda=trial.suggest_float('reg_lambda', 0.0, 3.0),
        random_state=CFG['SEED'],
        verbose=-1,
        n_estimators=CFG['N_ESTIMATORS'],
    )

    kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=CFG['SEED'])
    oof_pred_pic50 = np.zeros_like(y_pic50c, dtype=float)

    fold_scores = []
    for fold, (tr_idx, va_idx) in enumerate(kf.split(Xc_df), 1):
        X_tr, X_va = Xc_df.iloc[tr_idx].values, Xc_df.iloc[va_idx].values
        y_tr, y_va = y_pic50c[tr_idx], y_pic50c[va_idx]

        model = lgb.LGBMRegressor(**params)
        model.fit(
            X_tr, y_tr,
            eval_set=[(X_va, y_va)],
            eval_metric=comp_metric_sklearn,
            callbacks=[lgb.early_stopping(stopping_rounds=ESR, verbose=False)]
        )
        
        y_pred_va = model.predict(X_va, num_iteration=model.best_iteration_)
        oof_pred_pic50[va_idx] = y_pred_va

        y_va_ic50 = pIC50_to_IC50(y_va)
        y_pred_va_ic50 = pIC50_to_IC50(y_pred_va)
        score, A, B, rmse = comp_score(y_va_ic50, y_pred_va_ic50, y_va, y_pred_va)

        print(f"[Trial {trial.number}] Fold {fold} → Score={score:.4f}, A={A:.4f}, B={B:.4f}, RMSE(IC50)={rmse:.4f}")

    # OOF 종합 점수
    y_oof_ic50 = pIC50_to_IC50(oof_pred_pic50)
    score, A, B, rmse = comp_score(y_ic50c, y_oof_ic50, y_pic50c, oof_pred_pic50)

    print(f"[Trial {trial.number}] OOF 결과 → Score={score:.4f}, A={A:.4f}, B={B:.4f}, RMSE(IC50)={rmse:.4f}")

    trial.set_user_attr("A", A)
    trial.set_user_attr("B", B)
    trial.set_user_attr("RMSE(IC50)", rmse)

    return score


def train_oof_ensemble(best_params: dict):
    """Best params로 KFold 앙상블 학습해 object(리스트) 반환"""
    models = []
    kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=CFG['SEED'])
    for tr_idx, va_idx in kf.split(Xc_df):
        # DataFrame 그대로 사용 (values 제거)
        X_tr, X_va = Xc_df.iloc[tr_idx], Xc_df.iloc[va_idx]
        y_tr, y_va = y_pic50c[tr_idx], y_pic50c[va_idx]
    
        model = lgb.LGBMRegressor(**best_params, n_estimators=CFG['N_ESTIMATORS'])
        model.fit(
            X_tr, y_tr,
            eval_set=[(X_va, y_va)],
            eval_metric=comp_metric_sklearn,
            callbacks=[lgb.early_stopping(stopping_rounds=ESR, verbose=False)]
        )
        models.append(model)

    return models

def predict_with_ensemble(models, X_test_df: pd.DataFrame) -> np.ndarray:
    preds = np.zeros(len(X_test_df), dtype=float)
    for m in models:
        preds += m.predict(X_test_df, num_iteration=m.best_iteration_) / len(models)
    return preds

def comp_score(y_true_ic50, y_pred_ic50, y_true_pic50, y_pred_pic50):
    rmse = mean_squared_error(y_true_ic50, y_pred_ic50)
    nrmse = rmse / (np.max(y_true_ic50) - np.min(y_true_ic50))
    A = 1 - min(nrmse, 1)
    B = r2_score(y_true_pic50, y_pred_pic50)
    score = 0.4 * A + 0.6 * B
    return score, A, B, rmse

if __name__ == "__main__":
    df = preprocess_data()
    Xc, y_pic50c, y_ic50c, subc, imputer, scaler, desc_mean = build_X_morgan_plus_rdkit(df, radius=2, nBits=CFG['NBITS'])

    morgan_feature_names = [f"morgan_{i}" for i in range(CFG['NBITS'])]
    rd_dim = len(Descriptors._descList)
    rdkit_feature_names = [f"rdkit_{i}" for i in range(rd_dim)]
    feature_names = morgan_feature_names + rdkit_feature_names
    Xc_df = pd.DataFrame(Xc, columns=feature_names)

    sampler = optuna.samplers.TPESampler(seed=CFG['SEED'])
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=OPTUNA_TRIALS, show_progress_bar=False)

    best = study.best_trial
    score = best.value
    print(f"Best trial: {best.number}. Best value: {best.value:.6f}")
    print("Best params:", best.params)
    print(f"[lgbm+optuna] Score={score:.4f} |A={best.user_attrs.get('A'):.4f} | B={best.user_attrs.get('B'):.4f} | RMSE(IC50)={best.user_attrs.get('RMSE(IC50)') if best.user_attrs.get('RMSE(IC50)') is not None else 'NA'}")

    best_params = dict(
        objective='regression',
        **best.params,
        random_state=CFG['SEED'],
        verbose=-1
    )

    models = train_oof_ensemble(best_params)

    test_df = pd.read_csv("./test.csv")
    test_df['fingerprint'] = test_df['Smiles'].apply(smiles_to_morgan_fp)
    test_df['descriptors'] = test_df['Smiles'].apply(calculate_rdkit_descriptors)

    valid_mask = test_df['fingerprint'].apply(lambda x: x is not None) & test_df['descriptors'].apply(lambda x: x is not None)

    fp_test = np.stack(test_df.loc[valid_mask, 'fingerprint'].values)
    desc_test = np.stack(test_df.loc[valid_mask, 'descriptors'].values)
    desc_test = np.nan_to_num(desc_test, nan=desc_mean)
    desc_test = scaler.transform(desc_test)
    X_test = np.hstack([fp_test.astype(np.float32), desc_test.astype(np.float32)])

    X_test_df = pd.DataFrame(X_test, columns=feature_names)

    test_preds_p = predict_with_ensemble(models, X_test_df)
    test_preds_ic50 = pIC50_to_IC50(test_preds_p)

    submission = pd.read_csv("./sample_submission.csv")
    pred_df = pd.DataFrame({
        'ID': test_df.loc[valid_mask, 'ID'].astype(str).values,
        'ASK1_IC50_nM': test_preds_ic50
    })

    submission['ID'] = submission['ID'].astype(str)
    out = submission[['ID']].merge(pred_df, on='ID', how='left')

    ic50_mean = df['ic50'].mean()
    out['ASK1_IC50_nM'] = pd.to_numeric(out['ASK1_IC50_nM'], errors='coerce').fillna(ic50_mean).clip(lower=1e-6).round(6)

    os.makedirs("result_jy", exist_ok=True)
    out_path = "result_jy/lgbm_optuna_submission.csv"
    out.to_csv(out_path, index=False)
    print(f"제출 파일 저장 완료: {out_path}")
