# **XAI4Spectra**

# **Loading data**

In [1]:
# importing the necessary libraries
import pandas as pd
import numpy as np
import kennard_stone as ks

# loading a soil spectral dataset based on X-ray fluorescence (XRF)

data_complete = pd.read_csv('https://raw.githubusercontent.com/joseviniciusr/XAI4Spectra/refs/heads/main/Toledo22.csv', sep=';')
data = data_complete.loc[:, '1':'15']
data.insert(0, 'exCa', data_complete['exCa'])  # inserting the target variable (e.g., exCa (exchangeable calcium))

## **PLS- (R or DA) modeling**

In [2]:
def explained_variance_from_scores(X, T, P, Q=None, Y=None):
    """
    Calculate percent variance explained (based on PCTVAR Matlab function) for X and Y
    by using the scores T and loadings P (and optionally Q for Y).
    Parameters
    ----------
    - **X** : array-like, shape (n_samples, n_features)
        X matrix used in PLS.
    - **T** : array-like, shape (n_samples, n_components)
        Scores matrix from PLS.
    - **P** : array-like, shape (n_features, n_components)
        Loadings matrix for X from PLS.
    - **Q** : array-like, shape (n_targets, n_components), optional
        Loadings matrix for Y from PLS. Required if Yc is provided.
    - **Y** : array-like, shape (n_samples, n_targets), optional
       Y matrix used in PLS.
    Returns
    -------
    - result : dict with keys:
        - **'varX_cumulative'** : ndarray shape (n_components,)
            Percent cumulative variance of X explained by 1..j components.
        - **'varX_per_component'** : ndarray shape (n_components,)
            Percent variance of X explained per component.
        - **'varY_cumulative'** : ndarray shape (n_components,), or None
            Percent cumulative variance of Y explained by 1..j components (if Yc and Q provided).
        - **'varY_per_component'** : ndarray shape (n_components,), or None
            Percent variance of Y explained per component (if Yc and Q provided).
    """
    import numpy as np
    X = np.asarray(X, dtype=float) # X preprocessed data
    T = np.asarray(T, dtype=float) # scores
    P = np.asarray(P, dtype=float) # loadings for X

    n_comp = T.shape[1]
    TSS_X = np.sum(X ** 2) # total sum of squares of X
    if TSS_X == 0: # avoid division by zero
        raise ValueError("TSS_X == 0 (X does not have variability).")

    pctvarX_cum = np.zeros(n_comp, dtype=float) # cumulative percent variance for X

    for j in range(1, n_comp + 1): # loop over components
        Xhat_j = T[:, :j] @ P[:, :j].T # reconstructed X using j components
        SS_Xhat_j = np.sum(Xhat_j ** 2) # sum of squares of reconstructed X
        pctvarX_cum[j-1] = 100.0 * SS_Xhat_j / TSS_X # percent variance explained cumulativa
    
    # incremental (per component)
    pctvarX_per = np.empty_like(pctvarX_cum) # incremental percent variance for X
    pctvarX_per[0] = pctvarX_cum[0] # first component
    pctvarX_per[1:] = pctvarX_cum[1:] - pctvarX_cum[:-1] # rest

    # Y (if provided)
    pctvarY_cum = None # cumulative percent variance for Y
    pctvarY_per = None # incremental percent variance for Y
    if Q is not None and Y is not None: # if Y loadings and Y centered provided
        Q = np.asarray(Q, dtype=float) # loadings for Y
        Y = np.asarray(Y, dtype=float) # centered (and possibly scaled) Y
        TSS_Y = np.sum(Y ** 2) # total sum of squares of Y
        if TSS_Y == 0: # avoid division by zero
            pctvarY_cum = np.zeros(n_comp, dtype=float) # all zeros if Y has no variance
            pctvarY_per = np.zeros(n_comp, dtype=float) # all zeros
        else:
            pctvarY_cum = np.zeros(n_comp, dtype=float) # cumulative percent variance for Y
            for j in range(1, n_comp + 1): # loop over components
                Yhat_j = T[:, :j] @ Q[:, :j].T # reconstructed Y using j components
                SS_Yhat_j = np.sum(Yhat_j ** 2) # sum of squares of reconstructed Y
                pctvarY_cum[j-1] = 100.0 * SS_Yhat_j / TSS_Y # percent variance explained cumulativa
            pctvarY_per = np.empty_like(pctvarY_cum) # incremental percent variance for Y
            pctvarY_per[0] = pctvarY_cum[0] # first component
            pctvarY_per[1:] = pctvarY_cum[1:] - pctvarY_cum[:-1] # rest

        return {
            'varX_cumulative': pctvarX_cum[-1],
            'varX_per_component': pctvarX_per[-1],
            'varY_cumulative': pctvarY_cum[-1],
            'varY_per_component': pctvarY_per[-1]
            }         


