# UniKP (logkcat)

In [None]:
"""Refer and revise from UniKP https://github.com/Luo-SynBioLab/UniKP with gpl-3.0"""
import os, math
from sklearn.ensemble import ExtraTreesRegressor
import pandas as pd
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from scipy.stats import pearsonr
import random
from collections import Counter
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
import numpy as np
from sklearnex import patch_sklearn
patch_sklearn()
# Working directory of the running kernel; the dataset and result paths used
# below are built relative to it.
current_dir = os.getcwd()
# One seed shared by Python's and NumPy's global RNGs and by the
# train/test split and KFold shuffling further down.
random_state = 66
random.seed(random_state)
np.random.seed(random_state)

def return_scores(y_test, y_pred):
    """Compute RMSE, MAE, R2 and Pearson correlation, ignoring NaN targets.

    Args:
        y_test: 1-D array-like of reference values. Entries that are NaN are
            dropped, together with the matching prediction, before scoring.
        y_pred: 1-D array-like of predictions aligned with ``y_test``.

    Returns:
        Tuple ``(rmse, mae, r2, pcc)`` where ``pcc`` is the Pearson
        correlation coefficient (the p-value is discarded).
    """
    # Coerce to ndarrays so that plain lists and other array-likes can be
    # boolean-masked; ndarray inputs pass through unchanged.
    y_test = np.asarray(y_test)
    y_pred = np.asarray(y_pred)

    # Remove entries whose reference value is NaN
    mask = ~np.isnan(y_test)
    y_test_filtered = y_test[mask]
    y_pred_filtered = y_pred[mask]

    # Scores
    rmse = np.sqrt(mean_squared_error(y_test_filtered, y_pred_filtered))
    mae = mean_absolute_error(y_test_filtered, y_pred_filtered)
    r2 = r2_score(y_test_filtered, y_pred_filtered)
    pcc = pearsonr(y_test_filtered, y_pred_filtered)

    return rmse, mae, r2, pcc[0]


def return_x_y(df_filtered):
    """Assemble the feature matrix and target vector from a dataframe slice.

    The function is driven by module-level settings: ``label_name`` selects
    the target column, ``protein_column`` / ``substrate_column`` select the
    two per-row vector columns, and ``use_t_ph_embedding`` / ``use_mw_logp``
    toggle the scalar columns ``ph``, ``t`` and ``mw``, ``logp``.

    Args:
        df_filtered: DataFrame holding the columns named above.

    Returns:
        Tuple ``(x, y)``. ``x`` stacks, column-wise, the protein vectors, the
        substrate vectors and then the enabled scalar columns (in the order
        ph, t, mw, logp). Rows whose target is NaN are removed from both.
    """
    targets = df_filtered[label_name].values
    keep = ~np.isnan(targets)

    # Names of the optional scalar features, in the order they are appended.
    scalar_names = []
    if use_t_ph_embedding:
        scalar_names += ['ph', 't']
    if use_mw_logp:
        scalar_names += ['mw', 'logp']

    # Each scalar column becomes an (n_rows, 1) block so it can be hstack-ed.
    scalar_blocks = [
        df_filtered[name].values.reshape(-1, 1) for name in scalar_names
    ]

    # Per-row sequences are expanded into 2-D arrays via a list of rows.
    vector_blocks = [
        np.array(df_filtered[column].tolist())
        for column in (protein_column, substrate_column)
    ]

    features = np.hstack(vector_blocks + scalar_blocks)
    return features[keep], targets[keep]

def Smooth_Label(Label_new, beta=0.9, bins_per_unit=10):
    """Return per-sample weights that down-weight densely populated label bins.

    Labels are shifted so the smallest one is 0, then bucketed into bins of
    width ``1 / bins_per_unit``. For a bin holding ``n`` samples the
    "effective number" is ``(1 - beta**n) / (1 - beta)`` and every sample in
    that bin receives the weight ``1 / effective_number``.

    Unlike the earlier version, the minimum is taken once from the untouched
    input (previously it was re-evaluated after each in-place subtraction, so
    labels were shifted by inconsistent offsets) and the caller's data is
    never modified.

    Args:
        Label_new: 1-D array-like of numeric labels. Not mutated.
        beta: Base of the effective-number formula; must not be 1.
        bins_per_unit: Number of bins per unit of label value.

    Returns:
        ``numpy.ndarray`` of dtype ``float32`` with one weight per label, in
        input order. An empty input yields an empty array.

    Raises:
        ValueError: If a label is NaN (it cannot be converted to a bin index).
    """
    labels = np.asarray(Label_new)
    if labels.size == 0:
        return np.array([], dtype=np.float32)

    # Shift by the original minimum, computed exactly once.
    lowest = labels.min()
    bin_index_per_label = [
        int((label - lowest) * bins_per_unit) for label in labels
    ]

    Nb = max(bin_index_per_label) + 1
    print(Nb)
    num_samples_of_bins = dict(Counter(bin_index_per_label))
    print(num_samples_of_bins)
    emp_label_dist = [num_samples_of_bins.get(i, 0) for i in range(Nb)]
    print(emp_label_dist, len(emp_label_dist))

    # Empty bins get an effective number of 0, but they are never looked up
    # below because no sample maps to them.
    eff_label_dist = [
        (1 - math.pow(beta, count)) / (1 - beta) for count in emp_label_dist
    ]
    print(eff_label_dist)

    weights = np.array(
        [1 / eff_label_dist[bin_idx] for bin_idx in bin_index_per_label],
        dtype=np.float32,
    )
    print(weights)
    print(len(weights))
    return weights


