In [3]:
import glob

import geopandas as gpd
import matplotlib.pyplot as plt
import numba
import numpy as np
import pandas as pd
from libpysal.graph import read_parquet
from sklearn.preprocessing import PowerTransformer, RobustScaler, StandardScaler
import momepy as mm

# Root of the project data tree; every other location is derived from it.
regions_datadir = "/data/uscuni-ulce/"

# Processed outputs and their sub-directories.
data_dir = f"{regions_datadir}processed_data/"
graph_dir = f"{data_dir}neigh_graphs/"
chars_dir = f"{data_dir}chars/"

# Every entry directly under the raw EUBUCCO directory.
eubucco_files = glob.glob(f"{regions_datadir}eubucco_raw/*")

In [4]:
from core.cluster_validation import generate_enc_groups
from core.utils import used_keys

In [19]:
# Region hulls, reprojected to lon/lat (EPSG:4326) so their bounds can be
# compared against the per-row `bbox` columns used when filtering the download.
region_hulls_path = regions_datadir + "regions/" + "regions_hull.parquet"
region_hulls = gpd.read_parquet(region_hulls_path).to_crs("epsg:4326")

In [20]:
# Region analysed in this notebook (an index label of `region_hulls`).
target_region_id = 69300

# Scan for the target row. The for/else makes a missing id fail loudly instead
# of silently leaving the last row of the table selected as `region_hull`.
for region_id, region_row in region_hulls.iterrows():
    if region_id == target_region_id:
        region_hull = region_row["convex_hull"]
        break
else:
    raise KeyError(f"region {target_region_id} not found in region_hulls")
region_id

69300

In [21]:
# Bounding box of the selected hull. It is passed as `bbox` to
# record_batch_reader, which unpacks it as (xmin, ymin, xmax, ymax).
hull_boundary = region_hull.bounds

In [22]:
# !conda install -c conda-forge overturemaps -y

In [23]:
from typing import List, Optional

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.fs as fs
import json
import os
import sys
from typing import Optional
import pyarrow.parquet as pq
import shapely.wkb

In [24]:
def record_batch_reader(overture_type, bbox=None) -> Optional[pa.RecordBatchReader]:
    """
    Stream one Overture feature type from S3 as a pyarrow RecordBatchReader.

    Parameters
    ----------
    overture_type : str
        Key of ``type_theme_map``; selects the dataset partition to read.
    bbox : sequence of four numbers, optional
        ``(xmin, ymin, xmax, ymax)``. When given (and truthy) only rows whose
        ``bbox`` struct overlaps this rectangle are scanned.

    Returns
    -------
    pa.RecordBatchReader
        Reader over the non-empty batches, with the geometry field tagged as
        geoarrow WKB.
    """
    dataset_location = _dataset_path(overture_type)

    # Named `row_filter` to avoid shadowing the builtin `filter`.
    row_filter = None
    if bbox:
        xmin, ymin, xmax, ymax = bbox
        overlaps_x = (pc.field("bbox", "xmin") < xmax) & (
            pc.field("bbox", "xmax") > xmin
        )
        overlaps_y = (pc.field("bbox", "ymin") < ymax) & (
            pc.field("bbox", "ymax") > ymin
        )
        row_filter = overlaps_x & overlaps_y

    anonymous_s3 = fs.S3FileSystem(anonymous=True, region="us-west-2")
    dataset = ds.dataset(dataset_location, filesystem=anonymous_s3)
    scanned = dataset.to_batches(filter=row_filter)

    def _skip_empty(batch_iter):
        # to_batches() can yield many zero-row batches. Those have caused
        # downstream crashes, and a ParquetWriter emits an empty row group for
        # each one, bloating the output file. Yield lazily so the batches
        # are still streamed.
        for batch in batch_iter:
            if batch.num_rows > 0:
                yield batch

    return pa.RecordBatchReader.from_batches(
        geoarrow_schema_adapter(dataset.schema), _skip_empty(scanned)
    )


def geoarrow_schema_adapter(schema: pa.Schema) -> pa.Schema:
    """
    Tag the "geometry" field of a schema as a geoarrow WKB extension field.

    Assumes the schema has a single column named "geometry" holding WKB.

    Parameters
    ----------
    schema: pa.Schema
        Geoarrow-compatible schema to adapt.

    Returns
    -------
    pa.Schema
    A copy of ``schema`` in which the geometry field carries the
    ``ARROW:extension:name = geoarrow.wkb`` field metadata; every other field
    is left as it was.

    """
    position = schema.get_field_index("geometry")
    wkb_metadata = {b"ARROW:extension:name": b"geoarrow.wkb"}
    tagged_field = schema.field(position).with_metadata(wkb_metadata)
    return schema.set(position, tagged_field)