def pls_optimized(Xcal, ycal, LVmax, Xpred=None, ypred=None, aim='regression', cv=10):
    """
    ## PLS optimized
    Function to fit a PLS regression or PLS-DA model with optimization of latent variables (LVs)
    using cross-validation. It calculates various performance metrics for calibration, cross-validation,
    and prediction (if provided) datasets
    **Parameters**:
    - **Xcal** : pd.DataFrame
        Calibration dataset features.
    - **ycal** : pd.Series or np.ndarray
        Calibration dataset target variable (regression) or binary class labels (classification).
    - **LVmax** : int
        Maximum number of latent variables to consider.
    - **Xpred** : pd.DataFrame, optional
        Prediction dataset features. Default is None.
    - **ypred** : pd.Series or np.ndarray, optional
        Prediction dataset target variable (regression) or binary class labels (classification). Default is None.
    - **aim** : str, optional
        Type of analysis: 'regression' for PLS regression or 'classification' for PLS-DA. Default is 'regression'.
    - **cv** : int, optional
        Number of cross-validation folds. Default is 10
        
    **Returns**:
    - **df_results** : pd.DataFrame
        DataFrame containing performance metrics for each number of latent variables.
    - **calres** : pd.DataFrame
        DataFrame containing predicted values for the calibration dataset.
    - **predres** : pd.DataFrame
        DataFrame containing predicted values for the prediction dataset (if provided).
    """

    import numpy as np
    import pandas as pd

    if aim == 'regression': # regression (PLSR)
        from sklearn.cross_decomposition import PLSRegression
        from sklearn.model_selection import cross_val_predict
        from sklearn.metrics import mean_squared_error, r2_score
        from scipy.stats import iqr

        results = [] # list to store results for each LV
        calres = pd.DataFrame(index=range(len(ycal))) # calibration results
        predres = pd.DataFrame(index=range(len(ypred))) if (Xpred is not None and ypred is not None) else None # prediction results

        for n_comp in range(1, LVmax + 1): # loop over number of components
            plsr = PLSRegression(n_components=n_comp, scale=False)
            plsr.fit(Xcal, ycal)
            y_cal = plsr.predict(Xcal).flatten()
            calres[f'LV_{n_comp}'] = y_cal

            y_cv = cross_val_predict(plsr, Xcal, ycal, cv=cv) # cross-validated predictions
            y_cv = np.array(y_cv).flatten()

            R2_cal = r2_score(ycal, y_cal) # determination coefficient
            r2_cal = np.corrcoef(ycal, y_cal)[0, 1] ** 2 # correlation coefficient squared
            rmse_cal = np.sqrt(mean_squared_error(ycal, y_cal))
            R2_cv = r2_score(ycal, y_cv)
            r2_cv = np.corrcoef(ycal, y_cv)[0, 1] ** 2
            rmsecv = np.sqrt(mean_squared_error(ycal, y_cv))
            rpd_cv = ycal.std() / rmsecv if rmsecv != 0 else np.nan
            rpiq_cv = iqr(ycal, rng=(25, 75)) / rmsecv if rmsecv != 0 else np.nan
            bias_cv = np.sum(ycal - y_cv) / ycal.shape[0]
            SDV_cv = (ycal - y_cv) - bias_cv
            SDV_cv = np.sqrt(np.sum(SDV_cv * SDV_cv) / (ycal.shape[0] - 1)) if ycal.shape[0] > 1 else np.nan
            tbias_cv = abs(bias_cv) * (np.sqrt(ycal.shape[0]) / SDV_cv) if SDV_cv not in (0, np.nan) else np.nan
            
            # explained variance
            exp_var = explained_variance_from_scores(Xcal, plsr.x_scores_, plsr.x_loadings_,
                                               Q=plsr.y_loadings_, Y=ycal) # explained variance
            
            if Xpred is not None and ypred is not None: # prediction set
                y_pred = plsr.predict(Xpred).flatten()
                predres[f'LV_{n_comp}'] = y_pred

                R2_pred = r2_score(ypred, y_pred) # determination coefficient
                r2_pred = np.corrcoef(ypred, y_pred)[0, 1] ** 2 # correlation coefficient squared
                rmsep = np.sqrt(mean_squared_error(ypred, y_pred))
                rpd_pred = ypred.std() / rmsep if rmsep != 0 else np.nan
                rpiq_pred = iqr(ypred, rng=(25, 75)) / rmsep if rmsep != 0 else np.nan
                bias_pred = np.sum(ypred - y_pred) / ypred.shape[0]
                SDV_pred = (ypred - y_pred) - bias_pred
                SDV_pred = np.sqrt(np.sum(SDV_pred * SDV_pred) / (ypred.shape[0] - 1)) if ypred.shape[0] > 1 else np.nan
                tbias_pred = abs(bias_pred) * (np.sqrt(ypred.shape[0]) / SDV_pred) if SDV_pred not in (0, np.nan) else np.nan
            else:
                r2_pred = rmsep = rpd_pred = rpiq_pred = bias_pred = tbias_pred = None

            results.append({
                'LVs': n_comp,
                'R2_Cal': R2_cal,
                'r2_Cal': r2_cal,
                'RMSEC': rmse_cal,
                'R2_CV': R2_cv,
                'r2_Cv': r2_cv,
                'RMSECV': rmsecv,
                'RPD_CV': rpd_cv,
                'RPIQ_CV': rpiq_cv,
                'Bias_CV': bias_cv,
                'tbias_CV': tbias_cv,
                'R2_Pred': R2_pred,
                'r2_Pred': r2_pred,
                'RMSEP': rmsep,
                'RPD_Pred': rpd_pred,
                'RPIQ_Pred': rpiq_pred,
                'Bias_Pred': bias_pred,
                'tbias_Pred': tbias_pred,
                'X_Cum_Exp_Var' : exp_var['varX_cumulative'],
                'Y_Cum_Exp_Var' : exp_var['varY_cumulative'],
                'X_Ind_Exp_Var' : exp_var['varX_per_component'],
                'Y_Ind_Exp_Var' : exp_var['varY_per_component']
            })

        model = plsr  # last model fitted
        df_results = pd.DataFrame(results)
        calres.insert(0, 'Ref', np.array(ycal))
        if predres is not None:
            predres.insert(0, 'Ref', np.array(ypred))

    elif aim == 'classification': # classification (PLS-DA)
        from sklearn.cross_decomposition import PLSRegression
        from sklearn.model_selection import cross_val_predict
        from sklearn.metrics import accuracy_score, confusion_matrix

        results = []
        calres = pd.DataFrame(index=range(len(ycal))) # calibration results
        predres = pd.DataFrame(index=range(len(ypred))) if (Xpred is not None and ypred is not None) else None # prediction results

        # ensure binary classes
        ycal_series = pd.Series(ycal).reset_index(drop=True) # ensure it's a Series
        unique_labels = ycal_series.unique() # unique class labels
        if len(unique_labels) != 2: # check for binary classification
            raise ValueError(f"PLS-DA (this function) expects 2 classes (binary). Found: {unique_labels}")

        label_to_num = {lab: idx for idx, lab in enumerate(unique_labels)} # mapping labels to 0 and 1
        num_to_label = {idx: lab for lab, idx in label_to_num.items()} # reverse mapping for predictions
       
        # prepare ycal numeric
        ycal_numeric = np.array([label_to_num[i] for i in ycal]) 

        # prepare ypred numeric if provided
        ypred_numeric = None
        if ypred is not None:
            ypred_numeric = np.array([label_to_num[i] for i in ypred])

        for n_comp in range(1, LVmax + 1): # loop over number of components
            plsda = PLSRegression(n_components=n_comp, scale=False)
            plsda.fit(Xcal, ycal_numeric)

            # calibration continuous predictions -> binarize
            y_cal_cont = plsda.predict(Xcal).flatten()
            y_cal_bin = (y_cal_cont >= 0.5).astype(int)
            y_cal_class = np.array([num_to_label[i] for i in y_cal_bin])
            calres[f'LV_{n_comp}'] = y_cal_class

            # cross-validated continuous predictions -> binarize
            y_cv_cont = cross_val_predict(plsda, Xcal, ycal_numeric, cv=cv)
            y_cv_cont = np.array(y_cv_cont).flatten()
            y_cv_bin = (y_cv_cont >= 0.5).astype(int)

            # metrics
            acc_cal = accuracy_score(ycal_numeric, y_cal_bin)
            cm_cal = confusion_matrix(ycal_numeric, y_cal_bin)
            # safe unpack for binary confusion matrix
            if cm_cal.size == 4:
                tn, fp, fn, tp = cm_cal.ravel()
            else:
                tn = fp = fn = tp = np.nan
            sensitivity = tp / (tp + fn) if (tp + fn) > 0 else np.nan
            specificity = tn / (tn + fp) if (tn + fp) > 0 else np.nan

            acc_cv = accuracy_score(ycal_numeric, y_cv_bin)
            cm_cv = confusion_matrix(ycal_numeric, y_cv_bin)
            if cm_cv.size == 4:
                tn_cv, fp_cv, fn_cv, tp_cv = cm_cv.ravel()
            else:
                tn_cv = fp_cv = fn_cv = tp_cv = np.nan
            sensitivity_cv = tp_cv / (tp_cv + fn_cv) if (tp_cv + fn_cv) > 0 else np.nan
            specificity_cv = tn_cv / (tn_cv + fp_cv) if (tn_cv + fp_cv) > 0 else np.nan

            # explained variance
            exp_var = explained_variance_from_scores(Xcal, plsda.x_scores_, plsda.x_loadings_,
                                               Q=plsda.y_loadings_, Y=ycal_numeric.reshape(-1, 1)) # explained variance

            # prediction set (if provided)
            if Xpred is not None and ypred is not None:
                y_pred_cont = plsda.predict(Xpred).flatten()
                y_pred_bin = (y_pred_cont >= 0.5).astype(int)
                y_pred_class = np.array([num_to_label[i] for i in y_pred_bin])
                predres[f'LV_{n_comp}'] = y_pred_class

                acc_pred = accuracy_score(ypred_numeric, y_pred_bin)
                cm_pred = confusion_matrix(ypred_numeric, y_pred_bin)
                if cm_pred.size == 4:
                    tn_p, fp_p, fn_p, tp_p = cm_pred.ravel()
                else:
                    tn_p = fp_p = fn_p = tp_p = np.nan
                sensitivity_p = tp_p / (tp_p + fn_p) if (tp_p + fn_p) > 0 else np.nan
                specificity_p = tn_p / (tn_p + fp_p) if (tn_p + fp_p) > 0 else np.nan
            else:
                acc_pred = sensitivity_p = specificity_p = cm_pred = tn_p = fp_p = fn_p = tp_p = None

            results.append({
                'LVs': n_comp,
                'Accuracy Cal': acc_cal,
                'Sensitivity Cal': sensitivity,
                'Specificity Cal': specificity,
                'CM Cal': cm_cal,
                'Accuracy CV': acc_cv,
                'Sensitivity CV': sensitivity_cv,
                'Specificity CV': specificity_cv,
                'CM CV': cm_cv,
                'Accuracy Pred': acc_pred,
                'Sensitivity Pred': sensitivity_p,
                'Specificity Pred': specificity_p,
                'CM Pred': cm_pred,
                'X Cum Exp Var' : exp_var['varX_cumulative'],
                'Y Cum Exp Var' : exp_var['varY_cumulative'],
                'X Ind Exp Var' : exp_var['varX_per_component'],
                'Y Ind Exp Var' : exp_var['varY_per_component']
            })

        model = plsda  # last model fitted
        df_results = pd.DataFrame(results)
        calres.insert(0, 'Ref', np.array(ycal))
        if predres is not None:
            predres.insert(0, 'Ref', np.array(ypred))

    else:
        raise ValueError("Parameter `aim` must be 'regression' or 'classification'.")

    return df_results, calres, predres, model

