In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import ramanspy as rp
import torch

from IPython.display import clear_output

from functions.configs import *
from functions.utils import translate_confusion_matrix
from functions.data_loader import RamanDataLoader
from functions.noise_func import RamanNoiseProcessor
from functions.pipeline import RamanPipeline, SNV
from functions.visualization import RamanVisualizer
from functions.ML import RamanML, save_model_to_onnx, load_model_from_onnx

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# https://datadryad.org/dataset/doi:10.5061/dryad.cjsxksn3p
PROSTATE_CANCER_DATASET_PATH = os.path.join(
    CURRENT_DIR,
    "test_rawdata",
    "A1-dataset_prostate_cancer",
    "Benign_vs_Cancer.pkl",
)

load_dataset = RamanDataLoader(PROSTATE_CANCER_DATASET_PATH)
rawdata = load_dataset.data

In [None]:
# create a subset for a given key value
chum_df = rawdata[rawdata['Cohort'] == 'CHUM']
uhn_df = rawdata[rawdata['Cohort'] == 'UHN']
chuq_df = rawdata[rawdata['Cohort'] == 'CHUQc-UL']
print(rawdata.shape, chum_df.shape, uhn_df.shape, chuq_df.shape)

In [None]:
# hirushu_dir = os.getcwd() + '/test_rawdata/Tamura/引き継ぎ/ヒルシュ'
# hirushu_dir

# normal_dfs = []
# window_size = 50s
# for k in range(1, 4):
#     csv_path = os.path.join(hirushu_dir, 'データ', 'merged_data_raw', f'Case{k}', 'normal', 'normal.csv')
#     loader = RamanDataLoader(csv_path)
#     df = loader.data
#     processor = RamanNoiseProcessor(df)
#     processed_df = processor.baselineAndGaussianNoise(window_size=window_size)
#     normal_dfs.append(processed_df)

In [None]:
def processDFA1(df: pd.DataFrame) -> Tuple[pd.DataFrame, list]:
    """
    Process the DataFrame to create a new DataFrame with interpolated spectra.
    Args:
        df (pd.DataFrame): Input DataFrame with columns 'xAxis', 'RawSpectra', 'CoreID', and 'Label'.

    Returns:
        pd.DataFrame: New DataFrame with interpolated spectra.
        list: List of labels corresponding to the spectra.
    """
    all_wavenumbers = np.unique(np.concatenate(df["xAxis"].values))
    all_wavenumbers.sort()

    data = {}
    for idx, (wn, spec, cid) in enumerate(zip(
            df["xAxis"],
            df["RawSpectra"],
            df["CoreID"])):
        # Make unique column name if needed
        col_name = f"{cid}_{idx}"
        interp_spec = np.interp(all_wavenumbers, wn, spec)
        data[col_name] = interp_spec

    # 3. Create DataFrame: index=wavenumber, columns=ids
    merged_df = pd.DataFrame(data, index=all_wavenumbers)
    merged_df.index.name = "wavenumber"

    return merged_df


region = (600, 1500)  # Raman region of interest
labels = ["benign", "cancer"]
# https://ramanspy.readthedocs.io/en/latest/preprocessing.html
# https://www.nature.com/articles/s41377-024-01394-5
preprocess_steps_test = [
    rp.preprocessing.misc.Cropper(region=region),
    rp.preprocessing.despike.WhitakerHayes(),
    rp.preprocessing.denoise.SavGol(window_length=11, polyorder=3),
    # rp.preprocessing.baseline.ASPLS(),
    rp.preprocessing.baseline.ModPoly(tol=0.001),
    # rp.preprocessing.normalise.Vector(),
    SNV()   # Use SNV normalization as in the Readme
]


chumDF_benign = RamanPipeline().preprocess(
    dfs=[processDFA1(chum_df[chum_df['Label'] == 'Benign'])],
    label=labels[0],
    region=region,
    preprocessing_steps=preprocess_steps_test,
    visualize_steps=False
)

In [None]:
chumDF_cancer = RamanPipeline().preprocess(
    dfs=[processDFA1(chum_df[chum_df['Label'] == 'Cancer'])],
    label=labels[1],
    region=region,
    preprocessing_steps=preprocess_steps_test,
    visualize_steps=False
)

In [None]:
from scipy.signal import find_peaks


def find_peaks_in_spectrum(spectrum, height=0.1, distance=10):
    """
    Find peaks in a spectrum using the find_peaks function from scipy.

    Parameters:
        spectrum (np.ndarray or rp.SpectralContainer): The spectrum data.
        height (float): Minimum height of peaks.
        distance (int): Minimum distance between peaks.

    Returns:
        np.ndarray: Indices of the found peaks.
    """
    if hasattr(spectrum, "spectral_data"):
        y = spectrum.spectral_data
    else:
        y = np.asarray(spectrum)
    if y.ndim == 1:
        peaks, _ = find_peaks(y, height=height, distance=distance)
        return peaks
    else:
        # Loop over all spectra (rows)
        all_peaks = []
        for i, row in enumerate(y):
            peaks, _ = find_peaks(row, height=height, distance=distance)
            all_peaks.append(peaks)
        return all_peaks

