In [None]:
import os
import pickle
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.signal import find_peaks

In [None]:
# Load the three inputs of this notebook: the pickled axial-force time series,
# the per-sample metadata sheet and the weld-end sheet.
# SECURITY: pickle.load can execute arbitrary code while deserialising; only
# open a pickle you produced yourself (presumably an earlier step of this
# pipeline wrote this one -- confirm before sharing the notebook).
with open("S10_af_time_series.pkl", "rb") as f:
    data = pickle.load(f)

afs = data["data"]
metas = pd.read_excel("S11_af_meta.xlsx")
weld_end = pd.read_excel("S12_1_af_weld_end.xlsx")

# Fail fast if an input lacks a column that later cells index by name, instead
# of surfacing as a KeyError in the middle of the detection/plotting code.
for _table_name, _table, _required in (
    ("afs", afs, {"sample_no", "Time", "Fz"}),
    ("metas", metas, {"sample_no", "D", "W"}),
    ("weld_end", weld_end, {"sample_no", "weld_time_end"}),
):
    _missing = _required - set(_table.columns)
    if _missing:
        raise KeyError(
            f"{_table_name} is missing required column(s): {sorted(_missing)}"
        )

In [None]:
# Folder that receives the saved per-sample figures; created if it does not exist yet.
OUT_DIR = "S12_2"
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

In [None]:
# Inspect the per-sample metadata table read from S11_af_meta.xlsx.
metas

In [None]:
# Inspect the weld-end table read from S12_1_af_weld_end.xlsx.
weld_end

In [None]:
# Inspect the axial-force time series (the "data" entry of the loaded pickle).
afs

## Prototype

In [None]:
# --- Inputs for this single-sample trial run ---------------------------------
sample_no = 1
# None  -> place the dwell start one nominal dwell time before the dwell end
# 0..10 -> use the n-th detected peak (ordinal into peak_idxes)
# > 10  -> use the value directly as a row position in the force trace
idx_start_chosen = None
# None -> estimate the dwell end as (weld end - weld duration); otherwise a row position
idx_end_chosen = None
threshold = 0.1
# find_peaks criteria: peak height of at least 1 and at least `threshold`
# vertical distance to both neighbouring samples.
peak_condition = dict(height=(1, None), threshold=(threshold, None))
zoom_level = 1
sample_length_mm = 130  # welded length used to back-calculate the weld duration

# --- Look up this sample's metadata and force trace --------------------------
dwell_time = metas.loc[metas["sample_no"] == sample_no, "D"].values[0]
_weld_speed = metas.loc[metas["sample_no"] == sample_no, "W"].values[0]  # mm/min
weld_speed = _weld_speed / 60  # Convert to mm/s
# Chained set_index (no inplace) keeps the cell idempotent on re-run.
af = afs.loc[afs["sample_no"] == sample_no].set_index("Time")
weld_time_end = weld_end.loc[
    weld_end["sample_no"] == sample_no, "weld_time_end"
].values[0]

# Weld duration follows from the traverse speed and the sample length.
weld_time = sample_length_mm / weld_speed

# The dwell is expected to end where the weld traverse begins.
dwell_time_end_approx = weld_time_end - weld_time

# Row positions of the peaks in the axial force (the properties dict is not needed).
peak_idxes = find_peaks(af["Fz"], **peak_condition)[0]

# --- Dwell end ---------------------------------------------------------------
if idx_end_chosen is None:  # Auto: sample closest to the estimated dwell end
    dwell_time_end_idx = af.index.get_indexer(
        [dwell_time_end_approx], method="nearest"
    )[0]
elif idx_end_chosen < 0:
    raise ValueError("idx_end_chosen must be non-negative integer.")
else:  # Manual: row position
    dwell_time_end_idx = idx_end_chosen
dwell_time_end = af.index[dwell_time_end_idx]

# --- Dwell start -------------------------------------------------------------
if idx_start_chosen is None:  # Auto: nominal dwell time before the dwell end
    dwell_time_start = dwell_time_end - dwell_time
    dwell_time_start_idx = af.index.get_indexer(
        [dwell_time_start], method="nearest"
    )[0]
elif idx_start_chosen < 0:
    raise ValueError("idx_start_chosen must be non-negative integer.")
else:
    if idx_start_chosen <= 10:  # Small values are ordinals into the detected peaks
        if idx_start_chosen >= len(peak_idxes):
            raise IndexError(
                f"idx_start_chosen={idx_start_chosen} selects a peak by ordinal, "
                f"but only {len(peak_idxes)} peak(s) were detected."
            )
        dwell_time_start_idx = peak_idxes[idx_start_chosen]
    else:  # Larger values are row positions
        dwell_time_start_idx = idx_start_chosen
    dwell_time_start = af.index[dwell_time_start_idx]

