# AGLAE PIGE data treatment

In [None]:
%pip install pandas ipywidgets xlrd matplotlib openpyxl --user

In [None]:
import ipywidgets as widgets
from ipywidgets import interact, interactive, fixed, interact_manual
import io
import pandas as pd
from pandas.api.typing import DataFrameGroupBy
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.font_manager

SMALL_SIZE = 10
MEDIUM_SIZE = 14
BIGGER_SIZE = 16

plt.rc('font', size=BIGGER_SIZE)          # controls default text sizes
plt.rc('axes', titlesize=BIGGER_SIZE)     # fontsize of the axes title
plt.rc('axes', labelsize=BIGGER_SIZE)    # fontsize of the x and y labels
plt.rc('xtick', labelsize=MEDIUM_SIZE)    # fontsize of the tick labels
plt.rc('ytick', labelsize=MEDIUM_SIZE)    # fontsize of the tick labels
plt.rc('legend', fontsize=BIGGER_SIZE)    # legend fontsize
plt.rc('figure', titlesize=MEDIUM_SIZE)  # fontsize of the figure title


# Upload de fichier

In [None]:
uploader_report = widgets.FileUpload(description="rapport d'analyses")
uploader_pige_sodium = widgets.FileUpload(description="données PIGE Na", accept=".txt", style={'description_width': 'initial'})
uploader_pixe = widgets.FileUpload(description="données PIXE")
has_bore_w = widgets.Checkbox(
    value=False,
    description='A du bore ?',
    disabled=False,
    indent=False
)

display(uploader_report)
display(uploader_pige_sodium)

# TODO: ajouter autre bouton quand y'a du bore
display(has_bore_w)
display(uploader_pixe)

In [None]:
print(uploader_report.value)
print(uploader_pige_sodium.value)
print(uploader_pixe.value)

# l'index des trois tableaux sera le nom du fichier (sans extension)
report = pd.read_excel(io.BytesIO(uploader_report.value[0].content), index_col=0)
pige_table = pd.read_csv(io.BytesIO(uploader_pige_sodium.value[0].content), sep="\t", index_col=0)
# on enleve "g7" pour avoir le meme index dans tous les tableaux pour facilement faire des jointures
pige_table.index = pige_table.index.str.replace('.g70', '', regex=False)
pige_table.index = pige_table.index.str.replace('.g7', '', regex=False)
pixe_table = pd.read_excel(io.BytesIO(uploader_pixe.value[0].content),sheet_name="S_Conc. ppm", header=1, index_col=0)

print(pixe_table.columns)


# On garde les tableaux avec uniquement les standards

In [None]:
pige_table["A/D"] = pige_table["Aire Gaussien"]/report["Dose"]

stds = pd.DataFrame(
    {
         "Ref Objet": ["BrillA", "BrillB", "BrillC", "BrillD", "BG4", "BG3"],
         "Conc theorique": [143900, 172600, 12000, 10700, 50000, 75000],
    }
)
conc_stds_sans_plomb = stds[stds["Ref Objet"].isin(["BrillA","BrillB","BrillD"])]
conc_stds_avec_plomb = stds[stds["Ref Objet"].isin(["BrillC","BG4","BG3"])]

report_stds = report[report["Ref Objet"].isin(stds["Ref Objet"])]

# on veut prendre que les standards dans le nom du fichier, on fait ca en appelant contains en mode regex avec tous les standards (séparés par des "ou", i.e. la barre '|')
pige_stds = pige_table[pige_table.index.str.contains("|".join(stds["Ref Objet"]), regex=True)].copy()

# On vérifie que le nombre de standards dans le rapport et dans PIGE est le même
print(f"On a {len(report_stds)} standards dans le rapport d'analyses")
print(f"On a {len(pige_stds)} standards dans les données PIGE Na")

# Droite de calibration

- On trace la droite de calibration PIGE $\Large c = k \times \frac{A}{D}$ avec:
  - $c$ la concentration théorique du sodium dans les standards
  - $k$ la pente déterminée par méthode des moindres carrés ordinaires.
  - $A$ l'aire gaussienne
  - $D$ la dose


On définit d'abord la fonction CALIB

