In [23]:
import heapq
import itertools
import math
import webbrowser
from pathlib import Path

import geopandas as gpd
import networkx as nx
import osmnx as ox
import pandas as pd

from data.graph_loader import GraphLoader
from data.gtfs_loader import GTFSLoader

def openMap(m, html="map.html"):
    """Save a map object to an HTML file and open that file in a web browser.

    Args:
        m: Any object exposing ``save(path)``; it is called with ``html``.
        html: Destination file for the rendered map. Defaults to
            ``"map.html"`` (relative to the current working directory),
            which is the path the function always used before this
            parameter existed.

    Returns:
        None.
    """
    m.save(html)

    # webbrowser.open() is only documented to work portably with URLs; a bare
    # relative filename may or may not open depending on the platform. Hand it
    # an absolute file:// URI instead.
    webbrowser.open(Path(html).resolve().as_uri())

In [2]:
# Obtain the walking network through the project's GraphLoader.
# The recorded output of this cell shows the loader reading an existing graph
# from data/graphs/ZMG_walk.pkl, i.e. the first argument is a path without
# the ".pkl" extension.
# NOTE(review): the second argument is presumably the boundary polygon used
# when the graph has to be built from scratch — confirm in GraphLoader.
graph_loader = GraphLoader()
graph_walk = graph_loader.create_graph_walk("data/graphs/ZMG_walk", "data/osm/ZMG_enclosure_2km.geojson")


Loading graph from data/graphs/ZMG_walk.pkl


In [None]:
# Load the GTFS feed stored under data/gtfs through the project's GTFSLoader.
# stops_df is derived from the same directory plus transit_df; later cells read
# its stop_id, stop_name, stop_lon, stop_lat, geometry, routes_by_stop and
# next_stop_id columns.
# NOTE(review): the exact schema of transit_df is defined in GTFSLoader and is
# not visible from this notebook — confirm there.
gtfs_loader = GTFSLoader()
transit_df = gtfs_loader.load_transit_dataframe("data/gtfs")
stops_df = gtfs_loader.load_stops_dataframe("data/gtfs", transit_df)


In [16]:
def create_graph_transit(transit_df, stops_df):
    """Build a directed stop-to-stop graph from the per-stop table.

    Every row of ``stops_df`` becomes a node keyed by its ``stop_id``; the
    row's ``next_stop_id`` mapping acts as the adjacency list for that node.

    Args:
        transit_df: Not used by this function; kept in the signature so
            existing callers keep working.
        stops_df: Table iterated with ``iterrows()``. Each row must provide
            ``stop_id``, ``geometry``, ``stop_name``, ``stop_lon``,
            ``stop_lat``, ``routes_by_stop`` and ``next_stop_id``. The latter
            must be a mapping ``{next_stop_id: edge_attribute_dict}``.

    Returns:
        networkx.DiGraph: nodes carry ``pos``, ``stop_name``, ``x``, ``y`` and
        ``routes`` attributes; each edge carries the attributes found in the
        corresponding ``next_stop_id`` entry.
    """
    transit_graph = nx.DiGraph()

    for _, row in stops_df.iterrows():
        origin = row['stop_id']
        node_attrs = {
            'pos': row['geometry'],
            'stop_name': row['stop_name'],
            'x': row['stop_lon'],
            'y': row['stop_lat'],
            'routes': row['routes_by_stop'],
        }
        transit_graph.add_node(origin, **node_attrs)

        # One outgoing edge per successor stop listed for this row.
        for target, attrs in row['next_stop_id'].items():
            transit_graph.add_edge(origin, target, **attrs)

    return transit_graph

# Build the directed transit graph (one node per stop) from the GTFS tables.
graph_transit = create_graph_transit(transit_df, stops_df)

   

In [27]:
# Export the walk graph's NODES (not edges) as a GeoDataFrame: nodes=True, edges=False.
# NOTE(review): fill_edge_geometry only concerns the edge table, so it should
# have no effect while edges=False — confirm against the osmnx docs.
walk_gdf = ox.graph_to_gdfs(graph_walk, nodes=True, edges=False, fill_edge_geometry=True)
# Re-wrap the node GeoDataFrame as a plain pandas DataFrame.
walk_df = pd.DataFrame(walk_gdf)


# Wrap the stops table as a GeoDataFrame, using its 'geometry' column as the
# active geometry and declaring the coordinates to be EPSG:4326.
stops_gdf = gpd.GeoDataFrame(stops_df, geometry='geometry', crs='EPSG:4326')

In [None]:
def index_to_coord(index:int, grid_size:int):
    """Convert a flat cell index into a ``(row, col)`` pair.

    The row is ``index // grid_size`` and the column is ``index % grid_size``,
    i.e. cells are numbered row by row on a grid that is ``grid_size`` wide.

    Args:
        index: Flat cell index.
        grid_size: Number of columns per row.

    Returns:
        tuple: ``(row, col)``.
    """
    return divmod(index, grid_size)

