Created on Tue Aug 27 22:21:22 2024

@author: Santiago D'hers

Use:

- This notebook will use .H5 files (from DeepLabCut) to prepare the position.csv files to be analyzed

- It filters out low-likelihood positions, then interpolates and smooths the data

- The positions are scaled from pixels to cm for better generalization


Requirements:

- A folder with files of extension .H5 (from DeepLabCut) containing:

    - The position of the desired bodyparts and objects on the video

In [11]:
import os
from glob import glob
import pandas as pd
import numpy as np

import plotly.graph_objects as go

import shutil
import random

from scipy import signal

In [12]:
# State your path:
# NOTE: this is an absolute, machine-specific path; edit it before running the notebook elsewhere
base = r'C:\Users\dhers\OneDrive - UBA\workshop'
experiment = r'Interferencia'
path = os.path.join(base, experiment)

# Collect every file in the experiment folder whose name ends in "position.h5"
all_h5_files = glob(os.path.join(path,"*position.h5"))
filter_example = 'TS' # You can pick an example that contains a specific string in its name

trials  = ["Hab", "TR1", "TR2", "TS"] # Your filenames must contain the group name; these groups are used to organize the files into folders

tolerance = 'mean' # State the likelihood limit under which a coordinate will be erased. If 'mean' or 'median', the tolerance is calculated separately for each bodypart
certainty = 2 # Between 0 and 5, say how many std_dev below the mean (or median) the likelihood limit is placed (it is similar to asking "how good is your tracking?")
drop = 0.5 # Points whose median likelihood is below this value have their columns removed from the dataframe

bodypart = 'nose' # State which bodypart you'd like to plot as an example
objects = ['obj_1', 'obj_2'] # Name the stationary objects that may appear in your data

measured_points = ['L_ear', 'R_ear'] # Measure the distance between two bodyparts to scale the video
measured_dist = 1.8 # State the real distance (in cm) between the measured points

video_fps = 30 # State the frames per second

In [13]:
def choose_example(files: list, filter_word: str = 'TS') -> str:
    """Picks an example file at random from a list of files.

    Files whose *name* (not their folder) contains ``filter_word`` are
    preferred. If none matches, a message is printed and the example is drawn
    from the whole list instead.

    Args:
        files (list): List of file paths to choose from.
        filter_word (str, optional): Word the file name should contain. Defaults to 'TS'.

    Returns:
        str: Path of the chosen file, exactly as it appears in ``files``.

    Raises:
        IndexError: If ``files`` is empty.
    """

    if not files:
        # random.choice would raise an opaque IndexError here; keep the type, explain the cause
        raise IndexError("Cannot choose an example: the list of files is empty (check the folder path and the file pattern)")

    # Match against the base name only, so a folder that happens to contain
    # the filter word does not make every file a candidate
    candidates = [file for file in files if filter_word in os.path.basename(file)]

    if not candidates:
        print("No files found with the specified word")
        candidates = files

    # Choose one file at random to use as example
    example = random.choice(candidates)
    print(f"Plotting coordinates from {os.path.basename(example)}")

    return example

# Pick the example file that the following cells open, filter and plot
example_path = choose_example(all_h5_files, filter_example)

Plotting coordinates from 2023-11_Interferencia_TS_R03_C01_B_L_position.h5


In [14]:
def open_h5(path: str, print_data: bool = False, num_std: float = None) -> tuple:
    """Opens an h5 file and returns the data as a flat pandas dataframe.

    The file is expected to hold columns indexed by (scorer, bodypart, coord).
    Only the first scorer is kept and each (bodypart, coord) pair becomes one
    column named ``"<bodypart>_<coord>"``.

    Args:
        path (str): Path to the h5 file.
        print_data (bool, optional): If True, print the scorer, the tracked
            points and the likelihood statistics of each point. Defaults to False.
        num_std (float, optional): Number of standard deviations used for the
            printed tolerance (``mean - num_std * std_dev``). When omitted, the
            notebook-level ``certainty`` is used if it exists, otherwise 2.

    Returns:
        tuple: A tuple containing the dataframe and a list of bodyparts.
    """

    df = pd.read_hdf(path)
    scorer = df.columns.levels[0][0]
    bodyparts = df.columns.levels[1].to_list()
    df = df[scorer]

    # Flatten the (bodypart, coord) column pairs in one step instead of
    # inserting the columns one at a time
    df_raw = df.copy()
    df_raw.columns = [f"{key[0]}_{key[1]}" for key in df.keys()]

    if print_data:
        if num_std is None:
            # Fall back to the notebook-level setting without requiring it to
            # exist, so the function also works outside this notebook
            num_std = globals().get('certainty', 2)

        print(f"Positions obtained by model: {scorer}")
        print(f"Points in df: {bodyparts}")
        for point in bodyparts:
            likelihood = df_raw[f'{point}_likelihood']
            median = likelihood.median()
            mean = likelihood.mean()
            std_dev = likelihood.std()
            print(f'{point} \t median: {median:.2f} \t mean: {mean:.2f} \t std_dev: {std_dev:.2f} \t tolerance: {mean - num_std*std_dev:.2f}')

    return df_raw, bodyparts