In [3]:
def extract_spectral_zones(Xcal, cuts):
    """
    Extract spectral zones from a DataFrame based on specified cuts.
    
    Parameters
    ----------
    - **Xcal** : pd.DataFrame
        DataFrame with spectral data, where columns are wavelengths/energies.
    - **cuts** : list of tuples/lists or dicts
        Each item defines a spectral zone to extract.
        - If tuple/list: (start, end) or (name, start, end)
        - If dict: {'name': str, 'start': float, 'end': float}
    
    Returns
    -------
    - **zones** : dict
        Dictionary where keys are zone names and values are DataFrames with the extracted spectral zones.
    """
    import numpy as np
    import pandas as pd

    # convert the column names to numeric when possible (NaN when not convertible)
    col_nums = pd.to_numeric(Xcal.columns.astype(str), errors='coerce')
    zones = {} # dictionary to store extracted zones

    for cut in cuts:
        # normalize cut format
        if isinstance(cut, dict): # if dict
            name = cut.get('name', f"{cut.get('start')}-{cut.get('end')}") # default name if not provided
            start = cut.get('start') # getting start value
            end = cut.get('end') # getting end value
        elif isinstance(cut, (list, tuple)): # if list/tuple
            if len(cut) == 2: 
                start, end = cut # getting start and end values
                name = f"{start}-{end}" # default name
            elif len(cut) == 3: # if name provided
                name, start, end = cut # getting name, start and end values
            else:
                raise ValueError("Cuts in tuple/list format must have 2 or 3 elements.")
        else:
            raise ValueError("Each cut must be a dict or a tuple/list.")

        # validate start and end
        try:
            s = float(start)
            e = float(end)
        except Exception: # Exception for conversion errors
            raise ValueError("star and end must be numeric values (int/float or convertible strings).")

        if s > e: # swap if necessary
            s, e = e, s

        # to select columns whose numeric value is in the interval [s, e]
        mask = (~np.isnan(col_nums)) & (col_nums >= s) & (col_nums <= e)
        selected_cols = list(Xcal.columns[mask])

        # piecing the zone DataFrame into the dictionary
        zones[name] = Xcal.loc[:, selected_cols]

    return zones

