In [None]:
import sys
from pathlib import Path

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import spectrochempy as scp
import xarray as xr
from matplotlib import gridspec, ticker
from matplotlib.colorbar import Colorbar
from matplotlib.colors import LinearSegmentedColormap, ListedColormap
from scipy.optimize import minimize
from spectrochempy import ur

In [None]:
# Ensure custom module Path is set before import
sys.path.append(r"D:\CHENG\OneDrive - UAB\ICMAB-Python\Figure")
from colors import tol_cmap, tol_cset  # type: ignore

# 画图的初始设置
scp.plot_preferences.style = r'classic'
plt.style.use(r"D:\CHENG\OneDrive - UAB\ICMAB-Python\Figure\liuchzzyy.mplstyle")
# print(plt.style.available)

# xarray setting
xr.set_options(
    cmap_sequential="viridis",
    cmap_divergent="viridis",
    display_width=150,
)  # viridis, gray

# 颜色设定
colors = tol_cset("vibrant")
if colors is not None:
    colors = list(colors)
if r"sunset" not in plt.colormaps():
    cmap = tol_cmap("sunset")
    if isinstance(cmap, LinearSegmentedColormap):
        plt.colormaps.register(cmap)
if r"rainbow_PuRd" not in plt.colormaps():
    cmap = tol_cmap("rainbow_PuRd")
    if isinstance(cmap, LinearSegmentedColormap):
        plt.colormaps.register(cmap)  # 备用 plasma

# 输出的文件夹
path_out = Path(r"C:\Users\chengliu\Desktop\Figure")

# Set math font
mpl.rcParams["mathtext.fontset"] = "custom"
mpl.rcParams["mathtext.rm"] = "Arial"
mpl.rcParams["mathtext.it"] = "Arial:italic"
mpl.rcParams["mathtext.bf"] = "Arial:bold"
mpl.rcParams["mathtext.sf"] = "Arial"
mpl.rcParams["mathtext.tt"] = "Arial"
mpl.rcParams["mathtext.cal"] = "Arial"
mpl.rcParams["mathtext.default"] = "regular"

### EXAFS 的数据

In [None]:
# 读取 reference + operando 数据
path_file = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2")

data1 = pd.read_csv(
    Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Mn_Oct2022_1.chir2_mag"),
    sep=r"\s+",
    index_col=0,
    header=None,
    comment="#",
)
data2 = pd.read_csv(
    Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Mn_Oct2022_2.chir2_mag"),
    sep=r"\s+",
    index_col=0,
    header=None,
    comment="#",
)
exafs_Mn = pd.concat([data1, data2.iloc[:, 4:]], axis=1, ignore_index=True)  # noqa: N816
charge_index = [0, 1, 2, 3, *list(range(13, exafs_Mn.shape[1], 1))]
exafs_Mn = exafs_Mn.iloc[:, charge_index]  # noqa: N816
pdf_Mn = exafs_Mn.iloc[:, 0:4]  # noqa: N816
pdf_Mn.columns = [
    # r"Energy",
    r"0.2M_MnSO4(aq.)",
    r"alpha_MnO2_Electrode",
    r"alpha_MnO2_Powder",
    r"FC1st",
]
opData_Mn = exafs_Mn.iloc[:, list(range(4, exafs_Mn.shape[1]))]  # noqa: N816
opData_Mn.columns = list(range(0, opData_Mn.shape[1], 1))

In [None]:
# 剔除不好的谱线， Mn
opData_Mn = opData_Mn.drop(columns=opData_Mn.columns[1], axis=1)  # noqa: N816

In [None]:
opData_Mn.plot.line(legend=False, xlim=(0.5, 6.0), cmap="coolwarm")

In [None]:
# 创建一个汇总 DataFrame，显示每个样本在各个区间的最大值信息
intervals = [(1.0, 2.0), (2.0, 2.5), (2.8, 3.3)]
interval_names = ['[1.0, 2.0]', '[2.0, 2.5]', '[2.8, 3.3]']

# 准备汇总数据
summary_data = []

for i, (start, end) in enumerate(intervals):
    mask = (opData_Mn.index >= start) & (opData_Mn.index <= end)
    interval_data = opData_Mn[mask]

    if len(interval_data) == 0:
        continue

    for _, col in enumerate(interval_data.columns):
        max_idx = interval_data[col].idxmax()
        max_intensity = interval_data[col].max()

        summary_data.append({
            'Interval': interval_names[i],
            'Number': col,
            'Position': round(max_idx, 3),
            'Intensity': round(max_intensity, 3)
        })

# 创建DataFrame
summary_df = pd.DataFrame(summary_data)

# 显示前20行作为示例
print("各区间最大值汇总表 (前20行):")
print("="*60)
print(summary_df.head(20).to_string(index=False))

# 保存完整结果到CSV (可选)
summary_df.to_csv(path_out.joinpath(r"exafs_intensity_position.csv"), index=False, header=True)
print(f"\n总共分析了 {len(summary_df)} 个数据点（{len(intervals)} 个区间 × {len(opData_Mn.columns)} 个样本）")

### 挣扎，找第三个物相，PCA filtered 数据, Fixed Energy Cut -LCF

In [None]:
# 读取 reference + operando 数据
path_file = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2")

data1 = pd.read_csv(
    Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Mn_Oct2022_1.nor"),
    sep=r"\s+",
    index_col=None,
    header=None,
    comment="#",
)
data2 = pd.read_csv(
    Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Mn_Oct2022_2.nor"),
    sep=r"\s+",
    index_col=None,
    header=None,
    comment="#",
)
xas_Mn = pd.concat([data1, data2.iloc[:, 5:]], axis=1, ignore_index=True)  # noqa: N816
charge_index = [0, 1, 2, 3, 4, *list(range(14, xas_Mn.shape[1], 1))]
xas_Mn = xas_Mn.iloc[:, charge_index]  # noqa: N816
pdf_Mn = xas_Mn.iloc[:, 0:5]  # noqa: N816
pdf_Mn.columns = [
    r"Energy",
    r"0.2M_MnSO4(aq.)",
    r"alpha_MnO2_Electrode",
    r"alpha_MnO2_Powder",
    r"FC1st",
]
opData_Mn = xas_Mn.iloc[:, list(range(5, xas_Mn.shape[1]))]  # noqa: N816
opData_Mn.columns = list(range(0, opData_Mn.shape[1], 1))

# # Zn
# data1 = pd.read_csv(
#     Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Zn_Oct2022_1.nor"),
#     sep=r"\s+",
#     index_col=None,
#     header=None,
#     comment="#",
# )
# data2 = pd.read_csv(
#     Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Zn_Oct2022_2.nor"),
#     sep=r"\s+",
#     index_col=None,
#     header=None,
#     comment="#",
# )
# xas_Zn = pd.concat([data1, data2.iloc[:, 3:]], axis=1, ignore_index=True)  # noqa: N816
# charge_index = [0, 1, 2, *list(range(13, xas_Zn.shape[1], 1))]
# xas_Zn = xas_Zn.iloc[:, charge_index]  # noqa: N816

# pdf_Zn = xas_Zn.iloc[:, 0:3]  # noqa: N816
# pdf_Zn.columns = [
#     r"Energy",
#     r"0.5M_ZnSO4(aq.)",
#     r"ZSH"]
# opData_Zn = xas_Zn.iloc[:, list(range(3, xas_Zn.shape[1]))]  # noqa: N816
# opData_Zn.columns = list(range(0, opData_Zn.shape[1], 1))

