### Setup

In [None]:
import os
import osmnx as ox
import networkx as nx
from graph_search import *
from search import *
import networkx as nx
import pickle
import numpy as np

In [None]:
# Load the three pre-built graphs from their pickle files.
# NOTE(review): pickle.load can execute arbitrary code from the file, so these
# .pkl files must come from a trusted source.

# Armenia Cities
with open(r"graphs/armenia_cities.pkl", "rb") as cities_file:
    armenia_cities_graph = pickle.load(cities_file)

# Armenia Cities & Villages
with open(r"graphs/armenia_cities_villages.pkl", "rb") as villages_file:
    armenia_cities_villages_graph = pickle.load(villages_file)

# Yerevan
with open(r"graphs/yerevan.pkl", "rb") as yerevan_file:
    yerevan_graph = pickle.load(yerevan_file)

In [None]:
# Report how many nodes each loaded graph contains.
for label, graph in (
    ("Cities:", armenia_cities_graph),
    ("Cities & Villages:", armenia_cities_villages_graph),
    ("Yerevan:", yerevan_graph),
):
    print(label, graph.number_of_nodes(), "nodes")

# Armenia Cities

In [None]:
# Alias the cities graph as G; the search cells in this section pass G.
G = armenia_cities_graph
# Bare trailing expression so the notebook displays the graph's node view.
G.nodes

In [None]:
# (start, goal) node pairs for the cities-graph experiments; every search
# cell in this section iterates over this list.
test_pairs = [
    ("Meghri", "Tashir"),      # south → north
    ("Yerevan", "Goris"),      # medium distance
    ("Dilijan", "Alaverdi")    # short distance
]

UCS Search

In [None]:
def run_ucs_for_pair(G, start_city, goal_city):
    """Run uniform-cost search between two nodes of ``G`` and print the outcome.

    The city names are passed straight through as node ids.

    Args:
        G: Graph handed to ``GraphState``.
        start_city: Node id where the search starts.
        goal_city: Node id accepted by the goal test.

    Returns:
        The node count reported by the searcher for this run, or ``None``
        when no solution is found.
    """
    print(f"\n UCS: {start_city} → {goal_city}")

    start_state = GraphState(G, start_city)
    reached_goal = SimpleGoalTest(goal_city)

    searcher = UCSGraphSearch()
    result = searcher.find_solution(start_state, reached_goal)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("No solution found.")
        return None

    node_ids = extract_node_id_path(result)
    print("Solution path:", node_ids)
    print("Total cost (meters):", result.path_cost)
    print("Nodes expanded:", searcher.get_number_of_nodes_in_last_search())
    return searcher.get_number_of_nodes_in_last_search()


In [None]:
# Run UCS on every test pair; results are keyed by the (start, goal) tuple.
ucs_results = {
    (start, goal): run_ucs_for_pair(G, start, goal)
    for start, goal in test_pairs
}

In [None]:
# Keep only the pairs that produced a solution (None marks a failed run).
expanded_values = [v for v in ucs_results.values() if v is not None]
# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
ucs_average = np.mean(expanded_values) if expanded_values else float("nan")

print("UCS expanded nodes per pair:", ucs_results)
print("Average UCS expanded nodes:", ucs_average)

A* Search

In [None]:
def run_astar_for_pair(G, start_city, goal_city):
    """Run A* search between two nodes of ``G`` and print the outcome.

    The city names are passed straight through as node ids, and the
    heuristic is obtained from ``build_euclidean_heuristic`` for the goal node.

    Args:
        G: Graph handed to ``GraphState`` and to the heuristic builder.
        start_city: Node id where the search starts.
        goal_city: Node id accepted by the goal test.

    Returns:
        The expanded-node count reported by the searcher, or ``None`` when
        no solution is found.
    """
    print(f"\n A*: {start_city} → {goal_city}")

    start_state = GraphState(G, start_city)
    reached_goal = SimpleGoalTest(goal_city)

    # The heuristic is built once per query, for this goal node.
    heuristic = build_euclidean_heuristic(G, goal_city)

    searcher = AStarGraphSearch()
    result = searcher.find_solution(start_state, reached_goal, heuristic)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("No solution found.")
        return None

    node_ids = extract_node_id_path(result)
    print("Solution path:", node_ids)
    print("Total cost (meters):", result.path_cost)
    print("Nodes expanded:", searcher.get_expanded_node_count())
    return searcher.get_expanded_node_count()

