In [12]:
import os
import re
import json
import numpy as np
import heapq
from matplotlib import colors
import matplotlib.pyplot as plt
import torch
import networkx as nx
from networkx.readwrite import json_graph
from graph_to_prompt import graph_to_prompt
from transformers import AutoModelForCausalLM, AutoTokenizer
from tqdm.auto import tqdm

In [2]:
def plot_graph(G, path = None, start = None, goal = None, checking_edges = None, weight_tag = True, llm_nodes = None):

    pos = nx.spring_layout(G, seed=42)

    plt.figure(figsize=(12, 8))

    nx.draw_networkx_edges(G, pos, edge_color='gray', alpha=0.5, arrows=True,
                        connectionstyle='arc3,rad=0.1')

    nx.draw_networkx_nodes(G, pos, node_size=400, node_color='lightblue')
    nx.draw_networkx_labels(G, pos, font_size=8, font_weight='bold')

    if path is not None:
        nx.draw_networkx_nodes([start], pos, node_size=400, node_color='yellow')
        nx.draw_networkx_nodes([goal], pos, node_size=400, node_color='green')
        path_edges = list(zip(path, path[1:]))

        print(f"A* 경로 : {path}")
        nx.draw_networkx_edges(G, pos, edgelist=checking_edges, edge_color='orange', alpha= 0.3, width=4,
                            arrows=True, connectionstyle='arc3,rad=0.1')
        nx.draw_networkx_edges(G, pos, edgelist=path_edges, edge_color='red', width=2,
                            arrows=True, connectionstyle='arc3,rad=0.1')
        if llm_nodes is None :
            title = f"A* Shortest Path from {start} to {goal}"
            
        else:
            title = f"LLM A* Shortest Path from {start} to {goal} (llm nodes = {llm_nodes})"
            nx.draw_networkx_nodes(llm_nodes, pos, node_size=400, node_color='red')
    else :
        title = "our graph"
        

    
    if weight_tag:
        edge_labels = nx.get_edge_attributes(G, 'weight')
        nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_color='green')

    plt.title(title, fontsize=14)
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(os_path+title+".png",dpi=200)
    plt.show()

def plot_bus_graph(G, path=None, start=None, goal=None,
                   checking_edges=None, weight_tag=True,
                   route_tag=False, node_size=50,
                   title="Bus Routes Graph"):
    """
    - path: A* 결과 (정류장ID list)
    - start, goal: 노드(정류장ID)
    - checking_edges: 탐색 중 확인한 엣지
    - weight_tag: True면 edge weight 표시
    - route_tag: True면 edge에 노선명(route_name) 표시
    - node_size: 노드 크기 
    """

    # (1) 노드 위치를 (x, y) 좌표로 지정
    pos = {}
    for n in G.nodes():
        x_coord = G.nodes[n].get('x', 0.0)
        y_coord = G.nodes[n].get('y', 0.0)
        pos[n] = (x_coord, y_coord)

    # (2) 그림 크기, 축 등
    plt.figure(figsize=(20, 16))

    # (3) 전체 엣지
    nx.draw_networkx_edges(
        G, pos,
        edge_color='gray',
        alpha=0.5,
        arrows=True,
        connectionstyle='arc3,rad=0.05'
    )

    # (4) path 강조
    if path:
        path_edges = list(zip(path, path[1:]))

        # 탐색 중 확인한 엣지 (주황색)
        if checking_edges:
            nx.draw_networkx_edges(
                G, pos,
                edgelist=checking_edges,
                edge_color='orange',
                alpha=0.4,
                width=3,
                arrows=True,
                connectionstyle='arc3,rad=0.05'
            )
        # 최종 경로 (빨간색)
        nx.draw_networkx_edges(
            G, pos,
            edgelist=path_edges,
            edge_color='red',
            width=5,
            arrows=True,
            connectionstyle='arc3,rad=0.05'
        )

    # (5) 노드 표시
    else:
        labels = {node: str(node) for node in G.nodes()}
        nx.draw_networkx_nodes(
            G, pos,
            node_size=node_size,
            node_color='lightblue'
        )

    # (6) 엣지 라벨
    if weight_tag:
        edge_labels = {
            (u, v): G[u][v].get('weight', '')
            for u, v in G.edges()
        }
        nx.draw_networkx_edge_labels(
            G, pos,
            edge_labels=edge_labels,
            font_color='green'
        )
    elif route_tag:
        edge_labels = {
            (u, v): G[u][v].get('route_name', '')
            for u, v in G.edges()
        }
        nx.draw_networkx_edge_labels(
            G, pos,
            edge_labels=edge_labels,
            font_color='blue'
        )


    if path and start and goal:
        title = f"{title}\nStart: {start}, Goal: {goal}"

    plt.title(title, fontsize=13)
    plt.axis('off')
    plt.tight_layout()
    # plt.savefig(os_path + title + '.png' , dpi=200)
    # plt.show()

In [3]:
def get_nums_list(s):
    nums_str = re.findall(r'\d+', s)  
    nums = list(map(int, nums_str))
    return  nums