# **Regression case**

In [4]:
# splitting the data into calibration and prediction sets by kennard-stone algorithm
datacal_reg, datapred_reg = ks.train_test_split(data, test_size=0.25)
Xcalreg = datacal_reg.iloc[:, 1:].reset_index(drop=True)
ycalreg = datacal_reg.iloc[:, 0].reset_index(drop=True)
Xpredreg = datapred_reg.iloc[:, 1:].reset_index(drop=True)
ypredreg = datapred_reg.iloc[:, 0].reset_index(drop=True)

2025-11-02 14:21:28,674 - kennard_stone.utils._pairwise:109[INFO] - Calculating pairwise distances using scikit-learn.

2025-11-02 14:21:28,682 - kennard_stone.utils._pairwise:109[INFO] - Calculating pairwise distances using scikit-learn.



In [5]:
# preprocessings
import preprocessings as prepr # preprocessing methods for XRF data

Xcalreg_prep, mean_calreg, mean_calreg_poisson  = prepr.poisson(Xcalreg, mc=True)
Xpredreg_prep = ((Xpredreg/np.sqrt(mean_calreg)) - mean_calreg_poisson)

In [6]:
plsr_results = pls_optimized(Xcalreg_prep, ycalreg, 
                             LVmax=5, 
                             Xpred=Xpredreg_prep,
                             ypred=ypredreg,
                             aim='regression',
                             cv=10)
plsr_results[0]

Unnamed: 0,LVs,R2_Cal,r2_Cal,RMSEC,R2_CV,r2_Cv,RMSECV,RPD_CV,RPIQ_CV,Bias_CV,...,r2_Pred,RMSEP,RPD_Pred,RPIQ_Pred,Bias_Pred,tbias_Pred,X_Cum_Exp_Var,Y_Cum_Exp_Var,X_Ind_Exp_Var,Y_Ind_Exp_Var
0,1,0.656883,0.656883,1.051658,0.62277,0.623123,1.102696,1.633304,2.217293,-0.002055,...,0.450342,1.150487,1.25741,1.721011,-0.436808,2.959459,22.692968,11.941667,22.692968,11.941667
1,2,0.736333,0.736333,0.921893,0.65148,0.664269,1.059904,1.699246,2.306812,-0.00596,...,0.728276,0.880903,1.642216,2.247694,-0.334291,2.957763,43.587589,13.386025,20.894621,1.444358
2,3,0.745287,0.745287,0.906106,0.634746,0.658008,1.085052,1.659864,2.253349,-0.012925,...,0.794095,0.797528,1.813896,2.482671,-0.25507,2.434145,71.133516,13.54879,27.545927,0.162765
3,4,0.769532,0.769532,0.861903,0.625823,0.661211,1.098225,1.639954,2.226319,-0.035702,...,0.823721,0.755711,1.914269,2.620051,-0.302328,3.147724,80.643752,13.989554,9.510235,0.440764
4,5,0.781251,0.781251,0.839704,0.629997,0.667999,1.092083,1.649177,2.238841,-0.037054,...,0.791529,0.782207,1.849425,2.531299,-0.302154,3.019937,90.924579,14.202595,10.280828,0.213041


