In [1]:
import numpy as np
import cv2
from itertools import cycle
import pickle
import pathlib
import tqdm
import pandas as pd
import bokeh
from matplotlib import rcParams

rcParams['pdf.fonttype'] = 42  # Ensure fonts are embedded and editable
rcParams['ps.fonttype'] = 42  # Ensure compatibility with vector outputs
%matplotlib inline

def horizontal_flip_eye_data(df: pd.DataFrame, frame_width: int) -> pd.DataFrame:
    """
    Return a horizontally mirrored copy of an eye-tracking table.

    Parameters
    ----------
    df : pd.DataFrame
        Table with at least the columns 'center_x' and 'phi' (phi in degrees).
        The input frame is not modified.
    frame_width : int
        Width of the video/frame in pixels; x coordinates are reflected
        against this value.

    Returns
    -------
    pd.DataFrame
        Copy of `df` in which
          - center_x is replaced by frame_width - center_x
          - phi is replaced by (phi + 90) % 360
          - every other column (including center_y) is left as is
    """
    mirrored_x = frame_width - df['center_x']
    # NOTE(review): a mirror across a vertical line would usually map an
    # orientation angle to (180 - phi); the +90 shift is kept exactly as the
    # original implements and documents it - confirm the phi convention.
    shifted_phi = (df['phi'] + 90) % 360
    return df.copy().assign(center_x=mirrored_x, phi=shifted_phi)


def bokeh_plotter(data_list, x_axis_list=None, label_list=None,
                  plot_name='default',
                  x_axis_label='X', y_axis_label='Y',
                  peaks=None, peaks_list=False, export_path=False):
    """Show an interactive Bokeh line plot of one or more data vectors.

    Args:
        data_list (list): Data vectors to plot, one line per vector.
        x_axis_list (list, optional): One x-axis vector per entry of data_list.
            When None, every vector is plotted against its sample index.
        label_list (list of str, optional): One legend label per entry of
            data_list. When None, lines are labelled "Line 1", "Line 2", ...
        plot_name (str, optional): Used in the figure title and, when
            exporting, as the html file name. Defaults to 'default'.
        x_axis_label (str, optional): The label for the x-axis. Defaults to 'X'.
        y_axis_label (str, optional): The label for the y-axis. Defaults to 'Y'.
        peaks (optional): Sample indices to mark with circles. With
            peaks_list=False this is a single index collection applied to the
            last vector of data_list (red markers); with peaks_list=True it
            holds one index collection per vector (markers in the line color).
        peaks_list (bool, optional): Selects how `peaks` is interpreted.
        export_path (False, None, str or pathlib.Path): When set, the figure is
            also written to `<export_path>/<plot_name>.html`.

    Raises:
        Exception: x_axis_list is given but its length differs from data_list.
        ValueError: label_list is given but its length differs from data_list.
    """
    # `import bokeh` alone does not load these submodules, so attribute access
    # such as `bokeh.plotting.figure` is only safe after importing them.
    import bokeh.io
    import bokeh.palettes
    import bokeh.plotting

    color_cycle = cycle(bokeh.palettes.Category10_10)
    # NOTE(review): plot_width/plot_height were deprecated in Bokeh 2.4 and
    # removed in 3.0 (replaced by width/height) - confirm against the
    # installed Bokeh version before upgrading.
    fig = bokeh.plotting.figure(title=f'bokeh explorer: {plot_name}',
                                x_axis_label=x_axis_label,
                                y_axis_label=y_axis_label,
                                plot_width=1500,
                                plot_height=700)

    last_x_axis, last_vector = None, None
    for i, data_vector in enumerate(data_list):

        color = next(color_cycle)

        if x_axis_list is None:
            x_axis = range(len(data_vector))
        elif len(x_axis_list) == len(data_list):
            print('x_axis manually set')
            x_axis = x_axis_list[i]
        else:
            raise Exception(
                'problem with x_axis_list input - should be either None, or a list with the same length as data_list')

        if label_list is None:
            legend_label = f"Line {i + 1}"
        elif len(label_list) == len(data_list):
            legend_label = f"{label_list[i]}"
        else:
            # Previously a mismatch silently skipped drawing the line.
            raise ValueError(
                'problem with label_list input - should be either None, or a list with the same length as data_list')

        # The chosen x axis is used for labelled lines too (it used to be
        # replaced by the sample index whenever label_list was supplied).
        fig.line(x_axis, data_vector, line_color=color, legend_label=legend_label)

        if peaks is not None and peaks_list is True:
            # Peak indices are mapped through the x axis so markers sit on the line.
            fig.circle(np.asarray(x_axis)[peaks[i]], data_vector[peaks[i]], size=10, color=color)

        last_x_axis, last_vector = x_axis, data_vector

    # A single peaks collection refers to the last plotted vector; skipped
    # when data_list is empty (there is nothing to index into).
    if peaks is not None and peaks_list is False and last_vector is not None:
        fig.circle(np.asarray(last_x_axis)[peaks], last_vector[peaks], size=10, color='red')

    if export_path is not False and export_path is not None:
        print(f'exporting to {export_path}')
        # pathlib.Path() lets callers pass a plain str, as the docstring promises.
        bokeh.io.output.output_file(filename=str(pathlib.Path(export_path) / f'{plot_name}.html'),
                                    title=f'{plot_name}')
    bokeh.plotting.show(fig)


