In [None]:
from astropy.time import Time
from lsst.summit.utils.efdUtils import makeEfdClient, getEfdData
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors

from collections import defaultdict
from datetime import datetime, timedelta
import numpy as np
import pandas as pd

try:
    from lsst.ts.xml.tables.m1m3 import FATable
except ImportError:
    from lsst.ts.criopy.M1M3FATable import FATABLE as FATable

%matplotlib inline
#%load_ext lab_black

# M1M3 Bump Test Log Error Analysis and Measured Forces
---

## Overview
This notebook is dedicated to the analysis of bump test log error data, which is stored under the `lsst.sal.MTM1M3.logevent_logMessage` EFD topic. Our primary focus is on messages that include the "*Failed FA*" string, as this string is generated each time a bump test fails. 

The returned messages typically contain a string similar to this: "*measured force plus (215.394) is too far 222±5*". Our goal is to extract the measured forces (enclosed in parentheses) and analyze the severity of actuator failures by comparing these measured forces with the expected forces.

## Querying EFD

### Helper Function

In [None]:
async def query_bump_logs_in_chunks(
    start_date, end_date, client_name="", chunk_size_days=3
):
    """
    Queries the log messages related to bump tests from the EFD in chunks.

    Args:
        start_date (str): Start date of the query in ISO format (YYYY-MM-DD).
        
        end_date (str): End date of the query in ISO format (YYYY-MM-DD).
        
        client_name (str, optional): Name of the EFD client. Defaults to "".
        
        chunk_size_days (int, optional): Number of days per chunk. Defaults to 3.

    Returns:
        pandas.DataFrame: Concatenated DataFrame containing the queried log messages.
    """

    # Conditionally create the client based on client_name
    if client_name == "summit_efd":
        client = makeEfdClient("summit_efd")
    elif client_name == "usdf_efd":
        client = makeEfdClient("usdf_efd")
    elif client_name == "idf_efd":
        client = makeEfdClient("idf_efd")
    else:
        client = makeEfdClient()  # Default client

    # Convert start and end dates to datetime objects
    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)

    # Initialize an empty DataFrame to store concatenated results
    all_data = pd.DataFrame()

    current_start = start
    while current_start < end:
        current_end = min(current_start + timedelta(days=chunk_size_days), end)
        try:
            # Query the data for the current chunk
            chunk_data = await client.select_time_series(
                topic_name="lsst.sal.MTM1M3.logevent_logMessage",
                fields=["message"],
                start=Time(current_start.isoformat(), format="isot", scale="utc"),
                end=Time(current_end.isoformat(), format="isot", scale="utc"),
            )
            # Concatenate the chunk data to the main DataFrame
            all_data = pd.concat([all_data, chunk_data], ignore_index=False)
        except Exception as e:
            print(
                f"Error querying data from {current_start.isoformat()} to {current_end.isoformat()}: {e}"
            )
            continue  # Optionally, continue to the next chunk

        # Move to the next chunk
        current_start = current_end

    return all_data


# Example usage:
# begin = "2023-11-13T01:00"
# end = "2023-12-21T01:00"
# bump_logs = await query_bump_logs_in_chunks(begin, end, client_name='')

### Making Client and Checkig schema

In [None]:
# Schema may not work depending on the client (e.g. it does not work on `idf`) and how you execute your notebook (e.g. via vscode ssh connection)
client = makeEfdClient()
schema = await client.get_schema("lsst.sal.MTM1M3.logevent_logMessage")
schema

### Getting EFD data

Here we just run helpper function defined above to get the log messages from the EFD. This can take a little while depending on query size (start and end dates) and EFD load.

In [None]:
# Note that date between 09-12 and 12-12 are returning error when querying.
# For this reason, I'm avoid to query those dates.
begin = "2023-11-01T01:00"
end = "2023-12-09T01:00"
bump_logs_1 = await query_bump_logs_in_chunks(
    begin, end, client_name="", chunk_size_days=3
)

