<a href="https://colab.research.google.com/github/kessingtonosazee/GCP_Project_1/blob/master/UCS_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
AIP - 2324 - Week 6 - Uniform Cost Search
1. What's new?
1.1 A eight directional action model is added
1.2 How to calculate time and space complexity being added
1.3 Changed the names of a few variables to make them easier to understand, such as 'reso' to 'resolution', 'rr' to 'robot_radius', etc.

2. There is a trick here
2.1 When generate a node, see lines 170-173, we do not judge if the node is the goal
2.2 We only judge if the node is the goal when we are going to expand it, i.e., line 122, get it out of the queue, to check on if child nodes
2.2 The reason why we do this is because during the node generation stage, there could be more than one way leads to the goal, and we cannot just pick the first path
    that leads to the goal as the best path.

    However, when it comes to the expand stage, we know the shortest path to the goal already, so we can judge at the expand stage.
    This is due to lines 204-206 where weights being updated.

"""

import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import json
import tracemalloc
import time


show_animation = True

pause_time = 0.1


```python
class UCS:

    def __init__(self, ox, oy, resolution, robot_radius, model='8n'):
        """
        Initialize map for UCS (Uniform Cost Search) planning

        ox: x position list of Obstacles [m]
        oy: y position list of Obstacles [m]
        resolution: grid resolution [m]
        rr: robot radius [m]
        model: motion model ('4n' or '8n')
        """

        self.min_x = None
        self.min_y = None
        self.max_x = None
        self.max_y = None
        self.x_width = None
        self.y_width = None
        self.obstacle_map = None

        self.resolution = resolution
        self.robot_radius = robot_radius
        self.calc_obstacle_map(ox, oy)

        # Choose the motion model based on the specified parameter
        if model == '4n':
            self.motion = self.get_motion_model_4n()
        else:
            self.motion = self.get_motion_model_8n()   # This is the eight directional motion model

    def calc_obstacle_map(self, ox, oy):
        """
        Calculate the obstacle map based on obstacle coordinates

        ox: x position list of Obstacles [m]
        oy: y position list of Obstacles [m]
        """
        self.min_x = round(min(ox))
        self.min_y = round(min(oy))
        self.max_x = round(max(ox))
        self.max_y = round(max(oy))

        # Calculate the width of the map in terms of grid cells
        self.x_width = round((self.max_x - self.min_x) / self.resolution) + 1
        self.y_width = round((self.max_y - self.min_y) / self.resolution) + 1

        # Initialize the obstacle map
        self.obstacle_map = [[False for _ in range(self.y_width)] for _ in range(self.x_width)]

        # Mark grid cells containing obstacles as True
        for ix in range(self.x_width):
            x = self.calc_grid_position(ix, self.min_x)
            for iy in range(self.y_width):
                y = self.calc_grid_position(iy, self.min_y)
                for iox, ioy in zip(ox, oy):
                    d = math.hypot(iox - x, ioy - y)
                    if d < self.robot_radius:
                        self.obstacle_map[ix][iy] = True

    def get_motion_model_4n(self):
        """
        Get all possible 4-connectivity movements.
        :return: list of movements with cost [[dx, dy, cost]]
        """
        motion = [[1, 0, 1], [0, 1, 1], [-1, 0, 1], [0, -1, 1]]
        return motion

    def get_motion_model_8n(self):
        """
        Get all possible 8-connectivity movements.
        :return: list of movements with cost [[dx, dy, cost]]
        """
        motion = [[1, 0, 1], [0, 1, 1], [-1, 0, 1], [0, -1, 1],
                  [-1, -1, math.sqrt(2)], [1, -1, math.sqrt(2)], [-1, 1, math.sqrt(2)], [1, 1, math.sqrt(2)]]
        return motion

    def calc_grid_position(self, index, minp):
        """
        Calculate the position in meters based on the grid index.

        index: grid index
        minp: minimum position in meters
        return: position in meters
        """
        pos = index * self.resolution + minp
        return pos
```

This code block includes documentation for the `UCS` class. It initializes the map
for UCS planning, calculates the obstacle map based on obstacle coordinates, and
provides options for different motion models (`4n` or `8n`). The class also
includes methods to calculate the obstacle map, get motion models, and calculate grid positions.

class Node:
    def __init__(self, x, y, cost, parent_index):
        """
        Initialize a Node for path planning.

        x: index of grid (x-coordinate)
        y: index of grid (y-coordinate)
        cost: cost associated with reaching this node
        parent_index: index of the parent Node in the path
        """
        self.x = x
        self.y = y
        self.cost = cost
        self.parent_index = parent_index

    def __str__(self):
        """
        Return a string representation of the Node.

        Format: "x,y,cost,parent_index"
        """
        return str(self.x) + "," + str(self.y) + "," + str(self.cost) + "," + str(self.parent_index)

def planning(self, sx, sy, gx, gy):
    """
    Dijkstra path search.

    Inputs:
        sx: start x position [m]
        sy: start y position [m]
        gx: goal x position [m]
        gy: goal y position [m]

    Outputs:
        rx: x position list of the final path
        ry: y position list of the final path
    """

    # Create start and goal nodes
    start_node = self.Node(self.calc_xy_index(sx, self.min_x),
                           self.calc_xy_index(sy, self.min_y), 0.0, -1)
    goal_node = self.Node(self.calc_xy_index(gx, self.min_x),
                          self.calc_xy_index(gy, self.min_y), 0.0, -1)

    # Visualize start and goal nodes
    plt.scatter(start_node.x, start_node.y, s=400, c='red', marker='s')
    plt.scatter(goal_node.x, goal_node.y, s=400, c='green', marker='s')

    # Initialize open and closed sets
    open_set, closed_set = dict(), dict()
    open_set[self.calc_grid_index(start_node)] = start_node

    while 1:
    # Select the node with the minimum cost from the open set
    c_id = min(open_set, key=lambda o: open_set[o].cost)
    current = open_set[c_id]

    # Show the current node on the graph
    if show_animation:
        plt.plot(self.calc_grid_position(current.x, self.min_x),
                 self.calc_grid_position(current.y, self.min_y), "xc")
        plt.scatter(self.calc_grid_position(current.x, self.min_x),
                    self.calc_grid_position(current.y, self.min_y),
                    s=300, c='yellow', marker='s')
        plt.pause(pause_time)

        # For stopping simulation with the esc key.
        plt.gcf().canvas.mpl_connect(
            'key_release_event',
            lambda event: [exit(0) if event.key == 'escape' else None])

        # Pause for a short time every 10 iterations to aid observation
        if len(closed_set.keys()) % 10 == 0:
            plt.pause(0.001)

```python
# Goal test
if current.x == goal_node.x and current.y == goal_node.y:
    print("Find goal")

    # Visualize the goal node
    plt.scatter(current.x,
                current.y,
                s=100, c='green', marker='s')
    plt.pause(pause_time)

    # Generate and visualize the final path
    locx, locy = self.calc_final_path(current, closed_set)
    if len(locx) > 2:
        for i in range(len(locx) - 1):
            px = (locx[i], locx[i+1])
            py = (locy[i], locy[i+1])
            plt.plot(px, py, "-m", linewidth=4)
            plt.pause(pause_time)

    # Update goal node information
    goal_node.parent_index = current.parent_index
    goal_node.cost = current.cost
    print("cost:", goal_node.cost)

    # Exit the loop as the goal is reached
    break
```

This code block documents the goal test section within the UCS planning algorithm.
When the current node reaches the goal node, it prints a message, visualizes
the goal node, generates the final path, updates the goal node information,
and exits the loop.

# Remove the item from the open set
# Remember previously we were using pop, which will delete the node automatically.
# But here, the node is neither at the head nor at the end, therefore, we need to delete it explicitly
del open_set[c_id]

# Add the current node to the closed set
closed_set[c_id] = current

# Visualize the visited cells in red
plt.scatter(current.x,
            current.y,
            s=100, c='red', marker='s')
plt.pause(pause_time)

# Generate and visualize the partial path
locx, locy = self.calc_final_path(current, closed_set)
if len(locx) >= 2:
    for i in range(len(locx) - 1):
        px = (locx[i], locx[i+1])
        py = (locy[i], locy[i+1])
        plt.plot(px, py, "-m", linewidth=4)
        plt.pause(pause_time)


```python
# Generate new node based on the motion model
# Note we do not judge if a new node is a goal when it is generated
for move_x, move_y, move_cost in self.motion:
    node = self.Node(current.x + move_x,
                     current.y + move_y,
                     current.cost + move_cost, c_id)  # This is how new nodes are generated
```

This code block documents the generation of new nodes based on the specified motion model.
The loop iterates over the possible moves defined in the motion model,
and a new node is created for each move. The new node's position and cost are
updated based on the current node and the motion model.


#####################################################################
# Node Expansion and Exploration

n_id = self.calc_grid_index(node)

# This indicates the agent is using the motion model to look around, to see which grid it can go next
plt.scatter(node.x,
            node.y,
            s=100, c='yellow', marker='s')
plt.pause(pause_time)

if n_id in closed_set:
    # We mark cells that are visited already in red
    plt.scatter(node.x,
                node.y,
                s=100, c='red', marker='s')
    plt.pause(pause_time)
    continue

if not self.verify_node(node):
    plt.scatter(node.x,
                node.y,
                s=100, c='black', marker='s')
    plt.pause(pause_time)
    continue

if n_id not in open_set:   # Come to this stage, the conditions should be 'node with index n_id in neither in closed_set nor in open_set', why?
    open_set[n_id] = node  # Discover a new node
    plt.scatter(node.x,
                node.y,
                s=300, c='blue', marker='s')
    plt.pause(pause_time)
else:
    if open_set[n_id].cost > node.cost:   # Come to this stage, the conditions should be 'node with index n_id is not in the closed_set but in the open_set'
        # This path is the best until now. record it!
        open_set[n_id] = node

        plt.scatter(node.x,
                    node.y,
                    s=300, c='green', marker='s')
        plt.pause(pause_time)

```python
def calc_final_path(self, goal_node, closed_set):
    # generate final course
    rx, ry = [self.calc_grid_position(goal_node.x, self.min_x)], [
        self.calc_grid_position(goal_node.y, self.min_y)]
    parent_index = goal_node.parent_index
    while parent_index != -1:
        n = closed_set[parent_index]
        rx.append(self.calc_grid_position(n.x, self.min_x))
        ry.append(self.calc_grid_position(n.y, self.min_y))
        parent_index = n.parent_index

    return rx, ry

##get coordinates from index in grid system
def calc_grid_position(self, index, minp):
    pos = index * self.resolution + minp
    return pos

##get index from coordinates based on x or y
## use in determining start node
def calc_xy_index(self, position, minp):
    return round((position - minp) / self.resolution)

##get index from coordinates based on x and y
def calc_grid_index(self, node):
    return (node.y - self.min_y) * self.x_width + (node.x - self.min_x)

##verify node if it safe or not when exploring
def verify_node(self, node):
    px = self.calc_grid_position(node.x, self.min_x)
    py = self.calc_grid_position(node.y, self.min_y)

    if px < self.min_x:
        return False
    if py < self.min_y:
        return False
    if px >= self.max_x:
        return False
    if py >= self.max_y:
        return False

    if self.obstacle_map[node.x][node.y]:
        return False

    return True

##generate obstacles
def calc_obstacle_map(self, ox, oy):
    self.min_x = round(min(ox))
    self.min_y = round(min(oy))
    self.max_x = round(max(ox))
    self.max_y = round(max(oy))
```

This code block defines several utility methods in a class. These methods are responsible for
calculating the final path, grid position, XY index, grid index, and verifying
whether a node is safe for exploration. Additionally, there is a method for generating obstacle maps
 based on given obstacle coordinates.

 ############################### modified #####################################
self.x_width = round((self.max_x - self.min_x) / self.resolution) + 1
self.y_width = round((self.max_y - self.min_y) / self.resolution) + 1
#############


# obstacle map generation
self.obstacle_map = [[False for _ in range(self.y_width)]  # Create a 2D list with y_width columns
                    for _ in range(self.x_width)]  # and x_width rows

# Iterate over x coordinates in the map
for ix in range(self.x_width):
    x = self.calc_grid_position(ix, self.min_x)  # Calculate the world coordinate corresponding to the x index
    # Iterate over y coordinates in the map
    for iy in range(self.y_width):
        y = self.calc_grid_position(iy, self.min_y)  # Calculate the world coordinate corresponding to the y index
        # Iterate over obstacle coordinates
        for iox, ioy in zip(ox, oy):
            # Calculate the Euclidean distance between the cell and the obstacle
            d = math.hypot(iox - x, ioy - y)

```python
############################### modified #####################################
# Check if the distance between the obstacle and the current grid position is less than the robot's radius
if d < self.robot_radius:  # Be careful with this line, you can change '<' to '<=' to see the differences of new_array defined later!
    # Mark the corresponding grid position as an obstacle
    self.obstacle_map[ix][iy] = True
    # break
else:
    # If the distance is greater than the robot's radius, continue to the next iteration
    continue

# Create a new array with the same shape as the obstacle_map
new_array = np.zeros(np.shape(self.obstacle_map))
for i in range(new_array.shape[0]):
    for j in range(new_array.shape[1]):
        # If the grid position is marked as an obstacle, set the corresponding value in the new_array to 1
        if self.obstacle_map[i][j] == True:
            new_array[i][j] = 1
# Display the transposed new_array
print(new_array.T)
############################### modified #####################################


@staticmethod
def get_motion_model_8n():
    # Define possible 8-connectivity movements with their corresponding costs
    motion =[
        [-1, 0, 1],
        [-1, 1, math.sqrt(2)],
        [0, 1, 1],
        [1, 1, math.sqrt(2)],
        [1, 0, 1],
        [1, -1, math.sqrt(2)],
        [0, -1, 1],
        [-1, -1, math.sqrt(2)]
    ]
    return motion

@staticmethod
def get_motion_model_4n():
    """
    Get all possible 4-connectivity movements.
    :return: list of movements with cost [[dx, dy, cost]]
    """
    # Define possible 4-connectivity movements with their corresponding costs
    motion = [[1, 0, 1],
              [0, 1, 1],
              [-1, 0, 1],
              [0, -1, 1]]
    return motion
```


```python
# def main():
##start to check storage memory
##staty to calculate time
# Start tracking memory usage
tracemalloc.start()

# Record the start time
start = time.time()
print("start time" )

# Obstacle and free grid positions
ox, oy = [], [] #obstacle
fx, fy = [], [] #free

# Configuration file path
config_file = '/home/users/ogbeideo/Downloads/week-6 lab/Week6-Lab-20231110/MapConfig/config9x9.json'

# Read configuration parameters from JSON file
with open(config_file) as config_env:
    param = json.load(config_env)

# Extract parameters
sx = param['sx']  # [m]
sy = param['sy']   # [m]
gx = param['gx']   # [m]
gy = param['gy']   # [m]
grid_size = param['grid_size']  # [m]
robot_radius = param['robot_radius']   # [m]
map_xlsx = param['map_xlsx']
fig_dim = param['fig_dim']

# Use pandas to read the map from .xlsx file
gmap = pd.read_excel(map_xlsx, header=None)
data = gmap.to_numpy()

# Up side down: origin of image is up left corner, while origin of figure is bottom left corner
data = data[::-1]

# Generate obstacles
for iy in range(data.shape[0]):
    for ix in range(data.shape[1]):
        if data[iy, ix] == 1:
            ox.append(ix)
            oy.append(iy)
        else:
            fx.append(ix)
            fy.append(iy)

# Display the map if animation is enabled
if show_animation:
    plt.figure(figsize=(fig_dim ,fig_dim))
    plt.xlim(-1, data.shape[1])
    plt.ylim(-1, data.shape[0])
    plt.xticks(np.arange(0,data.shape[1],1))
    plt.yticks(np.arange(0,data.shape[0],1))
    plt.grid(True)
    plt.scatter(ox, oy, s=300, c='gray', marker='s')
    plt.scatter(fx, fy, s=300, c='cyan', marker='s')

# Record the start time for planning
start = time.time()
print("start time" )

# Initialize UCS planner with obstacle coordinates and grid parameters
ucs = UCS(ox, oy, grid_size, robot_radius)

# Plan a path from start to goal
rx, ry = ucs.planning(sx, sy, gx, gy)

# Record the end time for planning
end = time.time()
print("time used", end - start)

##reverse because the path is generated from goal to start
rx.reverse()
ry.reverse()

# Convert the path coordinates to integers
new_rx = map(int, rx)
new_ry = map(int, ry)

# Create a list of coordinates representing the path
road = [list(a) for a in zip(new_rx, new_ry)]

##output path
print("road from start to goal", road)

##calculate time to run
print("time end to calculate time")

# Get the current and peak memory usage
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB")

# Stop tracking memory usage
tracemalloc.stop()

# Display the path if animation is enabled
if show_animation:
    for i in range(len(rx)-1):
        px = (rx[i], rx[i+1])
        py = (ry[i], ry[i+1])
        plt.plot(px, py, "-k")
        plt.pause(0.1)
    plt.show()

# main()
```