# Unit testing and QC for optical tracking data
## Real data is analysed at the beginning of the notebook; synthetic (fake) data for unit testing is at the end
## Cohorts 1 and 2 are working. Cohort 0: the onix_digital Clock column is 0; explore why and/or use timestamps instead

In [None]:
from pathlib import Path
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.io as pio
import plotly.express as px
from plotly.subplots import make_subplots
from scipy.stats import mode
from scipy.integrate import cumulative_trapezoid


import gc # garbage collector for removing large variables from memory instantly 
import importlib #for force updating changed packages 

#import harp
import harp_resources.process
import harp_resources.utils
from harp_resources import process, utils # Reassign to maintain direct references for force updating 
#from sleap import load_and_process as lp

In [None]:
# Session configuration: physical constants, state flags and data location.
# The first group sets defaults; the dataset-specific group further down
# deliberately overrides some of them, so statement order matters here.
ball_radius = 0.1 #in m
sensor_resolution = 5000 # in cpi, count per inch 

harp_streams_orig_created = False  # set True once the conversion cell has stored a pristine copy of harp_streams
has_heartbeat = False  # forwarded to utils.load_registers when the HARP registers are loaded
cohort0 = False  # True: load OnixAnalogFrameCount and skip the OnixHarp sync stream
cohort2 = False  # NOTE(review): not referenced in the visible cells
onix_analog_clock_downsampled = False  # NOTE(review): not referenced in the visible cells
onix_analog_framecount_upsampled = False  # NOTE(review): not referenced in the visible cells
common_resampled_rate = 10000 #in Hz
unit_conversions = False # set True by the conversion cell after the derived position columns are added
save_full_asynchronous_data = False #saves alldata before resampling
has_photometry = True #if photometry data is present
has_onix_harp = True  # gates loading of the OnixDigital and OnixHarp streams
has_video = True  # gates loading of VideoData1 and VideoData2


# Dataset-specific overrides: exactly one data_path line should be active.
#Cohort 2 tests for Encoder Test, NO photometry, No videodata1 
#data_path = Path('/Users/rancze/Documents/Data/vestVR/Cohort1/photometry/Optical_Sensor_Calibration/2025-03-17T15-40-16') #supposedly X rotation
data_path = Path('/Users/rancze/Documents/Data/vestVR/Cohort1/photometry/Optical_Sensor_Calibration/2025-03-17T15-41-30') #supposedly Y rotation CW then CCW 
#data_path = Path('/Users/rancze/Documents/Data/vestVR/EncoderTest/2025-02-26T14-57-04')
has_heartbeat = True
has_photometry = False
has_onix_harp = False
has_video = False


# Processed photometry is looked up in a sibling folder named "<session folder>_processedData".
photometry_path = data_path.parent / f"{data_path.name}_processedData" / "photometry"

# Create the reader objects; the loading cell passes each one to
# utils.load_2 together with data_path.
session_settings_reader = utils.SessionData("SessionSettings")
experiment_events_reader = utils.TimestampedCsvReader("ExperimentEvents", columns=["Event"])
# Only used when cohort0 is True.
onix_framecount_reader = utils.TimestampedCsvReader("OnixAnalogFrameCount", columns=["Index"])
# Disabled: processed photometry is read directly with pandas in the loading cell.
#photometry_reader = utils.PhotometryReader("Processed_fluorescence")
video_reader1 = utils.VideoReader("VideoData1")
video_reader2 = utils.VideoReader("VideoData2")
onix_digital_reader = utils.OnixDigitalReader("OnixDigital", columns=["Value.Clock", "Value.HubClock", 
                                                                         "Value.DigitalInputs",
                                                                         "Seconds"])
# HarpTime/Clock pairs used to synchronise HARP and ONIX (loaded only when not cohort0 and has_onix_harp).
onix_harp_reader = utils.TimestampedCsvReader("OnixHarp", columns=["Clock", "HubClock", "HarpTime"])

### Load real data 