In [7]:
pd.options.plotting.backend = 'plotly' # setting plotly as the backend for pandas plotting 
Xcalreg.T.plot() # easily plotting the spectra 

In [8]:
# establishing spectral cuts based on expert knowledge of XRF spectra
spectral_cuts = [
('Al', 1.38, 1.60),
('Si', 1.64, 1.84),
('P', 1.94, 2.10),
('S', 2.20, 2.44),
('Rh L + Ar', 2.56, 3.10),
('K', 3.22, 3.42),
('Ca ka', 3.58, 3.82),
('Ca kb', 3.92, 4.14),
('Ti ka', 4.38, 4.66),
('Ti kb', 4.82, 5.06),
('Mn', 5.78, 6.02),
('Fe ka', 6.26, 6.56),
('Fe kb', 6.92, 7.22),
('Cu', 7.92, 8.20)
]

spectral_zones_reg = extract_spectral_zones(Xcalreg, spectral_cuts) # extracting the spectral zones
spectral_zones_reg['Fe ka'].T.plot(title='spectral zone') # plotting the Al ka spectral zone

In [9]:
plsr_spectral_zone_results = {} # dictionary to store the results for each spectral zone
for zone_name, Xcalreg_zone in spectral_zones_reg.items():
    # preparing the corresponding Xpred zone
    col_zone = Xcalreg_zone.columns # getting the columns of the spectral zone
    Xpredreg_zone = Xpredreg.loc[:, col_zone] # selecting the same columns in Xpredreg

    # preprocessing
    Xcalreg_zone_prep, mean_calreg_zone, mean_calreg_poisson_zone  = prepr.poisson(Xcalreg_zone, mc=True)
    Xpredreg_zone_prep = ((Xpredreg_zone/np.sqrt(mean_calreg_zone)) - mean_calreg_poisson_zone)

    # PLSR with optimized latent variables for the spectral zone
    plsr_zone_result = pls_optimized(Xcalreg_zone_prep, 
                                      ycalreg,
                                      LVmax=5,
                                      Xpred=Xpredreg_zone_prep,
                                      ypred=ypredreg,
                                      aim='regression',
                                      cv=10)
    plsr_spectral_zone_results[zone_name] = plsr_zone_result # storing only the results DataFrame

r2_pred_list = []
for zone_name, results_df in plsr_spectral_zone_results.items():
    r2_pred = results_df[0].iloc[-1]['r2_Pred'] # get R2 Pred for the last row (highest LV)
    r2_pred_list.append({'Spectral Zone': zone_name, 'r2_Pred': r2_pred}) # joining the results
r2_pred_df = pd.DataFrame(r2_pred_list)
r2_pred_df.plot(kind='line', x='Spectral Zone', y='r2_Pred', title='r2 Pred by Spectral Zone')    

In [10]:
Y_cum_var_list = [] # list to store Y Cum Var for each zone
for zone_name, results_df in plsr_spectral_zone_results.items(): # iterating over the spectral zones
    y_cum_var = results_df[0].iloc[-1]['Y_Cum_Exp_Var']    # get Y Cum Exp Var for the last row (highest LV)
    Y_cum_var_list.append({'Spectral Zone': zone_name, 'Y_Cum_Exp_Var': y_cum_var}) # appending to the list
Y_cum_var_df = pd.DataFrame(Y_cum_var_list)
Y_cum_var_df.plot(kind='line', x='Spectral Zone', y='Y_Cum_Exp_Var', title='Y Cum Exp Var by Spectral Zone') # plotting the results

# **Classification case**

In [11]:
# Creating a new column 'Class' based on the condition of 'BSP' values
data_complete['Class'] = np.where(data_complete['BSP'] > 50.00, 'eut', 'dist') # eutrophic (eut) if BSP > 50.00 (higher fertility), otherwise dystrophic (dist)
data_eut = data_complete[data_complete['Class'] == 'eut'].reset_index(drop=True)
data_dist = data_complete[data_complete['Class'] == 'dist'].reset_index(drop=True)

In [12]:
# splitting the data into calibration and prediction sets by kennard-stone algorithm
Xeut_cal, Xeut_pred = ks.train_test_split(data_eut.loc[:, '1':'15'], test_size=0.30) # class eutrophic
Xeut_cal = Xeut_cal.reset_index(drop=True)
Xeut_pred = Xeut_pred.reset_index(drop=True)

Xdist_cal, Xdist_pred = ks.train_test_split(data_dist.loc[:, '1':'15'], test_size=0.30) # class dystrophic
Xdist_cal = Xdist_cal.reset_index(drop=True)
Xdist_pred = Xdist_pred.reset_index(drop=True)

Xcalclass = pd.concat([Xeut_cal, Xdist_cal], axis=0).reset_index(drop=True) # concatenating both classes
Xpredclass = pd.concat([Xeut_pred, Xdist_pred], axis=0).reset_index(drop=True)
ycalclass = pd.Series(['eut']*Xeut_cal.shape[0] + ['dist']*Xdist_cal.shape[0]) # creating the target variable for calibration set
ypredclass = pd.Series(['eut']*Xeut_pred.shape[0] + ['dist']*Xdist_pred.shape[0]) # creating the target variable for prediction set


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.

