In [2]:
import pandas as pd
import numpy as np
import pyreadr
import json
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras import layers, models
from tensorflow.keras.utils import to_categorical
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt
from typing import List, Optional, Sequence, Tuple, Union, Dict
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import load_model as keras_load_model
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, MDS
import os
import time
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, f1_score

import keras_tuner as kt
from dataclasses import dataclass, asdict
import random
from datetime import datetime
import joblib


Step 1: We load the expression, gene and sample data we saved after the pre-processing we applied in R. We aim to predict tumor grade from the RNAseq data, so we examine potential class imbalance in the tumor grade sample metadata column, and we remove NA values in tumors with unidentified grade.

In [3]:
#load expression, genes and samples data
expression=pd.read_csv("C:/Users/joann/Desktop/M2/Deep_Learning/merged_expression.csv",index_col=0)
genes=pd.read_csv("C:/Users/joann/Desktop/M2/Deep_Learning/merged_genes.csv")
samples = pyreadr.read_r("C:/Users/joann/Desktop/M2/Deep_Learning/merged_samples.rds")
samples = samples[None]

In [4]:
#check what the expression, genes and samples data look like
print(expression.columns[:5])
print(expression.head(2))
print(samples.head(2))

Index(['ENSG00000276168.1', 'ENSG00000129824.16', 'ENSG00000133048.13',
       'ENSG00000012223.13', 'ENSG00000198695.2'],
      dtype='object')
                              ENSG00000276168.1  ENSG00000129824.16  \
TCGA-HT-7468-01A-11R-2027-07           8.423027           13.253145   
TCGA-DU-7015-01A-11R-2027-07           8.252623            5.532866   

                              ENSG00000133048.13  ENSG00000012223.13  \
TCGA-HT-7468-01A-11R-2027-07            7.491808            4.879747   
TCGA-DU-7015-01A-11R-2027-07            8.707316            5.087127   

                              ENSG00000198695.2  ENSG00000228253.1  \
TCGA-HT-7468-01A-11R-2027-07          15.950897          12.763416   
TCGA-DU-7015-01A-11R-2027-07          16.922914          13.431528   

                              ENSG00000198763.3  ENSG00000133110.15  \
TCGA-HT-7468-01A-11R-2027-07          17.025877            5.074539   
TCGA-DU-7015-01A-11R-2027-07          17.670450            4.085189   


In [5]:
#explore the samples metadata
print(samples.shape)
# list of column names
samples.columns.tolist()

#check for potential class imbalance in tumor grade
print(samples["tumor_grade"].value_counts())

#count how many NA values are in the tumor grade column
print(samples["tumor_grade"].isna().sum())

(925, 114)
tumor_grade
G4    391
G3    216
G2    211
Name: count, dtype: int64
107


In [6]:
#we exclude samples with NA tumor grade
samples_all = samples.copy() #keep original samples dataframe
expression_all = expression.copy() #keep original expression dataframe

samples_sup = samples_all.dropna(subset=["tumor_grade"])
expression_sup = expression_all.loc[samples_sup.index]

assert expression_sup.shape[0] == samples_sup.shape[0], "Mismatch in number of samples between expression and samples dataframes after dropping NA tumor grades."



Step 2: We configure the model and ensure reproducibility.

In [None]:
@dataclass
class Config:
    out_dir: str = "C:/Users/joann/Desktop/M2/Deep_Learning/MLP_run"
    seed: int = 42

    # labels
    label_col: str = "tumor_grade"
    label_map: dict = None  # set in main if None

    # splits
    test_size: float = 0.15
    val_size: float = 0.15  # fraction of full dataset

    # training/tuning
    max_epochs: int = 100
    early_stop_patience: int = 5

    # tuner
    hyperband_factor: int = 3
    objective: str = "val_accuracy"

def set_global_seed(seed: int) -> None:
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

    # makes TF ops more deterministic when possible
    try:
        tf.config.experimental.enable_op_determinism()
    except Exception:
        pass


