## Basic PRM Algorithm

Algorithms:

1. construct a roadmap in a probabilistic way (for a given workspace)
  - roadmap is represented by an undirected graph $G= (V,E)$.
  - nodes in $V$ are robot configurations sampled randomly from a uniform  distribution over $Q_{free}$, the free configuration space.
  - edges in $E$ represent connectedness in $Q_{free}$ (hence are undirected) and correspond to collision-free paths; an edge $(q1, q2)$ is such a path connecting configurations $q_1$ and $q_2$.
  - each edge is a local path generated by a local planner, $\Delta$; a simple local planner connects two nodes by a straight path in $Q_{free}$, if such a path exists.

2. query roadmap to solve for a path between a start and a goal configuration ($q_{start}$ and $q_{goal}$, respectively)
  - connect $q_{start}$ and $q_{goal}$ to two nodes $q'$ and $q''$, respectively, in $V$, if possible
  - search the graph for a sequence of edges in $E$ connecting $q'$ to $q''$
  - then recompute local paths and concatenate them from $q_{start}$ to $q_{goal}$

### Basic PRM Pseudocode

In [None]:
# Algorithm 1: roadmap construction algorithm (pseudocode)

# Inputs:
# n: number of nodes to put in the roadmap
# k: number of closest neighbours to examine for each configuration
#
# Output:
# roadmap G = (V,E)

# V = {}
# E = {}
#
# while len(V) < n:
#
#   while True:
#     q = sample_configuration(Q, uniform)
#
#     if is_collision_free(q):
#       V.append(q)
#       break
#
# for q in V:
#   q_neighbourhood = get_k_neighbours(q, k, distance_metric, Q)
#
#     for q_neighbour in q_neighbourhood:
#       if (q, q_neighbour) not in E and \
#           local_path_exists(q, q_neighbour, local_planner):
#         E.append((q, q_neighbour))


In [None]:
# Algorithm 2: query roadmap algorithm (pseudocode)

# Inputs:
# qinit: the initial configuration
# qgoal: the goal configuration
# k: the number of closest neighbours to examine for each configuration
# G = (V,E): the roadmap computed by algorithm 1
#
# Output:
# A path from qinit to qgoal or "failure"

# qinit_neighbourhood = get_neighbours(qinit, k, distance_metric, V)
# qgoal_neighbourhood = get_neighbours(qgoal, k, distance_metric, V)
# V.extend({qinit, qgoal})
# qprime = get_closest_neighbours(qinit, qinit_neighbourhood)  # NOTE: return ranked list of neighbours (same as get_neighbours)
#
# for close_neighbour in qprime:
#   if local_path_exists(qinit, close_neighbour, local_planner):
#     E.append((qinit, close_neighbour))
#     break
#
# q_prime = get_closest_neighbours(qgoal, qgoal_neighbourhood)
# for close_neighbour in q_prime:
#   if local_path_exists(qgoal, close_neighbour, local_planner):
#     E.append((qinit, close_neighbour))
#     break
#
# P = find_shortest_path(start=qinit, end=qgoal, on=G)
# if P is not NULL:
#   return P
# else:
#   return 'failure'

### Algorithm Implementations

In [26]:
# Implementation of Algorithm 1

# Assumptions:
# Q is the closed bounded 2D Euclidean rectangle from [0,0] to [x,y]
# each obstacle is a closed disk in Q defined by centre coordinates and a radius
# local_planner finds a straight line path between points or returns None

from decimal import Decimal
from typing import Generic, Callable, Literal, Protocol, TypeVar


# define generics: Location, NDConfiguration
NDConfiguration = Location = TypeVar("Location", bound=tuple[float,...])
DistanceMetric = Callable[[Location, Location], float]

