Skip to content

Commit

Permalink
v2.4.0 networkx library based graph
Browse files Browse the repository at this point in the history
Use networkx library
  • Loading branch information
jannikmi committed Aug 19, 2022
2 parents 322871e + ae8c2ee commit 4575e1b
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 508 deletions.
14 changes: 11 additions & 3 deletions CHANGELOG.rst
@@ -1,10 +1,18 @@
Changelog
=========

TODOs

* pending major release: remove separate prepare step?! initialise in one step during initialisation
* Numba JIT compilation of utils. line speed profiling for highest impact of refactoring
* allow input of complex geometries: input coords, and edges separately (polygons are special case)


2.4.0 (2022-08-18)
-------------------

* A* and graph representation based on ``networkx`` library -> new dependency

TODO pending major release: remove separate prepare step?! initialise in one step during initialisation
TODO Numba JIT compilation of utils. line speed profiling for highest impact of refactoring
TODO improve A* implementation (away from OOP)


2.3.0 (2022-08-18)
Expand Down
3 changes: 2 additions & 1 deletion extremitypathfinder/__init__.py
@@ -1,3 +1,4 @@
from .extremitypathfinder import PolygonEnvironment
from .utils import load_pickle

__all__ = ("PolygonEnvironment",)
__all__ = ("PolygonEnvironment", "load_pickle")
2 changes: 1 addition & 1 deletion extremitypathfinder/command_line.py
Expand Up @@ -2,7 +2,7 @@

from extremitypathfinder import PolygonEnvironment
from extremitypathfinder.configs import BOUNDARY_JSON_KEY, HOLES_JSON_KEY
from extremitypathfinder.helper_fcts import read_json
from extremitypathfinder.utils import read_json

