In [None]:
# extract_trs.py
# Purpose: compute TRS (mean probability across ensemble), build result table,
# assign sample set (Train/Validation/Other), derive predicted labels and save results.
# Note: this script assumes `models`, `scalers`, and feature matrix `X` (pandas DataFrame)
# are already available in the current session.

import os
import numpy as np
import pandas as pd

# --- Path configuration ---
# Every path below is derived from BASE_DIR; the results folder is created up
# front so the final CSV write cannot fail on a missing directory.
BASE_DIR = r"D:\结直肠癌肝转移Biomarker 诊断\新的策略\Autoencoder"
OUTPUT_DIR = os.path.join(BASE_DIR, "跨平台模型", "multi_omics_results")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Locate the training / validation sample ID lists ---
# Both lists live side by side in the same results folder.
id_list_dir = os.path.join(BASE_DIR, "final_model_results_updated")
train_ids_file = os.path.join(id_list_dir, "train_sample_ids.txt")
val_ids_file = os.path.join(id_list_dir, "val_sample_ids.txt")

def read_id_list(path):
    """Return the sample IDs stored one per line in a text file.

    Each line is stripped of surrounding whitespace and blank lines are
    skipped. A path that does not exist yields an empty list rather than an
    error, so a missing ID file simply contributes no IDs.

    Args:
        path: Location of the ID list file.

    Returns:
        list[str]: The non-empty, stripped lines in file order.
    """
    if not os.path.exists(path):
        return []
    # "utf-8-sig" reads plain ASCII/UTF-8 and additionally drops a leading
    # byte-order mark. Without it the decoding depends on the machine locale,
    # and a BOM would stay attached to the first ID, which then never matches.
    with open(path, "r", encoding="utf-8-sig") as f:
        stripped_lines = (line.strip() for line in f)
        return [sample_id for sample_id in stripped_lines if sample_id]

# Load both ID lists in one pass (training first, then validation); a file
# that is absent yields an empty list.
train_ids, val_ids = map(read_id_list, (train_ids_file, val_ids_file))

In [None]:
# --- Compute TRS scores (mean predicted probability across ensemble) ---
# Require: models (list of keras models), scalers (list of sklearn scalers), X (DataFrame)
if not ('models' in globals() and 'scalers' in globals() and 'X' in globals()):
    raise RuntimeError("Please ensure `models`, `scalers`, and `X` (DataFrame) are defined before running this script.")

# zip() stops at the shorter sequence, so a length mismatch would silently
# drop ensemble members instead of failing.
if len(models) != len(scalers):
    raise RuntimeError(
        f"`models` ({len(models)}) and `scalers` ({len(scalers)}) must have the same length."
    )
# Averaging zero prediction vectors gives NaN, which would later be
# thresholded into label 0 for every sample.
if len(models) == 0:
    raise RuntimeError("`models` is empty; at least one model/scaler pair is required.")

# One flat prediction vector per ensemble member, in ensemble order.
trs_predictions = []
for model, scaler in zip(models, scalers):
    # scale features with the scaler associated to this model
    X_scaled = scaler.transform(X)
    X_cnn = np.expand_dims(X_scaled, axis=-1)  # shape: (n_samples, n_features, 1)
    pred = model.predict(X_cnn, verbose=0)
    member_scores = np.asarray(pred).flatten()
    # Exactly one score per sample is required; anything else (e.g. a
    # two-column output) cannot be paired with X.index afterwards.
    if member_scores.shape[0] != len(X):
        raise RuntimeError(
            f"Model output has {member_scores.shape[0]} values for {len(X)} samples; "
            "expected one probability per sample."
        )
    trs_predictions.append(member_scores)

In [None]:
# Raw TRS: element-wise average of the per-model prediction vectors.
trs_raw = np.asarray(trs_predictions).mean(axis=0)

# --- Build TRS results DataFrame ---
# One row per sample, keyed by the index of the feature matrix.
trs_df = pd.DataFrame({"Sample_ID": X.index, "TRS": trs_raw})