# define protocol for Roadmap: a static-typing interface for implementing classes
class UndirectedWeightedGraph(Protocol):
  """
  This class represents an Abstract Undirected weighted graph protocol.
    Useful for static type checking and providing an interface contract
    for implementing classes to honour.
  """

  def neighbours(self, node: Location) -> list[Location]:
    """
    Gets the neighbours of the given node.
      A neighbour has an edge from the given node to itself.

      Args:
        node (Node): The node for which neighbours are required.

      Returns:
        list[Node]: A list of all neighbouring Nodes.
    """
    ...

  def cost(self, fro: Location, to: Location) -> float:
    """
    Calculates the weight of the edge between fro and to nodes.

      Returns:
        float: The weight of the edge
    """
    ...

# define vector class
class Vector2D:
  """
    This class represents a 2D vector.
    Args:
      x (float): The x-coordinate of the vector.
      y (float): The y-coordinate of the vector.

    Attributes:
      x (float): The x-coordinate of the vector.
      y (float): The y-coordinate of the vector.

    Methods:
      norm(self, vector): Calculate the magnitude of the vector.
      __sub__(self, other): Overload the minus operator to perform vector subtraction.
      __add__(self, other): Overload the plus operator to perform vector addition.
      distance(self, other): Calculate the distance between two vectors.
      displacement(self, other): Aliases vector subtraction.
      unit_vector(self): Calculate the unit vector of the vector.

    Sample code:
      ```python
        ### Create two vector instances
        v1 = Vector(3, 4)
        v2 = Vector(1, 2)

        ### Calculate the norm of v1
        print(Vector.norm(v1))  # Output: 5.0

        ### Calculate the displacement between v1 and v2
        displacement = v1 - v2
        print(displacement.x, displacement.y)  # Output: (2, 2)

        ### Calculate the distance between v1 and v2
        print(v1.distance(v2))  # Output: 2.8284271247461903

        ### Calculate the unit vector of v1
        unit_v1 = v1.unit_vector()
        print(unit_v1.x, unit_v1.y)  # Output: (0.6, 0.8)
      ```
  """

  def __init__(self, x, y):
    self.x = x
    self.y = y

  @classmethod
  def from_configuration2D(cls, config):
    """ create Vector2D object from a Configuration2D object """

    return cls(config.x, config.y)

  @classmethod
  def get_displacement_vector(cls, start_vector, end_vector):
    """ Calculate displacement vector that points from start to end """

    return end_vector - start_vector

  # define static norm method
  @staticmethod
  def norm(vector) -> float:
    """ Calculate the magnitude(length) of the vector """

    return (vector.x ** 2 + vector.y ** 2) ** 0.5

  def __sub__(self, other):
    """ Overload the minus operator to perform vector subtraction """

    return Vector2D(self.x - other.x, self.y - other.y)

  def __add__(self, other):
    """ Overload the plus operator to perform vector addition """

    return Vector2D(self.x + other.x, self.y + other.y)

  def __truediv__(self, scalar):
    """ Overload division operator to perform scalar division """

    # TODO: catch divide by zero error
    return self * (1 / scalar)

  def __mul__(self, scalar):
    """ Overload the multiply operator to perform scalar multiplication """

    return Vector2D(self.x * scalar, self.y * scalar)

  def __eq__(self, other):
    return (self.x, self.y) == (other.x, other.y)

  def __hash__(self):
    return hash((self.x, self.y))

  def __repr__(self):
    return f'Vector2D({self.x},{self.y})'

  def distance(self, other):
    """ Calculate the scalar distance between two vectors """

    return Vector2D.norm(other - self)

  def unit_vector(self):
    """ Calculate the unit vector """

    mag = Vector2D.norm(self)
    return self / mag

  def is_within_rho(self, other, rho: float):
    # check that distance between self and other is less than or equal to rho
    return self.distance(other) <= Decimal(value=rho)

  def as_tuple(self):
    return (self.x, self.y)

  def to_configuration2D(self):
    return Configuration2D(self.x, self.y)