JSON_HELP_MSG = (
"path to the JSON file to be read. "
Expand Down
238 changes: 118 additions & 120 deletions extremitypathfinder/extremitypathfinder.py
@@ -1,7 +1,7 @@
import pickle
from copy import deepcopy
from typing import Dict, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Set, Tuple

import networkx as nx
import numpy as np

from extremitypathfinder.configs import (
Expand All @@ -12,28 +12,19 @@
PATH_TYPE,
InputCoords,
)
from extremitypathfinder.helper_classes import DirectedHeuristicGraph
from extremitypathfinder.helper_fcts import (
from extremitypathfinder.utils import (
check_data_requirements,
cmp_reps_n_distances,
compute_extremity_idxs,
compute_graph,
convert_gridworld,
find_identical_single,
find_visible,
get_repr_n_dists,
get_distance,
is_within_map,
)

# TODO possible to allow polygon consisting of 2 vertices only(=barrier)? lots of functions need at least 3 vertices atm


# is not a helper function to make it an importable part of the package
def load_pickle(path=DEFAULT_PICKLE_NAME):
print("loading map from:", path)
with open(path, "rb") as f:
return pickle.load(f)


# TODO document parameters
class PolygonEnvironment:
"""Class allowing to use polygons to represent "2D environments" and use them for path finding.
Expand All @@ -43,15 +34,18 @@ class PolygonEnvironment:
[1] Vinther, Anders Strand-Holm, Magnus Strand-Holm Vinther, and Peyman Afshani.
"`Pathfinding in Two-dimensional Worlds
<https://www.cs.au.dk/~gerth/advising/thesis/anders-strand-holm-vinther_magnus-strand-holm-vinther.pdf>`__"
TODO document parameters
"""

nr_edges: int
prepared: bool = False
holes: List[np.ndarray]
extremity_indices: List[int]
reprs_n_distances: Dict[int, np.ndarray]
graph: DirectedHeuristicGraph
temp_graph: Optional[DirectedHeuristicGraph] = None # for storing and plotting the graph during a query
graph: nx.DiGraph
# TODO
temp_graph: Optional[nx.DiGraph] = None # for storing and plotting the graph during a query
boundary_polygon: np.ndarray
coords: np.ndarray
edge_vertex_idxs: np.ndarray
Expand Down Expand Up @@ -116,7 +110,6 @@ def store(
offset = 0
extremity_idxs = set()
for poly in list_of_polygons:

poly_extr_idxs = compute_extremity_idxs(poly)
poly_extr_idxs = {i + offset for i in poly_extr_idxs}
extremity_idxs |= poly_extr_idxs
Expand Down Expand Up @@ -148,13 +141,15 @@ def store(
mask[i] = True

self.nr_vertices = nr_total_pts
# start and goal points will be stored after all polygon coordinates
self.idx_start = nr_total_pts
self.idx_goal = nr_total_pts + 1
self.edge_vertex_idxs = edge_vertex_idxs
self.vertex_edge_idxs = vertex_edge_idxs
self.coords = coords
self.extremity_indices = extremity_idxs
self.extremity_mask = mask

self.reprs_n_distances = {i: get_repr_n_dists(i, coords) for i in extremity_idxs}
self.reprs_n_distances = {i: cmp_reps_n_distances(i, coords) for i in extremity_idxs}

def store_grid_world(
self,
Expand Down Expand Up @@ -208,11 +203,6 @@ def prepare(self):
if self.prepared:
raise ValueError("this environment is already prepared. load new polygons first.")

nr_extremities = len(self.extremity_indices)
if nr_extremities == 0:
self.graph = DirectedHeuristicGraph()
return

self.graph = compute_graph(
self.nr_edges,
self.extremity_indices,
Expand All @@ -224,16 +214,36 @@ def prepare(self):
)
self.prepared = True

def within_map(self, coords: InputCoords):
def within_map(self, coords: np.ndarray) -> bool:
"""checks if the given coordinates lie within the boundary polygon and outside of all holes
:param coords: numerical tuple representing coordinates
:return: whether the given coordinate is a valid query point
"""
boundary = self.boundary_polygon
holes = self.holes
p = np.array(coords, dtype=float)
return is_within_map(p, boundary, holes)
return is_within_map(coords, self.boundary_polygon, self.holes)

def get_visible_idxs(
self,
origin: int,
candidates: Iterable[int],
coords: np.ndarray,
vert_idx2repr: np.ndarray,
vert_idx2dist: np.ndarray,
) -> Set[int]:
# Note: points with equal coordinates should not be considered visible (will be merged later)
candidates = {i for i in candidates if not vert_idx2dist[i] == 0.0}
edge_idxs2check = set(range(self.nr_edges))
return find_visible(
origin,
candidates,
edge_idxs2check,
coords,
vert_idx2repr,
vert_idx2dist,
self.extremity_mask,
self.edge_vertex_idxs,
self.vertex_edge_idxs,
)

def find_shortest_path(
self,
Expand All @@ -259,127 +269,115 @@ def find_shortest_path(
if not self.prepared:
self.prepare()

if verify and not self.within_map(start_coordinates):
coords_start = np.array(start_coordinates, dtype=float)
coords_goal = np.array(goal_coordinates, dtype=float)
if verify and not self.within_map(coords_start):
raise ValueError("start point does not lie within the map")
if verify and not self.within_map(goal_coordinates):
if verify and not self.within_map(coords_goal):
raise ValueError("goal point does not lie within the map")

coords_start = np.array(start_coordinates)
coords_goal = np.array(goal_coordinates)
if np.array_equal(coords_start, coords_goal):
# start and goal are identical and can be reached instantly
return [start_coordinates, goal_coordinates], 0.0

nr_edges = self.nr_edges
vertex_edge_idxs = self.vertex_edge_idxs
edge_vertex_idxs = self.edge_vertex_idxs
# temporarily extend data structures
extremity_mask = np.append(self.extremity_mask, (False, False))
coords = np.append(self.coords, (coords_start, coords_goal), axis=0)
idx_start = self.nr_vertices
idx_goal = self.nr_vertices + 1

# start and goal nodes could be identical with one ore more of the vertices
start = self.idx_start
goal = self.idx_goal
# temporarily extend data structure
# Note: start and goal nodes could be identical with one ore more of the vertices
# BUT: this is an edge case -> compute visibility as usual and later try to merge with the graph

# create temporary graph
# DirectedHeuristicGraph implements __deepcopy__() to not change the original precomputed self.graph
# but to still not create real copies of vertex instances!
graph = deepcopy(self.graph)
# TODO make more performant, avoid real copy
# graph = self.graph
coords = np.append(self.coords, (coords_start, coords_goal), axis=0)
self._coords_tmp = coords # for plotting including the start and goal indices

# check the goal node first (earlier termination possible)
idx_origin = idx_goal
# the visibility of only the graphs nodes has to be checked (not all extremities!)
# points with the same angle representation should not be considered visible
# (they also cause errors in the algorithms, because their angle repr is not defined!)
origin = goal
# the visibility of only the graph nodes has to be checked (not all extremities!)
# IMPORTANT: also check if the start node is visible from the goal node!
# NOTE: all edges are being checked, it is computationally faster to compute all visibilities in one go
candidate_idxs = self.graph.all_nodes
candidate_idxs.add(idx_start)
edge_idxs2check = set(range(nr_edges))
vert_idx2repr, vert_idx2dist = get_repr_n_dists(idx_origin, coords)
candidate_idxs = {i for i in candidate_idxs if not vert_idx2dist[i] == 0.0}
visible_idxs = find_visible(
idx_origin,
candidate_idxs,
edge_idxs2check,
extremity_mask,
coords,
vertex_edge_idxs,
edge_vertex_idxs,
vert_idx2repr,
vert_idx2dist,
)
visibles_n_distances_map = {i: vert_idx2dist[i] for i in visible_idxs}

if len(visibles_n_distances_map) == 0:
candidate_idxs: Set[int] = set(self.graph.nodes)
candidate_idxs.add(start)
repr_n_dists = cmp_reps_n_distances(origin, coords)
self.reprs_n_distances[origin] = repr_n_dists
vert_idx2repr, vert_idx2dist = repr_n_dists
visibles_goal = self.get_visible_idxs(origin, candidate_idxs, coords, vert_idx2repr, vert_idx2dist)
if len(visibles_goal) == 0:
# The goal node does not have any neighbours. Hence there is not possible path to the goal.
return [], None

for i, d in visibles_n_distances_map.items():
if i == idx_start:
# IMPORTANT geometrical property of this problem: it is always shortest to directly reach a node
# instead of visiting other nodes first (there is never an advantage through reduced edge weight)
# -> when goal is directly reachable, there can be no other shorter path to it. Terminate
return [start_coordinates, goal_coordinates], d
# IMPORTANT geometrical property of this problem: it is always shortest to directly reach a node
# instead of visiting other nodes first (there is never an advantage through reduced edge weight)
# -> when goal is directly reachable, there can be no other shorter path to it. Terminate
if start in visibles_goal:
d = vert_idx2dist[start]
return [start_coordinates, goal_coordinates], d

# add unidirectional edges to the temporary graph
# add edges in the direction: extremity (v) -> goal
graph.add_directed_edge(i, idx_goal, d)
# create temporary graph
# DirectedHeuristicGraph implements __deepcopy__() to not change the original precomputed self.graph
# but to still not create real copies of vertex instances!
graph = self.graph.copy()
# TODO avoid real copy to make make more performant
# graph = self.graph
# nr_edges_before = len(graph.edges)

idx_origin = idx_start
# add unidirectional edges in the direction: extremity (v) -> goal
for i in visibles_goal:
graph.add_edge(i, goal, weight=vert_idx2dist[i])

origin = start
# the visibility of only the graphs nodes have to be checked
# the goal node does not have to be considered, because of the earlier check
edge_idxs2check = set(range(nr_edges)) # new copy
vert_idx2repr, vert_idx2dist = get_repr_n_dists(idx_origin, coords)
candidate_idxs = {i for i in self.graph.get_all_nodes() if not vert_idx2dist[i] == 0.0}
visible_idxs = find_visible(
idx_origin,
candidate_idxs,
edge_idxs2check,
extremity_mask,
coords,
vertex_edge_idxs,
edge_vertex_idxs,
vert_idx2repr,
vert_idx2dist,
)
repr_n_dists = cmp_reps_n_distances(origin, coords)
self.reprs_n_distances[origin] = repr_n_dists
vert_idx2repr, vert_idx2dist = repr_n_dists
visibles_start = self.get_visible_idxs(origin, candidate_idxs, coords, vert_idx2repr, vert_idx2dist)

if len(visible_idxs) == 0:
if len(visibles_start) == 0:
# The start node does not have any neighbours. Hence there is no possible path to the goal.
return [], None

# add edges in the direction: start -> extremity
visibles_n_distances_map = {i: vert_idx2dist[i] for i in visible_idxs}
graph.add_multiple_directed_edges(idx_start, visibles_n_distances_map)

# Note: also here unnecessary edges in the graph could be deleted when start or goal lie
# Note: also here unnecessary edges in the graph could be deleted
# optimising the graph here however is more expensive than beneficial,
# as it is only being used for a single query
# as the graph is only being used for a single query
for i in visibles_start:
graph.add_edge(start, i, weight=vert_idx2dist[i])

def l2_distance(n1, n2):
return get_distance(n1, n2, self.reprs_n_distances)

# apply mapping to start and goal index as well
start_mapped = find_identical_single(start, graph.nodes, self.reprs_n_distances)
if start_mapped != start:
nx.relabel_nodes(graph, {start: start_mapped}, copy=False)

# ATTENTION: update to new coordinates
graph.coord_map = {i: coords[i] for i in graph.all_nodes}
graph.join_identical()
goal_mapped = find_identical_single(goal, graph.nodes, self.reprs_n_distances)
if goal_mapped != goal_mapped:
nx.relabel_nodes(graph, {goal: goal_mapped}, copy=False)

vertex_id_path, distance = graph.modified_a_star(idx_start, idx_goal, coords_goal)
self._idx_start_tmp, self._idx_goal_tmp = start_mapped, goal_mapped # for plotting

id_path = nx.astar_path(graph, start_mapped, goal_mapped, heuristic=l2_distance, weight="weight")

# clean up
# TODO re-use the same graph
# graph.remove_node(idx_start)
# graph.remove_node(idx_goal)
# TODO re-use the same graph. need to keep track of all merged edges
# if start_mapped == start:
# graph.remove_node(start)
# if goal_mapped==goal:
# graph.remove_node(goal)
# nr_edges_after = len(graph.edges)
# if not nr_edges_after == nr_edges_before:
# raise ValueError

if free_space_after:
del graph # free the memory

else:
self.temp_graph = graph

# extract the coordinates from the path
vertex_path = [tuple(coords[i]) for i in vertex_id_path]
return vertex_path, distance

# compute distance
distance = 0.0
v1 = id_path[0]
for v2 in id_path[1:]:
distance += l2_distance(v1, v2)
v1 = v2

if __name__ == "__main__":
# TODO command line support. read polygons and holes from .json files?
pass
# extract the coordinates from the path
path = [tuple(coords[i]) for i in id_path]
return path, distance

0 comments on commit 4575e1b

Please sign in to comment.