# LCF 数据
lcf_mn_path = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2\FC1st-FD2nd\Oct2022_cell3_P2b\XANES\Mn\LCF")
lcf_mn = pd.read_csv(lcf_mn_path.joinpath(r"cell2_1stFC_opXAS_P2b_Mn_LCF.csv"), sep=r",", header=0, comment="#", skiprows=1).iloc[:, [6, 10]]

In [None]:
# # 截取数据范围，应该不需要
# opData_Mn = opData_Mn[(pdf_Mn[r"Energy"] >= 6530) & (pdf_Mn[r"Energy"] <= 6630)].reset_index(drop=True)  # noqa: N816
# pdf_Mn = pdf_Mn[(pdf_Mn[r"Energy"] >= 6530) & (pdf_Mn[r"Energy"] <= 6630)].reset_index(drop=True)  # noqa: N816

In [None]:
mn_lcf1 = opData_Mn - np.outer(pdf_Mn["0.2M_MnSO4(aq.)"], lcf_mn.iloc[:, 0])  # /lcf_mn.iloc[:, 1]
mn_lcf1 = mn_lcf1.dropna(how="all", axis=1)

pd.concat([pdf_Mn.iloc[:, 0], mn_lcf1], axis=1, ignore_index=True).to_csv(path_out.joinpath(r"RemoveLiquid_Mn.csv"), index=False)
pdf_Mn.to_csv(path_out.joinpath(r"pdf_Mn.csv"), index=False)

In [None]:
# 重新读取数据
path_file = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2\FC1st-FD2nd\Oct2022_cell3_P2b\XANES\Mn\ThirdPhase_LCF")

xas_Mn = pd.read_csv(  # noqa: N816
    Path.joinpath(path_file, r"RemoveLiquid.nor"),
    sep=r"\s+",
    index_col=None,
    header=None,
    comment="#",
)

pdf_Mn = xas_Mn.iloc[:, 0:5]  # noqa: N816
pdf_Mn.columns = [
    r"Energy",
    r"0.2M_MnSO4(aq.)",
    r"alpha_MnO2_Electrode",
    r"alpha_MnO2_Powder",
    r"FC1st",
]
opData_Mn = xas_Mn.iloc[:, list(range(5, xas_Mn.shape[1]))]  # noqa: N816
opData_Mn.columns = list(range(0, opData_Mn.shape[1], 1))

x = pdf_Mn.iloc[:, 0]
y = opData_Mn.iloc[:, 0]
opData_Mn_1st = opData_Mn.apply(lambda col: np.gradient(col, x))  # noqa: N816
opData_Mn_2nd = opData_Mn.sub(y, axis=0)  # noqa: N816
opData_Mn_3rd = opData_Mn.diff(axis=1)  # noqa: N816

In [None]:
# 画图
# gridspec inside gridspec
plt.close("all")
%matplotlib inline
fig = plt.figure(figsize=(7.0, 5))
gs = gridspec.GridSpec(2, 2, width_ratios=[1, 1], height_ratios=None, wspace=0, hspace=0, figure=fig)

# 图 A
subfig = fig.add_subfigure(gs[0, 0])
ax = subfig.add_subplot()
ax.set_position((0.0, 0.0, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    mn_lcf1.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, mn_lcf1.shape[1]],
)
# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("LCF result")

# 图 B
subfig = fig.add_subfigure(gs[0, 1])
ax = subfig.add_subplot()
ax.set_position((0.1, 0.0, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    opData_Mn_1st.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, opData_Mn_1st.shape[1]],
)
# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("First derivative")

# 图 C
subfig = fig.add_subfigure(gs[1, 0])
ax = subfig.add_subplot()
ax.set_position((0.0, -0.3, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    opData_Mn_2nd.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, opData_Mn_2nd.shape[1]],
)
# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("Reference subtraction")

# 图 D
subfig = fig.add_subfigure(gs[1, 1])
ax = subfig.add_subplot()
ax.set_position((0.1, -0.3, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    opData_Mn_3rd.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, opData_Mn_3rd.shape[1]],
)
# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("Sequential subtraction")


# 保存图片
plt.savefig(
    Path.joinpath(path_out, r"Operando_Cell2_XAS_XANES_06_01_V0_300.tif"),
    transparent=False,
    pad_inches=0.2,
    bbox_inches="tight",
    dpi=300,
    pil_kwargs={"compression": "tiff_lzw"},
)

plt.gcf().set_facecolor("white")
plt.show()

In [None]:
# 重新读取数据2
path_file = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2\FC1st-FD2nd\Oct2022_cell3_P2b\XANES\Mn\ThirdPhase_LCF")

xas_Mn = pd.read_csv(  # noqa: N816
    Path.joinpath(path_file, r"RemoveLiquid2.nor"),
    sep=r"\s+",
    index_col=None,
    header=None,
    comment="#",
)

pdf_Mn = xas_Mn.iloc[:, 0:5]  # noqa: N816
pdf_Mn.columns = [
    r"Energy",
    r"0.2M_MnSO4(aq.)",
    r"alpha_MnO2_Electrode",
    r"alpha_MnO2_Powder",
    r"FC1st",
]
opData_Mn = xas_Mn.iloc[:, list(range(5, xas_Mn.shape[1]))]  # noqa: N816
opData_Mn.columns = list(range(0, opData_Mn.shape[1], 1))

x = pdf_Mn.iloc[:, 0]
y = opData_Mn.iloc[:, 0]
opData_Mn_1st = opData_Mn.apply(lambda col: np.gradient(col, x))  # noqa: N816
opData_Mn_2nd = opData_Mn.sub(y, axis=0)  # noqa: N816
opData_Mn_3rd = opData_Mn.diff(axis=1)  # noqa: N816

In [None]:
# 画图
# gridspec inside gridspec
plt.close("all")
%matplotlib inline
fig = plt.figure(figsize=(7.0, 5))
gs = gridspec.GridSpec(2, 2, width_ratios=[1, 1], height_ratios=None, wspace=0, hspace=0, figure=fig)

# 图 A
subfig = fig.add_subfigure(gs[0, 0])
ax = subfig.add_subplot()
ax.set_position((0.0, 0.0, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    mn_lcf1.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, mn_lcf1.shape[1]],
)
# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("LCF result")

# 图 B
subfig = fig.add_subfigure(gs[0, 1])
ax = subfig.add_subplot()
ax.set_position((0.1, 0.0, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    opData_Mn_1st.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, opData_Mn_1st.shape[1]],
)
# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("First derivative")

# 图 C
subfig = fig.add_subfigure(gs[1, 0])
ax = subfig.add_subplot()
ax.set_position((0.0, -0.3, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    opData_Mn_2nd.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, opData_Mn_2nd.shape[1]],
)
# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("Reference subtraction")

# 图 D
subfig = fig.add_subfigure(gs[1, 1])
ax = subfig.add_subplot()
ax.set_position((0.1, -0.3, 1, 1))
ax.set_box_aspect(0.8)

ax.imshow(
    opData_Mn_3rd.T,
    aspect="auto",
    cmap="rainbow_PuRd",
    origin="lower",
    extent=[pdf_Mn[r"Energy"].min(), pdf_Mn[r"Energy"].max(), 0, opData_Mn_3rd.shape[1]],
)

# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.title.set_text("Sequential subtraction")


# 保存图片
plt.savefig(
    Path.joinpath(path_out, r"Operando_Cell2_XAS_XANES_06_02_V0_300.tif"),
    transparent=False,
    pad_inches=0.2,
    bbox_inches="tight",
    dpi=300,
    pil_kwargs={"compression": "tiff_lzw"},
)

plt.gcf().set_facecolor("white")
plt.show()

In [None]:
# 能量的切片
data_path = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2\FC1st-FD2nd\Oct2022_cell3_P2b\XANES\Mn\ThirdPhase_LCF\RemoveLiquid3.xmu")

data = pd.read_csv(data_path, delim_whitespace=True, header=None, comment="#").set_index(0, drop=True)
data.index.name = "Energy (eV)"

In [None]:
# 截取多个能量值的数据并绘制在同一张图中
target_energies = [6548.69, 6553.0, 6557.0, 6565.43]

# 存储所有截取的数据
energy_data_dict = {}

for target_energy in target_energies:
    # 找到最接近的能量值
    closest_idx_pos = abs(data.index - target_energy).argmin()
    closest_idx = data.index[closest_idx_pos]
    # 获取该能量处的数据
    energy_data = data.loc[closest_idx, :]
    energy_data_dict[f"{closest_idx:.2f}"] = energy_data

# 保存所有截取的数据到一个CSV文件
combined_data = pd.DataFrame(energy_data_dict)
combined_data.to_csv(path_out.joinpath("multiple_energy_cuts_Mm.csv"))

# 画图
# gridspec inside gridspec
plt.close("all")
%matplotlib inline
fig = plt.figure(figsize=(7.0, 2.5))
gs = gridspec.GridSpec(1, 2, width_ratios=[1, 1], height_ratios=None, wspace=0, hspace=0, figure=fig)

# 图 A
subfig = fig.add_subfigure(gs[0, 0])
ax = subfig.add_subplot()
ax.set_position((0.0, 0.0, 1, 1))
ax.set_box_aspect(0.8)
xas_colors = ListedColormap(
    mpl.colormaps["coolwarm"](np.linspace(1.0, 0.0, data.shape[1])),
    name="xas_colors",
)

for m in range(data.shape[1]):
    ax.plot(data.index, data.iloc[:, m], label=f"{m+1}", color=xas_colors.colors[m], linewidth=1)

# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.set_ylim(-0.01, 1.6)
ax.set_ylabel(r"Absorbance (arb.u.)", fontsize=11)
ax.yaxis.set_major_locator(ticker.MultipleLocator(base=0.4, offset=0))
ax.yaxis.set_minor_locator(ticker.MultipleLocator(base=0.2, offset=0))
ax.tick_params(axis='both', which='both', direction='out', top=False, right=False)

# 图 B
subfig = fig.add_subfigure(gs[0, 1])
ax = subfig.add_subplot()
ax.set_position((0.1, 0.0, 1, 1))
ax.set_box_aspect(0.8)

for m in range(combined_data.shape[1]):
    temp = combined_data.iloc[:, m] / combined_data.iloc[:, m].max()
    ax.plot(
        temp,
        color=colors[m%len(colors)],
        linewidth=2,
        marker="o",
        markersize=4,
        label=f"{float(combined_data.columns[m]):.1f} eV",
    )

# 设置刻度线等格式
ax.set_xlabel(r"Numbers", fontsize=11)
ax.set_xlim(0, combined_data.shape[0]+1)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=5, offset=0))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=2.5, offset=0))
ax.set_ylabel(r"Intensity (a.u.)", fontsize=11)
ax.set_ylim(0.75, 1.05)
ax.yaxis.set_major_locator(ticker.MultipleLocator(base=0.1, offset=0.05))
ax.yaxis.set_minor_locator(ticker.MultipleLocator(base=0.05, offset=0.05))
ax.tick_params(axis='both', which='both', direction='out', top=False, right=False, labelsize=9, labeltop=False, labelright=False)
ax.legend(fontsize=9, loc='upper right', frameon=False, bbox_to_anchor=(0.8, 0.5))

# 保存图片
plt.savefig(
    Path.joinpath(path_out, r"Operando_Cell2_XAS_XANES_06_03_V0_300.tif"),
    transparent=False,
    pad_inches=0.2,
    bbox_inches="tight",
    dpi=300,
    pil_kwargs={"compression": "tiff_lzw"},
)

plt.gcf().set_facecolor("white")
plt.show()

### 比较 ref 和 Dino ref

In [None]:
path_ref = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2")

ref = pd.read_csv(
    path_ref.joinpath(r"Overview", r"cell2_opXAS_P1_Mn_Oct2022_1.nor"),
    sep=r"\s+",
    index_col=None,
    header=None,
    comment="#",
).iloc[:, [0, 2, 4]]

ref = ref[(ref.iloc[:, 0] >= 6530.0) & (ref.iloc[:, 0] <= 6630.0)].copy().reset_index(drop=True)

In [None]:
from scipy.interpolate import interp1d

path_dino = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2\Dino")
dino = pd.read_csv(Path.joinpath(path_dino, r"FC1st_3rd_phase.txt"), sep=r"\s+", header=None, index_col=None)
dino = dino[(dino.iloc[:, 0] >= 6530.0) & (dino.iloc[:, 0] <= 6630.0)].copy()

interp_func = interp1d(dino.iloc[:, 0], dino.iloc[:, 3], kind="quadratic", bounds_error=False, fill_value=0)
dino_interp = interp_func(ref.iloc[:, 0])

In [None]:
# 画图
# gridspec inside gridspec
plt.close("all")
%matplotlib inline
fig = plt.figure(figsize=(3.3, 2.5))
gs = gridspec.GridSpec(1, 1, width_ratios=None, height_ratios=None, wspace=0, hspace=0, figure=fig)

# 图 A
subfig = fig.add_subfigure(gs[0, 0])
ax = subfig.add_axes((0, 0, 1.0, 1.0))
ax.set_box_aspect(0.8)

ax.plot(ref.iloc[:, 0], ref.iloc[:, 1], c=colors[0], ls="-", lw=1.0, label=r"$\mathrm{\alpha-MnO_2}$")
ax.plot(ref.iloc[:, 0], ref.iloc[:, 2], c=colors[1], ls="-", lw=1.0, label=r"FC#C")
ax.plot(dino.iloc[:, 0], dino.iloc[:, 3], c=colors[3], ls="--", lw=1.0, label=r"$\mathrm{3^{rd} \ Phase}$")

# 设置刻度线等格式
ax.set_xlim(6530, 6630)
ax.set_xlabel(r"Energy (eV)", fontsize=11)
ax.xaxis.set_major_locator(ticker.MultipleLocator(base=20, offset=10))
ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=10, offset=10))
ax.set_ylim(0, 1.6)
ax.set_ylabel(r"Absorption (arb.u.)", fontsize=11)
ax.yaxis.set_major_locator(ticker.MultipleLocator(base=0.4))
ax.yaxis.set_minor_locator(ticker.MultipleLocator(base=0.2))

ax.tick_params(axis="both", which="both", direction="out", left=True, top=False, right=False, labelbottom=True, labelleft=True)
ax.legend(
    loc="upper left",
    bbox_to_anchor=(0.55, 1),
    ncols=1,
    frameon=False,
    labelcolor="linecolor",
    fontsize=10,
    handlelength=3,
    handletextpad=0.2,
)