In [None]:
# Fonction qui va afficher les courbes de calibration et son fit linéaire par méthode des moindres carrés ordinaires
def calib(titre, ads, conc, axe):
    axe.set_title(titre)
    axe.set_xlabel("Aire / Dose (u.a.)")
    axe.set_ylabel("Concentration théorique (ppm)")
    axe.grid(True)
    colors = ["blue", "red"]
    coefs = []
    align = ["bottom", "top"]
    
    # On affiche toutes les droites de calibration
    for i, df in enumerate(ads):
        # il peut y avoir plusieures prises de standards, on les aggrège dans une liste
        ad_stds = df.groupby("Ref Objet")["A/D"].agg(list)
        
        # Au lieu de fitter chaque standard séparément,
        # on va collecter tous les points (x, y) de ce DataFrame
        all_x = []
        all_y = []
        for std, x_vals in ad_stds.items():
            # x est déjà une liste, on passe en array NumPy
            x_vals = np.array(x_vals, dtype=float)
            
            # on ne prend que les valeurs theoriques pour lesquelles on a fait une prise experimentale
            subset_conc = conc[conc["Ref Objet"] == std]["Conc theorique"]
            if subset_conc.empty:
                # Cas où la référence std n'existe pas dans conc
                print(f"Attention : std '{std}' pas trouvé dans conc.")
                continue

            y_val = subset_conc.iloc[0]
            y_vals = np.full_like(x_vals, y_val)
            axe.plot(x_vals, y_vals, '.', label=f"{std} {'init' if i == 0 else 'fin'}", markersize=12, color=colors[i % len(colors)]) 
            
            # On stocke les points dans nos listes globales
            all_x.extend(x_vals)
            all_y.extend(y_vals)

        #display(titre, "X", all_x, "Y", all_y)
        # Si on a recueilli au moins 2 points pour faire un fit
        if len(all_x) > 1:
            all_x = np.array(all_x, dtype=float)
            all_y = np.array(all_y, dtype=float)

            # Régression forcée à l'origine : y = a * x
            # => X = all_x.reshape(-1,1), Y = all_y
            a_coef, _, _, _ = np.linalg.lstsq(all_x.reshape(-1, 1), all_y, rcond=None)
            a = a_coef[0]
            coefs.append(a)

            # Calcul du y_pred
            y_pred = a * all_x

            ## Calcul du R² classique
            #ss_res = np.sum((all_y - y_pred)**2)
            #ss_tot = np.sum((all_y - all_y.mean())**2)
            #if ss_tot == 0:
             #   r_squared = np.nan
            #else:
             #   r_squared = 1 - (ss_res / ss_tot)

            # Calcul du R² no-intercept (style Excel)
            ss_res = np.sum((all_y - y_pred)**2)
            ss_tot_no_intercept = np.sum(all_y**2)
            r_squared = 1 - ss_res/ss_tot_no_intercept

            # Tracé de la droite
            x_sorted = np.sort(all_x)
            y_pred_sorted = a * x_sorted

            axe.plot(
                x_sorted, 
                y_pred_sorted, 
                linestyle="--",
                color=colors[i % len(colors)],
                #label=f"Droite {i}: y={a:.2f}x\nR²={r_squared:.3f}"
            )

            x_min, x_max = np.min(all_x), np.max(all_x)
            # Calcul du point "milieu" pour y mettre le texte
            mid_x = 0.5 * (x_min + x_max) - 0.01
            mid_y = a * mid_x
        
            # Angle de la pente en degrés (pour faire pivoter le texte)
            #angle = np.degrees(a) / 1000000
            angle = np.degrees(np.arctan(a))
            
            # Ajout du texte sur la droite
            axe.text(
                mid_x, mid_y,               # Coordonnées où placer le texte
                f"y = {a:.2f} x\nR²={r_squared:.4f}",           # Contenu du texte (équation)
                rotation=angle,            # Rotation pour suivre la pente
                ha="left", va=align[i % len(align)],  # Alignement horizontal/vertical du texte
                rotation_mode="anchor",
                transform_rotates_text=True,
                fontsize=BIGGER_SIZE,
            )
                
    axe.legend(loc='best', fontsize=10)
    return coefs

Pour chaque jour d'expériences, on trace les différentes droites de calibration pour les standards avec ou sans plomb, et en mettant sur le même plot la droite du matin et celle du soir.
On corrige la déviation des droites de calibration entre matin et soir par une régression linéaire en fonction de l'heure de mesure.
Pour cela on fait la moyenne des durées d'acquisition des standards avec ou sans plomb pour obtenir $t1$ (matin) et $t2$ (soir)
Puis on calcule $k(t) = \frac{k2-k1}{t2-t1}$ avec $k1$ et $k2$ la pente extraite des droites de calibration le matin et le soir

