<a href="https://colab.research.google.com/github/speedhawk/LLM-A-/blob/main/LLM_Astar_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
from IPython.display import HTML, display

def set_css():
  display(HTML('''
  <style>
    pre {
        white-space: pre-wrap;
    }
  </style>
  '''))
get_ipython().events.register('pre_run_cell', set_css)

In [None]:
!pip install openai==0.28

In [None]:
import math
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import json
import openai
import copy
import re
import openpyxl
import seaborn as sns
import seaborn.objects as so

In [None]:
class Astar_LLM_ImgData():
  def __init__(self, path, start, goal, env_discription, map_range, max_margin, map_sheet='Sheet1', dis_table_sheet='Sheet2') -> None:
      self.file_path = path
      self.map_sheet = map_sheet
      self.dis_table_sheet = dis_table_sheet
      self.map_range = map_range
      self.max_margin = max_margin
      self.map = None
      self.dis_table = None
      self.result_demo = np.ones((self.max_margin, self.max_margin, 3))
      self.action_space = {0: [-1, 0],
                           1: [1, 0],
                           2: [0, 1],
                           3: [0, -1],
                           4: [-1, 1],
                           5: [-1, -1],
                           6: [1, 1],
                           7: [1, -1]}

      self.start_node = start
      self.target_node = goal
      self.cur_node = self.start_node
      self.env_discription = env_discription
      self.pre_node = None

      self.step = 0
      self.min_step = 0
      self.min_dis = 1024
      self.min_node = None

      self.open_queue = {}   # component structure: {step: neighbour node}
      self.close_queue = {str(self.start_node): self.start_node}  # structure: {'current node': parent node}
      self.par_nodes = {}   # component structure: {step: current node}


      self.model_name = "gpt-3.5-turbo-16k"

      self.openai_key = 'YOUR_KEY'


      self.ini_massages = None


  def generate_map(self):
    wb = openpyxl.load_workbook(self.file_path)
    ws = wb[self.map_sheet]
    _range = self.map_range
    map = []
    for row in ws[_range]:
      map_row = []
      for cell in row:
        map_row.append(cell.value)
      map.append(map_row)
    map = np.array(map) # shape: [32, 32] or [16, 16]
    self.map = map

  def generate_dis_table(self):
    wb = openpyxl.load_workbook(self.file_path)
    ws = wb[self.dis_table_sheet]
    _range = self.map_range
    dis_table = []
    for row in ws[_range]:
      dis_table_row = []
      for cell in row:
        dis_table_row.append(cell.value)
      dis_table.append(dis_table_row)
    dis_table = np.array(dis_table) # shape: [32, 32] or [16, 16]
    self.dis_table = dis_table

  def message_initialize(self):
    openai.api_key = self.openai_key
    messages = [{'role': 'system', 'content': 'You are a good assistant! Man!'}, ]

    # tell the basic information of the map
    grid_world_inf = "We have a " + str(self.max_margin) + "*" + str(self.max_margin) + " grid map in which there are totally " + str(int(self.max_margin * self.max_margin)) + " grids inside. In this map, we use [i, j] to represent the grid of ith row and jth column. Additionally, there is a binary number which is ‘1’ or ‘0’ in each grid to represent the obstacle conditions that ‘1’ means the grid is free whilst ‘0’ means the grid has been occupied by an obstacle and the agent cannot move to it. Our objective is to make an agent at the starting node to bypass obstacles and reach the designated target node. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. Here is the detail information below. "
    print("User: ", grid_world_inf)
    # grid_world_inf = "We have a 32*32 grid map in which there are totally 1024 grids inside. In this map, we use [i, j] to represent the grid of ith row and jth column. Additionally, there is a binary number which is ‘1’ or ‘0’ in each grid to represent the obstacle conditions that ‘1’ means the grid is free whilst ‘0’ means the grid has been occupied by an obstacle and the agent cannot move to it. Our objective is to make an agent at the starting node to bypass obstacles and reach the designated target node. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. Here is the detail information below. "
    # print("User: ", grid_world_inf)
    messages.append({'role': 'user', 'content': grid_world_inf}, )
    chat_for_task = openai.ChatCompletion.create(model=self.model_name, messages=messages)
    reply_for_task = chat_for_task.choices[0].message.content
    print(f"ChatGPT: {reply_for_task}")
    messages.append({'role': 'assistant', 'content': reply_for_task})

    # tell the start_node and target_node:
    start_and_target = "Start node: " + str(self.start_node) + "; Target node: " + str(self.target_node) + " which is at the agent's lowerleft direction. "
    print("User: ", start_and_target)
    messages.append({'role': 'user', 'content': start_and_target}, )
    chat_for_task = openai.ChatCompletion.create(model=self.model_name, messages=messages)
    reply_for_task = chat_for_task.choices[0].message.content
    print(f"ChatGPT: {reply_for_task}")
    messages.append({'role': 'assistant', 'content': reply_for_task})

    # tell the obstacle information.
    obstacles_inf = self.env_discription
    print("User: ", obstacles_inf)
    messages.append({'role': 'user', 'content': obstacles_inf}, )
    chat_for_task = openai.ChatCompletion.create(model=self.model_name, messages=messages)
    reply_for_task = chat_for_task.choices[0].message.content
    print(f"ChatGPT: {reply_for_task}")
    messages.append({'role': 'assistant', 'content': reply_for_task})

    # tell the agent action space and distance standard
    action_inf = "Agent action space (set current coordinate of the agent is [i, j]): Action 0: move up ([i, j] -> [i-1, j]); Action 1: move down ([i, j] -> [i+1, j]); Action 2: move right ([i, j] -> [i, j+1]); Action 3: move left ([i, j] -> [i, j-1]); Action 4: move upper right [i, j] -> [i-1, j+1] Action 5: move upper left ([i, j] -> [i-1, j-1]); Action 6: move lower right ([i, j] -> [i+1, j+1]); Action 7: move lower left ([i, j] -> [i+1, j-1]). Notice: 1. In this map, row index i increases from upward direction to downward direction, whilst column index j increases from leftward direction to righward direction. 2. When the agent is located in a grid adjacent to an obstacle, some actions in the action space will not be achieved. Please dynamically adjust the agent's actions at runtime. Distance standard: Euclidean distance. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt."
    print("User: ", action_inf)
    messages.append({'role': 'user', 'content': action_inf}, )
    chat_for_task = openai.ChatCompletion.create(model=self.model_name, messages=messages)
    reply_for_task = chat_for_task.choices[0].message.content
    print(f"ChatGPT: {reply_for_task}")
    messages.append({'role': 'assistant', 'content': reply_for_task})


    # tell the objective
    obj_inf = "Our objective is to move the agent step by step with the actions above bypassing the obstacles from the start node to the target node. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt."
    print("User: ", obj_inf)
    messages.append({'role': 'user', 'content': obj_inf}, )
    chat_for_task = openai.ChatCompletion.create(model=self.model_name, messages=messages)
    reply_for_task = chat_for_task.choices[0].message.content
    print(f"ChatGPT: {reply_for_task}")
    messages.append({'role': 'assistant', 'content': reply_for_task})

    self.ini_massages = copy.deepcopy(messages)

  def extract_node(self, reply):
    """
    This method is to extract the nodes of the search space in each step molocation. Commonly, there are multi-numbers
    of coordinate tuples in the context. Therefore, it is fairly different from extract_node() function.
    """
    context = reply
    pattern = r'\[[\d,\s]+\]'
    try:
      lists = re.findall(pattern, context)
      search_nodes = [eval(lst) for lst in lists]
    except IndexError:
      print('Empty list. It is caused by error output of LLM.')
    finally:
      if len(search_nodes) == 0:
        return search_nodes
      else:
        return search_nodes[-1]

  def opt_node_select(self, opt_actions):
    """
    param:
    opt_actions: the action list return by function extract_node()
    """

    opt_sup_node = self.cur_node
    if sum(i>7 for i in opt_actions) != 0 or sum(i<0 for i in opt_actions) != 0 or \
    len(opt_actions) != len(set(opt_actions)) or len(opt_actions) == 0:   # error results
      return
    else:
      for i in range(len(opt_actions)):                           # collect neighbour nodes into open.queue
        act = opt_actions[i]
        sup_x = self.cur_node[0] + self.action_space[act][0]
        sup_y = self.cur_node[1] + self.action_space[act][1]
        sup_node = [sup_x, sup_y]
        if not(0 <= sup_x < self.max_margin) or not(0 <= sup_y < self.max_margin):        # move out of range
          continue
        elif abs(self.dis_table[sup_x][sup_y]) == 1024:           # move to obstacles
          continue
        elif str(sup_node) in list(self.close_queue.keys()):      # duplicated planing (meet the same node inside)
          continue
        elif sup_node in list(self.open_queue.values()):          # when duplicated searching, select the node with minimum G-cost as the parent node.

          par_key = list(self.open_queue.keys())[list(self.open_queue.values()).index(sup_node)]
          par = self.par_nodes[par_key]
          g_par = abs(self.dis_table[self.start_node[0]][self.start_node[1]] - self.dis_table[par[0]][par[1]])
          g_cur = abs(self.dis_table[self.start_node[0]][self.start_node[1]] - self.dis_table[self.cur_node[0]][self.cur_node[1]])
          if g_cur < g_par:
            sup_key = list(self.open_queue.keys())[list(self.open_queue.values()).index(sup_node)]
            ori_par = self.par_nodes[sup_key]
            self.par_nodes[sup_key] = self.cur_node

        else: # add new nodes
          end = 0 if len(self.open_queue) == 0 else list(self.open_queue.keys())[-1] + 1
          self.open_queue[end] = sup_node
          self.par_nodes[end] = self.cur_node

      # select optimal node based on f-cost

      opt_key = min(self.open_queue, key = lambda i: abs(self.dis_table[self.start_node[0]][self.start_node[1]] - \
                                                         self.dis_table[self.open_queue[i][0]][self.open_queue[i][1]]) + \
                    abs(self.dis_table[self.open_queue[i][0]][self.open_queue[i][1]]))

      print(f'opt_key: {opt_key}')

      to_close = self.open_queue.pop(opt_key)
      to_close_par = self.par_nodes.pop(opt_key)

      # update parent node

      self.close_queue[str(to_close)] = to_close_par

      # update current node of agent
      self.pre_node = to_close_par
      self.cur_node = to_close

