# Analysis of positions, movements, and speeds
Requires the honeybee data set found in the paper "Markerless tracking of an entire honey bee colony" (Bozek et al, 2021)
The data can be found at https://zenodo.org/records/4507648. This code uses the files titled S1-S5.tgz. Ensure that the data folder name is correct as "Markerless_Tracking_Data" and that it is on the same directory level as the "/code" folder.

Begin by running the packages along with the data storing code. From there, all remaining code is plots.

In [None]:
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import numpy as np
import glob
import os # 'operating system' interaction 
import time

### Data Storing 

In [None]:
# create a dictionary to store the trajectories for each S{n}
trajectories = {}

# implement a way to save the plotted images to a folder
output_folder = "../plotted_images" # this will be the folder name. 
os.makedirs(output_folder, exist_ok=True) # ensures that the folder exists. if it does (True), just continue the program.
    # searches in current directory, so use ../plotted_images if it was a different parallel folder 

# timing for optimizations
start_time = time.time()

# now read through the file and store them all in the dictionary.  
for n in range(1,6): # goes 1,2,3,4,5 but does not include 6 

    # get the x and y values for testing 
    min_x = float('inf')
    min_y = float('inf')
    max_x = float('-inf')  # Initialize to negative infinity, every real number will be bigger
    max_y = float('-inf')


    folder = f"../Markerless_Tracking_Data/S{n}/S{n}/trajectories/*.txt" # use fstrings to apply this to all folders, rather than hardcoding 1 folder
    trajectories[f"S{n}"] = [] # initialize empty list for the dictionary

    for filename in glob.glob(folder): # glob finds file paths that match a pattern

        data = np.loadtxt(filename, delimiter=",", dtype=int)

        t, x, y, cell, angle = data.T # transpose to swap column and rows
                
        # now store the trajectories inside a tuple for future use
        trajectories[f"S{n}"].append((t, x, y, cell, angle)) 
        # each trajectories S{n} contains a tuple 
        # so S1 looks like [(t1,x1,y1...),(t2,x2,y2...),...]


        # get the max movement boundaries
        max_x = max(max_x, np.max(x)) # tests current max_x versus the new max(x)
        min_x = min(min_x, np.min(x))
        max_y = max(max_y, np.max(y))
        min_y = min(min_y, np.min(y))

    print(f"min X of S{n}: {min_x}")
    print(f"max X of S{n}: {max_x}")
    print(f"min Y of S{n}: {min_y}")
    print(f"max Y of S{n}: {max_y}", "\n")  

end_time = time.time()
print(f"Time taken to store data: {end_time - start_time:.4f} seconds")




### Plotting with Units

In [None]:
# UNIT CONVERSIONS

# video resolutions after cropping
# manual cropping done. the intention was to match the visual size of the observation hive dimensions in
## S1 and S2 which were already in a cropped fashion

VIDEO_FRAME_RATE = 10 # used during plotting

HIVE_WIDTH_PIXELS_S1 = 2560
HIVE_HEIGHT_PIXELS_S1 = 2560

HIVE_WIDTH_PIXELS_S2 = 2560
HIVE_HEIGHT_PIXELS_S2 = 2560

HIVE_WIDTH_PIXELS_S3 = 2328
HIVE_HEIGHT_PIXELS_S3 = 2144

HIVE_WIDTH_PIXELS_S4 = 2360
HIVE_HEIGHT_PIXELS_S4 = 2160

HIVE_WIDTH_PIXELS_S5 = 2277
HIVE_HEIGHT_PIXELS_S5 = 2160

HIVE_CM = 47

# convert to a factor of pixel/cm
PIXELS_PER_CM_S1_WIDTH = HIVE_WIDTH_PIXELS_S1 / HIVE_CM # rough factor of 54.4
PIXELS_PER_CM_S1_HEIGHT = HIVE_HEIGHT_PIXELS_S1 / HIVE_CM

PIXELS_PER_CM_S2_WIDTH = HIVE_WIDTH_PIXELS_S2 / HIVE_CM
PIXELS_PER_CM_S2_HEIGHT = HIVE_HEIGHT_PIXELS_S2 / HIVE_CM

