In [1]:
import numpy as np

In [2]:
gen = np.random.default_rng(seed=0)
gen.uniform()

0.6369616873214543

In [3]:
def generate_new_direction(prev_dir, curr_arm, directional_inertia, rng):
    
    # If not at an edge, use directional_inertia to determine if should keep or switch direction
    if curr_arm % 5:
        return prev_dir if rng.uniform() < directional_inertia else -prev_dir
    
    # If at an edge, immediately return opposite direction
    return -prev_dir
        

In [4]:
def generate_new_arm(curr_arm, new_dir, neighbor_preference, independent_arm_bias, rng):
    
    # Should agent transition to a neighbor arm?
    if rng.uniform() < neighbor_preference:
        return int(curr_arm + new_dir)
    # But how do we take into account the fact that curr_arm=4 and new_dir=+1 FORCES a neighbor transition?
    elif not ((curr_arm+new_dir)%5):    
        return int(curr_arm + new_dir)
    # If not, use independent arm bias to choose which arm to go to

    all_arms = np.arange(6)

    # Identify the possible arms
    available_arms = all_arms[curr_arm+2:] if new_dir > 0 else all_arms[:curr_arm-1]
    available_arm_probs = independent_arm_bias[available_arms]
    normalized_available_arm_probs = np.divide(available_arm_probs, np.sum(available_arm_probs))
    #print('{0}, {1}'.format(curr_arm, new_dir))
    # return the new_arm based off of normalized independent_arm_biases
    return rng.choice(available_arms, p=normalized_available_arm_probs)

In [5]:
def generate_walk(walk_len, directional_inertia, neighbor_preference, independent_arm_bias, seed=0):
    
    gen = np.random.default_rng(seed=seed)
    all_arms = np.arange(6)
    
    prev_arm = 4
    curr_arm = 3
    
    walk = [prev_arm, curr_arm]
    
    for _ in np.arange(walk_len-2):
        #print(prev_arm, curr_arm, )
        # current direction is +- 1, depending on whether curr_arm >= prev_arm
        # We can assume curr_arm != prev_arm
        curr_dir = ((curr_arm >= prev_arm) - 0.5) * 2
    
        new_dir = generate_new_direction(curr_dir, curr_arm, directional_inertia, gen)
        new_arm = generate_new_arm(curr_arm, new_dir, neighbor_preference, independent_arm_bias, gen)
        
        walk.append(new_arm)
        
        prev_arm = curr_arm
        curr_arm = new_arm
        
    return walk

In [6]:
def empirical_indep_arm_bias(walk):
    
    arm_idx = np.arange(6)
    walk_len = len(walk)
    arm_biases = [np.count_nonzero(walk == idx) / walk_len for idx in arm_idx]
    
    return arm_biases

In [7]:
# Independent arm bias - array of probabilities. One for each arm - probability of going there
# irrespective of current state

INDEPENDENT_ARM_BIAS = np.random.randint(30, size=6)
INDEPENDENT_ARM_BIAS = np.divide(INDEPENDENT_ARM_BIAS, np.sum(INDEPENDENT_ARM_BIAS))
INDEPENDENT_ARM_BIAS

array([0.18181818, 0.20909091, 0.2       , 0.08181818, 0.21818182,
       0.10909091])

In [8]:
# Directional inertia. How likely is agent to continue going direction defined by previous transition?

DIRECTIONAL_INERTIA = 0.68
DIRECTIONAL_INERTIA

0.68

In [9]:
# Neighbor bias. Preference for animal to choose an adjacent arm over a non-adjacent arm

NEIGHBOR_PREFERENCE = 0.7
NEIGHBOR_PREFERENCE

0.7

In [10]:
WALK_LENGTH = 100000
WALK_LENGTH

100000

In [18]:
hstack = INDEPENDENT_ARM_BIAS
for seed in np.arange(10):
    walk_1000000 = generate_walk(WALK_LENGTH, DIRECTIONAL_INERTIA, NEIGHBOR_PREFERENCE, INDEPENDENT_ARM_BIAS, seed=seed)
    hstack = np.vstack([empirical_indep_arm_bias(walk_1000000), hstack])
hstack

array([[0.15022   , 0.19717   , 0.15997   , 0.14579   , 0.20125   ,
        0.1456    ],
       [0.15052   , 0.19761   , 0.16181   , 0.14522   , 0.19898   ,
        0.14586   ],
       [0.15166   , 0.1986    , 0.16053   , 0.14375   , 0.19964   ,
        0.14582   ],
       [0.1482    , 0.19286   , 0.15976   , 0.14701   , 0.20427   ,
        0.1479    ],
       [0.15021   , 0.19535   , 0.16076   , 0.14646   , 0.20186   ,
        0.14536   ],
       [0.14905   , 0.19491   , 0.1603    , 0.14502   , 0.20297   ,
        0.14775   ],
       [0.1524    , 0.19854   , 0.16218   , 0.14504   , 0.19792   ,
        0.14392   ],
       [0.15184   , 0.19586   , 0.16193   , 0.14515   , 0.19982   ,
        0.1454    ],
       [0.15163   , 0.19731   , 0.16106   , 0.14497   , 0.19917   ,
        0.14586   ],
       [0.15089   , 0.1967    , 0.16035   , 0.14542   , 0.2013    ,
        0.14534   ],
       [0.18181818, 0.20909091, 0.2       , 0.08181818, 0.21818182,
        0.10909091]])

