In [1]:
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random as rng

In [2]:
GRID_SIZE = 64
GRID_SIZE_2 = GRID_SIZE ** 2
MID = GRID_SIZE // 2
HMAP_TRIALS = 10
TRIALS = 100
TURNS = 81
ALPHA = 1.0
PHI = 1.0
EPS = 0.0000001
DRONES = 32
TASKS = 1024

In [None]:
def init_grids():
    drone_grid = np.zeros((GRID_SIZE, GRID_SIZE), dtype=float)
    drone_grid[MID][MID] = DRONES
    return drone_grid, np.ones((GRID_SIZE, GRID_SIZE), dtype=float)


def find_best_max(grid, cache=[]):
    best_x, best_y, best_max = 0, 0, grid[0][0]
    for x in range(GRID_SIZE):
        for y in range(GRID_SIZE):
            if (x, y) in cache:
                continue
            if grid[x][y] > best_max + EPS:
                best_x, best_y, best_max = x, y, grid[x][y]
            elif best_max - EPS <= grid[x][y] and grid[x][y] <= best_max + EPS:
                xy_dist = math.sqrt((x - MID - 0.5)**2 + (y - MID - 0.5)**2)
                best_dist = math.sqrt((best_x - MID - 0.5)**2 + (best_y - MID - 0.5)**2)
                if xy_dist < best_dist - EPS:
                    best_x, best_y, best_max = x, y, grid[x][y]
    return (best_x, best_y)


def find_random_cell(grid):
    normal, choice = grid.sum(), rng.uniform(0, 1)
    for x in range(GRID_SIZE):
        for y in range(GRID_SIZE):
            choice -= (grid[x][y] / normal)
            if choice <= EPS:
                return (x, y)
    raise Exception("No choice")


def get_cell_weight(grid, x, y, is_cylindrical, ordinal_weight, diagonal_weight, is_ordinal):
    if is_cylindrical:
        return grid[x % GRID_SIZE][y % GRID_SIZE] * (ordinal_weight if is_ordinal else diagonal_weight)
    if x < 0 or GRID_SIZE <= x or y < 0 or GRID_SIZE <= y:
        return 0.0
    if (x == 0 or x == GRID_SIZE - 1) and (y == 0 or y == GRID_SIZE - 1):
        normal = diagonal_weight + 2 * ordinal_weight
        weight = (ordinal_weight / normal) if is_ordinal else (diagonal_weight / normal)
        return grid[x][y] * weight
    if x == 0 or x == GRID_SIZE - 1 or y == 0 or y == GRID_SIZE - 1:
        normal = 2 * diagonal_weight + 3 * ordinal_weight
        weight = (ordinal_weight / normal) if is_ordinal else (diagonal_weight / normal)
        return grid[x][y] * weight
    return grid[x][y] * (ordinal_weight if is_ordinal else diagonal_weight)
        
    

def apply_update(grid, is_cylindrical, ordinal_weight, diagonal_weight):
    grid_t = np.zeros((GRID_SIZE, GRID_SIZE), dtype=float)
    for x in range(GRID_SIZE):
        for y in range(GRID_SIZE):
            grid_t[x][y] += get_cell_weight(grid, x, y - 1, is_cylindrical, ordinal_weight, diagonal_weight, True)
            grid_t[x][y] += get_cell_weight(grid, x + 1, y, is_cylindrical, ordinal_weight, diagonal_weight, True)
            grid_t[x][y] += get_cell_weight(grid, x, y + 1, is_cylindrical, ordinal_weight, diagonal_weight, True)
            grid_t[x][y] += get_cell_weight(grid, x - 1, y, is_cylindrical, ordinal_weight, diagonal_weight, True)
            grid_t[x][y] += get_cell_weight(grid, x - 1, y - 1, is_cylindrical, ordinal_weight, diagonal_weight, False)
            grid_t[x][y] += get_cell_weight(grid, x + 1, y - 1, is_cylindrical, ordinal_weight, diagonal_weight, False)
            grid_t[x][y] += get_cell_weight(grid, x + 1, y + 1, is_cylindrical, ordinal_weight, diagonal_weight, False)
            grid_t[x][y] += get_cell_weight(grid, x - 1, y + 1, is_cylindrical, ordinal_weight, diagonal_weight, False)
    return grid_t


