<a href="https://colab.research.google.com/github/savhaupt/Phantom_Analysis/blob/main/Phantom_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Phantom Results Analysis

This file is used to analyse the results from the six tests performed on the phantom model. It does the following:
- Loads the relevant test files
- Preprocesses each file:

      1.   Selects only the relevant columns and drops the rest
      2.   Removes rows where time is missing

- Extracts motion events from Cumulative Time (s) column (manual annotations)
- Plots each signal against time and overlays motion event markers
- Evaluation / Analysis

  1.   find_peaks is used to calculate peaks within each signal
  2.   Determines whether the peak falls within a certain window around the nearest motion event
  3. Calculates the number of peaks captured within the defined window per signal





## Imports

In [None]:
from google.colab import files
import pandas as pd
import matplotlib.pyplot as plt
import glob
from scipy.signal import find_peaks
import numpy as np
import seaborn as sns

## Load Files

In [None]:
# Mount Drive
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# Find files
files = glob.glob("/content/gdrive/MyDrive/MSc BME TU Delft/Year 2/Thesis/Results   Analysis/*.csv")

In [None]:
# Store files in dataframe
test_1 = pd.read_csv("/content/gdrive/MyDrive/MSc BME TU Delft/Year 2/Thesis/Results   Analysis/Test 1.csv")
test_2 = pd.read_csv("/content/gdrive/MyDrive/MSc BME TU Delft/Year 2/Thesis/Results   Analysis/Test 2.csv")
test_3 = pd.read_csv("/content/gdrive/MyDrive/MSc BME TU Delft/Year 2/Thesis/Results   Analysis/Test 3.csv")
test_4 = pd.read_csv("/content/gdrive/MyDrive/MSc BME TU Delft/Year 2/Thesis/Results   Analysis/Test 4.csv")
test_5 = pd.read_csv("/content/gdrive/MyDrive/MSc BME TU Delft/Year 2/Thesis/Results   Analysis/Test 5.csv")
test_6 = pd.read_csv("/content/gdrive/MyDrive/MSc BME TU Delft/Year 2/Thesis/Results   Analysis/Test 6.csv")

In [None]:
# Store tests
tests = [test_1, test_2, test_3, test_4, test_5, test_6]

## Pre-processing

In [None]:
# Show column names to confirm
print("Columns in file:", test_1.columns.tolist())

Columns in file: ['Time(ms)', 'Time(s)', 'ADC', 'Pressure(cmH2O)', 'AccelX', 'AccelY', 'AccelZ', 'AccMag', 'GyroX', 'GyroY', 'GyroZ', 'Event', 'Lap Time(s)', 'Cumulative Time (s)']


In [None]:
# Select relevant columns
cols = ["Time(s)", "Pressure(cmH2O)", "AccelX", "AccelY", "AccelZ",
        "AccMag", "GyroX", "GyroY", "GyroZ", "Cumulative Time (s)"]

drop = ['Time(ms)', 'ADC', 'Event']

In [None]:
# Drop rows where time is missing
for test in tests:
  test.dropna(subset=["Time(s)"])
  test.drop(drop, axis=1, inplace=True, errors='ignore')

In [None]:
# Test print
print(test_1.head())

   Time(s)  Pressure(cmH2O)  AccelX  AccelY  AccelZ  AccMag  GyroX  GyroY  \
0     0.03            40.78    7.77   -1.18    6.38   10.12  -0.02  -0.01   
1     0.10            40.92    7.79   -1.20    6.34   10.12  -0.02  -0.01   
2     0.16            40.78    7.80   -1.20    6.30   10.10  -0.02  -0.01   
3     0.22            40.92    7.77   -1.19    6.33   10.09  -0.02   0.00   
4     0.28            41.32    7.75   -1.21    6.34   10.09  -0.02  -0.01   

   GyroZ  Lap Time(s)  Cumulative Time (s)  
0    0.0        32.97                32.97  
1    0.0         6.58                39.55  
2    0.0         6.20                45.75  
3    0.0         6.51                52.26  
4    0.0         5.36                57.62  


