In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import math

from scipy import spatial

color_list =["#3eb991","#e9a820","#e01563","#edb196","#6ecadc","#1f94ac","#ae9a6a","#ccb8a6","#343a44"]

# Diffusion / swimming module

In [None]:
# defining function for diffusion in 3D over time
def single_step(X, Y, theta, delta_T):

    # defining variables
    v = 1 * pow(10, -6);
    omega = 0;
    D_r = 0.05;
    D_t = 0.1 * pow(10, -12);

    delta_X, delta_Y = 0, 0;
    delta_theta = 0;

    # randomly sampling in a uniform distribution from 0 to 1 to get an angle
    W_theta = (math.pi / 2) * np.random.uniform(-1, 1);

    # randomly sampling noise in Gaussian distrib. with mean 0 and variance 1
    W_X = np.random.normal(0, 1);
    W_Y = np.random.normal(0, 1);

    # solving change in position for each dimension
    delta_X = v * np.cos(theta) * delta_T + np.sqrt(2 * D_t * delta_T) * W_X;
    delta_Y = v * np.sin(theta) * delta_T + np.sqrt(2 * D_t * delta_T) * W_Y;

    # solving change in each angle
    delta_theta = omega * delta_T + np.sqrt(2 * D_r * delta_T) * W_theta;

    # using a temporary variable to track position and angle
    X += delta_X;
    Y += delta_Y;
    theta += delta_theta;

    temp_postition = [];
    temp_postition = np.array([X, Y, theta]);

    return temp_postition; # returning coordinates and diffusion coefficient (for comparison later)

# Defining graphing for video

In [None]:
import os
import cv2
from glob import glob

In [None]:
os.makedirs("../content/graphs/2D", exist_ok=True); # creates directory if doesn't exist

old_files = glob("../content/graphs/2D/*.png"); # opens directory
for f in old_files: # for all files in this directory with extension .png
    os.remove(f); # delete files (helpful to remove old files)

In [None]:
def graph_time(x,y,time):

    fig, ax = plt.subplots()

    plt.scatter(x, y, s = 1); # s = x sets dot size; smaller makes it clearer where each swimmer is

    ax.set_xlim(-5e-4, 5e-4) # setting permanent axes
    ax.set_ylim(-5e-4, 5e-4)

    ax.set_xlabel("X")
    ax.set_ylabel("Y")

    fig.autofmt_xdate() # helpful to rotate x-axis labels so they do not annoyingly overlap

    ax.set_title(f"time: {time}, N = {len(x)}") # setting title with current time

    if (time < 10): # using for naming, probably a smarter way to do this
        plt.savefig(f"../content/graphs/2D/000{time}")
    elif (time < 100):
        plt.savefig(f"../content/graphs/2D/00{time}")
    elif (time < 1000):
        plt.savefig(f"../content/graphs/2D/0{time}")
    else:
        plt.savefig(f"../content/graphs/2D/{time}")

    plt.close()

# Defining the swimmer class

In [None]:
class makeSwimmer: # defines swimmer class
  def __init__(self, x, y, theta, start_time, parent, stall_time, down_time): # one can enter with 0, 0, 0 start or other
    self.start_time = start_time;
    self.position = np.array([[x, y, theta]]);
    self.parent = parent; # the index of the swimmer you diverged from (in the list)
    self.density = 0; # finding the density of nearby swimmers
    self.swimming = False; # this will define if the swimmer is swimming or still, True = swim, False = still
    self.stall_time = stall_time;
    self.down_time = down_time;
    self.down = False;
    # notes:
    # [done?] make divide time, subtract from current time, use this to induce stalling --- built in cool down
    # [done?] need second stalling metric, for when local density is high enough
    # [done?] will also need some metric of ensuring movement for some time after movement resumes

# The probability of dividing

In [None]:
def find_probability_divide(density, delta_T):
    sigma = 5;
    mean = 10;
    max = 0.5 * (1 / pow(2 * math.pi, 0.5)) * np.exp(- 0.5 * pow(((mean - mean) / sigma), 2)); # used to normalize
    probability = (1 / max) * 0.5 * (1 / pow(2 * math.pi, 0.5)) * np.exp(- 0.5 * pow(((density - mean) / sigma), 2)); # normalized to 1

    bool_temp = False;
    if probability > np.random.uniform(0, 1):
        bool_temp = True;

    return bool_temp;

# The probability of stopping

