### Useful references
- OSMnx docs: https://osmnx.readthedocs.io/en/stable/
- OSMnx examples: https://github.com/gboeing/osmnx-examples
- Shapely manual: https://shapely.readthedocs.io/en/stable/manual.html
- Folium docs: https://python-visualization.github.io/folium/index.html
- Available markers icons: https://www.w3schools.com/bootstrap/bootstrap_ref_comp_glyphs.asp
- Directed angle between vectors: https://it.mathworks.com/matlabcentral/answers/180131-how-can-i-find-the-angle-between-two-vectors-including-directional-information

In [1]:
import os
import warnings
import random
import pickle
import numpy as np
import pandas as pd
import folium
import shapely
import geopandas as gpd
import networkx as nx
import osmnx as ox
from pyproj import Geod
from math import atan2, degrees
from shapely.geometry import LineString
from typing import Optional, Tuple, List

Config

In [2]:
DATA_FOLDER = "../data/"
GEOG_CRS = "EPSG:4326"
PROJ_CRS = "EPSG:3857"

ox.settings.log_console = True
pd.set_option("display.max_columns", None)
warnings.filterwarnings('ignore')

random.seed(42)
geod = Geod(ellps="WGS84")

In [3]:
interv_dist_weights = dict(
    traffic_light=2.6767,
    turn_left=14.7726,
    turn_right=13.8288,
    curve=0.0 
)

co2_weights = dict(
    traffic_light=0.0060,
    turn_left=0.0,
    turn_right=0.0088,
    curve=0.0049 
)

Utils

In [8]:
def load_obs(filepath) -> gpd.GeoDataFrame:
    """
    Load observations from a csv file.
    """
    df = pd.read_csv(filepath, sep=r"\+ACI-,\+ACI-", names=["lat", "lon"], engine="python")
    df = df.drop(df.index[0])

    # remove prefix from lat column
    df["lat"] = df["lat"].str.replace("+ACI-", "", regex=False).str.replace(",", ".").astype(float)
    # remove suffix from lng column
    df["lon"] = df["lon"].str.replace("+ACI-", "", regex=False).str.replace(",", ".").astype(float)

    points = gpd.points_from_xy(df.lon, df.lat)
    return gpd.GeoDataFrame(df, geometry=points, crs=GEOG_CRS)


def graph_containing_obs(obs: gpd.GeoDataFrame, distance: float = 1000) -> nx.MultiDiGraph:
    """
    Create a graph of the area containing the observations.
    """
    
    assert(obs.crs == GEOG_CRS)

    # project the gdf to PROJ_CRS
    obs_proj = obs.to_crs(PROJ_CRS)

    # get the bounding box of the projected gdf
    bbox_proj = obs_proj.total_bounds

    # enlarge bbox_proj by <distance> meters
    bbox_proj = [bbox_proj[0] - distance, bbox_proj[1] - distance, bbox_proj[2] + distance, bbox_proj[3] + distance]

    # bbox_proj to polygon
    bbox_proj_geom = gpd.GeoSeries([shapely.geometry.box(*bbox_proj)], crs=PROJ_CRS).geometry
    bbox_poly = bbox_proj_geom.to_crs(GEOG_CRS).geometry.unary_union

    # create a graph from the bounding box
    return ox.graph_from_polygon(bbox_poly, network_type="drive"), bbox_poly


def impute_missing_geometries(G: nx.MultiDiGraph) -> nx.MultiDiGraph:
    """
    Impute missing geometries in the graph
    """

    # only simplified edges have the geometry attribute
    # see https://stackoverflow.com/questions/64333794/osmnx-graph-from-point-and-geometry-information

    x_lookup = nx.get_node_attributes(G, "x")
    y_lookup = nx.get_node_attributes(G, "y")

    for u, v, data in G.edges(data=True):
        if "geometry" not in data:
            data["geometry"] = LineString([(x_lookup[u], y_lookup[u]), (x_lookup[v], y_lookup[v])])

    return G