In [None]:
# Fonction qui sert à convertir la durée d'acquisition des standards en secondes 
def timestamp_total_seconds(ts):
    return  ts.hour * 3600 + ts.minute * 60 + ts.second
    

In [None]:
# On fusionne les trois documents Rapport, PIGE et PIXE 
report_pige_pixe = report.join([pige_table,pixe_table])

# On calcule le nombre de lignes dans la planche de plot (nombre de jours de manips)
report_par_jour = report_pige_pixe.groupby(report["Time"].dt.date)

# On trace la planche pour les plots 
fig, axes = plt.subplots(nrows=len(report_par_jour), ncols=2, figsize=(30,30))

# On convertit les valeurs de la colonne PbO en type non convertible et on affiche NaN si pas possible
report_pige_pixe["PbO"] = pd.to_numeric(report_pige_pixe["PbO"], errors = "coerce")

# On vérifie la longueur des dataframes
print("Taille de report_pige_pixe:", len(report_pige_pixe))

sodium_conc_dfs = []

# On fait une boucle for pour parcourir le rapport jour par jour et extraire les standards avec ou sans plomb
for i, (day, group) in enumerate(report_par_jour):
    time_sans_plomb = []
    time_avec_plomb = []

    # On sépare les standards avec ou sans plomb
    stds_sans_plomb = group[group["Ref Objet"].isin(["BrillA","BrillB","BrillD"])]
    stds_avec_plomb = group[group["Ref Objet"].isin(["BrillC","BG4","BG3"])]
    
    
    # On sépare les standards avant et après midi
    am_data_sans_plomb = stds_sans_plomb[stds_sans_plomb["Time"].dt.hour < 12]
    am_data_avec_plomb = stds_avec_plomb[stds_avec_plomb["Time"].dt.hour < 12]

    pm_data_sans_plomb = stds_sans_plomb[stds_sans_plomb["Time"].dt.hour >= 12]
    pm_data_avec_plomb = stds_avec_plomb[stds_avec_plomb["Time"].dt.hour >= 12]
    
        
    # On trace les droite de calibration et leur fit avec l'équation et le coefficient R
    coefs_sans_plomb = calib(f"Courbe de calibration Na PIGE sans Plomb {day}", [am_data_sans_plomb, pm_data_sans_plomb], conc_stds_sans_plomb, axes[i, 0])
    coefs_avec_plomb = calib(f"Courbe de calibration Na PIGE avec Plomb {day}", [am_data_avec_plomb, pm_data_avec_plomb], conc_stds_avec_plomb, axes[i, 1])

    # On calcule la moyenne des temps de mesure des standards avec ou sans plomb, matin et soir
    
    t_moyen_sans_plomb_init = am_data_sans_plomb["Time"].apply(timestamp_total_seconds).mean()                   
    t_moyen_avec_plomb_init = am_data_avec_plomb["Time"].apply(timestamp_total_seconds).mean()

    t_moyen_sans_plomb_fin = pm_data_sans_plomb["Time"].apply(timestamp_total_seconds).mean()                   
    t_moyen_avec_plomb_fin = pm_data_avec_plomb["Time"].apply(timestamp_total_seconds).mean()
    
   # on crée deux dataframe pour stocker les valeurs des temps moyens et des pentes calculées par calib, avec ou sans plomb
    times_init =  pd.Series({
        "sans plomb": t_moyen_sans_plomb_init, 
    })
    coefs_init =  pd.Series({
        "sans plomb": coefs_sans_plomb[0],
    })       
    times_fin = pd.Series({
        "sans plomb": t_moyen_sans_plomb_fin, 
    })
    coefs_fin = pd.Series({
        "sans plomb": coefs_sans_plomb[-1],
    })

    if len(coefs_avec_plomb) == 2:
        # cas ou on a une droite de calibration matin et soir pour standards avec plomb

        # on rajoute les valeurs avec plomb matin et soir
        times_init["avec plomb"] = t_moyen_avec_plomb_init
        coefs_init["avec plomb"] = coefs_avec_plomb[0]
        times_fin["avec plomb"] = t_moyen_avec_plomb_fin
        coefs_fin["avec plomb"] = coefs_avec_plomb[-1]
    
    # Calcul du coefficient k de régression linéaire de correction d'heures
    t_diff = times_fin - times_init
    coefs_diff = coefs_fin - coefs_init

    k = coefs_diff / t_diff
    k_t_sans_plomb = k["sans plomb"] * (group["Time"].apply(timestamp_total_seconds) - times_fin["sans plomb"]) + coefs_fin["sans plomb"]
    sodium_conc_day_df = pd.DataFrame({
        "Ref Objet": group["Ref Objet"],
        "PbO": group["PbO"], 
        "Time": group["Time"],
        "A/D": group["Aire Gaussien"]/group["Dose"],
        "k1": coefs_init["sans plomb"],
        "k2": coefs_fin["sans plomb"],
        "t1": times_init["sans plomb"],
        "t2": times_fin["sans plomb"],
        "k(t) sans plomb": k_t_sans_plomb,
    })

    if len(coefs_avec_plomb) == 2:
        k_t_avec_plomb = k["avec plomb"] * (group["Time"].apply(timestamp_total_seconds) - times_fin["avec plomb"]) + coefs_fin["avec plomb"]
        sodium_conc_day_df["k(t) avec plomb"] = k_t_avec_plomb
        sodium_conc_day_df["Na2O PIGE"] = np.where(
            group["PbO"] < 50000,
            k_t_sans_plomb * group["A/D"],
            k_t_avec_plomb * group["A/D"]
        )
    elif len(coefs_avec_plomb) == 1:
        # cas ou on a une droite de calibration matin ou soir pour standards avec plomb, on prend la pente de la seule droite
        sodium_conc_day_df["Na2O PIGE"] = np.where(
            group["PbO"] < 50000,
            k_t_sans_plomb * group["A/D"],
            coefs_avec_plomb[0] * group["A/D"]
        )
    else:
        # cas ou on a que des points (pas de droite) matin ou soir pour standards avec plomb
        # on calcule le produit en croix avec la concentration et A/D de BG3 (standard avec la concentration en plomb la plus haute)
        conc_bg3 = stds[stds["Ref Objet"] == "BG3"]["Conc theorique"].item()
        daily_ad_bg3 = group[group["Ref Objet"] == "BG3"]["A/D"].item()
        
        sodium_conc_day_df["Na2O PIGE"] = np.where(
            group["PbO"] < 50000,
            k_t_sans_plomb * group["A/D"],
            conc_bg3 * group["A/D"] / daily_ad_bg3
        )
    


    sodium_conc_dfs.append(sodium_conc_day_df)