In [None]:
def find_probability_stop(density, delta_T):
    sigma = 5;
    mean = 15;
    max = 0.5 * (1 / pow(2 * math.pi, 0.5)) * np.exp(- 0.5 * pow(((mean - mean) / sigma), 2)); # used to normalize
    probability = (1 / max) * 0.5 * (1 / pow(2 * math.pi, 0.5)) * np.exp(- 0.5 * pow(((density - mean) / sigma), 2)); # normalized to 1

    bool_temp = False;
    if probability > np.random.uniform(0, 1):
        bool_temp = True;

    return bool_temp;

# Main controller and graph generator

In [None]:
total_time = 3000; # in seconds?
delta_T = 1;
indexed_time = int(total_time / delta_T); # used to index through time list
swimmer_Radius = 1 * pow(10, -7); # this is the swimmer size (too small?), so that when they divide their centers will be offset by this

down_time = 50;
stall_time = 100;

swimmerList = []; # a list of swimmers
for i in range(0, 1, 1): # initializes a list of 10 swimmers
    swimmerList.append(makeSwimmer(0, 0, 0, 0, 0, 0, 0)); # set starting point at 0, 0

population_size = [];
start_times = [0]; # will be helpful later and save a ton of comp. power

temp_coords_list = np.array([[0, 0]]);
for time in range(0, indexed_time, 1):
    population_size.append(len(swimmerList));

    # this is used to find the nearest swimmer
    # let us instead try to convert this module to density of nearby swimmers

    for i in range(0, len(swimmerList), 1): # used to compute the density around each swimmer

        time_effective = time - swimmerList[i].start_time; # to make using the start time easier
        compare_coord = [[swimmerList[i].position[time_effective, 0], swimmerList[i].position[time_effective, 1]]]; # stores current position
        distances = spatial.distance.cdist(temp_coords_list, compare_coord).flatten(); # flatten added to reduce to 1D array
        distances = distances[distances != 0]; # used to remove / extract non-zero numbers (i.e., self)

        if swimmerList[i].swimming: # boolean value for if the swimmer is swimming or still
            local_density = 0;
            for distance in distances:
                if distance < 0.0001: # not sure what this value equals functionally; I think it's in meters
                    local_density += 1; # increases for swimmers in this range
                    # note that if not satisfied, local density = 0, which is correct
            swimmerList[i].density = local_density; # stores local density

    temp_coords_list = np.array([[0, 0]]);

    for i in range(0, len(swimmerList), 1):

        time_effective = time - swimmerList[i].start_time; # to make using the start time easier
        arr = swimmerList[i].position[time_effective]; # to store the previous position

        bool_stop = find_probability_stop(swimmerList[i].density, delta_T);
        if(bool_stop):
            swimmerList[i].stall_time += delta_T;

        if swimmerList[i].swimming: # checks if swimmer is swimming
            temp_coords = single_step(arr[0], arr[1], arr[2], delta_T); # stores its current coordinates
            swimmerList[i].position = np.vstack((swimmerList[i].position, temp_coords)); # adds new dt coordinates, which are temporarily stored in temp_coords
        elif swimmerList[i].stall_time >= 0 and not swimmerList[i].swimming: # condition for if the swimmer is stalled (i.e., dividing)
            # note: I think it must be geq; consider the case when stall_time = 100 and dt = 100 for reasoning
            swimmerList[i].stall_time = swimmerList[i].stall_time - delta_T; # removes dt from stall_time
            swimmerList[i].position = np.vstack((swimmerList[i].position, [arr[0], arr[1], arr[2]])); # adds current position as next time's position
        else: # condition for if not stalled and not swimming (i.e., starting to swim again)
            swimmerList[i].swimming = True; # sets swimming back to True
            swimmerList[i].stall_time = 0; # sets stall_time to 0
            swimmerList[i].position = np.vstack((swimmerList[i].position, [arr[0], arr[1], arr[2]])); # finds position at next time

        if swimmerList[i].down and swimmerList[i].down_time >= 0: # if swimmer is in down_time and down_time still >= 0
            swimmerList[i].down_time = swimmerList[i].down_time - delta_T; # subtracts dt from downtime
        else: # condition when the swimmer is not on down_time
            swimmerList[i].down_time = 0; # sets down_time to be 0
            swimmerList[i].down = False; # sets down boolean to be False

        # determining probability, then using a random number generator to define division
        bool_divide = find_probability_divide(swimmerList[i].density, delta_T);

        if bool_divide and swimmerList[i].swimming and not swimmerList[i].down: # currently causing stochastic cell division
            swimmerList[i].swimming = False; # causes swimmer to stop
            swimmerList[i].down = True; # prevents swimmer from dividing again
            swimmerList[i].stall_time = stall_time; # sets duration of stopping
            swimmerList[i].down_time = down_time; # sets duration of down_time
            offset = np.random.normal(0, swimmer_Radius); # just doing this so it isn't exact same length every time, will need to think of a better way to do this, probably
            x_after_divide = arr[0] + offset; # causes next swimmer to start around body length away
            y_after_divide = arr[1] + offset;
            swimmerList.append(makeSwimmer(x_after_divide, y_after_divide, swimmerList[i].position[time_effective, 2], time + 1, i, 200, 200)); # set starting point at 0, 0
            start_times.append(time + 1);
            temp_coords_list = np.vstack((temp_coords_list, [x_after_divide, y_after_divide])); # necessary to include swimmers that just divided---ugly, though

        temp_coords_list = np.vstack((temp_coords_list, [swimmerList[i].position[time_effective, 0], swimmerList[i].position[time_effective, 1]])); # adds current coordinates to a list with other swimmers current coordinates

    temp_coords_list = np.delete(temp_coords_list, 0, 0); # removes first element of array, I think (which is 0,0, I think)

    dt_per_graph = 10;
    if time % dt_per_graph == 0: # defines how often we decide to graph---right now, we graph every 10 dt
        graph_time(temp_coords_list[:,0], temp_coords_list[:,1], time); # calls our graphing function
        # print(f"time = {time}, and n = {len(temp_coords_list)}");