# Dwell duration implied by the two chosen checkpoints
dwell_time_actual = dwell_time_end - dwell_time_start

# --- Summary printout --------------------------------------------------------
print(
    f"Sample No: {sample_no}, D: {dwell_time}s, W: {_weld_speed} mm/min"
)
print(f"Weld Time: {weld_time}s")
print(f"time_start: {dwell_time_start}s, time_end: {dwell_time_end}s")
print(f"time_start_idx: {dwell_time_start_idx}, time_end_idx: {dwell_time_end_idx}")
print(f"Dwell Time (actual): {dwell_time_actual}s")

# --- Plotting ----------------------------------------------------------------
fig, axes = plt.subplots(3, 1, figsize=(12, 8))

# Panel 1: whole trace with all three checkpoints
axes[0].plot(af.index, af["Fz"], label="Axial Force (Fz)")
axes[0].plot(af.index[peak_idxes], af["Fz"].iloc[peak_idxes], "o", label="Peaks")
axes[0].axvline(
    x=dwell_time_start, color="red", linestyle="--", label="Dwell Start"
)
axes[0].axvline(x=dwell_time_end, color="green", linestyle="--", label="Dwell End")
axes[0].axvline(x=weld_time_end, color="purple", linestyle="--", label="Weld Stop")
axes[0].set_ylabel("Axial Force (kN)")
# The labels above were never rendered before; draw the key so the three
# dashed markers can be told apart.
axes[0].legend(loc="upper right")

# Panels 2 and 3: zoom on the dwell start and on the dwell end. Each peak is
# annotated with its row position, which is the number to enter as
# idx_start_chosen / idx_end_chosen for a manual selection.
half_width = dwell_time * zoom_level
xlim_start = [dwell_time_start - half_width, dwell_time_start + half_width]
xlim_end = [dwell_time_end - half_width, dwell_time_end + half_width]
for ax, marker_time, xlim, color, label in (
    (axes[1], dwell_time_start, xlim_start, "red", "Dwell Start"),
    (axes[2], dwell_time_end, xlim_end, "green", "Dwell End"),
):
    ax.plot(af.index, af["Fz"], label="Axial Force (Fz)")
    ax.axvline(x=marker_time, color=color, linestyle="--", label=label)
    ax.set_xlim(xlim)
    ax.plot(af.index[peak_idxes], af["Fz"].iloc[peak_idxes], "o", label="Peaks")
    for peak_idx in peak_idxes:
        ax.text(
            af.index[peak_idx],
            af["Fz"].iloc[peak_idx],
            str(peak_idx),
            color="red",
            clip_on=True,
        )
    ax.set_ylabel("Axial Force (kN)")
axes[2].set_xlabel("Time (s)")

# Print the row positions of the peaks lying within half a nominal dwell time
# outside the chosen dwell window (candidates for a manual override).
xlim_all = [dwell_time_start - dwell_time * 0.5, dwell_time_end + dwell_time * 0.5]
filt = (af.index[peak_idxes] > xlim_all[0]) & (af.index[peak_idxes] < xlim_all[1])
peak_idxes_zoomed = peak_idxes[filt]
print(peak_idxes_zoomed)

fig.suptitle(
    f"Axial Force, Sample No: {sample_no}, Dwell Time: {dwell_time:.1f}s / {dwell_time_actual:.1f}s, W: {_weld_speed} mm/min"
)
fig.tight_layout()



## Function

In [None]:
def _as_optional_index(value, name):
    """Coerce a manually chosen index to ``int``, or to ``None`` when it is missing."""
    # Values that travelled through a DataFrame may come back as NaN (missing)
    # or as an integral float such as 2624.0.
    if value is None or pd.isna(value):
        return None
    if int(value) != value or value < 0:
        raise ValueError(f"{name} must be non-negative integer.")
    return int(value)


def _sample_value(table, sample_no, column, table_name):
    """Return ``column`` of the first row of ``table`` whose sample_no matches."""
    matches = table.loc[table["sample_no"] == sample_no, column].values
    if len(matches) == 0:
        # Same exception type as the bare ``.values[0]`` lookup, clearer message.
        raise IndexError(f"sample_no {sample_no!r} has no row in {table_name}.")
    return matches[0]


def _draw_zoom_panel(ax, af, peak_idxes, marker_time, half_width, color, label):
    """Plot Fz within ``marker_time +/- half_width`` and tag each peak with its row position."""
    ax.plot(af.index, af["Fz"], label="Axial Force (Fz)")
    ax.axvline(x=marker_time, color=color, linestyle="--", label=label)
    ax.set_xlim([marker_time - half_width, marker_time + half_width])
    ax.plot(af.index[peak_idxes], af["Fz"].iloc[peak_idxes], "o", label="Peaks")
    for peak_idx in peak_idxes:
        ax.text(
            af.index[peak_idx],
            af["Fz"].iloc[peak_idx],
            str(peak_idx),
            color="red",
            clip_on=True,
        )
    ax.set_ylabel("Axial Force (kN)")