# 保存图片
plt.savefig(
    Path.joinpath(path_out, r"XAS_XANES_00_V0_300.tif"),
    transparent=False,
    pad_inches=0.05,
    bbox_inches="tight",
    dpi=300,
    pil_kwargs={"compression": "tiff_lzw"},
)
# plt.savefig(
#     Path.joinpath(path_out, r"XAS_XANES_00_V0_600.tif"),
#     transparent=False,
#     pad_inches=0.05,
#     bbox_inches="tight",
#     dpi=600,
#     pil_kwargs={"compression": "tiff_lzw"},
# )

plt.gcf().set_facecolor("white")
plt.show()

### 三个物相 MCR-ALS, P2b

In [None]:
from scipy.interpolate import interp1d

# 读取 reference + operando 数据
path_file = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2")

data1 = pd.read_csv(
    Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Mn_Oct2022_1.nor"),
    sep=r"\s+",
    index_col=None,
    header=None,
    comment="#",
)
data2 = pd.read_csv(
    Path.joinpath(path_file, r"Overview", r"cell2_opXAS_P2b_Mn_Oct2022_2.nor"),
    sep=r"\s+",
    index_col=None,
    header=None,
    comment="#",
)
xas_Mn = pd.concat([data1, data2.iloc[:, 5:]], axis=1, ignore_index=True)  # noqa: N816
charge_index = [0, 1, 2, 3, 4, *list(range(14, xas_Mn.shape[1], 1))]
xas_Mn = xas_Mn.iloc[:, charge_index]  # noqa: N816
pdf_Mn = xas_Mn.iloc[:, 0:5]  # noqa: N816
pdf_Mn.columns = [
    r"Energy",
    r"0.2M_MnSO4(aq.)",
    r"alpha_MnO2_Electrode",
    r"alpha_MnO2_Powder",
    r"FC1st",
]
opData_Mn = xas_Mn.iloc[:, [0, *list(range(5, xas_Mn.shape[1]))]]  # noqa: N816
opData_Mn.columns = list(range(0, opData_Mn.shape[1], 1))

# Dino
path_dino = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2\Dino")
dino = pd.read_csv(Path.joinpath(path_dino, r"FC1st_3rd_phase.txt"), sep=r"\s+", header=None, index_col=None)
# dino = dino[(dino.iloc[:, 0] >= 6530.0) & (dino.iloc[:, 0] <= 6630.0)].copy()

interp_func = interp1d(dino.iloc[:, 0], dino.iloc[:, 3], kind="quadratic", bounds_error=False, fill_value=0)
dino_interp = interp_func(pdf_Mn["Energy"])
pdf_Mn = pd.concat([pdf_Mn, pd.Series(dino_interp, name="Dino")], axis=1, ignore_index=True)  # noqa: N816

In [None]:
# XANES + REF
data_all = pd.concat([pdf_Mn.iloc[:, [0, 1, 2, 5]], opData_Mn.iloc[:, 1:]], axis=1, ignore_index=True)

scp_data_all = scp.NDDataset(
    data=data_all.iloc[:, 1:].T.to_numpy(),
    name="opXAS_Mn_CLAESS_2022",
    author="Cheng Liu, Ashely, and Dino Tonti",
    description="opXAS_Mn_CLAESS_2022",
    history="creation",
    title="absorption",
    units=ur.absorbance,
)

scp_data_all.y = scp.Coord.arange(data_all.shape[1] - 1, title="spectrum number")
scp_data_all.x = scp.Coord(data_all.iloc[:, 0].values, title="energy", units=ur.eV)

XANES_all = scp_data_all[:, 6530.0:6630.0]

_ = XANES_all.plot(title="Normalized Data")

In [None]:
# pd.concat([pd.Series(XANES_all.x.data), pd.DataFrame(XANES_all.T.data)], axis=1, ignore_index=True).to_csv(
#     Path.joinpath(path_out, r"cell2_opXAS_P2b_Mn_XANES_6530-6630.csv"), header=["E"] + [f"{i}" for i in range(XANES_all.shape[0])],
#     index=False
# )

In [None]:
XANES = XANES_all[3:, :]
ref_XANES = XANES_all[0:3, :]  # noqa: N816
_ = XANES.plot(title="Normalized Data")
_ = ref_XANES.plot(title=r"Normalized Data and Refs", ls="--", clear=False)

In [None]:
# # pca analysis
pca = scp.PCA()
pca.fit(XANES)
pca.printev()
_ = pca.screeplot()
# loadings = pca.loadings
# scores = pca.scores
# _ = pca.loadings.plot(); _.set_ylabel('loadings')
# _ = pca.scores.T.plot(); _.set_ylabel('scores')
# _ =pca.scoreplot(pca.scores, 1, 2, color_mapping="labels")

#### 三个标样做约束， Mn2+，MnO2，3rd phase

In [None]:
def get_St(St):  # St must be passed, even if not actuially used  # noqa: N802, N803
    St[0, :] = ref_XANES[0]
    St[1, :] = ref_XANES[1]
    # St[2, :] = ref_XANES[2]
    return St


# def get_C(C): # noqa: N802, N803
#     C[:, 0] = 0.633
#     C[:, 1] = 0.367
#     C[:, 2] = 0.0
#     return C

St0 = np.stack(
    (
        ref_XANES[0].squeeze().data,
        ref_XANES[1].squeeze().data,
        XANES[0].squeeze().data,
    ),
    axis=0,
)

St0 = scp.NDDataset(
    St0,
    name="Dataset initial guesses spectrum",
    author="ChengLiu",
    description="Initial spectrum guesses",
)

mcr_3 = scp.MCRALS(
    max_iter=300,
    maxdiv=100,
    log_level="ERROR",
    tol=0.001,
    closureConc=[0, 1, 2],
    closureMethod="constantSum",
    closureTarget="default",  # closure constraints
    nonnegConc="all",
    solverConc="nnls",  # Non-negativity constraint
    nonnegSpec="all",
    solverSpec="nnls",  # Non-negativity constraint
)
mcr_3.hardSpec = [0, 1, 2]
mcr_3.getSpec = get_St
mcr_3.getSt_to_St_idx = [0, 1, 2]
# mcr_3.hardConc = [0, 1, 2]
# mcr_3.getConc = get_C
# mcr_3.getC_to_C_idx =  [0, 1, 2]

mcr_3.fit(XANES, St0)

mcr_3.kwargsGetConc = {
    "ivp_solver_kwargs": {"return_NDDataset": False},
    "optimizer_kwargs": {"options": {"disp": False}},
}

_ = mcr_3.C.T.plot(clear=True, title=r"Concentration")
_ = mcr_3.St.plot(clear=True)
_ = ref_XANES.plot(clear=False, ls="--", title=r"MCR-ALS and Refs")

_ = mcr_3.plotmerit(nb_traces=5, clear=True)
mcr_3_residual = XANES - mcr_3.inverse_transform()

