In [1]:
import ipywidgets as widgets
from IPython.display import display, clear_output
from tkinter import Tk, filedialog
import os
from glob import glob
from pathlib import Path

# Shared notebook state, filled in by the directory-selection callback
selected_directory = None
csv_files, video_files, h5_files = [], [], []

# UI elements: output area for messages, the trigger button, and a text box
# that shows the chosen folder
output = widgets.Output()
select_button = widgets.Button(description="Select Directory")
directory_path = widgets.Text(description="Directory Path")

# Function to handle directory selection
def on_select_button_click(b):
    """Ask the user for a directory and index the data files beneath it.

    Opens a Tk folder dialog, stores the chosen folder in the module-level
    ``selected_directory`` and fills ``csv_files``, ``video_files`` and
    ``h5_files`` with the results of a recursive glob. All messages are
    printed inside the ``output`` widget.

    Parameters
    ----------
    b
        Argument passed by ``select_button.on_click``; not used here.
    """
    global selected_directory, csv_files, video_files, h5_files
    with output:
        clear_output(wait=True)
        root = Tk()
        try:
            root.attributes("-topmost", True)
            root.withdraw()  # Hide the main Tkinter window
            new_directory = filedialog.askdirectory(title="Please select a directory containing the .csv files")
        finally:
            # Always tear the Tk instance down, even if the dialog raised
            root.destroy()

        if not new_directory:  # Dialog was cancelled
            print("No directory selected.")
            return

        selected_directory = new_directory
        directory_path.value = selected_directory
        print(f"You chose: {selected_directory}")

        video_files = glob(os.path.join(selected_directory, '**', '*.avi'), recursive=True)
        csv_files = glob(os.path.join(selected_directory, '**', '*.csv'), recursive=True)
        h5_files = glob(os.path.join(selected_directory, '**', '*.h5'), recursive=True)

        found_by_extension = (
            ("csv", csv_files),
            ("avi", video_files),
            ("h5", h5_files),
        )
        for extension, files in found_by_extension:
            if files:
                print(f"\nFound {len(files)} .{extension} files in the directory and subdirectories")

        # Only report "nothing found" when every file list is empty; the
        # message used to be attached to the .h5 check alone.
        if not any(files for _, files in found_by_extension):
            print("No files found in the selected directory.")



# Register the click handler, then render the text box, button and output area
select_button.on_click(on_select_button_click)
directory_widgets = (directory_path, select_button, output)
display(*directory_widgets)

Text(value='', description='Directory Path')

Button(description='Select Directory', style=ButtonStyle())

Output()

In [None]:
#------------------------Read in DLC machine labels/add new labels-------------------------------

# Enable Qt event loop for Jupyter
%gui qt

from typing import List
from dask_image.imread import imread
from magicgui.widgets import ComboBox, Container
import napari
import numpy as np
import imageio
import pandas as pd
from pathlib import Path
import csv
from PyQt5.QtCore import QTimer

# Border colours cycled through for the newly added label points
COLOR_CYCLE = [
    '#1f77b4', '#ff7f0e',
    '#2ca02c', '#d62728',
    '#9467bd', '#8c564b',
    '#e377c2', '#7f7f7f',
    '#bcbd22', '#17becf',
]

# Labels offered in the Napari GUI, in this order: every limb followed by its
# suffixed variants. Edit the two tuples below (or replace `labels` with an
# explicit list) to change what can be annotated.
_LIMBS = ("FR", "HL", "FL", "HR")
_LABEL_SUFFIXES = ("", "_ti", "_ti1", "_tm", "_to1", "_to")
labels = [f"{limb}{suffix}" for limb in _LIMBS for suffix in _LABEL_SUFFIXES]

# Step annotations collected by the key bindings as [frame, 'left' | 'right'] entries
step_annotations = list()