# Overture "type" sub-partitions grouped by their parent "theme" partition.
_types_by_theme = {
    "admins": ("locality", "locality_area", "administrative_boundary"),
    "buildings": ("building", "building_part"),
    "divisions": ("division", "division_area"),
    "places": ("place",),
    "transportation": ("segment", "connector"),
    "base": ("infrastructure", "land", "land_cover", "land_use", "water"),
}

# Flat type -> theme lookup; insertion order follows the grouping above.
type_theme_map = {
    overture_type: theme
    for theme, overture_types in _types_by_theme.items()
    for overture_type in overture_types
}


def _dataset_path(overture_type: str, release: str = "2024-06-13-beta.1") -> str:
    """
    Return the s3 path of the Overture dataset partition for ``overture_type``.

    This assumes ``overture_type`` has been validated, e.g. by the CLI; an
    unknown type raises ``KeyError`` from the ``type_theme_map`` lookup.

    Parameters
    ----------
    overture_type : str
        Sub-partition "type", a key of ``type_theme_map``.
    release : str, optional
        Release identifier inserted into the path. Defaults to the release
        that was previously hard-coded, so existing callers get the same path.

    Returns
    -------
    str
        Bucket-relative path ending in ``/``.
    """
    # The parent "theme" partition could be discovered by listing the
    # top-level s3 location, but the static map lets us read only the files
    # in the necessary partition.
    theme = type_theme_map[overture_type]
    return (
        f"overturemaps-us-west-2/release/{release}/"
        f"theme={theme}/type={overture_type}/"
    )


def get_all_overture_types() -> List[str]:
    """Return every known Overture type name, in ``type_theme_map`` order."""
    return [*type_theme_map]

def get_writer(output_format, path, schema):
    """
    Create the writer object for ``output_format``.

    Parameters
    ----------
    output_format : str
        One of ``"geojson"``, ``"geojsonseq"`` or ``"geoparquet"``.
    path
        Destination handed to the writer unchanged.
    schema : pa.Schema
        Schema of the batches to be written; only used for ``"geoparquet"``.

    Returns
    -------
    A ``GeoJSONWriter``, ``GeoJSONSeqWriter`` or ``pq.ParquetWriter``.

    Raises
    ------
    ValueError
        If ``output_format`` is not one of the supported values (previously
        this surfaced as an ``UnboundLocalError`` on ``writer``).
    """
    # NOTE(review): GeoJSONWriter / GeoJSONSeqWriter are neither defined nor
    # imported in the visible notebook cells; these two branches raise
    # NameError unless the names are provided elsewhere.
    if output_format == "geojson":
        return GeoJSONWriter(path)
    if output_format == "geojsonseq":
        return GeoJSONSeqWriter(path)
    if output_format == "geoparquet":
        # Update the geoparquet metadata to remove the file-level bbox which
        # will no longer apply to this file. Since we cannot write the field at
        # the end, just remove it as it's optional. Let the per-row bounding
        # boxes do all the work.
        metadata = schema.metadata or {}
        if b"geo" in metadata:
            geo = json.loads(metadata[b"geo"])
            for column in geo["columns"].values():
                # "bbox" is optional per column, so tolerate its absence.
                column.pop("bbox", None)
            metadata[b"geo"] = json.dumps(geo).encode("utf-8")
            schema = schema.with_metadata(metadata)
        return pq.ParquetWriter(path, schema)

    raise ValueError(
        f"unsupported output_format {output_format!r}; "
        "expected 'geojson', 'geojsonseq' or 'geoparquet'"
    )

def copy(reader, writer):
    """
    Drain ``reader`` into ``writer``, skipping batches that have no rows.

    ``reader.read_next_batch()`` is called until it raises ``StopIteration``;
    each batch with ``num_rows > 0`` is passed to ``writer.write_batch``.
    """
    while True:
        try:
            chunk = reader.read_next_batch()
        except StopIteration:
            # Reader exhausted: nothing more to copy.
            return
        if not chunk.num_rows > 0:
            continue
        writer.write_batch(chunk)

In [25]:


def download(bbox, output_format, output, type_):
    """
    Fetch Overture features of ``type_`` within ``bbox`` and write them out.

    Parameters
    ----------
    bbox
        Bounding box forwarded to ``record_batch_reader``.
    output_format : str
        Format name forwarded to ``get_writer``.
    output
        Destination forwarded to ``get_writer``; ``None`` means ``sys.stdout``.
    type_ : str
        Overture type forwarded to ``record_batch_reader``.
    """
    destination = sys.stdout if output is None else output

    reader = record_batch_reader(type_, bbox)
    if reader is not None:
        # The writer is used as a context manager so it is closed on exit.
        with get_writer(output_format, destination, schema=reader.schema) as writer:
            copy(reader, writer)