def heatmap_experiment(is_cylindrical, ordinal_weight, diagonal_weight):
    drone_grid, task_grid = init_grids()
    reward = np.zeros(TURNS + 1, dtype=float)
    for turn in range(1, TURNS + 1):
        drone_grid = apply_update(drone_grid, is_cylindrical, ordinal_weight, diagonal_weight)
        for _ in range(DRONES):
            x, y = find_random_cell(drone_grid)
            task_grid[x][y] *= (1.0 - PHI)
        task_grid = apply_update(task_grid, is_cylindrical, ordinal_weight, diagonal_weight) 
        reward[turn] = (GRID_SIZE_2 - task_grid.sum()) / GRID_SIZE_2
    return reward

def experiment(is_cylindrical, ordinal_weight, diagonal_weight):
    final_reward = np.zeros(TURNS + 1, dtype=float)
    for trial in range(HMAP_TRIALS):
        final_reward += heatmap_experiment(is_cylindrical, ordinal_weight, diagonal_weight)
    final_reward /= HMAP_TRIALS
    return final_reward

In [None]:
neumann = experiment(False, 0.25, 0)
neumann_cyl = experiment(True, 0.25, 0)

moore = experiment(False, 0.125, 0.125)
moore_diff = experiment(False, 0.175, 0.075)
moore_cyl = experiment(True, 0.125, 0.125)
moore_diff_cyl = experiment(True, 0.175, 0.075)

In [3]:
def get_random_neighbor(x, y, use_alpha=False):
    if use_alpha and rng.uniform(0, 1) > ALPHA:
        return (x, y)
    x_prime, y_prime = x, y
    while x_prime == x and y_prime == y:
        angle = rng.uniform(0, 2 * math.pi)
        vx = x + 0.5 + math.cos(angle) * math.sqrt(2)
        vy = y + 0.5 + math.sin(angle) * math.sqrt(2)
        while vx < 0 or GRID_SIZE < vx:
            vx = abs(vx) if vx < 0 else vx
            vx = (2 * GRID_SIZE - vx) if GRID_SIZE < vx else vx
        while vy < 0 or GRID_SIZE < vy:
            vy = abs(vy) if vy < 0 else vy
            vy = (2 * GRID_SIZE - vy) if GRID_SIZE < vy else vy
        x_prime, y_prime = min(GRID_SIZE - 1, math.floor(vx + EPS)), min(GRID_SIZE - 1, math.floor(vy + EPS))
    return (x_prime, y_prime)


def get_best_neighbor(x, y, drone_map=None, task_map=None, no_colls=False):
    if task_map is None or len(task_map) == 0:
        return get_random_neighbor(x, y)
    neighbors = [(i, j) for j in range(max(y-1, 0), min(y+2, GRID_SIZE)) for i in range(max(x-1, 0), min(x+2, GRID_SIZE))]
    neighbors = list(filter(lambda loc: task_map[loc[0], loc[1]] == 1, neighbors))
    if no_colls and (not drone_map is None) and len(drone_map) > 0:
        neighbors_prime = list(filter(lambda loc: drone_map[loc[0], loc[1]] == 0, neighbors))
        if neighbors_prime is None or len(neighbors_prime) == 0:
            neighbors_prime = neighbors
        neighbors = neighbors_prime
    choice = None
    if neighbors is None or len(neighbors) == 0:
        choice = get_random_neighbor(x, y)
    else:
        choice = rng.choice(neighbors)
    if no_colls:
        drone_map[choice[0], choice[1]] = 1
    return choice


