In [None]:
import math
import pandas as pd
from pandas import Series
#
import numpy as np
from numpy import log, mean, sqrt, where, std, exp, sign

#
import matplotlib.pyplot as plt
import time
#
import scipy
from scipy import stats
from scipy.stats import truncnorm
from scipy.stats import entropy
from scipy.stats import vonmises
#
import seaborn as sns
#
# https://stochastic.readthedocs.io/en/latest/general.html
# 
from fbm import FBM
from stochastic.processes import OrnsteinUhlenbeckProcess 

In [None]:
# Machine epsilon of the Python float type (spacing between 1.0 and the next
# representable value), printed for reference.
# NOTE(review): `epsilon` is not referenced by any other cell shown in this
# section of the notebook - confirm whether it is still needed.
from numpy import finfo
epsilon = finfo(float).eps
print(epsilon)

In [None]:
#
# basic functions
#
def get_displacement(x_1, y_1, x_2, y_2):
    """Return the Euclidean distance between two points in the plane.

    inputs
    x_1, y_1: coordinates of the first point
    x_2, y_2: coordinates of the second point

    results
    distance between (x_1, y_1) and (x_2, y_2)
    """
    delta_x = x_1 - x_2
    delta_y = y_1 - y_2
    return sqrt(delta_x**2 + delta_y**2)

def get_displacements(x, y):
    """Return the step lengths between consecutive positions of one trajectory.

    inputs
    x: sequence of x coordinates
    y: sequence of y coordinates (same length as x)

    results
    displacements: numpy array of length len(x)-1 whose i-th entry is the
                   distance between the positions at t(i) and t(i+1);
                   empty when fewer than two positions are given
    """
    x_arr = np.asarray(x, dtype=float)
    y_arr = np.asarray(y, dtype=float)
    # np.diff yields x[i+1]-x[i]; the sign is irrelevant once squared.
    # Vectorised so that long trajectories do not pay one Python call per step.
    return np.sqrt(np.diff(x_arr)**2 + np.diff(y_arr)**2)

def get_d_traveled(x, y):
    """Return the path length of one trajectory.

    inputs
    x: sequence of x coordinates
    y: sequence of y coordinates

    results
    d_traveled: sum of the displacements between consecutive positions
                (0.0 when the trajectory has fewer than two positions)
    """
    # np.sum reduces the array natively and returns a float even when
    # there are no steps to add up.
    return np.sum(get_displacements(x, y))

In [None]:
#
# feature : efficiency and straightness
#
def get_efficiency(x, y):
    """Return the efficiency of one trajectory.

    efficiency = (net displacement)^2 / ((N-1) * sum of squared step lengths),
    where N is the number of positions.

    inputs
    x: sequence of x coordinates
    y: sequence of y coordinates

    results
    efficiency: relevant to trajectory linearity; may be useful for detecting
                directed motion. nan is returned when the ratio is undefined
                (fewer than two positions, or no movement at all).
    """
    xs = np.asarray(x, dtype=float)
    ys = np.asarray(y, dtype=float)
    n_datapoints = len(xs)
    if n_datapoints < 2:
        return np.nan
    # squared distances are computed directly instead of taking a square
    # root and squaring the result again
    upper = (xs[-1] - xs[0])**2 + (ys[-1] - ys[0])**2
    displacements_sq = np.diff(xs)**2 + np.diff(ys)**2
    lower = (n_datapoints - 1)*np.sum(displacements_sq)
    if lower == 0:
        # stationary trajectory: 0/0 is undefined
        return np.nan
    return upper/lower

def get_straightness(x, y):
    """Return the straightness of one trajectory.

    straightness = net displacement (first to last position) / distance traveled.

    inputs
    x: list of x coordinate
    y: list of y coordinate

    results
    straightness: relevant to average direction change between subsequent
                  steps; may be useful for detecting directed motion
    """
    last = len(x) - 1
    net_displacement = get_displacement(x[0], y[0], x[last], y[last])
    path_length = get_d_traveled(x, y)
    return net_displacement/path_length

In [None]:
#
# for computing the ratio of net displacement and distance traveled
# see: 
# Huang, S.Y., Zou, X.W. and Jin, Z.Z. (2002) Directed random walks in continuous space. Physical Review E, 65, 052105.
# Visser, A.W. and Kiørboe, T. (2006) Plankton motility patterns and encounter rates. Oecologia, 148, pp. 538--546.
#

