# Preparation
Before we get to the actual Vicsek implementation, we need a few basics. Imports, enums, helper methods etc.


Installs

In [1]:
!pip install bayesian-optimization



## Imports

In [2]:
import numpy as np
import pandas as pd
import math
import json, codecs
from sklearn.cluster import AgglomerativeClustering
from enum import Enum
from bayes_opt import BayesianOptimization


## Enums
Enums to represent constant values for things such as the different ways of selecting the neighbours

In [3]:
class DistributionType(str, Enum):
    GLOBAL = "G",
    LOCAL_SINGLE_SITE = "LSS"

class EventEffect(Enum):
    ALIGN_TO_FIXED_ANGLE = "align_fixed", "DISTANT", # the same angle is imposed on all particles
    ALIGN_TO_FIXED_ANGLE_NOISE = "align_noise", "DISTANT WITH NOISE", # the same angle is imposed on all particles but subject to noise
    AWAY_FROM_ORIGIN = "origin_away", "PREDATOR", # turn away from the point of origin of the event
    RANDOM = "random", "RANDOM" # sets the orientations to a random value, intended for baseline

    def __init__(self, val, label):
        self.val = val
        self.label = label

class EventSelectionType(str, Enum):
    NEAREST_DISTANCE = "ND",
    RANDOM = "R"

class Metrics(Enum):
    ORDER = "order", "order",
    CLUSTER_NUMBER = "clusternumber", "number of clusters",
    CLUSTER_NUMBER_WITH_RADIUS = "numclusterr", "number of clusters"
    CLUSTER_SIZE = "clustersize", "cluster size",
    ORDER_VALUE_PERCENTAGE = "numorder", "percentage of particles with order-inducing value", # number of order particles
    DUAL_OVERLAY_ORDER_AND_PERCENTAGE = "dual-order-num", "order/percentage of particles with order-inducing value",
    AVERAGE_NUMBER_NEIGHBOURS ="avg_num_neighbours", "number of neighbours",
    MIN_AVG_MAX_NUMBER_NEIGHBOURS = "min_avg_max_num_neighbours", "number of neighbours",
    AVG_DISTANCE_NEIGHBOURS = "avg_distance_neighbours", "distance between neighbours",
    AVG_CENTROID_DISTANCE = "avg_centroid_distance", "average distance from the centroid"

    def __init__(self, val, label):
        self.val = val
        self.label = label

class NeighbourSelectionMechanism(str, Enum):
    NEAREST = "N",
    FARTHEST = "F",
    LEAST_ORIENTATION_DIFFERENCE = "LOD",
    HIGHEST_ORIENTATION_DIFFERENCE = "HOD",
    ALL = "A",
    RANDOM = "R"

class SwitchType(Enum):
    NEIGHBOUR_SELECTION_MECHANISM = "N", "nsms",
    K = "K", "ks",
    SPEED = "SPEED", "speeds",
    ACTIVATION_TIME_DELAY = "ATD", "activationTimeDelays"

    def __init__(self, val, switchTypeValueKey):
        self.val = val
        self.switchTypeValueKey = switchTypeValueKey



## Helper methods

### Saving and loading the simulation results
Once the Vicsek model has been completed, it can be saved and later loaded with the methods in this section

In [4]:
def saveModel(simulationData, path="sample.json", modelParams=None, saveInterval=1, switchValues={'nsms': [], 'ks': [], 'speeds': [], 'activationTimeDelays': []}):
    """
    Saves a model trained by the Viscek simulator implementation.

    Parameters:
        - simulationData (times, positions, orientations): the data to be saved
        - path (string) [optional]: the location and name of the target file
        - modelParams (dict) [optional]: a summary of the model's params such as n, k, neighbourSelectionMode etc.
        - saveInterval (int) [optional]: specifies the interval at which the saving should occur, i.e. if any time steps should be skipped
        - switchValues (array) [optional]: the switch type value assigned to each particle at every timestep

    Returns:
        Nothing. Creates or overwrites a file.
    """
    time, positions, orientations = simulationData
    dict = {"time": __getSpecifiedIntervals(saveInterval, time.tolist()),
            "positions": __getSpecifiedIntervals(saveInterval, positions.tolist()),
            "orientations": __getSpecifiedIntervals(saveInterval, orientations.tolist())}
    for key in switchValues.keys():
        vals = {key :__getSpecifiedIntervals(saveInterval, switchValues[key])}
        dict["switchValues"] = {k : np.array(v).tolist() for k,v in vals.items()} # deal with np.array instances in the values
    __saveDict(path, dict, modelParams)

def loadModel(path, loadSwitchValues=False):
    """
    Loads a single model from a single file.

    Parameters:
        - path (string): the location and file name of the file containing the model data
        - loadSwitchValues (boolean) [optional]: loads the switch type values from the save file

    Returns:
        The model's params as well as the simulation data containing the time, positions, orientations.
    """
    loadedJson = __loadJson(path)

    modelParams = loadedJson["modelParams"]
    time = np.array(loadedJson["time"])
    positions = np.array(loadedJson["positions"])
    orientations = np.array(loadedJson["orientations"])
    if loadSwitchValues == True:
        switchValues = loadedJson["switchValues"]
        return modelParams, (time, positions, orientations), switchValues
    return modelParams, (time, positions, orientations)

def loadModels(paths, loadSwitchValues=False):
    """
    Loads multiple models from multiple files.

    Parameters:
        - paths (array of strings): An array containing the locations and names of the files containing a single model each
        - loadSwitchValues (boolean) [optional]: loads the switch type values from the save files

    Returns:
        Returns an array containing the model params for each model and a second array containing the simulation data for each model. Co-indexed.
    """
    data = []
    params = []
    switchValuesArr = []
    for path in paths:
        if loadSwitchValues == True:
            modelParams, simulationData, switchValues = loadModel(path, loadSwitchValues=loadSwitchValues)
            params.append(modelParams)
            data.append(simulationData)
            switchValuesArr.append(switchValues)
        else:
            modelParams, simulationData = loadModel(path, loadSwitchValues=loadSwitchValues)
            params.append(modelParams)
            data.append(simulationData)
    if loadSwitchValues == True:
        return params, data, switchValuesArr
    return params, data

def saveTimestepsResults(results, path, modelParams=None, saveInterval=1):
    """
    Saves evaluator results for all timesteps.

    Parameters:
        - results (dictionary): the evaluation results per timestep
        - path (string): the location and name of the target file
        - modelParams (dict) [optional]: a summary of the model's params such as n, k, neighbourSelectionMode etc.
        - saveInterval (int) [optional]: specifies the interval at which the saving should occur, i.e. if any time steps should be skipped

    Returns:
        Nothing. Creates or overwrites a file.
    """
    dict = {"time": __getSpecifiedIntervals(saveInterval, list(results.keys())),
            "results": __getSpecifiedIntervals(saveInterval, list(results.values()))}
    __saveDict(path, dict, modelParams)

def loadTimestepsResults(path):
    """
    Loads the evaluation results from a single file.

    Parameters:
        - path (string): the location and file name of the file containing the model data

    Returns:
        The model's params as well as the evaluation data as a {time: results} dictionary
    """
    loadedJson = __loadJson(path)
    modelParams = loadedJson["modelParams"]
    time = np.array(loadedJson["time"])
    results = np.array(loadedJson["results"])
    data = {time[i]: results[i] for i in range(len(time))}
    return modelParams, data



def __getSpecifiedIntervals(interval, lst):
    """
    Selects the data within the list which coincides with the specified interval, e.g. every third data point.

    Parameters:
        - interval (int): which data points should be considered, e.g. 3 would indicate indices 0, 3, 6 etc.
        - lst (list): the data to be reduced according to the intervals

    Returns:
        A reduced list containing only the data points of the original list at the specified intervals.
    """
    return [lst[idx] for idx in range(0, len(lst)) if idx % interval == 0]

def __saveDict(path, dict, modelParams=None):
    """
    Saves the values of a dictionary to a file at the specified path.

    Parameters:
        - path (string): the location and name of the target file
        - dict (dictionary): the dictionary containing the data to be saved
        - modelParams (dict) [optional]: a summary of the model's params such as n, k, neighbourSelectionMode etc.

    Returns:
        Nothing. Creates or overwrites a file.
    """
    if modelParams != None:
        paramsDict = {"modelParams": modelParams}
        paramsDict.update(dict)
        dict = paramsDict

    with open(path, "w") as outfile:
        json.dump(dict, outfile)

def __loadJson(path):
    """
    Loads data as JSON from a single file.

    Parameters:
        - path (string): the location and file name of the file containing the data

    Returns:
        All the data from the file as JSON.
    """
    obj_text = codecs.open(path, 'r', encoding='utf-8').read()
    return json.loads(obj_text)

### General helper methods

In [5]:

def createListOfFilenamesForI(baseFilename, maxI, minI=0, fileTypeString="json"):
    filenames = []
    for i in range(minI, maxI):
        filenames.append(baseFilename + f"_{i}.{fileTypeString}")
    return filenames