def load_eye_data_2d_w_rotation_matrix(block):
    """
    Attach the saved eye-data tables and rotation parameters to `block`.

    Reads 'left_eye_data.csv' and 'right_eye_data.csv' from block.analysis_path
    into block.left_eye_data / block.right_eye_data. If either file is missing a
    message is printed and the function returns without looking for the
    rotation file. Otherwise 'rotate_eye_data_params.pkl' is unpickled and its
    four entries are copied onto the block; if that file is missing only a
    message is printed.

    :param block: object exposing an `analysis_path` that supports the `/` operator
    :return: None
    """
    eye_tables = (
        ('left_eye_data', 'left_eye_data.csv'),
        ('right_eye_data', 'right_eye_data.csv'),
    )
    try:
        for attr_name, csv_name in eye_tables:
            setattr(block, attr_name,
                    pd.read_csv(block.analysis_path / csv_name, index_col=0, engine='python'))
    except FileNotFoundError:
        print('eye_data files not found, run the pipeline!')
        return

    rotation_keys = (
        'left_rotation_matrix',
        'right_rotation_matrix',
        'left_rotation_angle',
        'right_rotation_angle',
    )
    try:
        with open(block.analysis_path / 'rotate_eye_data_params.pkl', 'rb') as file:
            # NOTE(review): pickle.load executes arbitrary code from the file;
            # only use on analysis folders you trust.
            rotation_dict = pickle.load(file)
            for key in rotation_keys:
                setattr(block, key, rotation_dict[key])
    except FileNotFoundError:
        print('No rotation matrix file, create it')