def make_run_dir(base_dir: str) -> str:
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = os.path.join(base_dir, ts)
    os.makedirs(run_dir, exist_ok=True)
    return run_dir

Step 3: We assign the classes. Grade II tumors will be 0, grade III will be 1 and grade IV will be 2.

In [8]:
def make_y(samples_sup: pd.DataFrame, label_col: str, label_map: dict) -> np.ndarray:
    """
    Convert tumor grade strings to integer classes.
    Example mapping: {"G2":0, "G3":1, "G4":2}
    """
    y = samples_sup[label_col].astype(str).map(label_map)
    if y.isna().any():
        bad = samples_sup.loc[y.isna(), label_col].unique()
        raise ValueError(f"Found unmapped labels: {bad}")
    return y.astype(int).to_numpy()


Step 4: We construct our 3 splits: a training set, a validation set and a separate testing set. 

In [20]:
def split_train_val_test(X: pd.DataFrame, y: np.ndarray, cfg: Config):
    """
    3-way stratified split.
    1) split off test
    2) split remaining into train/val
    """
    X_temp, X_test, y_temp, y_test = train_test_split(
        X, y,
        test_size=cfg.test_size,
        random_state=cfg.seed,
        stratify=y
    )

    # val_size is fraction of full dataset; convert to fraction of remaining temp
    val_frac_of_temp = cfg.val_size / (1.0 - cfg.test_size)

    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp,
        test_size=val_frac_of_temp,
        random_state=cfg.seed,
        stratify=y_temp
    )

    return X_train, X_val, X_test, y_train, y_val, y_test

#we save the split ids to enable reproducibility
def save_split_ids(run_dir: str, X_train: pd.DataFrame, X_val: pd.DataFrame, X_test: pd.DataFrame) -> None:
    pd.Index(X_train.index).to_series().to_csv(os.path.join(run_dir, "train_ids.csv"), index=False)
    pd.Index(X_val.index).to_series().to_csv(os.path.join(run_dir, "val_ids.csv"), index=False)
    pd.Index(X_test.index).to_series().to_csv(os.path.join(run_dir, "test_ids.csv"), index=False)

Step 5: We standardize the data, fitting on test data only to avoid data leakage

In [10]:
def standardize(X_train: pd.DataFrame, X_val: pd.DataFrame, X_test: pd.DataFrame):
    scaler = StandardScaler(with_mean=True, with_std=True)
    X_train_s = scaler.fit_transform(X_train).astype(np.float32)
    X_val_s = scaler.transform(X_val).astype(np.float32)
    X_test_s = scaler.transform(X_test).astype(np.float32)
    return X_train_s, X_val_s, X_test_s, scaler

Step 6: We build our multi-layer perceptron model (MLP)

In [None]:
def build_model(hp: kt.HyperParameters, input_dim: int, num_classes: int = 3) -> keras.Model:
    """
    Build an MLP for 3-class classification with tunable hyperparameters.
    """
    #batch size 
    hp.Choice("batch_size", [16, 32, 64, 128])  # registered for the tuner

    # architecture choices
    n_layers = hp.Int("n_layers", 1, 3)
    units = hp.Choice("units", [64, 128, 256, 512])
    dropout = hp.Float("dropout", 0.0, 0.5, step=0.1)
    l2_strength = hp.Choice("l2", [0.0, 1e-5, 1e-4, 1e-3])

    # optimizer choice
    lr = hp.Choice("lr", [1e-4, 3e-4, 1e-3, 3e-3])

    
    inputs = keras.Input(shape=(input_dim,), name="expression")
    x = inputs

    for i in range(n_layers):
        x = layers.Dense(
            units=units,
            activation="relu",
            kernel_regularizer=regularizers.l2(l2_strength),
            name=f"dense_{i+1}"
        )(x)
        if dropout > 0:
            x = layers.Dropout(dropout, name=f"dropout_{i+1}")(x)

    outputs = layers.Dense(num_classes, activation="softmax", name="softmax")(x)
    model = keras.Model(inputs, outputs, name="MLP_grade")

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model

