In [1]:
import heapq
import sys
import numpy as np
import random
import pygame
import math

pygame 2.4.0 (SDL 2.26.4, Python 3.9.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
#'xnym' -> [n,m]
def stateNameToCoords(name):
    return [int(name.split('x')[1].split('y')[0]), int(name.split('x')[1].split('y')[1])]

def topKey(queue):
    #queue.sort()
    # print(queue)
    if len(queue) > 0:
        return queue[0][:2]
    else:
        # print('empty queue!')
        return (float('inf'), float('inf'))

def heuristic_from_s(Graph, id, s):
    x_distance = abs(int(id.split('x')[1][0]) - int(s.split('x')[1][0])) ##int(id.split('x')[1][0]) - int(s.split('x')[1][0])**2
    y_distance = abs(int(id.split('y')[1][0]) - int(s.split('y')[1][0])) ##int(id.split('y')[1][0]) - int(s.split('y')[1][0])**2
    return x_distance + y_distance  ##np.sqrt(x_distance + y_distance)

def calculateKey(Graph, id, s_current, k_m):
    return (min(Graph.graph[id].g, Graph.graph[id].rhs) + heuristic_from_s(Graph, id, s_current) + k_m 
            + alpha * Graph.cells[stateNameToCoords(id)[1]][stateNameToCoords(id)[0]]
            + alpha * Graph.topography[stateNameToCoords(id)[1]][stateNameToCoords(id)[0]], 
            min(Graph.graph[id].g, Graph.graph[id].rhs)
            + alpha * Graph.cells[stateNameToCoords(id)[1]][stateNameToCoords(id)[0]]
            + alpha * Graph.topography[stateNameToCoords(id)[1]][stateNameToCoords(id)[0]]) 


def updateVertex(Graph, queue, id, s_current, k_m):
    #   s_goal = Graph.goal
    if id != Graph.goal:
        min_rhs = float('inf')
        for i in Graph.graph[id].children:
            min_rhs = min(min_rhs, Graph.graph[i].g + Graph.graph[id].children[i])
        Graph.graph[id].rhs = min_rhs
        
    id_in_queue = [item for item in queue if id in item]
    if id_in_queue != []:
        if len(id_in_queue) != 1:
            raise ValueError('more than one ' + id + ' in the queue!')
        queue.remove(id_in_queue[0])
    if Graph.graph[id].rhs != Graph.graph[id].g:
        heapq.heappush(queue, calculateKey(Graph, id, s_current, k_m) + (id,))


def computeShortestPath(Graph, queue, s_start, k_m):
    while (Graph.graph[s_start].rhs != Graph.graph[s_start].g) or (topKey(queue) < calculateKey(Graph, s_start, s_start, k_m)):
        k_old = topKey(queue)
        u = heapq.heappop(queue)[2]
        
        if k_old < calculateKey(Graph, u, s_start, k_m):
            heapq.heappush(queue, calculateKey(Graph, u, s_start, k_m) + (u,))
        elif Graph.graph[u].g > Graph.graph[u].rhs:
            Graph.graph[u].g = Graph.graph[u].rhs
            for i in Graph.graph[u].parents:
                updateVertex(Graph, queue, i, s_start, k_m)
        else:
            Graph.graph[u].g = float('inf')
            updateVertex(Graph, queue, u, s_start, k_m)
            for i in Graph.graph[u].parents:
                updateVertex(Graph, queue, i, s_start, k_m)

def nextInShortestPath(Graph, s_current):
    min_rhs = float('inf')
    s_next = None
    if Graph.graph[s_current].rhs == float('inf'):
        #print('You are done stuck')
        #sys.exit()
        return s_current
    else:
        for i in Graph.graph[s_current].children:
            # print(i)
            child_cost = Graph.graph[i].g + Graph.graph[s_current].children[i]
            # print(child_cost)
            if (child_cost) < min_rhs:
                min_rhs = child_cost
                s_next = i
        if s_next:
            return s_next
        else:
            #raise ValueError('could not find child for transition!')
            return s_current

def scanForObstacles(Graph, queue, s_current, k_m):
    states_to_update = {}
    
    for neighbor in Graph.graph[s_current].children:
        neighbor_coords = stateNameToCoords(neighbor)
        states_to_update[neighbor] = Graph.cells[neighbor_coords[1]][neighbor_coords[0]]
        
    new_obstacle = False
    
    for state in states_to_update:
        if states_to_update[state] < 0:  # found cell with obstacle
            # print('found obstacle in ', state)
            for neighbor in Graph.graph[state].children:
                # first time to observe this obstacle where one wasn't before
                if(Graph.graph[state].children[neighbor] != float('inf')):
                    neighbor_coords = stateNameToCoords(state)
                    Graph.cells[neighbor_coords[1]][neighbor_coords[0]] = -2
                    Graph.graph[neighbor].children[state] = float('inf')
                    Graph.graph[state].children[neighbor] = float('inf')
                    updateVertex(Graph, queue, state, s_current, k_m)
                    new_obstacle = True
    return new_obstacle



def Move(Graph, s_current):
    if(s_current == Graph.goal):
        return 'goal'
    else:
        s_new = nextInShortestPath(Graph, s_current)
        new_coords = stateNameToCoords(s_new)
        if(Graph.cells[new_coords[1]][new_coords[0]] == -1):  # just ran into new obstacle
            s_new = s_current  # need to hold tight and scan/replan first
        
        return s_new
        
def Rescan(Graph, queue, s_current,k_m):
    ## step안에 들어왔을 때 실행
    results = scanForObstacles(Graph, queue, s_current, k_m)
    k_m += heuristic_from_s(Graph, s_current, Graph.goal)
    #s_last = s_current
    computeShortestPath(Graph, queue, s_current, k_m)

    return k_m


def initDStarLite(Graph, queue, s_start, s_goal, k_m):
    Graph.graph[s_goal].rhs = 0
    heapq.heappush(queue, calculateKey(Graph, s_goal, s_start, k_m) + (s_goal,))
    #print(queue[0])
    computeShortestPath(Graph, queue, s_start, k_m)
    return (Graph, queue, k_m)

In [3]:
class Node:
    def __init__(self, id):
        self.id = id
        # dictionary of parent node ID's
        # key = id of parent
        # value = (edge cost,)
        self.parents = {}
        # dictionary of children node ID's
        # key = id of child
        # value = (edge cost,)
        self.children = {}
        # g approximation
        self.g = float('inf')
        # rhs value
        self.rhs = float('inf')

    def __str__(self):
        return 'Node: ' + self.id + ' g: ' + str(self.g) + ' rhs: ' + str(self.rhs)

    def __repr__(self):
        return self.__str__()

    def update_parents(self, parents):
        self.parents = parents


class Graph:
    def __init__(self):
        self.graph = {}

    def __str__(self):
        msg = 'Graph:'
        for i in self.graph:
            msg += '\n  node: ' + i + ' g: ' + str(self.graph[i].g) + ' rhs: ' + str(self.graph[i].rhs)
        return msg

    def __repr__(self):
        return self.__str__()

    def setStart(self, id):
        if(self.graph[id]):
            self.start = id
        else:
            raise ValueError('start id not in graph')

    def setGoal(self, id):
        if(self.graph[id]):
            self.goal = id
        else:
            raise ValueError('goal id not in graph')

In [4]:
#그래프 객체(장애물위치, 장애물cost, 지형지물 정보, 노드 정보 등)
class GridWorld(Graph):
    def __init__(self, x_dim, y_dim):
        self.x_dim = x_dim
        self.y_dim = y_dim
        self.obstacles=[] #((y,x),cost)
        self.cells=np.zeros((y_dim,x_dim))
        self.topography=np.random.rand(y_dim,x_dim)
        self.graph = {}
        self.generateGraphFromGrid()
        # self.printGrid()

    def __str__(self):
        msg = 'Graph:'
        for i in self.graph:
            msg += '\n  node: ' + i + ' g: ' + str(self.graph[i].g) + ' rhs: ' + str(self.graph[i].rhs) + ' neighbors: ' + str(self.graph[i].children)
        return msg

    def __repr__(self):
        return self.__str__()

    def printGrid(self):
        print('** GridWorld **')
        for row in self.cells:
            print(row)

    def printGValues(self):
        for j in range(self.y_dim):
            str_msg = ""
            for i in range(self.x_dim):
                node_id = 'x' + str(i) + 'y' + str(j)
                node = self.graph[node_id]
                if node.g == float('inf'):
                    str_msg += ' - '
                else:
                    str_msg += ' ' + str(node.g) + ' '
            print(str_msg)

    def generateGraphFromGrid(self):
        edge = 1
        cross=1.4
        for i in range(len(self.cells)):
            row = self.cells[i]
            for j in range(len(row)):
                # print('graph node ' + str(i) + ',' + str(j))
                node = Node('x' + str(i) + 'y' + str(j))
                if i > 0:  # not top row
                    node.parents['x' + str(i - 1) + 'y' + str(j)] = edge
                    node.children['x' + str(i - 1) + 'y' + str(j)] = edge
                if i + 1 < self.y_dim:  # not bottom row
                    node.parents['x' + str(i + 1) + 'y' + str(j)] = edge
                    node.children['x' + str(i + 1) + 'y' + str(j)] = edge
                if j > 0:  # not left col
                    node.parents['x' + str(i) + 'y' + str(j - 1)] = edge
                    node.children['x' + str(i) + 'y' + str(j - 1)] = edge
                if j + 1 < self.x_dim:  # not right col
                    node.parents['x' + str(i) + 'y' + str(j + 1)] = edge
                    node.children['x' + str(i) + 'y' + str(j + 1)] = edge
                '''if i > 0 and j > 0:
                    node.parents['x' + str(i - 1) + 'y' + str(j - 1)] = cross
                    node.children['x' + str(i - 1) + 'y' + str(j - 1)] = cross    
                if i > 0 and j + 1 < self.x_dim:
                    node.parents['x' + str(i - 1) + 'y' + str(j + 1)] = cross
                    node.children['x' + str(i - 1) + 'y' + str(j + 1)] = cross
                if i + 1 < self.y_dim and j > 0:
                    node.parents['x' + str(i + 1) + 'y' + str(j - 1)] = cross
                    node.children['x' + str(i + 1) + 'y' + str(j - 1)] = cross
                if i + 1 < self.y_dim and j + 1 < self.x_dim:
                    node.parents['x' + str(i + 1) + 'y' + str(j + 1)] = cross
                    node.children['x' + str(i + 1) + 'y' + str(j + 1)] = cross'''
                self.graph['x' + str(i) + 'y' + str(j)] = node
    
    #장애물 초기화 및 이동 반영
    def setObstacle(self):
        self.cells=np.zeros((self.y_dim,self.x_dim))
        for obs in self.obstacles:
            id=obs[0]
            cost=obs[1]
            self.cells[id[0]][id[1]]=-1
                
            for m in move:
                boundary=(id[0]+m[0],id[1]+m[1])
                if boundary in [i[0] for i in self.obstacles]:
                    continue
                if (0<=boundary[0]<self.y_dim) and (0<=boundary[1]<self.x_dim): 
                    self.cells[boundary[0]][boundary[1]]+=cost

In [7]:
# Define some colors
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
GREEN = (0, 255, 0)
RED   = (255, 0, 0)
GRAY = (145, 145, 102)
DARKGRAY= (100, 100, 100)
BLUE = (0, 0, 200)
PINK = (255,0,255)

#장애물 및 현재위치 이동 가능한 경로
#move=[(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
move=[(-1,0),(0,-1),(0,1),(1,0)]

def colors(grid):
    if grid==0:
        return WHITE
    if grid==1:
        return GREEN
    if grid==-1:
        return RED
    if grid==-2:
        return BLACK
    return max((255-int((255-255)*grid/2),255-int((255-204)*grid/2),255-int((255-102)*grid/2)),(255,204,102))

# This sets the width and height of each grid location
width = 40
height = 40

# This sets the margin between each cell
margin = 2

dimension_x = 15
dimension_y = 15
VIEWING_RANGE = 1

# obstacle num ratio of grid
obstacle_ratio=0.1
fixed_obs_ratio=0.9
move_chance=0.3
alpha = 0.5

# Set the height and width of the screen
WINDOW_SIZE = [(width + margin) * dimension_x + margin,
               (height + margin) * dimension_y + margin]

def draw_arrow(screen, colour, start, end):
    pygame.draw.line(screen,colour,start,end,1)
    rotation = math.degrees(math.atan2(start[1]-end[1], end[0]-start[0]))+90
    rad=12
    pygame.draw.polygon(screen, BLACK, ((end[0]+rad*math.sin(math.radians(rotation)), end[1]+rad*math.cos(math.radians(rotation))), (end[0]+rad*math.sin(math.radians(rotation-120)), end[1]+rad*math.cos(math.radians(rotation-120))), (end[0]+rad*math.sin(math.radians(rotation+120)), end[1]+rad*math.cos(math.radians(rotation+120)))))


transparent = pygame.Surface((3*width+3*margin,3*height+3*margin))  # the size of your rect
transparent.set_alpha(128)                # alpha level
transparent.fill(DARKGRAY)           # this fills the entire surface

<rect(0, 0, 126, 126)>

In [8]:
def main():
    
    # Loop until the user clicks the close button.
    done = False
    
    s_start=input("Enter start (xnym, where n,m are coordinates) : ")
    s_goal= input("Enter goal (xnym, where n,m are coordinates)  : ")
    #s_start = 'x1y2'
    #s_goal = 'x9y7'

    graph = GridWorld(dimension_x, dimension_y)
    
    goal_coords = stateNameToCoords(s_goal)
    start_coords= stateNameToCoords(s_start)

    graph.setStart(s_start)
    graph.setGoal(s_goal)
          
    k_m = 0
    s_last = s_start
    queue = []

    graph, queue, k_m = initDStarLite(graph, queue, s_start, s_goal, k_m)
    
    #obstacle grid크기에서 일정 비율만큼 랜덤 위치 생성 / obstacle cost 각각 랜덤 생성(0~1)
    max_obstacle_num=int(obstacle_ratio*dimension_x*dimension_y)
    fixed_obstacle_num=int(max_obstacle_num*fixed_obs_ratio)
    ob_num=0
    fix_num=0
    while ob_num<max_obstacle_num:
        ob_x=random.randint(0,dimension_x-1)
        ob_y=random.randint(0,dimension_y-1)
        
        if fix_num<fixed_obstacle_num:
            movable=0
            cost=0
        else:
            movable=1
            cost=random.random()
            
        if graph.cells[ob_y][ob_x]==-1:
            continue
        elif [ob_x,ob_y]==goal_coords:
            continue
        elif [ob_x,ob_y]==start_coords:
            continue
        else:
            graph.obstacles.append(((ob_y,ob_x),cost,movable))
            graph.setObstacle()
            ob_num+=1
            fix_num+=1

    s_current = s_start
    pos_coords = stateNameToCoords(s_current)
    
    robot_centers=[]

    # Initialize pygame
    pygame.init()

    screen = pygame.display.set_mode(WINDOW_SIZE)

    # Set title of screen
    pygame.display.set_caption("D* Lite Path Planning")

    # Used to manage how fast the screen updates
    clock = pygame.time.Clock()
    basicfont = pygame.font.SysFont('Comic Sans MS', 22)

    # -------- Main Program Loop -----------
    while done==False:
        for event in pygame.event.get():   # User did something
            if event.type == pygame.QUIT:  # If user clicked close
                done = True  # Flag that we are done so we exit this loop
            elif event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE:
                
                #현재위치가 obstacle한테 잡혔는지 확인
                if s_current in ['x{}y{}'.format(i[0][1],i[0][0]) for i in graph.obstacles]:
                    print('Failed !')
                    done=True
                    break
                
                #아군 먼저 이동
                print('move')
                s_new = Move(graph, s_current)
                if s_new == 'goal':
                    print('Goal Reached!')
                    done = True
                else:
                    # print('setting s_current to ', s_new)
                    s_current = s_new
                    pos_coords = stateNameToCoords(s_current)# print('got pos coords: ', pos_coords)
                
                #장애물 0.3확률로 랜덤 움직임
                if graph.obstacles:
                    for obstacle in graph.obstacles:
                        if obstacle[2]:
                            if random.random()>move_chance:
                                dx,dy=random.choice(move)
                                new_x,new_y=obstacle[0][1]+dx,obstacle[0][0]+dy
                                if 0<=new_x<dimension_x and 0<=new_y<dimension_y and (new_y,new_x) not in [obs[0] for obs in graph.obstacles]:
                                    graph.obstacles.remove(obstacle)
                                    graph.obstacles.append(((new_y,new_x),obstacle[1],obstacle[2]))

                    graph.setObstacle()
                    
                k_m = Rescan(graph, queue, s_current, k_m)
            
            #그리드 마우스 클릭으로 obstacle 추가
            '''elif event.type == pygame.MOUSEBUTTONDOWN:
                # User clicks the mouse. Get the position
                pos = pygame.mouse.get_pos()
                # Change the x/y screen coordinates to grid coordinates
                column = pos[0] // (width + margin)
                row = pos[1] // (height + margin)
                # Set that location to one
                if(graph.cells[row][column] != -1): #########장애물 변경
                    cost=random.random()
                    graph.obstacles.append(((row,column),cost))
                    graph.setObstacles()'''

        # Set the screen background
        screen.fill(BLACK)


        # Draw the grid
        temp_grid=graph.cells.copy()
        temp_grid+=graph.topography#+temp_grid
        temp_grid=np.where(temp_grid<0,-1,temp_grid)
        
                
        for row in range(dimension_y):
            for column in range(dimension_x):
                
                COLOR =colors(temp_grid[row][column])
                if COLOR == RED:
                    if [i[2] for i in graph.obstacles if i[0]==(row,column)][0]==0:
                        COLOR = PINK
                        
                pygame.draw.rect(screen, COLOR,
                                 [(margin + width) * column + margin,
                                  (margin + height) * row + margin, width, height])
                node_name = 'x' + str(column) + 'y' + str(row)


        # fill in goal cell with GREEN
        pygame.draw.rect(screen, GREEN, [(margin + width) * start_coords[0] + margin,
                                         (margin + height) * start_coords[1] + margin, width, height])


        # fill in goal cell with RED
        pygame.draw.rect(screen, BLUE, [(margin + width) * goal_coords[0] + margin,
                                         (margin + height) * goal_coords[1] + margin, width, height])

        # draw moving robot, based on pos_coords

        # Set the new robot centre
        robot_center = [int(pos_coords[0] * (width + margin) + width / 2) + margin , int(pos_coords[1] * (height + margin) + height / 2) + margin]
        draw_arrow(screen, BLACK, robot_center,robot_center)

        # maintain a list of all cells traversed
        robot_centers.append(robot_center)

        if len(robot_centers)>1:
            for i in range(0,len(robot_centers)-1):
                pygame.draw.line(screen,BLACK,robot_centers[i],robot_centers[i+1],3)

        # grey out visible boxes for robot
        screen.blit(transparent,(robot_center[0]-1.5*width-margin,robot_center[1]-1.5*height-margin))    # (0,0) are the top-left coordinates

        # Limit to 60 frames per second
        clock.tick(20)

        # Go ahead and update the screen with what we've drawn.
        pygame.display.flip()
        

    # Be IDLE friendly. If you forget this line, the program will 'hang'
    # on exit.
    pygame.quit()



if __name__ == "__main__":
    main()

Enter start (xnym, where n,m are coordinates) : x0y0
Enter goal (xnym, where n,m are coordinates)  : x14y14
move


2023-05-29 12:53:01.870 python[28173:48604683] TSM AdjustCapsLockLEDForKeyTransitionHandling - _ISSetPhysicalKeyboardCapsLockLED Inhibit


move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
move
Goal Reached!


In [5]:
import heapq

#move=[(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
move=[(-1,0),(0,-1),(0,1),(1,0)]

dimension_x = 15
dimension_y = 15

# obstacle num ratio of grid
obstacle_ratio=0.1
fixed_obs_ratio=0.8
move_chance=0.3
alpha=0.5

s_start=input("Enter start (xnym, where n,m are coordinates) : ")
s_goal= input("Enter goal (xnym, where n,m are coordinates)  : ")
goal_coords = stateNameToCoords(s_goal)
start_coords= stateNameToCoords(s_start)

def stateNameToCoords(name):
    return [int(name.split('x')[1].split('y')[0]), int(name.split('x')[1].split('y')[1])]

Enter start (xnym, where n,m are coordinates) : x0y0
Enter goal (xnym, where n,m are coordinates)  : x14y14


In [6]:
test_num=100
success=0
move_num_list=[]
for idx in range(test_num):
    # Loop until the user clicks the close button.
    done = False
    
    graph = GridWorld(dimension_x, dimension_y)

    graph.setStart(s_start)
    graph.setGoal(s_goal)
         
    k_m = 0
    s_last = s_start
    queue = []

    graph, queue, k_m = initDStarLite(graph, queue, s_start, s_goal, k_m)
    
    #랜덤 이동 장애물 설치
    max_obstacle_num=int(obstacle_ratio*dimension_x*dimension_y)
    fixed_obstacle_num=int(max_obstacle_num*fixed_obs_ratio)
    ob_num=0
    fix_num=0
    while ob_num<max_obstacle_num:
        ob_x=random.randint(0,dimension_x-1)
        ob_y=random.randint(0,dimension_y-1)
        
        if fix_num<fixed_obstacle_num:
            movable=0
            cost=0
        else:
            movable=1
            cost=random.random()
            
        if graph.cells[ob_y][ob_x]==-1:
            continue
        elif [ob_x,ob_y]==goal_coords:
            continue
        elif [ob_x,ob_y]==start_coords:
            continue
        else:
            graph.obstacles.append(((ob_y,ob_x),cost,movable))
            graph.setObstacle()
            ob_num+=1
            fix_num+=1

    s_current = s_start
    pos_coords = stateNameToCoords(s_current)

    # -------- Main Program Loop -----------
    move_num=0
    while done==False:

        s_new = Move(graph, s_current)
        move_num+=1
        if s_new == 'goal':
            done = True
            print(f'{idx}. Success ! --> move {move_num} times')
            move_num_list.append(move_num)
            success+=1
        else:
            # print('setting s_current to ', s_new)
            s_current = s_new
            pos_coords = stateNameToCoords(s_current)# print('got pos coords: ', pos_coords)

        if graph.obstacles:
            for obstacle in graph.obstacles:
                if obstacle[2]:
                    if random.random()>move_chance:
                        dx,dy=random.choice(move)
                        new_x,new_y=obstacle[0][1]+dx,obstacle[0][0]+dy
                        if 0<=new_x<dimension_x and 0<=new_y<dimension_y and (new_y,new_x) not in graph.obstacles:
                            graph.obstacles.remove(obstacle)
                            graph.obstacles.append(((new_y,new_x),obstacle[1],obstacle[2]))
            graph.setObstacle()
            
        if s_current in ['x{}y{}'.format(i[0][1],i[0][0]) for i in graph.obstacles]:
            print(f'{idx}. Failed !')
            done=True
            
        k_m = Rescan(graph, queue, s_current, k_m)

print('\n-Success Ratio : {}%'.format(100*success/test_num))
print('-Average moving times : {}'.format(np.mean(move_num_list)))

0. Success ! --> move 29 times
1. Failed !
2. Success ! --> move 35 times
3. Success ! --> move 33 times
4. Success ! --> move 35 times
5. Success ! --> move 29 times
6. Success ! --> move 35 times
7. Failed !
8. Success ! --> move 31 times
9. Success ! --> move 33 times
10. Success ! --> move 33 times
11. Success ! --> move 31 times
12. Success ! --> move 29 times
13. Success ! --> move 33 times
14. Success ! --> move 33 times
15. Success ! --> move 31 times
16. Success ! --> move 29 times
17. Success ! --> move 29 times
18. Success ! --> move 30 times
19. Success ! --> move 29 times
20. Success ! --> move 31 times
21. Failed !
22. Success ! --> move 33 times
23. Success ! --> move 32 times
24. Success ! --> move 31 times
25. Success ! --> move 29 times
26. Success ! --> move 33 times
27. Failed !
28. Success ! --> move 31 times
29. Success ! --> move 33 times
30. Success ! --> move 29 times
31. Success ! --> move 29 times
32. Success ! --> move 31 times
33. Success ! --> move 29 time