In [None]:
ramanML = RamanML()

In [None]:
uhnDF_benign = RamanPipeline().preprocess(
    dfs=[processDFA1(uhn_df[uhn_df['Label'] == 'Benign'])],
    label=labels[0],
    region=region,
    preprocessing_steps=preprocess_steps_test,
    visualize_steps=False
)

In [None]:
uhnDF_cancer = RamanPipeline().preprocess(
    dfs=[processDFA1(uhn_df[uhn_df['Label'] == 'Cancer'])],
    label=labels[1],
    region=region,
    preprocessing_steps=preprocess_steps_test,
    visualize_steps=False
)

In [None]:
chuqDF_benign = RamanPipeline().preprocess(
    dfs=[processDFA1(chuq_df[chuq_df['Label'] == 'Benign'])],
    label=labels[0],
    region=region,
    preprocessing_steps=preprocess_steps_test,
    visualize_steps=False
)

In [None]:
chuqDF_cancer = RamanPipeline().preprocess(
    dfs=[processDFA1(chuq_df[chuq_df['Label'] == 'Cancer'])],
    label=labels[1],
    region=region,
    preprocessing_steps=preprocess_steps_test,
    visualize_steps=False
)

In [None]:
mlresult = ramanML.train_svc(normal_data=([chumDF_benign["processed"]], labels[0]),
                             disease_data=([chumDF_cancer["processed"]], labels[1]), param_search=False, test_size=0.2,
                             SVC_model=ramanML.SVCMODEL(kernel='linear', C=1.0, gamma='scale', class_weight='balanced', probability=True),)

In [None]:
pprint(translate_confusion_matrix(
    mlresult["confusion_matrix"], labels), indent=2)
pprint(mlresult["classification_report"], indent=2)
print(
    f"CV Accuracy: {mlresult['cross_val_score'].mean():.3f} ± {mlresult['cross_val_score'].std():.3f}")
# print(f"Decision Function Score: {mlresult['decision_function_score'].mean():.3f} ± {mlresult['decision_function_score'].std():.3f}")

In [None]:
cancer_spectra = [chumDF_cancer, uhnDF_cancer, chuqDF_cancer]
benign_spectra = [chuqDF_benign]
test_spectra = [k["processed"] for k in cancer_spectra + benign_spectra]
true_labels = []
for k in cancer_spectra + benign_spectra:
    true_labels.extend(k["labels"])

predict_data = ramanML.predict(
    test_spectra=test_spectra,
    # model = mlresult["model"]
)

pprint(predict_data["label_percentages"], indent=2)
pprint(predict_data["most_common_label"])

In [None]:
predict_accuracy, plotdata = RamanVisualizer(None).confusion_matrix_heatmap(
    y_true=true_labels,          # true labels for test set
    y_pred=predict_data["y_pred"],  # predicted labels for test set
    # class names, e.g. ["benign", "cancer"]
    class_labels=labels,
    title="SVC Confusion Matrix",
    normalize=False,
    cmap="Blues",
    figsize=(8, 6),
    fmt="s",
)
print("Predict Accuracy:")
print("=====================================")
for i, (label, percentage) in enumerate(predict_accuracy.items()):
    print(f"{label}: {percentage:.2f}%")

In [None]:
# RamanVisualizer(None).pca2d(
#     spectral_data=mlresult["x_train"],
#     spectral_axis=np.arange(mlresult["x_train"].shape[1]),  # PCA does not use axis, just needs shape
#     title="PCA of SVC Training Data",
#     sample_names=mlresult["y_train"],
#     sample_limit=50  # adjust as needed
# )

In [None]:
saved_model = save_model_to_onnx(model=mlresult["model"],
                                 labels=labels,
                                 filename="SVC_raman_prostate_model1",
                                 metadata={"model_type": "SVC",
                                           "model_name": "SVC raman prostate linear",
                                           "model_version": "1.0",
                                           "model_description": "SVC model for prostate cancer classification based on Raman spectroscopy data kernel linear.",
                                           "model_author": "MUHAMMAD HELMI",
                                           "model_date": "2025-05-09", }
                                 )

In [None]:
# import shap

# # Use only 100 representative samples for background
# background_data_small = shap.sample(mlresult["x_train"], 100, random_state=42)

# # Run SHAP explanation
# shap_result = ramanML.shap_explain(
#     background_data=background_data_small,
#     test_data=mlresult["x_test"],
#     nsamples=100
# )