Step 7: We tune the hyper-paramaters to choose the best model and avoid overfitting

In [22]:
def tune_hyperparameters(X_train_s, y_train, X_val_s, y_val, input_dim: int, run_dir: str, cfg: Config):
    tuner_dir = os.path.join(run_dir, "tuner")
    os.makedirs(tuner_dir, exist_ok=True)

    tuner = kt.Hyperband(
        hypermodel=lambda hp: build_model(hp, input_dim=input_dim, num_classes=3),
        objective=kt.Objective(cfg.objective, direction="max"),
        max_epochs=cfg.max_epochs,
        factor=cfg.hyperband_factor,
        directory=tuner_dir,
        project_name="grade_mlp"
    )

    early_stop = keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        mode="max",
        patience=cfg.early_stop_patience,
        restore_best_weights=True
    )

    # Tune batch size as well
    # We choose from common values.
    def fit_kwargs(hp):
        return {"batch_size": hp.Choice("batch_size", [16, 32, 64, 128])}

    tuner.search(
        X_train_s, y_train,
        validation_data=(X_val_s, y_val),
        epochs=cfg.max_epochs,
        callbacks=[early_stop],
        **fit_kwargs(tuner.oracle.get_space())
    )

    best_hp = tuner.get_best_hyperparameters(1)[0]
    return tuner, best_hp

Step 8: We choose the best model to train

In [24]:
def train_best(X_train_s, y_train, X_val_s, y_val, input_dim: int, best_hp, run_dir: str, cfg: Config):
    model = build_model(best_hp, input_dim=input_dim, num_classes=3)

    ckpt_path = os.path.join(run_dir, "best_model.keras")

    early_stop = keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        mode="max",
        patience=cfg.early_stop_patience,
        restore_best_weights=True
    )

    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath=ckpt_path,
        monitor="val_loss",
        save_best_only=True
    )

    history = model.fit(
        X_train_s, y_train,
        validation_data=(X_val_s, y_val),
        epochs=cfg.max_epochs,
        batch_size=best_hp.get("batch_size"),
        callbacks=[early_stop, checkpoint],
        verbose=1
    )

    # Save history
    with open(os.path.join(run_dir, "history.json"), "w") as f:
        json.dump(history.history, f, indent=2)

    return model, ckpt_path

Step 9: We evaluate the model

In [25]:
def evaluate(model: keras.Model, X_test_s, y_test, run_dir: str):
    test_loss, test_acc = model.evaluate(X_test_s, y_test, verbose=0)

    probs = model.predict(X_test_s, verbose=0)
    y_pred = np.argmax(probs, axis=1)

    report = classification_report(y_test, y_pred, output_dict=True)
    cm = confusion_matrix(y_test, y_pred).tolist()
    macro_f1 = float(f1_score(y_test, y_pred, average="macro"))

    results = {
        "test_loss": float(test_loss),
        "test_accuracy": float(test_acc),
        "macro_f1": macro_f1,
        "classification_report": report,
        "confusion_matrix": cm
    }

    with open(os.path.join(run_dir, "test_metrics.json"), "w") as f:
        json.dump(results, f, indent=2)

    return results

Step 10: We save all the run metadata

In [15]:
def save_run_metadata(run_dir: str, cfg: Config, best_hp, scaler):
    with open(os.path.join(run_dir, "config.json"), "w") as f:
        json.dump(asdict(cfg), f, indent=2)

    with open(os.path.join(run_dir, "best_hyperparameters.json"), "w") as f:
        json.dump(best_hp.values, f, indent=2)

    joblib.dump(scaler, os.path.join(run_dir, "scaler.joblib"))

    versions = {
        "python": f"{os.sys.version_info.major}.{os.sys.version_info.minor}.{os.sys.version_info.micro}",
        "numpy": np.__version__,
        "pandas": pd.__version__,
        "tensorflow": tf.__version__,
        "keras_tuner": kt.__version__
    }
    with open(os.path.join(run_dir, "versions.json"), "w") as f:
        json.dump(versions, f, indent=2)

Final Step: Main run script