2025-11-02 14:21:41,921 - kennard_stone.utils._pairwise:109[INFO] - Calculating pairwise distances using scikit-learn.




'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.

2025-11-02 14:21:41,926 - kennard_stone.utils._pairwise:109[INFO] - Calculating pairwise distances using scikit-learn.


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.

2025-11-02 14:21:41,950 - kennard_stone.utils._pairwise:109[INFO] - Calculating pairwise distances using scikit-learn.


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.

2025-11-02 14:21:41,955 - kennard_stone.utils._pairwise:109[INFO] - Calculating pairwise distances using scikit-learn.


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.



In [13]:
# preprocessings
import preprocessings as prepr # preprocessing methods for XRF data

Xcalclass_prep, mean_calclass, mean_calclass_poisson  = prepr.poisson(Xcalclass, mc=True)
Xpredclass_prep = ((Xpredclass/np.sqrt(mean_calclass)) - mean_calclass_poisson)

In [14]:
# performing PLS-DA with optimized latent variables
plsda_results = pls_optimized(Xcalclass_prep, 
                              ycalclass,
                              LVmax=4,
                              Xpred=Xpredclass_prep,
                              ypred=ypredclass,
                              aim='classification',
                              cv=10)
plsda_results[0]

Unnamed: 0,LVs,Accuracy Cal,Sensitivity Cal,Specificity Cal,CM Cal,Accuracy CV,Sensitivity CV,Specificity CV,CM CV,Accuracy Pred,Sensitivity Pred,Specificity Pred,CM Pred,X Cum Exp Var,Y Cum Exp Var,X Ind Exp Var,Y Ind Exp Var
0,1,0.804054,0.84507,0.766234,"[[59, 18], [11, 60]]",0.722973,0.746479,0.701299,"[[54, 23], [18, 53]]",0.75,0.870968,0.636364,"[[21, 12], [4, 27]]",24.084452,18.48632,24.084452,18.48632
1,2,0.851351,0.915493,0.792208,"[[61, 16], [6, 65]]",0.810811,0.873239,0.753247,"[[58, 19], [9, 62]]",0.84375,0.967742,0.727273,"[[24, 9], [1, 30]]",46.025807,20.883518,21.941354,2.397198
2,3,0.831081,0.901408,0.766234,"[[59, 18], [7, 64]]",0.797297,0.84507,0.753247,"[[58, 19], [11, 60]]",0.828125,0.967742,0.69697,"[[23, 10], [1, 30]]",70.291459,21.960174,24.265653,1.076656
3,4,0.871622,0.887324,0.857143,"[[66, 11], [8, 63]]",0.77027,0.84507,0.701299,"[[54, 23], [11, 60]]",0.875,0.967742,0.787879,"[[26, 7], [1, 30]]",78.655943,24.395837,8.364483,2.435663


In [15]:
# Xcal_prep.to_csv('Xcal_prep_calss.csv', index=False, sep=';')
# Xpred_prep.to_csv('Xpred_prep_class.csv', index=False, sep=';')
# ycal.to_csv('ycal_class.csv', index=False, sep=';')
# ypred.to_csv('ypred_class.csv', index=False, sep=';')

In [16]:
spectral_zones_class = extract_spectral_zones(Xcalclass, spectral_cuts) # extracting the spectral zones
spectral_zones_class['Fe ka'].T.plot(title='spectral zone') # plotting the Al ka spectral zone

In [17]:
plsda_spectral_zone_results = {}
for zone_name, Xcalclass_zone in spectral_zones_class.items():
    # preparing the corresponding Xpred zone
    col_zone = Xcalclass_zone.columns # getting the columns of the zone
    Xpred_zone = Xpredclass.loc[:, col_zone] # selecting the columns from Xpredclass

    # preprocessing
    Xcalclass_zone_prep, mean_calclass_zone, mean_calclass_poisson_zone  = prepr.poisson(Xcalclass_zone, mc=True)
    Xpredclass_zone_prep = ((Xpred_zone/np.sqrt(mean_calclass_zone)) - mean_calclass_poisson_zone)

    # PLS-DA with optimized latent variables for the spectral zone
    plsda_zone_result = pls_optimized(Xcalclass_zone_prep, 
                                      ycalclass,
                                      LVmax=1,
                                      Xpred=Xpredclass_zone_prep,
                                      ypred=ypredclass,
                                      aim='classification',
                                      cv=10)
    plsda_spectral_zone_results[zone_name] = plsda_zone_result # storing only the results DataFrame

accuracy_pred_list = [] # list to store Accuracy Pred for each zone
for zone_name, results_df in plsda_spectral_zone_results.items(): # iterating over the spectral zones
    accuracy_pred = results_df[0].iloc[-1]['Accuracy Pred']    # get Accuracy Pred for the last row (highest LV)
    accuracy_pred_list.append({'Spectral Zone': zone_name, 'Accuracy Pred': accuracy_pred}) # appending to the list
accuracy_pred_df = pd.DataFrame(accuracy_pred_list)
accuracy_pred_df.plot(kind='line', x='Spectral Zone', y='Accuracy Pred', title='Accuracy Pred by Spectral Zone') # plotting the results

In [18]:
Y_cum_var_list = [] # list to store Y Cum Var for each zone
for zone_name, results_df in plsda_spectral_zone_results.items(): # iterating over the spectral zones
    y_cum_var = results_df[0].iloc[-1]['Y Cum Exp Var']    # get Y Cum Exp Var for the last row (highest LV)
    Y_cum_var_list.append({'Spectral Zone': zone_name, 'Y Cum Exp Var': y_cum_var}) # appending to the list