# define configuration class with immutable x,y coordinates
# TODO: possible optimisation is to define this as a static type
# Configuration2D = tuple[float, float]
class Configuration2D(Generic[NDConfiguration]):
  """ Class representing a 2D configuration. """

  def __init__(self, x, y):
    """ Initialize the 2D configuration with x and y coordinates """

    self.x = x
    self.y = y

  # TODO: Implement immutable coordinates and update docstring
  # @property.method()
  # @prop.setter

  @classmethod
  def from_vector2D(cls, vector):
    """ create Configuration2D object from a Vector2D object """

    return cls(vector.x, vector.y)

  def to_vector2D(self):
    return Vector2D(self.x, self.y)

  def as_tuple(self):
    return (self.x, self.y)

  def __getitem__(self, index):
    return self.as_tuple()[index]

  def __eq__(self, other):
    return (self.x, self.y) == (other.x, other.y)

  def __hash__(self):
    return hash((self.x, self.y))

  def __str__(self):
    return f'Configuration2D({self.x},{self.y})'

# define obstacle class
class DiskObstacle2D:
  """
    This class represents a disk-shaped obstacle.

    Args:
      centre (tuple): The (x, y) coordinates of the center of the disk.
      radius (float): The radius of the disk.

    Attributes:
      centre (tuple): The (x, y) coordinates of the center of the disk.
      radius (float): The radius of the disk.
  """

  def __init__(self, centre, radius):
    self.centre: Location = centre
    self.radius = radius

  def colliding_with(self, configuration):
    # check if the configuration is within the obstacle
    return self.contains(configuration)

  def contains(self, configuration):
    from math import hypot
    # calculate the distance between the configuration and the obstacle centre
    distance = hypot(configuration.x - self.centre[0],
                     configuration.y - self.centre[1])

    return distance <= self.radius

# define bounds class
class RectangularBounds2D:

  def __init(self,
             x_bounds: tuple[float, float],
             y_bounds: tuple[float, float]):
    # set boundary extremes
    self.x_min = Decimal(min(x_bounds))
    self.x_max = Decimal(max(x_bounds))
    self.y_min = Decimal(min(y_bounds))
    self.y_max = Decimal(max(y_bounds))

    self.x_bounds = (self.x_min, self.x_max)
    self.y_bounds = (self.y_min, self.y_max)

  def is_within_bounds(self, configuration):
    # check if the configuration is within bounds
    in_x_bounds = configuration.x >= self.x_min and configuration.x <= self.x_max
    in_y_bounds = configuration.y >= self.y_min and configuration.y <= self.y_max

    if in_x_bounds and in_y_bounds:
      return True

    return False

# define configuration space class
class ConfigurationSpace2D:
  def __init__(self, bounds, obstacle_list):
    # initialise configuration space with bounds, obstacle list, metric
    self.bounds = bounds  # TODO: create from tuple and preferred bounds class
    self.obstacle_list = obstacle_list

  def is_collision_free(self, configuration):
    # check if configuration is in bounds
    if not self.bounds.is_within_bounds(configuration):
      return False

    # check if the given configuration is in collision with any obstacles
    for obstacle in self.obstacle_list:
      if obstacle.colliding_with(configuration):
        return False

    return True

  def distance_metric(self, start_configuration, goal_configuration) -> float:
    """
    Implement distance metric protocol.

      Returns:
        float:
          The scalar distance between start_configuration and goal_configuration
    """

    return Vector2D.norm(goal_configuration.to_vector2D() -
                  start_configuration.to_vector2D())

