In [1]:
import numpy as np
import os
import re

def extract_number(filename):
    """Return the first run of decimal digits found in ``filename`` as an int.

    Used as a sort key so that file names are ordered numerically rather than
    lexicographically.

    Parameters
    ----------
    filename : str
        File name to scan.

    Returns
    -------
    int
        The first contiguous digit sequence in ``filename``.

    Raises
    ------
    ValueError
        If ``filename`` contains no digits.
    """
    match = re.search(r'\d+', filename)
    if match is None:
        # Without this guard the failure surfaces as an AttributeError on None.
        raise ValueError(f"No numeric index found in file name: {filename!r}")
    return int(match.group())

def load_data(path='cnn_input_data/'):
    """Load every ``.npz`` class file under ``path`` into one array with labels.

    Files are ordered by ``extract_number`` and the position of a file in that
    ordering becomes its class label. All arrays stored in one file are stacked
    along a new leading axis, so each stored array is one sample.

    Parameters
    ----------
    path : str
        Directory containing the ``.npz`` files.

    Returns
    -------
    data : np.ndarray
        All samples concatenated along axis 0, with NaNs replaced by 0.
    labels : np.ndarray
        Float array holding the class index of each sample.
    n_classes : int
        Number of ``.npz`` files found.
    n_features : int
        ``data.shape[2]``.

    Raises
    ------
    ValueError
        If ``path`` contains no ``.npz`` files.
    """
    class_files = sorted([f for f in os.listdir(path) if f.endswith(".npz")], key=extract_number)
    if not class_files:
        raise ValueError(f"No .npz files found in {path!r}")

    data, labels = [], []
    for i, class_file in enumerate(class_files):
        # NpzFile keeps the archive open; the context manager closes it once
        # the arrays have been copied into memory by np.stack.
        with np.load(os.path.join(path, class_file)) as loaded:
            class_data = np.stack([loaded[k] for k in loaded.files])
        class_labels = np.full(class_data.shape[0], i, dtype=float)

        data.append(class_data)
        labels.append(class_labels)
        print("Loaded:", class_file)

    data = np.concatenate(data, axis=0)
    labels = np.concatenate(labels, axis=0)
    data[np.isnan(data)] = 0  # replace NaNs with 0
    print(f"Data shape: {data.shape}, Labels shape: {labels.shape}")
    return data, labels, len(class_files), data.shape[2]


2025-06-09 17:58:43.386904: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-06-09 17:58:43.388765: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-06-09 17:58:43.394562: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-06-09 17:58:43.409866: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1749459523.435165 1089384 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1749459523.44

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks, backend as K
from sklearn.model_selection import train_test_split
import os

def precision_m(y_true, y_pred):
    """Batch-wise precision for sparse integer labels against class scores.

    Labels are one-hot encoded and both the hits and the predictions are
    rounded, so a class counts as "predicted" only when its score rounds to 1.

    Parameters
    ----------
    y_true : tensor
        Integer class labels (any numeric dtype; cast to int32 here).
    y_pred : tensor
        Per-class scores whose last axis is the class axis.

    Returns
    -------
    tensor
        Scalar ``true_positives / (predicted_positives + epsilon)``.
    """
    # Take the class count from the prediction tensor instead of relying on a
    # notebook-level `num_classes` global being defined before this runs.
    n_classes = K.int_shape(y_pred)[-1]
    # Flatten first: labels shaped (batch, 1) would otherwise one-hot to
    # (batch, 1, C) and broadcast against y_pred to (batch, batch, C).
    y_true = K.flatten(K.cast(y_true, 'int32'))
    y_true = K.cast(K.one_hot(y_true, n_classes), 'float32')
    tp = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    pred_pos = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return tp / (pred_pos + K.epsilon())