# NOTE(review): read_pickle can execute arbitrary code while unpickling; only
# load dataset files that come from a trusted source.
print('Reading data...', end='')
df_input = pd.read_pickle(f'{current_dir}/../../data_process/dataset/df_all_log_transformed.pkl')
print('Finished.')

# Experiment configuration; return_x_y reads these names as globals.
label_name = 'logkcat'
use_t_ph_embedding, use_mw_logp = True, True
protein_column, substrate_column = 'prott5', 'molebert'
score_names = ['rmse', 'mae', 'r2', 'pcc']

# Create the output directory before training so that a missing folder does
# not surface as a FileNotFoundError in np.save after all folds have run.
results_dir = f'{current_dir}/results'
os.makedirs(results_dir, exist_ok=True)

# Hold out 20% of the rows as the test set; run 5-fold CV on the remainder.
n_splits = 5
train_val_idx, test_idx = train_test_split(df_input.index, test_size=0.2, random_state=random_state)
kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
test_x, test_y = return_x_y(df_input.loc[test_idx])

val_scores_list = []
test_scores_list = []
test_pred_list = []
for fold_idx, (train_index, val_index) in enumerate(kf.split(train_val_idx), start=1):
    print(f"Fold: {fold_idx}/{n_splits}")
    # kf.split yields positions into train_val_idx; map them back to labels.
    fold_train_idx, fold_val_idx = train_val_idx[train_index], train_val_idx[val_index]
    train_x, train_y = return_x_y(df_input.loc[fold_train_idx])
    val_x, val_y = return_x_y(df_input.loc[fold_val_idx])

    model = ExtraTreesRegressor(n_jobs=-1)
    model.fit(train_x, train_y)

    val_pred = model.predict(val_x)
    val_scores = return_scores(val_y, val_pred)
    # Every fold's model is also scored on the same held-out test set; the
    # predictions are kept so they can be averaged across folds later.
    test_pred = model.predict(test_x)
    test_pred_list.append(test_pred)
    test_scores = return_scores(test_y, test_pred)

    val_scores_list.append(val_scores)
    test_scores_list.append(test_scores)
    print(f'Val  fold{fold_idx} ', val_scores)
    print(f'Test fold{fold_idx} ', test_scores)

# Column-wise mean over folds, in the order rmse, mae, r2, pcc.
val_scores_mean = np.mean(val_scores_list, axis=0)
test_scores_mean = np.mean(test_scores_list, axis=0)

np.save(f'{results_dir}/unikp_test_pred.npy', np.array(test_pred_list))
np.save(f'{results_dir}/unikp_test_y.npy', np.array(test_y))

# NOTE(review): the header says "with CBW", yet model.fit above is called
# without sample weights and Smooth_Label is never invoked in this cell.
# Confirm whether the weighting was meant to be applied.
print("UniKP with CBW\t RMSE\t MAE\t R2\t PCC\t")
print(f"Val_mean \t {val_scores_mean[0]:.4f} \t {val_scores_mean[1]:.4f} \t {val_scores_mean[2]:.4f} \t {val_scores_mean[3]:.4f}\n"
      f"Test_mean \t {test_scores_mean[0]:.4f} \t {test_scores_mean[1]:.4f} \t {test_scores_mean[2]:.4f} \t {test_scores_mean[3]:.4f}\n")

# Average the logkcat predictions of the 5 fold models on the test set

In [None]:
import numpy as np
import os
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from scipy.stats import pearsonr