# Function to load video frames in as a NumPy array
def load_video_as_frames(video_path):
    """Read every frame of a video into a single in-memory NumPy array.

    Parameters
    ----------
    video_path : str or path-like
        Location of the video, passed straight to ``imageio.get_reader``.

    Returns
    -------
    numpy.ndarray
        All frames stacked along a new first axis (frame index first).

    Raises
    ------
    ValueError
        If the reader yields no frames.
    """
    # The context manager closes the reader once the frames are read (or if
    # decoding fails), so the video file handle is not left open.
    with imageio.get_reader(video_path) as reader:
        frames = [frame for frame in reader]  # Read all frames

    if not frames:
        raise ValueError(f"No frames could be read from video: {video_path}")

    return np.stack(frames, axis=0)  # (T, ...) with T = number of frames

# Function to extract points and labels from an existing DLC csv that was read
# with a three-level header (scorer / bodypart / coordinate).
# ToDo: the scorer is still detected via the hard-coded 'DLC_resnet50' substring
def extract_points_and_labels(df):
    """Flatten a DLC prediction table into napari-style points.

    Parameters
    ----------
    df : pandas.DataFrame
        Table whose columns are a 3-level MultiIndex
        (scorer, bodypart, 'x' | 'y' | 'likelihood'), one row per frame.

    Returns
    -------
    points : numpy.ndarray
        One ``[frame, y, x]`` row per (frame, bodypart), frame-major; an
        empty ``(0, 3)`` array when there is nothing to extract.
    labels_csv : list
        Bodypart name for each row of ``points``.
    likelihoods : list
        Likelihood value for each row of ``points``.

    Raises
    ------
    ValueError
        If no scorer column containing 'DLC_resnet50' exists.
    """
    scorers = [col[0] for col in df.columns if 'DLC_resnet50' in str(col[0])]
    if not scorers:
        raise ValueError("No column with a 'DLC_resnet50' scorer found in the DLC table.")
    dlc_prefix = scorers[0]

    # Resolve each bodypart's three columns once instead of once per frame.
    # Bodyparts missing any of x / y / likelihood (for example the index
    # column's 'bodyparts' entry) are skipped.
    tracked = []
    for bodypart in df.columns.levels[1]:
        keys = [(dlc_prefix, bodypart, coord) for coord in ('x', 'y', 'likelihood')]
        if all(key in df.columns for key in keys):
            xs, ys, likes = (df[key].to_numpy() for key in keys)
            tracked.append((bodypart, xs, ys, likes))

    points = []
    labels_csv = []
    likelihoods = []
    # NOTE: rows are addressed by position; this equals the frame number for
    # the default integer index produced by pd.read_csv.
    for frame_idx in range(len(df)):
        for bodypart, xs, ys, likes in tracked:
            points.append([frame_idx, ys[frame_idx], xs[frame_idx]])  # Napari uses (frame, y, x) order
            labels_csv.append(bodypart)
            likelihoods.append(likes[frame_idx])

    # reshape keeps the (N, 3) layout even when N == 0
    return np.array(points).reshape(-1, 3), labels_csv, likelihoods

# Build the label-selection widget
def create_label_menu(points_layer, labels):
    """Return a dock-ready container holding a dropdown of ``labels``.

    The dropdown and the layer's current ``'label'`` property are kept in
    sync in both directions: changing the layer's current properties updates
    the dropdown, and picking a dropdown entry updates the layer.
    """
    dropdown = ComboBox(label='feature_label', choices=labels)
    menu_container = Container(widgets=[dropdown])

    def sync_menu_from_layer(event):
        """Mirror the layer's current label in the dropdown."""
        layer_label = str(points_layer.current_properties['label'][0])
        if layer_label == dropdown.value:
            return
        dropdown.value = layer_label

    def sync_layer_from_menu(selected_label):
        """Push the dropdown choice into the layer's current properties."""
        props = points_layer.current_properties
        props['label'] = np.asarray([selected_label])
        points_layer.current_properties = props
        points_layer.refresh_colors()

    points_layer.events.current_properties.connect(sync_menu_from_layer)
    dropdown.changed.connect(sync_layer_from_menu)
    return menu_container