def recall_m(y_true, y_pred):
    """Batch-wise recall for sparse integer labels against class scores.

    Labels are one-hot encoded; a sample counts as a hit only when the score
    of its true class rounds to 1.

    Parameters
    ----------
    y_true : tensor
        Integer class labels (any numeric dtype; cast to int32 here).
    y_pred : tensor
        Per-class scores whose last axis is the class axis.

    Returns
    -------
    tensor
        Scalar ``true_positives / (actual_positives + epsilon)``.
    """
    # Take the class count from the prediction tensor instead of relying on a
    # notebook-level `num_classes` global being defined before this runs.
    n_classes = K.int_shape(y_pred)[-1]
    # Flatten first: labels shaped (batch, 1) would otherwise one-hot to
    # (batch, 1, C) and broadcast against y_pred to (batch, batch, C).
    y_true = K.flatten(K.cast(y_true, 'int32'))
    y_true = K.cast(K.one_hot(y_true, n_classes), 'float32')
    tp = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_pos = K.sum(K.round(K.clip(y_true, 0, 1)))
    return tp / (actual_pos + K.epsilon())

def f1_m(y_true, y_pred):
    """Batch-wise F1 score: harmonic mean of ``precision_m`` and ``recall_m``.

    ``K.epsilon()`` is added to the denominator so the result is defined when
    both precision and recall are zero.
    """
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    product = precision * recall
    total = precision + recall + K.epsilon()
    return 2 * (product / total)

def create_model(input_shape, num_classes):
    """Build the (uncompiled) Sequential CNN classifier.

    The convolutional part starts with a 1x1 convolution, then applies two
    (10, 1) convolutions with stride (10, 1), followed by pooling and a (1, 4)
    convolution along the second spatial axis. Global average pooling feeds
    two dropout-regularised dense layers and a softmax output.

    Parameters
    ----------
    input_shape : tuple
        Shape of one sample, passed to the first layer as ``input_shape``.
    num_classes : int
        Width of the final softmax layer.

    Returns
    -------
    tf.keras.Sequential
    """
    # Every convolution shares the same activation and initializer.
    conv_kwargs = dict(activation='relu', kernel_initializer='he_normal')

    feature_extractor = [
        layers.Conv2D(8, (1, 1), input_shape=input_shape, **conv_kwargs),
        layers.Conv2D(16, (10, 1), strides=(10, 1), **conv_kwargs),
        layers.BatchNormalization(),
        layers.Conv2D(16, (10, 1), strides=(10, 1), **conv_kwargs),
        layers.MaxPooling2D((1, 4)),
        layers.Conv2D(32, (1, 4), **conv_kwargs),
        layers.BatchNormalization(),
        layers.MaxPooling2D((1, 2)),
        layers.GlobalAveragePooling2D(),
    ]

    classifier_head = [
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.4),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.4),
        layers.Dense(num_classes, activation='softmax'),
    ]

    return tf.keras.Sequential(feature_extractor + classifier_head)

def train_and_save_model(data, labels, num_classes, num_lrpair,
                         base_save_path='cnn_model/', n_repeat=10, batch_size=32, epochs=80):
    """Train ``n_repeat`` CNNs on different random splits and save each as ``.h5``.

    For repeat ``i`` (1-based) the data is split 80/20 into train+val / test
    with ``random_state = 41 + i``, and the first part is split 80/20 again
    into train / val with ``random_state = 42 + i``. The test part is not used
    here; it is only held out of training.

    Parameters
    ----------
    data : np.ndarray
        Samples; the model is built for inputs of shape ``(100, num_lrpair, 2)``.
    labels : np.ndarray
        Integer-valued class label per sample (sparse, not one-hot).
    num_classes : int
        Number of output classes.
    num_lrpair : int
        Size of the second axis of one sample.
    base_save_path : str
        Directory the models are written to (created if missing).
    n_repeat : int
        Number of models to train.
    batch_size : int
        Mini-batch size for ``model.fit``.
    epochs : int
        Number of training epochs per model.
    """
    os.makedirs(base_save_path, exist_ok=True)

    for i in range(1, n_repeat + 1):
        print(f"▶ Training model {i}/{n_repeat}")

        # Each iteration builds a brand-new model; release the Keras global
        # state left by the previous one so memory does not grow per repeat.
        K.clear_session()

        rs = 41 + i
        X_tmp, test_data, Y_tmp, test_labels = train_test_split(data, labels, test_size=0.2, random_state=rs)
        train_data, val_data, train_labels, val_labels = train_test_split(X_tmp, Y_tmp, test_size=0.2, random_state=rs + 1)

        model = create_model((100, num_lrpair, 2), num_classes)
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy', f1_m, precision_m, recall_m])

        # Halve the learning rate after 5 epochs without val_loss improvement.
        lr_reduction = callbacks.ReduceLROnPlateau(monitor='val_loss', patience=5, factor=0.5, min_lr=1e-6)

        model.fit(train_data, train_labels, validation_data=(val_data, val_labels),
                  epochs=epochs, batch_size=batch_size, callbacks=[lr_reduction], verbose=1)

        # Note: the "v0{i}" pattern yields "v010" for i == 10.
        model.save(os.path.join(base_save_path, f"data_cnn-model_v0{i}.h5"))
        print(f"✔ Model {i} saved.\n")


