In [2]:
# import libraries
import osmnx as ox
import pandas as pd
import folium
import networkx as nx
import pickle

from geopy.distance import geodesic
from networkx.algorithms.shortest_paths.weighted import _weight_function
from heapq import heappop, heappush
from itertools import count

from functools import lru_cache
from geopy.geocoders import Nominatim
import warnings
# remove warnings
warnings.filterwarnings("ignore")

In [3]:
# finding the coordinates of a place from the address
def get_coordinates(location_name):
    """Geocode an address with Nominatim.

    Parameters
    ----------
    location_name : str
        Free-text address or place name handed to the geocoder.

    Returns
    -------
    tuple or None
        ``(longitude, latitude)`` of the geocoder's result. When the geocoder
        returns nothing, a message is printed and ``None`` is returned.
    """
    match = Nominatim(user_agent="Modified_A_start_Search_in_OSMNX").geocode(location_name)
    if not match:
        print("Unable to find the coordinates of the location")
        return None
    # Longitude first: the rest of the notebook stores points as (x, y).
    return match.longitude, match.latitude


## Load the graph and charging stations info


In [4]:
# Load the graph of place
# specify the location
place_name = "Delhi, India"
# To rebuild the cached inputs from OpenStreetMap instead of loading them:
#G = ox.graph_from_place(place_name, network_type='drive', simplify=True)
# pickle.dump(G, open('filename.pickle', 'wb'))
#charging_stations = ox.geometries_from_place(place_name, tags={'amenity': 'charging_station'})

# load the delhi graph
# NOTE: unpickling can execute arbitrary code; only load a delhi.pickle that
# was produced locally. The context manager closes the file handle, which the
# previous `pickle.load(open(...))` form left open.
with open("delhi.pickle", "rb") as graph_file:
    G = pickle.load(graph_file)

# load the charging info
charging_stations = pd.read_csv("charging_stations.csv")
# node -> (longitude, latitude) of the charging station snapped to that node
charging_nodes = {}
# node -> value of the CSV "distance" column for that station
charging_distances = {}
# Iterate column-wise rather than with iterrows(): iterrows() builds one Series
# per row, which upcasts a row mixing integer and float columns to float and
# would turn the node ids into floats.
for x, y, nearest_node, dist in zip(
    charging_stations["longitude"],
    charging_stations["lattitude"],  # column name is spelled this way in the CSV
    charging_stations["nearest_node"],
    charging_stations["distance"],
):
    charging_nodes[nearest_node] = (x, y)
    charging_distances[nearest_node] = dist

In [50]:
# NOTE(review): this rebinds `charging_stations`, replacing the DataFrame that
# the loading cell read from charging_stations.csv with the result of an OSMnx
# query (presumably fetched from OpenStreetMap over the network). The execution
# count suggests it was run ad hoc -- confirm whether this cell is still needed.
charging_stations = ox.geometries_from_place(place_name, tags={'amenity': 'charging_station'})

In [8]:
# Disabled code, kept as a bare string literal so it is never executed.
# It snaps every Point-geometry charging station to its nearest graph node and
# writes the columns longitude, lattitude, nearest_node, distance to
# charging_stations.csv -- the same file and columns the loading cell reads.
# NOTE(review): the snippet calls `csv.writer`, but `csv` is not imported in
# the visible import cell; add `import csv` before re-enabling it.
'''charging_distances = {}
charging_nodes = {}

for i, station in charging_stations.iterrows():
    if station['geometry'].geom_type != 'Point':
        continue
    x, y = station['geometry'].x, station['geometry'].y
    nearest_node, dist = ox.nearest_nodes(G, x, y, return_dist=True)
    charging_distances[nearest_node] = dist
    charging_nodes[nearest_node] = (x, y)

charging_stations = []
columns = ["longitude", "lattitude", "nearest_node", "distance"]
for node in charging_distances:
    charging_stations.append([charging_nodes[node][0], charging_nodes[node][1], node, charging_distances[node]])
with open("charging_stations.csv", "w") as f:
    writter = csv.writer(f)
    writter.writerow(columns)
    writter.writerows(charging_stations)'''
# Presumably here so the cell ends on a statement and the string above is not
# echoed as the cell's output.
print()