def evaluateSingleTimestep(positions, orientations, metric, domainSize=None, radius=None, threshold=0.99, switchTypeValues=None, switchType=None, switchTypeOptions=None):
     """
        Evaluates the simulation data for a single timestep according to the selected metric.

        Parameters:
            - positions (array): the position of every particle at this timestep
            - orientations (array): the orientation of every particle at this timestep
            - metric (EnumMetrics): the metric for evaluating the data
            - radius (int) [optional]: the perception radius of every particle. Radius is only relevant for certain metrics such as Clustering, therefore it can be None for the others.
            - threshold (float) [optional]: the threshold for the agglomerative clustering
            - switchTypeValues (array) [optional]: the switch type values for individual switching
            - switchTypeOptions (tuple) [optional]: contains the orderValue and the disorderValue respectively
        Returns:
            An array of the results according to the metric.
     """
     n = len(positions)
     match metric:
        case Metrics.ORDER:
            return computeGlobalOrder(orientations)
        case Metrics.CLUSTER_NUMBER:
            nClusters, _ = findClusters(orientations, threshold)
            return nClusters
        case Metrics.CLUSTER_NUMBER_WITH_RADIUS:
            nClusters, _ = findClustersWithRadius(positions, orientations, domainSize, radius, threshold)
            return nClusters
        case Metrics.CLUSTER_SIZE:
            nClusters, clusters = findClusters(orientations, threshold)
            # TODO: make sure the change from array to dict is taken care of in the visualisation
            clusterSizes = computeClusterSizes(clusters)
            return clusterSizes
        case Metrics.ORDER_VALUE_PERCENTAGE:
            orderCount, _ = getNumbersPerSwitchTypeValue(switchTypeValues, switchType, switchTypeOptions)
            return orderCount
        case Metrics.DUAL_OVERLAY_ORDER_AND_PERCENTAGE: # not a single metric but rather overlaying two metrics in the same graph
            order = computeGlobalOrder(orientations)
            orderCount, _ = getNumbersPerSwitchTypeValue(switchTypeValues, switchType, switchTypeOptions)
            return order, orderCount/100 # normalise to fit with order
        case Metrics.AVERAGE_NUMBER_NEIGHBOURS:
            _, avg, _ =  getMinAvgMaxNumberOfNeighbours(positions, domainSize, radius)
            return avg
        case Metrics.MIN_AVG_MAX_NUMBER_NEIGHBOURS:
            return getMinAvgMaxNumberOfNeighbours(positions, domainSize, radius)
        case Metrics.AVG_DISTANCE_NEIGHBOURS:
            _, avg, _ = getMinAvgMaxDistanceOfNeighbours(positions, domainSize, radius)
            return avg
        case Metrics.AVG_CENTROID_DISTANCE:
            _, avg, _ = getMinAvgMaxDistanceFromCentroid(positions)
            return avg

def computeGlobalOrder(orientations):
    """
    Computes the order within the provided orientations.
    Can also be called for a subsection of all particles by only providing their orientations.

    Params:
        - orientations (array of (u,v)-coordinates): the orientation of all particles that should be included

    Returns:
        A float representing the order in the given orientations
    """
    """
    sumOrientation = [0,0]
    for j in range(len(orientations)):
        sumOrientation += orientations[j]
    return np.sqrt(sumOrientation[0]**2 + sumOrientation[1]**2) / len(orientations)
    """
    sumOrientation = np.sum(orientations[np.newaxis,:,:],axis=1)
    return np.divide(np.sqrt(np.sum(sumOrientation**2,axis=1)), len(orientations))[0]

def computeLocalOrders(orientations, neighbours):
    """
    Computes the local order for every individual.

    Params:
        - orientations (array of floats): the orientation of every individual at the current timestep
        - neighbours (array of arrays of booleans): the identity of every neighbour of every individual

    Returns:
        An array of floats representing the local order for every individual at the current time step (values between 0 and 1)
    """
    sumOrientation = np.sum(neighbours[:,:,np.newaxis]*orientations[np.newaxis,:,:],axis=1)
    return np.divide(np.sqrt(np.sum(sumOrientation**2,axis=1)), np.count_nonzero(neighbours, axis=1))

def findClusters(orientations, threshold):
    """
    Find clusters in the data using AgglomerativeClustering.

    Params:
        - orientations (array of arrays of float): the orientation of every particle at every timestep
        - threshold (float): the threshold used to cut the tree in AgglomerativeClustering

    Returns:
        The number of clusters, the labels of the clusters
    """
    cluster = AgglomerativeClustering(n_clusters=None, metric='euclidean', linkage='single', compute_full_tree=True, distance_threshold=threshold)

    # Cluster the data
    cluster.fit_predict(orientations)

    # number of clusters
    nClusters = 1+np.amax(cluster.labels_)

    return nClusters, cluster.labels_

def findClustersWithRadius(positions, orientations, domainSize, radius, threshold=0.01):
    """
    Finds clusters in the particle distribution. The clustering is performed according to the following constraints:
        - to belong to a cluster, a particle needs to be within the radius of at least one other member of the same cluster
        - to belong to a cluster, the orientation has to be similar or equal (<= 1.29° orientation difference by default)

    Parameters:
        - positions (array): the position of every particle at the current timestep
        - orientations (array): the orientations of every particle at the current timestep
        - radius (int): the perception radius of the particles

    Returns:
        A tuple containing the number of clusters and the clusters.
    """
    # TODO refactor
    if radius == None:
        print("ERROR: Radius needs to be provided for clustering.")

    n = len(positions)
    clusters = np.full(n, -1)

    neighbours = ServiceVicsekHelper.getNeighbours(positions=positions, domainSize=domainSize, radius=radius)

    clusterNumber, baseClusters = findClusters(orientations, threshold)
    neighbourIndices = np.argwhere(neighbours)
    neighbourIndicesRegrouped = {}
    maxClusterId = clusterNumber -1
    for indexPair in neighbourIndices:
        if indexPair[0] in neighbourIndicesRegrouped.keys():
            neighbourIndicesRegrouped[indexPair[0]].append(indexPair[1])
        else:
            neighbourIndicesRegrouped[indexPair[0]] = [indexPair[1]]

    for c in range(clusterNumber):
        members = np.where(baseClusters==c)
        for member in members[0]:
            maxClusterId = updateClusters(member, c, maxClusterId, clusters, members[0], neighbourIndicesRegrouped)


    return len(np.unique(clusters)), clusters

def updateClusters(currentIdx, clusterId, maxClusterId, clusters, candidates, neighbourIndices):
    # if the currentIdx is already assigned a value, we return
    if clusters[currentIdx] != -1:
        return maxClusterId
    # the first member is always automatically ok
    if currentIdx == candidates[0]:
        clusters[currentIdx] = clusterId
    else:
        # if any of the neighbours are also members of the same cluster, we set the clusterId
        for neighbour in neighbourIndices[currentIdx]:
            if clusters[neighbour] != -1 and neighbour in candidates:
                clusters[currentIdx] = clusters[neighbour]
        # if not, we set a new clusterId
        if clusters[currentIdx] == -1:
            maxClusterId += 1
            clusters[currentIdx] = maxClusterId
    return maxClusterId


def computeClusterSizes(clusters):
    """
    Computes the size of every cluster.

    Parameters:
        - clusterCounter (int): the total number of clusters in the current state of the domain
        - clusters (array): array containing the id of the cluster that every particle belongs to

    Returns:
        An dictionary with the ids of the cluster as keys and the number of members as values.
    """
    unique, counts = np.unique(clusters, return_counts=True)
    return dict(zip(unique, counts))

def getNumbersPerSwitchTypeValue(switchTypeValues, switchType, switchTypeOptions):
    """
    Counts the occurrences for all switch type values.

    Params:
        - switchTypeValues (array): the switch type values for individual switching
        - switchTypeOptions (array): all possible switch type values
    Returns:
        Two integer representing the counts of the orderValue and disorderValue respectively
    """
    unique, counts = np.unique(switchTypeValues[switchType.switchTypeValueKey], return_counts=True)
    d = dict(zip(unique, counts))
    n = sum(list(d.values()))
    if d.get(switchTypeOptions[0]) != None:
        percentageOrdered = d.get(switchTypeOptions[0])/n
    else:
        percentageOrdered = 0
    if d.get(switchTypeOptions[1]) != None:
        percentageDisordered = d.get(switchTypeOptions[1])/n
    else:
        percentageDisordered = 0

    return percentageOrdered * 100, percentageDisordered * 100

def checkTurnSuccess(orientations, fixedAngle, noise, eventStartTimestep, interval=100):
    """
    Checks if the event has managed to make the whole swarm align to the new angle or if the defecting group has been
    reabsorbed.
    Is only really useful for EventEffect.ALIGN_TO_FIXED_ANGLE (DISTANT).

    Params:
        - orientations (array of (u,v)-coordinates): the orientation of every particle at every timestep
        - fixedAngle (angle in radians): the angle to which the affected particles have turned
        - noise (float): the noise level in the domain impacting the actual orientations
        - eventStartstep (int): the timestep at which the event first occurs
        - eventDuration (int) [optional]: the number of timesteps that need to pass before comparing. By default 100.

    Returns:
        Boolean signifying whether the whole swarm has managed to align to the fixed angle.
    """
    # TODO refactor
    #print("starting turn success eval...")
    if eventStartTimestep == 0:
        raise Exception("Cannot be used if there is no previous timestep for comparison")
    if eventStartTimestep + interval > len(orientations[0]):
        interval -= 1

    orientationsBefore = orientations[eventStartTimestep-1]
    orientationsExpected = [ServiceOrientations.computeUvCoordinates(fixedAngle) for orient in orientationsBefore]
    orientationsAfter = orientations[eventStartTimestep+interval]
    if (1-computeGlobalOrder(orientationsAfter)) <= noise: # if the swarm is aligned
        before = [ServiceOrientations.normaliseAngle(ServiceOrientations.computeAngleForOrientation(orientationsBefore[i])) for i in range(0, len(orientationsBefore))]
        after = [ServiceOrientations.normaliseAngle(ServiceOrientations.computeAngleForOrientation(orientationsAfter[i])) for i in range(0, len(orientationsAfter))]
        expected = [ServiceOrientations.normaliseAngle(ServiceOrientations.computeAngleForOrientation(orientationsExpected[i])) for i in range(0, len(orientationsExpected))]

        beforeAvg = np.average(before)
        afterAvg = np.average(after)
        expectedAvg = np.average(expected)

        if np.absolute(beforeAvg-expectedAvg) <= noise:
            #print("No turn necessary")
            return "not_necessary"

        # if the average new angle is closer to the expected angle and the difference between the expected and the new angles can be explained by noise, the turn was successful
        if np.absolute(expectedAvg-afterAvg) < np.absolute(beforeAvg-afterAvg) and np.absolute(expectedAvg-afterAvg) <= noise:
            return "turned"

    return "not_turned"

def getMinAvgMaxNumberOfNeighbours(positions, domainSize, radius):
    """
    Determines the minimum, average and maximum of neighbours perceived by the particles

    Params:
        - positions (array of (x,y)-coordinates): the current positions of all particles
        - radius (float): the perception radius of the particles

    Returns:
        3 floats representing the minimum, average and maximum number of neighbours
    """
    neighbours = ServiceVicsekHelper.getNeighbours(positions=positions, domainSize=domainSize, radius=radius)
    np.fill_diagonal(neighbours, False)
    neighbourNumbersArray = np.count_nonzero(neighbours, axis=1)
    return np.min(neighbourNumbersArray), np.average(neighbourNumbersArray), np.max(neighbourNumbersArray)

