# Get the Network


## Download latest OSM Network from Geofabrik and clip it to Zurich

In [7]:
import json
import logging
import pandas as pd
import geopandas as gpd
from pyrosm import OSM, get_data
import time

# ------------ Config
START = time.time()
IS_TEST = False
TEST_GPKG = "./data/working_data/is_test.gpkg"
TEST_AREA = 1000
OUT_GPKG = "./data/working_data/zurich_walking_edges_filtered.gpkg"
OUT_LAYER = "edges"

# ------------ Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# ------------ Helper: shared test polygon (no disk writes)
def compute_test_polygon(fp_pbf, name="Zürich", radius_m=500, target_epsg=2056):
    osm_tmp = OSM(fp_pbf)
    boundary = osm_tmp.get_boundaries(name=name, boundary_type="administrative")
    if boundary is None or len(boundary) == 0:
        raise RuntimeError(f"No administrative boundary found for {name}.")
    boundary_city = boundary[boundary["admin_level"] == "8"].iloc[0]

    # buffer in LV95, then convert back to boundary CRS
    city_poly = gpd.GeoSeries([boundary_city.geometry], crs=boundary.crs).to_crs(target_epsg)
    center = city_poly.centroid.iloc[0]
    test_area = center.buffer(radius_m)
    poly = gpd.GeoSeries([test_area], crs=target_epsg).to_crs(boundary.crs).iloc[0]
    return poly

# ------------ 1) Fetch PBF & determine bounding polygon
logging.info("Loading Zurich PBF (canton-wide extract)…")
fp = get_data("Zuerich", directory="./data")

if IS_TEST:
    poly = compute_test_polygon(fp, name="Zürich", radius_m=TEST_AREA, target_epsg=2056)
    logging.info("Using TEST AREA around city center (radius ~%dm).", TEST_AREA)
else:
    osm_tmp = OSM(fp)
    boundary = osm_tmp.get_boundaries(name="Zürich", boundary_type="administrative")
    boundary_city = boundary[boundary["admin_level"] == "8"].iloc[0]
    poly = boundary_city.geometry
    logging.info("Using FULL city boundary (admin_level=8).")

# ------------ 2) Load OSM with bounding polygon & extract walking network
logging.info("Reloading OSM with bounding polygon…")
osm = OSM(fp, bounding_box=poly)

logging.info("Extracting walking network (edges only, with 'access')…")
edges = osm.get_network(network_type="walking", nodes=False, extra_attributes=["access"])
logging.info("Extracted %d edges (before filtering).", len(edges))

# to LV95
edges = edges.to_crs(2056)

# ------------ 3) Vectorized filtering
logging.info("Filtering edges by 'highway' and 'access' (vectorized)…")

strict_hw = ["footway", "path", "pedestrian", "steps"]
optional_hw = ["living_street", "corridor", "platform"]
keep_hw = set(strict_hw + optional_hw)

access_keep = ["agricultural", "destination", "no", "permissive", "yes", None]
keep_access = set(access_keep)

def _to_scalar(v):
    if isinstance(v, (list, tuple, set)):
        try:
            return next(iter(v)) if len(v) > 0 else pd.NA
        except TypeError:
            return next(iter(v), pd.NA)
    return v

if "highway" in edges.columns:
    edges["highway"] = edges["highway"].map(_to_scalar)
else:
    logging.warning("No 'highway' column found – skipping highway filter.")

if "access" in edges.columns:
    edges["access"] = edges["access"].map(_to_scalar)
else:
    logging.warning("No 'access' column found – skipping access filter.")

if "highway" in edges.columns:
    before = len(edges)
    edges = edges[edges["highway"].isin(keep_hw)]
    logging.info("Kept %d edges after highway filtering (from %d).", len(edges), before)

if "access" in edges.columns:
    before = len(edges)
    mask_access = edges["access"].isin(keep_access)
    if None in keep_access:
        mask_access = mask_access | edges["access"].isna()
    edges = edges[mask_access]
    logging.info("Kept %d edges after access filtering (from %d).", len(edges), before)