In [None]:
# Run A* on every test pair; results are keyed by the (start, goal) tuple.
astar_results = {
    (start, goal): run_astar_for_pair(G, start, goal)
    for start, goal in test_pairs
}

In [None]:
# Keep only the pairs that produced a solution (None marks a failed run).
expanded_values_astar = [v for v in astar_results.values() if v is not None]
# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
astar_average = (
    np.mean(expanded_values_astar) if expanded_values_astar else float("nan")
)

print("A* expanded nodes per pair:", astar_results)
print("Average A* expanded nodes:", astar_average)

Bidirectional Search

In [None]:
def run_bidir_for_pairs(G, start_node_id, goal_node_id):
    """Run bidirectional search between two nodes of ``G`` and print the outcome.

    Args:
        G: Graph handed to ``GraphState``.
        start_node_id: Node id where the search starts.
        goal_node_id: Node id accepted by the goal test.

    Returns:
        The expanded-node count reported by the searcher, or ``None`` when
        no solution is found.
    """
    start_state = GraphState(G, start_node_id)
    reached_goal = SimpleGoalTest(goal_node_id)

    searcher = BidirectionalGraphSearch()
    result = searcher.find_solution(start_state, reached_goal)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("No solution found.")
        return None

    node_ids = extract_node_id_path(result)
    print("\nSolution path:", node_ids)
    print("Total cost (meters):", result.path_cost)
    print("Nodes expanded:", searcher.get_expanded_node_count())
    return searcher.get_expanded_node_count()

# Run bidirectional search on every test pair, keyed by (start, goal).
bidir_results = {}

for start, goal in test_pairs:
    expanded = run_bidir_for_pairs(G, start, goal)
    bidir_results[(start, goal)] = expanded

# Filter out None values (pairs for which no solution was found).
expanded_values = [v for v in bidir_results.values() if v is not None]

# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
bidir_average = np.mean(expanded_values) if expanded_values else float("nan")

print("\nBidirectional expanded nodes per pair:", bidir_results)
print("Average Bidirectional expanded nodes:", bidir_average)


D* Lite

In [None]:
def run_dstar_for_pair(G, start_node_id, goal_node_id):
    """Run D* Lite between two nodes on a directed copy of ``G`` and print the outcome.

    ``G`` is wrapped in ``nx.MultiDiGraph`` first (the original author notes
    that D* Lite requires a directed graph), and an ``EdgeCostChanger`` with
    default parameters is attached to that copy.

    Args:
        G: Source graph; it is converted, not passed to the search directly.
        start_node_id: Node id where the search starts.
        goal_node_id: Node id the search targets.

    Returns:
        The processed-node count returned by ``DStarSearch.main()``, or
        ``None`` when the returned path is empty/falsy.
    """
    directed = nx.MultiDiGraph(G)
    cost_changer = EdgeCostChanger(directed)

    print(f"\n D* Lite: {start_node_id} → {goal_node_id} ")

    search = DStarSearch(
        directed, start_node_id, goal_node_id, edge_cost_changer=cost_changer
    )

    # main() yields three values; the changed-edge list is not used here.
    route, _changed_edges, processed_count = search.main()

    # Guard clause: nothing to report without a path.
    if not route:
        print("No path found.")
        return None

    print("Path:", route)
    print("Path length (number of nodes):", len(route))
    print("Processed nodes:", processed_count)
    return processed_count

# Run D* Lite on every test pair, keyed by (start, goal).
dstar_results = {}

for start, goal in test_pairs:
    processed = run_dstar_for_pair(G, start, goal)
    dstar_results[(start, goal)] = processed

# Remove None values (pairs for which no path was found).
processed_values = [v for v in dstar_results.values() if v is not None]

# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
dstar_average = np.mean(processed_values) if processed_values else float("nan")

print("\nD* Lite processed nodes per pair:", dstar_results)
print("Average processed nodes (D* Lite):", dstar_average)

# Armenia Cities and Villages

In [None]:
# Alias the cities-and-villages graph as GV for the cells in this section.
GV = armenia_cities_villages_graph
# Bare trailing expression so the notebook displays the graph's node view.
GV.nodes

In [None]:
# (start, goal) node pairs for the cities-and-villages experiments; every
# search cell in this section iterates over this list.
test_pairs_villages = [
    ("Nrnadzor", "Lernahovit"),
    ("Maralik", "Ddmasar"),
    ("Gagarin", "Gnishik")
]

UCS