Y_cum_var_df = pd.DataFrame(Y_cum_var_list)
Y_cum_var_df.plot(kind='line', x='Spectral Zone', y='Y Cum Exp Var', title='Y Cum Exp Var by Spectral Zone') # plotting the results

# **Extracting predicates**

In [None]:
# calculating the median for each spectral zone DataFram with respect to the variables (columns)
zone_medians_df = pd.DataFrame({
    zone: df.median(axis=1) for zone, df in spectral_zones_class.items()
})
zone_medians_df

Unnamed: 0,Al,Si,P,S,Rh L + Ar,K,Ca ka,Ca kb,Ti ka,Ti kb,Mn,Fe ka,Fe kb,Cu
0,1.7125,6.240,0.390,0.510,6.1700,1.490,2.330,1.1950,25.970,8.470,3.875,232.2025,37.7700,1.175
1,2.0350,5.985,0.430,0.530,6.3050,1.585,2.655,1.2300,22.205,7.260,3.195,221.2225,36.2100,1.110
2,1.7775,6.295,0.330,0.510,6.2475,1.380,1.705,1.1325,27.480,7.925,3.650,230.0525,37.3225,1.165
3,1.9925,5.520,0.340,0.520,6.3200,1.260,2.390,1.2200,26.665,8.585,3.785,243.6350,39.6275,1.095
4,2.0700,6.515,0.340,0.525,6.3525,1.370,1.675,1.0825,25.210,8.140,3.510,233.2850,37.9650,1.150
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
143,2.0950,5.465,0.375,0.525,6.1550,1.265,1.395,1.1400,18.485,6.595,4.280,253.3025,41.0225,1.170
144,1.8450,5.770,0.570,0.570,6.1800,1.330,1.590,1.1650,28.400,7.530,3.430,234.1200,37.9850,1.160
145,1.9800,5.810,0.340,0.530,6.3450,1.330,0.920,1.0500,29.300,7.800,3.440,237.8750,38.9850,1.140
146,1.8400,4.750,0.325,0.530,5.9850,1.245,1.350,0.9900,20.190,6.970,2.920,217.4650,35.3550,1.070


In [36]:
# calculating the quantiles for each column of zone_medians_df
zone_quantiles = zone_medians_df.quantile([0.25, 0.5, 0.75])
zone_quantiles

Unnamed: 0,Al,Si,P,S,Rh L + Ar,K,Ca ka,Ca kb,Ti ka,Ti kb,Mn,Fe ka,Fe kb,Cu
0.25,1.99,5.24,0.33,0.505,6.113125,1.25,1.315,1.054375,21.895,7.2825,3.02875,231.33375,37.51875,1.125
0.5,2.11375,5.645,0.36,0.52,6.2125,1.33,1.57,1.1075,23.395,7.685,3.19,237.545,38.46375,1.165
0.75,2.215625,5.94125,0.41,0.545,6.300625,1.44,2.085,1.195,25.21625,8.08,3.59375,242.280625,39.27375,1.21


In [56]:
# extraindo predicados baseados nos quartis (incluindo o 50%)
zone_predicate_list = []
predicate_num = 1
for zone in zone_medians_df.columns:
    q25 = zone_quantiles.loc[0.25, zone]
    q50 = zone_quantiles.loc[0.50, zone]
    q75 = zone_quantiles.loc[0.75, zone]

    # low
    zone_predicate_list.append({
        'predicate': f'P{predicate_num}',
        'rule': f"{zone} <= {q25:.2f}",
        'zone': zone,
        'thresholds': f"{q25:.2f}",
        'operator': "<="
    })
    predicate_num += 1
    # low-medium
    zone_predicate_list.append({
        'predicate': f'P{predicate_num}',
        'rule': f"{zone} > {q25:.2f} and {zone} <= {q50:.2f}",
        'zone': zone,
        'thresholds': f"{q25:.2f},{q50:.2f}",
        'operator': "> and <="
    })
    predicate_num += 1
    # medium-high
    zone_predicate_list.append({
        'predicate': f'P{predicate_num}',
        'rule': f"{zone} > {q50:.2f} and {zone} <= {q75:.2f}",
        'zone': zone,
        'thresholds': f"{q50:.2f},{q75:.2f}",
        'operator': "> and <="
    })
    predicate_num += 1
    # high
    zone_predicate_list.append({
        'predicate': f'P{predicate_num}',
        'rule': f"{zone} > {q75:.2f}",
        'zone': zone,
        'thresholds': f"{q75:.2f}",
        'operator': ">"
    })
    predicate_num += 1

predicates_df = pd.DataFrame(zone_predicate_list)
predicates_df

Unnamed: 0,predicate,rule,zone,thresholds,operator
0,P1,Al <= 1.99,Al,1.99,<=
1,P2,Al > 1.99 and Al <= 2.11,Al,"1.99,2.11",> and <=
2,P3,Al > 2.11 and Al <= 2.22,Al,"2.11,2.22",> and <=
3,P4,Al > 2.22,Al,2.22,>
4,P5,Si <= 5.24,Si,5.24,<=
5,P6,Si > 5.24 and Si <= 5.64,Si,"5.24,5.64",> and <=
6,P7,Si > 5.64 and Si <= 5.94,Si,"5.64,5.94",> and <=
7,P8,Si > 5.94,Si,5.94,>
8,P9,P <= 0.33,P,0.33,<=
9,P10,P > 0.33 and P <= 0.36,P,"0.33,0.36",> and <=