# Open the example file and print the likelihood statistics of every tracked point
df_raw, bodyparts = open_h5(example_path, print_data=True)

Positions obtained by model: DLC_resnet50_VaderDec1shuffle1_200000
Points in df: ['L_ear', 'R_ear', 'body', 'head', 'neck', 'nose', 'obj_1', 'obj_2', 'tail_1', 'tail_2', 'tail_3']
L_ear 	 median: 1.00 	 mean: 0.98 	 std_dev: 0.09 	 tolerance: 0.80
R_ear 	 median: 1.00 	 mean: 0.98 	 std_dev: 0.10 	 tolerance: 0.79
body 	 median: 1.00 	 mean: 0.99 	 std_dev: 0.09 	 tolerance: 0.81
head 	 median: 1.00 	 mean: 0.98 	 std_dev: 0.10 	 tolerance: 0.78
neck 	 median: 1.00 	 mean: 0.98 	 std_dev: 0.10 	 tolerance: 0.78
nose 	 median: 1.00 	 mean: 0.97 	 std_dev: 0.15 	 tolerance: 0.67
obj_1 	 median: 1.00 	 mean: 1.00 	 std_dev: 0.01 	 tolerance: 0.97
obj_2 	 median: 1.00 	 mean: 1.00 	 std_dev: 0.01 	 tolerance: 0.98
tail_1 	 median: 1.00 	 mean: 0.99 	 std_dev: 0.10 	 tolerance: 0.79
tail_2 	 median: 0.99 	 mean: 0.97 	 std_dev: 0.10 	 tolerance: 0.77
tail_3 	 median: 1.00 	 mean: 0.98 	 std_dev: 0.10 	 tolerance: 0.78


In [15]:
def filter_and_smooth_df(data: pd.DataFrame, bodyparts: list, objects: list, drop_below: float = 0.5, llhd_lim: str = 'mean', num_std: float = 2) -> pd.DataFrame:
    """Filters and smooths a DataFrame of coordinates.

    For every point, coordinates whose likelihood is below a threshold are
    erased, the gaps are interpolated (pchip) and forward filled, and the
    result is passed through a median filter followed by a small Gaussian
    kernel. NaNs before the first valid sample are not filled. Points listed
    in ``objects`` are treated as stationary: each coordinate is replaced by
    its median. Points whose median likelihood is below ``drop_below`` are
    removed altogether. The input DataFrame is not modified.

    Args:
        data (pd.DataFrame): DataFrame with '<point>_x', '<point>_y' and
            '<point>_likelihood' columns for every point in ``bodyparts``.
        bodyparts (list): List of points to filter.
        objects (list): Names of the stationary objects; a column is treated
            as an object column when one of these names is contained in it.
        drop_below (float, optional): If the median likelihood of a point is
            below this value, its three columns are dropped. Defaults to 0.5.
        llhd_lim (str or float, optional): 'mean' or 'median' to compute the
            threshold per point as (mean or median) - num_std * std_dev; any
            other value is used directly as the threshold. Defaults to 'mean'.
        num_std (float, optional): Number of standard deviations to use as the threshold. Defaults to 2.

    Returns:
        pd.DataFrame: Filtered and smoothed DataFrame of coordinates.
    """
    df = data.copy()

    # Filtering parameters
    med_filt_window = 3
    sigma, n_sigmas = 0.6, 2
    N = int(2 * n_sigmas * sigma + 1)

    # Normalized Gaussian kernel and the padding needed to keep the length
    gauss_kernel = signal.windows.gaussian(N, sigma)
    gauss_kernel = gauss_kernel / sum(gauss_kernel)
    pad_width = (len(gauss_kernel) - 1) // 2

    for point in bodyparts:

        likelihood = df[f'{point}_likelihood']
        median = likelihood.median()

        # If the likelihood of a point is too low, it is probably not there.
        # Drop its columns up front instead of smoothing data that is discarded anyway
        if median < drop_below:
            df.drop(columns=[f'{point}_x', f'{point}_y', f'{point}_likelihood'], inplace=True)
            continue

        if llhd_lim == 'mean':
            limit = likelihood.mean() - num_std*likelihood.std()
        elif llhd_lim == 'median':
            limit = median - num_std*likelihood.std()
        else:
            limit = llhd_lim

        # Set x and y coordinates to NaN where the likelihood is below the tolerance limit
        df.loc[likelihood < limit, [f'{point}_x', f'{point}_y']] = np.nan

        for axis in ['x', 'y']:
            column = f'{point}_{axis}'

            # Interpolate the gaps with pchip, then forward fill what is left at the end
            filled = df[column].interpolate(method='pchip', limit_area='inside').ffill()

            # Median filter
            despiked = signal.medfilt(filled, kernel_size=med_filt_window)

            # Pad the median filtered data to mitigate edge effects, then convolve
            padded = np.pad(despiked, pad_width, mode='edge')
            smooth = signal.convolve(padded, gauss_kernel, mode='valid')

            # Trim to the original length
            df[column] = smooth[:len(df)]

            # Stationary objects collapse to a single position
            if any(obj in column for obj in objects):
                df[column] = df[column].median()

    return df