def create_saccade_events_df(eye_data_df, speed_threshold, bokeh_verify_threshold=False, magnitude_calib=1,
                             speed_profile=True):
    """
    Detect saccade events in eye tracking data and compute per-event metrics.

    Parameters:
    - eye_data_df (pd.DataFrame): Eye tracking samples, one row per frame in temporal order.
      Must contain the columns 'center_x', 'center_y', 'pupil_diameter', 'OE_timestamp'
      and 'ms_axis'. The input is copied and never modified.
    - speed_threshold (float): A frame belongs to a saccade when its frame-to-frame
      displacement ('speed_r') is strictly greater than this value.
    - bokeh_verify_threshold (bool): When True, plot 'speed_r' with the detected onsets
      through bokeh_plotter for visual verification.
    - magnitude_calib (float): Factor applied to raw displacements to produce
      'magnitude', 'calib_dx' and 'calib_dy'.
    - speed_profile (bool): When True, add a 'speed_profile' column holding the
      per-frame 'speed_r' values of every saccade.

    Returns:
    - df (pd.DataFrame): Copy of the input with the added columns 'speed_x', 'speed_y', 'speed_r'.
    - saccade_events_df (pd.DataFrame): One row per detected saccade with the columns
        - 'saccade_start_ind' / 'saccade_end_ind': positional row indices of onset / offset.
          The onset is shifted one frame before the threshold crossing.
        - 'saccade_start_timestamp' / 'saccade_end_timestamp': 'OE_timestamp' at onset / offset.
        - 'saccade_on_ms' / 'saccade_off_ms': 'ms_axis' at onset / offset.
        - 'length': saccade_end_ind - saccade_start_ind.
        - 'magnitude_raw': sum of 'speed_r' (NaNs ignored) from onset to offset inclusive.
        - 'magnitude': magnitude_raw * magnitude_calib.
        - 'angle': direction from the onset position to the offset position, degrees in [0, 360).
        - 'initial_x', 'initial_y', 'end_x', 'end_y': positions at onset / offset.
        - 'calib_dx', 'calib_dy': (end - initial) * magnitude_calib.
        - 'speed_profile' (optional), 'diameter_profile': per-frame arrays from onset to offset.

    Notes:
    - A saccade that is still in progress on the last row has no offset and is dropped
      (this case used to raise a ValueError because onsets and offsets differed in length).
    - Rows belonging to each saccade are selected by position rather than by matching
      'OE_timestamp' values, so NaN or duplicated timestamps no longer break the metrics.
    """
    # Work on a copy so the caller's DataFrame is left untouched, as documented.
    df = eye_data_df.copy()
    df['speed_x'] = df['center_x'].diff()  # Difference between consecutive 'center_x' values
    df['speed_y'] = df['center_y'].diff()  # Difference between consecutive 'center_y' values

    # Magnitude of the velocity vector (R vector speed)
    df['speed_r'] = (df['speed_x'] ** 2 + df['speed_y'] ** 2) ** 0.5

    # Binary saccade mask; the first row is always False because diff() yields NaN there.
    is_saccade = (df['speed_r'] > speed_threshold).to_numpy()

    # +1 marks a rising edge (saccade begins), -1 a falling edge (first frame after it).
    edges = np.diff(is_saccade.astype(int), prepend=0)
    # Manual -1 shift: include the (sometimes slower) frame just before the threshold crossing.
    saccade_on_inds = np.maximum(np.flatnonzero(edges == 1) - 1, 0)
    saccade_off_inds = np.flatnonzero(edges == -1)

    # Keep only complete onset/offset pairs (drops a saccade cut off by the end of the data).
    n_events = min(len(saccade_on_inds), len(saccade_off_inds))
    saccade_on_inds = saccade_on_inds[:n_events]
    saccade_off_inds = saccade_off_inds[:n_events]

    timestamps = df['OE_timestamp'].to_numpy()
    ms_axis = df['ms_axis'].to_numpy()
    saccade_dict = {'saccade_start_ind': saccade_on_inds,
                    'saccade_start_timestamp': timestamps[saccade_on_inds],
                    'saccade_end_ind': saccade_off_inds,
                    'saccade_end_timestamp': timestamps[saccade_off_inds],
                    'saccade_on_ms': ms_axis[saccade_on_inds],
                    'saccade_off_ms': ms_axis[saccade_off_inds]}

    saccade_events_df = pd.DataFrame.from_dict(saccade_dict)
    saccade_events_df['length'] = saccade_events_df['saccade_end_ind'] - saccade_events_df['saccade_start_ind']

    speed_r = df['speed_r'].to_numpy()
    pupil_diameter = df['pupil_diameter'].to_numpy()
    distances = []
    speed_list = []
    diameter_list = []
    for start_ind, end_ind in tqdm.tqdm(zip(saccade_on_inds, saccade_off_inds), total=n_events):
        window = slice(start_ind, end_ind + 1)  # onset to offset, inclusive
        distances.append(np.nansum(speed_r[window]))
        if speed_profile:
            speed_list.append(speed_r[window].copy())
        diameter_list.append(pupil_diameter[window].copy())

    center_x = df['center_x'].to_numpy()
    center_y = df['center_y'].to_numpy()
    initial_x, initial_y = center_x[saccade_on_inds], center_y[saccade_on_inds]
    end_x, end_y = center_x[saccade_off_inds], center_y[saccade_off_inds]

    # Direction from the onset position to the offset position.
    angles = np.arctan2(np.asarray(end_y - initial_y, dtype=float),
                        np.asarray(end_x - initial_x, dtype=float))

    saccade_events_df['magnitude_raw'] = np.array(distances, dtype=float)
    saccade_events_df['magnitude'] = np.array(distances, dtype=float) * magnitude_calib
    # Convert radians to degrees in [0, 360); NaN angles stay NaN.
    saccade_events_df['angle'] = np.where(np.isnan(angles), angles, np.rad2deg(angles) % 360)
    saccade_events_df['initial_x'] = initial_x
    saccade_events_df['initial_y'] = initial_y
    saccade_events_df['end_x'] = end_x
    saccade_events_df['end_y'] = end_y
    saccade_events_df['calib_dx'] = (end_x - initial_x) * magnitude_calib
    saccade_events_df['calib_dy'] = (end_y - initial_y) * magnitude_calib
    if speed_profile:
        saccade_events_df['speed_profile'] = speed_list
    saccade_events_df['diameter_profile'] = diameter_list
    if bokeh_verify_threshold:
        bokeh_plotter(data_list=[df.speed_r], label_list=['Pupil Velocity'], peaks=saccade_on_inds)

    return df, saccade_events_df


