# Hardpoint Breakaway Tests - Recent History

This notebook evaluates the hardpoint breakaway tests performed on a given `day_obs`.  
The most important values to evaluate per hardpoint are:
  
  1. The measured forces when the breakaway happens in the positive direction (compression).
  2. The measured forces when the breakaway happens in the negative direction (tension).
  3. The stiffness of the breakaway mechanism.
     This is defined as the linear coefficient of the polynomial that fits the
     displacement versus the measured forces. 

In [None]:
# Times Square Parameters
# `day_obs` selects the observing day to analyze. It is turned into the EFD
# query window with getDayObsStartTime/getDayObsEndTime in the globals cell.
day_obs = 20250804  # YYYYMMDD

In [None]:
import asyncio
import ipywidgets as W
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from astropy.time import Time
from typing import Optional
from io import BytesIO
from IPython.display import display, clear_output
from typing import List

from lsst_efd_client import EfdClient
from lsst.ts.xml.enums.MTM1M3 import HardpointTest
from lsst.summit.utils.efdUtils import (
    getEfdData,
    getDayObsEndTime,
    getDayObsStartTime,
    makeEfdClient,
)

In [None]:
# --- Global Variables ---
# HP BreakAway Limits from https://sitcomtn-082.lsst.io/
# NOTE(review): the limit, crop-range, tolerance, unit-conversion and
# stiffness constants below are not referenced by the cells visible here;
# presumably they are used by the telemetry analysis - confirm.
COMPRESSION_LOWER_LIMIT = 2981  # N
COMPRESSION_UPPER_LIMIT = 3959  # N
TENSION_LOWER_LIMIT = -4420  # N
TENSION_UPPER_LIMIT = -3456  # N

# Used to select the window size near zero measured forces.
DISPLACEMENT_CROP_RANGE = 300  # um
DISPLACEMENT_CROP_RANGE_FOR_FIT = 100  # um

# Used to determine if a test was valid or not
MEASURED_FORCE_MAXIMUM_TOLERANCE = 1000  # N

# Meter to micrometer
METER_TO_MICROMETER = 1e6  # um/m

# Maximum spec stiffness
SPEC_STIFFNESS = 100  # N/um

# Status Plotting
# Maps each HardpointTest state to the color of its bars and legend entry in
# the status timeline plot.
STATUS_COLORS = {
    HardpointTest.NOTTESTED: "lightgrey",
    HardpointTest.MOVINGNEGATIVE: "lightsteelblue",
    HardpointTest.TESTINGPOSITIVE: "forestgreen",
    HardpointTest.TESTINGNEGATIVE: "royalblue",
    HardpointTest.MOVINGREFERENCE: "darkseagreen",
    HardpointTest.PASSED: "dimgrey",
    HardpointTest.FAILED: "red",
}

# We use this a lot, so let's make it shorter
TEST_POS_COLOR = STATUS_COLORS[HardpointTest.TESTINGPOSITIVE]
TEST_NEG_COLOR = STATUS_COLORS[HardpointTest.TESTINGNEGATIVE]

# Number of hardpoints. Used to build the list of `testState{i}` fields that
# is requested from the EFD.
n_hardpoints = 6

# Gap size to split different test runs. Passed to `group_by_gaps`, where it
# is parsed with `pd.Timedelta`.
gap_size = "3min"

# Create an EFD client instance
efd_client = makeEfdClient()

# Get the start and end times of the selected `day_obs`. They bound every EFD
# query made in this notebook.
start_time = getDayObsStartTime(day_obs)
end_time = getDayObsEndTime(day_obs)

In [None]:
# Query the CSC level command that we can use to define the start of a
# breakaway test for a given strut. Only the `hardpointActuator` field is
# requested, over the whole `day_obs` window.
# The top-level `await` requires an environment with a running event loop,
# such as a Jupyter notebook.
test_command_df = await efd_client.select_time_series(
    topic_name="lsst.sal.MTM1M3.command_testHardpoint",
    fields=["hardpointActuator"],
    start=start_time,
    end=end_time
)

In [None]:
# Query the status of the breakaway tests as they evolve. One field per
# hardpoint is requested: `testState0` ... `testState{n_hardpoints - 1}`.
test_status_df = await efd_client.select_time_series(
    topic_name="lsst.sal.MTM1M3.logevent_hardpointTestStatus",
    fields=[f"testState{i}" for i in range(n_hardpoints)],
    start=start_time,
    end=end_time
)