# Filter and smooth the example data with the thresholds chosen in the configuration cell,
# so that the threshold drawn in the plot below matches the filtering actually applied
df_smooth = filter_and_smooth_df(df_raw, bodyparts, objects, drop_below=drop, llhd_lim=tolerance, num_std=certainty)

In [16]:
def plot_raw_vs_smoothed(df_raw, df_smooth, bodypart = 'nose', llhd_lim='mean', num_std=2):
    """Plots the raw and smoothed coordinates of a bodypart next to its likelihood.

    Raw coordinates are drawn as 'x' markers and smoothed coordinates as lines
    on the primary axis. The raw likelihood is drawn on a secondary axis,
    together with a dashed green line at the likelihood threshold.

    Args:
        df_raw (pd.DataFrame): Raw data; must contain '<bodypart>_likelihood'.
        df_smooth (pd.DataFrame): Filtered and smoothed data.
        bodypart (str, optional): Columns containing this string are plotted. Defaults to 'nose'.
        llhd_lim (str or float, optional): 'mean' or 'median' to compute the
            threshold as (mean or median) - num_std * std_dev of the raw
            likelihood; any other value is used directly. Defaults to 'mean'.
        num_std (float, optional): Number of standard deviations for the threshold. Defaults to 2.
    """
    figure = go.Figure()

    # Raw data: coordinates as markers, likelihood as a line on the secondary axis
    for name in (col for col in df_raw.columns if bodypart in col):
        if 'likelihood' in name:
            if '_y' in name:
                continue
            trace = go.Scatter(x=df_raw.index, y=df_raw[name], name=f'{name}', line=dict(color='black', width=3), yaxis='y2',opacity=0.5)
        else:
            trace = go.Scatter(x=df_raw.index, y=df_raw[name], mode='markers', name=f'raw {name}', marker=dict(symbol='x', size=6))
        figure.add_trace(trace)

    # Smoothed data: coordinates only
    smooth_names = [col for col in df_smooth.columns if bodypart in col and 'likelihood' not in col]
    for name in smooth_names:
        figure.add_trace(go.Scatter(x=df_smooth.index, y=df_smooth[name], name=f'new {name}', line=dict(width=3)))

    # Likelihood threshold
    likelihood = df_raw[f'{bodypart}_likelihood']
    median = likelihood.median()
    mean = likelihood.mean()
    std_dev = likelihood.std()

    if llhd_lim == 'mean':
        limit = mean - num_std*std_dev
    elif llhd_lim == 'median':
        limit = median - num_std*std_dev
    else:
        limit = llhd_lim

    # Horizontal dashed line spanning all frames, drawn against the likelihood axis
    threshold_line = dict(
        type='line',
        x0=df_raw.index.min(),
        x1=df_raw.index.max(),
        y0=limit,
        y1=limit,
        line=dict(color='green', dash='dash'),
        yref='y2',
    )

    primary_axis = dict(title=f'{bodypart} position (pixels)')
    likelihood_axis = dict(title=f'{bodypart} likelihood', overlaying='y', side='right', gridcolor='black')
    top_legend = dict(yanchor="bottom", y=1, xanchor="center", x=0.5, orientation="h")

    figure.update_layout(
        xaxis=dict(title='Video frame'),
        yaxis=primary_axis,
        yaxis2=likelihood_axis,
        title=f'{bodypart} position & likelihood',
        legend=top_legend,
        shapes=[threshold_line],
    )

    figure.show()