In [16]:
def main(expression_sup: pd.DataFrame, samples_sup: pd.DataFrame):
    cfg = Config()
    if cfg.label_map is None:
        cfg.label_map = {"G2": 0, "G3": 1, "G4": 2}

    set_global_seed(cfg.seed)
    run_dir = make_run_dir(cfg.out_dir)

    # 1) labels
    y = make_y(samples_sup, cfg.label_col, cfg.label_map)

    # 2) split
    X_train, X_val, X_test, y_train, y_val, y_test = split_train_val_test(expression_sup, y, cfg)
    save_split_ids(run_dir, X_train, X_val, X_test)

    # 3) standardize
    X_train_s, X_val_s, X_test_s, scaler = standardize(X_train, X_val, X_test)

    # 4) tune
    tuner, best_hp = tune_hyperparameters(X_train_s, y_train, X_val_s, y_val, input_dim=X_train_s.shape[1], run_dir=run_dir, cfg=cfg)

    # 5) train best
    model, ckpt_path = train_best(X_train_s, y_train, X_val_s, y_val, input_dim=X_train_s.shape[1], best_hp=best_hp, run_dir=run_dir, cfg=cfg)

    # 6) evaluate
    results = evaluate(model, X_test_s, y_test, run_dir)

    # 7) save metadata + scaler + best HP
    save_run_metadata(run_dir, cfg, best_hp, scaler)

    print("Saved run to:", run_dir)
    print("Best model:", ckpt_path)
    print("Test accuracy:", results["test_accuracy"])
    print("Macro F1:", results["macro_f1"])

    return results, run_dir

In [26]:
#Now we call the main function to run the entire pipeline
results, run_dir = main(expression_sup, samples_sup)


Trial 254 Complete [00h 00m 14s]
val_accuracy: 0.869918704032898

Best val_accuracy So Far: 0.934959352016449
Total elapsed time: 00h 39m 00s
Epoch 1/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 111ms/step - accuracy: 0.5332 - loss: 2.3894 - val_accuracy: 0.8049 - val_loss: 1.8378
Epoch 2/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step - accuracy: 0.6993 - loss: 2.0815 - val_accuracy: 0.8130 - val_loss: 1.7874
Epoch 3/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 52ms/step - accuracy: 0.7273 - loss: 2.0125 - val_accuracy: 0.8211 - val_loss: 1.7368
Epoch 4/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step - accuracy: 0.7517 - loss: 1.8847 - val_accuracy: 0.8618 - val_loss: 1.6946
Epoch 5/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step - accuracy: 0.7745 - loss: 1.8534 - val_accuracy: 0.8943 - val_loss: 1.6688
Epoch 6/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37

In [None]:
#we can now access the saved metrics to produce evaluation reports and plots
run_dir = r"C:/Users/joann/Desktop/M2/Deep_Learning/MLP_run/20251228_211457" 
  

# Load metrics
with open(os.path.join(run_dir, "test_metrics.json"), "r") as f:
    metrics = json.load(f)

cm = np.array(metrics["confusion_matrix"])
class_names = ["G2", "G3", "G4"]  # must match your label mapping

def plot_confusion_matrix(cm, class_names, out_path):
    fig, ax = plt.subplots()
    im = ax.imshow(cm)

    ax.set_xticks(range(len(class_names)))
    ax.set_yticks(range(len(class_names)))
    ax.set_xticklabels(class_names, rotation=45, ha="right")
    ax.set_yticklabels(class_names)

    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    ax.set_title("Confusion Matrix")

    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, str(cm[i, j]), ha="center", va="center")

    fig.colorbar(im, ax=ax)
    fig.tight_layout()
    fig.savefig(out_path, dpi=200)
    plt.close(fig)

plot_confusion_matrix(cm, class_names, os.path.join(run_dir, "confusion_matrix.png"))
print("Saved:", os.path.join(run_dir, "confusion_matrix.png"))


Saved: C:/Users/joann/Desktop/M2/Deep_Learning/MLP_run/20251228_211457\confusion_matrix.png