PIXELS_PER_CM_S3_WIDTH = HIVE_WIDTH_PIXELS_S3 / HIVE_CM
PIXELS_PER_CM_S3_HEIGHT = HIVE_HEIGHT_PIXELS_S3 / HIVE_CM

PIXELS_PER_CM_S4_WIDTH = HIVE_WIDTH_PIXELS_S4 / HIVE_CM
PIXELS_PER_CM_S4_HEIGHT = HIVE_HEIGHT_PIXELS_S4 / HIVE_CM

PIXELS_PER_CM_S5_WIDTH = HIVE_WIDTH_PIXELS_S5 / HIVE_CM
PIXELS_PER_CM_S5_HEIGHT = HIVE_HEIGHT_PIXELS_S5 / HIVE_CM

# now make a dictionary containing this factor
# add the factors together and divide by 2 because it's a square, but the resolutions aren't perfectly square
# therefore, take the average 
pixels_per_cm = {
    'S1': (PIXELS_PER_CM_S1_WIDTH + PIXELS_PER_CM_S1_HEIGHT) / 2, # Average if close
    'S2': (PIXELS_PER_CM_S2_WIDTH + PIXELS_PER_CM_S2_HEIGHT) / 2, 
    'S3': (PIXELS_PER_CM_S3_WIDTH + PIXELS_PER_CM_S3_HEIGHT) / 2, 
    'S4': (PIXELS_PER_CM_S4_WIDTH + PIXELS_PER_CM_S4_HEIGHT) / 2, 
    'S5': (PIXELS_PER_CM_S5_WIDTH + PIXELS_PER_CM_S5_HEIGHT) / 2, 
}

pixels_per_cm_width = {
    'S1': PIXELS_PER_CM_S1_WIDTH,
    'S2': PIXELS_PER_CM_S2_WIDTH,
    'S3': PIXELS_PER_CM_S3_WIDTH,
    'S4': PIXELS_PER_CM_S4_WIDTH,
    'S5': PIXELS_PER_CM_S5_WIDTH,
}
pixels_per_cm_height = {
    'S1': PIXELS_PER_CM_S1_HEIGHT,
    'S2': PIXELS_PER_CM_S2_HEIGHT,
    'S3': PIXELS_PER_CM_S3_HEIGHT,
    'S4': PIXELS_PER_CM_S4_HEIGHT,
    'S5': PIXELS_PER_CM_S5_HEIGHT,
}

# an offset is required due to the cropping of S3-S5 not being aligned with the bee's position
crop_offset_x = {  
    'S3': 750,  
    'S4': 750,  
    'S5': 750,  
}

print("Unit Conversion constants loaded!")

In [None]:
output_folder_units = "../plots_with_units"
os.makedirs(output_folder_units, exist_ok=True)
print("Folder 'plots_with_units' exists!")

Plot standard x,y values

In [None]:
# plot x and y values

# for optimizations 
start_time = time.time()

for n in range(1,6): 
    plt.figure(figsize=(5,5)) 
    
    s_label = f"S{n}"
    ppm_width = pixels_per_cm_width[s_label]
    ppm_height = pixels_per_cm_height[s_label]
    cm_per_pixel_width = 1 / ppm_width
    cm_per_pixel_height = 1 / ppm_height

    all_x_cm = []
    all_y_cm = []

    for t_arr, x_arr, y_arr, cell_arr, angle_arr in trajectories[s_label]:

        # apply the offset to the x-coordinates for S3-S5
        if s_label in crop_offset_x:
            x_corrected_arr = x_arr - crop_offset_x[s_label]
        else:
            x_corrected_arr = x_arr

        x_cm_arr = x_corrected_arr * cm_per_pixel_width
        y_cm_arr = y_arr * cm_per_pixel_height
        all_x_cm.extend(x_cm_arr)
        all_y_cm.extend(y_cm_arr)
        # Plot all trajectories for S{n} at once
        plt.plot(x_cm_arr, y_cm_arr, label=filename)

    

    # mins and max of xy, for unit cerification
    if all_x_cm and all_y_cm:
        min_x_cm = np.min(all_x_cm)
        max_x_cm = np.max(all_x_cm)
        min_y_cm = np.min(all_y_cm)
        max_y_cm = np.max(all_y_cm)
        print(f"S{n}:")
        print(f"  Min X (cm): {min_x_cm:.4f}")
        print(f"  Max X (cm): {max_x_cm:.4f}")
        print(f"  Min Y (cm): {min_y_cm:.4f}")
        print(f"  Max Y (cm): {max_y_cm:.4f}")
    else:
        print(f"No trajectory data found for {s_label}")
    

    # Plot Labels in Units
    plt.title(f"S{n} Trajectories (cm)")
    plt.xlabel("X Position (cm)")
    plt.ylabel("Y Position (cm)")

    save_path = os.path.join(output_folder_units, f"xy_plot_cm_S{n}.png")
    plt.savefig(save_path, dpi = 300, bbox_inches="tight") 

    # plt.close() --> close the figure to free up memory. alternate with show
    plt.show()