# Plot raw vs smoothed data for the chosen bodypart, with the likelihood threshold drawn as a dashed line
plot_raw_vs_smoothed(df_raw, df_smooth, bodypart = bodypart, llhd_lim = tolerance, num_std = certainty)

In [17]:
def find_scale(df: pd.DataFrame, measured_dist: float, measured_points: list, print_results: bool = False) -> float:
    """Computes the factor that converts pixel coordinates to real-world units.

    The distance between the two measured points is computed on every row of
    ``df`` that has no missing values; the scale is the known real distance
    divided by the median of those pixel distances. The input DataFrame is
    not modified.

    Args:
        df (pd.DataFrame): DataFrame containing the coordinates of the points.
        measured_dist (float): Measured (real) distance between the points.
        measured_points (list): Names of the two points, e.g. ['L_ear', 'R_ear'].
        print_results (bool, optional): If True, print the mean/median distance
            and the scale, and plot the distance per frame. Defaults to False.

    Returns:
        scale (float): measured_dist / median pixel distance.
    """

    # Work on a NaN-free copy: dropping rows in place would silently shrink the caller's DataFrame
    valid = df.dropna()

    A = measured_points[0]
    B = measured_points[1]

    # Calculate the distance between the two points
    dist = np.sqrt(
        (valid[f'{A}_x'] - valid[f'{B}_x'])**2 +
        (valid[f'{A}_y'] - valid[f'{B}_y'])**2)

    dist = dist.dropna()

    # Calculate the mean and median
    mean_dist = np.mean(dist)
    median_dist = np.median(dist)

    scale = (measured_dist / median_dist)

    if print_results:

        print(f'median distance is {median_dist}, mean distance is {mean_dist}. scale is {scale*100}')

        # Create the plot
        fig = go.Figure()

        # Add the distance trace, labelled with the points that were actually measured
        fig.add_trace(go.Scatter(y=dist, mode='lines', name=f'Distance between {A} and {B}'))

        # Add mean and median lines
        fig.add_trace(go.Scatter(y=[mean_dist]*len(dist), mode='lines', name=f'Mean: {mean_dist:.2f}', line=dict(color='red', dash='dash')))
        fig.add_trace(go.Scatter(y=[median_dist]*len(dist), mode='lines', name=f'Median: {median_dist:.2f}', line=dict(color='black')))

        # Update layout
        fig.update_layout(
            title=f'Distance between {A} and {B}',
            xaxis_title='Frame',
            yaxis_title='Distance (pixels)',
            legend=dict(yanchor="bottom",
                        y=1,
                        xanchor="center",
                        x=0.5,
                        orientation="h"),
        )

        # Show the plot
        fig.show()

    return scale

# Compute the scale for the example file and plot the distance between the measured points
scale = find_scale(df_smooth, measured_dist, measured_points, print_results=True)

median distance is 38.87908782069537, mean distance is 38.40437419600777. scale is 4.629738249779252