def point_annotator(
    video_path: str,
    csv_path: str,
    labels: List[str],
):
    """Create a GUI for annotating points in a series of images.

    The video is shown in napari together with the points read from the DLC
    CSV ("Body Points" layer) and an empty points layer for new labels.

    Key bindings
    ------------
    Down / Up
        Select the next / previous label (with wraparound).
    Ctrl-Right / Ctrl-Left
        Record a right / left step at the current frame in the module-level
        ``step_annotations`` list.
    Ctrl-d
        Write ``<csv name>_annotations.csv`` next to the input CSV.

    Parameters
    ----------
    video_path : str
        Path to the video file.
    csv_path : str
        Path to the CSV file containing points data (``str`` or ``Path``).
    labels : List[str]
        List of the labels for each keypoint to be annotated (e.g., the body parts to be labeled).
    """
    # The save handler needs .stem / .parent, so accept plain strings as well
    csv_path = Path(csv_path)

    # Load video frames
    stack = load_video_as_frames(video_path)

    # Load and process CSV data
    df = pd.read_csv(csv_path, header=[0, 1, 2])
    points_data, labels_data, likelihood_data = extract_points_and_labels(df)
    # Independent snapshot of the coordinates as loaded, used on save to
    # detect moved points (copying avoids parsing the table a second time).
    unmod_points_data = points_data.copy()

    # Initialize the Napari viewer
    viewer = napari.view_image(stack, name="Video Frames")
    viewer.dims.set_point(0, 0)  # Ensure the first frame (index 0) is displayed

    # Add the existing points layer from the CSV file
    # NOTE(review): the save handler reads `points_data`, not the layer, so it
    # relies on napari editing this array in place when points are dragged.
    # Confirm for the napari version in use.
    viewer.add_points(
        points_data,
        size=4,  # Adjust point size as needed
        name="Body Points",
        face_color='red',
        properties={"labels": labels_data},  # Add the labels as properties
        text={"text": "{labels}", "anchor": "upper_left", "color": "white"}  # Display labels
    )

    # Add a new empty points layer for new label annotations
    points_layer = viewer.add_points(
        ndim=3,
        property_choices={'label': labels},
        border_color='label',
        border_color_cycle=COLOR_CYCLE,
        symbol='o',
        face_color='transparent',
        border_width=0.5,  # fraction of point size
        size=3,
    )
    points_layer.border_color_mode = 'cycle'

    # Add the label menu widget to the viewer
    label_widget = create_label_menu(points_layer, labels)
    viewer.window.add_dock_widget(label_widget)

    def _cycle_label(offset):
        """Move the layer's current label by ``offset`` positions, wrapping around."""
        current_properties = points_layer.current_properties
        current_label = current_properties['label'][0]
        new_ind = (list(labels).index(current_label) + offset) % len(labels)
        current_properties['label'] = np.array([labels[new_ind]])
        points_layer.current_properties = current_properties
        points_layer.refresh_colors()

    @viewer.bind_key('Down')
    def next_label(event=None):
        """Keybinding to advance to the next label with wraparound."""
        # Deselect first so changing the current label does not relabel a
        # point that is still selected.
        points_layer.selected_data = set()
        points_layer.refresh()
        _cycle_label(1)

    def next_on_click(layer, event):
        """Mouse click binding to advance the label when a point is added."""
        if layer.mode == 'add':
            # Step 1: Deselect the last added point
            layer.selected_data = set()
            layer.refresh()

            # Step 2: Add a short delay to ensure deselection is processed
            QTimer.singleShot(100, lambda: next_label())  # 100 ms delay

    points_layer.mode = 'add'
    points_layer.mouse_drag_callbacks.append(next_on_click)

    # Deselect points when changing frames
    @viewer.dims.events.current_step.connect
    def clear_selection_on_frame_change(event):
        points_layer.selected_data = set()  # Deselect all points
        points_layer.refresh()              # Ensure Napari updates the view

    @viewer.bind_key('Up')
    def prev_label(event):
        """Keybinding to decrement to the previous label with wraparound."""
        _cycle_label(-1)

    @viewer.bind_key('Ctrl-Right')
    def note_right_step(event):
        """Keybinding to annotate a right step"""
        current_frame = viewer.dims.current_step[0]  # Get the current frame index
        step_annotations.append([current_frame, 'right'])
        viewer.status = f"Right step annotated at frame {current_frame}!"

    @viewer.bind_key('Ctrl-Left')
    def note_left_step(event):
        """Keybinding to annotate a left step"""
        current_frame = viewer.dims.current_step[0]  # Get the current frame index
        step_annotations.append([current_frame, 'left'])
        viewer.status = f"Left step annotated at frame {current_frame}!"

    @viewer.bind_key('Ctrl-d')
    def save_data(event=None):
        """Save data when Ctrl+d is pressed."""
        # Ensure the points layer state is updated
        points_layer.refresh()

        # label -> {"x": [...], "y": [...], "likelihood": [...]}, indexed by frame
        output_data = {}

        def store(label, frame, x, y, likelihood):
            """Record one point, padding the label's columns with '' up to ``frame``."""
            frame = int(frame)
            columns = output_data.setdefault(label, {"x": [], "y": [], "likelihood": []})
            missing = frame + 1 - len(columns["x"])
            if missing > 0:
                for values in columns.values():
                    values.extend([""] * missing)
            columns["x"][frame] = x
            columns["y"][frame] = y
            columns["likelihood"][frame] = likelihood

        # Process old data (points from the CSV file). A point whose x or y
        # differs from the loaded value gets likelihood 1.0.
        if points_data.ndim == 2 and len(points_data) > 0:
            current_yx = points_data[:, 1:3]
            original_yx = unmod_points_data[:, 1:3]
            # Treat NaN == NaN as "unchanged"; a plain != would flag every
            # missing coordinate as moved.
            unchanged = (current_yx == original_yx) | (np.isnan(current_yx) & np.isnan(original_yx))
            moved = ~unchanged.all(axis=1)
            frames = points_data[:, 0].astype(int)

            for label, frame, (y, x), likelihood, was_moved in zip(
                labels_data, frames, current_yx, likelihood_data, moved
            ):
                store(label, frame, x, y, 1.0 if was_moved else likelihood)

        # Process new data (user-added points): always likelihood 1.0
        new_labels = points_layer.properties['label']
        for label, (frame, y, x) in zip(new_labels, points_layer.data):
            store(label, frame, x, y, 1.0)

        # Add step annotations to a new column
        step_annotations_dict = {frame: step for frame, step in step_annotations}

        # Generate output filename by appending "_annotations" to the original CSV file name
        output_csv_name = csv_path.stem + "_annotations.csv"
        output_csv_path = csv_path.parent / output_csv_name

        # Write data to CSV
        with open(output_csv_path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)

            # Write header
            header = ["ID"]  # Add ID as the first column
            for label in output_data:
                header.extend([f"{label}_x", f"{label}_y", f"{label}_likelihood"])
            header.append("Step_Annotation")
            writer.writerow(header)

            # Enough rows for the longest label column and for every annotated
            # step frame; default=0 covers the "nothing to save" case.
            row_counts = [len(values["x"]) for values in output_data.values()]
            row_counts.extend(int(frame) + 1 for frame in step_annotations_dict)
            max_rows = max(row_counts, default=0)

            # Write rows
            for i in range(max_rows):
                row = [i + 1]  # Add ID (starting from 1)
                for values in output_data.values():
                    in_range = i < len(values["x"])
                    row.extend([
                        values["x"][i] if in_range else "",
                        values["y"][i] if in_range else "",
                        values["likelihood"][i] if in_range else "",
                    ])
                row.append(step_annotations_dict.get(i, ""))
                writer.writerow(row)

        # Notify the user in the Napari viewer
        viewer.status = f"Data saved to {output_csv_name}!"