def getMinAvgMaxDistanceOfNeighbours(positions, domainSize, radius):
    neighbours = ServiceVicsekHelper.getNeighbours(positions=positions, domainSize=domainSize, radius=radius)
    np.fill_diagonal(neighbours, False)
    posDiff = np.sqrt(ServiceVicsekHelper.getPositionDifferences(positions=positions, domainSize=domainSize))
    maskedArray = np.ma.MaskedArray(posDiff, mask=neighbours==False, fill_value=0)
    return np.min(maskedArray), np.average(maskedArray), np.max(maskedArray)

def getMinAvgMaxDistanceFromCentroid(positions):
    centroid = np.mean(positions, axis=0)
    distances = [math.dist(pos, centroid) for pos in positions]
    return np.min(distances), np.average(distances), np.max(distances)

def normalizeOrientations(orientations):
    """
    Normalises the orientations of all particles for the current time step

    Parameters:
        - orientations (array): The current orientations of all particles

    Returns:
        The normalised orientations of all particles as an array.
    """
    return orientations/(np.sqrt(np.sum(orientations**2,axis=1))[:,np.newaxis])

def computeUvCoordinates(angle):
    """
    Computes the (u,v)-coordinates based on the angle.

    Params:
        - angle (float): the angle in radians

    Returns:
        An array containing the [u, v]-coordinates corresponding to the angle.
    """
    # compute the uv-coordinates
    U = np.cos(angle)
    V = np.sin(angle)

    return [U,V]

def computeUvCoordinatesForList(angles):
    """
    Computes the (u,v)-coordinates based on the angle.

    Params:
        - angle (float): the angle in radians

    Returns:
        An array containing the [u, v]-coordinates corresponding to the angle.
    """
    # compute the uv-coordinates
    U = np.cos(angles)
    V = np.sin(angles)

    return np.column_stack((U,V))

def computeAngleForOrientation(orientation):
    """
    Computes the angle in radians based on the (u,v)-coordinates of the current orientation.

    Params:
        - orientation (array of floats): the current orientation in (u,v)-coordinates

    Returns:
        A float representin the angle in radians.
    """
    return np.arctan2(orientation[1], orientation[0])

def computeAnglesForOrientations(orientations):
    """
    Computes the angle in radians based on the (u,v)-coordinates of the current orientation.

    Params:
        - orientation (array of floats): the current orientation in (u,v)-coordinates

    Returns:
        A float representin the angle in radians.
    """
    return np.arctan2(orientations[:, 1], orientations[:, 0])

def computeAnglesWithRespectToFocusPoint(positions, focusPoint):
    return np.arctan2(positions[:, 1]-focusPoint[1], positions[:, 0]-focusPoint[0])


def getDomainSizeForConstantDensity(density, numberOfParticles):
    """
    Computes the domain size to keep the density constant for the supplied number of particles.
    Density formula: "density" = "number of particles" / "domain area"

    Parameters:
        - density (float): the desired constant density of the domain
        - numberOfParticles (int): the number of particles to be placed in the domain

    Returns:
        A tuple containing the x and y dimensions of the domain size that corresponds to the density.
    """
    area = numberOfParticles / density
    return (np.sqrt(area), np.sqrt(area))

def getNumberOfParticlesForConstantDensity(density, domainSize):
    """
    Computes the number of particles to keep the density constant for the supplied domain size.
    Density formula: "density" = "number of particles" / "domain area"

    Parameters:
        - density (float): the desired constant density of the domain
        - domainSize (tuple): tuple containing the x and y dimensions of the domain size

    Returns:
        The number of particles to be placed in the domain that corresponds to the density.
    """
    return int(density * (domainSize[0] * domainSize[1])) # density * area

def getDensity(domainSize, numberOfParticles):
    """
    Computes the density of a given system.
    Density formula: "density" = "number of particles" / "domain area"

    Parameters:
        - domainSize (tuple): tuple containing the x and y dimensions of the domain size
        - numberOfParticles (int): the number of particles to be placed in the domain

    Returns:
        The density of the system as a float.
    """
    return numberOfParticles / (domainSize[0] * domainSize[1]) # n / area

def getNoiseAmplitudeValueForPercentage(percentage):
    """
    Paramters:
        - percentage (int, 1-100)
    """
    return 2 * np.pi * (percentage/100)

def getRadiusToSeeOnAverageNNeighbours(n, density):
    """
    Computes the radius that will ensure that every particle sees at least n other particles
    if the density is equally distributed in the whole domain.

    Params:
        - n (int): the number of neighbours that the particle should be able to see
        - density (float): the domain density (assumed to be equally distributed)

    Returns:
        An integer representing the perception radius of each particle
    """
    area = n/density
    return np.ceil(np.sqrt(area))

def createOrderedInitialDistributionEquidistancedIndividual(startSwitchTypeValue, domainSize, numberOfParticles, angleX=None, angleY=None):
    """
    Creates an ordered, equidistanced initial distribution of particles in a domain ready for use in individual decision scenarios.
    The particles are placed in a grid-like fashion. The orientation of the particles is random unless specified
    but always the same for all particles.

    Parameters:
        - domainSize (tuple): tuple containing the x and y dimensions of the domain size
        - numberOfParticles (int): the number of particles to be placed in the domain
        - angleX (float [0,1)): first angle component to specify the orientation of all particles
        - angleY (float [0,1)): second angle component to specify the orientation of all particles

    Returns:
        Positions and orientations for all particles within the domain. Can be used as the initial state of a Vicsek simulation.
    """
    positions, orientations = createOrderedInitialDistributionEquidistanced(domainSize, numberOfParticles, angleX, angleY)
    if startSwitchTypeValue != None:
        switchTypeValues = numberOfParticles * [startSwitchTypeValue]
        return positions, orientations, switchTypeValues
    return positions, orientations

def createOrderedInitialDistributionEquidistanced(domainSize, numberOfParticles, angleX=None, angleY=None):
    """
    Creates an ordered, equidistanced initial distribution of particles in a domain.
    The particles are placed in a grid-like fashion. The orientation of the particles is random unless specified
    but always the same for all particles.

    Parameters:
        - domainSize (tuple): tuple containing the x and y dimensions of the domain size
        - numberOfParticles (int): the number of particles to be placed in the domain
        - angleX (float [0,1)): first angle component to specify the orientation of all particles
        - angleY (float [0,1)): second angle component to specify the orientation of all particles

    Returns:
        Positions and orientations for all particles within the domain. Can be used as the initial state of a Vicsek simulation.
    """
    # choose random angle for orientations
    if angleX is None:
        angleX = random.random()
    if angleY is None:
        angleY = random.random()

    # prepare the distribution for the positions
    xLength = domainSize[0]
    yLength = domainSize[1]

    area = xLength * yLength
    pointArea = area / numberOfParticles
    length = np.sqrt(pointArea)

    # initialise the initialState components
    positions = np.zeros((numberOfParticles, 2))
    orientations = np.zeros((numberOfParticles, 2))

    # set the orientation for all particles
    orientations[:, 0] = angleX
    orientations[:, 1] = angleY

    counter = 0
    # set the position of every particle
    for x in np.arange(length/2, xLength, length):
        for y in np.arange(length/2, yLength, length):
            if counter < numberOfParticles:
                positions[counter] = [x,y]
            counter += 1

    return positions, orientations


def createOrderedInitialDistributionEquidistancedForLowNumbers(domainSize, numberOfParticles, angleX=None, angleY=None):
    """
    Creates an ordered, equidistanced initial distribution of particles in a domain.
    The particles are placed in a grid-like fashion. The orientation of the particles is random unless specified
    but always the same for all particles.

    Parameters:
        - domainSize (tuple): tuple containing the x and y dimensions of the domain size
        - numberOfParticles (int): the number of particles to be placed in the domain
        - angleX (float [0,1)): first angle component to specify the orientation of all particles
        - angleY (float [0,1)): second angle component to specify the orientation of all particles

    Returns:
        Positions and orientations for all particles within the domain. Can be used as the initial state of a Vicsek simulation.

    """
    # choose random angle for orientations
    if angleX is None:
        angleX = random.random()
    if angleY is None:
        angleY = random.random()

    # prepare the distribution for the positions
    xLength = domainSize[0]
    yLength = domainSize[1]

    area = xLength * yLength
    pointArea = area / numberOfParticles
    length = np.sqrt(pointArea)

    # initialise the initialState components
    positions = np.zeros((numberOfParticles, 2))
    orientations = np.zeros((numberOfParticles, 2))

    # set the orientation for all particles
    orientations[:, 0] = angleX
    orientations[:, 1] = angleY

    counter = 0
    # set the position of every particle
    for x in np.arange(length/2, xLength, length):
        positions[counter] = [x,x]
        counter += 1

    return positions, orientations


def createInitialStateInCircle(domainSize, center, radius, numberOfParticles, isOrdered, startSwitchTypeValue=None):
    t = np.random.uniform(0, 1, size=numberOfParticles)
    u = np.random.uniform(0, 1, size=numberOfParticles)

    x = center[0] + (radius*np.sqrt(t) * np.cos(2*np.pi*u))
    y = center[1] + (radius*np.sqrt(t) * np.sin(2*np.pi*u))

    positions = np.column_stack((x, y))

    if isOrdered:
        baseOrientation = np.random.rand(1, len(domainSize))-0.5
        orientations = numberOfParticles * baseOrientation
    else:
        orientations = np.random.rand(numberOfParticles, len(domainSize))-0.5
    orientations = ServiceOrientations.normalizeOrientations(orientations)

    return positions, orientations

def getDifferences(array, domainSize):
    """
    Computes the differences between all individuals for the values provided by the array.

    Params:
        - array (array of floats): the values to be compared

    Returns:
        An array of arrays of floats containing the difference between each pair of values.
    """
    rij=array[:,np.newaxis,:]-array
    rij = rij - domainSize*np.rint(rij/domainSize) #minimum image convention
    return np.sum(rij**2,axis=2)