begin = "2023-12-12T01:00"
end = "2024-01-15T23:59"
bump_logs_2 = await query_bump_logs_in_chunks(
    begin, end, client_name="", chunk_size_days=3
)

bump_logs = pd.concat([bump_logs_1, bump_logs_2], ignore_index=False)
bump_logs

In [None]:
# Place holder code to render the full error mesage, since notebook just give an abbreviated view of the dataframe.
counter = 0
for m in bump_logs[bump_logs.message.str.contains("Failed FA")].message:
    print(m)
    counter += 1
print(f"{counter} failed bump tests")

# Results
---

## Processing Bump Log

Here we will process the log messages to extract the measured forces and return a dataframe with the measured forces and the expected forces.

### Helper Functions

In [None]:
# Function to process the bump log
def process_bump_logs(bump_logs, expected_force_range=222, tolerance=5):
    """
    Processes bump log messages to extract relevant information and calculate deviations
    from expected forces.

    Parameters:
    bump_logs (DataFrame): DataFrame containing bump log messages.

    Returns:
    DataFrame: A processed DataFrame with extracted and calculated data.
    """
    # Filter the bump logs
    filtered_bump_log = bump_logs[bump_logs.message.str.contains("Failed FA")].message
    df_filtered_bump_log = pd.DataFrame(filtered_bump_log)

    # Process the bump log messages
    # Extract FA ID, Orientation, Index, and Error Message
    df_filtered_bump_log["ID"] = df_filtered_bump_log["message"].str.extract(
        r"FA ID (\d+)"
    )
    orientation_index = df_filtered_bump_log["message"].str.extract(r"\(([XYZ])(\d+)\)")
    df_filtered_bump_log["Orientation"] = orientation_index[0]
    df_filtered_bump_log["Index"] = orientation_index[1]
    df_filtered_bump_log["Error Message"] = df_filtered_bump_log["message"].str.extract(
        r"- (.+)$"
    )

    # Create a new DataFrame with the required columns
    new_df = df_filtered_bump_log.reset_index().rename(columns={"index": "Time"})[
        ["Time", "ID", "Orientation", "Index", "Error Message"]
    ]

    # Calculate deviations and classify error messages
    # Extract Measured Force
    new_df["MeasuredForce"] = (
        new_df["Error Message"].str.extract(r"\(([\-\d.]+)\)").astype(float)
    )

    # Filter out entries with zero force applied
    new_df = new_df[new_df["MeasuredForce"] != 0]

    # Classify applied force direction and calculate deviation
    new_df["AppliedForceDirection"] = new_df["Error Message"].apply(
        lambda x: "Positive" if "measured force plus" in x else "Negative"
    )

    # Initialize deviation columns
    new_df["Deviation"] = 0

    # Upper and lower limits are equal to the expected force range (+/- tolerance is optional if needed)
    upper_limit = expected_force_range  # + tolerance
    lower_limit = -expected_force_range  # - tolerance

    # Ensure Deviation column is float
    new_df["Deviation"] = new_df["Deviation"].astype(float)

    # Calculate deviation based on the direction of the applied force direction
    for index, row in new_df.iterrows():
        if row["AppliedForceDirection"] == "Positive":
            new_df.at[index, "Deviation"] = float(row["MeasuredForce"]) - upper_limit
        elif row["AppliedForceDirection"] == "Negative":
            new_df.at[index, "Deviation"] = float(row["MeasuredForce"]) - lower_limit

    return new_df


