In [None]:
# Reload imported modules automatically before code is executed, so edits to
# the project package are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Render matplotlib figures inline in the cell output.
%matplotlib inline

In [None]:
import json
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as clr
import matplotlib.patches as patches
import matplotlib.cm as cm

from itertools import groupby
from matplotlib.ticker import MaxNLocator
from pathlib import Path
from loguru import logger
logger.remove()

from pim.simulator import SimulationExperiment
from pim.setup import load_results, enumerate_results

In [None]:
# Result directories to evaluate. Earlier runs are kept as commented-out
# entries; uncomment an entry to include that run.
paths = [
    # "../../results/evaluation-plots_20220926-170156/dye/",
    # "../../results/dye-eval_20220928-160312",
    # "../../results/cheat-dye-eval_20220927-121438",
    # "../../results/model-v-model_20220928-103443",  # beta 0.3
    # "../../results/model-v-model_20220928-170840",  # beta 0.5
    # "../../results/dye-eval_20220929-123436/1500",
    # "../../results/cheat-dye-eval_20220929-131609/1500",
    "../../results/model-v-model_20220929-163934",
]

# Materialise the loaded results as a list: the next cell iterates over them
# several times.
results = list(load_results(enumerate_results(paths)))

In [None]:
def _results_for(all_results, *model_names):
    """Return the results whose name prefix (text before the first '.') is one of `model_names`."""
    return [result for result in all_results
            if result.name.split('.')[0] in model_names]


# Partition the loaded results by model variant, identified by the part of
# `result.name` that precedes the first '.'.
dye = _results_for(results, 'dye')
dye_beta = _results_for(results, 'dye-beta')
dye_cheat = _results_for(results, 'cheat-dye')
dye_outbound = _results_for(results, 'dye', 'dye-outbound')
# NOTE(review): this selection used to test 'cheat-dye-outbound' twice. It now
# mirrors `dye_outbound` (base model plus its "-outbound" runs) — confirm that
# including the plain 'cheat-dye' runs is what was intended.
dye_outbound_cheat = _results_for(results, 'cheat-dye', 'cheat-dye-outbound')
weights = _results_for(results, 'weights')
pontine = _results_for(results, 'pontine')
basic = _results_for(results, 'basic')
# Note: this name would shadow the stdlib `random` module if it were imported.
random = _results_for(results, 'random')

In [None]:
# ----------------------------------- Stone plotting functions -----------------------------------
def plot_angular_distances(noise_levels, angular_distances, bins=18, ax=None,
                           label_font_size=11, log_scale=False, title=None):
    """Overlay one angular-distance histogram per noise level on a single axis.

    Parameters
    ----------
    noise_levels : sequence
        Only its length is used: it sets how many histograms are drawn and how
        many viridis colours are sampled.
    angular_distances : sequence
        ``angular_distances[i]`` holds the angles to histogram for the i-th
        noise level.
    bins : int
        Number of angular bins, forwarded to ``plot_angular_distance_histogram``.
    ax : matplotlib axis, optional
        Axis to draw on. When omitted a new 10x10 figure with a polar axis is
        created.
    label_font_size : int
        Font size of the title.
    log_scale : bool
        When true the radial axis is switched to a log scale.
    title : str, optional
        Title of the axis.

    Returns
    -------
    (fig, ax) : ``fig`` is ``None`` when the caller supplied ``ax``.
    """
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10),
                               subplot_kw={'projection': 'polar'})

    n_levels = len(noise_levels)
    palette = list(map(cm.viridis, np.linspace(0, 1, n_levels)))

    # Walk the levels from last to first, so the first level is drawn last
    # (on top of the others).
    for level in range(n_levels - 1, -1, -1):
        plot_angular_distance_histogram(angular_distance=angular_distances[level],
                                        ax=ax, bins=bins, color=palette[level])

    # Zero angle at the top of the plot, angles increasing clockwise.
    ax.set_theta_zero_location("N")
    ax.set_theta_direction(-1)
    ax.set_rlabel_position(22)
    ax.set_title(title, y=1.08, fontsize=label_font_size)

    if log_scale:
        ax.set_rscale('log')
        ax.set_rlim(0.0, 10001)  # What determines this?

    # Applies to the current pyplot figure.
    plt.tight_layout()
    return fig, ax