# Define file paths
# `video_files` and `csv_files` are filled in by the directory-selection cell;
# stop with a readable message instead of an IndexError when they are empty.
if not video_files or not csv_files:
    raise RuntimeError(
        "Select a directory containing at least one .avi video and one DLC .csv file before running this cell."
    )

# Skip CSVs written by an earlier save: Ctrl-d stores "<name>_annotations.csv"
# next to the source CSV, and those do not have the three-row DLC header.
dlc_csv_files = [path for path in csv_files if not str(path).endswith("_annotations.csv")]
if not dlc_csv_files:
    raise RuntimeError("Only '*_annotations.csv' files were found; no DLC prediction .csv is available.")

video_file = Path(video_files[0])  # Use the first file in your video_files list
csv_file = Path(dlc_csv_files[0])  # Use the first DLC prediction file in your csv_files list

# Call the function
point_annotator(video_file, csv_file, labels)


In [33]:
# ----------------------  Read in annotated csv and modify it ------------------------------------------

# Enable Qt event loop for Jupyter
%gui qt

# ---------- Step 1: Load Video Frames ----------
def load_video_as_frames(video_path):
    """Read every frame of a video into a single in-memory NumPy array.

    Parameters
    ----------
    video_path : str or path-like
        Location of the video, passed straight to ``imageio.get_reader``.

    Returns
    -------
    numpy.ndarray
        All frames stacked along a new first axis (frame index first).

    Raises
    ------
    ValueError
        If the reader yields no frames.
    """
    # The context manager closes the reader once the frames are read (or if
    # decoding fails), so the video file handle is not left open.
    with imageio.get_reader(video_path) as reader:
        frames = [frame for frame in reader]

    if not frames:
        raise ValueError(f"No frames could be read from video: {video_path}")

    return np.stack(frames, axis=0)