In [None]:
# Function to create histogram and distribution of the failures
def plot_failure_histogram(df, ax):
    """
    Function to plot a histogram of the deviations from the expected force range.
    
    Parameters:
    ------------
    df (DataFrame): DataFrame containing the processed bump log data.
    
    ax (matplotlib.axes.Axes): Axes object to plot the histogram.
    """

    # Define color scheme for orientations
    orientation_colors = {"X": "blue", "Y": "red", "Z": "green"}

    # Convert 'Time' column to datetime and extract the date part
    df["Time"] = pd.to_datetime(df["Time"]).dt.date

    # Aggregate the total number of failures per FA ID and Orientation
    total_failures_per_faid = df.groupby("ID").size()
    grouped_data = df.groupby(["ID", "Orientation"]).size().unstack(fill_value=0)
    sorted_grouped_data = grouped_data.reindex(
        total_failures_per_faid.sort_values(ascending=False).index
    )

    # Colors for present orientations
    bar_colors = {
        ori: orientation_colors[ori]
        for ori in grouped_data.columns
        if ori in orientation_colors
    }

    # Plotting with specified colors
    sorted_grouped_data.plot(
        kind="bar",
        stacked=True,
        ax=ax,
        alpha=0.8,
        color=[bar_colors.get(col) for col in grouped_data.columns],
    )

    # Set title and labels
    ax.set_title("Failures by FA ID and Orientation")
    ax.set_xlabel("FA ID")
    ax.set_ylabel("Number of Failures")

    # Generate the legend automatically
    ax.legend(title="Orientation")

    return ax


# Getting primary {id: index} dictionary
m1m3_actuator_id_index_table: dict[int, int] = dict(
    [(fa.actuator_id, fa.index) for fa in FATable]
)


def get_m1m3_actuator_ids() -> list[int]:
    """Get a list of the M1M3 actuator ids.

    Returns
    -------
    `list`[ `int` ]
        List of M1M3 actuator ids.
    """
    return list(m1m3_actuator_id_index_table.keys())


def get_xy_position(actuator_list=FATable):
    # Collect all x and y positions using list comprehensions
    xpos = [actuator.x_position for actuator in actuator_list]
    ypos = [actuator.y_position for actuator in actuator_list]

    return xpos, ypos


def ActuatorsLayout(ax, df, actuator_list=FATable):
    """
    Function to plot the layout of M1M3 actuators and highlight failed actuators.
    
    Parameters:
    ax (matplotlib.axes.Axes): Axes object to plot the actuators.
    df (DataFrame): DataFrame containing failure data including 'ID' and 'Orientation'.
    actuator_list (list): List of actuator objects, default is FATable.
    """
    orientation_colors = {"X": "blue", "Y": "red", "Z": "green"}
    combined_orientations_colors = {"XZ": "orange", "YZ": "black"}

    ax.set_xlabel("X position (m)")
    ax.set_ylabel("Y position (m)")
    ax.set_title("Failures Distribution", fontsize=12)

    ids = get_m1m3_actuator_ids()
    xpos, ypos = get_xy_position(actuator_list)

    # Create a defaultdict to store orientations for each failed actuator
    actuator_orientations = defaultdict(set)
    for actuator_id, orientation in zip(df["ID"], df["Orientation"]):
        actuator_orientations[int(actuator_id)].add(orientation)

    # Determine all unique individual and combined orientations
    unique_orientations = set()
    combined_orientations = set()
    for orientations in actuator_orientations.values():
        if len(orientations) == 1:
            unique_orientations.update(orientations)
        else:
            combined_label = "".join(sorted(orientations))
            combined_orientations.add(combined_label)

    # Plot all actuators
    ax.plot(xpos, ypos, "o", ms=14, color="blue", alpha=0.05, mec="red")

    # Annotate actuator IDs
    for l, x, y in zip(ids, xpos, ypos):
        ax.annotate(
            l,
            (x, y),
            textcoords="offset points",
            xytext=(-5.5, -2),
            color="blue",
            size="xx-small",
        )

    # Highlight failed actuators with different colors based on orientation
    for actuator_id, orientations in actuator_orientations.items():
        if actuator_id in m1m3_actuator_id_index_table:
            index = m1m3_actuator_id_index_table[actuator_id]
            if len(orientations) > 1:
                combined_orientation = "".join(sorted(orientations))
                color = combined_orientations_colors[combined_orientation]
            else:
                orientation = orientations.pop()
                color = orientation_colors[orientation]
            ax.scatter(
                xpos[index],
                ypos[index],
                marker="o",
                facecolors="none",
                edgecolors=color,
                s=250,
                alpha=0.5,
                linewidths=2,
            )

    # Additional plotting to show hardpoints position
    Rhp = 3.1  # Radius in meters
    for i in range(6):
        theta = 2.0 * np.pi / 6.0 * float(i)
        ax.scatter(
            Rhp * np.cos(theta),
            Rhp * np.sin(theta),
            marker="*",
            color="green",
            s=30,
            alpha=0.3,
        )

    # Add a legend for orientations, including both individual and combined orientations
    for orientation in unique_orientations:
        ax.scatter(
            [],
            [],
            marker="o",
            linestyle="None",
            s=10,
            facecolor="none",
            edgecolor=orientation_colors[orientation],
            alpha=0.9,
            label=f"Orientation {orientation}",
        )
    for combined_orientation in combined_orientations:
        ax.scatter(
            [],
            [],
            marker="o",
            linestyle="None",
            s=10,
            facecolor="none",
            edgecolor=combined_orientations_colors[combined_orientation],
            alpha=0.5,
            label=f"Orientation {combined_orientation}",
        )
    ax.legend(loc="upper left", bbox_to_anchor=(1, 1))

    return