def peak_detection(
    sample_no, threshold, idx_start_chosen, idx_end_chosen, file_prefix="S12_2_peak", zoom_level=1,
    sample_length_mm=130,
):
    """Locate the dwell window in one sample's axial-force trace and save a diagnostic figure.

    Reads the notebook-level tables ``metas``, ``afs`` and ``weld_end`` and the
    output folder ``OUT_DIR``.

    Parameters
    ----------
    sample_no
        Value matched against the ``sample_no`` column of the three tables.
    threshold
        Lower bound passed to ``find_peaks`` as ``threshold`` (minimum vertical
        distance of a peak to its neighbouring samples). Peaks must also have a
        height of at least 1.
    idx_start_chosen
        ``None``/NaN: the dwell start is placed one nominal dwell time (column
        ``D``) before the dwell end. ``0..10``: ordinal of the detected peak to
        use. ``> 10``: row position in the trace. Note that a row position of
        10 or less therefore cannot be selected directly.
    idx_end_chosen
        ``None``/NaN: the dwell end is the sample nearest to
        ``weld_time_end - weld duration``. Otherwise a row position.
    file_prefix
        Prefix of the saved PNG file name.
    zoom_level
        Half-width of the two zoom panels, as a multiple of the nominal dwell time.
    sample_length_mm
        Length used with the weld speed (column ``W``, mm/min) to compute the
        weld duration.

    Returns
    -------
    dict
        ``dwell_time_start``, ``dwell_time_end`` (index values of the trace),
        ``dwell_time_start_idx``, ``dwell_time_end_idx`` (row positions) and
        ``dwell_time_actual`` (end minus start).

    Raises
    ------
    ValueError
        If a manually chosen index is negative or not a whole number.
    IndexError
        If the sample is missing from a table, a peak ordinal exceeds the
        number of detected peaks, or a row position is out of range.

    Side effects
    ------------
    Prints the row positions of the peaks near the dwell window, writes
    ``{OUT_DIR}/{file_prefix}_sample_{sample_no}.png`` and closes the figure.
    """
    max_peak_ordinal = 10  # manual start values up to this are ordinals, above are row positions
    peak_condition = dict(height=(1, None), threshold=(threshold, None))

    idx_start_chosen = _as_optional_index(idx_start_chosen, "idx_start_chosen")
    idx_end_chosen = _as_optional_index(idx_end_chosen, "idx_end_chosen")

    # Extract relevant data
    dwell_time = _sample_value(metas, sample_no, "D", "metas")
    _weld_speed = _sample_value(metas, sample_no, "W", "metas")  # mm/min
    weld_speed = _weld_speed / 60  # Convert to mm/s
    weld_time_end = _sample_value(weld_end, sample_no, "weld_time_end", "weld_end")
    af = afs.loc[afs["sample_no"] == sample_no].set_index("Time")
    if af.empty:
        raise IndexError(f"sample_no {sample_no!r} has no row in afs.")

    # Weld duration follows from the traverse speed and the sample length.
    weld_time = sample_length_mm / weld_speed

    # The dwell is expected to end where the weld traverse begins.
    dwell_time_end_approx = weld_time_end - weld_time

    # Row positions of the peaks in the axial force (the properties dict is not needed).
    peak_idxes = find_peaks(af["Fz"], **peak_condition)[0]

    # Dwell end: nearest sample to the estimate, or the manually given row position.
    if idx_end_chosen is None:
        dwell_time_end_idx = af.index.get_indexer(
            [dwell_time_end_approx], method="nearest"
        )[0]
    else:
        dwell_time_end_idx = idx_end_chosen
    dwell_time_end = af.index[dwell_time_end_idx]

    # Dwell start: nominal dwell time before the end, a detected peak, or a row position.
    if idx_start_chosen is None:
        dwell_time_start = dwell_time_end - dwell_time
        dwell_time_start_idx = af.index.get_indexer(
            [dwell_time_start], method="nearest"
        )[0]
    else:
        if idx_start_chosen <= max_peak_ordinal:
            if idx_start_chosen >= len(peak_idxes):
                raise IndexError(
                    f"idx_start_chosen={idx_start_chosen} selects a peak by ordinal, "
                    f"but only {len(peak_idxes)} peak(s) were detected."
                )
            dwell_time_start_idx = peak_idxes[idx_start_chosen]
        else:
            dwell_time_start_idx = idx_start_chosen
        dwell_time_start = af.index[dwell_time_start_idx]

    # Dwell duration implied by the two chosen checkpoints
    dwell_time_actual = dwell_time_end - dwell_time_start

    # Plotting
    fig, axes = plt.subplots(3, 1, figsize=(12, 8))

    # Panel 1: whole trace with all three checkpoints
    axes[0].plot(af.index, af["Fz"], label="Axial Force (Fz)")
    axes[0].plot(af.index[peak_idxes], af["Fz"].iloc[peak_idxes], "o", label="Peaks")
    axes[0].axvline(
        x=dwell_time_start, color="red", linestyle="--", label="Dwell Start"
    )
    axes[0].axvline(x=dwell_time_end, color="green", linestyle="--", label="Dwell End")
    axes[0].axvline(x=weld_time_end, color="purple", linestyle="--", label="Weld Stop")
    axes[0].set_ylabel("Axial Force (kN)")
    # The labels above were never rendered before; draw the key so the three
    # dashed markers can be told apart in the saved figure.
    axes[0].legend(loc="upper right")

    # Panels 2 and 3: zoom on the dwell start and on the dwell end
    half_width = dwell_time * zoom_level
    _draw_zoom_panel(axes[1], af, peak_idxes, dwell_time_start, half_width, "red", "Dwell Start")
    _draw_zoom_panel(axes[2], af, peak_idxes, dwell_time_end, half_width, "green", "Dwell End")
    axes[2].set_xlabel("Time (s)")

    # Print the row positions of the peaks lying within half a nominal dwell
    # time outside the chosen window (candidates for a manual override).
    xlim_all = [dwell_time_start - dwell_time * 0.5, dwell_time_end + dwell_time * 0.5]
    filt = (af.index[peak_idxes] > xlim_all[0]) & (af.index[peak_idxes] < xlim_all[1])
    peak_idxes_zoomed = peak_idxes[filt]
    print(peak_idxes_zoomed)

    fig.suptitle(
        f"Axial Force, Sample No: {sample_no}, Dwell Time: {dwell_time:.1f}s / {dwell_time_actual:.1f}s, W: {_weld_speed} mm/min"
    )
    fig.tight_layout()
    fig.savefig(f"{OUT_DIR}/{file_prefix}_sample_{sample_no}.png", dpi=300, bbox_inches="tight")
    plt.close(fig)  # avoid accumulating open figures when called in a loop

    return dict(
        dwell_time_start=dwell_time_start,
        dwell_time_end=dwell_time_end,
        dwell_time_start_idx=dwell_time_start_idx,
        dwell_time_end_idx=dwell_time_end_idx,
        dwell_time_actual=dwell_time_actual,
    )