In [None]:
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score
import numpy as np

def evaluate_saved_models(data, labels, num_classes, model_dir='cnn_model/', n_repeat=10):
    """Evaluate each saved model on a test split drawn with seed ``41 + i``.

    For every model index ``i`` in ``1..n_repeat`` the data is split 80/20
    with ``random_state = 41 + i`` and the 20% part is used as the test set.
    Weighted F1, recall and precision are computed with scikit-learn; loss and
    accuracy come from ``model.evaluate``. One summary line is printed per model.

    Parameters
    ----------
    data : np.ndarray
        Full sample array.
    labels : np.ndarray
        Class label per sample.
    num_classes : int
        Accepted for signature symmetry; not used in the body.
    model_dir : str
        Directory containing ``data_cnn-model_v0{i}.h5`` files.
    n_repeat : int
        Number of saved models to evaluate.
    """
    for version in range(1, n_repeat + 1):
        print(f"=== Model v{version} Evaluation ===")
        seed = 41 + version
        split = train_test_split(data, labels, test_size=0.2, random_state=seed)
        test_data, test_labels = split[1], split[3]

        # The custom metrics must be supplied again to deserialize the model.
        metric_functions = {
            'f1_m': f1_m,
            'precision_m': precision_m,
            'recall_m': recall_m
        }
        saved_file = os.path.join(model_dir, f"data_cnn-model_v0{version}.h5")
        model = load_model(saved_file, custom_objects=metric_functions)

        class_probabilities = model.predict(test_data, batch_size=32, verbose=0)
        pred_labels = np.argmax(class_probabilities, axis=1)
        true_labels = test_labels.astype(int)

        f1 = f1_score(true_labels, pred_labels, average='weighted')
        recall = recall_score(true_labels, pred_labels, average='weighted')
        precision = precision_score(true_labels, pred_labels, average='weighted')

        # evaluate() returns [loss, accuracy, <custom metrics...>]; keep the first two.
        evaluation = model.evaluate(test_data, true_labels, verbose=0)
        loss, accuracy = evaluation[:2]

        print(f"Loss: {loss:.4f} | Accuracy: {accuracy:.4f} | F1: {f1:.4f} | Precision: {precision:.4f} | Recall: {recall:.4f}\n")


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, roc_curve, auc
from sklearn.preprocessing import label_binarize
from itertools import cycle