# ------------ 4) Sanitize attributes & compute lengths
logging.info("Sanitizing attributes (lists/dicts → strings)…")
def sanitize_columns(gdf):
    for col in gdf.columns:
        if col == gdf.geometry.name:
            continue
        gdf[col] = gdf[col].apply(
            lambda v: json.dumps(v, ensure_ascii=False)
            if isinstance(v, (dict, list, set, tuple)) else v
        )
    return gdf

edges = sanitize_columns(edges)
edges["length_m"] = edges.geometry.length

# ------------ 5) Write
logging.info("Writing GeoPackage: %s :: %s", OUT_GPKG, OUT_LAYER)
edges.to_file(OUT_GPKG, layer=OUT_LAYER, driver="GPKG")
logging.info("Done.")


2025-08-23 12:42:19,629 - INFO - Loading Zurich PBF (canton-wide extract)…
2025-08-23 12:42:41,367 - INFO - Using FULL city boundary (admin_level=8).
2025-08-23 12:42:41,367 - INFO - Reloading OSM with bounding polygon…
2025-08-23 12:42:41,384 - INFO - Extracting walking network (edges only, with 'access')…
You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.h

# Combine Data from City of Zurich and OSM to one walking network

1. 1.5 m buffer from OSM and Zurich street network (square buffer ends)
2. Union of both
3. Fill holes smaller than 10 m²
4. -0.5 m square buffer
5. Merge multipart features into single parts
6. Keep the longest one

In [None]:
import logging
import geopandas as gpd
from shapely.geometry import Polygon
from shapely.ops import unary_union
from pyrosm import OSM, get_data

# ------------ Config
CRS_TARGET = 2056

OSM_EDGES_GPKG = "./data/working_data/zurich_walking_edges_filtered.gpkg"
OSM_EDGES_LAYER = "edges"

ZURICH_STREETS_GPKG = "./data/input_data/Routing_City_Network.gpkg"
ZURICH_STREETS_LAYER = "netz_fuss"

OUT_GPKG = "./data/working_data/zurich_walk_network_union.gpkg"
OUT_LAYER = "walk_union"

# Buffer parameters
BUF_POS = 1.5
BUF_NEG = -0.5
HOLE_MAX_AREA = 15

# ------------ Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


def ensure_crs(gdf: gpd.GeoDataFrame, crs_epsg: int) -> gpd.GeoDataFrame:
    if gdf.crs is None:
        raise ValueError("Input GeoDataFrame has no CRS. Please set it first.")
    if gdf.crs.to_epsg() != crs_epsg:
        return gdf.to_crs(crs_epsg)
    return gdf

# same test polygon as Script 1 — no disk writes
def compute_test_polygon(fp_pbf, name="Zürich", radius_m=500, target_epsg=2056):
    osm_tmp = OSM(fp_pbf)
    boundary = osm_tmp.get_boundaries(name=name, boundary_type="administrative")
    if boundary is None or len(boundary) == 0:
        raise RuntimeError(f"No administrative boundary found for {name}.")
    boundary_city = boundary[boundary["admin_level"] == "8"].iloc[0]
    city_poly = gpd.GeoSeries([boundary_city.geometry], crs=boundary.crs).to_crs(target_epsg)
    center = city_poly.centroid.iloc[0]
    test_area = center.buffer(radius_m)
    poly = gpd.GeoSeries([test_area], crs=target_epsg).iloc[0]
    return poly  # already in target_epsg

def remove_small_holes(geom, max_area: float):
    if geom is None or geom.is_empty:
        return geom
    if geom.geom_type == "Polygon":
        kept_interiors = []
        for interior in geom.interiors:
            hole_poly = Polygon(interior)
            if hole_poly.area >= max_area:
                kept_interiors.append(interior)
        return Polygon(geom.exterior, kept_interiors)
    elif geom.geom_type == "MultiPolygon":
        parts = [remove_small_holes(p, max_area) for p in geom.geoms]
        return unary_union([p for p in parts if p and not p.is_empty])
    else:
        return geom

def save_layer(gdf: gpd.GeoDataFrame, layer: str):
    """Save intermediate layers to a single GPKG when IS_TEST=True."""
    gdf.to_file(TEST_GPKG, layer=layer, driver="GPKG")
    logging.info("Wrote test layer: %s (%d features)", layer, len(gdf))