In [None]:
def ensure_utc(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return `df` with its time index expressed in UTC.

    Data queried from the EFD frequently shows timezone problems. A
    timezone-naive index is labelled as UTC without shifting the timestamps,
    while a timezone-aware index is converted to UTC.
    """
    index_is_naive = df.index.tz is None
    return df.tz_localize("UTC") if index_is_naive else df.tz_convert("UTC")


# Ensure that all the dataframe indexes are timestamps with the timezone
# set to UTC. The variables are rebound to the frames returned by
# `ensure_utc`.
test_command_df = ensure_utc(test_command_df)
test_status_df = ensure_utc(test_status_df)

In [None]:
def group_by_gaps(gap: str, df: pd.DataFrame) -> pd.DataFrame:
    """
    Label each row of `df` with the test run it belongs to.

    Running the hardpoint breakaway tests more than once in a single day is
    uncommon, but it happens. A new run starts whenever the time elapsed since
    the previous row is strictly greater than `gap` (any string accepted by
    `pd.Timedelta`). The labels start at 0 and are returned in a new
    `group_id` column; the input dataframe is not modified.
    """
    threshold = pd.Timedelta(gap)
    spacing = df.index.to_series().diff()
    # The first row has no predecessor (NaT spacing), so it never opens a new
    # run and the first label is 0.
    starts_new_run = spacing > threshold
    return df.assign(group_id=starts_new_run.cumsum())


# There are a few days where we have more than one execution of the breakaway
# tests in a single day. The function below adds a new `group_id` column to
# the commands dataframe to help separate each test execution. Commands more
# than `gap_size` apart are assigned to different executions.
test_command_df = group_by_gaps(gap_size, test_command_df)

In [None]:
def find_end_of_test(
    df: pd.DataFrame,
    start_time: pd.Timestamp,
    end_reasons: List[int] | None = None,
    end_buffer: str = "10s",
) -> Optional[pd.Timestamp]:
    """
    Find the end of a test run by looking for the first row where all columns
    have the same value and that value is in end_reasons.
    """
    if end_reasons is None:
        end_reasons = [
            HardpointTest.NOTTESTED,
            HardpointTest.PASSED,
            HardpointTest.FAILED,
        ]

    # Ensure we don't get a fake NOTTESTED in the beginning of the test
    df = df.loc[df.index >= start_time + pd.Timedelta("5s")]

    for idx, row in df.iterrows():
        unique_values = set(row.values)
        if len(unique_values) == 1 and list(unique_values)[0] in end_reasons:
            return idx + pd.Timedelta(end_buffer)

    return None


def build_labels(cmds: pd.DataFrame, sts: pd.DataFrame) -> pd.DataFrame:
    """
    Summarize each test run identified by the "group_id" column of `cmds`.

    Parameters
    ----------
    cmds : pd.DataFrame
        Time-indexed test commands with the columns "hardpointActuator" and
        "group_id".
    sts : pd.DataFrame
        Time-indexed test status events. Its index must be sorted, since it
        is searched with `searchsorted`.

    Returns
    -------
    pd.DataFrame
        One row per group with the columns:

        - "group_id": the run identifier.
        - "reference_time": earliest command time, floored to the minute.
        - "n": number of commands in the run.
        - "uniq_hp": sorted unique "hardpointActuator" values.
        - "start_time": first status timestamp at or after the reference
          time, or NaT when there is none.
        - "end_time": value returned by `find_end_of_test` for the reference
          time (missing when no end was found).
    """
    summary = (
        cmds.reset_index()
        .groupby("group_id")
        .agg(
            reference_time=("index", lambda s: s.dt.floor("min").min()),
            n=("group_id", "size"),
            uniq_hp=("hardpointActuator", lambda s: sorted(s.unique())),
        )
        .reset_index()
    )

    def _first_status_at_or_after(reference: pd.Timestamp):
        """Return the first status timestamp >= reference, or NaT."""
        position = sts.index.searchsorted(reference, side="left")
        # searchsorted returns len(index) when every status event is older
        # than the reference time; indexing with it would raise IndexError.
        if position >= len(sts.index):
            return pd.NaT
        return sts.index[position]

    summary["start_time"] = summary["reference_time"].apply(
        _first_status_at_or_after
    )

    summary["end_time"] = summary["reference_time"].apply(
        lambda x: find_end_of_test(sts, x)
    )

    return summary


# Create a new dataframe containing the test executions group ids,
# the reference times, the number of commands, the hardpoints exercised,
# the start time, and the end time.
summary_df = build_labels(test_command_df, test_status_df)

print("HP Breakaway Tests - Execution groups")
# A bare expression at the end of a notebook cell renders the table.
summary_df

In [None]:
# There is a lot of work to create the plots and add a dropdown menu to select 
# which test execution we want to display.
def get_status_segments(status_s: pd.Series, t0: pd.Timestamp, t1: pd.Timestamp):
    """Return list of (start, end, state) where state is constant in [t0,t1]."""
    s = status_s.sort_index()
    s = s[(s.index <= t1) & (s.index >= (t0 - pd.Timedelta("1s")))]

    if s.empty:
        return []
    if s.index[0] > t0:
        s = pd.concat([pd.Series([s.iloc[0]], index=[t0]), s])
    if s.index[-1] < t1:
        s.loc[t1] = s.iloc[-1]

    s = s.sort_index()
    changed = s.ne(s.shift()).to_numpy()
    segs = []
    idxs = [i for i, c in enumerate(changed) if c]
    idxs.append(len(s) - 1)

    for i in range(len(idxs) - 1):
        a = s.index[idxs[i]]
        b = s.index[idxs[i + 1]]
        state = int(s.iloc[idxs[i]])
        beg = max(pd.Timestamp(a), t0)
        end = min(pd.Timestamp(b), t1)
        if end > beg:
            segs.append((beg, end, state))

    return segs
    

def plot_status_timeline(
    ax, sub_status_df: pd.DataFrame, t0: pd.Timestamp, t1: pd.Timestamp
):
    """
    Draw the test state of each hardpoint as colored bars over [t0, t1].

    The axes are cleared first. Each of the six rows (HP1..HP6) shows the
    segments of the matching `testState{i}` column, colored according to
    STATUS_COLORS; columns missing from `sub_status_df` are skipped. A legend
    with one entry per HardpointTest state is placed to the right of the plot.
    """
    strut_rows = [1, 2, 3, 4, 5, 6]

    ax.clear()
    ax.set_xlim(t0, t1)
    ax.set_ylim(0.5, 6.5)
    ax.set_yticks(strut_rows)
    ax.set_yticklabels([f"HP{row}" for row in strut_rows])
    ax.set_xlabel("Time (UTC)")
    ax.set_ylabel("Strut")
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))

    for row in strut_rows:
        # Rows are one-based while the status columns are zero-based.
        column = f"testState{row - 1}"
        if column in sub_status_df.columns:
            segments = get_status_segments(sub_status_df[column], t0, t1)
            for seg_start, seg_end, state in segments:
                color = STATUS_COLORS.get(HardpointTest(state), "#bdbdbd")
                left = mdates.date2num(seg_start)
                width = mdates.date2num(seg_end) - mdates.date2num(seg_start)
                ax.broken_barh(
                    [(left, width)],
                    (row - 0.4, 0.8),
                    facecolors=color,
                    edgecolors="none",
                )

    legend_entries = []
    for test_state in HardpointTest:
        entry = plt.Line2D(
            [0],
            [0],
            lw=8,
            color=STATUS_COLORS[test_state],
            label=f"{test_state.value}-{test_state.name}",
        )
        legend_entries.append(entry)

    ax.legend(
        handles=legend_entries,
        bbox_to_anchor=(1.02, 1),
        loc="upper left",
        borderaxespad=0.0,
    )
    ax.grid(True, linestyle=":", alpha=0.3)
    

def make_hardpoint_group_selector(
    commands: pd.DataFrame,
    statuses: pd.DataFrame,
    summary: pd.DataFrame,
):
    """
    Display a dropdown to pick a test run and plot its status timeline.

    Parameters
    ----------
    commands : pd.DataFrame
        Test commands. Not used yet; kept in the signature for the planned
        telemetry plots and for backward compatibility.
    statuses : pd.DataFrame
        Time-indexed status events with the `testState{i}` columns. The
        timeline of the selected run is sliced from this dataframe.
    summary : pd.DataFrame
        One row per test run with the columns "group_id", "reference_time",
        "n", "start_time" and "end_time".

    Returns
    -------
    tuple
        The dropdown widget and the output widget, in this order.
    """
    def _label(row: pd.Series) -> str:
        s = row["reference_time"].strftime("%Y-%m-%d %H:%M UTC")
        return f"Test run {int(row['group_id'])} | Started near {s} | {row['n']} cmds"

    options = [(_label(r), int(r["group_id"])) for _, r in summary.iterrows()]
    # Look labels up by group id. Using the id as a position in `options`
    # only works while the ids happen to be 0..n-1 in order.
    labels_by_gid = {gid: label for label, gid in options}

    dd = W.Dropdown(description="Select test run:", options=options,
                    value=options[0][1] if options else None,
                    layout=W.Layout(width="650px"))

    out = W.Output()

    def show_group(gid: int):
        with out:

            # Clear the output and print a nice message for impatient people
            clear_output()
            print(f"Querying and processing data for option:\n  {labels_by_gid[gid]}")
            row = summary.loc[summary["group_id"] == gid].iloc[0]
            t0, t1 = row["start_time"], row["end_time"]

            # The start or the end of a run may be missing (None/NaT) when
            # the status events do not allow determining it.
            if pd.isna(t0) or pd.isna(t1):
                print(
                    "Could not determine the start and/or end time of this "
                    "test run from the status events. Nothing to plot."
                )
                return

            # Use the dataframe passed by the caller, not a notebook global.
            sub_status = statuses.loc[t0:t1]
            fig, ax = plt.subplots(1, 1, figsize=(12, 4), constrained_layout=True)
            try:
                plot_status_timeline(ax, sub_status, t0, t1)

                # display the figure using html
                display(W.HTML(f"<b>Group window:</b> {t0} → {t1} (UTC)"))
                display(fig)
            finally:
                # Ensures the plots are shown only once and that the figure
                # is released even if plotting fails.
                plt.close(fig)

    # TODO: enable the per-hardpoint telemetry plots drafted below once the
    # helpers they rely on are available.
    #         # telemetry = query_hardpoints_telemetry(client, row)

    #         # for hp in row["uniq_hp"]:

    #         #     status_df = create_status_table(sub_status, hp-1)
    #         #     pos_tel = slice_telemetry(telemetry, status_df, HardpointTest.TESTINGPOSITIVE)
    #         #     neg_tel = slice_telemetry(telemetry, status_df, HardpointTest.TESTINGNEGATIVE)

    #         #     fig, (ax_t, ax_s) = plt.subplots(1, 2, figsize=(12, 4), constrained_layout=True)
    #         #     fig.suptitle(f"Hardpoint {hp} telemetry")

    #         #     plot_forces_timeline(ax_t, telemetry, hp-1, test_positive=pos_tel, test_negative=neg_tel)
    #         #     measured_stiffness = plot_stiffness(ax_s, pos_tel, neg_tel, hp-1)

    #         #     display(W.HTML(f"""
    #         #         <b>Hardpoint {hp}: </b><br>
    #         #         Positive (compression) breakaway happened at
    #         #         <span style="color: {TEST_POS_COLOR}"><b>
    #         #           {pos_tel[f"mean_measured_force_{hp-1}"].max():.0f} N
    #         #         </b></span><br>
    #         #         Negative (tension) breakaway happened at
    #         #         <span style="color: {TEST_NEG_COLOR}"><b>
    #         #           {neg_tel[f"mean_measured_force_{hp-1}"].min():.0f} N
    #         #         </b></span><br>
    #         #         Stiffness measured in positive (compression): <b>{measured_stiffness[0]:.0f} N/um</b> <br>
    #         #         Stiffness measured in negative (tension): <b>{measured_stiffness[1]:.0f} N/um</b>
    #         #     """))

    #         #     display(fig)
    #         #     plt.close(fig)

    def _on_change(change):
        if change["new"] is not None:
            show_group(change["new"])

    # Only react to changes of the selected value, not to every trait.
    dd.observe(_on_change, names="value")
    display(dd, out)
    if dd.value is not None:
        show_group(dd.value)

    return dd, out

In [None]:
# Build and display the test run selector. The returned dropdown and output
# widgets are bound to `dd` and `out`.
dd, out = make_hardpoint_group_selector(test_command_df, test_status_df, summary_df)