In [None]:
# Parameters : 入力パラメータ
file: str = ""
"""The filepath of data to analyze."""
outputdir: str = "."
"""Path to the directory to save the csv files of the analyzed data."""
wavelength_range: list[float] | None = None
"""The wavelength range to find peaks."""
dump_csv: bool = True
"""Whether the dataframe of the analyzed data is saved as CSV files.""";

In [None]:
# Validate parameters : 入力パラメータを検証
import pathlib

if not pathlib.Path(file).exists():
    raise FileNotFoundError(f"{file} not found")
if not (path := pathlib.Path(outputdir)).exists():
    path.mkdir(parents=True)
if (wavelength_range is not None) and (len(wavelength_range) != 2):
    raise ValueError("The length of `wavelength_range` must be 2")

In [None]:
# Define data types : データ型を定義
import typing as t

import pandas as pd


class AnalyzedData(t.NamedTuple):
    wdf: pd.DataFrame
    tdf: pd.DataFrame

In [None]:
# Defilne `analyze` function : データを解析する関数を定義
import os

import numpy as np
from sklearn import metrics
from tlab_analysis import apparatus, trpl, utils


def analyze(
    filepath: str | os.PathLike[str],
    fitting_function: t.Any,
    **kwargs: t.Any,
) -> AnalyzedData:
    # データの読み込み
    data = trpl.read_file(filepath)
    # 感度補正
    data = apparatus.streakscope.correct(data)
    # ピークを取得
    wdf = data.aggregate_along_time()
    range_to_find_peak = tuple(
        wavelength_range or wdf["wavelength"].quantile([0.20, 0.80])
    )
    is_to_find_peak = wdf["wavelength"].between(*range_to_find_peak)
    peaks = utils.find_peaks(
        wdf["wavelength"][is_to_find_peak],
        wdf["intensity"][is_to_find_peak],
    )
    # FWHM範囲を取得
    mean_wavelength = wdf["wavelength"].mean()
    peaks.sort(key=lambda peak: abs(peak.x - mean_wavelength))
    FWHM_range = (peaks[0].x0, peaks[0].x1) if len(peaks) > 0 else range_to_find_peak
    # キャリア緩和時間を計算
    tdf = data.aggregate_along_wavelength(FWHM_range)
    tdf["intensity"] = utils.smooth(tdf["intensity"], 0.025)
    decay_range = utils.find_decay_range(tdf["intensity"])
    is_decay = np.logical_and(tdf.index >= decay_range[0], tdf.index <= decay_range[1])
    try:
        params, _ = utils.curve_fit(
            fitting_function,
            tdf["time"][is_decay],
            tdf["intensity"][is_decay],
            **kwargs,
        )
        tdf["fit"] = np.where(is_decay, fitting_function(tdf["time"], *params), np.nan)
        tdf.attrs["params"] = params
        tdf.attrs["r2"] = metrics.r2_score(
            tdf["intensity"][is_decay], tdf["fit"][is_decay]
        )
    except Exception:
        tdf.attrs["r2"] = 0
    return AnalyzedData(wdf, tdf)

In [None]:
# Define `plot` function : データをプロットする関数を定義
import plotly.graph_objects as go
import plotly.io

plotly.io.renderers.default = "notebook+svg"


def plot(data: AnalyzedData) -> go.Figure:
    fig = go.Figure().set_subplots(rows=1, cols=2).update_layout(width=960, height=480)
    # スペクトル図を作成
    FWHM_range = data.tdf.attrs["wavelength_range"]
    fig.add_scatter(
        x=data.wdf["wavelength"],
        y=data.wdf["intensity"],
        line=go.scatter.Line(color="red"),
        showlegend=False,
        row=1,
        col=1,
    ).add_vrect(
        *FWHM_range,
        line=go.layout.shape.Line(width=0),
        fillcolor="orange",
        opacity=0.2,
        showlegend=False,
        annotation=go.layout.Annotation(
            text=f"FWHM: {FWHM_range[1] - FWHM_range[0]:.1f}nm"
        ),
        row=1,
        col=1,
    ).update_xaxes(
        title="Wavelength (nm)",
        row=1,
        col=1,
    ).update_yaxes(
        title="Intensity (arb.units)",
        row=1,
        col=1,
    )
    # PL強度の時間変化グラフを作成
    fig.add_scatter(
        x=data.tdf["time"],
        y=data.tdf["intensity"],
        line=go.scatter.Line(color="red"),
        showlegend=False,
        row=1,
        col=2,
    ).add_scatter(
        x=data.tdf["time"],
        y=data.tdf["fit"],
        line=go.scatter.Line(color="black"),
        showlegend=False,
        row=1,
        col=2,
    ).update_xaxes(
        title="Time (ns)",
        row=1,
        col=2,
    ).update_yaxes(
        title="Intensity (arb.units)",
        type="log",
        range=np.log10(data.tdf["intensity"].max()) + np.log10((0.05, 2)),
        row=1,
        col=2,
    )
    return fig

In [None]:
# Analyze the data : データを解析


def exponential(t: float, a: float, tau: float) -> float:
    return a * np.exp(-t / tau)  # type: ignore[no-any-return]


func = exponential
analyzed = analyze(file, func, bounds=(0, np.inf))
if analyzed.tdf.attrs["params"] is not None:
    for key, value in zip(
        list(func.__annotations__)[1:-1], analyzed.tdf.attrs["params"]
    ):
        print(f"{key} = {value:.4g}")
    print(f"r2 = {analyzed.tdf.attrs['r2']:.4f}")
else:
    print("Cannot fit curve to the data")
fig = plot(analyzed)
fig.show()

In [None]:
# Dump the results as CSV files : 解析結果をCSVファイルとして出力
if dump_csv:
    filepath = pathlib.Path(file)
    analyzed.wdf.to_csv(
        os.path.join(
            outputdir,
            "h_" + filepath.stem + ".csv",
        ),
        index=False,
        encoding="utf-8",
    )
    FWHM_range = analyzed.tdf.attrs["wavelength_range"]
    analyzed.tdf.to_csv(
        os.path.join(
            outputdir,
            "v({left:d}-{right:d})_".format(
                left=round(FWHM_range[0]),
                right=round(FWHM_range[1]),
            )
            + filepath.stem
            + ".csv",
        ),
        index=False,
        encoding="utf-8",
    )