In [None]:
# Load session metadata and, depending on the has_* flags, the processed
# photometry tables and the two video streams.
print ("Loading session settings")
session_settings = utils.load_2(session_settings_reader, data_path) #Andrew's, creates ugly df, but used in further analysis code
print ("Loading experiment events")
experiment_events = utils.load_2(experiment_events_reader, data_path)

if has_photometry:
    # File paths are joined with pathlib instead of str + '/...' so that the
    # path separator is handled by Path; pandas accepts path objects directly.
    print ("Loading processed photometry")
    photometry_data = pd.read_csv(photometry_path / "Processed_fluorescence.csv")
    # NOTE(review): unlike Events.csv below, TimeStamp is not rescaled here
    # before the index is labelled 'Seconds' - confirm the file is already in seconds.
    photometry_data.set_index("TimeStamp", inplace=True)
    photometry_data.index.name = 'Seconds'
    print ("Loading processed photometry info")
    photometry_info = pd.read_csv(photometry_path / "Info.csv")
    print ("Loading processed photometry events")
    photometry_events = pd.read_csv(photometry_path / "Events.csv")
    photometry_events["TimeStamp"] = photometry_events["TimeStamp"] /1000 # convert to seconds from ms
    photometry_events.set_index("TimeStamp", inplace=True)
    photometry_events.index.name = 'Seconds'

if has_video:
    print ("Loading video data 1")
    video_data1 = utils.load_2(video_reader1, data_path)
    print ("Loading video data 2")
    video_data2 = utils.load_2(video_reader2, data_path)

# Load the ONIX streams. OnixDigital is only read when has_onix_harp is set.
if has_onix_harp:
    print ("Loading OnixDigital")
    onix_digital = utils.load_2(onix_digital_reader, data_path)

# The frame-count stream is only loaded for cohort 0 sessions.
if cohort0:
    print ("Loading OnixAnalogFrameClock")
    onix_analog_framecount = utils.load_2(onix_framecount_reader, data_path)
    
# NOTE(review): the analog clock and analog data are read unconditionally,
# i.e. also when has_onix_harp is False - confirm these files exist for every session type.
print ("Loading OnixAnalogClock")
onix_analog_clock = utils.read_OnixAnalogClock(data_path)
print ("Loading OnixAnalogData and converting to boolean photodiode array")
photodiode = utils.read_OnixAnalogData(data_path, channels = [0], binarise=True, method='adaptive', refractory = 300, flip=True, verbose=False) #method adaptive or threshold (which is hard threshold at 120), refractory to avoid multiple detections

# Read the HARP register streams into one DataFrame.
print ("Loading H1 and H2 streams, AnalogInput removed")
harp_streams = utils.load_registers(data_path, dataframe = True, has_heartbeat = has_heartbeat, verbose = False) #loads as df, or if False, as dict
harp_streams.drop(columns=["AnalogInput(39)"], inplace=True)  # Removes AnalogInput permanently, as not currently used
harp_streams = harp_streams.dropna(how="all") # remove rows with all NaNs
# Convert specific columns in harp_streams to boolean type.
# The list is currently empty, so the loop below is a no-op placeholder.
columns_to_convert = []
for col in columns_to_convert:
    harp_streams[col] = harp_streams[col].astype(bool)

# Read the synchronising signal between HARP and ONIX (not available for cohort 0).
if not cohort0 and has_onix_harp:
    print ("Loading OnixHarp")
    onix_harp = utils.load_2(onix_harp_reader, data_path)
    # Outlier removal uses HarpTime as x and Clock as y; the criterion itself
    # lives in utils.detect_and_remove_outliers.
    onix_harp = utils.detect_and_remove_outliers(
    df=onix_harp,
    x_column="HarpTime",
    y_column="Clock",
    verbose=False  # True prints all outliers
    )
    onix_harp["HarpTime"] = onix_harp["HarpTime"] + 1 # known issue with current version of ONIX, harp timestamps lag 1 second
    print ("❗Reminder: HarpTime was increased by 1s to account for know issue with ONIX")

print ("✅ Done Loading")

Convert platform position and flow sensor streams to real world units and forward fill 

