In [1]:
# Name of the study area; it selects the per-area data directory below.
area_name = 'Poznan'
# Directory holding this area's data; the layers written by this notebook go here too.
output_folder = f'data/{area_name}'

# GeoPackage that the notebook reads its input layer from.
output_file = f'{output_folder}/cleaned.gpkg'

In [2]:
import geopandas
import pandas as pd
import osmnx as ox
import networkx as nx
import shapely

In [3]:
# Load the "cleaned" layer of the input GeoPackage; every later cell starts from this frame.
final_cways = geopandas.read_file(output_file, layer="cleaned")

  for feature in features_lst:


In [4]:
# EPSG code of the loaded layer's CRS; it is reused for every layer derived below.
epsg = final_cways.crs.to_epsg()

# to_epsg() returns None when the CRS cannot be matched to an EPSG code. Stop
# here in that case: passing None on as a CRS would produce layers without a
# usable coordinate reference system.
if epsg is None:
    raise ValueError("The CRS of the cleaned layer has no matching EPSG code")

epsg

2177

# Recreate the carriageways network

In [5]:
from shapely.geometry import LineString, MultiLineString
import numpy as np

# NOTE: this binds a second name to the same GeoDataFrame object (no copy is made),
# so the columns assigned through all_cways in this cell also appear on final_cways.
all_cways = final_cways

def fix_precision(geom, decimals=2):
    """Return a copy of a line geometry with every coordinate rounded.

    Args:
        geom: geometry to round. "LineString" and "MultiLineString" are the
            geometry types that are handled.
        decimals: number of decimal places kept for each coordinate value.

    Returns:
        A new LineString or MultiLineString built from the rounded
        coordinates, or None when ``geom`` is of any other geometry type.
    """
    # geom_type is used instead of the deprecated .type alias.
    kind = geom.geom_type

    if kind == "LineString":
        return LineString([np.round(p, decimals) for p in geom.coords])

    if kind == "MultiLineString":
        # Forward `decimals` so the parts are rounded to the requested
        # precision rather than always to the default.
        return MultiLineString([fix_precision(part, decimals) for part in geom.geoms])

    # Other geometry types are not handled; callers receive None.
    return None

def get_lexicographic_boundaries(f):
    """Return the first and last vertex of a feature's rounded geometry as strings.

    Args:
        f: row-like object exposing 'geometry' and 'uuid' entries.

    Returns:
        Two-item list ``[str(first vertex), str(last vertex)]``.

    Raises:
        Any exception raised while reading the rounded geometry's coordinates;
        the feature's 'uuid' is printed before the exception propagates.
    """
    rounded = fix_precision(f['geometry'])

    try:
        endpoints = (rounded.coords[0], rounded.coords[-1])
        return [str(vertex) for vertex in endpoints]
    except Exception:
        # Show which feature is at fault, then let the error continue upwards.
        print(f['uuid'])
        raise

# String keys of the first / last vertex of every carriageway. Two carriageways
# share a node exactly when these keys are equal.
endpoint_keys = all_cways.apply(get_lexicographic_boundaries, axis=1)
all_cways['u_node'] = [pair[0] for pair in endpoint_keys]
all_cways['v_node'] = [pair[1] for pair in endpoint_keys]

# De-duplicate the keys in first-seen order. Iterating a set of strings follows
# hash order, which changes between interpreter runs, so ids derived from a set
# would not be reproducible from one kernel restart to the next.
nodes = list(dict.fromkeys([*all_cways['u_node'], *all_cways['v_node']]))

# Consecutive integer id (0, 1, 2, ...) for every distinct endpoint key.
node_map = {key: node_id for node_id, key in enumerate(nodes)}

def get_node_uuid(f):
    """Look up the integer node ids of a feature's two endpoints.

    Args:
        f: row-like object whose 'u_node' and 'v_node' entries are keys of the
            module-level ``node_map``.

    Returns:
        Two-item list ``[id of u_node, id of v_node]``.
    """
    start_id = node_map[f['u_node']]
    end_id = node_map[f['v_node']]
    return [start_id, end_id]

# Translate the endpoint keys of every carriageway into integer node ids.
node_id_pairs = all_cways.apply(get_node_uuid, axis=1)
all_cways['u'] = [pair[0] for pair in node_id_pairs]
all_cways['v'] = [pair[1] for pair in node_id_pairs]

# Node table with one point per distinct endpoint key. A key is the str() of a
# coordinate tuple, so stripping the commas leaves WKT coordinate syntax.
tab = {
    "osmid": [node_map[key] for key in node_map],
    "geometry": [
        shapely.wkt.loads("POINT{}".format(key.replace(",", "")))
        for key in node_map
    ],
}

# The frame keeps its default positional index; "osmid" is an ordinary column.
cway_nodes = geopandas.GeoDataFrame(tab, geometry="geometry", crs=f"EPSG:{epsg}")

# Store the rounded geometries so the edge vertices agree with the node keys.
all_cways["geometry"] = all_cways.apply(lambda f: fix_precision(f["geometry"]), axis=1)

# Edges are indexed by (u, v, key) when the graph is built.
edges_by_uvk = all_cways.set_index(["u", "v", "key"])
final_network = ox.graph_from_gdfs(cway_nodes, edges_by_uvk)