In [None]:
# Defaults applied to every sample; None means "derive automatically".
default_idx_start_chosen = None
default_idx_end_chosen = None
default_threshold = 0.1
default_zoom_level = 1

# One parameter record per distinct sample in the metadata table. A
# comprehension is used so that no loop variable leaks into the notebook
# namespace (a plain ``for sample_no in ...`` would overwrite the prototype
# cell's ``sample_no``).
_params = [
    dict(
        sample_no=sample_id,
        idx_start_chosen=default_idx_start_chosen,
        idx_end_chosen=default_idx_end_chosen,
        threshold=default_threshold,
        zoom_level=default_zoom_level,
    )
    for sample_id in metas["sample_no"].unique()
]
params = pd.DataFrame(_params)
display(params)

## Override parameters

In [None]:
# Override parameters for specific samples if needed
# Example: dict(sample_no=1, idx_start_chosen=8, idx_end_chosen=None, threshold=0.1)
# Each entry must contain "sample_no"; every other key must be an existing
# column of `params`.

override = [
    # dict(sample_no=4, idx_end_chosen=2624),
]

for ov in override:
    target_rows = params["sample_no"] == ov["sample_no"]
    # An unknown sample would otherwise be a silent no-op.
    if not target_rows.any():
        raise ValueError(f"override refers to unknown sample_no: {ov['sample_no']!r}")
    # A misspelt key would otherwise silently create a new column that later
    # reaches the detection function as an unexpected keyword argument.
    unknown_keys = set(ov) - set(params.columns)
    if unknown_keys:
        raise KeyError(f"override has unknown parameter(s): {sorted(unknown_keys)}")
    for key, value in ov.items():
        if key != "sample_no":
            params.loc[target_rows, key] = value

display(params)

In [None]:
# Run the detection once per parameter row and keep the inputs next to the
# detected dwell window so the result table is self-describing.
reses = []
for param in params.to_dict(orient="records"):
    print(param)
    res = {**param, **peak_detection(**param)}
    reses.append(res)
    print("----------")

dfpeak = pd.DataFrame(reses)

In [None]:
# Write the per-sample results (input parameters plus detected dwell window)
# to an Excel file, without the DataFrame index column.
dfpeak.to_excel("S12_2_af_peak.xlsx", index=False)