## Histogram + Spatial distribution

Here, we first process the log messages to extract the measured forces and return a dataframe with the measured forces, direction of the force measured, deviations, etc. Then, we use the helper function defined on sections above to create a bar plot of the failures. In order to see the spatial distribution of the actuators that failed the bump test, we also plot the spatial distribution of the failed actuators indicating the direction of the failed actuators.

In [None]:
# Processing the bump logs
processed_bump_logs = process_bump_logs(bump_logs)
df = processed_bump_logs.copy()

df.tail()

In [None]:
# Create the figure and axes
%matplotlib inline
fig, [ax0, ax1] = plt.subplots(1, 2, figsize=(13, 5))

# Histogram aspect should be automatic
ax0.set_aspect("auto", adjustable="box")

# Layout figure aspect needs to be rectangular
ax1.set_aspect("equal", adjustable="box")

# Call the function to plot the actuators and flag the failed ones
plot_failure_histogram(df, ax0)

# Call the function to plot the actuators and flag the failed ones
ActuatorsLayout(ax1, df, actuator_list=FATable)

# Find the first and last date in the entire dataset
first_date = df["Time"].min()
last_date = df["Time"].max()

plt.suptitle(
    f"Starting Date: {first_date.strftime('%Y-%m-%d')}   Final Date: {last_date.strftime('%Y-%m-%d')}"
)

plt.tight_layout()

# Placeholder to save figure
# plt.savefig("histogram+distribution_of_failures.png", dpi=300)

## Measured Forces vs Expected Forces

We can also use the processed dataframe to plot the measured forces and the deviations vs the expected forces. Below, we plot the absolute measured forces for each actuator, either for forces applied in the positive or negative direction. The plot includes an color bar, which may help to identify how old the failures are and if the measured forces get worse over time (which we found is not particularly true).