In [6]:
# Keep only the largest weakly connected component of the network.
largest_component = max(nx.weakly_connected_components(final_network), key=len)
final_network = final_network.subgraph(largest_component)

# Number of edges left in that component.
len(final_network.edges)

102510

In [7]:
from tqdm.notebook import trange, tqdm

In [8]:
# Find false dead-ends
deadend_roads = []

for osmid, node_data in tqdm(final_network.nodes(data=True)):
    # Only nodes whose degree in the graph is exactly 1 are candidates.
    if final_network.degree(osmid) != 1:
        continue

    # Attribute dicts of the edges at this node: incoming ones first, then outgoing.
    edge_attrs = [attrs for _, _, attrs in final_network.in_edges(osmid, data=True)]
    edge_attrs.extend(attrs for _, _, attrs in final_network.out_edges(osmid, data=True))

    road_ids = [attrs["road_uuid"] for attrs in edge_attrs]
    cway_ids = [attrs["cway_uuid"] for attrs in edge_attrs]
    dual_flags = [attrs["dual"] for attrs in edge_attrs]

    # Skip the node unless every edge at it is flagged as dual.
    if not all(dual_flags):
        continue

    assert len(road_ids) == 1

    # One record per carriageway ending here, located at the node's point.
    deadend_roads.extend(
        {"cway_uuid": cway_id, "road_uuid": road_ids[0], "geometry": node_data["geometry"]}
        for cway_id in cway_ids
    )

  0%|          | 0/82072 [00:00<?, ?it/s]

In [9]:
# Turn the collected dead-end records into a point layer and save it next to
# the other outputs of this area.
deadend_path = f"{output_folder}/deadend_roads.gpkg"
temp_output = geopandas.GeoDataFrame(data=deadend_roads, geometry="geometry", crs=epsg)
temp_output.to_file(deadend_path, driver="GPKG")

In [10]:
# Filter for dual carriageways that are dead ends on both sides: keep the roads
# that contributed more than one dead-end record.
dead_ends_per_road = temp_output.groupby("road_uuid")["road_uuid"].count()
road_uuids = dead_ends_per_road[dead_ends_per_road > 1].index.tolist()

deadend_cways = temp_output[temp_output["road_uuid"].isin(road_uuids)]

# Kept as the last expression so the notebook actually displays the count; a
# bare expression in the middle of a cell is evaluated and discarded.
len(road_uuids)

In [11]:
from shapely.geometry import Point, LineString

# Carriageways that end in one of the dead-end nodes selected above.
cways_to_edit = final_cways[final_cways["cway_uuid"].isin(deadend_cways["cway_uuid"])]

new_features = []

for road_uuid in road_uuids:
    sides = cways_to_edit[cways_to_edit["road_uuid"] == road_uuid]
    side_one = sides[sides["side"] == 1]
    side_zero = sides[sides["side"] == 0]

    # A connector can only be built when the road has exactly one carriageway
    # with side == 1 and one with side == 0. This is an explicit check rather
    # than asserts inside a bare except, so unrelated errors (and
    # KeyboardInterrupt) are no longer swallowed and the check survives -O.
    if len(sides) != 2 or len(side_one) != 1 or len(side_zero) != 1:
        print(f"Weird {road_uuid}. Skipping...")
        continue

    # The connector inherits every attribute of the first matching row; only
    # its geometry is replaced below.
    n = sides.iloc[0].copy()

    left_side = side_one.iloc[0]
    right_side = side_zero.iloc[0]

    # Check if we need to reverse: take the (first) dead-end point recorded for
    # left_side and compare its distance to both ends of that carriageway.
    left_side_point = deadend_cways[deadend_cways["cway_uuid"] == left_side["cway_uuid"]].iloc[0]["geometry"]
    left_geom = left_side["geometry"]
    distance_to_last = left_side_point.distance(Point(left_geom.coords[-1]))
    distance_to_first = left_side_point.distance(Point(left_geom.coords[0]))

    # If the dead end is nearer the first vertex, swap the roles so the
    # connector still runs from a last vertex to a first vertex.
    if distance_to_last > distance_to_first:
        left_side, right_side = right_side, left_side

    n["geometry"] = LineString([left_side["geometry"].coords[-1], right_side["geometry"].coords[0]])

    new_features.append(n)

if new_features:
    new_df = geopandas.GeoDataFrame(new_features, geometry="geometry", crs=epsg)
    all_cways = geopandas.GeoDataFrame(pd.concat([final_cways, new_df]), geometry="geometry", crs=epsg)
else:
    # Nothing to add. Building a GeoDataFrame from an empty list would fail
    # because there is no "geometry" column, so use an empty slice and a copy
    # of the input instead (a copy, so final_cways is not modified below).
    new_df = final_cways.iloc[:0].copy()
    all_cways = final_cways.copy()

# Dual carriageways are always reported as one-way; other rows keep "oneway".
all_cways["real_oneway"] = all_cways.apply(lambda f: "yes" if f["dual"] == 1 else f["oneway"], axis=1)

all_cways.to_file(f"{output_folder}/super_final.gpkg", driver="GPKG")