sodium_conc_df = pd.concat(sodium_conc_dfs)
sodium_conc_df

In [None]:
# ajouter pixe ref objet dans l'index
pixe_table = pixe_table.set_index('Pixe Ref Objet', append=True)

In [None]:
# l'index est un tuple dans pixe_table donc on extrait que les valeurs de Na_conc_df
pixe_table["Na2O PIGE"] = sodium_conc_df["Na2O PIGE"].values
pixe_Na2O = pixe_table["Na2O"]
pixe_table = pixe_table.drop(columns=["Na2O"])
pige_corrected_compositions_table = pixe_table
pige_corrected_compositions_table.head(20)

In [None]:
pd.set_option("display.float_format", "{:.2f}".format)

# Normalisation du fichier PIXE PIGE

In [None]:
# Fonction qui permet de normaliser par rapport au Na2O PIGE

def normalize(row: pd.Series):
    cleaned_row = row.map(lambda s: pd.to_numeric(s, downcast="integer", errors="coerce"))
    sodium_pige = cleaned_row.loc["Na2O PIGE"]
    total_comp_without_Na = 1000000 - sodium_pige
    row_without_Na = cleaned_row.drop("Na2O PIGE")
    sum_comp_without_Na = row_without_Na.sum()
    return pd.concat([pd.Series(sodium_pige, index=["Na2O PIGE"]), row_without_Na.map(lambda x: x * total_comp_without_Na / sum_comp_without_Na)])

normalized_compositions_table = pige_corrected_compositions_table.apply(normalize, axis=1)
normalized_compositions_table.head(20)

# Test de la somme des lignes = 100%

In [None]:
normalized_compositions_table.sum(axis=1)

In [None]:
def extract_simplified_name(row, group_standards=True):
    name = row[1].strip().replace(" ", "_")
    name = row[1].strip().replace("-", "_")
    
    if "_" in name:
        split = name.rsplit('_', 1)
        return split[0] if "pt" in split[1] or "map" in split[1] else name
    elif not group_standards and (stds["Ref Objet"] == name).any():
        return row[0]
    else:
        return name