In [12]:
# Start with directional inertia
# probabilities of 1 away, 2 away etc.
# just optimize - find parameters that give rise to the statistics we want

In [13]:
len(walk_1000000)

100000

In [14]:
walk_1000000[:10]

[4, 3, 4, 5, 3, 5, 4, 3, 1, 2]

In [15]:
print(np.hstack([empirical_indep_arm_bias(walk_1000000), INDEPENDENT_ARM_BIAS]))

[0.15022    0.19717    0.15997    0.14579    0.20125    0.1456
 0.18181818 0.20909091 0.2        0.08181818 0.21818182 0.10909091]


In [16]:
INDEPENDENT_ARM_BIAS

array([0.18181818, 0.20909091, 0.2       , 0.08181818, 0.21818182,
       0.10909091])

In [17]:
# Is directional inertia computed including or excluding edge arms?
# Being at an edge arm forces one to change directions. This biases directional
# inertia to be lower if these transitions are included during its calculation

In [None]:
# just have a sweep policy and a random policy

In [58]:
curr_arm = 5

-1

In [26]:
def generate_random_arms(walk_length, seed=0):
    all_arms = np.arange(6)
    gen = np.random.default_rng(seed)
    curr_arm = gen.choice(all_arms)
    walk = [curr_arm]
    
    for step in np.arange(walk_length - 1):
        
        # don't allow repeat visit
        available_arms = np.delete(all_arms, curr_arm)
        curr_arm = gen.choice(available_arms)
        
        walk.append(curr_arm)
    
    return walk

In [69]:
def generate_sweeping_arms(walk_length, seed=0):
    all_arms = np.arange(6)
    gen = np.random.default_rng(seed)
    curr_arm = gen.choice(all_arms)
    curr_dir = gen.choice([-1, 1]) if curr_arm % 5 else int(((curr_arm<2.5)-0.5)*2)
    walk = [curr_arm]
    
    for step in np.arange(walk_length - 1):
        
        curr_arm = curr_arm + curr_dir
        curr_dir = curr_dir if curr_arm % 5 else int(((curr_arm<2.5)-0.5)*2)
        
        walk.append(curr_arm)
    
    return walk

In [68]:
x = generate_sweeping_arms(10000, seed=1)
x[:20]

[2, 3, 4, 5, 4, 3, 2, 1, 0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0, 1]

In [27]:
x = generate_random_arms(10000)

In [70]:
empirical_indep_arm_bias(x)

[0.1, 0.2, 0.2, 0.2, 0.2, 0.1]

In [30]:
1 / 6

0.16666666666666666

In [86]:
multi_w_exploration_graph = np.array([[1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,1,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,1,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,1],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1],
                              [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1]])

In [93]:
def get_path(pred, start, goal):
    path = [goal]
    k = goal
    while pred[start, k] != -9999:
        path.append(pred[start, k])
        k = pred[start, k]
    return path[::-1]

In [110]:
from scipy.sparse.csgraph import shortest_path

In [111]:
dists, pred = shortest_path(multi_w_exploration_graph, directed=False, method='FW', return_predecessors=True)

In [112]:
get_path(pred, 0, 28)

[0, 1, 2, 3, 4, 8, 9, 13, 14, 18, 19, 23, 24, 28]

In [141]:
def walk_random_arms(walk, walk_length, seed):

    # Arms are identified using the state at their ends, which happen to be arm_num*5
    all_arms = np.arange(6)
    gen = np.random.default_rng(seed)
    curr_arm = gen.choice(all_arms)

    # Arm number converts to state by factor of 5
    curr_arm_end_state = curr_arm * 5
    walk.append(curr_arm_end_state)
    arms = [curr_arm]
    # Calculate graph properties for shortest path calculation between arms
    dists, pred = shortest_path(multi_w_exploration_graph, directed=False, method='FW', return_predecessors=True)

    while len(walk) < walk_length:

        # Choose next arm, but don't include current arm in possibilities to avoid repeat visits
        available_arms = np.delete(all_arms, curr_arm)
        new_arm = gen.choice(available_arms)
        new_arm_end_state = new_arm * 5
        print(new_arm)
        # Using the adjacency matrix, create a trajectory using a shortest path traversal from end of curr_arm to end of new_arm
        trajectory = get_path(pred, curr_arm_end_state, new_arm_end_state)[1:]

        # Update curr_arm now that new_arm has been used
        curr_arm = new_arm
        curr_arm_end_state = curr_arm * 5

        walk.extend(trajectory)
        arms.append(new_arm)
        
    # The final trajectory could have made the walk too long. Truncate walk to be the correct size
    walk = walk[:walk_length]

    return walk, arms