def getOrientationDifferences(orientations, domainSize):
    """
    Helper method to gloss over identical differences implementation for position and orientation.
    """
    return getDifferences(orientations, domainSize)

def getPositionDifferences(positions, domainSize):
    """
    Helper method to gloss over identical differences implementation for position and orientation.
    """
    return getDifferences(positions, domainSize)

def getNeighbours(positions, domainSize, radius):
    """
    Determines all the neighbours for each individual.

    Params:
        - positions (array of floats): the position of every individual at the current timestep

    Returns:
        An array of arrays of booleans representing whether or not any two individuals are neighbours
    """
    rij2 = getPositionDifferences(positions, domainSize)
    return (rij2 <= radius**2)

def getNeighboursWithLimitedVision(positions, orientations, domainSize, radius, degreesOfVision):
    candidates = getNeighbours(positions=positions, domainSize=domainSize, radius=radius)
    minAngles, maxAngles = determineMinMaxAngleOfVision(orientations=orientations, degreesOfVision=degreesOfVision)
    inFieldOfVision = isInFieldOfVision(positions=positions, minAngles=minAngles, maxAngles=maxAngles)

    combined = candidates & inFieldOfVision
    np.fill_diagonal(combined, True)
    return combined

def padArray(a, n, kMin, kMax):
    if kMax > len(a[0]):
        minusDiff = np.full((n,kMax-kMin), -1)
        return np.concatenate((a, minusDiff), axis=1)
    return a

def getIndicesForTrueValues(a):
    indices = np.transpose(np.nonzero(a))
    perRow = np.full(len(a), None)
    maxLength = 0
    for idx in indices:
        if perRow[idx[0]] == None:
            perRow[idx[0]] = [idx[1]]
        else:
            perRow[idx[0]].append(idx[1])
        if len(perRow[idx[0]]) > maxLength:
            maxLength = len(perRow[idx[0]])
    result = []
    for rowIdx in range(len(a)):
        if perRow[rowIdx] == None:
            result.append(np.full(maxLength, -1))
        else:
            #result.append(np.pad(perRow[rowIdx], maxLength, ))
            pr = np.array(perRow[rowIdx])
            result.append(np.pad(pr, ((0, maxLength-pr.shape[0])), 'constant', constant_values=-1))

    return np.array(result)

def getIndicesForTrueValuesWithPadding(a, n, kMin, kMax):
    withoutPadding = getIndicesForTrueValues(a)
    if len(withoutPadding) == 0 or len(withoutPadding[0]) == 0:
        return np.full((len(a), kMax), -1)
    return padArray(withoutPadding, n, np.max(np.min(withoutPadding), 0), kMax)

def revertTimeDelayedChanges(t, oldValues, newValues, activationTimeDelays):
    vals = np.where((np.array([t % activationTimeDelays == 0, t % activationTimeDelays == 0]).T), newValues, oldValues)
    return vals

def determineMinMaxAngleOfVision(orientations, degreesOfVision):
    """
    Determines the boundaries of the field of vision of a particle.

    Params:
        - orientation (array of floats): the current orientation of the particle
        - degreesOfVision (int [0,2*np.pi]): how many degrees of its surroundings the particle can see.

    Returns:
        Two integers representing the angular boundary of vision, i.e. the minimal and maximal angle that is still visible to the particle
    """
    angularDistance = degreesOfVision / 2
    currentAngles = normaliseAngles(computeAnglesForOrientations(orientations))

    minAngles = normaliseAngles(currentAngles - angularDistance)
    maxAngles = normaliseAngles(currentAngles + angularDistance)

    return minAngles, maxAngles

def getRelativeAngles(positions):
    posDiffs=positions-positions[:,np.newaxis,:]
    relativeAngles = np.arctan(posDiffs[:, :, 1], posDiffs[:, :, 0])
    angles = relativeAngles % (2*np.pi)
    return angles

def isInFieldOfVision(positions, minAngles, maxAngles):
    """
    Checks if a given particle is within the field of vision of the current particle.

    Params:
        - positionParticle (array of floats): the position of the current particle in (x,y)-coordinates
        - positionCandidate (array of floats): the position of the particle that is considered as potentially visible to the current particle
        - minAngle (float): the left boundary of the field of vision
        - maxAngle (float): the right boundary of the field of vision

    Returns:
        A boolean representing whether the given particle is in the field of vision of the current particle.
    """
    angles = getRelativeAngles(positions)
    return ((minAngles < maxAngles) & ((angles >= minAngles) & (angles <= maxAngles))) | ((minAngles >= maxAngles) & ((angles >= minAngles) | (angles <= maxAngles)))

def normaliseAngles(angles):
    """
    Normalises the degrees of an angle to be between 0 and 2pi.

    Params:
        - angle (float): the angle in radians

    Returns:
        Float representing the normalised angle.
    """
    angles = np.where((angles < 0), ((2*np.pi)-np.absolute(angles)), angles)
    angles = np.where((angles > (2*np.pi)), (angles % (2*np.pi)), angles)

    return angles

## Switches
Serveral types of switches are possible within the system, most importantly choosing between two values of k or two neighbour selection mechanisms.

In [6]:
class SwitchInformation(object):
    def __init__(self, switchType, values, thresholds, numberPreviousStepsForThreshold, initialValues=None):
        self.switchType = switchType
        self.thresholds = thresholds
        self.orderSwitchValue, self.disorderSwitchValue = values
        self.numberPreviousStepsForThreshold = numberPreviousStepsForThreshold
        self.initialValues = initialValues

        self.lowerThreshold, self.upperThreshold = self.__getLowerAndUpperThreshold()

    def getParameterSummary(self):
        return f"{self.switchType.value}_o={self.orderSwitchValue}_do={self.disorderSwitchValue}_th={self.lowerThreshold}-{self.upperThreshold}_p={self.numberPreviousStepsForThreshold}"

    def __getLowerAndUpperThreshold(self):
        """
        Determines the lower and upper thresholds for hysteresis.

        Params:
            None

        Returns:
            Two floats representing the lower and upper threshold respectively
        """
        if len(self.thresholds) == 1:
            switchDifferenceThresholdLower = self.thresholds[0]
            switchDifferenceThresholdUpper = 1 - self.thresholds[0]
        else:
            switchDifferenceThresholdLower = self.thresholds[0]
            switchDifferenceThresholdUpper = self.thresholds[1]
        return switchDifferenceThresholdLower, switchDifferenceThresholdUpper

class SwitchSummary(object):

    def __init__(self, switches):
        self.rawSwitches = switches
        self.switches = self.__transformSwitches(switches)
        self.actives = self.__determineActives()

    def getParameterSummary(self):
        return "_".join([switch.getParameterSummary() for switch in self.rawSwitches])

    def isActive(self, switchType):
        return self.actives[switchType]

    def getBySwitchType(self, switchType):
        if self.isActive(switchType):
            return self.switches[switchType]
        return None

    def getMinMaxValuesForKSwitchIfPresent(self):
        if self.isActive(SwitchType.K):
            kSwitch = self.getBySwitchType(SwitchType.K)
            kMin = np.min([kSwitch.orderSwitchValue, kSwitch.disorderSwitchValue])
            kMax = np.max([kSwitch.orderSwitchValue, kSwitch.disorderSwitchValue])
            return kMin, kMax
        return None, None

    def __transformSwitches(self, switches):
        return {switch.switchType: switch for switch in switches}

    def __determineActives(self):
        actives = {}
        for switchType in SwitchType:
            isPresent = False
            for switch in self.rawSwitches:
                if switch.switchType == switchType:
                    isPresent = True
            actives[switchType] = isPresent
        return actives

# Events
Serveral types of events can occur in a given system. These take care of the orientation adaptations and all other changes that occur because of the event itself