## Extract Motion Events

Motion events stored in column "Cumulative Time (s)"

In [None]:
# Threshold to remove the last 30 second lap (non-event)
threshold = 25.0

all_motion_times = []

for i, test in enumerate(tests, start=1):

    # Find last valid index in Lap Time column
    last_idx = test["Lap Time(s)"].last_valid_index()

    if last_idx is None:
        print(f"Test {i}: no valid Lap Time found.")
        all_motion_times.append([]) # store empty list
        continue

    last_lap = test.loc[last_idx, "Lap Time(s)"]

    # If last lap time > threshold, remove it
    if np.isfinite(last_lap) and last_lap > threshold:
        test.loc[last_idx, ["Lap Time(s)", "Cumulative Time (s)"]] = np.nan
        print(f"\nTest {i}: removed last row (Lap Time = {last_lap:.2f}s)")
    else:
        print(f"\nTest {i}: last Lap Time = {last_lap:.2f}s (kept)")

    # Compute remaining motion events
    motion_times = test["Cumulative Time (s)"].dropna().values

    # Save for downstream use
    all_motion_times.append(motion_times)

    # Report
    print(f"  Number of motion events: {len(motion_times)}")
    print(f"  Motion events found at (s): {motion_times}")

# Summary dataframe
summary = pd.DataFrame({
    "Test": [f"Test {i+1}" for i in range(len(all_motion_times))],
    "Num_Motion_Events": [len(mt) for mt in all_motion_times],
    "Motion_Times": [list(mt) for mt in all_motion_times]
})

print("\n=== Summary ===")
display(summary)



Test 1: removed last row (Lap Time = 31.48s)
  Number of motion events: 81
  Motion events found at (s): [ 32.97  39.55  45.75  52.26  57.62  64.22  70.52  75.6   81.05  86.65
  92.61  99.09 105.44 111.65 117.21 123.69 129.79 136.32 142.1  147.66
 153.17 159.73 166.63 173.01 178.87 184.9  191.2  197.91 204.05 209.89
 215.78 221.06 226.37 231.81 237.9  243.75 248.26 253.65 259.78 266.23
 271.71 276.56 282.17 287.57 292.63 297.86 304.02 310.03 315.19 320.18
 325.39 331.12 336.38 341.44 346.5  351.3  356.45 362.31 367.47 372.71
 378.07 383.43 388.48 393.78 399.49 404.87 410.65 415.48 420.54 425.47
 430.28 435.23 440.28 445.46 450.51 455.46 460.59 465.52 470.43 475.41
 481.32]