**There are totally three different environments with corresponding three sizes. Please select one and run it to load.**

In [None]:
map_name = 'Aisle'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [1, 30]
goal = [29, 2]
map_range = 'A1:AF32'
max_margin = 32
env_discription = "Obstacles: This map mainly includes two obstacles respectively located at region A and B. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:22], j: [13:18]; Region B: i: [26:31], j: [13:18]. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. "

env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Canyon'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [3, 23]
goal = [21, 1]
map_range = 'A1:AF32'
max_margin = 32
env_discription = "Obstacles: This map mainly includes four obstacles respectively located at region A, B, C and D. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:15], j: [0:21]; Region B: i: [0:15], j: [25:31]; C: i: [22:31], j: [0:6]; Region D: i: [22:31], j: [10:31]. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. "

env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Double_door'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [1, 29]
goal = [28, 1]
map_range = 'A1:AF32'
max_margin = 32
env_discription = "Obstacles: This map includes four obstacles respectively located at region A, B, C, and D. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:26], j: [0:10]; Region B: i: [0:20], j: [11:21]; Region C: i: [30:31], j: [10:21]; Region D: i: [23:31], j: [21:31] except the grid i=23 and j=21. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. "

env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Aisle_24'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [1, 22]
goal = [21, 2]
map_range = 'A1:X24'
max_margin = 24
env_discription = "Obstacles: This map mainly includes two obstacles respectively located at region A and B. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:15], j: [9:13]; Region B: i: [19:23], j: [9:13]. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt."
env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Canyon_24'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [2, 17]
goal = [15, 2]
map_range = 'A1:X24'
max_margin = 24
env_discription = "Obstacles: This map mainly includes four obstacles respectively located at region A, B, C and D. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:11], j: [0:15]; Region B: i: [0:11], j: [19:23]; C: i: [16:23], j: [0:4]; Region D: i: [16:23], j: [8:23]. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. "
env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Double_door_24'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [1, 22]
goal = [19, 1]
map_range = 'A1:X24'
max_margin = 24
env_discription = "Obstacles: This map includes four obstacles respectively located at region A, B, C, and D. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:17], j: [0:7]; Region B: i: [0:15], j: [8:16]; Region C: i: [21:23], j: [7:15]; Region D: i: [18:23], j: [16:23] except the region where i=18 and j=16. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. "
env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Aisle_16'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [1, 13]
goal = [14, 1]
env_discription = "Obstacles: This map mainly includes two obstacles respectively located at region A and B. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:8], j: [7:9]; Region B: i: [12:15], j: [7:9]. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt."
map_range ='A1:P16'
max_margin = 16
env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Canyon_16'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [2, 10]
goal = [11, 1]
env_discription = "Obstacles: This map mainly includes four obstacles respectively located at region A, B, C and D. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:8], j: [0:9]; Region B: i: [0:8], j: [12:15]; C: i: [12:15], j: [0:3]; Region D: i: [12:15], j: [6:15]. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. "
map_range ='A1:P16'
max_margin = 16
env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
map_name = 'Double_door_16'
path = '/content/gdrive/MyDrive/LLM_Astar/maps/' + map_name + '.xlsx'
start = [1, 15]
goal = [14, 1]
env_discription = "Obstacles: This map includes four obstacles respectively located at region A, B, C, and D. This is the detail decription of grid regions occupied by obstacles, we deployed i and j as the index of row and column: Region A: i: [0:12], j: [0:4]; Region B: i: [0:10], j: [5:10]; Region C: i=15, j: [5:9]; Region D: i: [13:15], j: [10:15] except the grid i=13 and j=10. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt. "
map_range ='A1:P16'
max_margin = 16
env = Astar_LLM_ImgData(path, start, goal, env_discription, map_range, max_margin)