In [None]:
def run_ucs_for_pair(G, start_city, goal_city):
    """Run uniform-cost search between two nodes of ``G`` and print the outcome.

    Note: this rebinds the ``run_ucs_for_pair`` name that is already defined
    earlier in the notebook, so the later definition wins after this cell runs.

    Args:
        G: Graph handed to ``GraphState``.
        start_city: Node id where the search starts.
        goal_city: Node id accepted by the goal test.

    Returns:
        The node count reported by the searcher for this run, or ``None``
        when no solution is found.
    """
    print(f"\n UCS: {start_city} → {goal_city} ")

    start_state = GraphState(G, start_city)
    reached_goal = SimpleGoalTest(goal_city)

    searcher = UCSGraphSearch()
    result = searcher.find_solution(start_state, reached_goal)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("No solution found.")
        return None

    node_ids = extract_node_id_path(result)
    expanded_count = searcher.get_number_of_nodes_in_last_search()

    print("Solution path:", node_ids)
    print("Total cost (meters):", result.path_cost)
    print("Nodes expanded:", expanded_count)
    return expanded_count

In [None]:
# Run UCS on every villages test pair, keyed by the (start, goal) tuple.
ucs_results_villages = {
    (start, goal): run_ucs_for_pair(GV, start, goal)
    for start, goal in test_pairs_villages
}


In [None]:
# Average over the pairs that produced a solution (None marks a failed run).
ucs_values_villages = [v for v in ucs_results_villages.values() if v is not None]
# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
ucs_avg = np.mean(ucs_values_villages) if ucs_values_villages else float("nan")
print("UCS average expanded nodes:", ucs_avg)

A* Search

In [None]:
def run_astar_for_pair(G, start_city, goal_city):
    """Run A* search between two nodes of ``G`` and print the outcome.

    The heuristic is obtained from ``build_euclidean_heuristic`` for the goal
    node. Note: this rebinds the ``run_astar_for_pair`` name that is already
    defined earlier in the notebook.

    Args:
        G: Graph handed to ``GraphState`` and to the heuristic builder.
        start_city: Node id where the search starts.
        goal_city: Node id accepted by the goal test.

    Returns:
        The expanded-node count reported by the searcher, or ``None`` when
        no solution is found.
    """
    print(f"\n A*: {start_city} → {goal_city}")

    start_state = GraphState(G, start_city)
    reached_goal = SimpleGoalTest(goal_city)

    # The heuristic is built once per query, for this goal node.
    heuristic = build_euclidean_heuristic(G, goal_city)

    searcher = AStarGraphSearch()
    result = searcher.find_solution(start_state, reached_goal, heuristic)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("No solution found.")
        return None

    node_ids = extract_node_id_path(result)
    expanded_count = searcher.get_expanded_node_count()

    print("Solution path:", node_ids)
    print("Total cost (meters):", result.path_cost)
    print("Nodes expanded:", expanded_count)
    return expanded_count

In [None]:
# Run A* on every villages test pair, keyed by the (start, goal) tuple.
astar_results_villages = {
    (start, goal): run_astar_for_pair(GV, start, goal)
    for start, goal in test_pairs_villages
}


In [None]:
# Average over the pairs that produced a solution (None marks a failed run).
astar_values_villages = [
    v for v in astar_results_villages.values() if v is not None
]
# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
astar_avg = (
    np.mean(astar_values_villages) if astar_values_villages else float("nan")
)
print("A* average expanded nodes:", astar_avg)

Bidirectional Search

In [None]:
def run_bidir_for_pairs(G, start_node_id, goal_node_id):
    """Run bidirectional search between two nodes of ``G`` and print the outcome.

    Note: this rebinds the ``run_bidir_for_pairs`` name that is already
    defined earlier in the notebook.

    Args:
        G: Graph handed to ``GraphState``.
        start_node_id: Node id where the search starts.
        goal_node_id: Node id accepted by the goal test.

    Returns:
        The expanded-node count reported by the searcher, or ``None`` when
        no solution is found.
    """
    start_state = GraphState(G, start_node_id)
    reached_goal = SimpleGoalTest(goal_node_id)

    searcher = BidirectionalGraphSearch()
    result = searcher.find_solution(start_state, reached_goal)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("No solution found.")
        return None

    node_ids = extract_node_id_path(result)
    print("\nSolution path:", node_ids)
    print("Total cost (meters):", result.path_cost)
    print("Nodes expanded:", searcher.get_expanded_node_count())
    return searcher.get_expanded_node_count()