def visualize_final_model_results(model, test_data, test_labels, class_names=None):
    """
    Visualize confusion matrix and ROC-AUC curves for a trained multi-class classifier.

    The number of classes is taken from the width of the model output, so the
    confusion matrix, its tick labels and the ROC curves always refer to the
    same class indices, even when some class is missing from the test set.

    Parameters
    ----------
    model : keras.Model
        Trained Keras model.
    test_data : np.ndarray
        Input features for test set.
    test_labels : np.ndarray
        True labels for test set (integer class labels in ``0..n_classes-1``;
        integer-valued floats are accepted and cast to int).
    class_names : list of str, optional
        Class name labels. If None, use integer indices.

    Raises
    ------
    ValueError
        If ``class_names`` is given but its length differs from the number of
        model outputs.
    """
    # === Confusion Matrix ===
    predictions = model.predict(test_data)
    predicted_classes = np.argmax(predictions, axis=1)
    true_classes = np.asarray(test_labels).astype(int).ravel()

    n_classes = predictions.shape[1]
    class_ids = np.arange(n_classes)

    if class_names is None:
        class_names = [f'Class {i}' for i in range(n_classes)]
    elif len(class_names) != n_classes:
        raise ValueError(
            f"class_names has {len(class_names)} entries but the model outputs {n_classes} classes"
        )

    # Fixing `labels` keeps the matrix n_classes x n_classes regardless of
    # which classes happen to occur in this particular test split.
    cm = confusion_matrix(true_classes, predicted_classes, labels=class_ids)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion matrix')
    plt.ylabel('Actual label')
    plt.xlabel('Predicted label')
    plt.show()

    # === ROC-AUC Curve ===
    # One-hot encode by comparison instead of label_binarize, which collapses
    # the two-class case to a single column and would break the [:, i] indexing.
    y_test_bin = (true_classes[:, None] == class_ids[None, :]).astype(int)
    y_score = predictions

    fpr = dict()
    tpr = dict()
    roc_auc = dict()

    for i in range(n_classes):
        n_positive = y_test_bin[:, i].sum()
        # A one-vs-rest ROC curve needs both positives and negatives.
        if n_positive == 0 or n_positive == len(true_classes):
            continue
        fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    plt.figure(figsize=(10, 8))
    colors = cycle(['blue', 'red', 'green', 'cyan', 'magenta', 'yellow',
                    'black', 'orange', 'purple', 'brown', 'gray'])
    for i, color in zip(range(n_classes), colors):
        if i not in roc_auc:
            continue
        plt.plot(fpr[i], tpr[i], color=color, lw=2,
                 label=f'{class_names[i]} (AUC = {roc_auc[i]:0.2f})')

    plt.plot([0, 1], [0, 1], 'k--', lw=1)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False positive rate')
    plt.ylabel('True positive rate')
    plt.title('Multi-class ROC curve')
    plt.legend(loc='lower right', fontsize=8)
    plt.show()


In [None]:
import os
import re
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras import layers, Model
from tf_keras_vis.gradcam_plus_plus import GradcamPlusPlus
from tf_keras_vis.utils.model_modifiers import ReplaceToLinear
from tf_keras_vis.utils.scores import CategoricalScore

def convert_sequential_to_functional(model):
    """Rebuild ``model`` as a functional ``Model`` that reuses the same layers.

    A fresh ``Input`` with the model's per-sample input shape is threaded
    through every layer of ``model`` in order; the layer objects (and hence
    their weights) are shared with the original model.
    """
    sample_shape = model.input_shape[1:]  # drop the batch dimension
    model_input = layers.Input(shape=sample_shape)

    tensor = model_input
    for stage in model.layers:
        tensor = stage(tensor)

    return Model(model_input, tensor)