def plot_angular_distance_histogram(angular_distance, ax=None, bins=36,
                                    color='b'):
    """Draw a closed, filled histogram outline of angles over a full turn.

    Parameters
    ----------
    angular_distance : array-like
        Angles to histogram; the bin edges span ``-pi - pi/bins`` to
        ``pi + pi/bins``.
    ax : matplotlib axis, optional
        Axis to draw on. When omitted a new 10x10 figure is created with
        default (non-polar) axes.
        NOTE(review): callers in this notebook always pass a polar axis —
        confirm whether the fallback should be polar as well.
    bins : int
        Number of angular bins around the circle.
    color : matplotlib colour or falsy
        Colour of the outline and fill. A falsy value lets ``fill_between``
        choose its own colour.

    Returns
    -------
    (fig, ax) : ``fig`` is ``None`` when the caller supplied ``ax``.
    """
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10))

    # Edges are shifted by half a bin so that one bin is centred on 0. This
    # yields bins + 1 bins, whose first and last both straddle +/-pi.
    half_bin = np.pi / bins
    edges = np.linspace(-np.pi - half_bin, np.pi + half_bin, bins + 2,
                        endpoint=True)
    counts = np.histogram(angular_distance, edges)[0]

    # np.histogram returns integer counts. Work in floats, otherwise the 0.5
    # placeholder assigned below is truncated straight back to 0.
    radii = counts.astype(float)

    # Merge the two half-bins around +/-pi into a single bin.
    radii[0] += radii[-1]
    radii = radii[:-1]
    # Rotate so the bin centred on 0 comes first, then repeat it at the end to
    # close the outline.
    radii = np.roll(radii, bins // 2)
    radii = np.append(radii, radii[0])
    # Give empty bins a small positive value.
    # Need this hack to get the plot fill to work reliably
    radii[radii == 0] = 0.5
    theta = np.linspace(0, 2 * np.pi, bins + 1, endpoint=True)

    ax.plot(theta, radii, color=color, alpha=0.5)
    if color:
        ax.fill_between(theta, 0, radii, alpha=0.2, color=color)
    else:
        ax.fill_between(theta, 0, radii, alpha=0.2)

    return fig, ax

# --------------------------------- Evaluation stuff -----------------------------------

def mem_errors(model, noise=0.1, T_outbound=1500, ax=None, color="b"):
    """Collect ``memory_error()`` for the matching runs and optionally plot it.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``.

    When ``ax`` is given, the mean across matching results is plotted with a
    +/- one standard deviation band. The x axis runs over
    ``T_outbound + T_inbound`` steps, with ``T_inbound`` read from the first
    entry of ``model``.

    Returns the array holding one ``memory_error()`` value per matching result.
    """
    def _matches(result):
        return (result.parameters['cx']['params']['noise'] == noise
                and result.parameters['T_outbound'] == T_outbound)

    errors = np.array([result.memory_error() for result in model if _matches(result)])
    mean_errors = errors.mean(axis=0)
    std_errors = errors.std(axis=0)

    if ax is None:
        return errors

    total_steps = T_outbound + model[0].parameters['T_inbound']
    timesteps = np.arange(0, total_steps, 1)
    band_style = dict(color=color, alpha=0.2)

    # Upper band, mean line, lower band.
    ax.fill_between(timesteps, mean_errors + std_errors, mean_errors, **band_style)
    ax.plot(timesteps, mean_errors, color=color, label=noise)
    ax.fill_between(timesteps, mean_errors - std_errors, mean_errors, **band_style)

    ax.set_xlabel("Timesteps")
    ax.set_ylabel("Memory error")

    ax.legend(title="Noise", loc='upper left')

    return errors

def angular_mem_errors(model, noise=0.1, T_outbound=1500, ax=None, color="b"):
    """Collect ``angular_memory_error()`` for the matching runs and optionally plot it.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``.

    For plotting, the mean and standard deviation are multiplied by
    ``180 / pi`` and the y ticks are fixed at -180, -90, 0, 90 and 180. The
    x axis runs over ``T_outbound + T_inbound`` steps, with ``T_inbound`` read
    from the first entry of ``model``.

    Returns the unscaled array holding one ``angular_memory_error()`` value
    per matching result.
    """
    def _matches(result):
        return (result.parameters['cx']['params']['noise'] == noise
                and result.parameters['T_outbound'] == T_outbound)

    errors = np.array([result.angular_memory_error() for result in model
                       if _matches(result)])
    mean_errors = errors.mean(axis=0) * 180/np.pi
    std_errors = errors.std(axis=0) * 180/np.pi

    if ax is None:
        return errors

    total_steps = T_outbound + model[0].parameters['T_inbound']
    timesteps = np.arange(0, total_steps, 1)
    band_style = dict(color=color, alpha=0.2)

    # Upper band, mean line, lower band.
    ax.fill_between(timesteps, mean_errors + std_errors, mean_errors, **band_style)
    ax.plot(timesteps, mean_errors, color=color, label=noise)
    ax.fill_between(timesteps, mean_errors - std_errors, mean_errors, **band_style)

    ax.set_xlabel("Timesteps")
    ax.set_ylabel("Angular memory error")
    ax.set_yticks([-180, -90, 0, 90, 180])

    ax.legend(title="Noise", loc='upper left')

    return errors

def heading_errors(model, noise=0.1, T_outbound=1500, ax=None, color="b"):
    """Collect ``heading_error()`` for the matching runs and optionally plot it.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``.

    When ``ax`` is given, the mean across matching results is plotted with a
    +/- one standard deviation band. The x axis runs over
    ``T_outbound + T_inbound`` steps, with ``T_inbound`` read from the first
    entry of ``model``.

    Returns the array holding one ``heading_error()`` value per matching result.
    """
    def _matches(result):
        return (result.parameters['cx']['params']['noise'] == noise
                and result.parameters['T_outbound'] == T_outbound)

    errors = np.array([result.heading_error() for result in model if _matches(result)])
    mean_errors = errors.mean(axis=0)
    std_errors = errors.std(axis=0)

    if ax is None:
        return errors

    total_steps = T_outbound + model[0].parameters['T_inbound']
    timesteps = np.arange(0, total_steps, 1)
    band_style = dict(color=color, alpha=0.2)

    # Upper band, mean line, lower band.
    ax.fill_between(timesteps, mean_errors + std_errors, mean_errors, **band_style)
    ax.plot(timesteps, mean_errors, color=color, label=noise)
    ax.fill_between(timesteps, mean_errors - std_errors, mean_errors, **band_style)

    ax.set_xlabel("Timesteps")
    ax.set_ylabel("Heading error")

    ax.legend(title="Noise", loc='upper left')

    return errors

def tortuosity_plot(model, noise=0.1, T_outbound=1500, ax=None):
    """Plot the mean homing-tortuosity curves of the matching runs.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``. For each match,
    ``homing_tortuosity()`` is expected to return a 5-tuple of which element 0
    (x values), element 2 (actual distance) and element 4 (optimal distance)
    are used. The x values of the last matching result are used for every
    curve.

    Parameters
    ----------
    model : iterable of results
    noise, T_outbound : selection criteria described above.
    ax : matplotlib axis, optional
        Axis to draw on. Nothing is drawn when omitted.

    Raises
    ------
    ValueError
        If ``ax`` is given but no result matches the selection.

    NOTE(review): the y-axis label mentions a percentage, but the values are
    plotted exactly as returned by ``homing_tortuosity()`` — confirm the units.
    """
    T = []
    actual_dists = []
    optimal_dists = []

    for result in model:
        params = result.parameters
        if params['cx']['params']['noise'] != noise or params['T_outbound'] != T_outbound:
            continue
        T, _, actual, _, optimal = result.homing_tortuosity()
        actual_dists.append(actual)
        optimal_dists.append(optimal)

    if ax is None:
        # Nothing to draw and nothing to return.
        return

    if not actual_dists:
        # Without this guard matplotlib fails later with an opaque
        # shape-mismatch ValueError.
        raise ValueError(
            f"tortuosity_plot: no results with noise={noise} and T_outbound={T_outbound}"
        )

    mean_actual = np.array(actual_dists).mean(axis=0)
    mean_optimal = np.array(optimal_dists).mean(axis=0)

    ax.plot(T, mean_actual, label="distance from home", color="blue")
    ax.plot(T, mean_optimal, label="optimal", color="orange")
    ax.set_xlabel("time (steps)")
    ax.set_ylabel("% of homing distance remaining")
    ax.set_xlim(0, T_outbound)
    ax.legend()

def example_path(result, ax=None, decode=False):
    """Draw the path of a single result with equal axis scaling.

    Parameters
    ----------
    result : result object
        Must provide ``parameters['T_inbound']``, ``parameters['T_outbound']``
        and ``plot_path(ax=..., decode=...)``.
    ax : matplotlib axis, optional
        Axis to draw on; a new figure and axis are created when omitted.
    decode : bool
        Forwarded unchanged to ``result.plot_path``.
    """
    if ax is None:
        # The ax=None default used to fail on ax.axis(...).
        _, ax = plt.subplots()

    ax.axis("equal")
    total_steps = result.parameters['T_outbound'] + result.parameters['T_inbound']
    ax.set_title(f'Example path, {total_steps} steps')
    result.plot_path(ax=ax, decode=decode)
    
def min_dist_histogram(model, noise=0.1, T_outbound=1500, ax=None, binwidth=1, confidence=0.95):
    """Histogram the norm of ``closest_position()`` over the matching runs.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``.

    When ``ax`` is given, the distances are binned in steps of ``binwidth``
    from the smallest to the largest value, and a dashed vertical line marks
    the upper bound of the central ``confidence`` percentile interval. The
    lower bound is computed but not drawn. Returns nothing.
    """
    closest = []
    for result in model:
        params = result.parameters
        if params['cx']['params']['noise'] == noise and params['T_outbound'] == T_outbound:
            closest.append(np.linalg.norm(result.closest_position()))
    min_dists = np.array(closest)

    lower_pct = 100*(1-confidence)/2
    upper_pct = 100*(1-(1-confidence)/2)
    interval = np.percentile(min_dists, [lower_pct, upper_pct])

    if ax is None:
        return

    # Frequencies are counts, so keep the y ticks on integers.
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    ax.set_xlabel("Closest distance")
    ax.set_ylabel("Frequency")
    bin_edges = np.arange(min(min_dists), max(min_dists) + binwidth, binwidth)
    ax.hist(min_dists, bins=bin_edges)
    ax.axvline(interval[1], color="k", linestyle="--")
    
def angle_offset_after_radius(model, radius=20, ax=None):
    """Plot, per noise level, the angles returned by ``compute_disk_leaving_angle(radius)``.

    Results are grouped by their ``cx`` noise parameter and each group's
    angles become one histogram on ``ax`` via ``plot_angular_distances``.

    Parameters
    ----------
    model : iterable of results
    radius : number
        Forwarded to each result's ``compute_disk_leaving_angle``.
    ax : matplotlib axis, optional
        Axis to draw on. Nothing is drawn when omitted.
    """
    noise_levels = get_noise_levels(model)
    grouped = group_by_noise(model)
    # Kept as nested lists rather than np.array(...): groups may hold different
    # numbers of results, and recent NumPy raises on ragged input. The plotting
    # helper only indexes one group at a time, so a list works equally well.
    angular_dists = [[result.compute_disk_leaving_angle(radius) for result in group]
                     for group in grouped]
    if ax is not None:
        plot_angular_distances(noise_levels, angular_dists, ax=ax)
    
def min_dist_v_route_length(model, noise=0.1, ax=None, color="b"):
    """Plot the mean closest distance against outbound length for one noise level.

    Results whose ``cx`` noise parameter equals ``noise`` are grouped by
    ``T_outbound``. For each group the mean and standard deviation of the
    closest distances (from ``get_min_dists``) are computed.

    Parameters
    ----------
    model : iterable of results
    noise : number
        Noise level to select; also used as the legend label.
    ax : matplotlib axis, optional
        Axis to draw on. Nothing is drawn when omitted.
    color : matplotlib colour
        Colour of the mean line and the +/- one standard deviation band.
    """
    model = [result for result in model
             if result.parameters['cx']['params']['noise'] == noise]
    outbounds = get_outbounds(model)
    grouped = group_by_outbound(model)

    # Statistics are taken per group rather than via a 2-D array with
    # mean(axis=1): groups may hold different numbers of results, which would
    # make that array ragged.
    per_group = [get_min_dists(group, noise, get_outbounds(group)[0]) for group in grouped]
    mean_dists = np.array([dists.mean() for dists in per_group])
    std_dists = np.array([dists.std() for dists in per_group])

    if ax is not None:
        ax.set_xlabel("Outbound timesteps")
        ax.set_ylabel("Closest distance")

        ax.plot(outbounds, mean_dists, color=color, label=noise)
        ax.fill_between(outbounds, mean_dists + std_dists, mean_dists, alpha=0.2, color=color)
        ax.fill_between(outbounds, mean_dists - std_dists, mean_dists, alpha=0.2, color=color)
        ax.legend(title="Noise", loc='upper left')

# ----------------------------------- Help functions -----------------------------------
def get_random_result(model, noise=0.1, T_outbound=1500):
    """Return one randomly chosen result matching ``noise`` and ``T_outbound``.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``. The choice is drawn with
    ``np.random.randint``, so it follows NumPy's global random state.

    Raises
    ------
    ValueError
        If no result matches the selection.
    """
    # The filter used to compare against a hard-coded 0.1, ignoring `noise`.
    filtered_model = [result for result in model
                      if result.parameters['cx']['params']['noise'] == noise
                      and result.parameters['T_outbound'] == T_outbound]
    if not filtered_model:
        raise ValueError(
            f"get_random_result: no results with noise={noise} and T_outbound={T_outbound}"
        )
    random_idx = np.random.randint(0, len(filtered_model))
    return filtered_model[random_idx]

def get_noise_levels(model):
    """Return the distinct ``cx`` noise values found in ``model``, in ascending order."""
    distinct = {result.parameters["cx"]["params"]["noise"] for result in model}
    return sorted(distinct)

def group_by_noise(model):
    """Split ``model`` into lists of results sharing the same ``cx`` noise value.

    Groups are ordered by ascending noise. Within a group the results keep
    their order from ``model``.
    """
    def noise_of(result):
        return result.parameters["cx"]["params"]["noise"]

    # groupby only merges adjacent items, so sort by the same key first.
    ordered = sorted(model, key=noise_of)
    return [list(members) for _, members in groupby(ordered, noise_of)]

def get_outbounds(model):
    """Return the distinct ``T_outbound`` values found in ``model``, in ascending order."""
    distinct = {result.parameters["T_outbound"] for result in model}
    return sorted(distinct)

def group_by_outbound(model):
    """Split ``model`` into lists of results sharing the same ``T_outbound``.

    Groups are ordered by ascending ``T_outbound``. Within a group the results
    keep their order from ``model``.
    """
    def outbound_of(result):
        return result.parameters["T_outbound"]

    # groupby only merges adjacent items, so sort by the same key first.
    ordered = sorted(model, key=outbound_of)
    return [list(members) for _, members in groupby(ordered, outbound_of)]

def get_min_dists(model,noise=0.1,T_outbound=1500):
    """Return the norm of ``closest_position()`` for every matching result.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``. The norms are returned as
    a NumPy array, in the order of ``model``.
    """
    distances = []
    for result in model:
        params = result.parameters
        if params['cx']['params']['noise'] == noise and params['T_outbound'] == T_outbound:
            distances.append(np.linalg.norm(result.closest_position()))
    return np.array(distances)

def tortuosity_scores(model, noise=0.1, T_outbound=1500):
    """Return ``tortuosity_score()`` of every matching result as a NumPy array.

    A result matches when its ``cx`` noise parameter equals ``noise`` and its
    ``T_outbound`` parameter equals ``T_outbound``.
    """
    scores = []
    for result in model:
        params = result.parameters
        if params['cx']['params']['noise'] == noise and params['T_outbound'] == T_outbound:
            scores.append(result.tortuosity_score())
    return np.array(scores)


In [None]:
def single_model_eval(model, model_2, noise_levels=(0.1, 0.2, 0.3, 0.4),
                      noise=0.1, T_outbound=1500):
    """Draw the evaluation overview figure for a single model.

    The figure uses a 4x2 grid. Only the tortuosity, disk-leaving-angle and
    example-path panels (grid positions 5-7) are currently enabled; the other
    panels are kept below as commented-out code.

    Parameters
    ----------
    model : list of results
        Results of the model under evaluation.
    model_2 : list of results or None
        Only referenced by the disabled "min dist v outbound path" panel, so
        it is currently unused.
    noise_levels : sequence of numbers
        Noise levels for the disabled per-noise panels. An immutable default
        replaces the previous shared list default.
    noise : number
        Noise level used by the enabled panels (previously hard-coded to 0.1).
    T_outbound : int
        Outbound length used by the enabled panels (previously hard-coded to
        1500).
    """
    fig = plt.figure(figsize=(15, 15))

    # ---- Disabled panels (uncomment to re-enable) ----
    # colors = [cm.viridis(x) for x in np.linspace(0, 1, len(noise_levels))]
    #
    # # Min dist histogram
    # min_dist_histogram(model, noise, T_outbound, ax=plt.subplot(421))
    #
    # # Min dist v outbound path
    # for color, level in reversed(list(zip(colors, noise_levels))):
    #     min_dist_v_route_length(model_2, level, ax=plt.subplot(422), color=color)
    #
    # # Memory errors
    # for color, level in reversed(list(zip(colors, noise_levels))):
    #     _ = mem_errors(model, level, T_outbound, ax=plt.subplot(423), color=color)
    #
    # # Heading errors
    # for color, level in reversed(list(zip(colors, noise_levels))):
    #     _ = heading_errors(model, level, T_outbound, ax=plt.subplot(424), color=color)
    #
    # # Angular memory errors
    # for color, level in reversed(list(zip(colors, noise_levels))):
    #     _ = angular_mem_errors(model, level, T_outbound, ax=plt.subplot(424), color=color)

    # Tortuosity
    tortuosity_plot(model, noise, T_outbound, ax=plt.subplot(425))

    # Angle after steps
    angle_offset_after_radius(model, radius=20, ax=plt.subplot(426, projection='polar'))

    # Example path
    example_path(get_random_result(model, noise, T_outbound), ax=plt.subplot(427))

    fig.tight_layout()
    

# Overview figure for the dye model; no second model is supplied.
single_model_eval(dye, model_2=None)

In [None]:
def model_to_model(models=None):
    """Compare several models with one notched box plot per metric.

    Four stacked panels are drawn: closest distances, per-run mean memory
    error, per-run mean heading error and tortuosity scores (outliers hidden
    on the last one). Each metric uses the default noise and outbound length
    of its helper function.

    Parameters
    ----------
    models : list of lists of results, optional
        One list of results per model. The tick label of a model is the part
        of its first result's name before the first '.'. Empty lists are
        skipped with a warning, because they provide neither a label nor data.
        Nothing is drawn when no non-empty list remains.
    """
    import warnings

    candidates = list(models) if models is not None else []
    populated = [model for model in candidates if len(model) > 0]
    skipped = len(candidates) - len(populated)
    if skipped:
        warnings.warn(f"model_to_model: skipping {skipped} model group(s) without results")
    if not populated:
        return

    labels = [model[0].name.split(".")[0] for model in populated]
    fig = plt.figure(figsize=(15, 15))

    # (panel title, function mapping one model to its samples, extra boxplot kwargs)
    panels = [
        ("min dists", get_min_dists, {}),
        ("memory error", lambda model: mem_errors(model).mean(axis=1), {}),
        ("heading error", lambda model: heading_errors(model).mean(axis=1), {}),
        ("Tortuosity rmse", tortuosity_scores, {"showfliers": False}),
    ]

    for row, (title, metric, extra) in enumerate(panels, start=1):
        ax = plt.subplot(len(panels), 1, row)
        data = [metric(model) for model in populated]
        ax.boxplot(data, notch=True, **extra)
        # Tick labels are set separately rather than through boxplot's
        # `labels=` keyword, which newer matplotlib releases deprecate.
        ax.set_xticklabels(labels)
        ax.set_title(title)

    fig.tight_layout()
        
# Model variants to compare side by side, in plotting order.
models = [
    dye,
    dye_beta,
    dye_cheat,
    weights,
    pontine,
    basic,
]
model_to_model(models)