In [7]:
class BaseEvent:
    # TODO refactor to allow areas with a radius bigger than the radius of a particle, i.e. remove neighbourCells and determine all affected cells here
    """
    Representation of an event occurring at a specified time and place within the domain and affecting
    a specified percentage of particles. After creation, the check()-method takes care of everything.
    """
    def __init__(self, startTimestep, duration, domainSize, eventEffect, noisePercentage=None, blockValues=False, alterValues=False,
                 switchSummary=None):
        """
        Creates an external stimulus event that affects part of the swarm at a given timestep.

        Params:
            - timestep (int): the timestep at which the stimulus is presented and affects the swarm
            - percentage (float, range: 0-100): how many percent of the swarm is directly affected by the event
            - angle (int, range: 1-359): how much the orientation of the affected particles is changed in a counterclockwise manner
            - eventEffect (EnumEventEffect): how the orientations should be affected
            - distributionType (EnumDistributionType) [optional]: how the directly affected particles are distributed, i.e. if the event occurs globally or locally
            - areas ([(centerXCoordinate, centerYCoordinate, radius)]) [optional]: list of areas in which the event takes effect. Should be specified if the distributionType is not GLOBAL and match the DistributionType
            - domainSize (tuple of floats) [optional]: the size of the domain
            - targetSwitchValue (switchTypeValue) [optional]: the value that every affected particle should select

        Returns:
            No return.
        """
        self.startTimestep = startTimestep
        self.duration = duration
        self.eventEffect = eventEffect
        self.domainSize = np.asarray(domainSize)
        self.noisePercentage = noisePercentage
        self.blockValues = blockValues
        self.alterValues = alterValues
        self.switchSummary = switchSummary
        if self.noisePercentage != None:
            self.noise = getNoiseAmplitudeValueForPercentage(self.noisePercentage)

    def getShortPrintVersion(self):
        return f"t{self.startTimestep}d{self.duration}e{self.eventEffect.val}"

    def getParameterSummary(self):
        summary = {"startTimestep": self.startTimestep,
            "duration": self.duration,
            "eventEffect": self.eventEffect.val,
            "domainSize": self.domainSize.tolist(),
            "noisePercentage": self.noisePercentage,
            "blockValues": self.blockValues,
            "alterValues": self.alterValues
            }
        if self.switchSummary:
            summary["switchSummary"] = self.switchSummary.getParameterSummary()
        else:
            summary["switchSummary"] = None
        return summary

    def check(self, totalNumberOfParticles, currentTimestep, positions, orientations, nsms, ks, speeds, dt=None, activationTimeDelays=None, isActivationTimeDelayRelevantForEvent=False):
        """
        Checks if the event is triggered at the current timestep and executes it if relevant.

        Params:
            - totalNumberOfParticles (int): the total number of particles within the domain. Used to compute the number of affected particles
            - currentTimestep (int): the timestep within the experiment run to see if the event should be triggered
            - positions (array of tuples (x,y)): the position of every particle in the domain at the current timestep
            - orientations (array of tuples (u,v)): the orientation of every particle in the domain at the current timestep
            - switchValues (array of switchTypeValues): the switch type value of every particle in the domain at the current timestep
            - cells (array: [(minX, minY), (maxX, maxY)]): the cells within the cellbased domain
            - cellDims (tuple of floats): the dimensions of a cell (the same for all cells)
            - cellToParticleDistribution (dictionary {cellIdx: array of indices of all particles within the cell}): A dictionary containing the indices of all particles within each cell

        Returns:
            The orientations of all particles - altered if the event has taken place, unaltered otherwise.
        """
        blocked = np.full(totalNumberOfParticles, False)
        if self.checkTimestep(currentTimestep):
            #if currentTimestep == self.startTimestep or currentTimestep == (self.startTimestep + self.duration):
                #print(f"executing event at timestep {currentTimestep}")
            alteredOrientations, alteredNsms, alteredKs, alteredSpeeds, blockedUpdate = self.executeEvent(totalNumberOfParticles=totalNumberOfParticles, positions=positions, orientations=orientations, nsms=nsms, ks=ks, speeds=speeds, dt=dt)

            if isActivationTimeDelayRelevantForEvent:
                orientations = revertTimeDelayedChanges(currentTimestep, orientations, alteredOrientations, activationTimeDelays)
            else:
                orientations = alteredOrientations
                if self.blockValues:
                    blocked = blockedUpdate
                if self.alterValues:
                    nsms = alteredNsms
                    ks = alteredKs
                    speeds = alteredSpeeds
        return orientations, nsms, ks, speeds, blocked

    def checkTimestep(self, currentTimestep):
        """
        Checks if the event should be triggered.

        Params:
            - currentTimestep (int): the timestep within the experiment run to see if the event should be triggered

        Returns:
            A boolean representing whether or not the event should be triggered.
        """
        return self.startTimestep <= currentTimestep and currentTimestep <= (self.startTimestep + self.duration)

    def applyNoiseDistribution(self, orientations):
        return orientations + np.random.normal(scale=self.noise, size=(len(orientations), len(self.domainSize)))

    def executeEvent(self, totalNumberOfParticles, positions, orientations, nsms, ks, speeds, dt):
        """
        Executes the event.

        Params:
            - totalNumberOfParticles (int): the total number of particles within the domain. Used to compute the number of affected particles
            - positions (array of tuples (x,y)): the position of every particle in the domain at the current timestep
            - orientations (array of tuples (u,v)): the orientation of every particle in the domain at the current timestep
            - switchValues (array of switchTypeValues): the switch type value of every particle in the domain at the current timestep
            - cells (array: [(minX, minY), (maxX, maxY)]): the cells within the cellbased domain
            - cellDims (tuple of floats): the dimensions of a cell (the same for all cells)
            - cellToParticleDistribution (dictionary {cellIdx: array of indices of all particles within the cell}): A dictionary containing the indices of all particles within each cell

        Returns:
            The orientations, switchTypeValues of all particles after the event has been executed as well as a list containing the indices of all affected particles.
        """
        # base event does not do anything here
        return orientations, nsms, ks, speeds, np.full(totalNumberOfParticles, False)


class ExternalStimulusOrientationChangeEvent(BaseEvent):
    # TODO refactor to allow areas with a radius bigger than the radius of a particle, i.e. remove neighbourCells and determine all affected cells here
    """
    Representation of an event occurring at a specified time and place within the domain and affecting
    a specified percentage of particles. After creation, the check()-method takes care of everything.
    """
    def __init__(self, startTimestep, duration, domainSize, eventEffect, distributionType, areas=None, radius=None, numberOfAffected=None, eventSelectionType=None, angle=None, noisePercentage=None, blockValues=False):
        """
        Creates an external stimulus event that affects part of the swarm at a given timestep.

        Params:
            - timestep (int): the timestep at which the stimulus is presented and affects the swarm
            - percentage (float, range: 0-100): how many percent of the swarm is directly affected by the event
            - angle (int, range: 1-359): how much the orientation of the affected particles is changed in a counterclockwise manner
            - eventEffect (EnumEventEffect): how the orientations should be affected
            - distributionType (EnumDistributionType) [optional]: how the directly affected particles are distributed, i.e. if the event occurs globally or locally
            - areas ([(centerXCoordinate, centerYCoordinate, radius)]) [optional]: list of areas in which the event takes effect. Should be specified if the distributionType is not GLOBAL and match the DistributionType
            - domainSize (tuple of floats) [optional]: the size of the domain
            - targetSwitchValue (switchTypeValue) [optional]: the value that every affected particle should select

        Returns:
            No return.
        """
        super().__init__(startTimestep=startTimestep, duration=duration, domainSize=domainSize, eventEffect=eventEffect, noisePercentage=noisePercentage, blockValues=blockValues)
        self.angle = angle
        self.distributionType = distributionType
        self.areas = areas
        self.numberOfAffected = numberOfAffected
        self.eventSelectionType = eventSelectionType

        match self.distributionType:
            case DistributionType.GLOBAL:
                self.radius = (domainSize[0] * domainSize[1]) /np.pi
            case DistributionType.LOCAL_SINGLE_SITE:
                self.radius = self.areas[0][2]

        if radius:
            self.radius = radius

        if self.distributionType != DistributionType.GLOBAL and self.areas == None:
            raise Exception("Local effects require the area to be specified")

        if self.numberOfAffected and self.radius:
            print("Radius is set. The full number of affected particles may not be reached.")

    def getShortPrintVersion(self):
        return f"t{self.startTimestep}d{self.duration}e{self.eventEffect.val}a{self.angle}dt{self.distributionType.value}a{self.areas}"

    def getParameterSummary(self):
        summary = super().getParameterSummary()
        summary["angle"] = self.angle
        summary["distributionType"] = self.distributionType.name
        summary["areas"] = self.areas
        summary["radius"] = self.radius
        summary["numberOfAffected"] = self.numberOfAffected
        if self.eventSelectionType:
            summary["eventSelectionType"] = self.eventSelectionType.value
        return summary

    def executeEvent(self, totalNumberOfParticles, positions, orientations, nsms, ks, speeds, dt=None):
        """
        Executes the event.

        Params:
            - totalNumberOfParticles (int): the total number of particles within the domain. Used to compute the number of affected particles
            - positions (array of tuples (x,y)): the position of every particle in the domain at the current timestep
            - orientations (array of tuples (u,v)): the orientation of every particle in the domain at the current timestep
            - switchValues (array of switchTypeValues): the switch type value of every particle in the domain at the current timestep
            - cells (array: [(minX, minY), (maxX, maxY)]): the cells within the cellbased domain
            - cellDims (tuple of floats): the dimensions of a cell (the same for all cells)
            - cellToParticleDistribution (dictionary {cellIdx: array of indices of all particles within the cell}): A dictionary containing the indices of all particles within each cell

        Returns:
            The orientations, switchTypeValues of all particles after the event has been executed as well as a list containing the indices of all affected particles.
        """
        posWithCenter = np.zeros((totalNumberOfParticles+1, 2))
        posWithCenter[:-1] = positions
        posWithCenter[-1] = self.getOriginPoint()
        rij2 = getDifferences(posWithCenter, self.domainSize)
        relevantDistances = rij2[-1][:-1] # only the comps to the origin and without the origin point
        candidates = (relevantDistances <= self.radius**2)
        affected = self.selectAffected(candidates, relevantDistances)

        match self.eventEffect:
            case EventEffect.ALIGN_TO_FIXED_ANGLE:
                orientations[affected] = computeUvCoordinates(self.angle)
            case EventEffect.ALIGN_TO_FIXED_ANGLE_NOISE:
                orientations[affected] = computeUvCoordinates(self.angle)
                orientations[affected] = self.applyNoiseDistribution(orientations[affected])
            case EventEffect.AWAY_FROM_ORIGIN:
                orientations[affected] = self.computeAwayFromOrigin(positions[affected])
            case EventEffect.RANDOM:
                orientations[affected] = self.__getRandomOrientations(np.count_nonzero(affected))
        orientations = normalizeOrientations(orientations)
        return orientations, nsms, ks, speeds, affected # external events do not directly impact the values


    def selectAffected(self, candidates, rij2):
        preselection = candidates # default case, we take all the candidates
        match self.eventSelectionType:
            case EventSelectionType.NEAREST_DISTANCE:
                indices = np.argsort(rij2)[:self.numberOfAffected]
                preselection = np.full(len(candidates), False)
                preselection[indices] = True
            case EventSelectionType.RANDOM:
                indices = np.argwhere(candidates).flatten()
                selectedIndices = np.random.choice(indices, self.numberOfAffected, replace=False)
                preselection = np.full(len(candidates), False)
                preselection[selectedIndices] = True
        return candidates & preselection

    def computeAwayFromOrigin(self, positions):
        """
        Computes the (u,v)-coordinates for the orientation after turning away from the point of origin.

        Params:
            - position ([X,Y]): the position of the current particle that should turn away from the point of origin

        Returns:
            [U,V]-coordinates representing the new orientation of the current particle.
        """
        angles = self.__computeAngleWithRegardToOrigin(positions)
        #angles = ServiceOrientations.normaliseAngles(angles)
        return computeUvCoordinatesForList(angles)

    def __computeAngleWithRegardToOrigin(self, positions):
        """
        Computes the angle between the position of the current particle and the point of origin of the event.

        Params:
            - position ([X,Y]): the position of the current particle that should turn towards the point of origin

        Returns:
            The angle in radians between the two points.
        """
        orientationFromOrigin = positions - self.getOriginPoint()
        anglesRadian = computeAnglesForOrientations(orientationFromOrigin)
        return anglesRadian

    def getOriginPoint(self):
        """
        Determines the point of origin of the event.

        Returns:
            The point of origin of the event in [X,Y]-coordinates.
        """
        match self.distributionType:
            case DistributionType.GLOBAL:
                origin = (self.domainSize[0]/2, self.domainSize[1]/2)
            case DistributionType.LOCAL_SINGLE_SITE:
                origin = self.areas[0][:2]
        return origin


    def __getRandomOrientations(self, numAffectedParticles):
        """
        Selects a random orientation.

        Returns:
            A random orientation in [U,V]-coordinates.
        """
        return normalizeOrientations(np.random.rand(numAffectedParticles, len(self.domainSize))-0.5)