end_time = time.time()
print(f"Time taken to plot (normal, cm): {end_time - start_time:.4f} seconds")

Plotting with standardized values, every bee starts at the (0,0) position

In [None]:
# PLOT WITH STANDARDIZED VALUES
start_time = time.time()

for n in range(1,6): 

    min_x_standardized = float('inf')
    min_y_standardized = float('inf')
    max_x_standardized = float('-inf')
    max_y_standardized = float('-inf')

    plt.figure(figsize=(5,5)) # creates a new figure for every single new S{n}

    s_label = f"S{n}"
    ppm_width = pixels_per_cm_width[s_label]
    ppm_height = pixels_per_cm_height[s_label]
    cm_per_pixel_width = 1 / ppm_width
    cm_per_pixel_height = 1 / ppm_height
    

    for t_arr, x_arr, y_arr, cell_arr, angle_arr in trajectories[f"S{n}"]:  # for all data in trajectories of S1,S2,...
        if s_label in crop_offset_x:
            x_corrected_arr = x_arr - crop_offset_x[s_label]
        else:
            x_corrected_arr = x_arr

        x_standardized = x_corrected_arr - x_corrected_arr[0]  # Standardize to start at 0
        y_standardized = y_arr - y_arr[0]

        x_standardized_cm = x_standardized * cm_per_pixel_width
        y_standardized_cm = y_standardized * cm_per_pixel_height

        plt.plot(x_standardized_cm, y_standardized_cm, label=filename)

        max_x_standardized = max(max_x_standardized, np.max(x_standardized_cm))
        min_x_standardized = min(min_x_standardized, np.min(x_standardized_cm))
        max_y_standardized = max(max_y_standardized, np.max(y_standardized_cm))
        min_y_standardized = min(min_y_standardized, np.min(y_standardized_cm))
    
    print(f"Standardized S{n}:")
    print(f"  Min X (cm): {min_x_standardized:.4f}")
    print(f"  Max X (cm): {max_x_standardized:.4f}")
    print(f"  Min Y (cm): {min_y_standardized:.4f}")
    print(f"  Max Y (cm): {max_y_standardized:.4f}")

    
    plt.title(f"Standardized S{n} Trajectories (cm)")
    plt.xlabel("Standardized X Position (cm)")
    plt.ylabel("Standardized Y Position (cm)")

    save_path = os.path.join(output_folder_units, f"standardized_plot_cm_S{n}.png") 
    plt.savefig(save_path, dpi = 300, bbox_inches="tight") 

    # plt.close() --> close the figure to free up memory. alternate with show
    plt.show()

end_time = time.time()
print(f"Time taken to plot data (standardized, cm): {end_time - start_time:.4f} seconds")

Plot the x amount of bees with the highest and lowest displacements (mainly for presentation purposes)

In [None]:
start_time = time.time()

num_top_bees = 5
num_bottom_bees = 5