# create a multi-animal block_collection:

def create_block_collections(animals, block_lists, experiment_path, bad_blocks=None):
    """
    Build one flat block list and one lookup dictionary covering several animals.

    Parameters:
    - animals: list of str, names of the animals.
    - block_lists: list of lists of int; block_lists[i] holds the block numbers of animals[i].
    - experiment_path: pathlib.Path, path to the experiment directory.
    - bad_blocks: list of int, blocks to exclude. None (the default) means no exclusions.

    Returns:
    - block_collection: list with the objects returned by uf.block_generator for every animal,
      in the order the animals were given.
    - block_dict: dictionary mapping "<animal>_block_<block_num>" to the corresponding block object.
    """
    import UtilityFunctions_newOE as uf

    excluded = [] if bad_blocks is None else bad_blocks
    all_blocks, blocks_by_key = [], {}

    for animal_name, block_numbers in zip(animals, block_lists):
        animal_blocks = uf.block_generator(
            block_numbers=block_numbers,
            experiment_path=experiment_path,
            animal=animal_name,
            bad_blocks=excluded
        )
        all_blocks.extend(animal_blocks)
        # Keys carry the animal name so equal block numbers of different animals do not collide.
        blocks_by_key.update(
            {f"{animal_name}_block_{blk.block_num}": blk for blk in animal_blocks}
        )

    return all_blocks, blocks_by_key