# Vicsek model

In [8]:
class VicsekWithNeighbourSelection:

    def __init__(self, domainSize, radius, noise, numberOfParticles, k, neighbourSelectionMechanism,
                 speed=1, switchSummary=None, events=None, degreesOfVision=2*np.pi,
                 activationTimeDelays=[], isActivationTimeDelayRelevantForEvents=False):
        self.domainSize = np.asarray(domainSize)
        self.radius = radius
        self.noise = noise
        self.numberOfParticles = numberOfParticles
        self.k = k
        self.neighbourSelectionMechanism = neighbourSelectionMechanism
        self.speed = speed

        self.switchSummary = switchSummary

        self.minReplacementValue = -1
        self.maxReplacementValue = domainSize[0] * domainSize[1] + 1
        self.disorderPlaceholder = -1
        self.orderPlaceholder = -2

        self.events = events
        self.degreesOfVision = degreesOfVision
        self.activationTimeDelays = np.array(activationTimeDelays)
        self.isActivationTimeDelayRelevantForEvents = isActivationTimeDelayRelevantForEvents

    def getParameterSummary(self, asString=False):
        """
        Creates a summary of all the model parameters ready for use for conversion to JSON or strings.

        Parameters:
            - asString (bool, default False) [optional]: if the summary should be returned as a dictionary or as a single string

        Returns:
            A dictionary or a single string containing all model parameters.
        """
        summary = {"n": self.numberOfParticles,
                    "k": self.k,
                    "noise": self.noise,
                    "radius": self.radius,
                    "neighbourSelectionMechanism": self.neighbourSelectionMechanism.name,
                    "domainSize": self.domainSize.tolist(),
                    "tmax": self.tmax,
                    "dt": self.dt,
                    "degreesOfVision": self.degreesOfVision,
                    "activationTimeDelays": self.activationTimeDelays.tolist(),
                    "isActivationTimeDelayRelevantForEvents": self.isActivationTimeDelayRelevantForEvents
                    }
        if self.switchSummary != None:
            summary["switchSummary"] = self.switchSummary.getParameterSummary()

        if self.events:
            eventsSummary = []
            for event in self.events:
                eventsSummary.append(event.getParameterSummary())
            summary["events"] = eventsSummary

        if asString:
            strPrep = [tup[0] + ": " + tup[1] for tup in summary.values()]
            return ", ".join(strPrep)
        return summary


    def __initializeState(self):
        """
        Initialises the state of the swarm at the start of the simulation.

        Params:
            None

        Returns:
            Arrays of positions, orientations and switchTypeValues containing values for every individual within the system
        """
        positions = self.domainSize*np.random.rand(self.numberOfParticles,len(self.domainSize))
        orientations = normalizeOrientations(np.random.rand(self.numberOfParticles, len(self.domainSize))-0.5)

        return positions, orientations

    def initialiseSwitchingValues(self):
        nsms = np.full(self.numberOfParticles, self.orderPlaceholder)
        nsmsDf = pd.DataFrame(nsms, columns=["nsms"])
        nsmsDf["nsms"] = nsmsDf["nsms"].replace(self.orderPlaceholder, self.neighbourSelectionMechanism)
        nsms = np.array(nsmsDf["nsms"])

        ks = np.array(self.numberOfParticles * [self.k])
        speeds = np.full(self.numberOfParticles, self.speed)

        activationTimeDelays = np.ones(self.numberOfParticles)

        if self.switchSummary != None:
            info = self.switchSummary.getBySwitchType(SwitchType.NEIGHBOUR_SELECTION_MECHANISM)
            if info != None and info.initialValues != None:
                nsms = info.initialValues
            info = self.switchSummary.getBySwitchType(SwitchType.K)
            if info != None and info.initialValues != None:
                ks = info.initialValues
            info = self.switchSummary.getBySwitchType(SwitchType.SPEED)
            if info != None and info.initialValues != None:
                speeds = info.initialValues
            info = self.switchSummary.getBySwitchType(SwitchType.ACTIVATION_TIME_DELAY)
            if info != None and info.initialValues != None:
                activationTimeDelays = info.initialValues

        if len(self.activationTimeDelays) > 0:
            activationTimeDelays = self.activationTimeDelays

        return nsms, ks, speeds, activationTimeDelays

    def generateNoise(self):
        """
        Generates some noise based on the noise amplitude set at creation.

        Params:
            None

        Returns:
            An array with the noise to be added to each individual
        """
        return np.random.normal(scale=self.noise, size=(self.numberOfParticles, len(self.domainSize)))

    def calculateMeanOrientations(self, orientations, neighbours):
        """
        Computes the average of the orientations of all selected neighbours for every individual.

        Params:
            - orientations (array of floats): the orientation of every individual at the current timestep
            - neighbours (array of arrays of booleans): the identity of every neighbour of every individual

        Returns:
            An array of floats containing the new, normalised orientations of every individual
        """
        summedOrientations = np.sum(neighbours[:,:,np.newaxis]*orientations[np.newaxis,:,:],axis=1)
        return normalizeOrientations(summedOrientations)

    def __getPickedNeighbourIndices(self, sortedIndices, kMaxPresent, ks):
        if self.switchSummary != None and self.switchSummary.isActive(SwitchType.K):
            kSwitch = self.switchSummary.getBySwitchType(SwitchType.K)
            kMin, kMax = self.switchSummary.getMinMaxValuesForKSwitchIfPresent()

            candidatesOrder = sortedIndices[:, :kSwitch.orderSwitchValue]
            if kSwitch.orderSwitchValue < kMax and kMax == kMaxPresent:
                candidatesOrder = padArray(candidatesOrder, self.numberOfParticles, kMin=kMin, kMax=kMax)

            candidatesDisorder = sortedIndices[:, :kSwitch.disorderSwitchValue]
            if kSwitch.disorderSwitchValue < kMax and kMax == kMaxPresent:
                candidatesDisorder = padArray(candidatesDisorder, self.numberOfParticles, kMin=kMin, kMax=kMax)

            candidates = np.where(((ks == kSwitch.orderSwitchValue)[:, None]), candidatesOrder, candidatesDisorder)
        else:
            candidates = sortedIndices[:, :self.k]
        return candidates

    def __checkPickedForNeighbourhood(self, posDiff, candidates, kMaxPresent):
        if len(candidates) == 0 or len(candidates[0]) == 0:
            return candidates
        # exclude any individuals that are not neighbours
        pickedDistances = np.take_along_axis(posDiff, candidates, axis=1)
        minusOnes = np.full((self.numberOfParticles,kMaxPresent), -1)
        picked = np.where(((candidates == -1) | (pickedDistances > self.radius**2)), minusOnes, candidates)
        return picked

    def __createBooleanMaskFromPickedNeighbourIndices(self, picked):
        if len(picked) == 0 or len(picked[0]) == 0:
            return np.full((self.numberOfParticles, self.numberOfParticles), False)
        # create the boolean mask
        ns = np.full((self.numberOfParticles,self.numberOfParticles+1), False) # add extra dimension to catch indices that are not applicable
        pickedValues = np.full((self.numberOfParticles, self.k), True)
        np.put_along_axis(ns, picked, pickedValues, axis=1)
        ns = ns[:, :-1] # remove extra dimension to catch indices that are not applicable
        return ns

    def __getPickedNeighbours(self, posDiff, candidates, ks, isMin):
        """
        Determines which neighbours the individuals should considered based on a preexisting maskedArray, the neighbour selection mechanism and k.

        Params:
            - maskedArray (MaskedArray): masked array containing the values for consideration
            - posDiff (array of arrays of floats): the distance from every individual to all other individuals
            - ks (Dataframe): which value of k every individual observes (in column "val")
            - isMin (boolean) [optional, default=True]: whether to take the nearest or farthest neighbours

        Returns:
            An array of arrays of booleans representing the selected neighbours
        """

        kMaxPresent = np.max(ks)

        sortedIndices = candidates.argsort(axis=1)
        if isMin == False:
            sortedIndices = np.flip(sortedIndices, axis=1)

        picked = self.__getPickedNeighbourIndices(sortedIndices=sortedIndices, kMaxPresent=kMaxPresent, ks=ks)
        picked = self.__checkPickedForNeighbourhood(posDiff=posDiff, candidates=picked, kMaxPresent=kMaxPresent)
        mask = self.__createBooleanMaskFromPickedNeighbourIndices(picked)
        return mask

    def pickPositionNeighbours(self, positions, neighbours, ks, isMin=True):
        """
        Determines which neighbours the individuals should considered based on the neighbour selection mechanism and k.

        Params:
            - positions (array of floats): the position of every individual at the current timestep
            - neighbours (array of arrays of booleans): the identity of every neighbour of every
            - ks (Dataframe): which value of k every individual observes (in column "val")
            - isMin (boolean) [optional, default=True]: whether to take the nearest or farthest neighbours

        Returns:
            An array of arrays of booleans representing the selected neighbours
        """
        posDiff = getPositionDifferences(positions, self.domainSize)
        if isMin == True:
            fillValue = self.maxReplacementValue
        else:
            fillValue = self.minReplacementValue

        fillVals = np.full((self.numberOfParticles,self.numberOfParticles), fillValue)
        candidates = np.where((neighbours), posDiff, fillVals)

        # select the best candidates
        return self.__getPickedNeighbours(posDiff=posDiff, candidates=candidates, ks=ks, isMin=isMin)

    def pickOrientationNeighbours(self, positions, orientations, neighbours, ks, isMin=True):
        """
        Determines which neighbours the individuals should considered based on the neighbour selection mechanism and k.

        Params:
            - positions (array of floats): the position of every individual at the current timestep
            - orientations (array of floats): the orientation of every individual at the current timestep
            - neighbours (array of arrays of booleans): the identity of every neighbour of every individual
            - ks (Dataframe): which value of k every individual observes (in column "val")
            - isMin (boolean) [optional, default=True]: whether to take the least orientionally different or most orientationally different neighbours

        Returns:
            An array of arrays of booleans representing the selected neighbours
        """
        posDiff = getPositionDifferences(positions, self.domainSize)
        orientDiff = getOrientationDifferences(orientations, self.domainSize)

        if isMin == True:
            fillValue = self.maxReplacementValue
        else:
            fillValue = self.minReplacementValue

        fillVals = np.full((self.numberOfParticles,self.numberOfParticles), fillValue)
        candidates = np.where((neighbours), orientDiff, fillVals)

        # select the best candidates
        return self.__getPickedNeighbours(posDiff=posDiff, candidates=candidates, ks=ks, isMin=isMin)

    def pickRandomNeighbours(self, positions, neighbours, ks):
        np.fill_diagonal(neighbours, False)
        posDiff = getPositionDifferences(positions, self.domainSize)
        kMaxPresent = np.max(ks)

        indices = getIndicesForTrueValuesWithPadding(neighbours, self.numberOfParticles, np.min(ks), kMaxPresent)
        rng = np.random.default_rng()
        rng.shuffle(indices, axis=1)
        indicesOrdered = [np.arange(len(indices[0]))] * len(indices)
        sortedIndices = np.flip(np.sort(np.where((indices == -1), indices, indicesOrdered)))
        originalSortedIndices = np.take_along_axis(indices, sortedIndices, axis=1)
        candidateIndices = np.where((sortedIndices == -1), sortedIndices, originalSortedIndices)
        if self.switchSummary != None and self.switchSummary.isActive(SwitchType.K):
            kMin, kMax = self.switchSummary.getMinMaxValuesForKSwitchIfPresent()
            if len(candidateIndices[0]) < kMax:
                candidateIndices = padArray(candidateIndices, self.numberOfParticles, kMin, kMax)
        elif kMaxPresent < self.k:
            candidateIndices = padArray(candidateIndices, self.numberOfParticles, kMaxPresent, self.k)
        picked = self.__getPickedNeighbourIndices(sortedIndices=candidateIndices, kMaxPresent=kMaxPresent, ks=ks)
        picked = self.__checkPickedForNeighbourhood(posDiff=posDiff, candidates=picked, kMaxPresent=kMaxPresent)
        selection = self.__createBooleanMaskFromPickedNeighbourIndices(picked)
        np.fill_diagonal(selection, True)
        return selection

    def getPickedNeighboursForNeighbourSelectionMechanism(self, neighbourSelectionMechanism, positions, orientations, neighbours, ks):
        match neighbourSelectionMechanism:
            case NeighbourSelectionMechanism.NEAREST:
                pickedNeighbours = self.pickPositionNeighbours(positions, neighbours, ks, isMin=True)
            case NeighbourSelectionMechanism.FARTHEST:
                pickedNeighbours = self.pickPositionNeighbours(positions, neighbours, ks, isMin=False)
            case NeighbourSelectionMechanism.LEAST_ORIENTATION_DIFFERENCE:
                pickedNeighbours = self.pickOrientationNeighbours(positions, orientations, neighbours, ks, isMin=True)
            case NeighbourSelectionMechanism.HIGHEST_ORIENTATION_DIFFERENCE:
                pickedNeighbours = self.pickOrientationNeighbours(positions, orientations, neighbours, ks, isMin=False)
            case NeighbourSelectionMechanism.RANDOM:
                pickedNeighbours = self.pickRandomNeighbours(positions, neighbours, ks)
            case NeighbourSelectionMechanism.ALL:
                pickedNeighbours = neighbours
        return pickedNeighbours

    def computeNewOrientations(self, neighbours, positions, orientations, nsms, ks, activationTimeDelays):
        """
        Computes the new orientation of every individual based on the neighbour selection mechanism, k and Vicsek-like
        averaging.

        Params:
            - neighbours (array of arrays of booleans): the identity of every neighbour of every individual
            - positions (array of floats): the position of every individual at the current timestep
            - orientations (array of floats): the orientation of every individual at the current timestep
            - switchTypeValues (array of ints): the chosen value for every individual at the current timestep

        Returns:
            An array of floats representing the orientations of all individuals after the current timestep
        """

        if self.switchSummary != None and self.switchSummary.isActive(SwitchType.K):
            ks = ks
        else:
            ks = np.array(self.numberOfParticles * [self.k])

        if self.switchSummary != None and self.switchSummary.isActive(SwitchType.NEIGHBOUR_SELECTION_MECHANISM):
            nsmsSwitch = self.switchSummary.getBySwitchType(SwitchType.NEIGHBOUR_SELECTION_MECHANISM)
            neighboursOrder = self.getPickedNeighboursForNeighbourSelectionMechanism(neighbourSelectionMechanism=nsmsSwitch.orderSwitchValue,
                                                                                     positions=positions,
                                                                                     orientations=orientations,
                                                                                     neighbours=neighbours,
                                                                                     ks=ks)
            neighboursDisorder = self.getPickedNeighboursForNeighbourSelectionMechanism(neighbourSelectionMechanism=nsmsSwitch.disorderSwitchValue,
                                                                                    positions=positions,
                                                                                    orientations=orientations,
                                                                                    neighbours=neighbours,
                                                                                    ks=ks)
            pickedNeighbours = np.where(((nsms == nsmsSwitch.orderSwitchValue)), neighboursDisorder, neighboursOrder)

        else:
            pickedNeighbours = self.getPickedNeighboursForNeighbourSelectionMechanism(neighbourSelectionMechanism=self.neighbourSelectionMechanism,
                                                                                      positions=positions,
                                                                                      orientations=orientations,
                                                                                      neighbours=neighbours,
                                                                                      ks=ks)

        np.fill_diagonal(pickedNeighbours, True)

        newOrientations = self.calculateMeanOrientations(orientations, pickedNeighbours)
        newOrientations = normalizeOrientations(orientations+self.generateNoise())

        orientations = revertTimeDelayedChanges(self.t, orientations, newOrientations, activationTimeDelays)

        return orientations

    def getDecisions(self, t, localOrders, previousLocalOrders, switchType, switchTypeValues, blocked):
        """
        Computes whether the individual chooses to use option A or option B as its value based on the local order,
        the average previous local order and a threshold.

        Params:
            - t (int): the current timestep
            - localOrders (array of floats): the local order from the point of view of every individual
            - previousLocalOrders (array of arrays of floats): the local order for every individual at every previous time step
            - switchTypeValues (array of ints): the current switchTypeValue selection for every individual

        Returns:
            A pandas Dataframe containing the switchTypeValues for every individual
        """
        switchInfo = self.switchSummary.getBySwitchType(switchType)
        switchDifferenceThresholdLower = switchInfo.lowerThreshold
        switchDifferenceThresholdUpper = switchInfo.upperThreshold

        prev = np.average(previousLocalOrders[max(t-switchInfo.numberPreviousStepsForThreshold, 0):t+1], axis=0)

        oldWithNewOrderValues = np.where(((localOrders >= switchDifferenceThresholdUpper) & (prev <= switchDifferenceThresholdUpper) & (blocked != True)), np.full(len(switchTypeValues), switchInfo.orderSwitchValue), switchTypeValues)
        updatedSwitchValues = np.where(((localOrders <= switchDifferenceThresholdLower) & (prev >= switchDifferenceThresholdLower) & (blocked != True)), np.full(len(switchTypeValues), switchInfo.disorderSwitchValue), oldWithNewOrderValues)
        return updatedSwitchValues

    def appendSwitchValues(self, nsms, ks, speeds, activationTimeDelays):
        if self.switchSummary == None:
            return
        if self.switchSummary.isActive(SwitchType.NEIGHBOUR_SELECTION_MECHANISM):
            self.switchTypeValuesHistory['nsms'].append(nsms)
        if self.switchSummary.isActive(SwitchType.K):
            self.switchTypeValuesHistory['ks'].append(ks)
        if self.switchSummary.isActive(SwitchType.SPEED):
            self.switchTypeValuesHistory['speeds'].append(speeds)
        if self.switchSummary.isActive(SwitchType.ACTIVATION_TIME_DELAY):
            self.switchTypeValuesHistory['activationTimeDelays'].append(activationTimeDelays)

    def prepareSimulation(self, initialState, dt, tmax):
         # Preparations and setting of parameters if they are not passed to the method

        if any(ele is None for ele in initialState):
            positions, orientations = self.__initializeState()
        else:
            positions, orientations = initialState

        nsms, ks, speeds, activationTimeDelays = self.initialiseSwitchingValues()

        #print(f"t=pre, order={ServiceMetric.computeGlobalOrder(orientations)}")

        if dt is None and tmax is not None:
            dt = 1

        if tmax is None:
            tmax = (10**3)*dt
            dt = np.average(10**(-2)*(np.max(self.domainSize)/speeds))

        self.tmax = tmax
        self.dt = dt

        # Initialisations for the loop and the return variables
        self.numIntervals=int(tmax/dt+1)

        self.localOrdersHistory = []
        self.positionsHistory = np.zeros((self.numIntervals,self.numberOfParticles,len(self.domainSize)))
        self.orientationsHistory = np.zeros((self.numIntervals,self.numberOfParticles,len(self.domainSize)))
        self.switchTypeValuesHistory = {'nsms': [], 'ks': [], 'speeds': [], 'activationTimeDelays': []}

        self.positionsHistory[0,:,:]=positions
        self.orientationsHistory[0,:,:]=orientations
        self.appendSwitchValues(nsms, ks, speeds, activationTimeDelays)

        return positions, orientations, nsms, ks, speeds, activationTimeDelays

    def handleEvents(self, t, positions, orientations, nsms, ks, speeds, activationTimeDelays):
        blocked = np.full(self.numberOfParticles, False)
        if self.events != None:
                for event in self.events:
                    orientations, nsms, ks, speeds, blocked = event.check(self.numberOfParticles, t, positions, orientations, nsms, ks, speeds, self.dt, activationTimeDelays, self.isActivationTimeDelayRelevantForEvents)
        return orientations, nsms, ks, speeds, blocked

    def simulate(self, initialState=(None, None, None), dt=None, tmax=None):
        """
        Runs the simulation experiment.
        First the parameters are computed if they are not passed.
        Then the positions, orientations and colours are computed for each particle at each time step.

        Params:
            - initialState (tuple of arrays) [optional]: A tuple containing the initial positions of all particles, their initial orientations and their initial switch type values
            - dt (int) [optional]: time step
            - tmax (int) [optional]: the total number of time steps of the experiment

        Returns:
            times, positionsHistory, orientationsHistory, coloursHistory, switchTypeValueHistory. All of them as ordered arrays so that they can be matched by index matching
        """

        positions, orientations, nsms, ks, speeds, activationTimeDelays = self.prepareSimulation(initialState=initialState, dt=dt, tmax=tmax)
        for t in range(self.numIntervals):
            self.t = t
            #if t % 5000 == 0:
                #print(f"t={t}/{self.tmax}")

            orientations, nsms, ks, speeds, blocked = self.handleEvents(t, positions, orientations, nsms, ks, speeds, activationTimeDelays)

            # all neighbours (including self)
            neighbours = getNeighboursWithLimitedVision(positions=positions, orientations=orientations, domainSize=self.domainSize,
                                                                            radius=self.radius, degreesOfVision=self.degreesOfVision)

            if self.switchSummary != None:
                localOrders = computeLocalOrders(orientations, neighbours)
                self.localOrdersHistory.append(localOrders)

                if SwitchType.NEIGHBOUR_SELECTION_MECHANISM in self.switchSummary.switches.keys():
                    nsms = self.getDecisions(t, localOrders, self.localOrdersHistory, SwitchType.NEIGHBOUR_SELECTION_MECHANISM, nsms, blocked)
                if SwitchType.K in self.switchSummary.switches.keys():
                    ks = self.getDecisions(t, localOrders, self.localOrdersHistory, SwitchType.K, ks, blocked)
                if SwitchType.SPEED in self.switchSummary.switches.keys():
                    speeds = self.getDecisions(t, localOrders, self.localOrdersHistory, SwitchType.SPEED, speeds, blocked)
                if SwitchType.ACTIVATION_TIME_DELAY in self.switchSummary.switches.keys():
                    activationTimeDelays = self.getDecisions(t, localOrders, self.localOrdersHistory, SwitchType.ACTIVATION_TIME_DELAY, activationTimeDelays, blocked)

            orientations = self.computeNewOrientations(neighbours, positions, orientations, nsms, ks, activationTimeDelays)

            positions += self.dt*(orientations.T * speeds).T
            positions += -self.domainSize*np.floor(positions/self.domainSize)

            self.positionsHistory[t,:,:]=positions
            self.orientationsHistory[t,:,:]=orientations
            self.appendSwitchValues(nsms, ks, speeds, activationTimeDelays)

            # if t % 500 == 0:
            #     print(f"t={t}, order={ServiceMetric.computeGlobalOrder(orientations)}")

        return (self.dt*np.arange(self.numIntervals), self.positionsHistory, self.orientationsHistory), self.switchTypeValuesHistory