def add_edge_linearity(G: nx.MultiDiGraph) -> nx.MultiDiGraph:
    """
    Add edge linearity to the graph
    """
    
    for u, v, k, data in G.edges(keys=True, data=True):
        if "geometry" not in data:
            print("geometry not in data", u, v)

        assert "geometry" in data
        
        start_x, start_y = data["geometry"].coords[0]
        end_x, end_y = data["geometry"].coords[-1]

        assert abs(start_x - G.nodes[u]["x"]) < 0.001
        assert abs(start_y - G.nodes[u]["y"]) < 0.001
        assert abs(end_x - G.nodes[v]["x"]) < 0.001
        assert abs(end_y - G.nodes[v]["y"]) < 0.001


        greatc_old = ox.distance.great_circle_vec(G.nodes[u]["y"], G.nodes[u]["x"], G.nodes[v]["y"], G.nodes[v]["x"])
        greatc_dist = ox.distance.great_circle_vec(start_y, start_x, end_y, end_x) 

        if abs(greatc_dist - greatc_old) > 1:
            print("len", data["length"])
            print("greatc", greatc_dist)
            print("greatc wrong", greatc_old)
            print("u:", G.nodes[u], "v:", G.nodes[v])
            print("start", (start_x, start_y), "end", (end_x, end_y))
            print()

        length = data["length"]

        data["linearity"] = greatc_dist / length

    return G


def marker_from_row(row: pd.Series, m: folium.Map, icon: str = "info-sign", color: str = "blue"):
    """
    row: Series representing the POI to plot
    m: folium map object
    icon: icon name
    color: marker color
    """
    long, lat = row.geometry.x, row.geometry.y
    folium.Marker(
        location=[lat, long],
        popup=row.to_string(),
        icon=folium.Icon(icon=icon),
        color=color
    ).add_to(m)
    

def path_to_uvk(G, path):
    node_pairs = zip(path[:-1], path[1:])
    uvk = ((u, v, min(G[u][v].items(), key=lambda k: k[1]["length"])[0]) for u, v in node_pairs)
    return uvk 


def perpendicular_line(ab: LineString, cd_length : Optional[float] = None) -> LineString:
    if cd_length is None:
        cd_length = ab.length / 2

    left = list(ab.parallel_offset(cd_length / 2, 'left').coords)
    right = list(ab.parallel_offset(cd_length / 2, 'right').coords)
    c = [
        (left[0][0] + left[1][0]) / 2,
        (left[0][1] + left[1][1]) / 2
    ]
    d = [
        (right[0][0] + right[1][0]) / 2,
        (right[0][1] + right[1][1]) / 2
    ]
    cd = LineString([c, d])

    return cd   


def generate_midpoints(orig_node: int, dest_node: int, n_midpoints: int, nodes: gpd.GeoDataFrame, G_proj: nx.MultiDiGraph) -> List[int]:
    """
    orig_node: origin node osmid
    dest_node: destination node osmid
    n_midpoints: number of midpoints to generate
    nodes: nodes GeoDataFrame
    G_proj: projected graph
    """
    assert nodes.crs == GEOG_CRS
    assert G_proj.graph['crs'] == PROJ_CRS

    orig_point, _ = ox.projection.project_geometry(nodes.loc[orig_node, 'geometry'], crs=GEOG_CRS, to_crs=PROJ_CRS)
    orig_coords = orig_point.coords[0]
    dest_point, _ = ox.projection.project_geometry(nodes.loc[dest_node, 'geometry'], crs=GEOG_CRS, to_crs=PROJ_CRS)
    dest_coords = dest_point.coords[0]

    ab = LineString([orig_coords, dest_coords])
    cd = perpendicular_line(ab)

    ab_geog, _ = ox.projection.project_geometry(ab, crs=PROJ_CRS, to_crs=GEOG_CRS)
    cd_geog, _ = ox.projection.project_geometry(cd, crs=PROJ_CRS, to_crs=GEOG_CRS)
    
    spacing = cd.length / (n_midpoints-1)

    X, Y = zip(*ox.utils_geo.interpolate_points(geom=cd, dist=spacing))

    midpoints = ox.distance.nearest_nodes(G_proj, X, Y)

    return midpoints


def multiple_paths(G, orig_node, dest_node, midpoints, weight='length'):
    paths = []
    for midpoint in midpoints:
        gen_1 = ox.shortest_path(G, orig_node, midpoint, weight)
        if gen_1 is None:   # no path found
            continue

        temp = list(gen_1)
        temp.pop()

        gen_2 = ox.shortest_path(G, midpoint, dest_node, weight)
        if gen_2 is None:   # no path found
            continue
        
        temp += list(gen_2)
        paths.append(temp)

    return paths