In [None]:
def plot_absolute_measured_forces(df):
    """
    Function to plot the absolute measured forces by FA ID either to forces applied in the positive or negative direction.
    
    Parameters:
    df (DataFrame): DataFrame containing the processed bump log data.
    """

    # Expected force range and tolerance
    expected_force_range = 222
    tolerance = 5

    # Convert MeasuredForce to absolute values
    df["AbsMeasuredForce"] = df["MeasuredForce"].abs()

    # Calculate the mean and standard deviation for y-axis limits
    mean_force = df["AbsMeasuredForce"].mean()
    std_dev_force = df["AbsMeasuredForce"].std()
    y_min = max(0, mean_force - 7 * std_dev_force)  # Ensuring y_min is not negative
    y_max = mean_force + 1 * std_dev_force

    # Calculate the number of failures per ID and sort
    failure_counts = df.groupby("ID", observed=True).size().sort_values(ascending=False)
    sorted_ids = failure_counts.index.tolist()

    # Sort the DataFrame based on the sorted IDs
    df["ID"] = pd.Categorical(df["ID"], categories=sorted_ids, ordered=True)
    df_sorted = df.sort_values("ID")

    # Create a plot
    fig, ax = plt.subplots(figsize=(12, 5))

    # Function to map ID to position for scatter plot
    def map_id_to_position(dev_df):
        return dev_df["ID"].cat.codes

    # Convert 'Time' to numerical format for color mapping
    df_sorted["Time_Num"] = mdates.date2num(df_sorted["Time"])

    # Scatter Plot for Absolute Measured Forces, separated by AppliedForceDirection with different markers
    # Positive Direction
    sc1 = ax.scatter(
        map_id_to_position(df_sorted[df_sorted["AppliedForceDirection"] == "Positive"]),
        df_sorted[df_sorted["AppliedForceDirection"] == "Positive"]["AbsMeasuredForce"],
        c=df_sorted[df_sorted["AppliedForceDirection"] == "Positive"]["Time_Num"],
        marker="o",
        label="Positive Side",
        alpha=0.9,
        cmap="viridis",
    )

    # Negative Direction
    sc2 = ax.scatter(
        map_id_to_position(df_sorted[df_sorted["AppliedForceDirection"] == "Negative"]),
        df_sorted[df_sorted["AppliedForceDirection"] == "Negative"]["AbsMeasuredForce"],
        c=df_sorted[df_sorted["AppliedForceDirection"] == "Negative"]["Time_Num"],
        marker="s",
        label="Negative Side",
        alpha=0.9,
        cmap="viridis",
    )

    # Adding horizontal lines and shaded areas for tolerances
    ax.axhline(
        expected_force_range,
        color="gray",
        linestyle="--",
        linewidth=0.9,
        label="+/- 222 N",
    )
    ax.fill_between(
        range(len(sorted_ids)),
        expected_force_range - tolerance,
        expected_force_range + tolerance,
        color="gray",
        alpha=0.3,
    )

    # Set x-ticks to be the categorical IDs and label them with the sorted IDs
    ax.set_xticks(range(len(sorted_ids)))
    ax.set_xticklabels(sorted_ids, rotation=45)

    # Adding a colorbar for dates
    cbar = plt.colorbar(sc1, ax=ax, pad=0.01)
    cbar.ax.yaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    cbar.set_label("Measurement Date")

    # Set y-axis limits
    # ax.set_ylim(y_min, y_max)

    # Adding titles and labels
    ax.set_title("Absolute Measured Forces by FA ID")
    ax.set_xlabel("FA ID")
    ax.set_ylabel("Absolute Measured Force")

    # Adjusting the legend to include marker types only
    ax.legend(loc="best")

    plt.tight_layout()
    
    # Placeholder to save figure
    # plt.savefig("absolute_measured_forces_with_age_of_error.png", dpi=300)


In [None]:
# Running the plot
plot_absolute_measured_forces(df)

## Deviations from Expected Forces

Similar to the absolute measured forces, we can also plot the deviations from the expected forces for each actuator. It gives basically the same information as above, but plotting average deviations for negative and positve forces.  Note that actuators with a single failure don't have an error bar, as the deviation is zero.

THe graph below seems to reveal that forces appied in the positive direction are more likely to fail than forces applied in the negative direction. This is consistent with the fact that the actuators are more likely to fail when they are pushing the mirror up, rather than down (not sure if this is true). The magnitudes of the deviations tend also to be larger for forces applied in the positive direction.