def get_net_displacement(traj_x, traj_y):
    """Return the net (start-to-end) displacement of every trajectory.

    inputs
    traj_x: collection of list of x coordinate
    traj_y: collection of list of y coordinate

    results
    d_net_set: list of net displacement, one entry per trajectory
    """
    def _net_displacement(run_x, run_y):
        # distance between the first and the last recorded position
        last = len(run_x) - 1
        return get_displacement(run_x[0], run_y[0], run_x[last], run_y[last])

    return [_net_displacement(traj_x[run], traj_y[run]) for run in range(len(traj_x))]

def get_pathlength(traj_x, traj_y):
    """Return the distance traveled along every trajectory.

    inputs
    traj_x: collection of list of x coordinate
    traj_y: collection of list of y coordinate

    results
    d_traveled_set: list of distance traveled, one entry per trajectory
    """
    return [get_d_traveled(traj_x[run], traj_y[run]) for run in range(len(traj_x))]

In [None]:
# feature: 
# smallest enclosing circle (i.e., minimum bounding circle) and its normalized value
# normalized against distance traveled
#
def mbc_xyr(traj_x, traj_y):
    """Compute the minimum bounding circle of every trajectory.

    inputs
    traj_x: collection of list of x coordinate (one list per trajectory)
    traj_y: collection of list of y coordinate (one list per trajectory)

    results
    mbc_x, mbc_y: circle centre coordinates
    mbc_r: circle radius
    nmbcd: circle diameter divided by the distance traveled
    Each of the four arrays is sorted from lower to higher independently, so
    index i does not refer to the same trajectory across the four outputs.

    NOTE(review): `smallestenclosingcircle` is not imported in any cell shown
    in this section - confirm it is imported elsewhere before calling this.
    """
    circles = []
    for run in range(len(traj_x)):
        points = list(zip(traj_x[run], traj_y[run]))
        centre_x, centre_y, radius = smallestenclosingcircle.make_circle(points)
        path_length = get_d_traveled(traj_x[run], traj_y[run])
        # normalized minimum bounding circle size
        circles.append((centre_x, centre_y, radius, 2*radius/path_length))
    #
    # sort elements from lower to higher (each quantity on its own)
    mbc_x = np.sort([circle[0] for circle in circles])
    mbc_y = np.sort([circle[1] for circle in circles])
    mbc_r = np.sort([circle[2] for circle in circles])
    nmbcd = np.sort([circle[3] for circle in circles])
    return mbc_x, mbc_y, mbc_r, nmbcd

In [None]:
#
# feature : turning angle 
#
# entropy of turning angle 
# see:
# Liu et al. PRE 2017
# Establishing the kinetics of ballistic-to-diffusive transition using directional statistics
# Appendix A: Determining theta from trajectory
#
def get_turning_angle_measures(traj_x, traj_y, tau_frame, binwidth):
    """Return mean, standard deviation and normalized entropy of the turning angles.

    The trajectory is subsampled every tau_frame-th position. The heading of
    each step of the subsampled track is obtained with arctan2, and the turning
    angle is the change of heading between two consecutive steps, wrapped to
    -pi, ..., pi (positive = counter-clockwise).

    inputs
    traj_x: sequence of x coordinates (list, numpy array or pandas Series)
    traj_y: sequence of y coordinates
    tau_frame: subsampling stride (relevant to sampling frequency)
    binwidth: bin width in degrees of the histogram used for the entropy

    results
    turning_angle_avg: arithmetic mean of the turning angles (radians)
    turning_angle_std: standard deviation of the turning angles (radians)
    turning_angle_entropy: Shannon entropy of the turning-angle histogram,
                           taken with the number of bins as logarithm base
    All three are nan when fewer than three subsampled positions are
    available, because no turning angle can be formed.
    """
    xs = np.asarray(traj_x, dtype=float)[::tau_frame]  # every tau-th position
    ys = np.asarray(traj_y, dtype=float)[::tau_frame]
    if len(xs) < 3:
        return np.nan, np.nan, np.nan
    #
    # compute turning angle
    #
    # arctan2 already resolves the quadrant from the signs of dy and dx, so no
    # additional k*pi offset must be added (that offset only applies to
    # atan(dy/dx)); adding it shifts every heading with dx < 0 by pi.
    heading = np.arctan2(np.diff(ys), np.diff(xs))
    # wrap the heading change into -np.pi, ..., np.pi
    turning_angle = (np.diff(heading) + np.pi) % (2.0*np.pi) - np.pi
    #
    # compute entropy
    #
    epsilon_p = 0.0001  # added to every bin before the entropy is computed
    x_min = -180 # deg
    x_max = 180 # deg
    # np.arange excludes its stop value, so one extra bin width is needed for
    # the edges to reach +180 deg; otherwise the last bin would be missing.
    bin_edges = np.arange(x_min, x_max + binwidth, binwidth)
    hist_p, _ = np.histogram(np.degrees(turning_angle), bins=bin_edges)
    freq_p = hist_p/float(hist_p.sum()) + epsilon_p
    n_bins = len(freq_p)
    # base = number of bins -> a uniform histogram has entropy 1
    entropy_p = entropy(freq_p, base=n_bins) if n_bins > 1 else 0.0
    #
    turning_angle_avg = np.mean(turning_angle)
    turning_angle_std = np.std(turning_angle)
    turning_angle_entropy = entropy_p
    #
    return turning_angle_avg, turning_angle_std, turning_angle_entropy