## Original implementation
# def path_metrics(nodes, path_edges):
#     events = []
#     left_turns = 0
#     right_turns = 0
#     iterator = path_edges.itertuples()
#     prev = next(iterator)
#     prev_end = LineString(prev.geometry.coords[-2:])

#     for curr in iterator:
#         u, v, _ = curr.Index
        
#         if nodes.loc[u, 'street_count'] < 3:
#             continue
            
#         curr_start = LineString(curr.geometry.coords[:2])
            
#         # folium.PolyLine(locations=[coord[::-1] for coord in prev_end.coords], color="red").add_to(m)
#         # folium.PolyLine(locations=[coord[::-1] for coord in curr_start.coords], color="green").add_to(m)
        
#         x1 = prev_end.coords[1][0] - prev_end.coords[0][0]
#         y1 = prev_end.coords[1][1] - prev_end.coords[0][1]
#         x2 = curr_start.coords[1][0] - curr_start.coords[0][0]
#         y2 = curr_start.coords[1][1] - curr_start.coords[0][1]
#         theta = degrees(atan2(x1 * y2 - y1 * x2, x1 * x2 + y1 * y2))
        
#         print(f"Node {u} --> Angle between edge {prev.osmid} and {curr.osmid}: {theta} degrees")
        
#         # svolta a sinistra: angolo tra prev e curr compreso tra 25 e 155
#         if 25 <= theta <= 155:
#             left_turns += 1
#             events.append((nodes.loc[u].geometry.x, nodes.loc[u].geometry.y, "left_turn"))
#         elif -155 <= theta <= -25:
#             right_turns += 1
#             events.append((nodes.loc[u].geometry.x, nodes.loc[u].geometry.y, "right_turn"))
        
#         prev = curr
#         prev_end = LineString(prev.geometry.coords[-2:])

#     return events


def detect_turns(nodes: gpd.GeoDataFrame, path_edges: gpd.GeoDataFrame) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    left_turns = []
    right_turns = []

    iterator = path_edges.itertuples()
    prev = next(iterator)
    prev_end = LineString(prev.geometry.coords[-2:])

    for curr in iterator:
        u, v, _ = curr.Index
        
        if nodes.loc[u, 'street_count'] < 3:
            continue
            
        curr_start = LineString(curr.geometry.coords[:2])
            
        # folium.PolyLine(locations=[coord[::-1] for coord in prev_end.coords], color="red").add_to(m)
        # folium.PolyLine(locations=[coord[::-1] for coord in curr_start.coords], color="green").add_to(m)
        
        x1 = prev_end.coords[1][0] - prev_end.coords[0][0]
        y1 = prev_end.coords[1][1] - prev_end.coords[0][1]
        x2 = curr_start.coords[1][0] - curr_start.coords[0][0]
        y2 = curr_start.coords[1][1] - curr_start.coords[0][1]
        theta = degrees(atan2(x1 * y2 - y1 * x2, x1 * x2 + y1 * y2))
        
        # print(f"Node {u} --> Angle between edge {prev.Index} and {curr.Index}: {theta} degrees")
        
        # svolta a sinistra: angolo tra prev e curr compreso tra 25 e 155
        if 25 <= theta <= 155:
            item = {}
            item['node'] = u
            item['prev'] = prev.Index
            item['curr'] = curr.Index
            item['angle'] = theta
            item['geometry'] = nodes.loc[u].geometry
            left_turns.append(item)
        elif -155 <= theta <= -25:
            item = {}
            item['node'] = u
            item['prev'] = prev.Index
            item['curr'] = curr.Index
            item['angle'] = theta
            item['geometry'] = nodes.loc[u].geometry
            right_turns.append(item)
        
        prev = curr
        prev_end = LineString(prev.geometry.coords[-2:])

    if left_turns:
        left_turns = gpd.GeoDataFrame(left_turns, geometry='geometry', crs=GEOG_CRS)
    else:
        left_turns = None
        
    if right_turns:
        right_turns = gpd.GeoDataFrame(right_turns, geometry='geometry', crs=GEOG_CRS)
    else:
        right_turns = None

    return left_turns, right_turns


