## Implementation of Naive Spiral with interface

In [159]:
import numpy as np
import h3
import folium
import itertools
from typing import Tuple
from scipy.spatial.distance import euclidean

In [160]:
from abc import ABC, abstractmethod

# Define an interface using an abstract base class
class PathFinder(ABC):

    def __init__(self):
        pass

    @abstractmethod
    def find_next_step(self, current_position: Tuple[int, int], prob_map: np.ndarray) -> Tuple[int, int]:
        """
        Find the next step and waypoints to follow.

        Args:
            current_position (Tuple[int, int]): Current coordinates (x, y) lat, long
            prob_map: np.ndarray - of size 7*..7 equal to size of dimension
        Returns:
            Tuple[int, int] - Next step coordinates in long lat (x, y).
        """
        pass

In [161]:
# Util
def hex_to_binary(hex_string):
    try:
        # Remove the '0x' prefix if it exists
        hex_string = hex_string.lstrip('0x')
        # Convert the hexadecimal string to an integer
        decimal_value = int(hex_string, 16)
        # Convert the integer to a binary string
        binary_string = bin(decimal_value)
        return binary_string
    except ValueError:
        return "Invalid hexadecimal input"
    
def octal_list_to_binary(octal_list):
    binary_result = ''
    for octal_digit in octal_list:
        # Convert each octal digit to its binary representation and append to the result
        binary_digit = bin(octal_digit)[2:].zfill(3)
        binary_result += binary_digit
    return binary_result

def binary_to_octal_list(binary_string, n):
    try:
        # Remove the '0b' prefix if it exists
        binary_string = binary_string.lstrip('0b')
        # Get the last 'n' characters from the binary string
        last_n_binary = binary_string[-n*3:]
        # Pad the binary string with zeros if its length is not a multiple of 3
        while len(last_n_binary) % 3 != 0:
            last_n_binary = '0' + last_n_binary
        # Split the binary string into groups of 3 characters
        binary_groups = [last_n_binary[i:i+3] for i in range(0, len(last_n_binary), 3)]
        # Convert each group to decimal and store in a list
        octal_list = [int(group, 2) for group in binary_groups]
        return octal_list
    except ValueError:
        return "Invalid binary input"

def binary_to_hex(binary_string):
    try:
        # Remove the '0b' prefix if it exists
        binary_string = binary_string.lstrip('0b')
        # Convert the binary string to an integer
        decimal_value = int(binary_string, 2)
        # Convert the integer to a hexadecimal string
        hex_string = hex(decimal_value)[2:]
        return hex_string
    except ValueError:
        return "Invalid binary input"
    
def hex_to_array_index(hex_string: str, prob_map: np.ndarray) -> Tuple[int, int, int, int]:
    """Convert a hexagon's H3 string index into an array index.

    Args:
        hex_string (str): The H3 index of the hexagon in string.
        prob_map (np.ndarray): A numpy array of (7,7,7,7) representing the probability in each hexagon.

    Returns:
        Tuple[int, int, int, int]: Corresponding indices in the `prob_map` for the given hexagon.
    """
    binary_input = hex_to_binary(hex_string)
    octal_output = binary_to_octal_list(binary_input, len(prob_map.shape))
    return tuple(octal_output)

def array_index_to_hex(centre_hex: str, indices: Tuple[int, int, int, int] ,prob_map: np.ndarray) -> str:
    """Convert a array index of size 4 into the corresponding hexagon's H3 string index.

    Args:
        centre_hex (str): The H3 index of the central reference hexagon in string format needed for the prefix.
        indices (Tuple[int, int, int, int]): Indices of size 4 in the `prob_map` to be converted.
        prob_map (np.ndarray): A numpy array of (7,7,7,7) representing the probability in each hexagon.

    Returns:
        str: The corresponding H3 index of the given array indices.
    """
    centre_bin = hex_to_binary(centre_hex)
    bin_prefix = centre_bin[:-len(prob_map.shape) * 3]
    hex_binary = bin_prefix + octal_list_to_binary(indices)
    hex_idx = binary_to_hex(hex_binary)
    return hex_idx

def distance_between_2_hexas(a: str, b: str) -> float:
    """Calculate the Euclidean distance between the centers of two hexagons.

    Args:
        a (str): The H3 index of the first hexagon in string.
        b (str): The H3 index of the second hexagon in string.

    Returns:
        float: The Euclidean distance between the two hexagon centers using latitude and longitude in the unit of the input
    """
    dist = euclidean(h3.h3_to_geo(a), h3.h3_to_geo(b))
    return dist


In [162]:
# Utils for visualisation