In [None]:
env.generate_map()
env.generate_dis_table()
env.generate_dis_table()

In [None]:

# Initialize language description of our map

env.message_initialize()


while env.cur_node != env.target_node:

  messages = copy.deepcopy(env.ini_massages)

  # ask for subset of action space

  ask_opt_actions_inf = "Now, the agent has moved from " + str(env.pre_node) + " to " + str(env.cur_node) +" which is the current position. Considering the current node the agent stays at above, based on the information of target nodes obstacles, action space and our objective, please for only the next step return me a subset of action space in which there are several optimal actions satisfied with the factors below: 1. Take care the obstacle regions I told you. Please always be careful do not move to them. 2. Take care the action space which includes at most 8 actions allowed, especially for some cells adjacent to the obstacles. 3. All of the optimal actions should serve for the purpose ‘achieve and reach the target node’. The agent must move in the correct direction while avoiding obstacles. Please prudentially deal with the causality relationship among obstacles and the correct direction and dinamically adjust and improve the solution. After selecting actions, please put the corresponding action numbers into a list and come out it. Considering the token limitation problem, please correctly and precisely remember the information above and do not return to much contexts for not only this but also the following prompt."
  print("User: ", ask_opt_actions_inf)
  messages.append({'role': 'user', 'content': ask_opt_actions_inf}, )
  chat_for_task = openai.ChatCompletion.create(model=env.model_name, messages=messages)
  reply_for_task = chat_for_task.choices[0].message.content
  print(f"ChatGPT: {reply_for_task}")
  messages.append({'role': 'assistant', 'content': reply_for_task})

  ask_opt_actions_inf = "Please come out ONLY the action number list ITSELF again with the format 'opt_actions: [1, 2, 3]' and WITHOUT ANY OTHER CONTENTS WITH LIST FORMAT. This is exclusively for the convenience for extracting the action number list. So please do not output redundant information with the list format. "
  print("User: ", ask_opt_actions_inf)
  messages.append({'role': 'user', 'content': ask_opt_actions_inf}, )
  chat_for_task = openai.ChatCompletion.create(model=env.model_name, messages=messages)
  reply_for_task = chat_for_task.choices[0].message.content
  print(f"ChatGPT: {reply_for_task}")
  messages.append({'role': 'assistant', 'content': reply_for_task})

  # extract optimal action list & save in search nodes memory

  opt_actions = env.extract_node(reply_for_task)
  # select the optimal next step and save in optimal path memory

  env.opt_node_select(opt_actions)
  # update step