In [None]:
def compute_synthetic_trajectory_statistics(x, y, dt, tau_frame):
    """Compute turning-angle, efficiency and straightness statistics per trajectory.

    inputs
    x: collection of x-coordinate sequences (one per trajectory)
    y: collection of y-coordinate sequences (one per trajectory)
    dt: not used by this function; kept so existing callers keep working
    tau_frame: subsampling stride passed to get_turning_angle_measures

    results
    angle_avg, angle_std, angle_entropy, efficiency, straightness:
    five numpy arrays with one value per trajectory. Each array is sorted from
    lower to higher independently, so index i does not refer to the same
    trajectory across the five outputs. An empty collection of trajectories
    yields five empty arrays.
    """
    binwidth_turning_angle = 15 # deg
    # turning angle
    angle_avg = []
    angle_std = []
    angle_entropy = []
    # efficiency and straightness
    efficiency = []
    straightness = []
    #
    for i in range(len(x)):
        x_i = x[i]
        y_i = y[i]
        # turning angle
        angle_avg_i, angle_std_i, angle_entropy_i = get_turning_angle_measures(x_i, y_i, tau_frame, binwidth_turning_angle)
        angle_avg.append(angle_avg_i)
        angle_std.append(angle_std_i)
        angle_entropy.append(angle_entropy_i)
        # efficiency and straightness
        efficiency.append(get_efficiency(x_i, y_i))
        straightness.append(get_straightness(x_i, y_i))
    # sort elements from lower to higher
    return (np.sort(angle_avg), np.sort(angle_std), np.sort(angle_entropy),
            np.sort(efficiency), np.sort(straightness))

In [None]:
def generate_graphs(subplot_title, trajectory_x, trajectory_y, variable_x, label_x, x_binwidth, x_min_i, x_max_i, xtick_space):
    """Draw one row of relative-frequency histograms plus an example trajectory.

    One panel is drawn per entry of variable_x; the last panel shows the first
    trajectory (trajectory_x[0], trajectory_y[0]).

    inputs
    subplot_title: title placed on the first panel
    trajectory_x, trajectory_y: collections of trajectories; only index 0 is plotted
    variable_x: list of value collections, one per histogram panel
    label_x: x-axis label per histogram panel
    x_binwidth: histogram bin width per panel
    x_min_i, x_max_i: axis limits per panel; a falsy entry (None, 0, 0.0)
                      means the limit is derived from the data
    xtick_space: tick spacing per panel; when falsy, at most 5 ticks are placed
                 automatically

    results
    none; the figure is shown with plt.show()
    """
    n_rows = 1
    n_columns = len(variable_x) + 1
    #
    panel_width = 4.0
    panel_height = 3.0
    fig = plt.figure(figsize=(panel_width*n_columns, panel_height*n_rows))
    #
    n_xticks = 5
    for row in range(n_rows):
        for col in range(n_columns):
            ax = fig.add_subplot(n_rows, n_columns, n_columns*row + col + 1)
            #
            if col < n_columns - 1:
                values = variable_x[row*n_columns + col]
                # a falsy limit falls back to the data range
                lower = x_min_i[col] if x_min_i[col] else min(min(values), 0.0)
                upper = x_max_i[col] if x_max_i[col] else max(values)
                #
                bin_edges = np.arange(lower, upper*1.05, x_binwidth[col])
                # weights of 1/len turn the counts into relative frequencies; alpha controls transparency
                plt.hist(values, bins=bin_edges, alpha=0.5, weights=np.ones_like(values)/len(values))
                #
                plt.xlim(lower, upper)
                if xtick_space:
                    plt.xticks(np.arange(lower, upper*1.05, xtick_space[col]))
                else:
                    ax.xaxis.set_major_locator(plt.MaxNLocator(n_xticks))
                plt.tick_params(labelsize=12)
                #
                plt.xlabel(label_x[col], fontsize=12)
            else:
                # last panel: example trajectory
                plt.plot(trajectory_x[0], trajectory_y[0], color = 'b', lw=3)
                plt.xlabel("position x (m)", fontsize=12)
                plt.ylabel("position y (m)", fontsize=12)
            if col == 0:
                plt.ylabel('relative frequency', fontsize=12)
                plt.title(subplot_title, fontsize=16)
    # set the spacing between subplots
    plt.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.9, wspace=0.4, hspace=0.4)
    plt.show()