# ---------- Step 2: Load the Annotations CSV ----------
# Fail early with a readable message when the inputs this cell needs are missing
if selected_directory is None:
    raise RuntimeError("No directory selected. Run the directory selection cell first.")

annotation_csv_matches = glob(os.path.join(selected_directory, '**', '*annotations.csv'), recursive=True)
if not annotation_csv_matches:
    raise FileNotFoundError(f"No '*annotations.csv' file found under {selected_directory}")
if not video_files:
    raise FileNotFoundError(f"No .avi video found under {selected_directory}")

annot_csv_path = annotation_csv_matches[0]
print("Annotation CSV Path:", annot_csv_path)

# Load CSV
df = pd.read_csv(annot_csv_path)

# Store original data for comparison
original_df = df.copy()

# ---------- Step 3: Extract Annotation Points ----------
points = []
labels = []
likelihoods = []
step_points = []   # For step annotations (left/right)
step_labels = []   # Store "left" or "right" labels

# Column layout is the same for every row, so work it out once
x_columns = [col for col in df.columns if col.endswith('_x')]
has_step_column = 'Step_Annotation' in df.columns

# Iterate through all frames
for idx, row in df.iterrows():
    for col in x_columns:
        label = col[:-2]  # Extract the label (remove '_x')
        x = row[col]
        y = row[f'{label}_y']
        likelihood = row.get(f'{label}_likelihood', 0.0)

        # Frames without a value for this label are simply not drawn
        if pd.notna(x) and pd.notna(y):
            points.append([idx, y, x])  # (frame, y, x)
            labels.append(label)
            likelihoods.append(likelihood)

    # ---------- Step 4: Extract Step Annotations ----------
    if has_step_column:
        step = row['Step_Annotation']
        if pd.notna(step) and step in ['left', 'right']:
            step_points.append([idx, 20, 20])  # Fixed display position (frame, y, x)
            step_labels.append(step)