# Run bidirectional search on every villages test pair, keyed by (start, goal).
bidir_results_villages = {}

for start, goal in test_pairs_villages:
    expanded = run_bidir_for_pairs(GV, start, goal)
    bidir_results_villages[(start, goal)] = expanded

# Filter out None values (pairs for which no solution was found).
expanded_values = [v for v in bidir_results_villages.values() if v is not None]

# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
bidir_average_villages = (
    np.mean(expanded_values) if expanded_values else float("nan")
)

print("\nBidirectional expanded nodes per pair:", bidir_results_villages)
print("Average Bidirectional expanded nodes:", bidir_average_villages)

D* Lite

In [None]:
def run_dstar_for_pair_villages(GV, start_node_id, goal_node_id):
    """Run D* Lite between two nodes on a directed copy of ``GV`` and print the outcome.

    ``GV`` is wrapped in ``nx.MultiDiGraph`` first (the original author notes
    that D* Lite requires a directed graph), and an ``EdgeCostChanger`` with
    default parameters is attached to that copy.

    Args:
        GV: Source graph; it is converted, not passed to the search directly.
        start_node_id: Node id where the search starts.
        goal_node_id: Node id the search targets.

    Returns:
        The processed-node count returned by ``DStarSearch.main()``, or
        ``None`` when the returned path is empty/falsy.
    """
    directed = nx.MultiDiGraph(GV)
    cost_changer = EdgeCostChanger(directed)

    print(f"\n D* Lite: {start_node_id} → {goal_node_id} ")

    search = DStarSearch(
        directed, start_node_id, goal_node_id, edge_cost_changer=cost_changer
    )

    # main() yields three values; the changed-edge list is not used here.
    route, _changed_edges, processed_count = search.main()

    # Guard clause: nothing to report without a path.
    if not route:
        print("No path found.")
        return None

    print("Path:", route)
    print("Path length (number of nodes):", len(route))
    print("Processed nodes:", processed_count)
    return processed_count


# ---- Run D* Lite for villages graph ----

# Results are keyed by the (start, goal) tuple.
dstar_results_villages = {}

for start, goal in test_pairs_villages:
    processed = run_dstar_for_pair_villages(GV, start, goal)
    dstar_results_villages[(start, goal)] = processed

# Keep only the pairs for which a path was found.
processed_values_villages = [v for v in dstar_results_villages.values() if v is not None]
# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no pair was solved" case explicit instead.
dstar_average_villages = (
    np.mean(processed_values_villages) if processed_values_villages else float("nan")
)

print("\nD* Lite processed nodes per pair (villages):", dstar_results_villages)
print("Average processed nodes (D* Lite villages):", dstar_average_villages)

# Yerevan

In [None]:
# Alias the Yerevan graph as GY; the bidirectional and D* Lite cells below use it.
GY = yerevan_graph
# Bare trailing expression so the notebook displays the graph's node view.
GY.nodes

Since the nodes of the Yerevan graph are identified by numeric IDs rather than names, we define the test pairs by coordinates and map each coordinate to its nearest graph node.

In [None]:
def get_nearest_node(G, lat, lon):
    """Return the id of the node of ``G`` nearest to a latitude/longitude.

    The query goes to ``ox.distance.nearest_nodes`` as one-element lists
    (longitude as ``X``, latitude as ``Y``), and the single match is unwrapped.

    Args:
        G: Graph to search.
        lat: Latitude of the query point.
        lon: Longitude of the query point.

    Returns:
        The first (and only) element of the lookup result.
    """
    matches = ox.distance.nearest_nodes(G, X=[lon], Y=[lat])
    return matches[0]

In [None]:
# (lat, lon) of each route's origin and destination, keyed by a display label.
yerevan_route_coords = {
    "Nubarashen → Avan": ((40.111017, 44.575968), (40.223863, 44.561266)),
    "AUA → UFAR": ((40.204347, 44.516199), (40.216227, 44.531846)),
    "Yerevan Mall → Dalma": ((40.175310, 44.507810), (40.189900, 44.486100)),
}

# Snap every coordinate to its nearest graph node: label -> (start, goal).
test_pairs_yerevan = {
    label: (
        get_nearest_node(yerevan_graph, *origin),
        get_nearest_node(yerevan_graph, *destination),
    )
    for label, (origin, destination) in yerevan_route_coords.items()
}

UCS