for n in range(1, 6):
    plt.figure(figsize=(5, 5))

    s_label = f"S{n}"
    ppm_width = pixels_per_cm_width[s_label]
    ppm_height = pixels_per_cm_height[s_label]
    cm_per_pixel_width = 1 / ppm_width
    cm_per_pixel_height = 1 / ppm_height

    all_trajectories_cm = []
    total_displacements = []
    bee_index = 0

    for t_arr, x_arr, y_arr, cell_arr, angle_arr in trajectories[s_label]:
        # apply the offset to the x-coordinates
        if s_label in crop_offset_x:
            x_corrected_arr = x_arr - crop_offset_x[s_label]
        else:
            x_corrected_arr = x_arr

        x_cm_arr = x_corrected_arr * cm_per_pixel_width
        y_cm_arr = y_arr * cm_per_pixel_height

        # Store the trajectory in cm
        trajectory_cm = np.column_stack((x_cm_arr, y_cm_arr))
        all_trajectories_cm.append(trajectory_cm)

        # Calculate total displacement for this bee
        if len(trajectory_cm) > 1:
            displacement = np.sum(np.sqrt(np.diff(trajectory_cm[:, 0])**2 + np.diff(trajectory_cm[:, 1])**2))
            total_displacements.append((displacement, bee_index))
        else:
            total_displacements.append((0, bee_index))  # Zero displacement for single point trajectories

        bee_index += 1

    # Sort bees by total displacement
    sorted_displacements = sorted(total_displacements, key=lambda item: item[0])

    # Get indices of top and bottom bees
    top_bee_indices = [index for _, index in sorted_displacements[-num_top_bees:]]
    bottom_bee_indices = [index for _, index in sorted_displacements[:num_bottom_bees]]

    plotted_bee_count = 0
    plotted_labels = set() # To avoid duplicate labels

    # Plot trajectories of top and bottom bees
    for i, trajectory_cm in enumerate(all_trajectories_cm):
        if i in top_bee_indices or i in bottom_bee_indices:
            label = f"Bee {i+1}" # A simple label for each trajectory
            if label not in plotted_labels:
                plt.plot(trajectory_cm[:, 0], trajectory_cm[:, 1], label=label, alpha=0.7)
                plotted_labels.add(label)
                plotted_bee_count += 1

    # mins and max of xy for the plotted bees
    all_x_cm_plotted = np.concatenate([all_trajectories_cm[i][:, 0] for i in top_bee_indices + bottom_bee_indices]) if (top_bee_indices or bottom_bee_indices) else []
    all_y_cm_plotted = np.concatenate([all_trajectories_cm[i][:, 1] for i in top_bee_indices + bottom_bee_indices]) if (top_bee_indices or bottom_bee_indices) else []

    if all_x_cm_plotted.size > 0 and all_y_cm_plotted.size > 0:
        min_x_cm = np.min(all_x_cm_plotted)
        max_x_cm = np.max(all_x_cm_plotted)
        min_y_cm = np.min(all_y_cm_plotted)
        max_y_cm = np.max(all_y_cm_plotted)
        print(f"S{n} (Top {num_top_bees} & Bottom {num_bottom_bees} Bees):")
        print(f"  Min X (cm): {min_x_cm:.4f}")
        print(f"  Max X (cm): {max_x_cm:.4f}")
        print(f"  Min Y (cm): {min_y_cm:.4f}")
        print(f"  Max Y (cm): {max_y_cm:.4f}")
    else:
        print(f"No top/bottom trajectory data found for {s_label}")

    # Plot Labels in Units
    plt.title(f"S{n} Trajectories (cm)")
    plt.xlabel("X Position (cm)")
    plt.ylabel("Y Position (cm)")

    save_path = os.path.join(output_folder_units, f"xy_plot_top_bottom_displacements_cm_S{n}.png")
    plt.savefig(save_path, dpi=300, bbox_inches="tight")
    plt.show()

end_time = time.time()
print(f"Time taken to plot top/bottom (cm): {end_time - start_time:.4f} seconds")

### Everything below plots without units
This is old code from the beginning of the project, and is not relevant. The only code here that is not in the units version is the speed plots, as those were not used in the final presentation

In [None]:
# Normal Plots: no changes to data, plotting all bees 

start_time = time.time()
for n in range(1,6): 
    plt.figure(figsize=(4,4)) # creates a new figure for every single new S{n}

    for t, x, y, cell, angle in trajectories[f"S{n}"]: # for all data in trajectores of S1,S2,...
        plt.plot(x,y, label=filename) # plot after reading all x's and y's. label is for legend.  
    
    # now label everything and show the plot before restarting the loop for the next S{n}
    plt.title(f"S{n} Trajectories")
    plt.xlabel("X Position")
    plt.ylabel("Y Position")
    # plt.legend() # run time is too long when this is used
    plt.grid(True)

    save_path = os.path.join(output_folder, f"normal_plot_S{n}.png") # this ensures all files are formatted correctly across devices 
    plt.savefig(save_path, dpi = 300, bbox_inches="tight") # now we save the png with a dotsperinch of 300. 'tight' minimizes whitespace

    # plt.close() --> close the figure to free up memory. alternate this with show

    # the program saves images to a folder AND shows these plots. comment out plt.show() to only save images. 
    plt.show()