In [None]:
# Keep a pristine copy of the loaded HARP streams on the first run of this
# cell; on every later run start again from that copy.
if not harp_streams_orig_created:
    harp_streams_orig = harp_streams.copy() #keep original for reference
    harp_streams_orig_created = True
else:
    harp_streams = harp_streams_orig.copy() #reset to original if already created
    # The restored copy has none of the derived columns, so the conversion
    # must run again. Leaving the flag True would skip it and the plotting
    # cell would then fail on the missing CumulativeCounts_/Position_ columns.
    unit_conversions = False

#----------------------------------------------------------------------------
# Perform unit conversions if not already done
#----------------------------------------------------------------------------

if not unit_conversions:
    #get raw cumulative counts as position in raw data counts for plotting only 
    harp_streams["CumulativeCounts_0X"] = harp_streams["OpticalTrackingRead0X(46)"].copy().cumsum()
    harp_streams["CumulativeCounts_0Y"] = harp_streams["OpticalTrackingRead0Y(46)"].copy().cumsum()

    # Cumulative counts are forward- then back-filled so the converters
    # receive an array without NaN gaps.
    harp_streams["Position_OpticalTrackingRead0X(46)"] = process.running_unit_conversion(harp_streams["OpticalTrackingRead0X(46)"].copy().cumsum().ffill().bfill().to_numpy(), sensor_resolution)
    harp_streams["Position_OpticalTrackingRead0Y(46)"] = process.turning_unit_conversion(harp_streams["OpticalTrackingRead0Y(46)"].copy().cumsum().ffill().bfill().to_numpy(), sensor_resolution, ball_radius)

    unit_conversions = True
    print("✅ Unit conversions to real-life values done")
else:
    print("❗ Flow sensor and encoder values already converted to real-world units, skipping")


In [None]:
#plot real data
# Create subplots with two columns

def get_symmetric_range(series):
    """Return an axis range ``[-m, m]`` that keeps 0 in the centre of the axis.

    Parameters
    ----------
    series : object exposing ``min()`` and ``max()`` (e.g. a pandas Series)
        Values that will be drawn against the axis.

    Returns
    -------
    list of float
        ``[-m, m]`` where ``m`` is the larger of ``abs(series.min())`` and
        ``abs(series.max())``. When ``m`` is not a positive finite number
        (empty or all-NaN data, or a signal that is zero throughout) the
        fallback ``[-1.0, 1.0]`` is returned, so the range never contains
        NaN and never collapses to zero width.
    """
    max_abs = float(max(abs(series.min()), abs(series.max())))
    # NaN fails both comparisons, so this single test also catches it.
    if not (0 < max_abs < float("inf")):
        return [-1.0, 1.0]
    return [-max_abs, max_abs]

# One panel per sensor axis. Each panel shows the raw cumulative counts on
# the primary y-axis and the converted position on the secondary y-axis.
fig = make_subplots(
    rows=1,
    cols=2,
    shared_xaxes=True,
    subplot_titles=("CumulativeCounts_0X vs Position_0X", "CumulativeCounts_0Y vs Position_0Y"),
    specs=[[{"secondary_y": True}, {"secondary_y": True}]],  # secondary y-axis in both panels
)

# (subplot column, counts column / trace name, position column, position trace name, position axis title)
_panels = (
    (1, "CumulativeCounts_0X", "Position_OpticalTrackingRead0X(46)", "Position_0X", "Position (m)"),
    (2, "CumulativeCounts_0Y", "Position_OpticalTrackingRead0Y(46)", "Position_0Y", "Position (deg)"),
)

_time_axis = harp_streams.index
for _panel_col, _counts_key, _position_key, _position_name, _ in _panels:
    # Position is added first as a wide, half-transparent line (background).
    _position_trace = go.Scatter(
        x=_time_axis,
        y=harp_streams[_position_key],
        mode='lines',
        name=_position_name,
        line=dict(color='lightskyblue', width=8),
        opacity=0.5,
    )
    fig.add_trace(_position_trace, row=1, col=_panel_col, secondary_y=True)

    # Raw cumulative counts go on top as small opaque black markers.
    _counts_trace = go.Scatter(
        x=_time_axis,
        y=harp_streams[_counts_key],
        mode='markers',
        name=_counts_key,
        marker=dict(color='black', size=2, opacity=1),
    )
    fig.add_trace(_counts_trace, row=1, col=_panel_col, secondary_y=False)