Test 2: removed last row (Lap Time = 33.97s)
  Number of motion events: 76
  Motion events found at (s): [ 31.88  37.36  42.72  47.77  53.05  58.57  63.77  68.62  73.74  78.84
  83.82  88.63  93.65  98.58 103.68 108.81 114.46 119.71 124.52 129.7
 134.7  139.7  145.5  150.83 155.83 160.88 165.68 171.33 176.49 181.72

Unnamed: 0,Test,Num_Motion_Events,Motion_Times
0,Test 1,81,"[32.97, 39.55, 45.75, 52.26, 57.62, 64.22, 70...."
1,Test 2,76,"[31.88, 37.36, 42.72, 47.77, 53.05, 58.57, 63...."
2,Test 3,22,"[30.46, 36.2, 41.73, 47.17, 52.63, 58.14, 63.5..."
3,Test 4,45,"[30.37, 39.12, 44.55, 56.88, 62.34, 69.81, 76...."
4,Test 5,45,"[30.14, 35.37, 40.75, 45.33, 55.3, 60.18, 65.2..."
5,Test 6,21,"[30.23, 35.67, 40.31, 45.48, 50.45, 55.14, 60...."


In [None]:
print(type(motion_times))
print(motion_times)

<class 'numpy.ndarray'>
[ 30.23  35.67  40.31  45.48  50.45  55.14  60.05  65.36  70.37  78.37
  80.73  85.17  90.81  96.34 101.65 107.16 112.6  119.49 124.77 130.26
 135.77]


## Plot

In [None]:
for i, test in enumerate(tests):
  time = test["Time(s)"]
  signals = [c for c in test.columns if c not in ["Time(s)", "Lap Time(s)", "Cumulative Time (s)"]]

  # Get motion times for each test iteration
  motion_times = all_motion_times[i]

  for sig in signals:
      plt.figure(figsize=(10, 5))

      # Plot main signal
      plt.plot(time, test[sig], label=sig, color='steelblue', zorder=1)

      # Overlay motion event markers (higher zorder so they appear in front)
      if len(motion_times) > 0:
        for j, t in enumerate(motion_times):
          # Find the closest index in time
          idx = (test["Time(s)"] - t).abs().idxmin()
          plt.scatter(
              test.loc[idx, "Time(s)"], test.loc[idx, sig],
              color="red", marker="o", s=20, edgecolor="black",
              zorder=3, label="Motion Event" if t == motion_times[0] else "" # only show label once
          )

      plt.title(f"Test {i+1}: {sig} vs Time(s)")
      plt.xlabel("Time (s)")
      plt.ylabel(sig)
      plt.legend()
      plt.grid(True, linestyle='--', alpha=0.6)
      plt.tight_layout()
      plt.show()


## Evaluate

## Define Parameters

# Adaptive Function for Peak Parameters

In [None]:
def adaptive_find_peaks(signal, fs, multiplier_prom=0.3, min_distance_sec=2):
    """
    Adaptive peak detection based on signal characteristics.

    Parameters:
        signal (array-like): Input signal data
        fs (float): Sampling frequency (Hz)
        multiplier_prom (float): Prominence scaling factor (default=0.3)
        min_distance_sec (float): Minimum distance between peaks in seconds (default=2s)
    Returns:
        peaks (np.ndarray): Indices of detected peaks
        properties (dict): Properties returned by scipy.find_peaks
    """
    # Ignore NaNs
    signal = np.array(signal)
    signal = signal[~np.isnan(signal)]

    # Estimate signal stats
    std = np.std(signal)
    prom = std * multiplier_prom
    distance = int(fs * min_distance_sec)

    # Run find_peaks
    peaks, properties = find_peaks(signal, prominence=prom, distance=distance)
    return peaks, properties

From here, have two analysis approaches:
One using standard prominence and distance values (not signal specific), one using the adaptive function above. Need to check what works best later and fine tune

In [None]:
# Define time window around motion events (seconds)
window = 3

# find_peaks parameters
prominence = 0.2
distance = 40

## Visual Inspection

In [None]:
# Parameters
window = 3         # seconds around motion events
prominence = 0.5
distance = 70

# Loop through all tests
for i, test in enumerate(tests, start=1):
    time = test["Time(s)"].values

    # Get all signal columns except time and motion tracking
    signals = [c for c in test.columns if c not in ["Time(s)", "Lap Time(s)", "Cumulative Time (s)"]]

    # Motion times for this test
    motion_times = all_motion_times[i-1]

    for sig in signals:
        y = test[sig].values

        # Detect peaks
        peaks, _ = find_peaks(y, distance=distance, prominence=prominence)
        peak_times = time[peaks]

        # Plot signal
        plt.figure(figsize=(10, 5))
        plt.plot(time, y, label=sig, color="steelblue")

        # Plot all peaks at once
        plt.scatter(peak_times, y[peaks], color="orange", s=30, label="Detected Peaks", zorder=3)

        # Highlight motion event windows
        for t_event in motion_times:
            plt.axvspan(t_event - window, t_event + window, color="red", alpha=0.2)
            plt.axvline(t_event, color="red", linestyle="--", linewidth=1)

        plt.title(f"Test {i}: {sig} with Peaks and Motion Event Windows (±{window}s)")
        plt.xlabel("Time (s)")
        plt.ylabel(sig)
        plt.legend()
        plt.grid(True, alpha=0.4)
        plt.tight_layout()
        plt.show()


## Summary Table

In [None]:
results = []

for i, test in enumerate(tests, start=1):

  # Get motion times for the test
  motion_times = all_motion_times[i-1]

  # Get all signals columsn except time and motion tracking
  signals = [c for c in test.columns if c not in ["Time(s)", "Lap Time(s)", "Cumulative Time (s)"]]
  time = test["Time(s)"].values

  for sig in signals:
      y = test[sig].values

      # Detect peaks
      peaks, _ = find_peaks(y, distance=distance, prominence=prominence)
      peak_times = time[peaks]

      # Match peaks to motion events
      matches = 0
      for t_event in motion_times:
          # Find peaks within ±window seconds of this event
          nearby_peaks = np.where(
              (peak_times >= t_event - window) & (peak_times <= t_event + window)
          )[0]
          if len(nearby_peaks) > 0:
              matches += 1

      # Store results
      results.append({
          "Test": i,
          "Signal": sig,
          "Total Peaks": len(peak_times),
          "Motion Events": len(motion_times),
          "Peaks near Motion Events": matches,
          "Percent of Events with Nearby Peaks": 100 * matches / len(motion_times) if len(motion_times) > 0 else np.nan
      })

# Combine all test results into one DataFrame
summary_df = pd.DataFrame(results)

# Display or save
display(summary_df)


Unnamed: 0,Test,Signal,Total Peaks,Motion Events,Peaks near Motion Events,Percent of Events with Nearby Peaks
0,1,Pressure(cmH2O),98,81,71,87.654321
1,1,AccelX,80,81,75,92.592593
2,1,AccelY,80,81,71,87.654321
3,1,AccelZ,80,81,76,93.82716
4,1,AccMag,79,81,76,93.82716
5,1,GyroX,70,81,65,80.246914
6,1,GyroY,35,81,33,40.740741
7,1,GyroZ,0,81,0,0.0
8,2,Pressure(cmH2O),75,76,72,94.736842
9,2,AccelX,39,76,44,57.894737


## Stats

In [None]:
# Exclude pressure signal
motion_signals = summary_df[summary_df["Signal"] != "Pressure(cmH2O)"]

# --- Step 1: Best motion signal per test ---
best_per_test = (
    motion_signals
    .loc[motion_signals.groupby("Test")["Percent of Events with Nearby Peaks"].idxmax()]
    .reset_index(drop=True)
)

print("Best motion signal per test:")
print(best_per_test[["Test", "Signal", "Percent of Events with Nearby Peaks"]])

# --- Step 2: Find the most common best-performing signal across all tests ---
most_common_signal = (
    best_per_test["Signal"]
    .value_counts()
    .idxmax()
)

most_common_count = best_per_test["Signal"].value_counts().max()

print(f"\nMost common best-performing signal across tests: {most_common_signal}")
print(f"   → Best performing in {most_common_count} out of {best_per_test['Test'].nunique()} tests")

# --- Optional: Average detection percentage for that signal ---
avg_performance = (
    motion_signals[motion_signals["Signal"] == most_common_signal]
    ["Percent of Events with Nearby Peaks"]
    .mean()
)

print(f"   → Average detection percentage: {avg_performance:.2f}%")


Best motion signal per test:
   Test  Signal  Percent of Events with Nearby Peaks
0     1  AccelZ                            93.827160
1     2  AccMag                            96.052632
2     3  AccelZ                            95.454545
3     4  AccelZ                            48.888889
4     5  AccelY                            82.222222
5     6  AccelX                            95.238095

Most common best-performing signal across tests: AccelZ
   → Best performing in 3 out of 6 tests
   → Average detection percentage: 81.24%


# Test code using adaptive function

In [None]:
results = []
fs = 20
window = 3  # seconds

for i, test in enumerate(tests, start=1):
    motion_times = all_motion_times[i-1]
    signals = [c for c in test.columns if c not in ["Time(s)", "Lap Time(s)", "Cumulative Time (s)"]]
    time = test["Time(s)"].values

    # Get valid cumulative times
    # Remove all NaN values from column (otherwise takes rows from full table and returns NaN as last time)
    valid_cum_times = test["Cumulative Time (s)"].dropna().values
    if len(valid_cum_times) == 0:
        print(f"Test {i}: No valid cumulative times, skipping.")
        continue

    t_start = valid_cum_times[0]
    t_end = valid_cum_times[-1]

    print(f"Test {i}:")
    print(t_start)
    print(t_end)

    for sig in signals:
        y = test[sig].values

        # Detect peaks
        peaks, _ = adaptive_find_peaks(y, fs, multiplier_prom=0.3, min_distance_sec=2)
        peak_times = time[peaks]

        # Keep peaks only within cumulative time bounds
        valid_mask = (peak_times >= t_start) & (peak_times <= t_end)
        peak_times = peak_times[valid_mask]

        # True positives (TP): peaks matched to motion events
        tp = 0
        matched_peaks = set()

        for t_event in motion_times:
            nearby_peaks = np.where((peak_times >= t_event - window) & (peak_times <= t_event + window))[0]
            if len(nearby_peaks) > 0:
                tp += 1
                matched_peaks.update(nearby_peaks)

        # --- False negatives (FN): motion events with no matching peak ---
        fn = len(motion_times) - tp

        # --- False positives (FP): peaks not matched to any motion event ---
        fp = len(peak_times) - len(matched_peaks)

        # --- Precision, recall, F1 ---
        precision = tp / (tp + fp) if (tp + fp) > 0 else np.nan
        recall = tp / (tp + fn) if (tp + fn) > 0 else np.nan
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else np.nan

        results.append({
            "Test": i,
            "Signal": sig,
            "Total Peaks": len(peak_times),
            "Motion Events": len(motion_times),
            "TP": tp,
            "FP": fp,
            "FN": fn,
            "Precision": precision,
            "Recall": recall,
            "F1": f1
        })

summary_df = pd.DataFrame(results)
display(summary_df)


Test 1:
32.97
481.32
Test 2:
31.88
414.85
Test 3:
30.46
145.36
Test 4:
30.37
392.53
Test 5:
30.14
260.33
Test 6:
30.23
135.77


Unnamed: 0,Test,Signal,Total Peaks,Motion Events,TP,FP,FN,Precision,Recall,F1
0,1,Pressure(cmH2O),129,81,81,2,0,0.975904,1.0,0.987805
1,1,AccelX,117,81,80,5,1,0.941176,0.987654,0.963855
2,1,AccelY,101,81,77,4,4,0.950617,0.950617,0.950617
3,1,AccelZ,113,81,81,3,0,0.964286,1.0,0.981818
4,1,AccMag,101,81,79,3,2,0.963415,0.975309,0.969325
5,1,GyroX,88,81,73,5,8,0.935897,0.901235,0.918239
6,1,GyroY,108,81,79,3,2,0.963415,0.975309,0.969325
7,1,GyroZ,2,81,1,1,80,0.5,0.012346,0.024096
8,2,Pressure(cmH2O),112,76,76,0,0,1.0,1.0,1.0
9,2,AccelX,68,76,66,0,10,1.0,0.868421,0.929577


In [None]:
# --- Parameters ---
window = 3  # seconds around motion events
fs = 20     # sampling frequency (Hz)

# --- Loop through all tests ---
for i, test in enumerate(tests, start=1):
    time = test["Time(s)"].values

    # Get signal columns (exclude non-signal ones)
    signals = [c for c in test.columns if c not in ["Time(s)", "Lap Time(s)", "Cumulative Time (s)"]]

    # Motion times for this test
    motion_times = all_motion_times[i-1]

    # Get valid cumulative times (exclude NaNs)
    valid_cum_times = test["Cumulative Time (s)"].dropna().values
    if len(valid_cum_times) == 0:
        print(f"Test {i}: No valid cumulative times, skipping.")
        continue

    # Define analysis window
    t_start = valid_cum_times[0]
    t_end = valid_cum_times[-1]

    print(f"\nTest {i} time bounds: {t_start:.2f} → {t_end:.2f}")

    # --- Process each signal ---
    for sig in signals:
        y = test[sig].values

        # Adaptive peak detection
        peaks, _ = adaptive_find_peaks(
            y,
            fs=fs,
            multiplier_prom=0.3,      # tune for sensitivity
            min_distance_sec=2       # tune for min spacing between peaks
        )

        # Map to times
        peak_times = time[peaks]

        # Filter to cumulative-time bounds
        valid_mask = (peak_times >= t_start) & (peak_times <= t_end)
        peak_times = peak_times[valid_mask]
        valid_peaks = peaks[valid_mask]

        print(f"  {sig}: {len(valid_peaks)} peaks within valid range")

        # --- Plot signal with peaks and motion windows ---
        plt.figure(figsize=(10, 5))
        plt.plot(time, y, label=sig, color="steelblue")
        plt.scatter(peak_times, y[valid_peaks], color="orange", s=30, label="Detected Peaks", zorder=3)

        # Highlight motion event windows
        for t_event in motion_times:
            plt.axvspan(t_event - window, t_event + window, color="red", alpha=0.2)
            plt.axvline(t_event, color="red", linestyle="--", linewidth=1)

        plt.title(f"Test {i}: {sig} — Peaks within Motion Windows (±{window}s)")
        plt.xlabel("Time (s)")
        plt.ylabel(sig)
        plt.legend()
        plt.grid(True, alpha=0.4)
        plt.tight_layout()
        plt.show()


In [None]:
# Exclude pressure signal
motion_signals = summary_df[summary_df["Signal"] != "Pressure(cmH2O)"]

# --- Step 1: Best motion signal per test ---
best_per_test = (
    motion_signals
    .loc[motion_signals.groupby("Test")["Percent of Events with Nearby Peaks"].idxmax()]
    .reset_index(drop=True)
)

print("Best motion signal per test:")
print(best_per_test[["Test", "Signal", "Percent of Events with Nearby Peaks"]])

# --- Step 2: Find the most common best-performing signal across all tests ---
most_common_signal = (
    best_per_test["Signal"]
    .value_counts()
    .idxmax()
)

most_common_count = best_per_test["Signal"].value_counts().max()

print(f"\nMost common best-performing signal across tests: {most_common_signal}")
print(f"   → Best performing in {most_common_count} out of {best_per_test['Test'].nunique()} tests")

# --- Optional: Average detection percentage for that signal ---
avg_performance = (
    motion_signals[motion_signals["Signal"] == most_common_signal]
    ["Percent of Events with Nearby Peaks"]
    .mean()
)

print(f"   → Average detection percentage: {avg_performance:.2f}%")


KeyError: 'Column not found: Percent of Events with Nearby Peaks'

# Bar Graphs

In [None]:
# Define plot settings
plt.figure(figsize=(12, 6))
sns.barplot(
    data=summary_df,
    x="Test",
    y="Percent of Events with Nearby Peaks",
    hue="Signal",
    palette="tab10"
)

plt.title("Detection Performance per Test and Signal")
plt.ylabel("Percent of Events with Nearby Peaks (%)")
plt.xlabel("Test Number")
plt.legend(title="Signal", bbox_to_anchor=(1.05, 1), loc="upper left")
plt.tight_layout()
plt.grid(axis="y", linestyle="--", alpha=0.6)
plt.show()

# Stats

## F1 Score

In [None]:
sns.boxplot(data=summary_df, x="Signal", y="F1")
plt.title("Distribution of F1-scores per Signal Across Tests")
plt.show()

In [None]:
final_summary = summary_df.groupby("Signal")[["Precision", "Recall", "F1"]].agg(['mean', 'std'])
display(final_summary)


In [None]:
# Compute summary table
final_summary = (
    summary_df
    .groupby("Signal")[["Precision", "Recall", "F1"]]
    .agg(['mean', 'std'])
)

# Find best-performing signal (by F1 mean)
best_signal = final_summary['F1']['mean'].idxmax()

# Highlight the best row
def highlight_best(row):
    color = 'background-color: lightgreen' if row.name == best_signal else ''
    return [color] * len(row)

styled = final_summary.style.apply(highlight_best, axis=1)\
    .format("{:.3f}")
styled