def load_self_kerr_refs(block, filename: str = "self_kerr_refs.csv") -> bool:
    """
    Load Kerr reference coordinates from the analysis folder CSV and set them on `block`.

    Reads the first row of a CSV with columns:
        kerr_ref_r_x, kerr_ref_r_y, kerr_ref_l_x, kerr_ref_l_y
    Each value that is present and convertible is rounded to the nearest integer and
    stored on `block` under the column's name. Missing columns, NaN values and values
    that cannot be converted to an integer (including +/-inf) leave the block's
    existing attribute untouched.

    Parameters
    ----------
    block
        Object exposing `analysis_path`; receives the kerr_ref_* attributes.
    filename : str
        Name of the CSV file inside `block.analysis_path`.

    Returns
    -------
    bool
        True if the file was read, False if the file was missing or empty
        (no rows, or no content at all).
    """
    path = pathlib.Path(block.analysis_path) / filename
    if not path.exists():
        print(f"No Kerr refs file found at: {path}")
        return False

    try:
        df = pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # A zero-byte file has no header to parse; treat it like an empty table.
        print(f"Kerr refs file is empty: {path}")
        return False
    if df.empty:
        print(f"Kerr refs file is empty: {path}")
        return False

    row = df.iloc[0]

    for col in ("kerr_ref_r_x", "kerr_ref_r_y", "kerr_ref_l_x", "kerr_ref_l_y"):
        if col not in row or pd.isna(row[col]):
            continue
        try:
            setattr(block, col, int(round(float(row[col]))))
        except (ValueError, TypeError, OverflowError):
            # Keep the existing value if conversion fails; OverflowError covers
            # +/-inf, which passes the NaN check but cannot be rounded to an int.
            continue

    print(f"Kerr refs loaded from: {path}")
    return True

In [2]:
# Run configuration: which animals/blocks to load and where the experiments live.
# `animals[i]` is paired with `block_lists[i]` by create_block_collections.
# Alternative multi-animal selection, kept for reference (currently disabled):
#animals = ['PV_62', 'PV_126', 'PV_57']
#block_lists = [[24, 26, 38], [7, 8, 9, 10, 11, 12], [7, 8, 9, 12, 13]]
#experiment_path = pathlib.Path(r"Z:\Nimrod\experiments")
animals = ['PV_106']
block_lists = [[14]]
# NOTE(review): hard-coded absolute network path - the notebook only runs where Z: is mapped.
experiment_path = pathlib.Path(r"Z:\Nimrod\experiments")

bad_blocks = [0]  # Example of bad blocks

# block_collection: flat list of block objects; block_dict: "<animal>_block_<num>" -> block object.
block_collection, block_dict = create_block_collections(
    animals=animals,
    block_lists=block_lists,
    experiment_path=experiment_path,
    bad_blocks=bad_blocks
)

instantiated block number 014 at Path: Z:\Nimrod\experiments\PV_106\2025_09_04\block_014, new OE version
Found the sample rate for block 014 in the xml file, it is 20000 Hz
created the .oe_rec attribute as an open ephys recording obj with get_data functionality
retrieving zertoh sample number for block 014
got it!


In [3]:
# Per-block preparation: run the block's own processing methods in order, then
# attach the saved eye-data tables and rotation parameters to the block.
for block in block_collection:
    block.parse_open_ephys_events()
    block.get_eye_brightness_vectors()
    #block.create_eye_brightness_df(threshold_value=20)
    block.handle_eye_videos()
    # if the code fails here, go to manual synchronization
    # NOTE(review): the meaning of the argument 10 is defined by calibrate_pixel_size,
    # which is not visible in this notebook - confirm before changing it.
    block.calibrate_pixel_size(10)
    # Sets block.left_eye_data / block.right_eye_data and, when the pickle exists,
    # the left/right rotation matrices and angles.
    load_eye_data_2d_w_rotation_matrix(block)



running parse_open_ephys_events...
block 014 has a parsed events file, reading...
Getting eye brightness values for block 014...
Found an existing file!
Eye brightness vectors generation complete.
handling eye video files
converting videos...
converting files: ['Z:\\Nimrod\\experiments\\PV_106\\2025_09_04\\block_014\\eye_videos\\LE\\imu_trial2\\imu_trial2.h264', 'Z:\\Nimrod\\experiments\\PV_106\\2025_09_04\\block_014\\eye_videos\\RE\\imu_trial2\\imu_trial2.h264'] 
 avoiding conversion on files: ['Z:\\Nimrod\\experiments\\PV_106\\2025_09_04\\block_014\\eye_videos\\LE\\imu_trial2\\imu_trial2_LE.mp4', 'Z:\\Nimrod\\experiments\\PV_106\\2025_09_04\\block_014\\eye_videos\\RE\\imu_trial2\\imu_trial2.mp4']