In [None]:
Mn_MCR_3_stFixed = xr.Dataset(
    data_vars={
        "MCR_C": (["spectrum_number", "mcr_number"], mcr_3.C.data),
        "MCR_C_constrained": (["spectrum_number", "mcr_number"], mcr_3.C_constrained.data),
        "MCR_St": (["mcr_number", "pca_energy"], mcr_3.St.data),
        "MCR_recon_Data": (["spectrum_number", "pca_energy"], mcr_3.inverse_transform().data),
        "MCR_residual": ((["spectrum_number", "pca_energy"], mcr_3_residual.data)),
        "Normalized_XANES": (["spectrum_number", "pca_energy"], XANES.data),
        "Ref_XANES": (["mcr_number", "pca_energy"], ref_XANES.data),
    },
    coords={
        "spectrum_number": np.arange(XANES.shape[0]),
        "pca_energy": XANES.x.data,
        "mcr_number": np.arange(mcr_3.St.shape[0]),
    },
)

Mn_MCR_3_stFixed.to_netcdf(Path.joinpath(path_out, r"P2b_Mn_MCR_3_stFixed.NETCDF4"), mode="w", engine="h5netcdf")

##### C 主导

In [None]:
# 读取 C0 的值

concentration_path = Path(r"D:\CHENG\OneDrive - UAB\ICMAB-Data\Zn-Mn\PaperDos\XAS\Operando\αMnO2\XAS\CLAESS_2022-10\Results\cell2\FC1st-FD2nd\Oct2022_cell3_P2b\XANES\Mn\MCR")
C0 = pd.read_csv(
    concentration_path.joinpath(r"mcr_concetrations_3.dat"),
    sep=r"\s+",
    index_col=None,
    header=0,
    comment="#",
)

C0 = C0.iloc[3:, 0:3].to_numpy()

In [None]:
pd.DataFrame(C0, columns=["C0_alpha_MnO2", "C0_FC1st", "C0_3rdPhase"]).plot()

In [None]:
def get_St(St):  # St must be passed, even if not actuially used  # noqa: N802, N803
    St[0, :] = ref_XANES[0]
    St[1, :] = ref_XANES[1]
    St[2, :] = ref_XANES[2]
    return St


def get_C(C):  # noqa: N802, N803
    C[:, 0] = 0.376
    C[:, 1] = 0.624
    C[:, 2] = 0.0
    return C


C0_initial = scp.NDDataset(
    C0,
    name="Dataset initial guesses spectrum",
    author="ChengLiu",
    description="Initial spectrum guesses",
)

mcr_3 = scp.MCRALS(
    max_iter=300,
    maxdiv=100,
    log_level="ERROR",
    tol=0.001,
    closureConc=[0, 1, 2],
    closureMethod="constantSum",
    closureTarget="default",  # closure constraints
    nonnegConc="all",
    solverConc="nnls",  # Non-negativity constraint
    nonnegSpec="all",
    solverSpec="nnls",  # Non-negativity constraint
)
order_number = [0, 2, 1]
mcr_3.hardSpec = order_number
mcr_3.getSpec = get_St
mcr_3.getSt_to_St_idx = order_number
mcr_3.hardConc = order_number
mcr_3.getConc = get_C
mcr_3.getC_to_C_idx = order_number
mcr_3.fit(XANES, C0_initial)

mcr_3.kwargsGetConc = {
    "ivp_solver_kwargs": {"return_NDDataset": False},
    "optimizer_kwargs": {"options": {"disp": False}},
}

_ = mcr_3.C.T.plot(clear=True, title=r"Concentration")
_ = mcr_3.St.plot(clear=True)
_ = ref_XANES.plot(clear=False, ls="--", title=r"MCR-ALS and Refs")

_ = mcr_3.plotmerit(nb_traces=5, clear=True)
mcr_3_residual = XANES - mcr_3.inverse_transform()

In [None]:
Mn_MCR_3_cFixed = xr.Dataset(
    data_vars={
        "MCR_C": (["spectrum_number", "mcr_number"], mcr_3.C.data),
        "MCR_C_constrained": (["spectrum_number", "mcr_number"], mcr_3.C_constrained.data),
        "MCR_St": (["mcr_number", "pca_energy"], mcr_3.St.data),
        "MCR_recon_Data": (["spectrum_number", "pca_energy"], mcr_3.inverse_transform().data),
        "MCR_residual": ((["spectrum_number", "pca_energy"], mcr_3_residual.data)),
        "Normalized_XANES": (["spectrum_number", "pca_energy"], XANES.data),
        "Ref_XANES": (["mcr_number", "pca_energy"], ref_XANES.data),
    },
    coords={
        "spectrum_number": np.arange(XANES.shape[0]),
        "pca_energy": XANES.x.data,
        "mcr_number": np.arange(mcr_3.St.shape[0]),
    },
)

Mn_MCR_3_cFixed.to_netcdf(Path.joinpath(path_out, r"P2b_Mn_MCR_3_cFixed.NETCDF4"), mode="w", engine="h5netcdf")

### 动力学，("A -> B",)

In [None]:
reactions = ("A -> B",)
species_concentrations = {
    "A": 0.624,
    "B": 0.0,
}
optimization_history = []


def get_St(St):  # St must be passed, even if not actuially used  # noqa: N802, N803
    St[0, :] = ref_XANES[0]
    # St[1, :] = ref_XANES[1]
    # St[2, :] = ref_XANES[2]
    return St


def evaluate_kinetics(k, XANES, ref_XANES):  # noqa: N803
    try:
        k0 = np.array([k])
        kin = scp.ActionMassKinetics(reactions, species_concentrations, k0)
        Ckin = kin.integrate(XANES.y.data)  # noqa: N806

        param_to_optimize = {"k[0]": k}
        mcr_4 = scp.MCRALS(
            log_level="ERROR",
            max_iter=300,
            maxdiv=10,
            tol=0.001,
            closureConc=[0, 1],
            closureMethod="constantSum",
            closureTarget="default",  # closure constraints
            nonnegConc="all",
            solverConc="nnls",  # Non-negativity constraint
            nonnegSpec="all",
            solverSpec="nnls",  # Non-negativity constraint
        )
        mcr_4.hardSpec = [0, 1]
        mcr_4.getSpec = get_St
        mcr_4.hardSt_to_St_idx = [0, 1]
        mcr_4.hardConc = [0, 1]
        mcr_4.getConc = kin.fit_to_concentrations
        mcr_4.argsGetConc = ([0, 1], [0, 1], param_to_optimize)
        mcr_4.kwargsGetConc = {
            "ivp_solver_kwargs": {"return_NDDataset": False, "method": "BDF"},
            "optimizer_kwargs": {"options": {"disp": False}},
        }
        mcr_4.fit(XANES, Ckin)
        std_st_Mn = float(np.std(np.asarray(mcr_4.St.data[1, :] - ref_XANES[2].data)))  # noqa: N806
        return std_st_Mn, mcr_4
    except Exception as e:
        print(f"[Warning] k={k:.3f}, failed: {e}")
        return float("inf"), None


def objective(k_array):
    k_val = float(np.squeeze(k_array))
    std_st_Mn, _ = evaluate_kinetics(k_val, XANES, ref_XANES)  # noqa: N806
    optimization_history.append((k_val, std_st_Mn))
    return std_st_Mn


initial_guess = np.array([0.1])
bounds = [(0.0, 1.0)]
opt_result = minimize(
    objective,
    initial_guess,
    method="Powell",
    bounds=bounds,
    options={"maxiter": 100, "ftol": 1e-6},
)

if not opt_result.success:
    raise RuntimeError(f"Optimization failed: {opt_result.message}")

