In [19]:
def estimate_trail_map(tracks, video_file, output_path, trail_props, antennal_pose_markers, body_pose_markers, dont_use_flags=True, remove_single_frame_tracks=False):
    '''
    Generates a trail map video from the cleaned data, using the provided trail properties and antennal pose markers.
    '''
    # generate the cleaned data
    cleaned_data = tracks.copy()
    # remove flagged frames
    if dont_use_flags:
        cleaned_data = [track[track['pose_flag'] == 0].reset_index(drop=True) for track in cleaned_data]
    # remove single frame tracks
    if remove_single_frame_tracks:
        cleaned_data = [track[track['single_frame'] == 0].reset_index(drop=True) for track in cleaned_data]

    cleaned_data = pd.concat(cleaned_data).sort_values(['frame_idx', 'segment']).reset_index(drop=True)


    # Get average body length
    x_diff = cleaned_data[body_pose_markers[0] + '.x'] - cleaned_data[body_pose_markers[1] + '.x']
    y_diff = cleaned_data[body_pose_markers[0] + '.y'] - cleaned_data[body_pose_markers[1] + '.y']
    body_length = np.nanmean(np.sqrt(x_diff**2 + y_diff**2))

    # Set the spread of the trail in pixels
    sd = trail_props['spread'] * body_length

    # Get the maximum frame index
    max_frame = int(cleaned_data['frame_idx'].max())

    # Create the empty trail map
    trail_map = np.zeros(DIMENSION)

    # Set up video reader
    cap = cv2.VideoCapture(video_file)

    # Set up video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, FRAME_RATE, (DIMENSION[0], DIMENSION[1]), isColor=False)

    # Loop over the frames
    for frame in tqdm(range(max_frame//20)):

        # move the video to the frame
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame)

        # Read the frame
        ret, back = cap.read()
        if not ret:
            print('Error reading frame')
            break
        # convert to grayscale
        back = cv2.cvtColor(back, cv2.COLOR_BGR2GRAY)

        # Decay the trail map
        trail_map = trail_map * trail_props['decay'] ** (1 / FRAME_RATE)
        
        # Get the tracks at the current frame
        current_tracks = cleaned_data[cleaned_data['frame_idx'] == frame]
        
        # Get the position of the source
        source_x = current_tracks[trail_props['source'] + '.x'].values
        source_y = current_tracks[trail_props['source'] + '.y'].values

        # Add a 2D Gaussian to the trail map
        for x, y in zip(source_x, source_y):
            # skip if the position is nan
            if np.isnan(x) or np.isnan(y):
                continue
            addition = norm.pdf(np.arange(DIMENSION[0]), x, sd)[:, None] * norm.pdf(np.arange(DIMENSION[1]), y, sd)
            # Normalize the addition
            addition = addition / addition.max()
            trail_map += addition

        # Get the indices of the cleaned data at the current frame
        indices = cleaned_data[cleaned_data['frame_idx'] == frame].index
        
        # Get the position of the antennae at the current frame
        antennae_x = cleaned_data.loc[indices, [marker + '.x' for marker in antennal_pose_markers]].values
        antennae_y = cleaned_data.loc[indices, [marker + '.y' for marker in antennal_pose_markers]].values

        # Loop over the indices and add the trail value to the cleaned data
        f = interp2d(np.arange(DIMENSION[0]), np.arange(DIMENSION[1]), trail_map)
        for i, idx in enumerate(indices):
            for j, marker in enumerate(antennal_pose_markers):
                if np.isnan(antennae_x[i, j]) or np.isnan(antennae_y[i, j]):
                    continue
                x = int(np.clip(antennae_x[i, j], 0, DIMENSION[0] - 1))
                y = int(np.clip(antennae_y[i, j], 0, DIMENSION[1] - 1))
                # Interpolate the value of the trail map at the position of the antennae
                cleaned_data.loc[idx, marker + '.trail'] = trail_map[x, y]
                cleaned_data.loc[idx, marker + '.trail_normalized'] = (trail_map[x, y] - trail_map.min()) / (trail_map.max() - trail_map.min())

        # Write the frame to the video in grayscale
        frame_to_write = (trail_map - trail_map.min()) / (trail_map.max() - trail_map.min()) * 255
        frame_to_write = frame_to_write.astype(np.uint8)

        # overlay back with the trail map
        frame_to_write = cv2.addWeighted(back, 0.5, frame_to_write, 0.5, 0)


        # Draw the ants on the frame (all pose markers)
        for idx in indices:
            # define the variables as nan
            petiole = (np.nan, np.nan)
            tip_of_head = (np.nan, np.nan)
            ovipositor = (np.nan, np.nan)
            antennaL = (np.nan, np.nan)
            antennaR = (np.nan, np.nan)
            
            # Get the positions of the pose markers
            try:
                petiole = (int(cleaned_data.loc[idx, 'petiole.y']), int(cleaned_data.loc[idx, 'petiole.x']))
                frame_to_write = cv2.circle(frame_to_write, petiole, 5, 255, -1)
            except:
                pass

            try:
                tip_of_head = (int(cleaned_data.loc[idx, 'tip_of_head.y']), int(cleaned_data.loc[idx, 'tip_of_head.x']))
                frame_to_write = cv2.circle(frame_to_write, tip_of_head, 5, 255, -1)
            except:
                pass

            try:
                ovipositor = (int(cleaned_data.loc[idx, 'ovipositor.y']), int(cleaned_data.loc[idx, 'ovipositor.x']))
                frame_to_write = cv2.circle(frame_to_write, ovipositor, 5, 255, -1)
            except:
                pass

            try:
                antennaL = (int(cleaned_data.loc[idx, 'antennaL.y']), int(cleaned_data.loc[idx, 'antennaL.x']))
                size = int((cleaned_data.loc[idx, 'antennaL.trail'] - trail_map.min()) / (trail_map.max() - trail_map.min()) * 30)
                frame_to_write = cv2.circle(frame_to_write, antennaL, size, 255, -1)
            except:
                pass

            try:
                antennaR = (int(cleaned_data.loc[idx, 'antennaR.y']), int(cleaned_data.loc[idx, 'antennaR.x']))
                size = int((cleaned_data.loc[idx, 'antennaR.trail'] - trail_map.min()) / (trail_map.max() - trail_map.min()) * 30)
                frame_to_write = cv2.circle(frame_to_write, antennaR, size, 255, -1)
            except:
                pass

            # Draw lines connecting pose markers
            try:
                frame_to_write = cv2.line(frame_to_write, petiole, tip_of_head, 255, 1)
            except:
                pass

            try:
                frame_to_write = cv2.line(frame_to_write, petiole, ovipositor, 255, 1)
            except:
                pass
            

            try:
                frame_to_write = cv2.line(frame_to_write, tip_of_head, antennaL, 255, 1)
            except:
                pass

            try:
                frame_to_write = cv2.line(frame_to_write, tip_of_head, antennaR, 255, 1)
            except:
                pass

            # draw an arrow from the higher antennae to the lower antennae to indicate direction
            try:
                offset = 30
                # shift the points to the center of the ant
                petiole = (int(cleaned_data.loc[idx, 'petiole.y']), int(cleaned_data.loc[idx, 'petiole.x']))
                tip_of_head = (int(cleaned_data.loc[idx, 'tip_of_head.y']), int(cleaned_data.loc[idx, 'tip_of_head.x']))
                body_axis_unit_vector = np.array([tip_of_head[0] - petiole[0], tip_of_head[1] - petiole[1]])
                body_axis_unit_vector = body_axis_unit_vector / np.linalg.norm(body_axis_unit_vector)

                # get the antennae positions by shifting them to the center of the ant
                antennaL = (int(cleaned_data.loc[idx, 'antennaL.y']), int(cleaned_data.loc[idx, 'antennaL.x']))
                antennaR = (int(cleaned_data.loc[idx, 'antennaR.y']), int(cleaned_data.loc[idx, 'antennaR.x']))
                # move point along the body axis
                antennaL = (antennaL[0] + int(body_axis_unit_vector[0] * offset), antennaL[1] + int(body_axis_unit_vector[1] * offset))
                antennaR = (antennaR[0] + int(body_axis_unit_vector[0] * offset), antennaR[1] + int(body_axis_unit_vector[1] * offset))

                # draw the arrow for antennal difference
                if abs(cleaned_data.loc[idx, 'antennaR.trail_normalized'] - cleaned_data.loc[idx, 'antennaL.trail_normalized']) > 1e-3:
                    if cleaned_data.loc[idx, 'antennaR.trail_normalized'] > cleaned_data.loc[idx, 'antennaL.trail_normalized']:
                        frame_to_write = cv2.arrowedLine(frame_to_write, antennaL, antennaR, 255, 10, tipLength=0.5)
                        # write the text
                        frame_to_write = cv2.putText(frame_to_write, f'{cleaned_data.loc[idx, "antennaR.trail_normalized"] - cleaned_data.loc[idx, "antennaL.trail_normalized"]:.2f}', antennaL, cv2.FONT_HERSHEY_SIMPLEX, 1, 255, 2, cv2.LINE_AA)
                    else:
                        frame_to_write = cv2.arrowedLine(frame_to_write, antennaR, antennaL, 255, 10, tipLength=0.5)
                        # write the text
                        frame_to_write = cv2.putText(frame_to_write, f'{cleaned_data.loc[idx, "antennaR.trail_normalized"] - cleaned_data.loc[idx, "antennaL.trail_normalized"]:.2f}', antennaL, cv2.FONT_HERSHEY_SIMPLEX, 1, 255, 2, cv2.LINE_AA)

                # shift the points another 10 pixels along the body axis
                antennaL = (antennaL[0] + int(body_axis_unit_vector[0] * offset), antennaL[1] + int(body_axis_unit_vector[1] * offset))
                antennaR = (antennaR[0] + int(body_axis_unit_vector[0] * offset), antennaR[1] + int(body_axis_unit_vector[1] * offset))

                # draw the arrow for average future turn direction
                # verify that the average future turn direction is not nan or 0
                if not np.isnan(cleaned_data.loc[idx, 'average_future_turn_direction']) and cleaned_data.loc[idx, 'average_future_turn_direction'] != 0:
                    if cleaned_data.loc[idx, 'average_future_turn_direction'] > 0:
                        frame_to_write = cv2.arrowedLine(frame_to_write, antennaR, antennaL, 0, 10, tipLength=0.5)
                        # write the text
                        frame_to_write = cv2.putText(frame_to_write, f'{cleaned_data.loc[idx, "average_future_turn_direction"]:.2f}', antennaR, cv2.FONT_HERSHEY_SIMPLEX, 1, 0, 2, cv2.LINE_AA)
                    else:
                        frame_to_write = cv2.arrowedLine(frame_to_write, antennaL, antennaR, 0, 10, tipLength=0.5)
                        # write the text
                        frame_to_write = cv2.putText(frame_to_write, f'{cleaned_data.loc[idx, "average_future_turn_direction"]:.2f}', antennaL, cv2.FONT_HERSHEY_SIMPLEX, 1, 0, 2, cv2.LINE_AA)
            except:
                pass

        # Write the frame to the video
        out.write(frame_to_write.astype(np.uint8))

    # Release the video writer
    out.release()

    # Release the video reader
    cap.release()

    # convert back to tracks by segment number
    tracks = []
    for segment in cleaned_data['segment'].unique():
        track = cleaned_data[cleaned_data['segment'] == segment].copy()
        tracks.append(track)
    return tracks



In [20]:
# Example usage:
trail_props = {
    'source': 'ovipositor',
    'spread': 0.25,
    'decay': 1,
}
antennal_pose_markers = ['antennaL', 'antennaR']
body_pose_markers = ['ovipositor', 'tip_of_head']

# estimate the trail map for all videos and also save the trail map path
for data_file in list(database.keys()):
    video_file = os.path.join(raw_video_folder, data_file.split('.')[2][4:] + '.mp4')

    output_path = os.path.join(data_folder, data_file.replace('.csv', '_trail_map.mp4'))
    tracks = database[data_file]['tracks']
    # add pose distance threshold to trail props
    trail_props['tip_of_head.petiole.pose_distance_threshold'] = database[data_file]['tip_of_head.petiole.pose_distance_threshold']
    trail_props['petiole.ovipositor.pose_distance_threshold'] = database[data_file]['petiole.ovipositor.pose_distance_threshold']
    trail_props['tip_of_head.antennaL.pose_distance_threshold'] = database[data_file]['tip_of_head.antennaL.pose_distance_threshold']
    trail_props['tip_of_head.antennaR.pose_distance_threshold'] = database[data_file]['tip_of_head.antennaR.pose_distance_threshold']
    tracks = estimate_trail_map(tracks, video_file, output_path, trail_props, antennal_pose_markers, body_pose_markers)
    # save the trail map path
    database[data_file]['trail_map'] = output_path
    # save the tracks
    database[data_file]['tracks'] = tracks
    print(f'{data_file}: Trail map generated')

  0%|          | 0/313 [00:00<?, ?it/s]

labels.predicted_preclean.006_2018_05_25_19_58_cam_6_3.analysis.csv: Trail map generated


  0%|          | 0/209 [00:00<?, ?it/s]

labels.predicted_preclean.004_2018_05_25_19_58_cam_5_2.analysis.csv: Trail map generated


  0%|          | 0/200 [00:00<?, ?it/s]

labels.predicted_preclean.003_2018_05_25_19_58_cam_4_3.analysis.csv: Trail map generated


  0%|          | 0/396 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [46]:
# get centroid distance matrix
def get_centroid_distance_matrix(tracks):
    '''
    Get the centroid distance matrix for the tracks (end of one track to start of another)
    '''
    # get the number of tracks
    n_tracks = len(tracks)
    # create an empty distance matrix
    distance_matrix = np.zeros((n_tracks, n_tracks))
    # calculate the distance between each pair of tracks
    for i in range(n_tracks):
        for j in range(n_tracks):
            if i == j:
                distance_matrix[i, j] = np.inf
            distance_matrix[i, j] = distance.euclidean(tracks[i].iloc[-1][['centroid.x', 'centroid.y']].values, tracks[j].iloc[0][['centroid.x', 'centroid.y']].values)
    return distance_matrix

# get the time difference matrix
def get_time_difference_matrix(tracks):
    '''
    Get the time difference matrix for the tracks (end of one track to start of another)
    '''
    # get the number of tracks
    n_tracks = len(tracks)
    # create an empty time difference matrix
    time_matrix = np.zeros((n_tracks, n_tracks))
    # calculate the time difference between each pair of tracks
    for i in range(n_tracks):
        for j in range(n_tracks):
            time_matrix[i, j] = tracks[j].iloc[0]['time'] - tracks[i].iloc[-1]['time']
            # if the time difference is negative, set it to infinity
            if time_matrix[i, j] < 0:
                time_matrix[i, j] = np.inf
    return time_matrix

# get the cost matrix
def get_cost_matrix(tracks, distance_limit=1, time_limit=0.5, body_pose_markers=['ovipositor', 'tip_of_head']):
    '''
    Get the cost matrix for the tracks (end of one track to start of another)
    '''
    # get the number of tracks
    n_tracks = len(tracks)
    # get the average maximum body length
    body_lengths = []
    for track in tracks:
        x_diff = track[body_pose_markers[1] + '.x'] - track[body_pose_markers[0] + '.x']
        y_diff = track[body_pose_markers[1] + '.y'] - track[body_pose_markers[0] + '.y']
        body_lengths.append(np.nanmax(np.sqrt(x_diff**2 + y_diff**2)))
    body_length = np.nanmean(body_lengths)
    # convert the distance limit to pixels
    distance_limit = distance_limit * body_length
    # create an empty cost matrix
    cost_matrix = np.zeros((n_tracks, n_tracks))
    # get the centroid distance matrix
    distance_matrix = get_centroid_distance_matrix(tracks)
    # get the time difference matrix
    time_matrix = get_time_difference_matrix(tracks)
    # calculate the cost matrix
    for i in range(n_tracks):
        for j in range(n_tracks):
            if distance_matrix[i, j] < distance_limit and time_matrix[i, j] < time_limit:
                cost_matrix[i, j] = distance_matrix[i, j] + time_matrix[i, j]
            else:
                cost_matrix[i, j] = np.inf
    return cost_matrix

In [55]:
# get the cost matrix for the tracks of the first video
cost_matrix = get_cost_matrix(database[list(database.keys())[0]]['tracks'])
# plot the cost matrix
plt.imshow(cost_matrix, cmap='cool')
plt.colorbar()
plt.xlabel('Track')
plt.ylabel('Track')
plt.title('Cost matrix')
plt.show()

In [56]:
# for each track, get the best matching track to find any sequential tracks
matches = []
for i in range(cost_matrix.shape[0]):
    j = np.argmin(cost_matrix[i])
    if cost_matrix[i, j] < np.inf:
        matches.append((i, j))
print(matches)

[(2, 3), (3, 4), (8, 9), (10, 41), (12, 60), (21, 22), (30, 31), (32, 44), (35, 51), (43, 44), (47, 48), (50, 31), (51, 52), (77, 78), (78, 79), (89, 90), (90, 91), (95, 96), (101, 102), (107, 108), (110, 111), (116, 117), (117, 118), (129, 130), (133, 146), (142, 138), (145, 146)]


In [54]:
sequential_matches

[]