def return_scores(y_test, y_pred):
    """Compute RMSE, MAE, R2 and Pearson correlation, ignoring NaN targets.

    Args:
        y_test: 1-D array-like of reference values. Entries that are NaN are
            dropped, together with the matching prediction, before scoring.
        y_pred: 1-D array-like of predictions aligned with ``y_test``.

    Returns:
        Tuple ``(rmse, mae, r2, pcc)`` where ``pcc`` is the Pearson
        correlation coefficient (the p-value is discarded).
    """
    # Coerce to ndarrays so that plain lists and other array-likes can be
    # boolean-masked; ndarray inputs pass through unchanged.
    y_test = np.asarray(y_test)
    y_pred = np.asarray(y_pred)

    # Remove entries whose reference value is NaN
    mask = ~np.isnan(y_test)
    y_test_filtered = y_test[mask]
    y_pred_filtered = y_pred[mask]

    # Scores
    rmse = np.sqrt(mean_squared_error(y_test_filtered, y_pred_filtered))
    mae = mean_absolute_error(y_test_filtered, y_pred_filtered)
    r2 = r2_score(y_test_filtered, y_pred_filtered)
    pcc = pearsonr(y_test_filtered, y_pred_filtered)

    return rmse, mae, r2, pcc[0]

current_dir = os.getcwd()

# Per-fold test predictions saved by the training cell: one row per fold.
test_pred_list = np.load(f'{current_dir}/results/unikp_test_pred.npy')
test_pred_npy = np.array(test_pred_list)
# Ensemble prediction: average over the fold axis.
logkcat_pred_mean = np.mean(test_pred_npy, axis=0)

# Score the averaged prediction against the saved test targets.
logkcat_test_y = np.load(f'{current_dir}/results/unikp_test_y.npy')
logkcat_scores = return_scores(logkcat_test_y, logkcat_pred_mean)
logkcat_scores

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import pearsonr, gaussian_kde
import matplotlib
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
matplotlib.rc("font", weight="bold")

# Filter out entries whose experimental value is NaN (padding values)
mask = ~np.isnan(logkcat_test_y)
logkcat_test_y, logkcat_pred_mean = logkcat_test_y[mask], logkcat_pred_mean[mask]
n = logkcat_test_y.shape[0]

# Estimate the point density used to colour the scatter
xy = np.vstack((logkcat_test_y, logkcat_pred_mean))
kde = gaussian_kde(xy)
z = kde(xy)

fig, ax = plt.subplots(figsize=(8, 6))
sc = ax.scatter(logkcat_test_y, logkcat_pred_mean, c=z, s=20, cmap='viridis', linewidths=0)

# Horizontal colorbar drawn inside the plot area, near its bottom edge.
# width/height are percentages of the anchor box; loc is resolved inside the
# box given by bbox_to_anchor, which is expressed in axes coordinates.
cax = inset_axes(
    ax,
    width="60%",
    height="4%",
    loc='lower center',
    bbox_to_anchor=(0.5, 0.02, 0.5, 1),
    bbox_transform=ax.transAxes,
    borderpad=2,
)

cb = fig.colorbar(sc, cax=cax, orientation='horizontal')
cb.ax.tick_params(labelsize=10)
cb.set_label('Density', fontsize=11, labelpad=6)
cb.ax.xaxis.set_label_position('top')

ax.set_xlabel(r'$log_{10}(k_{cat})$ $experimental$ $value$', fontsize=18)
ax.set_ylabel(r'$log_{10}(k_{cat})$ $predicted$ $value$', fontsize=18)

# Summary box: the last two entries of logkcat_scores are R2 and PCC.
annotation = f'UniKP\n\nPCC = {logkcat_scores[-1]:.2f}\n$R^2$ = {logkcat_scores[-2]:.2f}\nN = {n}'
ax.text(
    0.05, 0.95,
    annotation,
    transform=ax.transAxes,
    bbox=dict(facecolor='white', edgecolor='lightgray', alpha=0.9),
    fontsize=16,
    verticalalignment='top',
)
ax.tick_params(axis='both', labelsize=14)
ax.grid(False)
fig.tight_layout()

plt.savefig(f'{current_dir}/results/unikp_scatter.png')
plt.show()

# Inspect the training data of the first fold

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split, KFold
import os
import numpy as np
random_state = 66
current_dir = os.getcwd()

# Rebuild exactly the same split as the training cell (same seed, same sizes).
df_input = pd.read_pickle(f'{current_dir}/../../data_process/dataset/df_all_log_transformed.pkl')
train_val_idx, test_idx = train_test_split(df_input.index, test_size=0.2, random_state=random_state)
kf = KFold(n_splits=5, shuffle=True, random_state=random_state)

# Only the first fold is needed, so take a single item from the iterator.
fold_iter = enumerate(kf.split(train_val_idx), start=1)
fold_idx, (train_index, val_index) = next(fold_iter)
print(f"Fold: {fold_idx}/5")
fold_train_idx = train_val_idx[train_index]
fold_val_idx = train_val_idx[val_index]

df_input.loc[fold_train_idx].head(20)