In [9]:
# ------------ 1) Load data
logging.info("Loading OSM edges…")
gdf_osm = gpd.read_file(OSM_EDGES_GPKG, layer=OSM_EDGES_LAYER)
gdf_osm = ensure_crs(gdf_osm, CRS_TARGET)

logging.info("Loading Zurich street network…")
gdf_zrh = gpd.read_file(ZURICH_STREETS_GPKG, layer=ZURICH_STREETS_LAYER)
gdf_zrh = ensure_crs(gdf_zrh, CRS_TARGET)

# ------------ 1a) Optional test area: identical to Script 1
if IS_TEST:
    logging.info("Clipping both layers with identical TEST AREA (radius ~%dm)…", TEST_AREA)
    fp = get_data("Zuerich", directory="./data")
    test_area = compute_test_polygon(fp, name="Zürich", radius_m=TEST_AREA, target_epsg=CRS_TARGET)
    gdf_osm = gdf_osm.clip(test_area)
    gdf_zrh = gdf_zrh.clip(test_area)

# Save stage 1 layers
save_layer(gdf_osm, "1_osm_edges")
save_layer(gdf_zrh, "1_zrh_edges")

# ------------ 2) Buffers
logging.info("Buffering OSM edges (+%.2fm)…", BUF_POS)
gdf_buf_osm = gpd.GeoDataFrame(geometry=gdf_osm.geometry.buffer(BUF_POS), crs=CRS_TARGET)
save_layer(gdf_buf_osm, "2_buf_osm")

logging.info("Buffering ZRH edges (+%.2fm)…", BUF_POS)
gdf_buf_zrh = gpd.GeoDataFrame(geometry=gdf_zrh.geometry.buffer(BUF_POS), crs=CRS_TARGET)
save_layer(gdf_buf_zrh, "2_buf_zrh")

# ------------ 3) Union
logging.info("Union of both buffers…")
union_geom = unary_union(list(gdf_buf_osm.geometry) + list(gdf_buf_zrh.geometry))
gdf_union = gpd.GeoDataFrame(geometry=[union_geom], crs=CRS_TARGET)
save_layer(gdf_union, "3_union")

# ------------ 4) Remove small holes
logging.info("Removing interior holes smaller than %.1f m²…", HOLE_MAX_AREA)
union_noholes = remove_small_holes(union_geom, HOLE_MAX_AREA)
gdf_union_noholes = gpd.GeoDataFrame(geometry=[union_noholes], crs=CRS_TARGET)
save_layer(gdf_union_noholes, "4_union_noholes")

# ------------ 5) Negative buffer
logging.info("Applying negative buffer %.2fm…", BUF_NEG)
contracted = union_noholes.buffer(BUF_NEG)
gdf_contracted = gpd.GeoDataFrame(geometry=[contracted], crs=CRS_TARGET)
save_layer(gdf_contracted, "5_contracted")

# ------------ 6) Multipart → singlepart
logging.info("Exploding multipart geometries…")
gs_single = gpd.GeoSeries([contracted], crs=f"EPSG:{CRS_TARGET}").explode(index_parts=False).reset_index(drop=True)
gdf_single = gpd.GeoDataFrame(geometry=gs_single, crs=f"EPSG:{CRS_TARGET}")
save_layer(gdf_single, "6_exploded")

# ------------ 7) Keep largest polygon & simplify
gdf_single["area_m2"] = gdf_single.geometry.area
gdf_top1 = gdf_single.sort_values("area_m2", ascending=False).head(1).reset_index(drop=True)
save_layer(gdf_top1, "7_top1")

logging.info("Simplifying geometry (tolerance=0.5)…")
gdf_top1["geometry"] = gdf_top1.geometry.simplify(tolerance=0.5, preserve_topology=True)
save_layer(gdf_top1, "7_simplified")

# ------------ 8) Write final output (always)
logging.info("Writing final result: %s :: %s", OUT_GPKG, OUT_LAYER)
gdf_top1.to_file(OUT_GPKG, layer=OUT_LAYER, driver="GPKG")
logging.info("Done. %d polygon(s) written.", len(gdf_top1))