The file Z:\Nimrod\experiments\PV_106\2025_09_04\block_014\eye_videos\RE\imu_trial2\imu_trial2.mp4 already exists, no conversion necessary
Validating videos...
The video named imu_trial2_LE.mp4 has reported 39118 frames and has 39118 frames, it has dropped 0 frames
The video named imu_trial2

In [None]:
# CAREFUL! THIS CELL USES THE ROTATION MATRICES TO ROTATE THE DATA AND REFERENCE POINTS TOGETHER BEFORE DEGREE CONVERSION:

def apply_rotation(df: pd.DataFrame, rot_mat: np.ndarray, rot_angle: float) -> pd.DataFrame:
    """
    Return a copy of `df` with its pupil centers transformed and its phi angles shifted.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain the columns 'center_x', 'center_y' and 'phi'. Not modified.
    rot_mat : np.ndarray
        Transformation matrix handed to cv2.transform (cast to float32 first).
    rot_angle : float
        Value added to 'phi'; the sum is wrapped with a modulo of 360.

    Returns
    -------
    pd.DataFrame
        Copy of `df` whose 'center_x' / 'center_y' hold the transformed coordinates
        (float32, as produced by the float32 transform) and whose 'phi' is
        (phi + rot_angle) % 360.
    """
    # cv2.transform is fed an (N, 1, 2) float32 array: one 2-channel point per row.
    centers = df[['center_x', 'center_y']].to_numpy().reshape(-1, 1, 2).astype(np.float32)
    transformed = cv2.transform(centers, rot_mat.astype(np.float32))
    wrapped_phi = (df['phi'] + rot_angle) % 360
    return df.copy().assign(
        center_x=transformed[:, 0, 0],
        center_y=transformed[:, 0, 1],
        phi=wrapped_phi,
    )

# Replace each block's eye-data tables with rotated copies.
# NOTE: this overwrites block.left_eye_data / block.right_eye_data, so running the
# cell twice applies the rotation twice - reload the eye data before re-running.
# NOTE(review): the rotation matrix/angle attributes are only set when
# 'rotate_eye_data_params.pkl' was found by load_eye_data_2d_w_rotation_matrix;
# without it the attribute access below raises AttributeError.
for block in block_collection:
    # Rotate left eye data in memory
    if hasattr(block, 'left_eye_data') and block.left_eye_data is not None:
        R_L = np.array(block.left_rotation_matrix, dtype=np.float32)
        ang_L = float(block.left_rotation_angle)
        block.left_eye_data = apply_rotation(block.left_eye_data, R_L, ang_L)

    # Rotate right eye data in memory
    if hasattr(block, 'right_eye_data') and block.right_eye_data is not None:
        R_R = np.array(block.right_rotation_matrix, dtype=np.float32)
        ang_R = float(block.right_rotation_angle)
        block.right_eye_data = apply_rotation(block.right_eye_data, R_R, ang_R)

    # Printed for every block, including blocks where neither table was present.
    print(f"Applied rotation to block {block.animal_call}-{block.block_num}")

In [4]:
# Set the kerr_ref_* attributes on every block from its 'self_kerr_refs.csv';
# blocks without that file are reported by load_self_kerr_refs and left unchanged.
for block in block_collection:
    load_self_kerr_refs(block)

Kerr refs loaded from: Z:\Nimrod\experiments\PV_106\2025_09_04\block_014\analysis\self_kerr_refs.csv


