In [7]:
import platform
import re
import subprocess
from collections.abc import Iterable
from pathlib import Path
from typing import Optional
import geopandas as gpd
import pyogrio
import six

In [8]:
FEATURES_INDEX = "feature_id"

In [25]:
# Copyright (C) 2011 by Hong Minhee <http://dahlia.kr/>,
#                       Robert Kajic <http://github.com/kajic>
# Copyright (C) 2020 by Salesforce.com, Inc

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.


# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import json


def parse_hstore_tags(tags: str) -> dict[str, Optional[str]]:
    """
    Parse hstore tags to python dict.

    This function has been copied from pghstore library
    https://github.com/heroku/pghstore/blob/main/src/pghstore/_native.py
    since it can't be installed on Windows.
    """
    ESCAPE_RE = re.compile(r"\\(.)")

    PAIR_RE = re.compile(
        r'\s*(?:"(?P<kq>(?:[^\\"]|\\.)*)")\s*=>\s*'
        r'(?:"(?P<vq>(?:[^\\"]|\\.)*)"|(?P<vn>NULL))'
        r"\s*(?:(?P<ts>,)|$)",
        re.IGNORECASE,
    )

    def _unescape(s: str) -> str:
        return ESCAPE_RE.sub(r"\1", s)

    def _parse(
        string: str, encoding: str = "utf-8"
    ) -> Iterable[tuple[str, Optional[str]]]:
        if isinstance(string, six.binary_type):
            string = string.decode(encoding)

        string = string.strip()
        offset = 0
        term_sep = None
        for match in PAIR_RE.finditer(string):
            if match.start() > offset:
                raise ValueError("malformed hstore value: position %d" % offset)

            key = value = None
            kq = match.group("kq")
            if kq:
                key = _unescape(kq)

            if key is None:
                raise ValueError(
                    "Malformed hstore value starting at position %d" % offset
                )

            vq = match.group("vq")
            if vq:
                value = _unescape(vq)
            elif match.group("vn"):
                value = ""
            else:
                value = ""

            yield key, value

            term_sep = match.group("ts")

            offset = match.end()

        if len(string) > offset or term_sep:
            raise ValueError("malformed hstore value: position %d" % offset)

    return dict(_parse(tags, encoding="utf-8"))


def transform_pbf_to_gpkg(extract_file_path: Path, layer_name: str) -> Path:
    """Uses GDAL ogr2ogr to transform PBF file into GPKG."""
    extract_name = extract_file_path.stem[:-4]
    input_file = extract_file_path
    output_file = extract_file_path.parent / f"{extract_name}_{layer_name}.gpkg"
    config_file = extract_file_path.parent / "osmconf.ini"
    args = [
        "ogr2ogr" if platform.system() != "Windows" else "ogr2ogr.exe",
        str(output_file),
        str(input_file),
        layer_name,
        "-oo",
        f"CONFIG_FILE={config_file}",
    ]
    p = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=-1
    )
    _, err = p.communicate()
    rc = p.returncode
    if rc > 0:
        raise RuntimeError(rc, err)

    return output_file


def read_features_with_pyogrio(extract_file_path: Path) -> gpd.GeoDataFrame:
    """Read features from *.osm.pbf file using pyogrio."""
    gdfs = []
    extract_name = extract_file_path.stem[:-4]
    if Path(f'{extract_name}.geoparquet').exists():
        return

    for layer_name in (
        "points",
        "lines",
        "multilinestrings",
        "multipolygons",
        "other_relations",
    ):
        gpkg_file_path = transform_pbf_to_gpkg(extract_file_path, layer_name)
        gdf = pyogrio.read_dataframe(gpkg_file_path)

        if layer_name == "points":
            gdf[FEATURES_INDEX] = "node/" + gdf["osm_id"]
        elif layer_name == "lines":
            gdf[FEATURES_INDEX] = "way/" + gdf["osm_id"]
        elif layer_name in ("multilinestrings", "other_relations"):
            gdf[FEATURES_INDEX] = "relation/" + gdf["osm_id"]
        elif layer_name == "multipolygons":
            gdf[FEATURES_INDEX] = gdf.apply(
                lambda row: (
                    "relation/" + row["osm_id"]
                    if row["osm_id"] is not None
                    else "way/" + row["osm_way_id"]
                ),
                axis=1,
            )

        gdfs.append(gdf)

    final_gdf = gpd.pd.concat(gdfs)
    final_gdf = final_gdf[~final_gdf["all_tags"].isnull()]
    final_gdf["tags"] = final_gdf["all_tags"].apply(parse_hstore_tags)
    non_relations = ~final_gdf[FEATURES_INDEX].str.startswith("relation/")
    relations = final_gdf[FEATURES_INDEX].str.startswith("relation/")
    matching_relations = relations & final_gdf["tags"].apply(
        lambda x: x.get("type") in ("boundary", "multipolygon")
    )
    final_gdf = final_gdf[non_relations | matching_relations]
    final_gdf["tags"] = final_gdf["tags"].apply(json.dumps)
    final_gdf.geometry = final_gdf.geometry.make_valid()
    final_gdf = final_gdf[[FEATURES_INDEX, "tags", "geometry"]].set_index(
        FEATURES_INDEX
    )
    final_gdf.to_parquet(f"{extract_name}.geoparquet")
    return final_gdf

In [26]:
for filepath in Path('.').resolve().glob('*.osm.pbf'):
    read_features_with_pyogrio(filepath)