In [59]:
# Função para avaliar se um valor satisfaz um predicado
def eval_predicate(value, thresholds, operator):
    if operator == "<=":
        return float(value <= float(thresholds))
    elif operator == ">":
        return float(value > float(thresholds))
    elif operator == "> and <=":
        t1, t2 = map(float, thresholds.split(","))
        return float((value > t1) and (value <= t2))
    else:
        return np.nan  # operador desconhecido

# Criação do novo DataFrame de indicadores de predicados
predicate_indicator_df = pd.DataFrame(index=zone_medians_df.index)

for _, row in predicates_df.iterrows():
    pred = row['predicate']
    zone = row['zone']
    thresholds = row['thresholds']
    operator = row['operator']
    predicate_indicator_df[pred] = zone_medians_df[zone].apply(lambda v: eval_predicate(v, thresholds, operator)).astype(int)

predicate_indicator_df

Unnamed: 0,P1,P2,P3,P4,P5,P6,P7,P8,P9,P10,...,P47,P48,P49,P50,P51,P52,P53,P54,P55,P56
0,1,0,0,0,0,0,0,1,0,0,...,0,0,0,1,0,0,0,0,1,0
1,0,1,0,0,0,0,0,1,0,0,...,0,0,1,0,0,0,1,0,0,0
2,1,0,0,0,0,0,0,1,1,0,...,0,0,1,0,0,0,0,1,0,0
3,0,1,0,0,0,1,0,0,0,1,...,0,1,0,0,0,1,1,0,0,0
4,0,1,0,0,0,0,0,1,0,1,...,0,0,0,1,0,0,0,1,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
143,0,1,0,0,0,1,0,0,0,0,...,0,1,0,0,0,1,0,1,0,0
144,1,0,0,0,0,0,1,0,0,0,...,0,0,0,1,0,0,0,1,0,0
145,1,0,0,0,0,0,1,0,0,1,...,1,0,0,0,1,0,0,1,0,0
146,1,0,0,0,1,0,0,0,1,0,...,0,0,1,0,0,0,1,0,0,0


In [63]:
# co-ocorrence matrix
co_occurrence_matrix = np.dot(predicate_indicator_df.T, predicate_indicator_df)
co_occurrence_matrix_df = pd.DataFrame(co_occurrence_matrix, index=predicate_indicator_df.columns, columns=predicate_indicator_df.columns)

## the co-occurrence matrix indicates how many samples satisfy each pair of predicates
## for example, if P1 and P2 co-occur in 10 samples, the value at (P1, P2) and (P2, P1) will be 10
## the principal diagonal indicates how many samples satisfy each individual predicate
## while the off-diagonal elements indicate co-occurrence counts

In [64]:
# plotly backend does not support kind='heatmap', so use imshow
co_occurrence_matrix_df.plot(kind='imshow', title='Co-Occurrence Matrix of Predicates', color_continuous_scale='Viridis')

In [68]:
import networkx as nx

import plotly.graph_objects as go

# Create a graph from the co-occurrence matrix
G = nx.Graph()

# Add nodes (predicates)
for pred in predicates_df['predicate']:
    G.add_node(pred)

# Add edges with weights from off-diagonal elements of co-occurrence matrix
for i, pred_i in enumerate(co_occurrence_matrix_df.index):
    for j, pred_j in enumerate(co_occurrence_matrix_df.columns):
        if i < j:  # Only upper triangle to avoid duplicate edges
            weight = co_occurrence_matrix_df.iloc[i, j]
            if weight > 0:  # Only add edges with positive co-occurrence
                G.add_edge(pred_i, pred_j, weight=weight)

# Get positions for nodes using spring layout
pos = nx.spring_layout(G, k=0.5, iterations=50, seed=42)

# Extract edge information
edge_x = []
edge_y = []
edge_weights = []
for edge in G.edges():
    x0, y0 = pos[edge[0]]
    x1, y1 = pos[edge[1]]
    edge_x.extend([x0, x1, None])
    edge_y.extend([y0, y1, None])
    edge_weights.append(G[edge[0]][edge[1]]['weight'])

# Create edge trace
edge_trace = go.Scatter(
    x=edge_x, y=edge_y,
    line=dict(width=0.5, color='#888'),
    hoverinfo='none',
    mode='lines')

# Extract node information
node_x = []
node_y = []
node_text = []
for node in G.nodes():
    x, y = pos[node]
    node_x.append(x)
    node_y.append(y)
    # Add rule information to node text
    rule = predicates_df[predicates_df['predicate'] == node]['rule'].values[0]
    node_text.append(f"{node}<br>{rule}")

# Create node trace
node_trace = go.Scatter(
    x=node_x, y=node_y,
    mode='markers+text',
    text=[node for node in G.nodes()],
    textposition="top center",
    hovertext=node_text,
    hoverinfo='text',
    marker=dict(
        showscale=True,
        colorscale='YlGnBu',
        size=10,
        color=[G.degree(node) for node in G.nodes()],
        colorbar=dict(
            thickness=15,
            title=dict(text='Node Degree', side='right'),
            xanchor='left'
        ),
        line_width=2))

# Create figure
fig = go.Figure(data=[edge_trace, node_trace],
             layout=go.Layout(
                title=dict(text='Predicate Co-occurrence Network', font=dict(size=16)),
                showlegend=False,
                hovermode='closest',
                margin=dict(b=20,l=5,r=5,t=40),
                xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))
                )

fig.show()