# Figure-level styling
fig.update_layout(
    title=data_path.name,
    template="plotly_white",
    height=400,
    xaxis_title="Time",
    xaxis2_title="Time",
)

# Centre every y-axis on zero
for _panel_col, _counts_key, _position_key, _, _position_axis_title in _panels:
    fig.update_yaxes(
        title="Cumulative Counts (raw)",
        range=get_symmetric_range(harp_streams[_counts_key]),
        row=1, col=_panel_col, secondary_y=False,
    )
    fig.update_yaxes(
        title=_position_axis_title,
        range=get_symmetric_range(harp_streams[_position_key]),
        row=1, col=_panel_col, secondary_y=True,
    )

# Render inline with the notebook renderer
pio.show(fig, renderer="notebook")


In [None]:
# Most negative Y position divided by -360, i.e. expressed as a (positive)
# number of full turns; used below to rescale the nominal sensor resolution.
# NOTE(review): presumably meaningful only for a calibration recording with a
# known single-turn rotation - confirm.
_lowest_y_position = harp_streams["Position_OpticalTrackingRead0Y(46)"].min()
Y_min = _lowest_y_position / -360
print ("cpi conversion factor = ", Y_min)
print ("Inferred cpi = ", sensor_resolution * Y_min)

# UNIT TESTING with fake data 

In [None]:
#----------------------------------------------------------------------------
# Synthetic test signal: one sine cycle of position, stored as displacements
#----------------------------------------------------------------------------

num_points = 5000  # samples, spaced 1 ms apart
datetime_index = pd.date_range(start="2025-01-01 00:00:00", periods=num_points, freq="1ms")

# One full sine period whose peak equals the raw count total of two ball
# circumferences (circumference in inches times counts per inch), so the
# trace moves +2, then -4, then +2 rotations.
t = np.linspace(0, 2 * np.pi, num_points)
sinusoid = np.sin(t) * (2 * (((2*ball_radius*np.pi)/0.0254)*sensor_resolution))

# Both sensor axes receive the same waveform.
_tracking_columns = ("OpticalTrackingRead0X(46)", "OpticalTrackingRead0Y(46)")
harp_streams_test = pd.DataFrame(
    {_name: sinusoid for _name in _tracking_columns},
    index=datetime_index,
)

# Turn the position trace into per-sample displacement values.
for _name in _tracking_columns:
    harp_streams_test[_name] = harp_streams_test[_name].diff()

#----------------------------------------------------------------------------
# Unit conversions, same steps as applied to the real data
#----------------------------------------------------------------------------

# Raw cumulative counts (position in sensor counts)
harp_streams_test["CumulativeCounts_0X"] = harp_streams_test["OpticalTrackingRead0X(46)"].cumsum()
harp_streams_test["CumulativeCounts_0Y"] = harp_streams_test["OpticalTrackingRead0Y(46)"].cumsum()
harp_streams_test["CumulativeCounts_0X"].plot()

# Gap-free cumulative counts are passed to the project converters.
_x_counts_filled = harp_streams_test["OpticalTrackingRead0X(46)"].cumsum().ffill().bfill().to_numpy()
harp_streams_test["Position_OpticalTrackingRead0X(46)"] = process.running_unit_conversion(_x_counts_filled, sensor_resolution)
_y_counts_filled = harp_streams_test["OpticalTrackingRead0Y(46)"].cumsum().ffill().bfill().to_numpy()
harp_streams_test["Position_OpticalTrackingRead0Y(46)"] = process.turning_unit_conversion(_y_counts_filled, sensor_resolution, ball_radius)