In [5]:
# get source coordinates
# Points are stored as (longitude, latitude), the same order get_coordinates
# returns and the order ox.nearest_nodes is later called with (X, Y).
source_address = "Delhi International Airport"#, Delhi, India"
# To geocode instead of using the hard-coded value:
# source_coordinates = get_coordinates(source_address)
source_coordinates = (77.0851248, 28.555524) # for Delhi International Airport

In [6]:
# get destination coordinates
# Stored as (longitude, latitude), matching source_coordinates.
destination_address = "Red Fort"#, Delhi, India"
# To geocode instead of using the hard-coded value:
# destination_coordinates = get_coordinates(destination_address)
destination_coordinates = (77.24079592327321, 28.65608095) # for Red Fort


In [7]:
class Graph():
    """Range-aware A* search over a road network.

    Wraps a NetworkX graph together with the set of nodes that have a charging
    station nearby, and searches for a route on which the vehicle's battery
    charge never drops below zero.
    """

    def __init__(self, G, charging_distances, weight='length'):
        """Store the graph and the charging-station lookup used by the search.

        Parameters
        ----------
        G : NetworkX graph
            Road network. ``astar_path`` reads the ``'x'`` and ``'y'``
            attributes of its nodes (longitude and latitude in this notebook).

        charging_distances : dict
            Maps the nodes that are closest to the charging stations to their
            distances from those stations. Only key membership is used by this
            class: reaching such a node resets the charge to 100.

        weight : string
            The edge data key corresponding to the edge weight. The search
            treats the weight as a length in meters when converting it to
            battery usage.
        """
        self.G = G
        self.charging_distances = charging_distances
        self.weight = _weight_function(self.G, weight)
        self.G_succ = self.G._adj # For speed-up (and works for both directed and undirected graphs)

    def astar_path(self, source, target, max_range, charge):
        """Search for a route from source to target that never runs out of charge.

        This is a modified A*: every queue entry also carries the remaining
        charge, edges that would drain the battery below zero are skipped, and
        the priority ``cost + heuristic`` is scaled by ``(400 - charge) / 400``
        so that states with more remaining charge are preferred. Because of
        that scaling the returned route is not guaranteed to be the shortest.

        Parameters
        ----------
        source : node
            Starting node for path

        target : node
            Ending node for path

        max_range : int
            The maximum distance that the vehicle can travel on a single charge in kms

        charge : int
            The initial charge of the vehicle in percentage

        Returns
        -------
        path : list or None
            Nodes from source to target, or None when no feasible route exists.

        charge : float or None
            Charge (percent) on arrival at the target, or None when no route
            exists. It is 100 when the target itself is a charging node.

        explored : dict
            Maps every expanded node to the parent it was reached from.

        Raises
        ------
        nx.NodeNotFound
            If source or target is not in the graph.

        ValueError
            If max_range is not positive.
        """
        if source not in self.G or target not in self.G:
            msg = f"Either source {source} or target {target} is not in G"
            raise nx.NodeNotFound(msg)
        if max_range <= 0:
            # The charge update below divides by max_range.
            raise ValueError(f"max_range must be positive, got {max_range}")

        nodes = self.G.nodes

        # Heuristic: geodesic distance (meters) between two nodes.
        # geodesic() expects (latitude, longitude) points, i.e. (y, x).
        @lru_cache(maxsize=None)
        def distance_from_target(u, v):
            return geodesic((nodes[u]['y'], nodes[u]['x']), (nodes[v]['y'], nodes[v]['x'])).meters

        push = heappush
        pop = heappop

        # Queue entries are (priority, charge, counter, node, cost, parent).
        # The counter is unique, so the heap never has to compare the nodes
        # (or parents) themselves when priority and charge tie.
        c = count()
        queue = [(0, charge, next(c), source, 0, None)] # 0 cost to reach source

        # Maps each enqueued node to the cost of the path it was last enqueued
        # with, so a node is not re-inserted for a path that is no better.
        enqueued = {}
        # Maps explored nodes to parent closest to the source.
        explored = {}

        while queue:
            # pop the smallest item from queue
            _, curcharge, _, curnode, dist, parent = pop(queue)

            # check if the node has a charging station nearby
            if curnode in self.charging_distances:
                curcharge = 100

            if curnode == target:
                # Walk the parent links back to the source.
                path = [curnode]
                node = parent
                while node is not None:
                    path.append(node)
                    node = explored[node]
                path.reverse()
                return path, curcharge, explored

            if curnode in explored:
                # Do not override the parent of starting node
                if explored[curnode] is None:
                    continue

                # Stale queue entry: a cheaper path to curnode was enqueued later.
                if enqueued[curnode] < dist:
                    continue

            explored[curnode] = parent

            for neighbor, w in self.G_succ[curnode].items():
                edge_weight = self.weight(curnode, neighbor, w)
                ncost = dist + edge_weight
                # edge_weight is in meters and max_range in km, so
                # edge_weight / (max_range * 1000) * 100 is the percentage used.
                new_charge = curcharge - edge_weight/(max_range*10)
                # If the new charge is less than 0, the vehicle cannot reach the neighbor
                if new_charge < 0:
                    continue

                # A path to the neighbor that is at least as cheap is already
                # known (compare against the neighbor's cost, not curnode's).
                if neighbor in enqueued and enqueued[neighbor] <= ncost:
                    continue
                h = distance_from_target(neighbor, target)

                enqueued[neighbor] = ncost
                push(queue, ((ncost + h) * (400 - new_charge)/400, new_charge, next(c), neighbor, ncost, curnode))
        return None, None, explored

    def get_edge_distance(self, u, v):
        """Return the weight of the edge u -> v in this object's own graph."""
        return self.weight(u, v, self.G.adj[u][v])