end_time = time.time()
print(f"Time taken to plot (normal): {end_time - start_time:.4f} seconds")

In [None]:
# Standardized Plot: everything starts from (0,0)

''' 
CONSIDERATIONS: 
    just make the xlim and ylim the size of the resolution of the video
    no need to calculate a minimum and maximum 
'''

# TEST 1: ONLY X AMOUNT OF STANDARDIZED PLOTS
#print("TEST 1 ACTIVE: First 10 points of standardized plots only")

start_time = time.time()

for n in range(1,6): 

    min_x = float('inf')
    min_y = float('inf')
    max_x = float('-inf') 
    max_y = float('-inf')

    plt.figure(figsize=(4,4)) # creates a new figure for every single new S{n}

    # TEST 1: INITIALIZING A COUNTER
    #bee_count = 0; 
    # TEST 1^^

    for t, x, y, cell, angle in trajectories[f"S{n}"]: # for all data in trajectores of S1,S2,...

        # TEST 1: STOP AFTER X BEES
        #if bee_count >= 10: 
         #   break
        # TEST 1^^


        x_standardized = x - x[0] # this populates a list with a loop. xn = each element of x, subtract it by the initial x[0]
        y_standardized = y - y[0]

        plt.plot(x_standardized, y_standardized, label=filename)

        max_x = max(max_x, np.max(x_standardized))
        min_x = min(min_x, np.min(x_standardized))
        max_y = max(max_y, np.max(y_standardized))
        min_y = min(min_y, np.min(y_standardized))

        # TEST 1: INCREMENT
        #bee_count += 1
        # TEST 1^^
    
    print(f"min X of S{n}: {min_x}")
    print(f"max X of S{n}: {max_x}")
    print(f"min Y of S{n}: {min_y}")
    print(f"max Y of S{n}: {max_y}")  


    # Find the overall min and max for a square aspect ratio
    overall_min = min(min_x, min_y) # find the 'most minimum' 
    overall_max = max(max_x, max_y)
    padding_factor = 1.1 # for visual clarity

    range_min = overall_min * padding_factor
    range_max = overall_max * padding_factor

    # now label everything and show the plot before restarting the loop for the next S{n}
    # set limits to be a square
    plt.xlim(range_min, range_max)
    plt.ylim(range_min, range_max)

    # testing out ticks v
    ticks = np.linspace(range_min, range_max, num=5)  # 5 evenly spaced ticks, but uneven numbers
    plt.xticks(ticks)
    plt.yticks(ticks)
    # testing out ticks ^ 

    plt.title(f"Standardized S{n} Trajectories")
    plt.xlabel("Standardized X Position")
    plt.ylabel("Standardized Y Position")
    # plt.legend() # run time is too long when this is used
    plt.grid(True)

    save_path = os.path.join(output_folder, f"standardized_plot_S{n}.png") # this ensures all files are formatted correctly across devices 
    plt.savefig(save_path, dpi = 300, bbox_inches="tight") # now we save the png with a dotsperinch of 300. 'tight' minimizes whitespace

    # plt.close() --> close the figure to free up memory. alternate this with show

    # the program saves images to a folder AND shows these plots. comment out plt.show() to only save images. 
    plt.show()

end_time = time.time()
print(f"Time taken to plot data (standardized): {end_time - start_time:.4f} seconds")

In [None]:
# Standardized Scatter Plots, with Time Gradients

# TEST 2: ONLY X AMOUNT OF STANDARDIZED PLOTS
#print("TEST 2 ACTIVE: First 10 points of standardized color gradient plots only")

# Normalize the time values only once
time_normalized = plt.Normalize(min(t), max(t))

# Precompute all colors for the time values
colors = cm.plasma(time_normalized(t))