In [4]:
class Request_llm:
    def __init__(self, model_name):
        if model_name == 'llama':
            self.max_new_tokens = 200
            self.tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-3B-Instruct")
            self.model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-3B-Instruct", device_map="auto",load_in_8bit=True)
            #self.model.save_model("models/meta-llama/Llama-3.2-3B-Instruct")
        

        self.model.config.pad_token_id = self.model.config.eos_token_id

    def get_waypoints(self, j, start, goal, n_points):
        prompt = graph_to_prompt(j, start, goal, n_points)
        inputs = self.tokenizer(
                    prompt,
                    return_tensors="pt",
                    return_attention_mask=True
                )
        
        input_ids = inputs["input_ids"]
        inputs["input_ids"] = inputs["input_ids"].to(self.model.device)
        inputs["attention_mask"] = inputs["attention_mask"].to(self.model.device)

        outputs = self.model.generate(
                **inputs,
                max_new_tokens = self.max_new_tokens
            )
        generated_token = outputs[:, input_ids.shape[-1] :]
        result = self.tokenizer.batch_decode(generated_token, skip_special_tokens=True)[0]
        waypoints = get_nums_list(result)
        return waypoints


In [5]:
def astar(G, start, goal, heuristic):
    count = 0
    checking_edges = set()

    open_set = []
    heapq.heappush(open_set, (heuristic[start], 0, start))

    came_from = {}
    g_score = {node: float('inf') for node in G.nodes}
    g_score[start] = 0

    f_score = {node: float('inf') for node in G.nodes}
    f_score[start] = heuristic[start]

    visited = set()

    while open_set:
        _, current_g, current = heapq.heappop(open_set)

        if current == goal:
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            return path[::-1], current_g, count, list(checking_edges)

        visited.add(current)

        for neighbor in G.successors(current):
            count = count + 1
            if neighbor in visited:
                continue

            tentative_g = g_score[current] + G[current][neighbor]['weight']
            checking_edges.add((current, neighbor))
            if tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                f_score[neighbor] = tentative_g + heuristic[neighbor]
                heapq.heappush(open_set, (f_score[neighbor], tentative_g, neighbor))

    return None, None, count, list(checking_edges)


In [6]:
def llm_astar(G, start, goal, heuristic, llm, obstacles=None):

    nodes = list(G.nodes)

    llm = list(set(llm))
    
    T = []

    for ts in llm:
        if ts in nodes:
            T.append(ts)

    if obstacles is None:
        obstacles = set()
    else:
        obstacles = set(obstacles)
    print(llm)

    T.reverse()

    if not T or T[0] != start:
        T.insert(0, start)
    if T[-1] != goal:
        T.append(goal)

    T = [t for t in T if t not in obstacles]

    t_idx = 1 if len(T) > 1 else 0
    t = T[t_idx]

    dist_to_t = nx.single_source_dijkstra_path_length(G, t, weight='weight')

    count = 0
    checking_edges = set()
    came_from = {}
    visited = set()

    g_score = {n: float('inf') for n in G.nodes}
    g_score[start] = 0

    f_score = {n: float('inf') for n in G.nodes}
    f_score[start] = g_score[start] + heuristic[start] + dist_to_t.get(start, float('inf'))

    open_set = []
    heapq.heappush(open_set, (f_score[start], g_score[start], start))


    while open_set:
        f_cur, g_cur, current = heapq.heappop(open_set)


        if current == goal:
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            return path[::-1], g_cur, count, list(checking_edges)

        visited.add(current)


        if current == t and t_idx < len(T) - 1:
            t_idx += 1
            t = T[t_idx]
            dist_to_t = nx.single_source_dijkstra_path_length(G, t, weight='weight')

            new_open = []
            for _, g_old, node in open_set:
                new_f = g_score[node] + heuristic[node] + dist_to_t.get(node, float('inf'))
                new_open.append((new_f, g_old, node))
            open_set = new_open
            heapq.heapify(open_set)


        for nbr in G.successors(current):
            count += 1
            if nbr in visited or nbr in obstacles:
                continue
            w = G[current][nbr].get('weight', 1)
            tentative_g = g_score[current] + w
            checking_edges.add((current, nbr))

            if tentative_g < g_score[nbr]:
                came_from[nbr] = current
                g_score[nbr] = tentative_g
                f = tentative_g + heuristic[nbr] + dist_to_t.get(nbr, float('inf'))
                f_score[nbr] = f
                heapq.heappush(open_set, (f, tentative_g, nbr))


    return None, None, count, list(checking_edges)


In [7]:
with open("graphs/sejong_bus.json", "r") as json_file:
    j = json.load(json_file)

G = json_graph.node_link_graph(j)
nodes = G.nodes
edges = G.edges
stations = list(G.nodes())