In [None]:
def generate_cdf(subplot_title, alpha_significance, estimate_lower, estimate_upper, trajectory_x, trajectory_y, variable_x, label_x, x_binwidth, x_min_i, x_max_i, xtick_space):
    """Draw one row of cumulative histograms with a shaded interval on each.

    One panel is drawn per entry of variable_x; a final, empty panel (axes
    switched off) is added after them.

    inputs
    subplot_title: title placed on the first panel
    alpha_significance: not used for drawing; kept in the signature for callers
    estimate_lower, estimate_upper: interval bounds; entry j+1 is used for
                                    panel j (entry 0 is skipped)
    trajectory_x, trajectory_y: not used for drawing; kept in the signature for callers
    variable_x: list of value collections, one per panel
    label_x: x-axis label per panel
    x_binwidth: histogram bin width per panel
    x_min_i, x_max_i: axis limits per panel; a falsy entry (None, 0, 0.0)
                      means the limit is derived from the data
    xtick_space: tick spacing per panel; when falsy, at most 5 ticks are placed
                 automatically

    results
    none; the figure is shown with plt.show()
    """
    n_rows = 1
    n_columns = len(variable_x) + 1
    #
    panel_width = 4.0
    panel_height = 3.0
    fig = plt.figure(figsize=(panel_width*n_columns, panel_height*n_rows))
    #
    n_xticks = 5
    for row in range(n_rows):
        for col in range(n_columns):
            ax = fig.add_subplot(n_rows, n_columns, n_columns*row + col + 1)
            #
            if col < n_columns - 1:
                values = variable_x[row*n_columns + col]
                # a falsy limit falls back to the data range
                lower = x_min_i[col] if x_min_i[col] else min(min(values), 0.0)
                upper = x_max_i[col] if x_max_i[col] else max(values)
                #
                bin_edges = np.arange(lower, upper*1.05, x_binwidth[col])
                # cumulative histogram
                plt.hist(values, bins=bin_edges, density=True, histtype="step", cumulative=True, lw=3)
                #
                plt.xlim(lower, upper)
                if xtick_space:
                    plt.xticks(np.arange(lower, upper*1.05, xtick_space[col]))
                else:
                    ax.xaxis.set_major_locator(plt.MaxNLocator(n_xticks))
                plt.tick_params(labelsize=12)
                # shade the interval between the supplied lower and upper estimates
                plt.axvspan(estimate_lower[col+1], estimate_upper[col+1], color='r', alpha=0.5, lw=0)
                #
                plt.xlabel(label_x[col], fontsize=12)
            else:
                # last panel is left blank
                plt.axis('off')
            if col == 0:
                plt.ylabel("cdf", fontsize=12)
                plt.title(subplot_title, fontsize=16)
    # set the spacing between subplots
    plt.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.9, wspace=0.4, hspace=0.4)
    plt.show()