# Every sample starts as "Other"; members of the ID lists are then relabelled.
# "Validation" is applied last, so it wins for an ID present in both lists.
trs_df["Set"] = "Other"
for set_name, member_ids in (("Train", train_ids), ("Validation", val_ids)):
    if len(member_ids) == 0:
        continue
    in_set = trs_df["Sample_ID"].isin(member_ids)
    trs_df.loc[in_set, "Set"] = set_name

In [None]:
# --- Derive predicted labels and correctness ---
# If true labels are available, try to add them. Supported sources:
# - A vector `y_true` aligned with X.index (array, list or pandas Series)
# - A DataFrame `labels_df` indexed by sample ID with a `True_Label_int` column
# When both are defined `labels_df` takes precedence; when neither is defined
# the true-label columns are omitted.

result = trs_df.copy()

# Attach true labels if available in common variable names
if 'y_true' in globals():
    if (
        isinstance(y_true, pd.Series)
        and len(y_true) == len(X.index)
        and not y_true.index.isin(X.index).any()
    ):
        # pd.Series(<Series>, index=...) aligns by label, so a Series that
        # shares no label with X.index (e.g. a default RangeIndex) would come
        # back entirely NaN. With a matching length it is taken positionally.
        result['True_Label_int'] = y_true.to_numpy()
    else:
        # Arrays and lists are taken positionally; a Series whose labels
        # overlap X.index is aligned by label.
        result['True_Label_int'] = pd.Series(y_true, index=X.index).values
if 'labels_df' in globals() and 'True_Label_int' in labels_df.columns:
    # Look labels up by sample ID; IDs absent from labels_df become NaN.
    looked_up = labels_df['True_Label_int'].reindex(result['Sample_ID'])
    result['True_Label_int'] = looked_up.values
if 'True_Label_int' in result.columns and 'True_Label_str' not in result.columns:
    # Human-readable label; values other than 0/1 map to NaN.
    result['True_Label_str'] = result['True_Label_int'].map({0: "Primary", 1: "Metastasis"})

In [None]:
# TRS is used directly as the metastatic-risk probability (higher = higher
# risk), so both score columns are plain copies of it.
for alias in ("TRS_Score", "Predicted_Probability"):
    result[alias] = result["TRS"]

# A probability of 0.5 or more is called metastasis (1), anything below primary (0).
predicted_positive = result["Predicted_Probability"].ge(0.5)
result["Predicted_Label_int"] = predicted_positive.astype(int)
result["Predicted_Label_str"] = result["Predicted_Label_int"].map(
    {0: "Primary", 1: "Metastasis"}
)


In [None]:
# Classification correctness if true labels exist
if 'True_Label_int' in result.columns:
    label_missing = result["True_Label_int"].isna()
    is_correct = result["Predicted_Label_int"] == result["True_Label_int"]
    if label_missing.any():
        # A missing true label compares unequal to every prediction and would
        # be reported as a misclassification; mark those rows as unknown (<NA>).
        is_correct = is_correct.astype("boolean").mask(label_missing)
    result["Classification_Correct"] = is_correct
else:
    # No ground truth at all: correctness is unknown for every sample.
    result["Classification_Correct"] = pd.NA

# --- Select columns to save ---
# The sample ID always leads; the true-label columns follow only when they
# were attached (string form first, then the integer code).
cols_to_keep = ["Sample_ID"] + [
    label_col
    for label_col in ("True_Label_str", "True_Label_int")
    if label_col in result.columns
]


In [None]:
# Prediction columns are appended after the ID / true-label columns, in the
# order they appear in the saved table.
cols_to_keep.extend((
    "TRS_Score",
    "Predicted_Probability",
    "Predicted_Label_int",
    "Predicted_Label_str",
    "Classification_Correct",
    "Set",
))

# Restrict the results to the selected columns, in that order.
final_table = result.loc[:, cols_to_keep]

# --- Save results ---
# Write the table without the row index, then report where it went.
output_filename = "trs_prediction_results of training and internal validation set.csv"
output_path = os.path.join(OUTPUT_DIR, output_filename)
final_table.to_csv(output_path, index=False)
print(
    f"Corrected TRS results saved to: {output_path}",
    "Note: TRS is interpreted such that higher scores indicate higher metastasis risk.",
    sep="\n",
)