## Alternative implementation for buffers
# def path_buffer(path_data: gpd.GeoDataFrame, meters: int)-> gpd.GeoDataFrame:
#     if path_data.crs != PROJ_CRS:
#         path_data = ox.project_gdf(path_data, to_crs=PROJ_CRS)

#     gs = path_data.buffer(meters)
#     geom = gs.unary_union

#     buffer_gdf = gpd.GeoDataFrame(geometry=[geom], crs=gs.crs)

#     return buffer_gdf


# def count_traffic_lights(traffic_lights: gpd.GeoDataFrame, buffer: gpd.GeoDataFrame) -> int:
#     if traffic_lights.crs != PROJ_CRS:
#         traffic_lights = traffic_lights.to_crs(PROJ_CRS)
    
#     if buffer.crs != PROJ_CRS:
#         buffer = buffer.to_crs(PROJ_CRS)

#     return traffic_lights.sjoin(buffer)   


def traffic_lights_along_path(traffic_lights: gpd.GeoDataFrame, path_data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    if traffic_lights.crs != PROJ_CRS:
        traffic_lights = traffic_lights.to_crs(PROJ_CRS)
    
    if path_data.crs != PROJ_CRS:
        path_data = path_data.to_crs(PROJ_CRS)

    result = []
    edges_iterator = path_data.iterrows()
    _, prev_edge = next(edges_iterator)
    i = 1
    for edge_index, edge in edges_iterator:
        for light_index, light in traffic_lights.iterrows():
            if (edge['geometry'].buffer(1).difference(prev_edge['geometry'].buffer(1))).intersects(light['geometry']) \
            and \
            (edge['reversed'] == False and light['traffic_signals:direction'] != 'backward'
            or edge['reversed'] == True and light['traffic_signals:direction'] != 'forward'):        
                item = {}
                item['osmid'] = light['osmid']
                item['direction'] = light['traffic_signals:direction']
                item['reversed'] = edge['reversed']
                item['edge'] = edge_index
                item['geometry'] = light['geometry']
                result.append(item)
                
        prev_edge = edge
        i += 1

    if result:
        return gpd.GeoDataFrame(result, crs=PROJ_CRS).to_crs(GEOG_CRS)
 
    return None


def detect_curves(path_data: gpd.GeoDataFrame, lin_th: float = 0.95) -> gpd.GeoDataFrame:
    result = path_data[path_data['linearity'] < lin_th]

    if result.shape[0]:
        result.loc[:, 'geometry'] = result['geometry'].apply(lambda x: x.representative_point())
        return result[['name', 'linearity', 'geometry']]

    return None


def plot_pois(m: folium.Map, pois: gpd.GeoDataFrame, icon: str = "info-sign", color: str = "blue") -> folium.Map:
    """
    m: folium map object
    pois: POIs GeoDataFrame
    icon: icon name for markers
    color: marker color
    """
    pois.apply(lambda row: marker_from_row(row, m, icon, color), axis=1)
    return m


def path_score(weights, n_left_turns, n_right_turns, n_curves, n_lights):
    return weights['turn_left']*n_left_turns + \
        weights['turn_right']*n_right_turns + \
        weights['curve']*n_curves + \
        weights['traffic_light']*n_lights

### Graph and POIs loading
- Load gps track from csv
- Create graph
- Enrich graph with linearity
- Save graph and traffic lights in the area

In [9]:
obs = load_obs(os.path.join(DATA_FOLDER, "platoon_route.csv"))
G, bbox_poly = graph_containing_obs(obs, distance=2000)
G = ox.add_edge_bearings(G)
G = ox.add_edge_speeds(G)
G = ox.add_edge_travel_times(G)
G = impute_missing_geometries(G)
G = add_edge_linearity(G)

ox.save_graphml(G, os.path.join(DATA_FOLDER, "graph.graphml"))

traffic_lights = ox.geometries_from_polygon(bbox_poly, tags={'highway': 'traffic_signals'})
traffic_lights.to_file(os.path.join(DATA_FOLDER, "traffic_lights.geojson"), driver="GeoJSON")

- Load graph

In [10]:
G = ox.load_graphml(os.path.join(DATA_FOLDER, "graph.graphml"))
G_proj = ox.project_graph(G, to_crs=PROJ_CRS)
traffic_lights = gpd.read_file(os.path.join(DATA_FOLDER, "traffic_lights.geojson"))

- Data cleaning

In [11]:
nodes, edges = ox.graph_to_gdfs(G)
nodes['df_index'] = nodes.index
edges['df_index'] = edges.index
edges['linearity'] = edges['linearity'].astype(float) # TODO: handle NaNs

- Plot graph

In [None]:
m = edges.explore(tiles="CartoDB dark matter", popup=True, tooltip=False, color="white")
m = nodes.explore(m=m, popup=True, tooltip=False, color="white")
m

### Paths generation
- Shortest path

In [None]:
# Origin and destination nodes
orig_node = 18244816
dest_node = 77777630

# Cost to minimize (pick from edge attributes)
weight = 'length'

path = ox.shortest_path(G, orig_node, dest_node, weight)
uvk = path_to_uvk(G, path)

marker_from_row(row=nodes.loc[orig_node], m=m, icon="flag", color="green")
marker_from_row(row=nodes.loc[dest_node], m=m, icon="screenshot", color="red")

path_edges = edges.loc[uvk]
path_edges["df_index"] = path_edges.index

m = path_edges.explore(m=m, popup=True, tooltip=False, color="DeepSkyBlue")
m

- Multiple paths
    - Find the line from orig to dest
    - Create the perpendicular and sample midpoinst
    - Compute paths as orig->midpoint + midpoint->dest

In [None]:
orig_point, _ = ox.projection.project_geometry(nodes.loc[orig_node, 'geometry'], crs=GEOG_CRS, to_crs=PROJ_CRS)
orig_coords = orig_point.coords[0]
dest_point, _ = ox.projection.project_geometry(nodes.loc[dest_node, 'geometry'], crs=GEOG_CRS, to_crs=PROJ_CRS)
dest_coords = dest_point.coords[0]

ab = LineString([orig_coords, dest_coords])
cd = perpendicular_line(ab)

In [None]:
ab_geog, _ = ox.projection.project_geometry(ab, crs=PROJ_CRS, to_crs=GEOG_CRS)
cd_geog, _ = ox.projection.project_geometry(cd, crs=PROJ_CRS, to_crs=GEOG_CRS)

In [None]:
ox.folium._make_folium_polyline(ab_geog, color="grey").add_to(m)
ox.folium._make_folium_polyline(cd_geog, color="grey").add_to(m)
m

In [None]:
num_paths = 10
spacing = cd.length / (num_paths-1)

In [None]:
X, Y = zip(*ox.utils_geo.interpolate_points(geom=cd, dist=spacing))

In [None]:
midpoints, dist = ox.distance.nearest_nodes(G_proj, X, Y, return_dist=True)

In [None]:
paths = multiple_paths(G, orig_node, dest_node, midpoints)

- Prepend shortest and fastest path to the list of paths

In [None]:
shortest_path = ox.shortest_path(G, orig_node, dest_node, weight='length')
fastest_path = ox.shortest_path(G, orig_node, dest_node, weight='travel_time')
paths.insert(0, shortest_path)
paths.insert(1, fastest_path)

- Get edge data

In [None]:
uvks = (path_to_uvk(G, path) for path in paths)

In [None]:
paths_data = []
for uvk in uvks:
    path_edges = edges.loc[uvk]
    path_edges["df_index"] = path_edges.index
    paths_data.append(path_edges)

- Plot paths

In [None]:
# generate a color for each path
colors = ox.plot.get_colors(n=num_paths, cmap='rainbow', return_hex=True)

for path_data, color in zip(paths_data, colors):
    m = path_data.explore(m=m, popup=True, tooltip=False, color=color)

m

- Linearity treshold choice

In [None]:
lin_th = 0.95

In [None]:
m2 = nodes.explore(tiles="CartoDB positron", popup=True, tooltip=False, color="black")
m2 = edges[edges.linearity < lin_th].explore(m=m2, popup=True, tooltip=False, color="red")
edges[edges.linearity >= lin_th].explore(m=m2, popup=True, tooltip=False, color="green")
edges[edges.linearity <= 0.000001].explore(m=m2, popup=True, tooltip=False, color="pink")

### Path stats

In [None]:
left_turns = []
right_turns = []
curves = []
lights = []

for path_data in paths_data:
    l, r = detect_turns(nodes, path_data)
    left_turns.append(l)
    right_turns.append(r)

    curves.append(detect_curves(path_data))
    lights.append(traffic_lights_along_path(traffic_lights, path_data))

In [None]:
traffic_lights.explore(m=m, color="lime")

In [None]:
# for i in range(1, buffers.shape[0]):
#     buffers.iloc[i] = buffers.iloc[i].difference(buffers.iloc[i-1])

In [None]:
# paths_data[1] = paths_data[1].to_crs(PROJ_CRS).rename(columns={'geometry': 'geometry_edge'})
# paths_data[1] = paths_data[1].set_geometry(buffers)
# result = paths_data[1].sjoin(traffic_lights.to_crs(PROJ_CRS))

### Paths ranking

In [None]:
interv_dist_scores = []
co2_scores = []

for lt, rt, c, li in zip(left_turns, right_turns, curves, lights):
    n_lt = lt.shape[0] if lt is not None else 0
    n_rt = rt.shape[0] if rt is not None else 0
    n_c = c.shape[0] if c is not None else 0
    n_li = li.shape[0] if li is not None else 0

    interv_dist_scores.append(path_score(interv_dist_weights, n_lt, n_rt, n_c, n_li))
    co2_scores.append(path_score(co2_weights, n_lt, n_rt, n_c, n_li))

In [None]:
interv_dist_scores

In [None]:
co2_scores

### Evaluation on test data

In [None]:
uvk = list(pd.read_csv(os.path.join(DATA_FOLDER, "edgeid_list.csv")).itertuples(index=False, name=None))

In [None]:
path_data = edges.loc[uvk]

In [None]:
lt, rt = detect_turns(nodes, path_data)
c = detect_curves(path_data)
li = traffic_lights_along_path(traffic_lights, path_data)

In [None]:
n_lt = lt.shape[0] if lt is not None else 0
n_rt = rt.shape[0] if rt is not None else 0
n_c = c.shape[0] if c is not None else 0
n_li = li.shape[0] if li is not None else 0

In [None]:
path_score(interv_dist_weights, n_lt, n_rt, n_c, n_li)

In [None]:
path_score(co2_weights, n_lt, n_rt, n_c, n_li)

In [None]:
# TODO: check gdfs are not None
m = edges.explore(tiles="cartodb positron", color="black", popup=True, tooltip=False)
m = path_data.explore(m=m, color="blue", popup=True, tooltip=False)
m = plot_pois(m, lt, icon="chevron-left")
m = plot_pois(m, rt, icon="chevron-right")
m = plot_pois(m, c, icon="link")
m = plot_pois(m, li, icon="info-sign")
m

- Save results on CSV

In [None]:
# TODO: check gdfs are not None
lt.to_csv(os.path.join(DATA_FOLDER, "platoon_left_turns.csv"), index=False)
rt.to_csv(os.path.join(DATA_FOLDER, "platoon_right_turns.csv"), index=False)
c.to_csv(os.path.join(DATA_FOLDER, "platoon_curves.csv"))
li.to_csv(os.path.join(DATA_FOLDER, "platoon_traffic_lights.csv"), index=False)

- num. assoluto semafori
-  verso: senso antiorario
- percorso ottimo che minimizza distanza interveicolo e emissioni co2
- ogni left turn, right turn, curva, semaforo ha un suo peso
- trovare percorso che migliora consumo co2 rispetto a shortest path

### Simulations
- The number of disp_norep tuples is $\frac{n\_samples!}{(n\_samples-2)!}$

In [8]:
n_samples = 100

In [9]:
sample_nodes_0 = random.choices(list(G.nodes()), k=n_samples)

In [10]:
temp = ox.utils_geo.sample_points(G_proj.to_undirected(), n=n_samples)
sample_nodes_1, _, _ = zip(*temp.index)
sample_nodes_1 = list(sample_nodes_1)

In [11]:
sample_nodes = sample_nodes_1
disp_rep = [(n1, n2) for n1 in sample_nodes for n2 in sample_nodes]
disp_norep = [d for d in disp_rep if len(d)==len(set(d))]

In [12]:
ods = random.choices(disp_norep, k=len(disp_norep) // n_samples)

- Plot to see OD tuples spatial distribution

In [13]:
m = nodes.loc[sample_nodes].explore(color="red")
nodes.loc[sample_nodes].explore(m=m, color="blue")

for n1, n2 in ods:
    line = LineString([nodes.loc[n1].geometry.coords[0], nodes.loc[n2].geometry.coords[0]])
    folium.PolyLine(locations=[coord[::-1] for coord in line.coords], color="blue").add_to(m)
m

In [None]:
n_candidates = 5
sim_results = {}

for orig_node, dest_node in ods:
    paths_osmids = {}

    paths_osmids["shortest_length"] = ox.shortest_path(G, orig_node, dest_node, weight='length')
    paths_osmids["shortest_time"] = ox.shortest_path(G, orig_node, dest_node, weight='travel_time')

    midpoints = generate_midpoints(orig_node=orig_node, dest_node=dest_node, n_midpoints=n_candidates, nodes=nodes, G_proj=G_proj)

    for i, p in enumerate(multiple_paths(G, orig_node, dest_node, midpoints)):
        paths_osmids[f"gen_{i}"] = p

    # --------------------------------
    paths_data = {}

    for path_key, path_osmid in paths_osmids.items():
        uvk = path_to_uvk(G, path_osmid)
        paths_data[path_key] = edges.loc[uvk]

    # --------------------------------
    detections = {}
    for path_key, path_data in paths_data.items():
        print(path_data.shape[0], end=" ")

        lt, rt = detect_turns(nodes, path_data)
        c = detect_curves(path_data)
        li = traffic_lights_along_path(traffic_lights, path_data)
        detections[path_key] = dict(
            left_turns=lt,
            right_turns=rt,
            curves=c,
            lights=li
        )

    # --------------------------------
    scores = {}
    for path_key, detection in detections.items():
        n_lt = detection['left_turns'].shape[0] if detection['left_turns'] is not None else 0
        n_rt = detection['right_turns'].shape[0] if detection['right_turns'] is not None else 0
        n_c = detection['curves'].shape[0] if detection['curves'] is not None else 0
        n_li = detection['lights'].shape[0] if detection['lights'] is not None else 0

        scores[path_key] = dict(
            interv_dist=path_score(interv_dist_weights, n_lt, n_rt, n_c, n_li),
            co2=path_score(co2_weights, n_lt, n_rt, n_c, n_li)
        )

    # --------------------------------
    sim_results[(orig_node, dest_node)] = dict(
        paths_osmids=paths_osmids,
        paths_data=paths_data,
        detections=detections,
        scores=scores
    )

- Save simulation results with pickle

In [4]:
# Save results
with open(os.path.join(DATA_FOLDER, "sim_results_2.pickle"), "wb") as f:
    pickle.dump(sim_results, f)

NameError: name 'sim_results' is not defined

In [None]:
sim_results[(36685306, 3447133292)]["scores"]

- Load simulation results

In [6]:
# load sim_results.pickle
with open(os.path.join(DATA_FOLDER, "sim_results_2.pickle"), "rb") as f:
    sim_results = pickle.load(f)

In [12]:
rows = {}
for result_key, result_value in sim_results.items():
    orig_node, dest_node = result_key
    
    row = {}
    row['od'] = result_key
    row["start_latitude"] = nodes.loc[orig_node].geometry.y  # TODO: include in sim_results
    row["start_longitude"] = nodes.loc[orig_node].geometry.x  # TODO: include in sim_results
    row["end_latitude"] = nodes.loc[dest_node].geometry.y  # TODO: include in sim_results
    row["end_longitude"] = nodes.loc[dest_node].geometry.x  # TODO: include in sim_results
    row["length_shortest_path"] = result_value["paths_data"]["shortest_length"]["length"].sum()
    row["interv_dist_shortest_path"] = result_value["scores"]["shortest_length"]["interv_dist"]
    row["co2_shortest_path"] = result_value["scores"]["shortest_length"]["co2"]

    best_interv_path_name = None
    best_interv_score = None

    best_co2_path_name = None
    best_co2_score = None

    for path_name, scores in result_value["scores"].items():
        # consider only generated paths
        if not path_name.startswith("gen_"):
            continue

        # ensure the considered path is different from the shortest path
        if result_value["paths_osmids"][path_name] == result_value["paths_osmids"]["shortest_length"]:
            continue

        if best_interv_path_name is None or scores["interv_dist"] < best_interv_score:
            best_interv_path_name = path_name
            best_interv_score = scores["interv_dist"]

        if best_co2_path_name is None or scores["co2"] < best_co2_score:
            best_co2_path_name = path_name
            best_co2_score = scores["co2"]

        if best_interv_path_name is not None:
            # length of the path with the best interv_dist score   
            row["length_best_interv_dist"] = result_value["paths_data"][best_interv_path_name]["length"].sum()

            # interv_dist score of the path with the best interv_dist score
            row["interv_dist_best_interv_dist"] = result_value["scores"][best_interv_path_name]["interv_dist"]

            # co2 score of the path with the best interv_dist score
            row["co2_best_interv_dist"] = result_value["scores"][best_interv_path_name]["co2"]

            # flag indicating if the best interv_dist path is the shortest path (debugging purposes)
            row["best_interv_dist_is_shortest"] = result_value["paths_osmids"][best_interv_path_name] == result_value["paths_osmids"]["shortest_length"]

        if best_co2_path_name is not None:
            # length of the path with the best co2 score
            row["length_best_co2"] = result_value["paths_data"][best_co2_path_name]["length"].sum()

            # interv_dist score of the path with the best co2 score
            row["interv_dist_best_co2"] = result_value["scores"][best_co2_path_name]["interv_dist"]

            # co2 score of the path with the best co2 score
            row["co2_best_co2"] = result_value["scores"][best_co2_path_name]["co2"]

            # flag indicating if the best co2 path is the shortest path (debugging purposes)
            row["best_co2_is_shortest"] = result_value["paths_osmids"][best_co2_path_name] == result_value["paths_osmids"]["shortest_length"]

    rows[result_key] = row

df = pd.DataFrame(rows).T

In [13]:
print("PATH GENERATION STATS (DEBUGGING PURPOSE)")

long_enough = df["length_shortest_path"] > 500.0
print("Percent of paths long enough:", long_enough.sum() / df.shape[0])

# check if a best interv_dist path different than shortest path exists
best_interv_dist_ne_shortest = ~df["length_best_interv_dist"].isna()
print("Percent of success in generating best_interv_dist paths:", best_interv_dist_ne_shortest.sum() / df.shape[0])

# check if a best co2 path different than shortest path exists
best_co2_ne_shortest = ~df["length_best_co2"].isna()
print("Percent of success in generating best_co2 paths:", best_co2_ne_shortest.sum() / df.shape[0])

df_clean = df[long_enough & best_interv_dist_ne_shortest & best_co2_ne_shortest]

print()
print("SCORE COMPARISON STATS (EVALUATION PURPOSE)")

perc = df_clean[df_clean["interv_dist_best_interv_dist"] < df_clean["interv_dist_shortest_path"]].shape[0] / df_clean.shape[0]
print("Percent of success in generating paths with better interv_dist than shortest: ", perc)

perc = ((df_clean["interv_dist_shortest_path"] - df_clean["interv_dist_best_interv_dist"]) / df_clean["interv_dist_shortest_path"]).mean()
print("Avg perc of improvement in interv_dist score: ", perc)

perc = df_clean[df_clean["co2_best_interv_dist"] < df_clean["co2_shortest_path"]].shape[0] / df_clean.shape[0]
print("Percent of success in generating paths with better co2 than shortest: ", perc)

perc = ((df_clean["co2_shortest_path"] - df_clean["co2_best_co2"]) / df_clean["co2_shortest_path"]).mean()
print("Avg perc of improvement in co2 score: ", perc)

PATH GENERATION STATS (DEBUGGING PURPOSE)
Percent of paths long enough: 0.9591836734693877
Percent of success in generating best_interv_dist paths: 0.9693877551020408
Percent of success in generating best_co2 paths: 0.9693877551020408

SCORE COMPARISON STATS (EVALUATION PURPOSE)
Percent of success in generating paths with better interv_dist than shortest:  0.2765957446808511
Avg perc of improvement in interv_dist score:  -0.3123936695553955
Percent of success in generating paths with better co2 than shortest:  0.2127659574468085
Avg perc of improvement in co2 score:  -0.14092672275929122


In [20]:
df.to_csv(os.path.join(DATA_FOLDER, "df.csv"), index=False)