In [18]:
def process_hdf5_file(files: list, objects: list = [], measured_dist: float = 1.8, measured_points: list = ['L_ear', 'R_ear'], fps:int = 30, llhd_lim: float = 'mean', num_std: float = 2, drop_below: float = 0.5):
    """Processes a list of HDF5 files and saves the smoothed data as CSV files.

    Each file is opened, filtered and smoothed, stripped of its likelihood
    columns and of rows with missing values, scaled with the factor returned
    by find_scale, and written as a .csv with the same name next to the input.
    A one-line summary is printed per file.

    Args:
        files (list): List of HDF5 files to process.
        objects (list): List of stationary objects.
        measured_dist (float): Measured distance between points in cm.
        measured_points (list): List of reference points for the distance calculation.
        fps (int): Frames per second of the video.
        llhd_lim (str or float): 'mean', 'median' or a fixed likelihood
            threshold below which a coordinate is erased.
        num_std (float): Number of standard deviations to consider.
        drop_below (float): Drop a point if its median likelihood is below this threshold.
    """

    for h5_file in files:

        df_raw, bodyparts = open_h5(h5_file)

        # Forward the caller's thresholds; without this the filter always runs with its own defaults
        df_smooth = filter_and_smooth_df(df_raw, bodyparts, objects, drop_below=drop_below, llhd_lim=llhd_lim, num_std=num_std)

        # Drop the likelihood columns
        df_smooth = df_smooth.drop(columns=df_smooth.filter(like='likelihood').columns)

        # Drop the frames when the mouse is not in the video
        df_smooth = df_smooth.dropna()

        # Use a constant that can be measured in real life to scale different sized videos from px to cm
        scale = find_scale(df_smooth, measured_dist, measured_points)
        df_smooth = df_smooth * scale

        # Write the output next to the input, swapping the extension for '.csv'
        input_dir, input_filename = os.path.split(h5_file)
        filename_without_extension = os.path.splitext(input_filename)[0]
        output_csv_path = os.path.join(input_dir, filename_without_extension + '.csv')

        # Save the processed data as a CSV file
        df_smooth.to_csv(output_csv_path, index=False)

        # Every dropped frame is counted as time before the mouse entered the video
        mouse_enters = (len(df_raw) - len(df_smooth)) / fps

        print(f"{input_filename} has {df_smooth.shape[1]} columns. The mouse took {mouse_enters:.2f} sec to enter. scale is {scale*100:.2f}.")

# Process every .h5 file in the folder; a .csv with the same name is written next to each input
process_hdf5_file(all_h5_files, objects, measured_dist, measured_points, video_fps, tolerance, certainty, drop)

2023-11_Interferencia_Hab_R01_C01_A_L_position.h5 has 18 columns. The mouse took 6.00 sec to enter. scale is 5.10.
2023-11_Interferencia_Hab_R02_C01_A_R_position.h5 has 18 columns. The mouse took 2.60 sec to enter. scale is 5.26.
2023-11_Interferencia_Hab_R03_C01_B_L_position.h5 has 18 columns. The mouse took 6.90 sec to enter. scale is 4.87.
2023-11_Interferencia_Hab_R04_C01_B_R_position.h5 has 18 columns. The mouse took 6.53 sec to enter. scale is 5.06.
2023-11_Interferencia_Hab_R05_C02_A_L_position.h5 has 18 columns. The mouse took 3.03 sec to enter. scale is 4.99.
2023-11_Interferencia_Hab_R06_C02_A_R_position.h5 has 18 columns. The mouse took 0.37 sec to enter. scale is 4.96.
2023-11_Interferencia_Hab_R07_C02_B_L_position.h5 has 18 columns. The mouse took 3.03 sec to enter. scale is 5.01.
2023-11_Interferencia_Hab_R08_C03_B_R_position.h5 has 18 columns. The mouse took 5.80 sec to enter. scale is 5.06.
2023-11_Interferencia_Hab_R09_C03_A_L_position.h5 has 18 columns. The mouse took

In [19]:
def filter_and_move_files(folder: str, subfolders: list):
    """Sorts the files of a folder into per-group subfolders.

    For every name in ``subfolders``, the directory '<folder>/<name>/position'
    is created, and every regular file directly inside ``folder`` whose name
    contains the group name and '.csv' but not 'filtered' is moved there.
    Afterwards every regular file still left in ``folder`` is moved to
    '<folder>/h5 files & others'.

    Args:
        folder (str): Folder whose files are organized.
        subfolders (list): Group names used both to match files and to name the subfolders.
    """

    def _regular_files() -> list:
        # Names of the regular files (not directories) currently in `folder`
        return [name for name in os.listdir(folder) if os.path.isfile(os.path.join(folder, name))]

    for group in subfolders:
        # Destination for the position files of this group
        destination = os.path.join(folder, group, "position")
        os.makedirs(destination, exist_ok=True)

        for name in _regular_files():
            belongs_here = group in name and ".csv" in name and "filtered" not in name
            if not belongs_here:
                continue
            shutil.move(os.path.join(folder, name), os.path.join(destination, name))

    print("Files filtered and moved successfully.")

    # Whatever is still in the folder is stored away in a single subfolder
    leftovers_dir = os.path.join(folder, "h5 files & others")
    os.makedirs(leftovers_dir, exist_ok=True)

    for name in _regular_files():
        shutil.move(os.path.join(folder, name), os.path.join(leftovers_dir, name))

    print("All .H5 files are stored away")

# Clean the folder: position .csv files go to "<trial>/position", every other file to "h5 files & others"
filter_and_move_files(path, trials)

Files filtered and moved successfully.
All .H5 files are stored away