In [None]:
def plot_avg_deviation_with_color_bar(df):
    """
    Creates a bar plot of the average deviation for each actuator ID,
    with color mapping based on the total number of failures.
    
    Parameters:
    df (DataFrame): DataFrame containing the processed bump log data.
    """
    # Calculating the average deviation and standard deviation for each Actuator ID, separated direction of applied forces
    avg_deviation = (
        df.groupby(["ID", "AppliedForceDirection"], observed=True)["Deviation"]
        .mean()
        .unstack()
    )
    std_deviation = (
        df.groupby(["ID", "AppliedForceDirection"], observed=True)["Deviation"]
        .std()
        .unstack()
    )
    count_deviation = (
        df.groupby(["ID", "AppliedForceDirection"], observed=True)["Deviation"]
        .size()
        .unstack()
    )

    # Normalize the count deviation for color mapping
    norm = mcolors.Normalize(vmin=1, vmax=count_deviation.max().max())
    cmap = plt.cm.viridis

    # Create the bar plot
    fig, ax = plt.subplots(figsize=(15, 6))

    # Define hatch patterns
    hatch_patterns = {
        "Positive": "",
        "Negative": "////",
    }  # Example: no hatch for positive, slashes for negative

    # Iterate over the direction applied forces and plot each set of bars with the appropriate hatch pattern
    for error_type, hatch in hatch_patterns.items():
        bar_colors = cmap(norm(count_deviation[error_type].values))
        avg_deviation[error_type].plot(
            kind="bar",
            yerr=std_deviation[error_type],
            ax=ax,
            color=bar_colors,
            capsize=4,
            hatch=hatch,  # Apply hatch pattern
            label=error_type,
            alpha=0.7,
        )

    # Create color bar with a pad of 0.01
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    cbar = plt.colorbar(sm, ax=ax, pad=0.01, alpha=0.7)
    cbar.set_label("Total Failures")

    # Setting the title and labels
    plt.title("Average Deviation with Dispersion by Actuator ID and Error Type")

    plt.xlabel("Actuator ID")
    plt.ylabel("Average Deviation")
    plt.legend(title="Applied Force Direction")
    plt.grid(axis="y")

    plt.tight_layout()
    
    # Placeholder to save figure
    #plt.savefig("average_deviation_with_dispersion_and_colorbar.png", dpi=300)
 

# Usage
plot_avg_deviation_with_color_bar(df)

## Applied Forces Over Time

For failed actuators, we now want to investigate how the applied forces have changed over time. We will plot the measured forces for each one of fiftieth (15th) actuators that failed the most. This will help us to identify if the measured forces have changed over time, and if the failures are getting worse.

From the plot below, we see that the measures forces don't follow any specific trend over time, but a somewhat random distribution. According to Petr, this is expected for actuators that failed the bump test, but a similar plot for actuators that passed the bump test would be able to identify actuators that are getting worse over time. However, right now (Feb 2024), the bump test controller does not publish the measured forces for actuators that passed the bump test, so we can't make this comparison. Petr have plans to change this in the future, so we can have a better understanding of the actuators health and catch possible failures before they actually happen.