# Return a colour hex value from a value between 0-1
def gradient_color(value):
    if value < 0:
        value = 0
    elif value > 1:
        value = 1

    r = int(255 * value)
    b = int(255 * value)
    g = int(255 * value)

    hex_string = '#{:02x}{:02x}{:02x}'.format(r, b, g)
    return hex_string

# Add H3 hexagons as polygons to the map, m
def add_hex_to_map(hexagon_values, m):
    keys = list(hexagon_values.keys())
    num_keys = len(keys)

    for i, hexagon_id in enumerate(hexagon_values):
        vertices = h3.h3_to_geo_boundary(hexagon_id)
        color = color = gradient_color(hexagon_values[hexagon_id])
        folium.Polygon(locations=vertices, color=color, fill=True, fill_opacity=0.6).add_to(m)
        folium.map.Marker(h3.h3_to_geo(hexagon_id),
                icon=folium.DivIcon(
                    icon_size=(10,10),
                    icon_anchor=(5,14),
                    html=f'<div style="font-size: 8pt">{i}</div>'
                )
                ).add_to(m)

In [163]:
class BayesianHexSearch(PathFinder):
    """A pathfinding algorithm in the H3 hexagonal grid system using probability.
    """

    def __init__(self, res: int, pos: Tuple[float, float]) -> None:
        """Initializes with given resolution and starting position

        Args:
            res (int): The H3 resolution for the hexagonal grid.
            pos (Tuple[float, float]): Starting position as a tuple of (latitude, longitude).
        """
        self.res = res
        self.trajectory = []
        self.center_hex = h3.geo_to_h3(pos[0],pos[1],resolution=self.res)

    def find_next_step(self, current_position: Tuple[float, float], prob_map: np.ndarray) -> Tuple[int, int]:
        """Determines the next waypoint based on current position and a probability map.

        Args:
            current_position (Tuple[float, float]): Current position as a tuple of (latitude, longitude).
            prob_map (np.ndarray): A numpy array of (7,7,7,7) representing the probability in each hexagon.

        Returns:
            Tuple[int, int]: Next waypoint as a tuple of (latitude, longitude).
        """
        # Initialise current position 
        curr_hexagon = h3.geo_to_h3(current_position[0],current_position[1],resolution=self.res)

        # Hex index of the highest probability
        max_index_flat = np.argmax(prob_map)
        max_indices = np.unravel_index(max_index_flat, prob_map.shape)
        max_hex_index = array_index_to_hex(self.center_hex, max_indices, prob_map)

        # Get neighbours
        neighbours = h3.k_ring(curr_hexagon, 1)
        
        # Initialise variables to find the nest best neighbour
        best_neighbour = None
        highest_score = 0

        for neighbour in neighbours:
            dist = distance_between_2_hexas(neighbour, max_hex_index)
            neighbour_prob = prob_map[hex_to_array_index(neighbour, prob_map)]
            score = dist *  100 +  neighbour_prob * 10
            if score > highest_score:
                best_neighbour = neighbour
                highest_score = score
        
        return h3.h3_to_geo(best_neighbour)


In [164]:
def update_map(prob_map: np.ndarray, waypoint: Tuple[float, float], res, f) -> np.ndarray:
    """Update the probability map using Baye's theorem.

    Args:
        prob_map (np.ndarray): A numpy array of (7,7,7,7) representing the probability in each hexagon.
        waypoint (Tuple[float, float]): Current search position as a tuple of (latitude, longitude).
        res (_type_): The H3 resolution of the hexagon associated with the position.
        f (_type_): The probability of detecting a person.

    Returns:
        np.ndarray: The updated probability map.
    """
    hex_waypoint = h3.geo_to_h3(waypoint[0], waypoint[1], resolution= res)
    array_index_waypoint = hex_to_array_index(hex_waypoint, prob_map)
    # Prior
    prior = prob_map[array_index_waypoint]
    # Posterior
    posterior = prior*(1-f)/(1-prior*f)
    prob_map[array_index_waypoint] = posterior
    # Disrtribute
    prob_map /= prob_map.sum()
    return prob_map

In [165]:
centre = (1.340640225800291, 103.962424829341)
res = 15
f = 0.99
path_finder = BayesianHexSearch(15, centre)
fake_map = np.random.rand(7, 7, 7, 7)
fake_map /= fake_map.sum()

output = {}
waypoint = centre
steps = 200
for i in range(steps):
    # Update Prob Map
    fake_map = update_map(fake_map, waypoint, res, f)
    waypoint = path_finder.find_next_step(waypoint, fake_map)
    output[h3.geo_to_h3(waypoint[0],waypoint[1],resolution = 15)] = (steps-i)/steps

In [166]:
# Vsualised
m = folium.Map(location=[centre[0],centre[1]], zoom_start=23, max_zoom=25)
add_hex_to_map(output, m)
display(m)