# define local planner class: construct obstacle-free edge between q' & q''
class SimpleLocalPlanner2D:
  def __init__(self, Q_space, resolution=0.01):
    self.configuration_space = Q_space
    self.resolution = Decimal(resolution)

  def get_increment_vector(self, q_start, q_goal):
    v_start, v_goal = q_start.to_vector2D(), q_goal.to_vector2D()
    displacement_vector = Vector2D.get_displacement_vector(v_start, v_goal)
    direction_vector = displacement_vector.unit_vector()
    increment_vector = direction_vector * self.resolution

    return increment_vector

  def plan(self,
           start: Configuration2D,
           goal: Configuration2D) -> list[Configuration2D] | None:
    # generate resolution-apart configurations on straight line from start to goal
    # get increment vector (direction vector from start to goal scaled to resolution)
    # increment start to end
    # if collision: return None

    self.path = []

    v_increment = self.get_increment_vector(start, goal)
    current_config = start.to_vector2D()

    while self.configuration_space.is_collision_free(current_config):
      self.path.append(current_config)
      current_config += v_increment

      if current_config.is_within_rho(goal.to_vector2D(), rho=self.resolution):
        break

    if not current_config.is_within_rho(goal.to_vector2D(), rho=self.resolution):
      # no collision-free path exists
      return None

    # collision-free path exists
    assert self.configuration_space.is_collision_free(
        current_config), "current_config must be collision-free"

    assert current_config.is_within_rho(
        goal.to_vector2D(), self.resolution), "current_config must have reached goal"

    self.path = [x.to_configuration2D() for x in self.path]
    self.path.append(goal)
    return self.path

class Roadmap:
  """
  This class implements a roadmap as an UndirectedWeightedGraph.
  """

  def __init__(self, cost: DistanceMetric):
    self.edges: dict[Location, list[Location]] = {}
    self.cost: DistanceMetric = cost

  def neighbours(self, node: Location) -> list[Location]:
    return self.edges[node]

  def contains(
      self,
      needle: Location | tuple[Location, Location] = None,
      mode: Literal['n', 'e'] = "e"
    ) -> bool:
    # check if roadmap includes given node or edge

    if mode == "n":
      # check for node
      return needle in self.edges
    else:
      # check for edge
      # TODO: refactor edge in edges, check
      return (
          needle[0] in self.edges.get(
              needle[1], []) or needle[1] in self.edges.get(needle[0], [])
      )