2025-08-23 12:43:16,793 - INFO - Loading OSM edges…
2025-08-23 12:43:17,403 - INFO - Loading Zurich street network…
2025-08-23 12:43:17,991 - INFO - Created 41,496 records
2025-08-23 12:43:18,036 - INFO - Wrote test layer: 1_osm_edges (41496 features)
2025-08-23 12:43:18,170 - INFO - Created 30,853 records
2025-08-23 12:43:18,234 - INFO - Wrote test layer: 1_zrh_edges (30853 features)
2025-08-23 12:43:18,234 - INFO - Buffering OSM edges (+1.50m)…
2025-08-23 12:43:23,435 - INFO - Created 41,496 records
2025-08-23 12:43:23,568 - INFO - Wrote test layer: 2_buf_osm (41496 features)
2025-08-23 12:43:23,568 - INFO - Buffering ZRH edges (+1.50m)…
2025-08-23 12:43:24,185 - INFO - Created 30,853 records
2025-08-23 12:43:24,285 - INFO - Wrote test layer: 2_buf_zrh (30853 features)
2025-08-23 12:43:24,285 - INFO - Union of both buffers…
2025-08-23 12:43:48,653 - INFO - Created 1 records
2025-08-23 12:43:48,704 - INFO - Wrote test layer: 3_union (1 features)
2025-08-23 12:43:48,704 - INFO - Removi

No. 9 I will do by a QGIS Plugin calles BecaGIS. (via qgis_process)

In [10]:
# ------------ 9) Skeletonize the network
!qgis_process run becagis:Skeleton -- INPUT=.\data\working_data\zurich_walk_network_union.gpkg POSTPROCESSING=0 OUTPUT=.\data\working_data\zurich_walking_network_final.shp


----------------
Inputs
----------------

INPUT:	.\data\working_data\zurich_walk_network_union.gpkg
OUTPUT:	.\data\working_data\zurich_walking_network_final.shp
POSTPROCESSING:	0



----------------
Results
----------------

OUTPUT:	.\data\working_data\zurich_walking_network_final.shp




## No. 10: Spatial Join with OSM Attributes
(Outdated)


In [None]:
# ------------ 10) Join layers according to midpoints

import geopandas as gpd
from shapely.geometry import LineString, Point, MultiPoint
from shapely.ops import snap, unary_union

# --- Pfade & Parameter
LAYER1_PATH = r"./data/working_data/zurich_walking_network_final.shp"
LAYER2_GPKG = TEST_GPKG
LAYER2_NAME = "1_osm_edges"
OUT_GPKG = r"./data/zurich_walking_network_final.gpkg"
OUT_LAYER = "network_segments"
TARGET_CRS = "EPSG:2056"
MAX_DISTANCE_METERS = 10
DROP_COLS = ["fid", "area_m2"]

# --- Parameter fuer Netzreparatur
USE_PREP = True
SNAP_TOL_M = 2
MIN_SEGMENT_LEN = 0

def endpoints_multipoint(gdf):
    pts = []
    for geom in gdf.geometry:
        if geom and not geom.is_empty:
            if geom.geom_type == "LineString":
                cs = list(geom.coords)
                if len(cs) >= 2:
                    pts.extend([Point(cs[0]), Point(cs[-1])])
            elif geom.geom_type == "MultiLineString":
                for ls in geom.geoms:
                    cs = list(ls.coords)
                    if len(cs) >= 2:
                        pts.extend([Point(cs[0]), Point(cs[-1])])
    return MultiPoint(pts) if pts else MultiPoint()

def prepare_network_shapely(gdf: gpd.GeoDataFrame, snap_tol=1.0) -> gpd.GeoDataFrame:
    g = gdf.copy()
    g = g[g.geometry.notna()].copy()
    protected = endpoints_multipoint(g)

    # Linien leicht glätten (kann auch weggelassen werden)
    g["geometry"] = g.geometry.simplify(snap_tol)

    # Zuruecksnappen auf geschuetzte Endpunkte
    g["geometry"] = g.geometry.apply(lambda geom: snap(geom, protected, snap_tol))

    # Re-noden (alle Kreuzungen in echte Schnittkanten verwandeln)
    unioned = unary_union(g.geometry.values)
    if unioned.geom_type == "MultiLineString":
        lines = list(unioned.geoms)
    elif unioned.geom_type == "LineString":
        lines = [unioned]
    else:
        lines = []

    out = gpd.GeoDataFrame(geometry=lines, crs=g.crs)
    # Mini-Segmente raus
    out["length"] = out.length
    out = out[out["length"] > MIN_SEGMENT_LEN].drop(columns="length")
    return out