# Example

In [9]:
domainSize = (22.36, 22.36)
domainSize = (25, 25)
#domainSize = (50, 50)
noisePercentage = 1
noise = getNoiseAmplitudeValueForPercentage(noisePercentage)
#noise = 0
n = 10
speed = 1

threshold = 0.1
numberOfPreviousSteps = 100

# TODO: check why N-F always returns to order -> only with high radius

radius = 100
k = 1
nsm = NeighbourSelectionMechanism.NEAREST

infoNsm = SwitchInformation(switchType=SwitchType.NEIGHBOUR_SELECTION_MECHANISM,
                            values=(NeighbourSelectionMechanism.FARTHEST, NeighbourSelectionMechanism.NEAREST),
                            thresholds=[threshold],
                            numberPreviousStepsForThreshold=numberOfPreviousSteps
                            )

infoK = SwitchInformation(switchType=SwitchType.K,
                        values=(5, 1),
                        thresholds=[threshold],
                        numberPreviousStepsForThreshold=numberOfPreviousSteps
                        )

infoSpeed = SwitchInformation(switchType=SwitchType.SPEED,
                        values=(1, 0.1),
                        thresholds=[threshold],
                        numberPreviousStepsForThreshold=numberOfPreviousSteps
                        )

switchSummary = SwitchSummary([infoK])