In [30]:
%%time

# Overture feature type to fetch, and the format name for the optional export.
type_, output_format = "segment", "geoparquet"

# download(hull_boundary, output_format, f'../data/prague_overture_{type_}.{output_format}', type_)

CPU times: user 2 μs, sys: 0 ns, total: 2 μs
Wall time: 4.29 μs


In [36]:
%%time
# Stream the selected type for the hull's bounding box, then materialise the
# whole stream in memory. This is the slow, network-bound step (about 2.5
# minutes of wall time in the recorded run).
segment_reader = record_batch_reader(type_, hull_boundary)
batches = segment_reader.read_all()

CPU times: user 2.35 s, sys: 1.25 s, total: 3.6 s
Wall time: 2min 31s


In [48]:
# Turn the downloaded Arrow data into a GeoDataFrame.
# NOTE(review): confirm `gdf.crs` is set after this call — the schema adapter
# only tags the geometry field as geoarrow.wkb and adds no CRS information.
gdf = gpd.GeoDataFrame.from_arrow(batches)

In [59]:
# The download was filtered by the hull's bounding box only; keep just the rows
# whose geometry intersects the hull itself.
hull_hits = gdf.sindex.query(region_hull, predicate="intersects")
gdf = gdf.iloc[hull_hits]

In [62]:
# Road classes kept for the street network; "service" is deliberately absent.
query = (
    "living_street|motorway|motorway_link|pedestrian|"
    "primary|primary_link|residential|"
    "secondary|secondary_link|"
    "tertiary|tertiary_link|"
    "trunk|trunk_link|unclassified"
)
approved_roads = query.split("|")

In [63]:
# Drop every segment whose `class` is not in the approved list.
is_approved = gdf["class"].isin(approved_roads)
gdf = gdf[is_approved]

In [69]:
# Order rows by geometry and renumber them 0..n-1 so row order is stable
# across runs.
gdf = (
    gdf
    .sort_values("geometry")
    .reset_index(drop=True)
)

In [70]:
# Persist the filtered street segments for this region.
streets_path = data_dir + f"streets/streets_{region_id}.parquet"
gdf.to_parquet(streets_path)

In [71]:
# Read back the street segments saved for this region. `data_dir` already ends
# with "/", so no leading slash is added here; this matches the path used when
# the file was written.
streets = gpd.read_parquet(data_dir + f"streets/streets_{region_id}.parquet")

# NOTE(review): the region hulls were reprojected to EPSG:4326 and the geometry
# lengths displayed further down are on the order of 1e-2, so `mm_len` and the
# length-weighted metrics below appear to be in degrees. Confirm whether
# `streets` should be projected to a metric CRS before this step.

# Street network graph with node degree attached.
graph = mm.gdf_to_nx(streets)
graph = mm.node_degree(graph)

# Subgraph-based characters around each node; only the measures switched on
# here (meshedness, degree proportions, local closeness, node density) are
# computed.
graph = mm.subgraph(
    graph,
    radius=5,
    meshedness=True,
    cds_length=False,
    mode="sum",
    degree="degree",
    length="mm_len",
    mean_node_degree=False,
    proportion={0: True, 3: True, 4: True},
    cyclomatic=False,
    edge_node_ratio=False,
    gamma=False,
    local_closeness=True,
    closeness_weight="mm_len",
    node_density=True,
    verbose=False,
)

# Additional node characters, stored under the project's short codes.
graph = mm.cds_length(graph, radius=3, name="ldsCDL", verbose=False)
graph = mm.clustering(graph, name="xcnSCl")
graph = mm.mean_node_dist(graph, name="mtdMDi", verbose=False)

# Back to GeoDataFrames; edges are re-sorted by geometry, the same key used to
# order the saved streets.
nodes, edges = mm.nx_to_gdf(graph, spatial_weights=False)
edges = edges.sort_values('geometry')

In [72]:
# Edge lengths after the graph round trip, in the geometry-sorted order; the
# index labels are those produced by the round trip, not 0..n-1.
# NOTE(review): values are in CRS units — presumably degrees; confirm.
edges.geometry.length

0        0.039749
3        0.016040
6        0.004324
10       0.006531
8        0.008161
           ...   
32374    0.000259
32373    0.000317
59858    0.000032
32371    0.002135
59857    0.001264
Length: 59859, dtype: float64

In [73]:
# Lengths of the saved street segments, for comparison with the edge lengths.
# NOTE(review): values are in CRS units — presumably degrees; confirm.
streets.geometry.length

0        0.039749
1        0.016040
2        0.004324
3        0.006531
4        0.008161
           ...   
59854    0.000259
59855    0.000317
59856    0.000032
59857    0.002135
59858    0.001264
Length: 59859, dtype: float64