In [None]:
def generate_synthetic_trajectories_vonMises(n_trajectories, n_steps, dt, kappa, rng=None):
    """Generate correlated random walks whose turning angles are von Mises distributed.

    Every trajectory starts at the origin with heading 0. At each step the
    heading changes by a draw from a von Mises distribution with mean 0 and
    concentration kappa, and the walker advances by speed*dt (speed = 1.0)
    along the new heading.

    inputs
    n_trajectories: number of trajectories to generate
    n_steps: number of positions per trajectory (including the origin)
    dt: time step; with unit speed this is also the step length
    kappa: concentration of the von Mises turning-angle distribution
    rng: optional seed or numpy.random.Generator for reproducible output;
         when None (default) the global numpy.random state is used

    results
    x, y: lists of n_trajectories numpy arrays of length n_steps
    """
    sampler = np.random if rng is None else np.random.default_rng(rng)
    speed = 1.0 # unit step
    n_turns = max(n_steps - 1, 0)
    x = [] # trajectory_x
    y = [] # trajectory_y
    for _ in range(n_trajectories):
        # heading after each step = running sum of the turning angles
        heading = np.cumsum(sampler.vonmises(0, kappa, size=n_turns))
        x_i = np.zeros(n_steps, dtype=float)
        y_i = np.zeros(n_steps, dtype=float)
        # position = running sum of the step vectors; index 0 stays at the origin
        x_i[1:] = np.cumsum(speed*np.cos(heading)*dt)
        y_i[1:] = np.cumsum(speed*np.sin(heading)*dt)
        #
        x.append(x_i)
        y.append(y_i)
    #
    return x, y

In [None]:
def test_synthetic_trajectories_vonMises(n_trajectories, n_steps, alpha_significance, kappa):
    """Simulate von Mises random walks, plot their statistics and summarise them.

    inputs
    n_trajectories: number of synthetic trajectories
    n_steps: number of positions per trajectory
    alpha_significance: significance level used to pick the threshold indices
    kappa: concentration of the von Mises turning-angle distribution

    results
    summary_statistics: list
        [kappa, n_steps,
         entropy avg, std, lower, upper,
         efficiency avg, std, lower, upper,
         straightness avg, std, lower, upper,
         mean net displacement, mean path length]
    Two figures (histograms and cumulative histograms) are shown as a side effect.
    """
    dt = 1
    tau_frame = 2
    #
    # generate synthetic trajectories and then compute entropy, efficiency, and straightness
    #
    trajectory_x, trajectory_y = generate_synthetic_trajectories_vonMises(n_trajectories, n_steps, dt, kappa)
    statistics = compute_synthetic_trajectory_statistics(trajectory_x, trajectory_y, dt, tau_frame)
    d_net  = get_net_displacement(trajectory_x, trajectory_y)
    d_path = get_pathlength(trajectory_x, trajectory_y)
    #
    # sort elements from lower to higher (the threshold lookup below indexes into sorted arrays)
    #
    angle_avg, angle_std, angle_entropy, efficiency, straightness = [np.sort(values) for values in statistics]
    #
    # panel configuration shared by both figures
    #
    variable_x  = [angle_entropy, efficiency, straightness]
    label_x     = ["turning angle entropy", "efficiency", "straightness"]
    x_binwidth  = [0.05, 0.05, 0.05]
    x_min       = [0.0, 0.0, 0.0]
    x_max       = [1.0, 1.0, 1.0]
    xtick_space = [0.2, 0.2, 0.2]
    #
    # find boundary of significance level
    #
    # entropy: an upper boundary is taken from the sorted values and the
    # reported interval is [x_min, upper].
    # efficiency and straightness: a lower boundary is taken and the reported
    # interval is [lower, x_max].
    # NOTE(review): the previous comment described efficiency/straightness
    # values "smaller than the threshold" as ballistic, which does not match
    # the [lower, x_max] interval built here - confirm the intended reading.
    #
    index_lower = int(n_trajectories*alpha_significance*0.5)
    index_upper = n_trajectories-index_lower
    #
    entropy_lower      = x_min[0]
    entropy_upper      = max(angle_entropy[index_upper-1], x_binwidth[0])
    efficiency_lower   = min(efficiency[index_lower], (x_max[1]-x_binwidth[1]))
    efficiency_upper   = x_max[1]
    straightness_lower = min(straightness[index_lower], (x_max[2]-x_binwidth[2]))
    straightness_upper = x_max[2]
    #
    # index 0 of each list holds the percentile index; generate_cdf reads entries 1..3
    estimate_lower = [index_lower, entropy_lower, efficiency_lower, straightness_lower]
    estimate_upper = [index_upper, entropy_upper, efficiency_upper, straightness_upper]
    #
    # average and standard deviation of each feature, followed by its interval
    #
    summary_statistics = [kappa, n_steps]
    summary_statistics += [np.mean(angle_entropy), np.std(angle_entropy), entropy_lower, entropy_upper]
    summary_statistics += [np.mean(efficiency), np.std(efficiency), efficiency_lower, efficiency_upper]
    summary_statistics += [np.mean(straightness), np.std(straightness), straightness_lower, straightness_upper]
    summary_statistics += [np.mean(d_net), np.mean(d_path)]
    #
    subplot_title = r"$\kappa$="+str(kappa)+", n="+str(n_steps)
    #
    generate_graphs(subplot_title, trajectory_x, trajectory_y, variable_x, label_x, x_binwidth, x_min, x_max, xtick_space)
    generate_cdf(subplot_title, alpha_significance, estimate_lower, estimate_upper, trajectory_x, trajectory_y, variable_x, label_x, x_binwidth, x_min, x_max, xtick_space)
    #
    return summary_statistics