def run_gradcam_analysis(data, gene_list_csv, model_dir='cnn_model/', 
                         save_dir='gcam_res/', class_files=None, 
                         custom_objects=None, data_points=1000):
    """
    Apply GradCAM++ to all trained models and generate gene importance files.

    For every ``.h5`` model in ``model_dir`` and every class, the GradCAM++
    maps of that class's samples are averaged, collapsed along the first
    spatial axis, min-max normalised and written as a tab-separated file with
    one row per gene pair.

    Parameters
    ----------
    data : np.ndarray
        CNN input data of shape (n_samples, height, width, 2). Samples are
        assumed to be ordered by class in blocks of ``data_points``.
    gene_list_csv : str
        Path to filtered CCIdb CSV containing columns 'A', 'B'.
    model_dir : str
        Directory containing saved CNN models (.h5).
    save_dir : str
        Output directory to store GradCAM result files.
    class_files : list of str
        List of .npz class input file names used to determine class labels.
        Each name must match ``combi-<name>_c<digits>.npz``. Required.
    custom_objects : dict
        Custom metrics used in model loading.
    data_points : int
        Number of data points per class (default 1000).

    Raises
    ------
    ValueError
        If ``class_files`` is None, if a file name does not match the expected
        pattern, or if ``data`` has no samples for one of the classes.
    """
    if class_files is None:
        raise ValueError("class_files is required: pass the ordered list of class .npz file names")

    class_names = []
    for file in class_files:
        match = re.search(r'combi-(.*)_c\d+\.npz', file)
        if match is None:
            raise ValueError(
                f"Class file name {file!r} does not match 'combi-<name>_c<digits>.npz'"
            )
        class_names.append(match.group(1))

    # Every class needs at least one sample inside its [start, end) block.
    if class_names and len(data) <= (len(class_names) - 1) * data_points:
        raise ValueError(
            f"data has {len(data)} samples, too few for {len(class_names)} classes "
            f"of {data_points} data points each"
        )

    os.makedirs(save_dir, exist_ok=True)

    model_names = sorted([f for f in os.listdir(model_dir) if f.endswith('.h5')])

    gene_df = pd.read_csv(gene_list_csv)
    genes_A = gene_df['A'].tolist()
    genes_B = gene_df['B'].tolist()

    for model_name in model_names:
        # e.g. "data_cnn-model_v03.h5" -> "v03"
        version_suffix = model_name.split('_')[-1].replace('.h5', '')
        model_path = os.path.join(model_dir, model_name)
        print(f"🔍 Loading model: {model_path}")

        model = load_model(model_path, custom_objects=custom_objects)
        model = convert_sequential_to_functional(model)
        gradcam = GradcamPlusPlus(model, model_modifier=ReplaceToLinear(), clone=True)

        for class_index, class_name in enumerate(class_names):
            start = class_index * data_points
            end = start + data_points
            class_data = data[start:end]
            class_labels = np.full((data_points,), class_index)

            # One GradCAM++ call per sample (batch of 1), then average the maps.
            cam_tot = np.mean([
                gradcam(CategoricalScore(label), np.expand_dims(sample, axis=0), penultimate_layer=-1)
                for label, sample in zip(class_labels, class_data)
            ], axis=0)

            # Drop the batch axis, then average over the first spatial axis so
            # one weight per column remains.
            cam_mean = np.mean(cam_tot[0], axis=0)
            cam_range = cam_mean.max() - cam_mean.min()
            if cam_range > 0:
                cam_norm = (cam_mean - cam_mean.min()) / cam_range
            else:
                # A constant map would give 0/0 = NaN; report zero weights instead.
                cam_norm = np.zeros_like(cam_mean)

            output_txt = os.path.join(save_dir, f"gcamplus_result_{class_name}_{version_suffix}.txt")
            with open(output_txt, 'w') as f_out:
                f_out.write(f"TumorCell\t{class_name}\tNormalized_Weight\n")
                for a, b, w in zip(genes_A, genes_B, cam_norm):
                    f_out.write(f"{a}\t{b}\t{w}\n")

            print(f"✅ Completed: {output_txt}")


In [None]:
# Load the data: samples, labels, class count and the size of data.shape[2].
data, labels, num_classes, num_lrpair = load_data(path='cnn_input_data/')

# Train the models and save each one under the default 'cnn_model/' directory.
train_and_save_model(data, labels, num_classes, num_lrpair)

# Evaluate every saved model on its own held-out test split.
evaluate_saved_models(data, labels, num_classes)


In [None]:
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split

# Load the last trained model (index 10; the "v0{i}" naming pattern yields "v010").
model_path = 'cnn_model/data_cnn-model_v010.h5'
model = load_model(model_path, custom_objects={
    'f1_m': f1_m,
    'precision_m': precision_m,
    'recall_m': recall_m
})

# Split the data, keeping only the test set. random_state=51 equals 41 + 10,
# the seed train_and_save_model uses for model 10's outer split, so this
# reproduces the samples that model never saw during training.
_, test_data, _, test_labels = train_test_split(data, labels, test_size=0.2, random_state=51)

# Run the visualization (confusion matrix and ROC curves).
visualize_final_model_results(model, test_data, test_labels)


In [None]:
from gradcam_module import run_gradcam_analysis

# Rebuild the ordered list of class .npz files the same way load_data() does:
# load_data() does not return it, and no other cell defines `class_files`, so
# without this the call below fails with NameError on a fresh kernel.
class_files = sorted(
    [f for f in os.listdir('cnn_input_data/') if f.endswith(".npz")],
    key=extract_number,
)

# Custom metrics are required to deserialize the saved models.
custom_objs = {'f1_m': f1_m, 'precision_m': precision_m, 'recall_m': recall_m}
run_gradcam_analysis(data=data,
                     gene_list_csv='DB/filtered_CCIdb.csv',
                     model_dir='cnn_model/',
                     save_dir='gcam_res/',
                     class_files=class_files,
                     custom_objects=custom_objs)