best_k = float(opt_result.x[0])
best_std, best_mcr_model = evaluate_kinetics(best_k, XANES, ref_XANES)
# Ensure we obtained a valid MCR model
if best_mcr_model is None:
    raise RuntimeError(f"Failed to obtain MCR model at optimized k={best_k:.6f}")

optimization_df = pd.DataFrame(optimization_history, columns=["k", "std_comp1"]).drop_duplicates(subset="k", keep="last").sort_values("k").reset_index(drop=True)
print(f"最佳反应速率常数 k = {best_k:.6f}, 对应标准差 = {best_std:.6g}")

optimization_df.to_csv(
    Path.joinpath(path_out, r"P2b_Mn_MCRHM_2reaction_comp_3_A_B_B_AC.csv"),
    sep=",",
    index=False,
    header=True,
)

result = optimization_df
optimal_k = best_k
optimal_mcr_model = best_mcr_model

In [None]:
from functools import lru_cache

from scipy.optimize import minimize_scalar

reactions = ("A -> B",)
species_concentrations = {
    "A": 0.624,
    "B": 0.0,
}
optimization_history = []
BIG = 1e12  # 失败/NaN 惩罚


def get_St(St):  # noqa: N802, N803
    St[0, :] = ref_XANES[0]
    # St[1, :] = ref_XANES[1]
    # St[2, :] = ref_XANES[2]
    return St


def evaluate_kinetics(k, XANES, ref_XANES):  # noqa: N803
    try:
        k0 = np.array([float(k)])
        kin = scp.ActionMassKinetics(reactions, species_concentrations, k0)
        Ckin = kin.integrate(XANES.y.data)  # noqa: N806

        param_to_optimize = {"k[0]": float(k)}
        mcr_4 = scp.MCRALS(
            log_level="ERROR",
            max_iter=300,
            maxdiv=10,
            tol=0.001,
            closureConc=[0, 1],
            closureMethod="constantSum",
            closureTarget="default",
            nonnegConc="all",
            solverConc="nnls",
            nonnegSpec="all",
            solverSpec="nnls",
        )
        mcr_4.hardSpec = [0, 1]
        mcr_4.getSpec = get_St
        mcr_4.hardSt_to_St_idx = [0, 1]
        mcr_4.hardConc = [0, 1]
        mcr_4.getConc = kin.fit_to_concentrations
        mcr_4.argsGetConc = ([0, 1], [0, 1], param_to_optimize)
        mcr_4.kwargsGetConc = {
            "ivp_solver_kwargs": {"return_NDDataset": False, "method": "BDF"},
            "optimizer_kwargs": {"options": {"disp": False}},
        }
        mcr_4.fit(XANES, Ckin)

        diff = np.asarray(mcr_4.St.data[1, :] - ref_XANES[2].data)
        std_st_Mn = float(np.std(diff))  # noqa: N806
        if not np.isfinite(std_st_Mn):
            return BIG, None
        return std_st_Mn, mcr_4

    except Exception as e:
        print(f"[Warning] k={k:.6f}, failed: {e}")
        return BIG, None


@lru_cache(maxsize=None)
def _objective_cached(k):
    std_val, _ = evaluate_kinetics(k, XANES, ref_XANES)
    return float(std_val)


def objective(k):
    k = float(k)
    f = _objective_cached(k)
    optimization_history.append((k, f))
    return f


# 一维有界优化更稳：在 [0.0, 1.0] 内
opt = minimize_scalar(objective, bounds=(0.0, 1.0), method="bounded", options={"xatol": 1e-4, "maxiter": 200})

if not opt.success:
    raise RuntimeError(f"Optimization failed: {opt.message}")

best_k = float(opt.x)
best_std, best_mcr_model = evaluate_kinetics(best_k, XANES, ref_XANES)
if best_mcr_model is None:
    raise RuntimeError(f"Failed to obtain MCR model at optimized k={best_k:.6f}")

# 整理记录并保存
optimization_df = pd.DataFrame(optimization_history, columns=["k", "std_comp1"]).drop_duplicates(subset="k", keep="last").sort_values("k").reset_index(drop=True)
print(f"最佳反应速率常数 k = {best_k:.6f}, 对应标准差 = {best_std:.6g}")

optimization_df.to_csv(
    Path.joinpath(path_out, r"P2a_Mn_MCRHM_2reaction_comp_3_A_B_B_AC.csv"),
    sep=",",
    index=False,
    header=True,
)

result = optimization_df
optimal_k = best_k
optimal_mcr_model = best_mcr_model

In [None]:
optimization_df.sort_values("std_comp1").head()

In [None]:
# 将上述结果作为初始值，进行优化处理
# kinetic model
ka = optimal_k
reactions = ("A -> B",)  # note the coma: this is now a tuple of size 1
species_concentrations = {"A": 1.0, "B": 0.0}

mcr_4 = optimal_mcr_model
if mcr_4 is None:
    _, mcr_4 = evaluate_kinetics(ka, XANES, ref_XANES)
    if mcr_4 is None:
        raise RuntimeError("Unable to retrieve MCR model with optimized kinetics.")
_ = mcr_4.C.T.plot(ylabel=r"relative ratio", ylims=(-0.1, 0.8))
_ = mcr_4.C_constrained.T.plot(clear=False, ls="--", ylims=(-0.1, 0.8))
_ = mcr_4.St.plot(clear=True, ylims=(-0.1, 2.0))
_ = ref_XANES.plot(clear=False, ls="--", ylims=(-0.1, 2.0))
_ = mcr_4.plotmerit(nb_traces=1)
mcr_4_residual = XANES - mcr_4.inverse_transform()

Mn_MCRHM_TworeactionData = xr.Dataset(
    data_vars={
        "MCR_HM_C": (["spectrum_number", "mcr_number"], mcr_4.C.data),
        "MCR_HM_C_constrained": (["spectrum_number", "mcr_number"], mcr_4.C_constrained.data),
        "MCR_HM_St": (["mcr_number", "pca_energy"], mcr_4.St.data),
        "MCR_HM_recon_Data": (["spectrum_number", "pca_energy"], mcr_4.inverse_transform().data),
        "MCR_HM_residual": ((["spectrum_number", "pca_energy"], mcr_4_residual.data)),
    },
    coords={
        "spectrum_number": XANES.y.data,
        "pca_energy": XANES.x.data,
        "mcr_number": np.arange(mcr_4.St.shape[0]),
    },
    attrs={
        "reactions": reactions,
        "species_concentrations": str(species_concentrations),
        "k0": str(ka),
    },
)

# (
#     xr.merge(
#         [Mn_NormalData, Mn_PCA, Mn_MCRHM_TworeactionData], compat="no_conflicts", combine_attrs="no_conflicts"
#     ).to_netcdf(Path.joinpath(path_out, r"P2a_Mn_MCRHM_2reaction_comp_3_A_B_B_AC.NETCDF4"), mode="w", engine="h5netcdf")
# )

### 动力学，("A + B -> C",), ("A -> B + C",)

#### 初始条件未预设

In [None]:
from functools import lru_cache

from scipy.optimize import minimize

reactions = ("A -> B + C",)
S_total = 1.0

# 仅用于初始化/占位，真正用于积分的初始浓度会在映射函数里被覆盖
species_concentrations_template = {"A": 0.624, "B": 0.376, "C": 0.0}

optimization_history = []
BIG = 1e12  # 失败/NaN 惩罚

k_min, k_max = 0.0, 1.0  # 你要优化的 k 的物理/经验范围（可改）