In [None]:
def run_ucs_yerevan(G, start, goal):
    """Run uniform-cost search between two node ids of ``G`` and print a summary.

    Unlike the per-pair runners used for the other graphs, the solution path
    itself is not printed here, only its cost and the node count.

    Args:
        G: Graph handed to ``GraphState``.
        start: Node id where the search starts.
        goal: Node id accepted by the goal test.

    Returns:
        The node count reported by the searcher for this run, or ``None``
        when no solution is found.
    """
    start_state = GraphState(G, start)
    reached_goal = SimpleGoalTest(goal)

    searcher = UCSGraphSearch()
    result = searcher.find_solution(start_state, reached_goal)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("  UCS: No solution")
        return None

    expanded_count = searcher.get_number_of_nodes_in_last_search()
    print("  UCS Path cost:", result.path_cost)
    print("  UCS Nodes expanded:", expanded_count)
    return expanded_count

A* Search

In [None]:
def run_astar_yerevan(G, start, goal):
    """Run A* search between two node ids of ``G`` and print a summary.

    The heuristic is obtained from ``build_euclidean_heuristic`` for the goal
    node. Only the path cost and the expanded-node count are printed.

    Args:
        G: Graph handed to ``GraphState`` and to the heuristic builder.
        start: Node id where the search starts.
        goal: Node id accepted by the goal test.

    Returns:
        The expanded-node count reported by the searcher, or ``None`` when
        no solution is found.
    """
    start_state = GraphState(G, start)
    reached_goal = SimpleGoalTest(goal)

    # The heuristic is built once per query, for this goal node.
    heuristic = build_euclidean_heuristic(G, goal)

    searcher = AStarGraphSearch()
    result = searcher.find_solution(start_state, reached_goal, heuristic)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("  A*: No solution")
        return None

    expanded_count = searcher.get_expanded_node_count()
    print("  A* Path cost:", result.path_cost)
    print("  A* Nodes expanded:", expanded_count)
    return expanded_count

In [None]:
# Expanded-node counts of the runs that found a solution, per algorithm.
ucs_expanded = []
astar_expanded = []

print("Yerevan Route Experiments")

for label, (start, goal) in test_pairs_yerevan.items():
    print(f"\n {label} ")
    print("Start node:", start, "| Goal node:", goal)

    u = run_ucs_yerevan(yerevan_graph, start, goal)
    a = run_astar_yerevan(yerevan_graph, start, goal)

    # A None result means that run found no solution; leave it out of the average.
    if u is not None:
        ucs_expanded.append(u)
    if a is not None:
        astar_expanded.append(a)

# Compute averages. np.mean([]) emits a RuntimeWarning before returning NaN,
# so the "no route was solved" case is made explicit.
# NOTE(review): ucs_avg / astar_avg are also assigned in the cities-and-villages
# section; running this cell overwrites those values.
ucs_avg = np.mean(ucs_expanded) if ucs_expanded else float("nan")
astar_avg = np.mean(astar_expanded) if astar_expanded else float("nan")

print("\n AVERAGE EXPANDED NODES (Yerevan graph) ")
print("UCS avg expanded nodes:", ucs_avg)
print("A* avg expanded nodes:", astar_avg)


Bidirectional Search

In [None]:
def run_bidir_for_pairs(G, start_node_id, goal_node_id):
    """Run bidirectional search between two nodes of ``G`` and print the outcome.

    Note: this rebinds the ``run_bidir_for_pairs`` name that is already
    defined earlier in the notebook.

    Args:
        G: Graph handed to ``GraphState``.
        start_node_id: Node id where the search starts.
        goal_node_id: Node id accepted by the goal test.

    Returns:
        The expanded-node count reported by the searcher, or ``None`` when
        no solution is found.
    """
    start_state = GraphState(G, start_node_id)
    reached_goal = SimpleGoalTest(goal_node_id)

    searcher = BidirectionalGraphSearch()
    result = searcher.find_solution(start_state, reached_goal)

    # Guard clause: nothing to report when the search came back empty.
    if not result:
        print("No solution found.")
        return None

    node_ids = extract_node_id_path(result)
    print("\nSolution path:", node_ids)
    print("Total cost (meters):", result.path_cost)
    print("Nodes expanded:", searcher.get_expanded_node_count())
    return searcher.get_expanded_node_count()

# Run bidirectional search on every Yerevan route, keyed by the route label.
bidir_results_yerevan = {}

print("\n Bidirectional Search on Yerevan Graph ")

