In [None]:
import math
import heapq
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
X = 0
Y = 1

Implementation of A* search:

In [None]:
# Define the Cell class
class Cell:
    def __init__(self):
        self.parentnode1 = 0
        self.parentnode2 = 0
        self.f = float('inf')
        self.g = float('inf')
        self.h = 0

def ingrid(s0, s1, row, column):
    cellcolumn = s0 #grid.shape[1]
    cellrow = s1 #grid.shape[0]    
    return (row >= 0) and (row < cellrow) and (column >= 0) and (column < cellcolumn)

def unblocked(grid, row, column):
    return grid[row][column] == 0

def destination(row, column, dest):
    return row == dest[0] and column == dest[1]

# Calculate the heuristic value
def calculate_h_value(row, column, dest):
    return ((row - dest[0]) ** 2 + (column - dest[1]) ** 2) ** 0.5

# Site path from source to destination
def bestpath(cell_details, dest):
    path = []
    row = dest[0]
    column = dest[1]


    # Trace path using parent cells to define new rows
    while not (cell_details[row][column].parentnode1 == row and cell_details[row][column].parentnode2 == column):
        path.append((row, column))
        newrow = cell_details[row][column].parentnode1
        newcol = cell_details[row][column].parentnode2
        row = newrow
        column = newcol

    # Add the source cell to the path
    path.append((row, column))
    path.reverse()

    return path

def a_star(grid, src, dest, s0, s1):

    #if not ingrid(grid, src[0], src[1]) or not ingrid(grid, dest[0], dest[1]):
    #    print("Source or destination is invalid")
    #    return

    #if not unblocked(grid, src[0], src[1]) or not unblocked(grid, dest[0], dest[1]):
    #    print("Source or the destination is blocked")
    #    return
    
    cellcolumn = s0 #grid.shape[1]
    cellrow = s1 #grid.shape[0] 

    # Initialize the closed list (visited cells)
    closedcells = [[False for _ in range(cellcolumn)] for _ in range(cellrow)]
    # Initialize the details of each cell
    cell_details = [[Cell() for _ in range(cellcolumn)] for _ in range(cellrow)]

    # Initialize the source cell
    a = src[0]
    b = src[1]
    cell_details[a][b].f = 0
    cell_details[a][b].g = 0
    cell_details[a][b].h = 0
    cell_details[a][b].parentnode1 = a
    cell_details[a][b].parentnode2 = b

    # List open cells list from source and flag for a gound destination
    opencells = []
    heapq.heappush(opencells, (0.0, a, b))
    destinationfound = False

    # Main loop of A* search algorithm
    while len(opencells) > 0:

        usednode = heapq.heappop(opencells)

        # Mark the cell as used
        a = usednode[1]
        b = usednode[2]
        closedcells[a][b] = True

        # For each direction, check possibilities
        directions = [(0, 1), (0, -1), (1, 0), (-1, 0), (1, 1), (1, -1), (-1, 1), (-1, -1)]
        for dir in directions:
            tempcell1 = a + dir[0]
            tempcell2 = b + dir[1]

            # If the new cell is in the grid, unblocked, and not visited and is the destination
            if ingrid(s0, s1, tempcell1, tempcell2) and unblocked(grid, tempcell1, tempcell2) and not closedcells[tempcell1][tempcell2]:
                if destination(tempcell1, tempcell2, dest):
                    # Set the parent of the destination cell and print path
                    cell_details[tempcell1][tempcell2].parentnode1 = a
                    cell_details[tempcell1][tempcell2].parentnode2 = b
                    #print("Path to destination is available!")
                    path_found = bestpath(cell_details, dest)

                    return path_found
                else:
                    # Calculate the new f, g, and h values
                    newg = cell_details[a][b].g + 1.0
                    newh = calculate_h_value(tempcell1, tempcell2, dest)
                    newf = newg + newh

                    # If the cell is not in the open list or the new f value is smaller update its details
                    if cell_details[tempcell1][tempcell2].f == float('inf') or cell_details[tempcell1][tempcell2].f > newf:
                        heapq.heappush(opencells, (newf, tempcell1, tempcell2))
                        cell_details[tempcell1][tempcell2].f = newf
                        cell_details[tempcell1][tempcell2].g = newg
                        cell_details[tempcell1][tempcell2].h = newh
                        cell_details[tempcell1][tempcell2].parentnode1 = a
                        cell_details[tempcell1][tempcell2].parentnode2 = b


    if not destinationfound:
        print("Path unavailabe, due to blockages.")