In [None]:
# This one rotates the reference points
# Each block's Kerr reference point is passed through the same rotation matrix as the
# eye data of that side and written back onto the block as floats.
# NOTE: the references are overwritten in place, so running the cell twice rotates
# them twice - reload them with load_self_kerr_refs before re-running.
for block in block_collection:
    # Rotate left reference point
    if hasattr(block, 'kerr_ref_l_x') and block.kerr_ref_l_x is not None:
        x0, y0 = block.kerr_ref_l_x, block.kerr_ref_l_y
        # cv2.transform is given a (1, 1, 2) float32 array: a single 2-channel point.
        pt = np.array([[[x0, y0]]], dtype=np.float32)
        R_L = np.array(block.left_rotation_matrix, dtype=np.float32)
        pt_rot = cv2.transform(pt, R_L)
        # Update in-place
        block.kerr_ref_l_x = float(pt_rot[0, 0, 0])
        block.kerr_ref_l_y = float(pt_rot[0, 0, 1])

    # Rotate right reference point
    if hasattr(block, 'kerr_ref_r_x') and block.kerr_ref_r_x is not None:
        x0, y0 = block.kerr_ref_r_x, block.kerr_ref_r_y
        pt = np.array([[[x0, y0]]], dtype=np.float32)
        R_R = np.array(block.right_rotation_matrix, dtype=np.float32)
        pt_rot = cv2.transform(pt, R_R)
        block.kerr_ref_r_x = float(pt_rot[0, 0, 0])
        block.kerr_ref_r_y = float(pt_rot[0, 0, 1])

    # NOTE(review): these prints read all four reference attributes unconditionally and
    # fail if a block has no reference for one of the eyes.
    print(f"Block {block.animal_call}-{block.block_num}:")
    print(f"  Rotated left ref -> ({block.kerr_ref_l_x:.2f}, {block.kerr_ref_l_y:.2f})")
    print(f"  Rotated right ref-> ({block.kerr_ref_r_x:.2f}, {block.kerr_ref_r_y:.2f})")


In [5]:
# Tag handed to calculate_kerr_angles; the cell that loads the angle CSVs afterwards
# searches for files named with the same tag, so the two must stay in sync.
name_tag = 'raw_verified'
for block in block_collection:
    # Here is where the conversion happens:
    block.calculate_kerr_angles(name_tag=name_tag)


working on Block 014
Left eye
Left eye
finished successfully and saved to Z:\Nimrod\experiments\PV_106\2025_09_04\block_014\analysis with tag= raw_verified


In [6]:
# load and combine the eye data with the angle calculation
# name_tag selects which angle files are read: the loop at the end of this cell looks
# for 'left_kerr_angle_{name_tag}.csv' and 'right_kerr_angle_{name_tag}.csv'.
name_tag = 'raw_verified'
def append_angle_data(eye_df, new_df):
    """
    Appends the angle columns (phi and theta) from new_df to eye_df.
    The function renames 'phi' to 'k_phi' and 'theta' to 'k_theta', then merges
    on the shared 'OE_timestamp' column.

    The operation is safe to repeat: if eye_df already carries 'k_phi' / 'k_theta'
    columns (e.g. the notebook cell is run a second time), they are replaced by the
    values from new_df instead of producing suffixed duplicates such as 'k_phi_x'.

    Parameters:
    - eye_df: pandas DataFrame containing the eye tracking data, with an
              'OE_timestamp' column. Not modified.
    - new_df: pandas DataFrame containing the new kinematics data with columns
              'phi' and 'theta' along with 'OE_timestamp' (and possibly others,
              which are ignored).

    Returns:
    - merged_df: pandas DataFrame with every row of eye_df plus 'k_phi' and
                 'k_theta'; rows whose timestamp is absent from new_df get NaN.
                 The result has a fresh 0..n-1 index (pd.merge does not keep
                 eye_df's index).
    """
    # Select the necessary columns and rename them
    angle_data = new_df[['OE_timestamp', 'phi', 'theta']].rename(
        columns={'phi': 'k_phi', 'theta': 'k_theta'}
    )

    # Drop angle columns left over from a previous merge so the result always has
    # exactly one 'k_phi' and one 'k_theta' column.
    eye_without_angles = eye_df.drop(columns=['k_phi', 'k_theta'], errors='ignore')

    # Merge on OE_timestamp using a left join to preserve all rows in eye_df
    merged_df = pd.merge(eye_without_angles, angle_data, on='OE_timestamp', how='left')

    return merged_df