def sim_random_walk(random_placement=False, use_motion=False, be_selfish=False, be_oblivious=False, no_colls=False):
    drones = [(MID, MID) for _ in range(DRONES)]
    drone_map = np.zeros((GRID_SIZE, GRID_SIZE))
    drone_map[MID, MID] = 1
    tasks, num_tasks = None, None
    if random_placement:
        tasks = [(rng.randrange(GRID_SIZE), rng.randrange(GRID_SIZE)) for _ in range(TASKS)]
        num_tasks = TASKS
    else:
        tasks = [(i, j) for j in range(GRID_SIZE) for i in range(GRID_SIZE)]
        num_tasks = GRID_SIZE_2
    task_map = None
    if be_oblivious:
        task_map = np.ones((DRONES, GRID_SIZE, GRID_SIZE))
        for i, drone in enumerate(drones):
            task_map[i, drone[0], drone[1]] = 0
    else:
        task_map = np.ones((GRID_SIZE, GRID_SIZE))
        task_map[MID, MID] = 0
    reward = np.zeros(TURNS + 1, dtype=float)
    for turn in range(1, TURNS + 1):
        for drone in drones:
            drone_map[drone[0], drone[1]] = 0
        if be_selfish and be_oblivious:
            drones = [get_best_neighbor(drone[0], drone[1], drone_map=drone_map, task_map=task_map[i], no_colls=no_colls) for i, drone in enumerate(drones)]
        elif be_selfish:
            drones = [get_best_neighbor(drone[0], drone[1], drone_map=drone_map, task_map=task_map, no_colls=no_colls) for i, drone in enumerate(drones)]
        else:
            drones = [get_random_neighbor(drone[0], drone[1]) for drone in drones]
        for i, drone in enumerate(drones):
            drone_map[drone[0], drone[1]] = 1
            if be_oblivious:
                task_map[i, drone[0], drone[1]] = 0
            else:
                task_map[drone[0], drone[1]] = 0
        tasks_t = []
        for task in tasks:
            if drone_map[task[0], task[1]] == 1 and rng.uniform(0, 1) < PHI:
                continue
            if not use_motion:
                tasks_t.append(task)
                continue
            tasks_t.append(get_random_neighbor(task[0], task[1], use_alpha=True))
        tasks = tasks_t
        reward[turn] = (num_tasks - len(tasks)) / num_tasks
    return reward


def random_walk_experiment(random_placement=False, use_motion=False, be_selfish=False, be_oblivious=False, no_colls=False):
    final_reward = np.zeros(TURNS + 1, dtype=float)
    for trial in range(TRIALS):
        final_reward += sim_random_walk(random_placement=random_placement, use_motion=use_motion, be_selfish=be_selfish, be_oblivious=be_oblivious, no_colls=no_colls)
    final_reward /= TRIALS
    return final_reward

In [4]:
random_explore = random_walk_experiment()

In [5]:
random_explore_selfish_oblivious = random_walk_experiment(be_selfish=True, be_oblivious=True)

In [None]:
random_explore_selfish_oblivious_no_colls = random_walk_experiment(be_selfish=True, be_oblivious=True, no_colls=True)

In [None]:
random_explore_motion = random_walk_experiment(random_placement=True, use_motion=True, be_selfish=False, no_colls=False)

In [None]:
random_explore_selfish_motion = random_walk_experiment(random_placement=True, use_motion=True, be_selfish=True, no_colls=False)

In [None]:
random_explore_nocolls = random_walk_experiment(random_placement=False, use_motion=False, be_selfish=True, no_colls=True)

In [None]:
random_explore_nocolls_motion = random_walk_experiment(random_placement=True, use_motion=True, be_selfish=True, no_colls=True)

In [None]:
print(random_explore[-1], random_explore_selfish[-1], random_explore_nocolls[-1], random_explore_motion[-1], random_explore_selfish_motion[-1], random_explore_nocolls_motion[-1])

In [None]:
x_labels = ['rwalk', 'selfish', 'rwalk_moving', 'selfish_moving']
x_colors = ['black', 'blue', 'gray', 'green']
x_axis = np.arange(TURNS + 1)

plt.plot(x_axis, random_explore, label=x_labels[0], color=x_colors[0])
plt.plot(x_axis, random_explore_naive, label=x_labels[1], color=x_colors[1])

plt.title(f'Stochastic Random Walk vs. Selfish Exploration')
plt.xlabel('Turns')
plt.ylabel('Mean Reward')
plt.yticks(np.arange(0, 0.5, step=0.05))
plt.legend()

In [None]:
x_labels = ['rwalk', 'naive', 'rwalk_moving', 'naive_moving']
x_colors = ['black', 'blue', 'gray', 'green']
x_axis = np.arange(TURNS + 1)

plt.plot(x_axis, random_explore, label=x_labels[0], color=x_colors[0])
plt.plot(x_axis, random_explore_naive, label=x_labels[1], color=x_colors[1])
plt.plot(x_axis, random_explore_motion, label=x_labels[2], color=x_colors[2])
plt.plot(x_axis, random_explore_naive_motion, label=x_labels[3], color=x_colors[3])

plt.title(f'Stochastic Random Walk vs. Naive Exploration')
plt.xlabel('Turns')
plt.ylabel('Mean Reward')
plt.yticks(np.arange(0, 0.5, step=0.05))
plt.legend()