In [None]:
def plot_absolute_measured_force_over_time(df, top_n=15):
    expected_force = 222
    tolerance = 5

    # Convert 'Time' to datetime if not already
    df["Time"] = pd.to_datetime(df["Time"])

    # Calculate days since the first date for the entire dataset
    first_date = df["Time"].min()
    last_date = df["Time"].max()
    total_days = (last_date - first_date).days
    df["DaysSinceFirst"] = (df["Time"] - first_date).dt.days

    # Use absolute values of measured force
    df["AbsMeasuredForce"] = df["MeasuredForce"].abs()

    # Calculate the average and standard deviation for setting y-axis limits
    std_dev_force = df["AbsMeasuredForce"].std()
    y_min = max(0, expected_force - 7 * std_dev_force)  # Ensuring y_min is not negative
    y_max = expected_force + 7 * std_dev_force

    # Find the top N actuators with the highest frequency of failures
    top_actuators = df["ID"].value_counts().head(top_n).index

    # Number of columns for subplots (5 rows)
    n_cols = (top_n + 4) // 5

    # Create a figure with subplots
    fig, axs = plt.subplots(
        5, n_cols, figsize=(5 * n_cols, 10), sharex=True, sharey=False
    )

    # Flatten the axs array for easy iteration
    axs = axs.flatten()

    # Create a full range for the fill_between based on the entire dataset
    full_range = range(total_days + 1)  # +1 to include the last day

    # Plot each actuator's absolute measured forces over time in a separate subplot
    for i, actuator_id in enumerate(top_actuators):
        ax = axs[i]
        actuator_data = df[df["ID"] == actuator_id]

        # Scatter plot for absolute measured forces differentiated by error type
        ax.scatter(
            actuator_data[actuator_data["AppliedForceDirection"] == "Positive"][
                "DaysSinceFirst"
            ],
            actuator_data[actuator_data["AppliedForceDirection"] == "Positive"][
                "AbsMeasuredForce"
            ],
            marker="o",
            color="blue",
            label="Positive" if i == 0 else "",
            alpha=0.6,
        )
        ax.scatter(
            actuator_data[actuator_data["AppliedForceDirection"] == "Negative"][
                "DaysSinceFirst"
            ],
            actuator_data[actuator_data["AppliedForceDirection"] == "Negative"][
                "AbsMeasuredForce"
            ],
            marker="s",
            color="red",
            label="Negative" if i == 0 else "",
            alpha=0.6,
        )

        # Shaded area for tolerance range
        ax.axhline(
            expected_force - tolerance, color="green", linestyle="--", linewidth=0.3
        )
        ax.axhline(
            expected_force + tolerance, color="green", linestyle="--", linewidth=0.3
        )
        ax.fill_between(
            full_range,
            expected_force - tolerance,
            expected_force + tolerance,
            color="green",
            alpha=0.3,
        )

        # Actuator ID as title inside the plot area
        ax.text(
            0.05,
            0.95,
            f"FA {actuator_id}",
            transform=ax.transAxes,
            verticalalignment="top",
        )

        # Set y-axis limits based on the calculated average and standard deviation
        # ax.set_ylim(-50, 300)

        # Set labels only for the outermost subplots
        if i // n_cols == 4:  # Last row
            ax.set_xlabel(f"Days Since {first_date.strftime('%Y-%m-%d')}")
        if i % n_cols == 0:  # First column
            ax.set_ylabel("Abs. Measured Force")

        # Legend only for the first subplot
        if i == 0:
            ax.legend(loc="lower right")

    plt.suptitle("Absolute Measured Force for Failed Actuators", fontsize=14)

    # Adjust layout to remove spaces between subpanels
    plt.tight_layout(w_pad=0)

    # Placeholder to save figure.
    # plt.savefig("absolute_measured_force_over_time.png", dpi=300)
    # ax.set_ylim(y_min, y_max)

In [None]:
# Example usage:
plot_absolute_measured_force_over_time(df, top_n=15)

# Conclusions
---

- The actuators are more likely to fail when they are pushing the mirror up, rather than down. The magnitudes of the deviations tend also to be larger for forces applied in the positive direction.
- The actuators that failed the bump test are not getting worse over time, as the measured forces don't follow any specific trend over time.
- Actuators that pass the bump also need to publish the measured forces, so we can have a better understanding of the actuators health and catch possible failures before they actually happen.

# A few small additions from Craig Lage
13-May-24

### This is the data stored in the EFD for each bump test.

In [None]:
df.iloc[34]

## The cell below plots the same data in a slightly different way