"""
switchType = SwitchType.NEIGHBOUR_SELECTION_MECHANISM
switchValues = (NeighbourSelectionMechanism.FARTHEST,NeighbourSelectionMechanism.NEAREST)
"""

"""
switchType = SwitchType.K
switchValues = (5,1)
"""
"""
switchType = SwitchType.SPEED
switchValues = (0.1, 1)
"""

tmax = 5000

threshold = [0.1]

event = ExternalStimulusOrientationChangeEvent(startTimestep=1000,
                                               duration=1000,
                                               domainSize=domainSize,
                                               eventEffect=EventEffect.ALIGN_TO_FIXED_ANGLE,
                                               distributionType=DistributionType.LOCAL_SINGLE_SITE,
                                               areas=[[domainSize[0]/2, domainSize[1]/2, radius]],
                                               angle=np.pi,
                                               noisePercentage=1,
                                               radius=radius,
                                               numberOfAffected=1,
                                               eventSelectionType=EventSelectionType.RANDOM
                                               )


initialState = createOrderedInitialDistributionEquidistancedIndividual(None, domainSize, n, angleX=0.5, angleY=0.5)
simulator = VicsekWithNeighbourSelection(domainSize=domainSize,
                                         radius=radius,
                                         noise=noise,
                                         numberOfParticles=n,
                                         k=k,
                                         neighbourSelectionMechanism=nsm,
                                         speed=speed,
                                         switchSummary=switchSummary,
                                         degreesOfVision=np.pi*2,
                                         events=[event])
simulationData, switchTypeValues = simulator.simulate(initialState=initialState, tmax=tmax)
#simulationData, switchTypeValues = simulator.simulate(tmax=tmax)

saveModel(simulationData=simulationData, path="test.json",
                            modelParams=simulator.getParameterSummary())


print("done")

Radius is set. The full number of affected particles may not be reached.
done


In [16]:
def getBaseValues():
  tmax = 5000
  noisePercentage = 1
  noise = getNoiseAmplitudeValueForPercentage(noisePercentage)
  speed = 1
  threshold = 0.1
  density = 0.05
  radius = 20
  domainSize = (50, 50)
  n = getNumberOfParticlesForConstantDensity(density, domainSize)
  eventEffect = EventEffect.ALIGN_TO_FIXED_ANGLE
  targetOrder = 1
  #initialState = createOrderedInitialDistributionEquidistanced(domainSize, n)
  return tmax, noise, speed, density, radius, domainSize, n, eventEffect, targetOrder

def getFinalOrderForSwitching(tmax, nsm, k, switchSummary, density, radius, noise, speed, domainSize, n, eventEffect, targetOrder):
  results = []
  numIts = 10
  for i in range(numIts):
    print(f"{i}/{numIts}")
    event = ExternalStimulusOrientationChangeEvent(startTimestep=1000,
                                                  duration=1000,
                                                  domainSize=domainSize,
                                                  eventEffect=eventEffect,
                                                  distributionType=DistributionType.LOCAL_SINGLE_SITE,
                                                  areas=[[domainSize[0]/2, domainSize[1]/2, radius]],
                                                  radius=radius,
                                                  eventSelectionType=EventSelectionType.RANDOM,
                                                  angle=np.pi,
                                                  )
    simulator = VicsekWithNeighbourSelection(domainSize=domainSize,
                                            radius=radius,
                                            noise=noise,
                                            numberOfParticles=n,
                                            neighbourSelectionMechanism=nsm,
                                            k=k,
                                            speed=speed,
                                            switchSummary=switchSummary,
                                            events=[event])
    simulationData, switchTypeValues = simulator.simulate(tmax=tmax)
    times, positions, orientations = simulationData
    orders = [computeGlobalOrder(orients) for orients in orientations[-1000:]]
    results.append(np.average(orders))
  if targetOrder > 0:
    return np.average(results)
  else:
    return 1-np.average(results)

def getFinalOrderForKSwitching(threshold, previousSteps):
  previousSteps = int(previousSteps)
  nsm = NeighbourSelectionMechanism.NEAREST
  k = 1
  tmax, noise, speed, density, findClustersWithRadius, domainSize, n, eventEffect, targetOrder = getBaseValues()
  switchInfo = SwitchInformation(SwitchType.K,
                                 values=(5,1),
                                 thresholds=[threshold],
                                 numberPreviousStepsForThreshold=previousSteps)
  switchSummary = SwitchSummary([switchInfo])
  return getFinalOrderForSwitching(tmax, nsm, k, switchSummary, density, radius, noise, speed, domainSize, n, eventEffect, targetOrder)



In [11]:
# hyperparameters: density, radius, noise, speed, threshold, numberOfPreviousSteps
getFinalOrderForKSwitching(0.1, 100)

0/10
1/10
2/10
3/10
4/10
5/10
6/10
7/10
8/10
9/10


0.22961144188954252

In [12]:
pbounds = {'threshold': (0, 1), 'previousSteps': (1, 5000)}

In [17]:
optimizer = BayesianOptimization(
    f=getFinalOrderForKSwitching,
    pbounds=pbounds,
    verbose=2,
    random_state=1
)

In [None]:
optimizer.maximize(
    init_points=2,
    n_iter=3
)

|   iter    |  target   | previo... | threshold |
-------------------------------------------------
0/10
1/10


In [15]:
print(optimizer.max)

{'target': 0.277839892100003, 'params': {'previousSteps': 747.3842591554741, 'threshold': 0.6288117640027626}}