for pair_label, (start_node, goal_node) in test_pairs_yerevan.items():
    print(f"\n {pair_label} ")
    expanded = run_bidir_for_pairs(GY, start_node, goal_node)
    bidir_results_yerevan[pair_label] = expanded

# Keep only the routes for which a solution was found.
expanded_values = [v for v in bidir_results_yerevan.values() if v is not None]

# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no route was solved" case explicit instead.
bidir_average_yerevan = (
    np.mean(expanded_values) if expanded_values else float("nan")
)

print("\nBidirectional expanded nodes per pair:", bidir_results_yerevan)
print("Average Bidirectional expanded nodes:", bidir_average_yerevan)

D* Lite

In [None]:
def run_dstar_for_pair_yerevan(GY, start_node_id, goal_node_id):
    """Run D* Lite between two nodes on a directed copy of ``GY`` and print the outcome.

    ``GY`` is wrapped in ``nx.MultiDiGraph`` first (the original author notes
    that D* Lite requires a directed graph), and an ``EdgeCostChanger`` with
    default parameters is attached to that copy.

    Args:
        GY: Source graph; it is converted, not passed to the search directly.
        start_node_id: Node id where the search starts.
        goal_node_id: Node id the search targets.

    Returns:
        The processed-node count returned by ``DStarSearch.main()``, or
        ``None`` when the returned path is empty/falsy.
    """
    directed = nx.MultiDiGraph(GY)
    cost_changer = EdgeCostChanger(directed)

    print(f"\n D* Lite: {start_node_id} → {goal_node_id} ")

    search = DStarSearch(
        directed, start_node_id, goal_node_id, edge_cost_changer=cost_changer
    )

    # main() yields three values; the changed-edge list is not used here.
    route, _changed_edges, processed_count = search.main()

    # Guard clause: nothing to report without a path.
    if not route:
        print("No path found.")
        return None

    print("Path:", route)
    print("Path length (number of nodes):", len(route))
    print("Processed nodes:", processed_count)
    return processed_count


# Run D* Lite on every Yerevan route, keyed by the route label.
dstar_results_yerevan = {}

print("\n D* Lite on Yerevan Graph ")

for label, (start_node, goal_node) in test_pairs_yerevan.items():
    print(f"\n {label} ")
    processed = run_dstar_for_pair_yerevan(GY, start_node, goal_node)
    dstar_results_yerevan[label] = processed

# Keep only the routes for which a path was found.
processed_values_yerevan = [v for v in dstar_results_yerevan.values() if v is not None]
# np.mean([]) emits a RuntimeWarning before returning NaN; make the
# "no route was solved" case explicit instead.
dstar_average_yerevan = (
    np.mean(processed_values_yerevan) if processed_values_yerevan else float("nan")
)

print("\nD* Lite processed nodes per pair (Yerevan):", dstar_results_yerevan)
print("Average processed nodes (D* Lite Yerevan):", dstar_average_yerevan)

In [None]:
# ----- Build a small toy road graph -----
# NOTE(review): this rebinds G, which earlier cells use as the alias of the
# Armenia cities graph; re-running those cells after this one will pick up
# this graph instead.
PLACE = "Kentron, Yerevan, Armenia"   # small; fast for smoke tests
NETWORK = "drive"

# ---------- download (simplify=True is passed explicitly) ----------
G = ox.graph.graph_from_place(PLACE, network_type=NETWORK, simplify=True)

# Keep only the largest component; strongly=False asks for weak connectivity
# (the original author notes weak is fine for roads).
G = ox.truncate.largest_component(G, strongly=False)

# Attribute whitelists: every other node/edge attribute is stripped below.
keep_node = {"x", "y", "street_count"}
keep_edge = {"length", "highway", "name", "maxspeed"}

for _, d in G.nodes(data=True):
    # Collect the unwanted keys first so the dict is not resized mid-iteration.
    for k in [name for name in d if name not in keep_node]:
        del d[k]

for u, v, k, d in G.edges(keys=True, data=True):
    for kk in [name for name in d if name not in keep_edge]:
        del d[kk]

# Draw the graph. show=False and close=False are passed, presumably so the
# figure stays available for further drawing — confirm against later cells.
fig, ax = ox.plot.plot_graph(
    G,
    figsize=(6, 6),
    bgcolor="white",
    node_color="#2f4f4f",
    node_size=6,
    edge_color="#877778",
    edge_linewidth=0.8,
    show=False,
    close=False,
)