def _sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def _softmax(v):
    v = np.asarray(v, dtype=float)
    v = v - np.max(v)
    ev = np.exp(v)
    return ev / np.sum(ev)


def map_params(theta):
    """
    theta: (4,) 实数向量 -> (k, conc0_dict)
      - theta[0] -> k in [k_min, k_max] via sigmoid
      - theta[1:4] -> (A0,B0,C0) >= 0, sum=S_total via softmax
    """
    t = np.asarray(theta, dtype=float)
    k = k_min + (k_max - k_min) * _sigmoid(t[0])
    w = _softmax(t[1:4])
    conc0 = {"A": S_total * w[0], "B": S_total * w[1], "C": S_total * w[2]}
    return float(k), conc0


def get_St(St):  # noqa: N802, N803
    St[0, :] = ref_XANES[0]
    St[1, :] = ref_XANES[1]
    # St[2, :] = ref_XANES[2]
    return St


def evaluate_kinetics_params(theta, XANES, ref_XANES):  # noqa: N803

    try:
        k, species_concentrations = map_params(theta)
        k0 = np.array([k], dtype=float)

        # 用映射得到的浓度作为初始条件
        kin = scp.ActionMassKinetics(reactions, species_concentrations, k0)
        Ckin = kin.integrate(XANES.y.data)  # 时间轴来自 XANES.y.data

        param_to_optimize = {"k[0]": float(k)}

        mcr_4 = scp.MCRALS(
            log_level="ERROR",
            max_iter=300,
            maxdiv=10,
            tol=0.001,
            closureConc=[0, 1, 2],
            closureMethod="constantSum",
            closureTarget="default",
            nonnegConc="all",
            solverConc="nnls",
            nonnegSpec="all",
            solverSpec="nnls",
        )
        mcr_4.hardSpec = [0, 1, 2]
        mcr_4.getSpec = get_St
        mcr_4.hardSt_to_St_idx = [0, 1, 2]
        mcr_4.hardConc = [0, 1, 2]
        mcr_4.getConc = kin.fit_to_concentrations
        mcr_4.argsGetConc = ([0, 1, 2], [0, 1, 2], param_to_optimize)
        mcr_4.kwargsGetConc = {
            "ivp_solver_kwargs": {"return_NDDataset": False, "method": "BDF"},
            "optimizer_kwargs": {"options": {"disp": False}},
        }
        mcr_4.fit(XANES, Ckin)

        diff = np.asarray(mcr_4.St.data[2, :] - ref_XANES[2].data)
        std_val = float(np.std(diff))
        if not np.isfinite(std_val):
            return BIG, None, k, species_concentrations

        return std_val, mcr_4, k, species_concentrations

    except Exception as e:
        print(f"[Warning] theta={np.array(theta)}, failed: {e}")
        k_dbg, conc_dbg = map_params(theta)
        return BIG, None, k_dbg, conc_dbg


def _round_tuple(x, ndigits=6):
    x = np.asarray(x, dtype=float).ravel().tolist()
    return tuple(round(v, ndigits) for v in x)


@lru_cache(maxsize=None)
def _objective_cached(theta_key):
    theta = np.array(theta_key, dtype=float)
    f, _, k_mapped, conc0 = evaluate_kinetics_params(theta, XANES, ref_XANES)
    optimization_history.append((k_mapped, conc0["A"], conc0["B"], conc0["C"], f))
    return float(f)


def objective(theta):
    theta_key = _round_tuple(theta, ndigits=6)
    return _objective_cached(theta_key)


theta0 = np.array([0.0, 0.0, 0.0, 0.0])  # 初值：k 的 sigmoid ~ 0.5 -> k ~ 0.5*(k_max-k_min)
opt = minimize(
    objective,
    theta0,
    method="Powell",
    options={"maxiter": 400, "xtol": 1e-4, "ftol": 1e-6, "disp": False},
)

if not opt.success:
    raise RuntimeError(f"Optimization failed: {opt.message}")

# 还原最优物理参数
k_best, conc0_best = map_params(opt.x)
best_std, best_mcr_model, _, _ = evaluate_kinetics_params(opt.x, XANES, ref_XANES)
if best_mcr_model is None:
    raise RuntimeError(f"Failed to obtain MCR model at optimized params, k={k_best:.6f}")

optimization_df = pd.DataFrame(optimization_history, columns=["k", "A0", "B0", "C0", "std_comp"]).drop_duplicates(subset=["k", "A0", "B0", "C0"], keep="last").reset_index(drop=True)
print(f"最佳参数: k = {k_best:.6f}, A0 = {conc0_best['A']:.6f}, B0 = {conc0_best['B']:.6f}, C0 = {conc0_best['C']:.6f}, 对应标准差 = {best_std:.6g}")

optimization_df.to_csv(
    Path.joinpath(path_out, r"P2a_Mn_MCRHM_2reaction_comp_opt_k_initConc.csv"),
    sep=",",
    index=False,
    header=True,
)

result = optimization_df
optimal_k = k_best
optimal_mcr_model = best_mcr_model
optimal_init_conc = conc0_best

#### 初始条件预设

In [None]:
reactions = ("A -> B + C",)
species_concentrations = {"A": 0.624, "B": 0.376, "C": 0.0}
optimization_history = []


def get_St(St):  # St must be passed, even if not actuially used  # noqa: N802, N803
    St[0, :] = ref_XANES[0]
    St[1, :] = ref_XANES[1]
    # St[2, :] = ref_XANES[2]
    return St


def evaluate_kinetics(k, XANES, ref_XANES):  # noqa: N803
    try:
        k0 = np.array([k])
        kin = scp.ActionMassKinetics(reactions, species_concentrations, k0)
        Ckin = kin.integrate(XANES.y.data)  # noqa: N806

        param_to_optimize = {"k[0]": k}
        mcr_4 = scp.MCRALS(
            log_level="ERROR",
            max_iter=300,
            maxdiv=10,
            tol=0.001,
            closureConc=[0, 1, 2],
            closureMethod="constantSum",
            closureTarget="default",  # closure constraints
            nonnegConc="all",
            solverConc="nnls",  # Non-negativity constraint
            nonnegSpec="all",
            solverSpec="nnls",  # Non-negativity constraint
        )
        mcr_4.hardSpec = [0, 1, 2]
        mcr_4.getSpec = get_St
        mcr_4.hardSt_to_St_idx = [0, 1, 2]
        mcr_4.hardConc = [0, 1, 2]
        mcr_4.getConc = kin.fit_to_concentrations
        mcr_4.argsGetConc = ([0, 1, 2], [0, 1, 2], param_to_optimize)
        mcr_4.kwargsGetConc = {
            "ivp_solver_kwargs": {"return_NDDataset": False, "method": "BDF"},
            "optimizer_kwargs": {"options": {"disp": False}},
        }
        mcr_4.fit(XANES, Ckin)
        std_st_Mn = float(np.std(np.asarray(mcr_4.St.data[2, :] - ref_XANES[2].data)))  # noqa: N806
        return std_st_Mn, mcr_4
    except Exception as e:
        print(f"[Warning] k={k:.3f}, failed: {e}")
        return float("inf"), None


def objective(k_array):
    k_val = float(np.squeeze(k_array))
    std_st_Mn, _ = evaluate_kinetics(k_val, XANES, ref_XANES)  # noqa: N806
    optimization_history.append((k_val, std_st_Mn))
    return std_st_Mn