In [218]:
def get_action_between(start_state, end_state):

    actions = start_state["actions"]
    end_state_arr = np.eye(29)[:, end_state["id"]]

    # Retrieve the action id that would have led to this transition. indexing into actions and then "id" is redundant, but kept in case action ids
    # ever don't align with indexes in actions list
    action_id = actions[np.argwhere([np.array_equal(end_state_arr, action["transition"]) for action in actions])[0, 0]]["id"]

    return action_id

In [156]:
def walk_sweeping_arms(walk, walk_length, seed):

    all_arms = np.arange(6)
    gen = np.random.default_rng(seed)
    curr_arm = gen.choice(all_arms)
    curr_dir = gen.choice([-1, 1]) if curr_arm % 5 else int(((curr_arm<2.5)-0.5)*2)

    # Arm number converts to state by factor of 5
    curr_arm_end_state = curr_arm * 5
    walk.append(curr_arm_end_state)

    # Calculate graph properties for shortest path calculation between arms
    dists, pred = shortest_path(multi_w_exploration_graph, directed=False, method='FW', return_predecessors=True)

    while len(walk) < walk_length+1:

        # Choose next arm, but don't include current arm in possibilities to avoid repeat visits
        new_arm = curr_arm + curr_dir
        new_arm_end_state = new_arm * 5
        new_dir = curr_dir if new_arm % 5 else int(((new_arm<2.5)-0.5)*2)

        # Using the adjacency matrix, create a trajectory using a shortest path traversal from end of curr_arm to end of new_arm
        trajectory = [self.locations[loc] for loc in get_path(pred, curr_arm_end_state, new_arm_end_state)]

        # Update curr_arm now that new_arm has been used
        curr_arm = new_arm
        curr_arm_end_state = curr_arm * 5

        curr_dir = new_dir

        walk.extend(trajectory)

    # The final trajectory could have made the walk too long. Truncate walk to be the correct size
    walk = walk[:walk_length+1]

    # identify observations at each location in completed walk

    walk_observations = [self.get_observation(loc) for loc in walk]
    walk_actions = [self.get_action_between(walk[i], walk[i+1]) for i in np.arange(len(walk)-1)]
    walk = [[walk[i], walk_actions[i]] for i in np.arange(len(walk))]

    return walk

In [225]:
def get_action(new_location, walk, repeat_bias_factor=2):
    # Build policy from action probability of each action of provided location dictionary
    policy = np.array([action['probability'] for action in new_location['actions']])        
    # Add a bias for repeating previous action to walk in straight lines, only if (this is not the first step) and (the previous action was a move)
    policy[[] if len(walk) == 0 or new_location['id'] == walk[-1][0]['id'] else walk[-1][2]] *= repeat_bias_factor
    # And renormalise policy (note that for unavailable actions, the policy was 0 and remains 0, so in that case no renormalisation needed)
    policy = policy / sum(policy) if sum(policy) > 0 else policy
    # Select action in new state
    new_action = int(np.flatnonzero(np.cumsum(policy)>np.random.rand())[0])
    # Return the new action
    return new_action

In [226]:
start_location = {"id":3,"observation":3,"x":0.125,"y":0.500,"in_locations":[2,3,4],"in_degree":3,"out_locations":[2,3,4],"out_degree":3,"actions":
[{"id":0,"transition":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0},
 {"id":1,"transition":[0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0.5},
 {"id":2,"transition":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0},
 {"id":3,"transition":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0},
 {"id":4,"transition":[0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0.5}]}

end_location = {"id":4,"observation":4,"x":0.250,"y":0.500,"in_locations":[3,4,8],"in_degree":3,"out_locations":[3,4,8],"out_degree":3,"actions":
[{"id":0,"transition":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0},
 {"id":1,"transition":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0},
 {"id":2,"transition":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0},
 {"id":3,"transition":[0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0.5},
 {"id":4,"transition":[0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"probability":0.5}]}

In [227]:
get_action(end_location, [[start_location]], repeat_bias_factor=2)

KeyError: 0

In [222]:
get_action_between(start_location, end_location)

4

In [217]:
len([0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0])

28

In [209]:
walk

[25,
 26,
 27,
 28,
 24,
 23,
 22,
 21,
 20,
 21,
 22,
 23,
 19,
 18,
 17,
 16,
 15,
 16,
 17,
 18,
 14,
 13,
 12,
 11,
 10,
 11,
 12,
 13,
 9,
 8,
 7,
 6,
 5,
 6,
 7,
 8,
 4,
 3,
 2,
 1,
 0,
 1,
 2,
 3,
 4,
 8,
 7,
 6,
 5,
 6,
 7,
 8,
 9,
 13,
 12,
 11,
 10,
 11,
 12,
 13,
 14,
 18,
 17,
 16,
 15,
 16,
 17,
 18,
 19,
 23,
 22,
 21,
 20,
 21,
 22,
 23,
 24,
 28,
 27,
 26,
 25,
 26,
 27,
 28,
 24,
 23,
 22,
 21,
 20,
 21,
 22,
 23,
 19,
 18,
 17,
 16,
 15,
 16,
 17,
 18]