In [None]:
#
# create a list for collecting summary statistics
# later we will create a pandas dataframe for writing the summaries
# 
# see
# https://stackoverflow.com/questions/17091769/python-pandas-fill-a-dataframe-row-by-row
#
# note: generate_results appends its rows to this list, so re-running this
# cell discards every row collected so far.
summary_statistics = []

In [None]:
def generate_results(n_steps_set, kappa, summary_statistics, n_trajectories=10000, alpha_significance=0.05):
    """Run the von Mises experiment for every trajectory length and print a summary.

    inputs
    n_steps_set: iterable of trajectory lengths (number of positions)
    kappa: concentration of the von Mises turning-angle distribution
    summary_statistics: list of result rows collected so far; new rows are
                        appended to this list in place
    n_trajectories: number of synthetic trajectories per run (default 10000)
    alpha_significance: significance level (default 0.05)

    results
    summary_statistics: the same list object, extended by one row per entry
                        of n_steps_set
    The start time, a tab-separated table of all rows collected so far, the
    end time and the start time again are printed.
    """
    t_start = time.strftime("%H:%M:%S", time.localtime())
    print(t_start)
    #
    # von Mises process
    #
    for n_steps in n_steps_set:
        results_i = test_synthetic_trajectories_vonMises(n_trajectories, n_steps, alpha_significance, kappa)
        summary_statistics.append(results_i)
    #
    # print the summary statistics on screen
    #
    header = "kappa\tn_steps"
    header += "\tentropy_avg\t_std\t_lower\t_upper"
    header += "\tefficiency_avg\t_std\t_lower\t_upper"
    header += "\tstraightness_avg\t_std\t_lower\t_upper"
    header += "\td_net\td_path"
    print(header)
    for row in summary_statistics:
        # kappa and n_steps are printed as they are; every other column with 3 decimals
        print_summary = str(row[0])+"\t"+str(row[1])
        print_summary += "".join("\t{:.3f}".format(value) for value in row[2:])
        print(print_summary)
    #
    t_end = time.strftime("%H:%M:%S", time.localtime())
    print(t_end)
    print(t_start)
    #
    return summary_statistics

In [None]:
#
# run the experiment for every concentration parameter kappa;
# each call appends its rows to summary_statistics and prints the table collected so far
#
n_steps_set = [10, 25, 50, 75, 100, 200, 250, 300, 500, 750, 1000]
kappa_set = [0, 10, 50, 100, 200, 300, 400, 500]
for kappa in kappa_set:
    summary_statistics = generate_results(n_steps_set, kappa, summary_statistics)

In [None]:
#
# create a pandas dataframe from the collected results and write it to a csv file.
#
# column names: kappa, n_steps, then avg/std/lower/upper for each feature,
# then d_net and d_path (same order as the rows in summary_statistics)
df_summary = pd.DataFrame(
    data=summary_statistics,
    columns=(["kappa", "n_steps"]
             + [feature + "_" + stat
                for feature in ("entropy", "efficiency", "straightness")
                for stat in ("avg", "std", "lower", "upper")]
             + ["d_net", "d_path"]))
#
csv_file_name = "summary_statistics_CRW.csv"
df_summary.to_csv(csv_file_name, index=False, float_format='%.3f')