In [None]:
x_labels = ['neumann', 'n:cyl', 'moore', 'm:bias', 'm:cyl', 'm:cyl:bias', 'rwalk']
x_colors = ['blue', 'orange', 'green', 'red', 'purple', 'pink', 'black']
x_axis = np.arange(TURNS + 1)

plt.plot(x_axis, neumann, label=x_labels[0], color=x_colors[0])
plt.plot(x_axis, neumann_cyl, label=x_labels[1], color=x_colors[1])

plt.plot(x_axis, moore, label=x_labels[2], color=x_colors[2])
plt.plot(x_axis, moore_diff, label=x_labels[3], color=x_colors[3])
plt.plot(x_axis, moore_cyl, label=x_labels[4], color=x_colors[4])
plt.plot(x_axis, moore_diff_cyl, label=x_labels[5], color=x_colors[5])


plt.plot(x_axis, random_walk, label=x_labels[6], color=x_colors[6])

plt.title(f'Stochastic Random Walk Heatmaps')
plt.xlabel('Turns')
plt.ylabel('Reward')
plt.yticks(np.arange(0, 0.3, step=0.02))
plt.legend()

In [None]:
[
    np.square((random_walk - neumann)).mean(),
    np.square((random_walk - neumann_cyl)).mean(),
    np.square((random_walk - moore)).mean(),
    np.square((random_walk - moore_diff)).mean(),
    np.square((random_walk - moore_cyl)).mean(),
    np.square((random_walk - moore_diff_cyl)).mean()
]

In [None]:
plt.plot(x_axis, moore, label=x_labels[2], color=x_colors[2])
plt.plot(x_axis, moore_diff, label=x_labels[3], color=x_colors[3])
plt.plot(x_axis, moore_cyl, label=x_labels[4], color=x_colors[4])
plt.plot(x_axis, moore_diff_cyl, label=x_labels[5], color=x_colors[5])

plt.plot(x_axis, random_walk, label=x_labels[6], color=x_colors[6])

plt.title(f'Stochastic Random Walk Heatmaps')
plt.xlabel('Turns')
plt.ylabel('Reward')
plt.yticks(np.arange(0, 0.3, step=0.02))
plt.legend()

In [None]:
mses = [np.zeros(17, dtype=float), np.zeros(17, dtype=float), np.zeros(17, dtype=float), np.zeros(17, dtype=float)]
for t in range(0, 65, 4):
    DRONES = t if t > 0 else 1
    rwalk = random_walk_experiment()
    for trial in range(HMAP_TRIALS):
        mses[0][t//4] += np.square(rwalk - heatmap_experiment(False, 0.125, 0.125)).mean()
        mses[1][t//4] += np.square(rwalk - heatmap_experiment(False, 0.175, 0.075)).mean()
        mses[2][t//4] += np.square(rwalk - heatmap_experiment(True, 0.125, 0.125)).mean()
        mses[3][t//4] += np.square(rwalk - heatmap_experiment(True, 0.175, 0.075)).mean()
    mses[0][t//4] /= HMAP_TRIALS
    mses[1][t//4] /= HMAP_TRIALS
    mses[2][t//4] /= HMAP_TRIALS
    mses[3][t//4] /= HMAP_TRIALS

In [None]:
x_axis = np.arange(17) * 4
x_axis[0] = 1

plt.plot(x_axis, mses[0], label=x_labels[2], color=x_colors[2])
plt.plot(x_axis, mses[1], label=x_labels[3], color=x_colors[3])
plt.plot(x_axis, mses[2], label=x_labels[4], color=x_colors[4])
plt.plot(x_axis, mses[3], label=x_labels[5], color=x_colors[5])

plt.title(f'Heatmap Variations MSEs')
plt.xlabel('Drones')
plt.ylabel('MSE')
plt.yticks(np.arange(0, 0.00015, step=0.00002))
plt.legend()

In [None]:
moore_cyl_diff_mse = []
for t in range(0, 65, 4):
    DRONES = t if t > 0 else 1
    rwalk = random_walk_experiment()
    exp_mse = np.zeros(HMAP_TRIALS, dtype=float)
    for trial in range(HMAP_TRIALS):
        exp_mse[trial] = np.square(rwalk - heatmap_experiment(True, 0.175, 0.075)).mean()
    moore_cyl_diff_mse.append(exp_mse)

In [None]:
for mse in moore_cyl_diff_mse:
    print(mse.mean(), mse.std())