def explode_to_lines(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    g = gdf[gdf.geometry.notna()].copy().explode(index_parts=False)
    return g.loc[g.geometry.type == "LineString"].copy()

def lines_to_segments(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    rows = []
    for _, row in gdf.iterrows():
        coords = list(row.geometry.coords)
        for i in range(len(coords) - 1):
            rows.append({**row.drop(labels=["geometry"]).to_dict(),
                         "geometry": LineString([coords[i], coords[i+1]])})
    segs = gpd.GeoDataFrame(rows, geometry="geometry", crs=gdf.crs)
    segs.reset_index(drop=True, inplace=True)
    return segs

def midpoint(ls: LineString) -> Point:
    return ls.interpolate(0.5, normalized=True)

def drop_if_present(df, cols):
    return df[[c for c in df.columns if c not in cols]]

# --- Load + CRS
layer1 = gpd.read_file(LAYER1_PATH)
layer2 = gpd.read_file(LAYER2_GPKG, layer=LAYER2_NAME)
if layer1.crs != TARGET_CRS: layer1 = layer1.to_crs(TARGET_CRS)
if layer2.crs != TARGET_CRS: layer2 = layer2.to_crs(TARGET_CRS)

# --- Preprocessing fuer Layer 1
if USE_PREP:
    layer1 = prepare_network_shapely(layer1, snap_tol=SNAP_TOL_M)

# 1) Segmente aus Layer 1
layer1_segments = lines_to_segments(explode_to_lines(layer1))
layer1_segments["seg_id"] = layer1_segments.index

# 2–3) Layer 2 vorbereiten + Midpoints
layer2_lines = explode_to_lines(layer2).copy()
layer2_lines["edge_id"] = layer2_lines.index
seg_mid = gpd.GeoDataFrame(layer1_segments[["seg_id"]].copy(),
                           geometry=layer1_segments.geometry.apply(midpoint),
                           crs=layer1_segments.crs)
edge_mid = gpd.GeoDataFrame(layer2_lines[["edge_id"]].copy(),
                            geometry=layer2_lines.geometry.apply(midpoint),
                            crs=layer2_lines.crs)

# 4) Nearest match (Midpoint → Midpoint)
joined_pts = gpd.sjoin_nearest(
    seg_mid, edge_mid, how="left",
    max_distance=MAX_DISTANCE_METERS,
    distance_col="dist_mid2mid",
    lsuffix="_seg", rsuffix="_edge"
)

# 5) Mapping + Edge-Attribute mergen
mapping = joined_pts[["seg_id", "edge_id", "dist_mid2mid"]]
out = layer1_segments.merge(mapping, on="seg_id", how="left")
edge_attrs = layer2_lines.drop(columns=["geometry"])
out = out.merge(edge_attrs, on="edge_id", how="left", suffixes=("", "_edge"))

# 6) Clean + Save
out = drop_if_present(out, DROP_COLS)
out.to_file(OUT_GPKG, driver="GPKG", layer=OUT_LAYER, index=False)
print(f"Wrote: {OUT_GPKG} (layer='{OUT_LAYER}')")
import winsound
winsound.Beep(800, 200)
winsound.Beep(1200, 400)
winsound.Beep(1600, 400)

END_TIME = time.time()
DURATION = END_TIME - START
AREA_KM2 = (3.14159 * TEST_AREA * TEST_AREA) / 1_000_000
print(f"Test area: {AREA_KM2:.2f} km² | Duration: {DURATION:.2f} seconds | Duration for 1 km²: {DURATION / AREA_KM2:.2f} seconds")

2025-08-23 20:51:27,863 - INFO - Created 1,183,576 records


Wrote: ./data/zurich_walking_network_final.gpkg (layer='network_segments')
Test area: 3.14 km² | Duration: 29349.99 seconds | Duration for 1 km²: 9342.40 seconds


## No. 11: Remove dead-end segments

In [10]:
# ----------------------------------------------------------
# Iteratively remove short stubs (leaf segments) OR null-degree segments
# (N iterations, robust to column overlaps)
# ----------------------------------------------------------

import geopandas as gpd
import networkx as nx
import pandas as pd

OUT_GPKG = r"./data/zurich_walking_network_final.gpkg"
OUT_LAYER = "network_segments"
EDGES_CLEAN_LAYER = "network_segments_cleaned"

MIN_LEN = 50  # meters (Stub-Threshold)
N_ITER = 2     # number of iterations

def clean_once(gdf, min_len):
    """Eine Iteration: degrees berechnen, Stummel < min_len oder Null-Degrees entfernen."""
    # Index resetten -> 'idx' passt sicher zum DataFrame
    gdf = gdf.reset_index(drop=True).copy()

    # alte Spalten weg, falls vorhanden (verhindert Overlap)
    for col in ["deg_u", "deg_v", "is_leaf", "length_m"]:
        if col in gdf.columns:
            gdf.drop(columns=col, inplace=True)

    # Graph aus Start-/Endpunkten bauen
    G = nx.Graph()
    for idx, geom in enumerate(gdf.geometry):
        coords = list(geom.coords)
        if len(coords) >= 2:
            start, end = coords[0], coords[-1]
            G.add_edge(start, end, idx=idx)

    # Degrees
    deg = dict(G.degree())
    edges_df = nx.to_pandas_edgelist(G)
    edges_df["deg_u"] = edges_df["source"].map(deg)
    edges_df["deg_v"] = edges_df["target"].map(deg)
    edges_df["is_leaf"] = (edges_df["deg_u"] == 1) | (edges_df["deg_v"] == 1)

    # Join zurück (per idx)
    deg_map = edges_df.set_index("idx")[["deg_u", "deg_v", "is_leaf"]]
    gdf = gdf.join(deg_map, how="left")

    # Länge (achte auf metrisches CRS – falls nicht, vorher .to_crs(2056))
    gdf["length_m"] = gdf.geometry.length

    # Löschmasken
    mask_stubs = (gdf["is_leaf"].fillna(False)) & (gdf["length_m"] < min_len)
    mask_nulls = (gdf["deg_u"].isna()) | (gdf["deg_v"].isna())

    to_remove_idx = gdf.index[mask_stubs | mask_nulls]
    gdf_clean = gdf.drop(index=to_remove_idx)

    return gdf_clean, int(len(to_remove_idx))

# ------------------- MAIN -------------------
gdf = gpd.read_file(OUT_GPKG, layer=OUT_LAYER)
gdf = gdf[gdf.geometry.type == "LineString"].copy()

# Falls nötig, in metrisches CRS (für korrekte Meter-Längen)
if (gdf.crs is None) or gdf.crs.is_geographic:
    gdf = gdf.to_crs(2056)  # LV95

total_removed = 0
for i in range(N_ITER):
    gdf, removed = clean_once(gdf, MIN_LEN)
    total_removed += removed
    print(f"Iteration {i+1}: removed {removed} segments, remaining {len(gdf)}")
    if removed == 0:
        break

print(f"Total removed after {i+1} iterations: {total_removed}")

# Export
gdf.to_file(OUT_GPKG, driver="GPKG", layer=EDGES_CLEAN_LAYER, index=False)
print(f"Saved cleaned edges to layer '{EDGES_CLEAN_LAYER}' in {OUT_GPKG}")


  mask_stubs = (gdf["is_leaf"].fillna(False)) & (gdf["length_m"] < min_len)


Iteration 1: removed 1078427 segments, remaining 105147
Iteration 2: removed 2558 segments, remaining 102589
Total removed after 2 iterations: 1080985
Saved cleaned edges to layer 'network_segments_cleaned' in ./data/zurich_walking_network_final.gpkg


## Compute Network Statistics

In [9]:
# ============================================================
# Compute network graph statistics for 3 GeoPackages
# ============================================================
import geopandas as gpd
import pandas as pd
import networkx as nx
from shapely.geometry import LineString, MultiLineString
import logging

# ------------------------------------------------------------
# CONFIGURATION
# ------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

CRS_TARGET = 2056  # LV95 coordinate system

# Replace these with your actual file paths:
FILES = {
    "OSM Filtered": "./data/working_data/is_test.gpkg",
    "Stadt Zurich": "./data/working_data/is_test.gpkg",
    "Final Cleaned": r"D:\Masterarbeit\03_Model\Scripts\3_Classificatiaon\3_Classification\data\network_segments_with_predictions_full.parquet",
}

# Replace these with the correct layer names in each GPKG:
LAYERS = {
    "OSM Filtered": "1_osm_edges",
    "Stadt Zurich": "1_zrh_edges",
    "Final Cleaned": "network_segments_with_predictions_full",
}

# ------------------------------------------------------------
# HELPER FUNCTIONS
# ------------------------------------------------------------
def get_endpoints(geom):
    """Return start and end coordinates of LineString or MultiLineString."""
    if isinstance(geom, LineString):
        coords = list(geom.coords)
        return coords[0], coords[-1]
    elif isinstance(geom, MultiLineString):
        all_coords = [list(line.coords) for line in geom.geoms if len(line.coords) > 1]
        if not all_coords:
            return None, None
        flat = [pt for line in all_coords for pt in line]
        return flat[0], flat[-1]
    else:
        return None, None


def compute_network_stats(gdf):
    """Compute simple network statistics from a GeoDataFrame."""
    # Keep only line geometries
    gdf = gdf[gdf.geometry.type.isin(["LineString", "MultiLineString"])].copy()
    if len(gdf) == 0:
        return None

    gdf = gdf.to_crs(CRS_TARGET)
    gdf["length_m"] = gdf.geometry.length
    gdf["endpoints"] = gdf.geometry.apply(get_endpoints)
    gdf = gdf.dropna(subset=["endpoints"])

    # Build undirected graph
    G = nx.Graph()
    for idx, (s, e) in enumerate(gdf["endpoints"]):
        if (s is None) or (e is None):
            continue
        G.add_edge(s, e, fid=idx, length=gdf.iloc[idx]["length_m"])

    # Compute metrics
    n_edges = G.number_of_edges()
    n_nodes = G.number_of_nodes()
    n_components = nx.number_connected_components(G)
    largest_cc = max(nx.connected_components(G), key=len) if n_components > 0 else []
    mean_degree = sum(dict(G.degree()).values()) / n_nodes if n_nodes > 0 else 0
    total_length_km = gdf["length_m"].sum() / 1000

    return {
        "n_edges": n_edges,
        "n_nodes": n_nodes,
        "total_length_km": round(total_length_km, 1),
        "n_components": n_components,
        "largest_component_nodes": len(largest_cc),
        "mean_degree": round(mean_degree, 2),
    }


# ------------------------------------------------------------
# MAIN EXECUTION
# ------------------------------------------------------------
results = {}
for name in FILES.keys():
    try:
        logging.info(f"Processing {name} ...")
        fp = FILES[name]

        # special case: parquet
        if fp.endswith(".parquet"):
            gdf = gpd.read_parquet(fp)
        else:
            gdf = gpd.read_file(fp, layer=LAYERS[name])

        stats = compute_network_stats(gdf)
        if stats:
            results[name] = stats
        else:
            logging.warning(f"No valid line geometries in {name}.")
    except Exception as e:
        logging.error(f"Error processing {name}: {e}")


# ------------------------------------------------------------
# EXPORT RESULTS
# ------------------------------------------------------------
if results:
    df = pd.DataFrame(results).T
    df.to_csv("./data/working_data/network_graph_stats.csv")
    print("\n=== Network Graph Statistics ===")
    print(df)
    print("\nSaved to ./data/working_data/network_graph_stats.csv")
else:
    print("⚠️ No valid results computed.")


2025-10-13 11:46:18,861 - INFO - Processing OSM Filtered ...
2025-10-13 11:46:23,312 - INFO - Processing Stadt Zurich ...
2025-10-13 11:46:24,782 - INFO - Processing Final Cleaned ...



=== Network Graph Statistics ===
                n_edges  n_nodes  total_length_km  n_components  \
OSM Filtered    41409.0  57837.0           1939.7       16985.0   
Stadt Zurich    30763.0  21046.0           1864.4          47.0   
Final Cleaned  102590.0  83712.0           2353.4           2.0   

               largest_component_nodes  mean_degree  
OSM Filtered                    4238.0         1.43  
Stadt Zurich                   20942.0         2.92  
Final Cleaned                  83708.0         2.45  

Saved to ./data/working_data/network_graph_stats.csv