initial_guess = np.array([0.1])
bounds = [(0.0, 1.0)]
opt_result = minimize(
    objective,
    initial_guess,
    method="Powell",
    bounds=bounds,
    options={"maxiter": 100, "ftol": 1e-6},
)

if not opt_result.success:
    raise RuntimeError(f"Optimization failed: {opt_result.message}")

best_k = float(opt_result.x[0])
best_std, best_mcr_model = evaluate_kinetics(best_k, XANES, ref_XANES)
# Ensure we obtained a valid MCR model
if best_mcr_model is None:
    raise RuntimeError(f"Failed to obtain MCR model at optimized k={best_k:.6f}")

optimization_df = pd.DataFrame(optimization_history, columns=["k", "std_comp1"]).drop_duplicates(subset="k", keep="last").sort_values("k").reset_index(drop=True)
print(f"最佳反应速率常数 k = {best_k:.6f}, 对应标准差 = {best_std:.6g}")

optimization_df.to_csv(
    Path.joinpath(path_out, r"P2a_Mn_MCRHM_2reaction_comp_3_A_B_B_AC.csv"),
    sep=",",
    index=False,
    header=True,
)

result = optimization_df
optimal_k = best_k
optimal_mcr_model = best_mcr_model

In [None]:
from functools import lru_cache

from scipy.optimize import minimize_scalar

reactions = ("A -> B + C",)
species_concentrations = {"A": 0.624, "B": 0.376, "C": 0.0}
optimization_history = []
BIG = 1e12  # 失败/NaN 惩罚


def get_St(St):  # noqa: N802, N803
    St[0, :] = ref_XANES[0]
    St[1, :] = ref_XANES[1]
    # St[2, :] = ref_XANES[2]
    return St


def evaluate_kinetics(k, XANES, ref_XANES):  # noqa: N803
    try:
        k0 = np.array([float(k)])
        kin = scp.ActionMassKinetics(reactions, species_concentrations, k0)
        Ckin = kin.integrate(XANES.y.data)  # noqa: N806

        param_to_optimize = {"k[0]": float(k)}
        mcr_4 = scp.MCRALS(
            log_level="ERROR",
            max_iter=300,
            maxdiv=10,
            tol=0.001,
            closureConc=[0, 1, 2],
            closureMethod="constantSum",
            closureTarget="default",
            nonnegConc="all",
            solverConc="nnls",
            nonnegSpec="all",
            solverSpec="nnls",
        )
        mcr_4.hardSpec = [0, 1, 2]
        mcr_4.getSpec = get_St
        mcr_4.hardSt_to_St_idx = [0, 1, 2]
        mcr_4.hardConc = [0, 1, 2]
        mcr_4.getConc = kin.fit_to_concentrations
        mcr_4.argsGetConc = ([0, 1, 2], [0, 1, 2], param_to_optimize)
        mcr_4.kwargsGetConc = {
            "ivp_solver_kwargs": {"return_NDDataset": False, "method": "BDF"},
            "optimizer_kwargs": {"options": {"disp": False}},
        }
        mcr_4.fit(XANES, Ckin)

        diff = np.asarray(mcr_4.St.data[2, :] - ref_XANES[2].data)
        std_st_Mn = float(np.std(diff))  # noqa: N806
        if not np.isfinite(std_st_Mn):
            return BIG, None
        return std_st_Mn, mcr_4

    except Exception as e:
        print(f"[Warning] k={k:.6f}, failed: {e}")
        return BIG, None


@lru_cache(maxsize=None)
def _objective_cached(k):
    std_val, _ = evaluate_kinetics(k, XANES, ref_XANES)
    return float(std_val)


def objective(k):
    k = float(k)
    f = _objective_cached(k)
    optimization_history.append((k, f))
    return f


# 一维有界优化更稳：在 [0.0, 1.0] 内
opt = minimize_scalar(objective, bounds=(0.0, 1.0), method="bounded", options={"xatol": 1e-4, "maxiter": 200})

if not opt.success:
    raise RuntimeError(f"Optimization failed: {opt.message}")

best_k = float(opt.x)
best_std, best_mcr_model = evaluate_kinetics(best_k, XANES, ref_XANES)
if best_mcr_model is None:
    raise RuntimeError(f"Failed to obtain MCR model at optimized k={best_k:.6f}")

# 整理记录并保存
optimization_df = pd.DataFrame(optimization_history, columns=["k", "std_comp1"]).drop_duplicates(subset="k", keep="last").sort_values("k").reset_index(drop=True)
print(f"最佳反应速率常数 k = {best_k:.6f}, 对应标准差 = {best_std:.6g}")

optimization_df.to_csv(
    Path.joinpath(path_out, r"P2a_Mn_MCRHM_2reaction_comp_3_A_B_B_AC.csv"),
    sep=",",
    index=False,
    header=True,
)

result = optimization_df
optimal_k = best_k
optimal_mcr_model = best_mcr_model

In [None]:
optimization_df.sort_values("std_comp1").head()

In [None]:
# 将上述结果作为初始值，进行优化处理
# kinetic model
ka = optimal_k
reactions = ("A -> B + C",)  # note the coma: this is now a tuple of size 1
species_concentrations = {"A": 0.624, "B": 0.376, "C": 0.0}

mcr_4 = optimal_mcr_model
if mcr_4 is None:
    _, mcr_4 = evaluate_kinetics(ka, XANES, ref_XANES)
    if mcr_4 is None:
        raise RuntimeError("Unable to retrieve MCR model with optimized kinetics.")
_ = mcr_4.C.T.plot(ylabel=r"relative ratio")
_ = mcr_4.C_constrained.T.plot(clear=False, ls="--")
_ = mcr_4.St.plot(clear=True, ylims=(-0.1, 1.8))
_ = ref_XANES.plot(clear=False, ls="--", title=r"HM_MCR-ALS and refs")
_ = mcr_4.plotmerit(nb_traces=5)
mcr_4_residual = XANES - mcr_4.inverse_transform()

Mn_MCRHM_TworeactionData = xr.Dataset(
    data_vars={
        "MCR_HM_C": (["spectrum_number", "mcr_number"], mcr_4.C.data),
        "MCR_HM_C_constrained": (["spectrum_number", "mcr_number"], mcr_4.C_constrained.data),
        "MCR_HM_St": (["mcr_number", "pca_energy"], mcr_4.St.data),
        "MCR_HM_recon_Data": (["spectrum_number", "pca_energy"], mcr_4.inverse_transform().data),
        "MCR_HM_residual": ((["spectrum_number", "pca_energy"], mcr_4_residual.data)),
    },
    coords={
        "spectrum_number": XANES.y.data,
        "pca_energy": XANES.x.data,
        "mcr_number": np.arange(mcr_4.St.shape[0]),
    },
    attrs={
        "reactions": reactions,
        "species_concentrations": str(species_concentrations),
        "k0": str(ka),
    },
)

# (
#     xr.merge(
#         [Mn_NormalData, Mn_PCA, Mn_MCRHM_TworeactionData], compat="no_conflicts", combine_attrs="no_conflicts"
#     ).to_netcdf(Path.joinpath(path_out, r"P2a_Mn_MCRHM_2reaction_comp_3_A_B_B_AC.NETCDF4"), mode="w", engine="h5netcdf")
# )