final_x = []; # storing the final positions, for looking at the shape of the colony at the end
final_y = [];
for i in np.arange(0, len(swimmerList), 1):
    final_x.append(swimmerList[i].position[-1,0]);
    final_y.append(swimmerList[i].position[-1,1]);

# 2d graphing definitions
plt.subplot(2, 2, 1); # putting plots on the same image, this at row 1, col 1, index 1
for i in range(0, len(swimmerList), 1):
    plt.plot(swimmerList[i].position[:,0], swimmerList[i].position[:,1], linewidth=0.5);
    # here, : takes all values at position 0 (x) or 1 (y) for each swimmer in the list
    # plot then graphs all of those x and y independently

plt.xlabel('x')
plt.ylabel('y')
plt.title("path, N=" + str(len(swimmerList)))
plt.tight_layout()
# plt.gca().set_aspect(0.75)

plt.subplot(2, 2, 2); # putting plots on the same image, this at row 1, col 1, index 1
for i in range(0, len(swimmerList), 1):
    plt.plot(swimmerList[i].position[:,0], swimmerList[i].position[:,1], linewidth=0.5, color = color_list[swimmerList[i % len(color_list)].parent]);
    # here, : takes all values at position 0 (x) or 1 (y) for each swimmer in the list
    # plot then graphs all of those x and y independently

plt.xlabel('x')
plt.ylabel('y')
plt.title("path, N=" + str(len(swimmerList)))
plt.tight_layout()
# plt.gca().set_aspect(0.75)

plt.subplot(2, 2, 3); # putting plots on the same image, this at row 1, col 1, index 1
plt.plot(swimmerList[0].position[:,0], swimmerList[0].position[:,1], linewidth=1);

plt.xlabel('x')
plt.ylabel('y')
plt.title("path, swimmer 0")
plt.tight_layout()
# plt.gca().set_aspect(0.75)

plt.subplot(2, 2, 4); # putting plots on the same image, this at row 1, col 1, index 1
plt.plot(np.arange(0, total_time, delta_T), population_size);

plt.xlabel('time')
plt.ylabel('N')
plt.title("N over time")
plt.tight_layout()
# plt.gca().set_aspect('equal')


# Graphing final position
Helpful in knowing how they cluster in the end.

In [None]:
plt.xlabel('x')
plt.ylabel('y')
plt.title("final, N=" + str(len(swimmerList)))
plt.tight_layout()

plt.scatter(final_x, final_y);
plt.show()

# Create video
Combining the images into a proper video

In [None]:
image_folder = '../content/graphs/2D'; # the folder containing the images
video_name = '../content/graphs/video.mp4'; # the name of the video generated

fourcc = cv2.VideoWriter_fourcc(*'mp4v'); # helps openCV understand how to format the video

images = [img for img in os.listdir(image_folder) if img.endswith(".png")]; # puts all of the images into this list
frame = cv2.imread(os.path.join(image_folder, images[0])); # finds the frame of the images
height, width, layers = frame.shape; # creates dimensions based on the read frames

fps = 10; # sets frames per second of video (I think; it may also set some similarly scaled value)
video = cv2.VideoWriter(video_name, fourcc, fps, (width,height)); # creates video with specifications set by frame and other

images.sort(); # needed because, for some weird reason, it will read in the images in a strange order

for image in images: # take all of the images and combine them together, and save to image_folder
    video.write(cv2.imread(os.path.join(image_folder, image)));

cv2.destroyAllWindows();
video.release();