In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import h5py
import seaborn as sns
import matplotlib as mpl


# Load H5 file and print metadata 

In [None]:
filename = "model_comparison_6-13-2023.019_RaspiTestVid2_1000.analysis.h5"
with h5py.File(filename, "r") as f:
    dset_names = list(f.keys())
    locations = f["tracks"][:].T
    node_names = [n.decode() for n in f["node_names"][:]]

print("===filename===")
print(filename)
print()

print("===HDF5 datasets===")
print(dset_names)
print()

print("===locations data shape===")
print(locations.shape)
print()

print("===nodes===")
for i, name in enumerate(node_names):
    print(f"{i}: {name}")
print()

# Clean data by filling missing values

In [None]:
from scipy.interpolate import interp1d

def fill_missing(Y, kind="linear"):
    """Fills missing values independently along each dimension after the first."""

    # Store initial shape.
    initial_shape = Y.shape

    # Flatten after first dim.
    Y = Y.reshape((initial_shape[0], -1))

    # Interpolate along each slice.
    for i in range(Y.shape[-1]):
        y = Y[:, i]

        # Build interpolant.
        x = np.flatnonzero(~np.isnan(y))
        f = interp1d(x, y[x], kind=kind, fill_value=np.nan, bounds_error=False)

        # Fill missing
        xq = np.flatnonzero(np.isnan(y))
        y[xq] = f(xq)
        
        # Fill leading or trailing NaNs with the nearest non-NaN values
        mask = np.isnan(y)
        y[mask] = np.interp(np.flatnonzero(mask), np.flatnonzero(~mask), y[~mask])

        # Save slice
        Y[:, i] = y

    # Restore to initial shape.
    Y = Y.reshape(initial_shape)

    return Y

In [None]:
locations = fill_missing(locations)

In [None]:
NOSE_INDEX = 0
LEYE_INDEX = 1
REYE_INDEX = 2
HEAD_INDEX = 3
SPINE1_INDEX = 4
SPINE2_INDEX = 5
CAUDAL_INDEX = 6
TAIL_INDEX = 7

nose_loc = locations[:, NOSE_INDEX, :, :]
Leye_loc = locations[:, LEYE_INDEX, :, :]
Reye_loc = locations[:, REYE_INDEX, :, :]
head_loc = locations[:, HEAD_INDEX, :, :]
spine1_loc = locations[:, SPINE1_INDEX, :, :]
spine2_loc = locations[:, SPINE2_INDEX, :, :]
caudal_loc = locations[:, CAUDAL_INDEX, :, :]
tail_loc = locations[:, TAIL_INDEX, :, :]

In [None]:
sns.set('notebook', 'ticks', font_scale=1.2)
mpl.rcParams['figure.figsize'] = [15,6]

# Visualize position data for each node

In [None]:
tracking_data = [nose_loc, Leye_loc, Reye_loc, head_loc, spine1_loc, spine2_loc, tail_loc, caudal_loc]
titles = ['Nose', 'Leye', 'Reye', 'Head', 'Spine1', 'Spine2', 'Tail', 'Caudal']
colors = ['y', 'g', 'b', 'r', 'k']

fig, axs = plt.subplots(4, 2, figsize=(20, 20))

# Create an empty list to collect handles and labels from one subplot
legend_handles = []
legend_labels = []

for i in range(4):
    for j in range(2):
        ax = axs[i, j]
        ax.set_xlim(200, 1150)
        ax.set_ylim(0, 1024)
        ax.set_title(f'{titles[i * 2 + j]} tracks')

        for k, color in enumerate(colors):
            line, = ax.plot(tracking_data[i * 2 + j][:, 0, k], tracking_data[i * 2 + j][:, 1, k], color)
            if i == 0 and j == 0:
                # Collect handles and labels from the first subplot only
                legend_handles.append(line)
                legend_labels.append(f'Fish-{k + 1}')

# Create a single legend with labels from the first subplot only
fig.legend(legend_handles, legend_labels)

plt.tight_layout()
plt.show()

# testing strategy to estimate tank boundaries for potential filtering

In [None]:
from scipy.spatial import ConvexHull

points = np.vstack((tail_loc[:,0,0], tail_loc[:,1,0])).T

hull = ConvexHull(points)

fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(10, 3))

for ax in (ax1, ax2):
    ax.plot(points[:, 0], points[:, 1], color='k')
    if ax == ax1:
        ax.set_title('Given points')
    else:
        ax.set_title('Convex hull')
        for simplex in hull.simplices:
            ax.plot(points[simplex, 0], points[simplex, 1], 'c')
        ax.plot(points[hull.vertices, 0], points[hull.vertices, 1], 'o', mec='r', color='none', lw=1, markersize=10)
    ax.set_xticks(range(10))
    ax.set_yticks(range(10))
plt.show()

# Smooth data and differentiate to get velocity then visualize

In [None]:
from scipy.signal import savgol_filter