x_widget = widgets.FloatSlider(min=0.2, max=3.0, step=0.1)
@interact(df=fixed(normalized_compositions_table), tolerance=x_widget, threshold=fixed(10000))
def remove_outliers(df: pd.DataFrame, tolerance, threshold=10000):
    """
    Supprime les valeurs aberrantes (outliers) d'un DataFrame basé sur des critères de tolérance et de seuil.

    Cette fonction identifie les lignes du DataFrame `df` où les valeurs dépassent un certain seuil (`threshold`)
    et calculent la déviation par rapport à la médiane au sein de chaque groupe simplifié.
    Si la déviation relative dépasse la `tolerance`, la ligne est marquée pour suppression.

    Args:
        df (pd.DataFrame): Le DataFrame contenant les données à analyser.
        tolerance (float): Le seuil de tolérance pour déterminer si une valeur est un outlier. Les valeurs dont
                           la déviation relative par rapport à la médiane dépasse cette tolérance seront considérées comme
                           des outliers.
        threshold (float, optional): Le seuil minimal que doit dépasser une valeur dans `normalized_compositions_table`
                                     pour être prise en compte dans l'analyse. Par défaut à 10000.

    Returns:
        list: Une liste des indices des lignes du DataFrame `df` qui contiennent des valeurs aberrantes.
        ```

    """
    to_delete=[]
    major_group = df[df > threshold].groupby(lambda x: extract_simplified_name(x, True), sort=False)
    for name, grp in major_group:
        # on évite les standards
        if grp.count().max() <= 1 or (stds["Ref Objet"] == name).any():
            continue
        
        median = grp.median()
        d = (grp - median) / median
        for i, row in d.iterrows():
            for j, elt in row[row > tolerance].items():
                print(i, j)
                to_delete.append(i)
                
    return to_delete

#normalized_compositions_table.loc[to_delete[0]]
#df_group.filter(detect_outliers)
#pager(res,5,10)
#pager(comp_table_without_outliners,90,10)
#for idx in to_delete:
#    print(idx[2])
tolerance_final_w = widgets.FloatText(
    value=0.5,
    description='tolerance finale:',
    disabled=False
)
display(tolerance_final_w)

In [None]:
#display(remove_outliners(normalized_compositions_table, tolerance=3.0))
table_without_outliers = normalized_compositions_table.drop(remove_outliers(normalized_compositions_table, tolerance=tolerance_final_w.value))
table_without_outliers

In [None]:
final_table_ppm = table_without_outliers.groupby(lambda x: extract_simplified_name(x, False), sort=False).agg(["mean", "std"])
final_table_ppm_without_sigma = table_without_outliers.groupby(lambda x: extract_simplified_name(x, False), sort=False).mean()
final_table_ppm_without_sigma.tail(50)

In [None]:
major_elements_list = ["Na2O PIGE", "MgO", "Al2O3", "SiO2", "P2O5", "SO3", "Cl", "K2O", "CaO", "TiO2","V2O3", "Cr2O3", "MnO", "Fe2O3", "CoO", "NiO", "CuO", "ZnO", "As2O5", "SrO", "Ag2O", "SnO2", "Sb2O5", "BaO", "Au2O3", "PbO"]

In [None]:
final_table_ppm_major_elements = final_table_ppm[major_elements_list]
final_table_ppm_major_elements.head(5)

In [None]:
final_table_wt = (final_table_ppm / 10000).round(3)
final_table_wt_without_sigma = (final_table_ppm_without_sigma / 10000).round(4)
final_table_wt_without_sigma.head(5)

In [None]:
final_table_wt_major_elts = final_table_wt[major_elements_list]

In [None]:
from datetime import datetime

d = datetime.today().strftime("%Y%m%d%H%M")
with pd.ExcelWriter(f"{d}_emma_final.xlsx") as writer:
    final_table_ppm.map(lambda x: f"{x:.0f}").to_excel(writer, sheet_name="ppm all elements")
    final_table_ppm_major_elements.map(lambda x: f"{x:.0f}").to_excel(writer, sheet_name="ppm major elements")
    final_table_ppm_without_sigma.map(lambda x: f"{x:.0f}").to_excel(writer, sheet_name="ppm without sigma")
    final_table_wt.to_excel(writer, sheet_name="wt all elements")
    final_table_wt_major_elts.to_excel(writer, sheet_name="wt major elements")
    final_table_wt_without_sigma.to_excel(writer, sheet_name="wt without sigma")