# Merge the saved Kerr-angle tables into each block's eye data.
# NOTE: each block's eye data is replaced by the merged frame, so the cell
# changes kernel state every time it runs.
for block in block_collection:
    print(block)
    try:
        # The first file in the analysis folder whose path contains the expected name is used.
        left_angles = pd.read_csv([i for i in block.analysis_path.iterdir() if (f'left_kerr_angle_{name_tag}.csv' in str(i))][0])
        right_angles = pd.read_csv([i for i in block.analysis_path.iterdir() if (f'right_kerr_angle_{name_tag}.csv' in str(i))][0])
    except IndexError:
        print(f'{block} has a problem, files missing')
        # Skip this block: without the continue the merge below would run with the
        # previous block's angle tables (or fail with NameError on the first block).
        continue

    block.left_eye_data = append_angle_data(block.left_eye_data,left_angles)
    block.right_eye_data = append_angle_data(block.right_eye_data,right_angles)



PV_106, block 014, on PV106_IMU_trial3_2025-09-04_13-11-07


In [7]:
# Interactive check: display the current value of name_tag (no effect on the analysis).
name_tag

'raw_verified'

In [7]:
def export_eye_data_w_angles(block, name_tag='0'):
    """
    Write both eye-data tables of `block` to CSV files in its analysis folder.

    The right eye is written first, then the left eye, to
    'right_eye_data_{name_tag}.csv' and 'left_eye_data_{name_tag}.csv' under
    block.analysis_path. Existing files with those names are overwritten.

    :param block: object exposing `analysis_path`, `right_eye_data` and `left_eye_data`
    :param name_tag: suffix inserted into both file names
    :return: None
    """
    targets = (
        ('right_eye_data', f'right_eye_data_{name_tag}.csv'),
        ('left_eye_data', f'left_eye_data_{name_tag}.csv'),
    )
    for attr_name, csv_name in targets:
        getattr(block, attr_name).to_csv(block.analysis_path / csv_name)

# Write both eye tables of every block to '<side>_eye_data_degrees_raw_verified.csv'
# in the block's analysis folder (existing files are overwritten).
for block in block_collection:
    export_eye_data_w_angles(block, name_tag='degrees_raw_verified')

In [8]:
# Interactive check: display the blocks currently held in block_collection.
block_collection

[BlockSync object for animal PV_106 with 
 block_num 014 at date PV106_IMU_trial3_2025-09-04_13-11-07]

In [16]:
# Export both eye tables of every block under the 'degrees_raw_verified' tag,
# printing the right-eye target path of each block first.
# NOTE(review): this repeats the export done a few cells above, and the saved output
# lists different blocks than the ones loaded at the top of the notebook - it looks
# like a leftover from an out-of-order run; confirm whether the cell is still needed.
name_tag = 'degrees_raw_verified'
for block in block_collection:
    print(block.analysis_path / f'right_eye_data_{name_tag}.csv')
    export_eye_data_w_angles(block, name_tag=name_tag)

X:\Nimrod\experiments\PV_143\2025_08_25\block_001\analysis\right_eye_data_degrees_raw_verified.csv
X:\Nimrod\experiments\PV_143\2025_08_25\block_002\analysis\right_eye_data_degrees_raw_verified.csv
X:\Nimrod\experiments\PV_143\2025_08_25\block_003\analysis\right_eye_data_degrees_raw_verified.csv
X:\Nimrod\experiments\PV_143\2025_08_25\block_004\analysis\right_eye_data_degrees_raw_verified.csv