In [34]:
# define the maximum range of the vehicle
# max_range is in kilometres here (Graph.astar_path documents it in kms).
# NOTE(review): the charging-stop cell later rebinds `max_range` to
# 30 * 1000 (metres); re-run this cell before anything that expects km.
max_range = 30
initial_charge = 45 # charging in %
# Snap the (longitude, latitude) points to their nearest graph nodes.
# NOTE(review): the name `distance_from_destination` is reused for a heuristic
# function in the NetworkX A* cell, which replaces the value assigned here.
source_node, distance_from_source = ox.nearest_nodes(G, *source_coordinates, return_dist=True)
destination_node, distance_from_destination = ox.nearest_nodes(G, *destination_coordinates, return_dist=True)
# find the path from source node to destination node
graph_obj = Graph(G=G, charging_distances=charging_distances, weight='length')
# path and charge are both None when no feasible route exists
path, charge, explored = graph_obj.astar_path(source=source_node, target=destination_node, max_range=max_range, charge=initial_charge)


In [179]:
# define heuristic from node to destination node
def distance_from_destination(u, v):
    """Return the geodesic distance in meters between graph nodes u and v.

    Node attribute 'y' is the latitude and 'x' the longitude; geodesic()
    expects (latitude, longitude) points, so the pairs are passed as (y, x).
    """
    return geodesic((G.nodes[u]['y'], G.nodes[u]['x']), (G.nodes[v]['y'], G.nodes[v]['x'])).meters
# find the path using A* algorithm in networkx
# NOTE(review): this rebinds `path`, replacing the route returned by
# Graph.astar_path; `charge` and `explored` still refer to that earlier search.
path = nx.astar_path(G, source_node, destination_node, heuristic=distance_from_destination, weight='length')

In [167]:
# Prints whichever `path` was assigned last (modified A* or networkx A*).
print(path)