# Convert to NumPy arrays; reshape keeps the (N, 3) layout even when N == 0
points = np.array(points).reshape(-1, 3)
step_points = np.array(step_points).reshape(-1, 3)

# ---------- Step 5: Load Video ----------
video_file = Path(video_files[0])  # First video found by the directory selection cell
video_stack = load_video_as_frames(video_file)

# ---------- Step 6: Display in Napari ----------
viewer = napari.Viewer()

# Add the video stack
viewer.add_image(video_stack, name="Video Frames")

# Show frame 1 (index 0) on startup
viewer.dims.set_point(0, 0)

# ---------- Add Keypoint Annotations ----------
points_layer = viewer.add_points(
    points,
    size=4,
    name="All Annotations",
    face_color='red',
    properties={"labels": labels, "likelihood": likelihoods},
    text={"text": "{labels}", "anchor": "upper_left", "color": "white"}
)

# ---------- Add Step Annotations (Left/Right) ----------
if len(step_points) > 0:
    viewer.add_points(
        step_points,
        size=1,
        name="Step Annotations",
        face_color='transparent',
        border_color='red',
        properties={"step": step_labels},
        text={"text": "{step}", "anchor": "center", "color": "red", "size": 25}
    )
# ---------- Step 7: Saving Logic (Ctrl + D) ----------
@viewer.bind_key('Ctrl-d')
def save_annotations(event=None):
    """Save annotations when Ctrl+D is pressed.

    Compares every point of the "All Annotations" layer with the value that
    was loaded from the annotation CSV. Points whose x or y changed are
    written back with likelihood 1.0; everything else is written unchanged.
    The annotation CSV at ``annot_csv_path`` is overwritten.
    """
    updated_points = points_layer.data
    updated_labels = points_layer.properties['labels']

    # Index the layer once by (frame, label) instead of rebuilding a boolean
    # mask over all points for every row/label pair. setdefault keeps the
    # first point found for a key.
    first_point = {}
    for coords, point_label in zip(updated_points, updated_labels):
        first_point.setdefault((float(coords[0]), str(point_label)), (coords[1], coords[2]))

    x_columns = [col for col in df.columns if col.endswith('_x')]

    # Create a dictionary to store the updated data
    output_data = {col: [] for col in df.columns}

    for idx in range(len(df)):
        row = df.iloc[idx].copy()

        # Update points
        for col in x_columns:
            label = col[:-2]
            match = first_point.get((idx, label))
            if match is None:
                continue  # No point for this label on this frame
            new_y, new_x = match

            original_x = original_df.at[idx, col]
            original_y = original_df.at[idx, f'{label}_y']
            original_likelihood = original_df.at[idx, f'{label}_likelihood']

            # Check if point moved
            if (new_x != original_x) or (new_y != original_y):
                row[col] = new_x
                row[f'{label}_y'] = new_y
                row[f'{label}_likelihood'] = 1.0  # Point moved
            else:
                row[f'{label}_likelihood'] = original_likelihood  # Point unchanged

        # Add to output data
        for col in df.columns:
            output_data[col].append(row[col])

    # Save updated annotations
    output_csv_path = Path(annot_csv_path)
    with open(output_csv_path, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(df.columns)  # Write header
        for i in range(len(df)):
            writer.writerow([output_data[col][i] for col in df.columns])

    viewer.status = f"Annotations saved to {output_csv_path}!"

# Keep Napari running
# NOTE(review): this cell already enables the Qt event loop with `%gui qt`,
# and the saved cell output ends with a warning fragment; confirm whether the
# explicit napari.run() call is still needed inside Jupyter.
napari.run()

Annotation CSV Path: G:/My Drive/Study/Climbing_Robot/Wrist_angle_project/Lizards/DLC/Dragons/aburn1_up_01\aburn1_up_01_enhDLC_resnet50_lizardpaper21DJul5shuffle1_500000_annotations.csv


  return func(*args, **kwargs)