def coord_to_index(coord:tuple, grid_size:int):
    """Convert a ``(row, col)`` pair into its flat cell index.

    Args:
        coord: Two-element sequence ``(row, col)``.
        grid_size: Number of columns per row.

    Returns:
        ``row * grid_size + col``.
    """
    r, c = coord
    flat_index = r * grid_size + c
    return flat_index

def manhattan_heuristic(node:int, goal:int, grid_size:int):
    """Manhattan (taxicab) distance between two cells given by flat index.

    Both indices are converted to ``(row, col)`` with ``index_to_coord``.

    Args:
        node: Flat index of the cell being evaluated.
        goal: Flat index of the target cell.
        grid_size: Number of columns per row of the grid.

    Returns:
        ``|row difference| + |column difference|``.
    """
    (r_from, c_from) = index_to_coord(node, grid_size)
    (r_to, c_to) = index_to_coord(goal, grid_size)
    d_row = abs(r_from - r_to)
    d_col = abs(c_from - c_to)
    return d_row + d_col

def euclidean_heuristic(node:int, goal:int, grid_size:int):
    """Straight-line distance between two cells given by flat index.

    Both indices are converted to ``(row, col)`` with ``index_to_coord``.

    Args:
        node: Flat index of the cell being evaluated.
        goal: Flat index of the target cell.
        grid_size: Number of columns per row of the grid.

    Returns:
        float: square root of the summed squared row and column differences.
    """
    (r_from, c_from) = index_to_coord(node, grid_size)
    (r_to, c_to) = index_to_coord(goal, grid_size)
    squared_distance = (r_from - r_to) ** 2 + (c_from - c_to) ** 2
    return squared_distance ** 0.5

def dijkstra(graph:list[list[tuple]], src:int, dst:int, heuristic=None):
    """Shortest path from ``src`` to ``dst`` (Dijkstra, or A* with a heuristic).

    Args:
        graph: Adjacency list. ``graph[u]`` is an iterable of
            ``(neighbor, weight)`` pairs and nodes are the integers
            ``0 .. len(graph) - 1``. Weights are expected to be non-negative.
        src: Start node.
        dst: Target node.
        heuristic: ``None`` for plain Dijkstra, or ``"manhattan"`` /
            ``"euclidean"`` (case-insensitive) to add that estimate to the
            queue priority. Any other string is treated like ``None``.
            The grid heuristics interpret node ids as flat indices of a
            square grid whose side is ``isqrt(len(graph))``.

    Returns:
        tuple: ``(path, cost, visited_order)``.

        * ``path`` - list of nodes from ``src`` to ``dst`` inclusive, or
          ``None`` if ``dst`` cannot be reached.
        * ``cost`` - accumulated weight of ``path``, or ``None`` if
          unreachable.
        * ``visited_order`` - nodes in the order they were expanded (kept for
          animation). ``dst`` itself is never appended.

    Notes:
        A node is never re-expanded once it has been expanded, so when a
        heuristic is used the result is only guaranteed optimal if the
        heuristic is consistent for the given edge weights.
    """
    size = len(graph)
    # Accumulated cost from src for each node (None = not reached yet).
    cost = [None] * size
    # Node each node was reached from, used to rebuild the path.
    previous = [None] * size
    # Whether a node has already been expanded.
    visited = [False] * size
    # Expansion order, kept for later animation.
    visited_order = []

    # Resolve the heuristic once instead of on every relaxed edge.
    estimate = None
    if heuristic is not None:
        heuristic_name = heuristic.lower()
        if heuristic_name == "manhattan":
            estimate = manhattan_heuristic
        elif heuristic_name == "euclidean":
            estimate = euclidean_heuristic
    # Side of the (assumed square) grid; isqrt is exact, unlike int(size**0.5).
    grid_size = math.isqrt(size)

    # Min-heap of (priority, insertion number, node). The insertion number
    # makes entries with equal priority come out in insertion order.
    frontier = []
    insertion_number = itertools.count()

    cost[src] = 0
    heapq.heappush(frontier, (0, next(insertion_number), src))

    while frontier:
        _, _, current = heapq.heappop(frontier)
        if visited[current]:
            # Stale entry: this node was already expanded via a cheaper entry.
            continue
        visited[current] = True

        if current == dst:
            # Target reached; its neighbors do not need to be relaxed.
            break
        visited_order.append(current)

        for neighbor, weight in graph[current]:
            if visited[neighbor]:
                continue
            tentative_cost = cost[current] + weight
            # Keep the candidate only if it is the first or a cheaper route.
            if cost[neighbor] is None or cost[neighbor] > tentative_cost:
                cost[neighbor] = tentative_cost
                previous[neighbor] = current
                heuristic_cost = estimate(neighbor, dst, grid_size) if estimate is not None else 0
                heapq.heappush(
                    frontier,
                    (tentative_cost + heuristic_cost, next(insertion_number), neighbor),
                )
    else:
        # Frontier exhausted without ever reaching dst: no path exists.
        return None, None, visited_order

    # Walk the predecessor chain backwards from dst, then flip it.
    path = []
    node = dst
    while node is not None:
        path.append(node)
        node = previous[node]
    path.reverse()

    return path, cost[dst], visited_order