[4572504342, 4572504360, 1377318095, 1794070301, 1987508656, 1987508655, 1800466427, 1800466416, 1800466415, 1986214319, 1986226701, 5482133017, 10301921041, 8373361266, 4230117286, 10981451587, 1986214419, 1986214400, 6855561887, 6816510148, 3195457994, 1478655816, 6822400108, 345576017, 278163583, 910665235, 9792383644, 5985299778, 277419824, 9792407221, 9792383712, 7047751867, 7047577234, 5179521923, 4776237946, 915368944, 246513647, 6669107781, 11792964332, 6554718342, 6669107782, 9835235882, 9835234361, 9835234362, 5339325445, 5339323504, 9843524744, 4930107692, 10303750895, 10303750899, 10264741954, 267253217, 267253220, 5345583358, 5345583360, 267249903, 267249905, 1271972179, 267249911, 11301690340, 3661513066, 10264255641, 10264255633, 3661513068, 3661513070, 11462925576, 395135103, 1762977534, 1869716895, 249783308, 345265121, 432874005, 249783313, 249783316, 5681296953, 5681321307, 5681296956, 5681296951, 5681296958, 5681296959, 9873910595, 5681296960, 5681296950, 458640661,

In [33]:

# Graph.astar_path returns charge=None only when it found no route. A route
# that arrives with exactly 0% charge is still valid, so test for None rather
# than truthiness.
if charge is None:
    print(f"No path found from {source_address} to {destination_address} with the {initial_charge:.2f} charge and {max_range} range.")
else:
    print("Results:")
    print("-------")
    print(f"Optimal path found from {source_address} to {destination_address}\n with the {initial_charge:.2f}% charge and {max_range}km range.")
    # total length of the route: sum of the weights of consecutive edges
    distance = sum(graph_obj.get_edge_distance(u, v) for u, v in zip(path, path[1:]))
    print()
    print(f"Distance: {distance:,.2f} meters")
    print()

    # find the number of charging stations in the path
    numCharge = sum(1 for node in path if node in charging_distances)
    print("Number of charging stations in the path: ", numCharge)
    print()

Results:
-------
Optimal path found from Delhi International Airport to Red Fort
 with the 100.00% charge and 100km range.

Distance: 23,880.33 meters

Number of charging stations in the path:  3



In [119]:
# for finding the distances between the nodes
# Module-level helpers read by find_charging_stations, dfs_ef and dfs below.
G_succ = G._adj
weight = _weight_function(G, 'length')
# NOTE(review): this rebinds the global `max_range`. The search cell sets it to
# 30 (documented there as km); here it is 30 * 1000, i.e. the same range in
# metres so it can be compared with edge lengths. Cells that print or pass
# `max_range` as km will see 30000 if they run after this one.
max_range = 30 * 1000
# find charging stations in the path for minimal charging
def find_charging_stations(G, path, charging_nodes, charge):
    """Locate the charging nodes on a path and the distance between them.

    Parameters
    ----------
    G : NetworkX graph
        Graph the path belongs to; edge lengths are read from its 'length'
        edge attribute.
    path : list
        Sequence of nodes, consecutive entries joined by an edge of G.
    charging_nodes : container
        Nodes that have a charging station; only membership is tested.
    charge : number
        Currently unused; kept so existing calls keep working.

    Returns
    -------
    arr : list
        For each charging node on the path, the distance travelled since the
        previous charging node (or since the start of the path). The stretch
        after the last charging node is not included.
    indexes : list
        Positions in ``path`` of those charging nodes, parallel to ``arr``.
    distances : list
        ``distances[i]`` is the length of the edge arriving at ``path[i]``;
        ``distances[0]`` is 0. Empty when ``path`` is empty.
    """
    if not path:
        return [], [], []

    # Build the weight function from the graph that was passed in, instead of
    # relying on module-level state created for one particular graph.
    edge_weight = _weight_function(G, 'length')
    distances = [0]
    distances.extend(edge_weight(u, v, G.adj[u][v]) for u, v in zip(path, path[1:]))

    total_dist = 0
    arr, indexes = [], []
    for i, (node, leg) in enumerate(zip(path, distances)):
        total_dist += leg
        if node in charging_nodes:
            indexes.append(i)
            arr.append(total_dist)
            # start measuring the next stretch from this station
            total_dist = 0
    return arr, indexes, distances
# arr: distance from the previous charging node (or the start) to each charging
# node on `path`; indexes: positions of those nodes in `path`.
# These are module-level names that dfs_ef and dfs read directly.
arr, indexes, distances = find_charging_stations(G, path, charging_nodes, charge=initial_charge)
print(arr)

def dfs_ef(i, charge):
    """Choose positions in the global ``arr`` at which to charge, preferring few stops.

    Reads the module-level ``arr`` and ``max_range``. ``i`` is the position in
    ``arr`` being decided and ``charge`` the battery percentage carried into
    it. Choosing to stop at ``i`` continues the recursion with a charge of 80
    (presumably a partial top-up -- confirm). Returns the list of chosen
    positions; both branches are explored and the strictly shorter list wins,
    with ties going to the branch that does not stop.
    """
    if i == len(arr):
        return []

    with_stop = [i] + dfs_ef(i + 1, 80)

    # How far arr[i] exceeds what the current charge covers; more than 20
    # (same unit as arr) forces the stop.
    shortfall = arr[i] - max_range * charge/100
    if shortfall > 20:
        return with_stop

    without_stop = dfs_ef(i + 1, charge - (arr[i]/max_range)*100)
    return with_stop if len(with_stop) < len(without_stop) else without_stop
# First try the variant that continues with a charge of 80 after each stop.
index = set(dfs_ef(0, initial_charge))
if len(index) == 0:
    # NOTE(review): an empty result is also what dfs_ef returns when no stop
    # was chosen at all, so this message may be misleading -- confirm intent.
    print("Efficient charging options not found")

    def dfs(i, charge):
        """Same search as dfs_ef, but a stop continues with a charge of 100
        and a stop is forced as soon as arr[i] exceeds the current range."""
        if i == len(arr):
            return []

        with_stop = [i] + dfs(i + 1, 100)
        if arr[i] > max_range * charge/100:
            return with_stop

        without_stop = dfs(i + 1, charge - (arr[i]/max_range)*100)
        return with_stop if len(with_stop) < len(without_stop) else without_stop

    index = set(dfs(0, initial_charge))
# chosen positions in arr, then the matching positions in path
print(index)
print(list(map(indexes.__getitem__, index)))


[14928.000000000002, 4676.1140000000005, 365.5590000000001]
Efficient charging options not found
set()
[]


In [32]:
# Create a Folium map centered around location
curmap = folium.Map(location=[28.5499202, 77.1142483], zoom_start=12)  # center coordinates

# Optional overlays (previously `if False:` / `if True:` / disabled strings).
SHOW_EXPLORED_EDGES = False   # yellow segment for every (node, parent) pair in `explored`
SHOW_CHARGING_NODES = True    # red marker on every node that has a charging station
SHOW_SELECTED_STOPS = False   # green/red markers for chosen/skipped stops on `path`


def _node_latlon(node):
    """Return a graph node's position as (latitude, longitude), the order folium expects."""
    return (G.nodes[node]['y'], G.nodes[node]['x'])


def _add_route(fmap, node_path, color):
    """Draw source point -> node_path -> destination point as one polyline on fmap."""
    coords = [(source_coordinates[1], source_coordinates[0])]
    coords.extend(_node_latlon(node) for node in node_path)
    coords.append((destination_coordinates[1], destination_coordinates[0]))
    folium.PolyLine(locations=coords, color=color, weight=5, opacity=0.7).add_to(fmap)


if SHOW_EXPLORED_EDGES:
    for node1, node2 in explored.items():
        if node1 is None or node2 is None:
            continue
        folium.PolyLine(locations=[_node_latlon(node1), _node_latlon(node2)], color='yellow', weight=3, opacity=0.7).add_to(curmap)

# Add the routes to the map as polylines.
# `path1`/`path2`/`path3` are not defined anywhere in this notebook, so a fresh
# run used to stop here with a NameError. Draw whichever of them exist (same
# colours as before) and fall back to `path` when none do.
# `charge` is None when the range-aware search found no route.
if charge is not None:
    route_colors = {'path1': 'red', 'path2': 'green', 'path3': 'blue'}
    routes = [(globals()[name], color) for name, color in route_colors.items() if name in globals()]
    if not routes:
        routes = [(path, 'red')]
    for node_path, color in routes:
        _add_route(curmap, node_path, color)

# source and destination markers
folium.Marker(location=(source_coordinates[1], source_coordinates[0]), icon=folium.Icon(color='blue')).add_to(curmap)
folium.Marker(location=(destination_coordinates[1], destination_coordinates[0]), icon=folium.Icon(color='blue')).add_to(curmap)

# add charging station points in the graph
if SHOW_CHARGING_NODES:
    for node in charging_nodes:
        folium.Marker(location=_node_latlon(node), icon=folium.Icon(color='red')).add_to(curmap)

# `indexes` holds positions in `path` of charging nodes; `index` holds the
# positions in that list which the stop-selection cell chose.
if SHOW_SELECTED_STOPS:
    for i, idx in enumerate(indexes):
        stop_color = 'green' if i in index else 'red'
        folium.Marker(location=list(_node_latlon(path[idx])), icon=folium.Icon(color=stop_color)).add_to(curmap)

# Save the map to an HTML file or display it
curmap.save('map.html')  # Save as HTML file
curmap  # Display the map in Jupyter Notebook or IPython