start_time = time.time()
for n in range(1,6): 
    plt.figure(figsize=(4,4)) # creates a new figure for every single new S{n}

    # TEST 2: INITIALIZING A COUNTER
    #bee_count = 0; 
    # TEST 2^^

    for t, x, y, cell, angle in trajectories[f"S{n}"]: # for all data in trajectores of S1,S2,...

        # TEST 2: STOP AFTER X BEES
        #if bee_count >= 10:  
         #  break
        # TEST 2^^

        x_standardized = x - x[0] # this populates a list with a loop. xn = each element of x, subtract it by the initial x[0]
        y_standardized = y - y[0]

        # TO BE DELETED WITH MORE TESTING ON TIME MAPS
        # now create a color map based on time values
        #time_normalized = plt.Normalize(min(t), max(t)) # time_normalized is now a function with standardized time values
        #colors = cm.plasma(time_normalized(t))

        # now use a scatterplot to plot everything according to time
        # c = color based on time[i], cmap is the color map that we're using, norm to fit the color map, size of each scatter point
        plt.scatter(x_standardized, y_standardized, c=t, cmap = 'plasma', norm = time_normalized, s=1)
        
        #for i in range(len(x_standardized)):
         #   plt.plot(x_standardized[i], y_standardized[i], marker=',', color=cm.plasma(time_normalized(t[i])), label=filename)

        #for i in range(len(x_standardized) - 1):  
         #   plt.plot(x_standardized[i:i+2], y_standardized[i:i+2], color=cm.plasma(time_normalized(t[i])))


        # TEST 2: INCREMENT
        #bee_count += 1
        # TEST 2^^
    
    # now label everything and show the plot before restarting the loop for the next S{n}
    plt.title(f"Standardized S{n} Trajectories w/ Time Gradient")
    plt.xlabel("Standardized X Position")
    plt.ylabel("Standardized Y Position")
    plt.colorbar(label = "Time (Frames)")
    # plt.legend() # run time is too long when this is used
    plt.grid(True)

    # testing limits 
    #plt.xlim(-250,250)
    #plt.ylim(-250,250)

    save_path = os.path.join(output_folder, f"standardized_timegradient_plot_S{n}.png") 
    plt.savefig(save_path, dpi = 300, bbox_inches="tight") 

    plt.close() # --> close the figure to free up memory. alternate this with show

    # the program saves images to a folder AND shows these plots. comment out plt.show() to only save images. 
    #plt.show()

end_time = time.time()
print(f"Time taken to plot data (standardized, time gradient): {end_time - start_time:.4f} seconds")

Plotting speed against time. It would be more significant to plot the speed as a gradient on top of the x,y position plot.

In [None]:
# SPEED AGAINST TIME


# TEST 3: ONLY X AMOUNT OF SPEED PLOTS
#print("TEST 3 ACTIVE: First ALL points of speed plots only")

start_time = time.time()
for n in range(1,6): 
    plt.figure(figsize=(4,4)) 

    # TEST 3: INITIALIZING A COUNTER
    bee_count = 0; 
    # TEST 3 ^

    for t, x, y, cell, angle in trajectories[f"S{n}"]: 

        # TEST 3: STOP AFTER X BEES
        if bee_count >= 5: 
            break
        # TEST 3^^

        
        dx = np.diff(x) # consecutive differences, so x[1]-x[0], x[2]-x[1], etc.
        dy = np.diff(y) 

        distances = np.sqrt(dx**2 + dy**2) 

        dt = np.diff(t)
        
        speeds = distances / dt

        plt.plot(t[1:], speeds, label=filename) # plot after reading all x's and y's. label is for legend.  

        # TEST 3: INCREMENT
        #bee_count += 1
        # TEST 3^^
    
    
    plt.title(f"S{n} Speeds")
    plt.xlabel("Time (Frames)")
    plt.ylabel("Speed (pix. per frame?)")
    # plt.legend() 
    plt.grid(True)

    save_path = os.path.join(output_folder, f"speeds_S{n}.png") 
    plt.savefig(save_path, dpi = 300, bbox_inches="tight") 

    # plt.close() --> close the figure to free up memory. alternate this with show

    # the program saves images to a folder AND shows these plots. comment out plt.show() to only save images. 
    plt.xlim(0,500)
    plt.show()

end_time = time.time()
print(f"Time taken to plot (speeds): {end_time - start_time:.4f} seconds")