Implementation of Theta* search:

In [None]:
def heuristic(a, b):
    return math.sqrt((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2)


def line_of_sight(s, s_prime, grid):
    x0, y0 = s
    x1, y1 = s_prime
    dx = abs(x1 - x0)
    dy = abs(y1 - y0)
    sx = 1 if x0 < x1 else -1
    sy = 1 if y0 < y1 else -1
    err = dx - dy
    while (x0, y0) != (x1, y1):
        if grid.iloc[x0, y0] == 1:
            return False
        e2 = err * 2
        if e2 > -dy:
            err -= dy
            x0 += sx
        if e2 < dx:
            err += dx
            y0 += sy
    return True

def theta_star(grid, start, goal):
    g_score = {}
    parent = {}

    g_score[start] = 0
    parent[start] = start

    open = {}
    open[start] = g_score[start] + heuristic(start, goal)
    
    closed = []
    while len(open) > 0:
        open = {k: v for k, v in reversed(sorted(open.items(), key=lambda item: item[1]))}
        
        current = open.popitem()[0]

        if current == goal:
            path = []
            while current != start:
                path.append(current)
                current = parent[current]
            path.append(start)
            return list(reversed(path))
        closed.append(current)

        for dx, dy in [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]:
            neighbor = (current[0] + dx, current[1] + dy)
            if 0 <= neighbor[0] < grid.shape[0] and 0 <= neighbor[1] < grid.shape[1] and grid.iloc[neighbor[0], neighbor[1]] != 1:
                if neighbor not in closed:
                    if neighbor not in open:
                        g_score[neighbor] = math.inf
                        parent[neighbor] = None
                    if line_of_sight(parent[current], neighbor, grid):
                        if g_score[parent[current]] + heuristic(parent[current], neighbor) < g_score[neighbor]:
                            g_score[neighbor] = g_score[parent[current]] + heuristic(parent[current], neighbor)
                            parent[neighbor] = parent[current]
                            if neighbor in open:
                                open.pop(neighbor)
                            open[neighbor] = g_score[neighbor] + heuristic(neighbor, goal)
                    else:
                        if g_score[current] + heuristic(current, neighbor) < g_score[neighbor]:
                            g_score[neighbor] = g_score[current] + heuristic(current, neighbor)
                            parent[neighbor] = current
                            if neighbor in open:
                                open.pop(neighbor)
                            open[neighbor] = g_score[neighbor] + heuristic(neighbor, goal)

    print(len(open))

Implementation of the shortest path for the Dubin's car:

In [None]:
# Utilities for working with 2D vectors:
def perp(v):
    return np.array([v[X] * -1.0, v[Y]])

def magnitude(v):
    return math.sqrt((v[X] * v[X]) + (v[Y] * v[Y]))

def distance(v1, v2):
    return magnitude(v1 - v2)

def cross2d(x, y):
    return x[..., 0] * y[..., 1] - x[..., 1] * y[..., 0]

In [None]:
# The representation of a Dubin's car:
class Waypoint:
    def __init__(self, pos, heading, r_min):
        self.pos = pos
        self.heading = heading
        self.r_min = r_min

    def left(self):
        return self.pos + perp(self.heading) * self.r_min * -1.0

    def right(self):
        return self.pos + perp(self.heading) * self.r_min

In [None]:
# Some functions which are useful in calculating LSR, LSL, RSL, and RSR:
def directional_arc(center, rad, p0, p1, dir):
    v0 = p0 - center
    v1 = p1 - center
    theta = np.arctan2(v0[Y], v0[X]) - np.arctan2(v1[Y], v1[X])
    if theta < 0.0 and dir == 0:
        theta = theta + 2.0 * math.pi
    elif theta > 0.0 and dir == 1:
        theta = theta - 2.0 * math.pi
    return abs(theta * rad)

def external_tangent(c0_pos, c0_rad, c1_pos, c1_rad):
    gamma = -1.0 * np.arctan2(c1_pos[Y] - c0_pos[Y], c1_pos[X] - c0_pos[X])
    beta = np.arcsin((c1_rad - c0_rad) / distance(c0_pos, c1_pos))
    alpha = gamma - beta

    p0 = np.array([c0_pos[X] + c0_rad * np.sin(alpha), c0_pos[Y] + c0_rad * np.cos(alpha)])
    p1 = np.array([c1_pos[X] + c1_rad * np.sin(alpha), c1_pos[Y] + c1_rad * np.cos(alpha)])
    p2 = np.array([c0_pos[X] - c0_rad * np.sin(alpha), c0_pos[Y] - c0_rad * np.cos(alpha)])
    p3 = np.array([c1_pos[X] - c1_rad * np.sin(alpha), c1_pos[Y] - c1_rad * np.cos(alpha)])
    return (p0, p1, p2, p3)

def internal_tangent(c0_pos, c0_rad, c1_pos, c1_rad):
    hypotenuse = magnitude(c1_pos - c0_pos)
    short = c0_rad + c1_rad

    phi = np.arctan2(c1_pos[Y] - c0_pos[Y], c1_pos[X] - c0_pos[X]) + np.arcsin(short / hypotenuse) - (math.pi / 2.0)
    p0 = np.array([c0_pos[X] + c0_rad * math.cos(phi), c0_pos[Y] + c0_rad * math.sin(phi)])
    p1 = np.array([c1_pos[X] + c1_rad * math.cos(phi + math.pi), c1_pos[Y] + c1_rad * math.sin(phi + math.pi)])

    phi = np.arctan2(c1_pos[Y] - c0_pos[Y], c1_pos[X] - c0_pos[X]) - np.arcsin(short / hypotenuse) + (math.pi / 2.0)
    p2 = np.array([c0_pos[X] + c0_rad * math.cos(phi), c0_pos[Y] + c0_rad * math.sin(phi)])
    p3 = np.array([c1_pos[X] + c1_rad * math.cos(phi + math.pi), c1_pos[Y] + c1_rad * math.sin(phi + math.pi)])

    return (p0, p1, p2, p3)

In [None]:
# Methods for calculating LSR, LSL, RSL, and RSR:
def lsr(start, end):
    start_left = start.left()
    end_right = end.right()
    p = internal_tangent(start_left, start.r_min, end_right, end.r_min)
    arc0_len = directional_arc(start_left, start.r_min, start.pos, p[2], 0)
    arc1_len = directional_arc(end_right, end.r_min, p[3], end.pos, 1)
    return distance(p[2], p[3]) + arc0_len + arc1_len

def lsl(start, end):
    start_left = start.left()
    end_left = end.left()
    p = external_tangent(start_left, start.r_min, end_left, end.r_min)
    arc0_len = directional_arc(start_left, start.r_min, start.pos, p[2], 0)
    arc1_len = directional_arc(end_left, end.r_min, p[3], end.pos, 0)
    return distance(p[2], p[3]) + arc0_len + arc1_len

def rsl(start, end):
    start_right = start.right()
    end_left = end.left()
    p = internal_tangent(start_right, start.r_min, end_left, end.r_min)
    arc0_len = directional_arc(start_right, start.r_min, start.pos, p[0], 1)
    arc1_len = directional_arc(end_left, end.r_min, p[1], end.pos, 0)
    return distance(p[0], p[1]) + arc0_len + arc1_len

def rsr(start, end):
    start_right = start.right()
    end_right = end.right()
    p = external_tangent(start_right, start.r_min, end_right, end.r_min)
    arc0_len = directional_arc(start_right, start.r_min, start.pos, p[2], 1)
    arc1_len = directional_arc(end_right, end.r_min, p[3], end.pos, 1)
    return distance(p[2], p[3]) + arc0_len + arc1_len

def shortest_word(start, end):
    lsr_len = lsr(start, end)
    lsl_len = lsl(start, end)
    rsl_len = rsl(start, end)
    rsr_len = rsr(start, end)
    if lsr_len < lsl_len and lsr_len < rsl_len and lsr_len < rsr_len:
        return lsr_len
    elif lsl_len < lsr_len and lsl_len < rsl_len and lsl_len < rsr_len:
        return lsl_len
    elif rsl_len < lsr_len and rsl_len < lsl_len and rsl_len < rsr_len:
        return rsl_len
    else:  #rsr_len < lsr_len and rsr_len < lsl_len and rsr_len < rsl_len:
        return rsr_len
    
def waypoints_from_path(path):
    p_len = len(path)
    waypoints = list()
    start_waypoint = Waypoint(np.array([path[0][X], path[0][Y]]), np.array([0.0, 1.0]), 1.20)
    waypoints.append(start_waypoint)
    for i in range(2, p_len):
        v_current = np.array([path[i - 1][X], path[i - 1][Y]])
        v_previous = np.array([path[i - 2][X], path[i - 2][Y]]) - v_current
        v_next = np.array([path[i][X], path[i][Y]]) - v_current
        heading = v_previous - v_next
        waypoint = Waypoint(v_current, heading, 1.20)
        waypoints.append(waypoint)
    end_waypoint = Waypoint(np.array([path[p_len - 1][X], path[p_len - 1][Y]]), np.array([0.0, 1.0]), 1.20)
    waypoints.append(end_waypoint)
    return waypoints

def waypoints_length(waypoints):
    length = 0.0
    for i in range(1, len(waypoints)):
        start = waypoints[i - 1]
        end = waypoints[i]
        length += shortest_word(start, end)
    return length

Some utility functions:

In [None]:

def plot_path(grid, path, start, goal):
    grid_numeric = grid.apply(pd.to_numeric, errors='coerce').fillna(1).astype(int)
    fig, ax = plt.subplots()
    ax.imshow(grid_numeric, cmap=plt.cm.binary)

    for (x, y) in path:
        grid_numeric.iloc[x, y] = 2

    ax.scatter(start[1], start[0], marker='o', color='green')
    ax.scatter(goal[1], goal[0], marker='x', color='red')
    ax.plot([y for x, y in path], [x for x, y in path], color='blue', linewidth=0.6)

    plt.show()

def get_start_goal(grid):
    start = (0, 0)
    goal = (0, 0)

    rows = grid.shape[0]
    cols = grid.shape[1]
    for i in range(10, rows - 1):
        for j in range(10, cols - 1):
            if grid.iloc[i, j] == 0:
                start = (j, i)
                break
        else:
            continue
        break

    for i in range(rows - 10, 0, -1):
        for j in range(cols - 10, 0, -1):
            if grid.iloc[i, j] == 0:
                goal = (j, i)
                break
        else:
            continue
        break
    return (start, goal)

def path_length(path):
    length = 0
    for i in range(1, len(path)):
        previous = np.array([path[i - 1][X], path[i - 1][Y]])
        current = np.array([path[i][X], path[i][Y]])
        length += distance(previous, current)
    return length

In [None]:
grid = pd.read_csv('grid.csv')
(start, goal) = get_start_goal(grid)

a_star_time = time.time()
path0 = a_star(grid.values.tolist(), start, goal, grid.shape[0], grid.shape[1])
a_star_time = time.time() - a_star_time

theta_star_time = time.time()
path1 = theta_star(grid, start, goal)
theta_star_time = time.time() - theta_star_time

print(f"A* completetion time: {a_star_time}")
print(f"A* path length: {path_length(path0)}")
plot_path(grid, path0, start, goal)
print(f"Theta* completetion time: {theta_star_time}")
print(f"Theta* path length: {path_length(path1)}")
plot_path(grid, path1, start, goal)

In [None]:
test_data = []
for i in range(0, 50):
    file_name = f"./maps_103x103/grid_{i}.csv"
    grid = pd.read_csv(file_name)

    (start, goal) = get_start_goal(grid)

    a_star_time = time.time()
    a_path = a_star(grid.values.tolist(), start, goal, grid.shape[0], grid.shape[1])
    a_star_time = time.time() - a_star_time

    theta_star_time = time.time()
    theta_path = theta_star(grid, start, goal)
    theta_star_time = time.time() - theta_star_time

    theta_star_smoothed_time = time.time()
    waypoints = waypoints_from_path(theta_path)
    theta_star_smoothed_time = time.time() - theta_star_smoothed_time + theta_star_time

    a_path_len = path_length(a_path)
    theta_path_len = path_length(theta_path)
    theta_path_smoothed_len = waypoints_length(waypoints)

    test_data.append((a_star_time, a_path_len, theta_star_time, theta_path_len, theta_star_smoothed_time, theta_path_smoothed_len))

In [None]:
frame = pd.DataFrame.from_records(test_data, columns=["A* Time", "A* Length", "Theta* Time", "Theta* Length", "Theta* Smoothed Time", "Theta* Smoothed Length"])
frame.to_csv('./test_results_103x103.csv')