# define ...
class ProbabilisticRoadmap:
  def __init__(self,
               num_samples: int,
               num_nearest_neighbours: int,
               Q: ConfigurationSpace2D,
               local_planner: SimpleLocalPlanner2D = None):
    self.configuration_space = Q
    self.num_samples = num_samples
    self.num_nearest_neighbours = num_nearest_neighbours
    self.local_planner = local_planner

    # initialise roadmap graph, G = (V,E)
    self.nodes: list[Location] = []
    self.edges: list[tuple[Location, Location]] = []
    self.roadmap: Roadmap = Roadmap(self.configuration_space.distance_metric)
    self._build_roadmap()

  def _build_roadmap(self):
    """
      Constructs the roadmap by sampling collision-free configurations
      and connecting them using a local planner.

      Returns:
        None
    """

    # sample num_sample configs
    while len(self.nodes) < self.num_samples:
      in_collision = True
      while in_collision:
        q = self.sample_configuration()
        in_collision = not self.configuration_space.is_collision_free(q)

      # add node for q to roadmap
      self.nodes.append(q)

    # add nodes to roadmap
    self.roadmap.edges = dict.fromkeys(self.nodes, [])

    # connect nodes
    for node in self.roadmap.edges:
      # get nearest neighbours
      neighbourhood = self.get_nearest_neighbours(node)

      # find local path to each neighbour
      for neighbour in neighbourhood:
        if not self.roadmap.contains(needle=(node, neighbour)) and \
           self.local_path_exists(node, neighbour):
          # add edge to roadmap
          self.roadmap.edges[node].append(neighbour)

  def sample_configuration(self):
    """
      Samples a collision-free configuration within the configuration space.

      Returns:
        Configuration2D: The sampled configuration.
    """

    return sample_uniform_configuration2D(self.configuration_space.bounds)

  def is_collision_free(self, configuration):
    """
      Checks if the given configuration is collision-free within the configuration space.

      Args:
        configuration (Configuration2D): The configuration to check for collisions.

      Returns:
        bool: True if the configuration is collision-free, False otherwise.
    """

    return self.configuration_space.is_collision_free(configuration)

  def get_nearest_neighbours(self, node: Location) -> list[Location]:
    """
      Finds the k nearest neighbors of the given node within the roadmap.

      Args:
        node (Location): The node for which to find nearest neighbors.

      Returns:
        list[Location]: A list of k nearest neighbor nodes.
    """
    return(
        get_nearest_neighbours2D(node, self.num_nearest_neighbours, self.nodes)
    )

  def local_path_exists(self, q_start: Configuration2D, q_goal: Configuration2D):
    """
    Checks if a collision-free local path exists between two configurations.

      Args:
        start_configuration (Configuration2D): The start configuration.
        end_configuration (Configuration2D): The end configuration.

      Returns:
        bool: True if local path exists, False otherwise.
    """

    if self.get_local_path(self, q_start, q_goal):
      return True

    return False

  def get_local_path(self, q_start: Configuration2D, q_goal: Configuration2D):
    """
    Checks for a collision-free path between two configurations.

      Args:
        start_configuration (Configuration2D): The start configuration.
        end_configuration (Configuration2D): The end configuration.

      Returns:
        List[Configuration2D] | None:
          A collision-free local path if it exists, None otherwise.
    """

    if self.local_planner is None:
      self.local_planner = SimpleLocalPlanner2D(self.configuration_space)

    return self.local_planner.plan(q_start, q_goal)

  def query_roadmap(self,
                    start_node: Location,
                    goal_node: Location,
                    num_nearest_neighbours: int = None):
    # get path from q_start to q_goal or return "Failure"

    k = num_nearest_neighbours or self.num_nearest_neighbours
    self.roadmap.edges[start_node] = []
    self.roadmap.edges[goal_node] = []
    start_neighbourhood = self.get_nearest_neighbours(start_node)
    goal_neighbourhood = self.get_nearest_neighbours(goal_node)

    connected = False
    # connect start and goal within the roadmap
    for neighbour in start_neighbourhood:
      if self.local_path_exists(start_node, neighbour):
        self.roadmap.edges[start_node].append(neighbour)
        connected = True
        break

    # terminate if start not connected
    if not connected:
      return "Failure"

    connected = False
    # connect goal within roadmap
    for neighbour in goal_neighbourhood:
      if self.local_path_exists(goal_node, neighbour):
        self.roadmap.edges[goal_node].append(neighbour)
        connected = True
        break

    # terminate if goal not connected
    if not connected:
      return "Failure"

    # assert current state
    assert (start_node in self.roadmap.edges), "start must have edge in roadmap"
    assert (goal_node in self.roadmap.edges), "goal must have edge in roadmap"

    # find path
    # TODO: implement A* search
    came_from, _ = a_star_search2D(
      graph=self.roadmap,
      start=start_node,
      goal=goal_node,
      heuristic=self.configuration_space.distance_metric
    )

    path: list[Location] = self.reconstruct_path(came_from, start_node, goal_node)

    return path or "Failure"

  def reconstruct_path(
      came_from: dict[Location, Location | None],
      start: Location,
      goal: Location,
      reverse: bool = True
    ) -> list[Location] | None:
    # pre-condition: [start, goal], came_from:{location: location}
    # post-condition: [start, ..., goal]
    # invariant: [..., came_from[current], current]
    # guard: current not None i.e. reached start

    reconstruction: list[Location] = []

    # early exit condition
    if goal not in came_from: return None

    # assert came_from assumption
    assert (came_from[start] is None), "start node must have None as parent"

    current: Location = goal
    while current is not None:
      reconstruction.append(current)
      current = came_from[current]

    if reverse:
      reconstruction.reverse()

    return reconstruction


# helper
import random
import decimal