def smooth_diff(node_loc, win=25, poly=3):
    """
    node_loc is a [frames, 2] array
    
    win defines the window to smooth over
    
    poly defines the order of the polynomial
    to fit with
    
    """
    node_loc_vel = np.zeros_like(node_loc)
    
    for c in range(node_loc.shape[-1]):
        node_loc_vel[:, c] = savgol_filter(node_loc[:, c], win, poly, deriv=1)
    
    node_vel = np.linalg.norm(node_loc_vel,axis=1)

    return node_vel

In [None]:
# Define the tracking data and the number of individuals
tracking_data = [nose_loc, Leye_loc, Reye_loc, head_loc, spine1_loc, spine2_loc, caudal_loc, tail_loc]
num_individuals = 5

# Initialize a list to store the velocity data for each tracking point and individual
velocity_data = []

# Loop through each tracking point
for data in tracking_data:
    # Initialize a list to store the velocity data for each individual
    point_velocity = []
    
    # Loop through each individual
    for i in range(num_individuals):
        # Calculate the velocity for the current individual and tracking point
        vel = smooth_diff(data[:, :, i])
        point_velocity.append(vel)
    
    velocity_data.append(point_velocity)

    
# velocity data is a nested list. To index: velocity_data[0][1] for nose and FISH-2 

In [None]:
# Plotting velocity (heatmap)
fig = plt.figure(figsize=(15,7))
ax1 = fig.add_subplot(211)
ax1.plot(nose_loc[:, 0, 0], 'k', label='x')
ax1.plot(-1*nose_loc[:, 1, 0], 'g', label='y')
ax1.legend()
ax1.set_xticks([])
ax1.set_title('Nose Position')

fishnnode = velocity_data[0][0]
ax2 = fig.add_subplot(212, sharex=ax1)
ax2.imshow(fishnnode[:,np.newaxis].T, aspect='auto', vmin=0, vmax=10)
ax2.set_yticks([])
ax2.set_title('Velocity')

In [None]:
# plotting velocity intensity by position
fig = plt.figure(figsize=(15,6))
ax1 = fig.add_subplot(121)
ax1.plot(nose_loc[:, 0, 0], nose_loc[:, 1, 0], 'k')
ax1.set_xlim(200,1150)
ax1.set_xticks([])
ax1.set_ylim(0,1024)
ax1.set_yticks([])
ax1.set_title('Nose tracks (Fish1)')

kp = velocity_data[0][0]  
vmin = 0
vmax = 10

ax2 = fig.add_subplot(122)
scatter = ax2.scatter(nose_loc[:,0,0], nose_loc[:,1,0], c=kp, s=4, vmin=vmin, vmax=vmax)
ax2.set_xlim(200,1150)
ax2.set_xticks([])
ax2.set_ylim(0,1024)
ax2.set_yticks([])
ax2.set_title('Nose tracks colored by magnitude of swim speed (Fish1)')

cbar = plt.colorbar(scatter, ax=ax2)
cbar.set_label('Swim Speed')
cbar.set_ticks(np.arange(0,10,1)) 

In [None]:
# velcity densities, could be useful for determining threshold??

# nose velocity data 
nose_velocities = velocity_data[0][0]  

# Create a histogram of nose velocities
plt.figure(figsize=(8, 6))
plt.hist(nose_velocities, bins=20, density=True, alpha=0.6, color='b', label='Histogram')

# Create a KDE plot of nose velocities
sns.kdeplot(nose_velocities, color='r', label='KDE')

# Add labels and a legend
plt.xlabel('Nose Velocity')
plt.ylabel('Density')
plt.title('Distribution of Nose Velocities')
plt.legend()

# Show the plot
plt.show()

# Identify Erroneous Frames

In [None]:
x_data = nose_loc[:, 0, 3] 
y_data = nose_loc[:, 1, 3]
# Set a threshold for the maximum allowable velocity
velocity_threshold = 15 # Adjust this threshold as needed
euclidean_distance_threshold = 10

# Initialize a list to store potential error indices from velocity threshold and temporal consistency
error_indices_velocity = []
error_indices_TC = []
# Initialize a list to store final tracking error indices 
error_indices_final = []

# Find frames where the velocity exceeds the threshold
error_indices_velocity = np.where(velocity_data[0][3] > velocity_threshold)[0]


# Apply the temporal consistency approach by comparing Euclidean distances
for i in range(1, len(x_data)):
    if i > 0 and i < len(x_data) - 1:
        # Calculate the Euclidean distance between the current frame and its neighboring frames
        distance_prev = np.sqrt((x_data[i] - x_data[i - 1])**2 + (y_data[i] - y_data[i - 1])**2)
        
        
        # Check if both neighboring frames exhibit consistency
        if distance_prev > euclidean_distance_threshold:
            error_indices_TC.append(i)

# Create the final list of frames by finding frames that are in both lists
error_indices_final = list(set(error_indices_velocity) & set(error_indices_TC))


# Print the frames with potential tracking errors after applying temporal consistency by Euclidean distance
if error_indices_final:
    print("Potential tracking errors detected in frames:", error_indices_final)
else:
    print("No tracking errors detected.")



In [None]:
plt.scatter(x_data,y_data,c='b',s=70)
plt.scatter(x_data[error_indices_final],y_data[error_indices_final],c='r',s=10)