The default value will be changed to `edges="edges" in NetworkX 3.6.


  nx.node_link_graph(data, edges="links") to preserve current behavior, or
  nx.node_link_graph(data, edges="edges") for forward compatibility.


In [8]:
# figure 2 
start = stations[1588]            
goal = stations[1780]

heuristic_table = {station: (((G.nodes[goal]['x'] - G.nodes[station]['x']) ** 2 + (G.nodes[goal]['y'] - G.nodes[station]['y']) ** 2) ** 0.5) for station in nodes}

path, cost, count, checking_edges = astar(G, start, goal, heuristic_table)
print("Count       :", count)
print("Cost        :", cost)
print("# of Checking edges:", len(checking_edges))
print("Path        :", path)

Count       : 590
Cost        : 22
# of Checking edges: 322
Path        : [6805517, 6805513, 6806243, 6806241, 6806245, 6800955, 6800947, 6806274, 6800031, 6801875, 6801149, 6805706, 6801172, 6806306]


In [9]:
# table(random) A*
experiments_samples_A_star = []

while True:
    start, goal = np.random.choice(nodes, 2)
    start, goal = start.item(), goal.item()

    heuristic_table = {station: (((G.nodes[goal]['x'] - G.nodes[station]['x']) ** 2 + (G.nodes[goal]['y'] - G.nodes[station]['y']) ** 2) ** 0.5) for station in nodes}

    path, cost, count, checking_edges = astar(G, start, goal, heuristic_table)
    if path is not None:
        experiments_samples_A_star.append({'point' : (start, goal), 'path':path, 'cost':cost, 'count':count, 'checking_edges' : len(checking_edges)})
    if len(experiments_samples_A_star)>150:
        break

In [10]:
llm_model = Request_llm('llama')

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [14]:
experiments_samples_LLM_A_star = []

for i, sample in (enumerate(tqdm(experiments_samples_A_star[:10]))):
    start, goal = sample['point']

    heuristic_table = {station: (((G.nodes[goal]['x'] - G.nodes[station]['x']) ** 2 + (G.nodes[goal]['y'] - G.nodes[station]['y']) ** 2) ** 0.5) for station in nodes}

    waypoints = llm_model.get_waypoints(j, start, goal, 5)
    
    path, cost, count, checking_edges= llm_astar(G, start, goal, heuristic_table, waypoints)
    if path is not None:
        experiments_samples_LLM_A_star.append({'point' : (start, goal), 'path':path, 'cost':cost, 'count':count, 'checking_edges' : len(checking_edges)})
        cost_ratio = 1 - cost/sample['cost']
        count_ratio = 1 -count/sample['count']
        checking_edges_ratio = 1 - len(checking_edges)/sample['checking_edges']
        print("Sample ", i)
        print("Count ratio       :", 1 - count_ratio)
        print("Cost ratio        :", cost_ratio)
        print("# of Checking edges ratio:", 1 - checking_edges_ratio)

  0%|          | 0/10 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[6800004, 6800005, 6800006, 6800007, 6805647]
Sample  0
Count ratio       : 0.04422604422604426
Cost ratio        : 0.0
# of Checking edges ratio: 0.05653266331658291
[6805632, 6805633, 6805629, 6805630, 6805631]
Sample  1
Count ratio       : 1.0184815184815186
Cost ratio        : 0.0
# of Checking edges ratio: 1.0122950819672132


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[6801864, 6801904, 6801905, 6801911, 6801886]
Sample  2
Count ratio       : 0.9695357833655706
Cost ratio        : 0.0
# of Checking edges ratio: 0.9475766567754699


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[6800372, 6800373, 6800374, 6800375, 6800376]
Sample  3
Count ratio       : 0.05601527689369834
Cost ratio        : 0.0
# of Checking edges ratio: 0.07594936708860756


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[70, 40, 10, 20, 30]
Sample  4
Count ratio       : 0.02819548872180455
Cost ratio        : 0.0
# of Checking edges ratio: 0.03881987577639756


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[6800002, 6800005, 70, 6800007, 6800009]
Sample  5
Count ratio       : 1.3087027914614122
Cost ratio        : 0.0
# of Checking edges ratio: 1.253682487725041


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[70, 40, 10, 20, 30]
Sample  6
Count ratio       : 0.06794682422451992
Cost ratio        : 0.0
# of Checking edges ratio: 0.09989909182643797


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[40, 70]
Sample  7
Count ratio       : 0.06951286261631084
Cost ratio        : 0.0
# of Checking edges ratio: 0.0843507214206437


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


[6806240, 6806236, 6806237, 6806238, 6806239]
Sample  8
Count ratio       : 0.1502890173410405
Cost ratio        : 0.0
# of Checking edges ratio: 0.18196457326892113
[70, 40, 10, 20, 30]
Sample  9
Count ratio       : 0.059082892416225774
Cost ratio        : 0.0
# of Checking edges ratio: 0.08318890814558055


In [None]:
# table(random) LLM-A*
experiments_samples_LLM_A_star = []

for sample in experiments_samples_A_star[:10]:
    start, goal = sample['point']
    waypoints = llm_model.get_waypoints(j, start, goal, n_points)
    path, cost, count, checking_edges= llm_astar(G, start, goal, heuristic_table, waypoints)

print("Count       :", count)
print("Cost        :", cost)
print("# of Checking edges:", len(checking_edges))
print("Path        :", path)