def sample_uniform_configuration2D(
    bounds: RectangularBounds2D, resolution=0.01) -> Configuration2D:
  """
    Sample a 2D configuration from a uniform distribution over the given bounds.

    Args:
      bounds (RectangularBounds2D):
        The bounds within which to sample the 2D configuration.

    Returns:
      Configuration2D:
        The sampled 2D configuration in the form (x, y).

    Sample code:
      bounds = RectangularBounds2D((0, 10), (0, 10))
      sampled_config = sample_uniform_configuration2D(bounds)
      print(sampled_config)
  """

  # Sample x coordinate
  x_value = random.uniform(bounds.x_min, bounds.x_max)
  # Round the x value to two decimal places
  x_value = decimal.Decimal(x_value).quantize(decimal.Decimal("0.01"))
  # Sample y coordinate
  y_value = random.uniform(bounds.y_min, bounds.y_max)
  # Round the y value to two decimal places
  y_value = decimal.Decimal(y_value).quantize(decimal.Decimal("0.01"))
  # Return (x, y) coordinates as floats
  return Configuration2D(float(x_value), float(y_value))

def get_nearest_neighbours2D(
    node: Configuration2D,
    num_nearest_neighbours: int,
    node_list: list[Configuration2D]
  ) -> list[Configuration2D]:
  """
    Finds the k-nearest neighbors of the given node within the roadmap.

    Args:
      node (Configuration2D): The node for which to find nearest neighbors.
      num_nearest_neighbours (int): The number, k, of nearest nodes to return.
      node_list (list[Configuration2D]): List of nodes to search.

    Returns:
      list[Configuration2D]: A list of k nearest neighbor nodes.
  """

  # create node_dict to hold neighbour location and distance from node
  node_dict: dict[Location, float] = dict()
  origin_vec: Vector2D = node.to_vector2D()

  def default_metric(neighbour: Configuration2D) -> float:
    return origin_vec.distance(
        neighbour.to_vector2D()
    )

  nearest_neighbours = sorted(node_list, key=default_metric)

  assert nearest_neighbours[0] == node, "node should be the closest to itself"
  return nearest_neighbours[1:num_nearest_neighbours+1]


# use PriorityQueue for A* search
from queue import PriorityQueue
import math

def a_star_search2D(
    graph: UndirectedWeightedGraph,
    start: Configuration2D,
    goal: Configuration2D,
    heuristic: Callable[[Configuration2D, Configuration2D], float]
  ) -> list[Location]:

  frontier = PriorityQueue()  # priority value treated as ordinals
  frontier.put(item=(0, start))  # (priority, entry)

  came_from: dict[Configuration2D, Configuration2D|None] = {}
  cost_from_start: dict[Configuration2D, float] = {}

  came_from[start] = None
  cost_from_start[start] = 0

  while not frontier.empty():
    _, current_node = frontier.get()

    if current_node == goal:
      break

    for next_node in graph.neighbours(current_node):
      new_cost_from_start = cost_from_start[current_node] + graph.cost(
          fro=current_node, to=next_node)

      if new_cost_from_start < cost_from_start.get(next_node, math.inf):
        cost_from_start[next_node] = new_cost_from_start
        came_from[next_node] = current_node
        priority = new_cost_from_start + heuristic(next_node, goal)
        frontier.put(item=(priority, next_node))

  return came_from, cost_from_start


### Tasks and Scratch

In [None]:
# TnT:

# implement TODOs ()
# implement c-space visualisation ()
# implement rrt
# implement cpp version ()


In [24]:
# start = Node(Configuration2D(9,0))
# ss = Node(Configuration2D(9,1))
# sa = Node(Configuration2D(9,0o7))
# sd = Node(Configuration2D(9,0o1))
# sf = Node(Configuration2D(9,0o3))


# {ss,sd,sa,sf,start}
# d = {}
# list(d.keys())

# dict.fromkeys([1,2,3,4,5], [])

# assert(2), "iii"

# None or "Failure"

# from functools import partial

# def sum(a, b): return a+b

# sum(9,0)

# add12 = partial(sum, b=12)
# add12(1)



13

### PRM Tests

### PRM Visualisation