# Report the extremes next to the values expected for two full rotations.
_x_position = harp_streams_test["Position_OpticalTrackingRead0X(46)"]
_y_position = harp_streams_test["Position_OpticalTrackingRead0Y(46)"]
print ("----------")
print("INFO: Flow sensor converted to real-world units")
print ("Maximum X meter (should be ~1.2566): ", _x_position.max())
print ("Minimum X meter (should be ~-1.2566): ", _x_position.min())
print ("Maximum Y degree (should be 720): ", _y_position.max())
print ("Minimum Y degree (should be -720): ", _y_position.min())

In [None]:
#plot harp_stream_test 
# Create subplots with two columns

def get_symmetric_range(series):
    """Return an axis range ``[-m, m]`` that keeps 0 in the centre of the axis.

    Parameters
    ----------
    series : object exposing ``min()`` and ``max()`` (e.g. a pandas Series)
        Values that will be drawn against the axis.

    Returns
    -------
    list of float
        ``[-m, m]`` where ``m`` is the larger of ``abs(series.min())`` and
        ``abs(series.max())``. When ``m`` is not a positive finite number
        (empty or all-NaN data, or a signal that is zero throughout) the
        fallback ``[-1.0, 1.0]`` is returned, so the range never contains
        NaN and never collapses to zero width.
    """
    max_abs = float(max(abs(series.min()), abs(series.max())))
    # NaN fails both comparisons, so this single test also catches it.
    if not (0 < max_abs < float("inf")):
        return [-1.0, 1.0]
    return [-max_abs, max_abs]

# One panel per sensor axis. Each panel shows the raw cumulative counts on
# the primary y-axis and the converted position on the secondary y-axis.
fig = make_subplots(
    rows=1,
    cols=2,
    shared_xaxes=True,
    subplot_titles=("CumulativeCounts_0X vs Position_0X", "CumulativeCounts_0Y vs Position_0Y"),
    specs=[[{"secondary_y": True}, {"secondary_y": True}]],  # secondary y-axis in both panels
)

# (subplot column, counts column / trace name, position column, position trace name, position axis title)
_test_panels = (
    (1, "CumulativeCounts_0X", "Position_OpticalTrackingRead0X(46)", "Position_0X", "Position (m)"),
    (2, "CumulativeCounts_0Y", "Position_OpticalTrackingRead0Y(46)", "Position_0Y", "Position (deg)"),
)

_test_time_axis = harp_streams_test.index
for _panel_col, _counts_key, _position_key, _position_name, _ in _test_panels:
    # Position is added first as a wide, half-transparent line (background).
    _position_trace = go.Scatter(
        x=_test_time_axis,
        y=harp_streams_test[_position_key],
        mode='lines',
        name=_position_name,
        line=dict(color='lightskyblue', width=8),
        opacity=0.5,
    )
    fig.add_trace(_position_trace, row=1, col=_panel_col, secondary_y=True)

    # Raw cumulative counts go on top as small opaque black markers.
    _counts_trace = go.Scatter(
        x=_test_time_axis,
        y=harp_streams_test[_counts_key],
        mode='markers',
        name=_counts_key,
        marker=dict(color='black', size=2, opacity=1),
    )
    fig.add_trace(_counts_trace, row=1, col=_panel_col, secondary_y=False)

# Figure-level styling
fig.update_layout(
    title="UNIT TESTING, +2 then -4 then +2  full ball rotations synthetic data, ball circumference ~62.8 cm",
    template="plotly_white",
    height=400,
    xaxis_title="Time",
    xaxis2_title="Time",
)

# Centre every y-axis on zero
for _panel_col, _counts_key, _position_key, _, _position_axis_title in _test_panels:
    fig.update_yaxes(
        title="Cumulative Counts (raw)",
        range=get_symmetric_range(harp_streams_test[_counts_key]),
        row=1, col=_panel_col, secondary_y=False,
    )
    fig.update_yaxes(
        title=_position_axis_title,
        range=get_symmetric_range(harp_streams_test[_position_key]),
        row=1, col=_panel_col, secondary_y=True,
    )

# Render inline with the notebook renderer
pio.show(fig, renderer="notebook")