**The code below is set to generate the result image.**

In [None]:
my_love_path = copy.deepcopy(env.close_queue)
my_love_search = copy.deepcopy(env.open_queue)
steps = len(my_love_path)
open_steps = len(my_love_search)
total_step = steps + open_steps

In [None]:
the_map = copy.deepcopy(env.map)

In [None]:
# beta: avg_steering_angle -> maximum number of deviation angle

def generate_traj_map(map):

    map1 = copy.deepcopy(map)
    map2 = copy.deepcopy(map1)
    map = np.dstack((map, map1, map2)).astype(float)
    # int type causes the exception of decimal recognition.

    # Color the searched space, composing the searched grid map

    start_x = env.start_node[0]
    start_y = env.start_node[1]
    cur_x = start_x
    cur_y = start_y

    for key in my_love_search:
      current_node = eval(key)
      x = current_node[0]
      y = current_node[1]
      # print(f"x: {x}")
      # print(f"y: {y}")
      map[x][y][0] = 0.0
      map[x][y][1] = 1.0
      map[x][y][2] = 0.0

    for key in my_love_path:
      x = my_love_path[key][0]
      y = my_love_path[key][1]
      map[x][y][0] = 0.0
      map[x][y][1] = 1.0
      map[x][y][2] = 0.0

    cur = goal
    # cur = env.cur_node
    parent = my_love_path[str(cur)]
    x_axis = [goal[0]]
    y_axis = [goal[1]]
    bas_dir_vector = np.array([0.0, 1.0])
    dis_vector = np.array(goal)-np.array(start)
    sum_angle = 0.0
    avg_angle = 0.0
    count = 0
    max_times = 0

    # trace back the path, calculating the length and avarage curvature

    while(parent != start):
      count += 1

      # update
      x_axis.append(parent[0])
      y_axis.append(parent[1])
      my_love_path.pop(str(cur))

      # caluculate angles

      dir_vector = np.array(cur) - np.array(parent)
      # print(f"bas_dir_vector: {bas_dir_vector}")

      # steering angle toward last vector
      angle = abs(np.arccos(np.clip(np.dot(dir_vector, bas_dir_vector)/(np.linalg.norm(dir_vector)*np.linalg.norm(bas_dir_vector)), -1.0, 1.0)))

      # steering angle toward displacement
      dis_angle = abs(np.arccos(np.clip(np.dot(dir_vector, dis_vector)/(np.linalg.norm(dir_vector)*np.linalg.norm(dis_vector)), -1.0, 1.0)))

      dis_angle = round(dis_angle, 3)
      angle = round(angle, 3)
      # print(f"angle: {angle}")

      sum_angle += angle
      avg_angle = round(sum_angle / float(count), 2)

      if dis_angle > math.pi / 2.0:
        max_times += 1

      cur = parent
      parent = my_love_path[str(cur)]
      bas_dir_vector = dir_vector
      # print(f"bas_dir_vector: {bas_dir_vector}\n")

    x_axis.append(env.start_node[0])
    y_axis.append(env.start_node[1])

    # return map, x_axis, y_axis, avg_angle
    return map, x_axis, y_axis, avg_angle, max_times

def color_path(map, x_axis, y_axis):

    color_0 = 1.0
    color_2 = 0.0
    delta_color = 1.0 / float(len(x_axis))
    for i in range(len(x_axis)):
      x = x_axis[i]
      y = y_axis[i]
      map[x][y][0] = color_0
      map[x][y][1] = 0.0
      map[x][y][2] = color_2

      color_0 -= delta_color
      color_2 += delta_color

    traj_map = (map * 255.0).astype(int)

    return traj_map, len(x_axis)


In [None]:
final_map, x_axis, y_axis, avg_angle, max_times = generate_traj_map(the_map)
final_map, path_len = color_path(final_map, x_axis, y_axis)

In [None]:
# Generate the interactive records
fig, axes = plt.subplots(1, 1, figsize=[12, 4])
axes.imshow(final_map)
axes.axis('off')