In [None]:
names = ['PosForce_NegDeviation', 'PosForce_PosDeviation', 'NegForce_NegDeviation', 'NegForce_PosDeviation']
orientations = ['Z', 'Y', 'X']
xplot = range(4)
yplot = np.zeros([3, 4])
for index in range(len(df)):
    for i, orientation in enumerate(orientations):
        if df.iloc[index]['Orientation'] != orientation:
            continue
        if df.iloc[index]['AppliedForceDirection'] == 'Positive':
            if df.iloc[index]['Deviation'] < 0.0:
                yplot[i, 0] += 1
            else:
                yplot[i, 1] += 1
        if df.iloc[index]['AppliedForceDirection'] == 'Negative':
            if df.iloc[index]['Deviation'] < 0.0:
                yplot[i, 2] += 1
            else:
                yplot[i, 3] += 1
print(f"Negative deviations = {(np.sum(yplot[0,:]) / np.sum(yplot) * 100.0):.2f} %")
print(f"Positive deviations = {(np.sum(yplot[1,:]) / np.sum(yplot) * 100.0):.2f} %")
fig, ax = plt.subplots(1,1,figsize=(10,10))
ax.set_position([0.1, 0.25, 0.8, 0.65])
ax.set_title("Bump test failure types, 2023-11-01 to 2024-01-15", fontsize=18)
ax.bar(xplot, yplot[0,:], color='blue', label='Z')
ax.bar(xplot, yplot[1,:], bottom = yplot[0,:], color='green', label='Y')
ax.bar(xplot, yplot[2,:], bottom = (yplot[0,:] + yplot[1,:]), color='red', label='X')
ax.set_xticks(xplot, rotation=90, labels=names)
ax.set_ylabel("Number of bump test failures", fontsize=16)
ax.legend()
plt.savefig("/home/c/cslage/u/MTM1M3/data/Bump_Test_Failures_12May24.pdf")

## The cell below makes plots of all of the failures.

In [None]:
from astropy.time import Time, TimeDelta
from lsst.sitcom.vandv.m1m3 import actuator_analysis as act
from matplotlib.backends.backend_pdf import PdfPages
%matplotlib inline
pdf = PdfPages("/home/c/cslage/u/MTM1M3/data/Bump_Test_Failures_01Apr24.pdf")
fig = plt.figure(figsize=(10, 10))
for index in range(len(df)):
    print(index)
    try:
        force_actuator_id = int(df.iloc[index]['ID'])
        
        bump_test_status = getEfdData(
            client=client,
            topic="lsst.sal.MTM1M3.logevent_forceActuatorBumpTestStatus",
            columns="*",
            begin=Time(df.iloc[index]['Time']) - TimeDelta(30.0, format='sec'),
            end=Time(df.iloc[index]['Time']) + TimeDelta(30.0, format='sec'),
    )
        [delay_primary, delay_secondary] = act.plot_actuator_delay(
        fig, client, force_actuator_id, bt_results=bump_test_status
    )
        axs = fig.get_axes()
        if len(axs) != 4:
            print(f" Number of axes = {len(axs)}")
            print("Axes not right?!")
            plt.clf()
            continue
        if df.iloc[index]['Orientation'] == 'Z':
            axs[0].text(8, 0, "Failed", fontsize=18, color='red')
        else:
            axs[2].text(8, 0, "Failed", fontsize=18, color='red')
        axs[1].remove()
        axs[3].remove()
        pdf.savefig(fig, bbox_inches='tight')  # saves the current figure into a pdf page
        print(f"Actuator {force_actuator_id} at {df.iloc[index]['Time']} plotted successfully")
        plt.clf()
    except:
        print(f"Actuator {force_actuator_id} at {df.iloc[index]['Time']} failed to plot")
        plt.clf()
        continue
pdf.close()

# Conclusions to Craig Lage additions
---
In terms of the statistics of the failures, we have:

actual force overshoot compared to the demanded force: about 23%

actual force undershoot compared to the demanded force: about 74%

excessive latency of the actual force compared to the demanded force: None seen

locked/constant force independent of demand: about 6%

The first two are from the above graph “Bump_Test_Failures_Summary_12May24.pdf”.  The last value was obtained by scrolling through the plots of all of the failures and counting them.