feat: add debug mode and refactor linestring_2d conversion (#96)
* feat: added debug mode and refactored linestring_2d conversion

* feat: further simplify linestring_2d parsing

* chore: add debug mode to tests

* chore: add debug flag to combining multiple files scenario
RaczeQ committed May 7, 2024
1 parent 5d19b3c commit ceb1b5a
Showing 4 changed files with 82 additions and 21 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Debug mode that keeps all temporary files for further inspection, activated with the `debug` flag (see the usage sketch below)
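
A minimal usage sketch of the new flag, assuming the top-level `convert_pbf_to_gpq` from `quackosm.functions` accepts the input file as `pbf_path` (the keyword appears in the diff below); the sample file name is hypothetical:

```python
from quackosm.functions import convert_pbf_to_gpq

# With debug=True every temporary file created during the conversion is kept under
# <working_directory>/debug/<random hex token> instead of being deleted afterwards.
result_path = convert_pbf_to_gpq(
    pbf_path="monaco.osm.pbf",  # hypothetical input file
    debug=True,
)
print(result_path)  # path to the generated GeoParquet file
```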

### Changed

- Refactored parsing of native `LINESTRING_2D` types when reading them back from the saved parquet files

## [0.7.2] - 2024-04-28

### Changed
16 changes: 16 additions & 0 deletions quackosm/functions.py
@@ -30,6 +30,7 @@ def convert_pbf_to_gpq(
osm_way_polygon_features_config: Optional[Union[OsmWayPolygonConfig, dict[str, Any]]] = None,
save_as_wkt: bool = False,
verbosity_mode: Literal["silent", "transient", "verbose"] = "transient",
debug: bool = False,
) -> Path:
"""
Convert PBF file to GeoParquet file.
@@ -77,6 +78,8 @@ def convert_pbf_to_gpq(
verbosity mode. Can be one of: silent, transient and verbose. Silent disables
output completely. Transient tracks progress, but removes output after finished.
Verbose leaves all progress outputs in the stdout. Defaults to "transient".
debug (bool, optional): If turned on, will keep all temporary files after operation
for debugging. Defaults to `False`.
Returns:
Path: Path to the generated GeoParquet file.
@@ -229,6 +232,7 @@ def convert_pbf_to_gpq(
working_directory=working_directory,
osm_way_polygon_features_config=osm_way_polygon_features_config,
verbosity_mode=verbosity_mode,
debug=debug,
).convert_pbf_to_gpq(
pbf_path=pbf_path,
result_file_path=result_file_path,
@@ -254,6 +258,7 @@ def convert_geometry_to_gpq(
save_as_wkt: bool = False,
verbosity_mode: Literal["silent", "transient", "verbose"] = "transient",
allow_uncovered_geometry: bool = False,
debug: bool = False,
) -> Path:
"""
Get a GeoParquet file with OpenStreetMap features within given geometry.
@@ -308,6 +313,8 @@ def convert_geometry_to_gpq(
allow_uncovered_geometry (bool): Suppress an error if some geometry parts aren't covered
by any OSM extract. Works only when PbfFileReader is asked to download OSM extracts
automatically. Defaults to `False`.
debug (bool, optional): If turned on, will keep all temporary files after operation
for debugging. Defaults to `False`.
Returns:
Path: Path to the generated GeoParquet file.
@@ -414,6 +421,7 @@ def convert_geometry_to_gpq(
osm_extract_source=osm_extract_source,
verbosity_mode=verbosity_mode,
allow_uncovered_geometry=allow_uncovered_geometry,
debug=debug,
).convert_geometry_filter_to_gpq(
result_file_path=result_file_path,
keep_all_tags=keep_all_tags,
@@ -435,6 +443,7 @@ def get_features_gdf(
working_directory: Union[str, Path] = "files",
osm_way_polygon_features_config: Optional[Union[OsmWayPolygonConfig, dict[str, Any]]] = None,
verbosity_mode: Literal["silent", "transient", "verbose"] = "transient",
debug: bool = False,
) -> gpd.GeoDataFrame:
"""
Get features GeoDataFrame from a PBF file or list of PBF files.
@@ -480,6 +489,8 @@ def get_features_gdf(
verbosity mode. Can be one of: silent, transient and verbose. Silent disables
output completely. Transient tracks progress, but removes output after finished.
Verbose leaves all progress outputs in the stdout. Defaults to "transient".
debug (bool, optional): If turned on, will keep all temporary files after operation
for debugging. Defaults to `False`.
Returns:
gpd.GeoDataFrame: GeoDataFrame with OSM features.
@@ -599,6 +610,7 @@ def get_features_gdf(
working_directory=working_directory,
osm_way_polygon_features_config=osm_way_polygon_features_config,
verbosity_mode=verbosity_mode,
debug=debug,
).get_features_gdf(
file_paths=file_paths,
keep_all_tags=keep_all_tags,
@@ -620,6 +632,7 @@ def get_features_gdf_from_geometry(
osm_way_polygon_features_config: Optional[Union[OsmWayPolygonConfig, dict[str, Any]]] = None,
verbosity_mode: Literal["silent", "transient", "verbose"] = "transient",
allow_uncovered_geometry: bool = False,
debug: bool = False,
) -> gpd.GeoDataFrame:
"""
Get a GeoParquet file with OpenStreetMap features within given geometry.
@@ -668,6 +681,8 @@ def get_features_gdf_from_geometry(
allow_uncovered_geometry (bool): Suppress an error if some geometry parts aren't covered
by any OSM extract. Works only when PbfFileReader is asked to download OSM extracts
automatically. Defaults to `False`.
debug (bool, optional): If turned on, will keep all temporary files after operation
for debugging. Defaults to `False`.
Returns:
gpd.GeoDataFrame: GeoDataFrame with OSM features.
@@ -728,6 +743,7 @@ def get_features_gdf_from_geometry(
osm_extract_source=osm_extract_source,
verbosity_mode=verbosity_mode,
allow_uncovered_geometry=allow_uncovered_geometry,
debug=debug,
).get_features_gdf_from_geometry(
keep_all_tags=keep_all_tags,
explode_tags=explode_tags,
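The same `debug` flag is threaded through all four helpers above. A sketch for `get_features_gdf`, assuming the `file_paths` keyword matches the call shown in the diff and using a hypothetical input file:

```python
from pathlib import Path

from quackosm.functions import get_features_gdf

features = get_features_gdf(
    file_paths=["monaco.osm.pbf"],  # hypothetical input
    working_directory="files",
    debug=True,
)

# With debug=True the intermediate parquet files survive the call and can be inspected;
# see _prepare_debug_directory in quackosm/pbf_file_reader.py below for the directory layout.
for debug_dir in sorted(Path("files/debug").iterdir()):
    parquet_count = sum(1 for _ in debug_dir.rglob("*.parquet"))
    print(debug_dir.name, parquet_count, "parquet files")
```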
75 changes: 56 additions & 19 deletions quackosm/pbf_file_reader.py
@@ -110,6 +110,7 @@ def __init__(
osm_extract_source: Union[OsmExtractSource, str] = OsmExtractSource.geofabrik,
verbosity_mode: Literal["silent", "transient", "verbose"] = "transient",
allow_uncovered_geometry: bool = False,
debug: bool = False,
) -> None:
"""
Initialize PbfFileReader.
@@ -133,7 +134,7 @@ def __init__(
Config used to determine which closed way features are polygons.
Modifications to this config left are left for experienced OSM users.
Defaults to predefined "osm_way_polygon_features.json".
parquet_compression (str): Compression of intermediate parquet files.
parquet_compression (str, optional): Compression of intermediate parquet files.
Check https://duckdb.org/docs/sql/statements/copy#parquet-options for more info.
Defaults to "snappy".
osm_extract_source (Union[OsmExtractSource, str], optional): A source for automatic
@@ -143,8 +144,10 @@ def __init__(
verbosity mode. Can be one of: silent, transient and verbose. Silent disables
output completely. Transient tracks progress, but removes output after finished.
Verbose leaves all progress outputs in the stdout. Defaults to "transient".
allow_uncovered_geometry (bool): Suppress an error if some geometry parts aren't
covered by any OSM extract. Defaults to `False`.
allow_uncovered_geometry (bool, optional): Suppress an error if some geometry parts
aren't covered by any OSM extract. Defaults to `False`.
debug (bool, optional): If turned on, will keep all temporary files after operation
for debugging. Defaults to `False`.
Raises:
InvalidGeometryFilter: When provided geometry filter has parts without area.
@@ -168,6 +171,7 @@ def __init__(
self.connection: duckdb.DuckDBPyConnection = None
self.encountered_query_exception = False
self.verbosity_mode = verbosity_mode
self.debug = debug
self.task_progress_tracker: TaskProgressTracker = None

self.rows_per_group = PbfFileReader.ROWS_PER_GROUP_MEMORY_CONFIG[0]
@@ -248,6 +252,10 @@ def convert_pbf_to_gpq(

with tempfile.TemporaryDirectory(dir=self.working_directory.resolve()) as self.tmp_dir_name:
self.tmp_dir_path = Path(self.tmp_dir_name)

if self.debug:
self.tmp_dir_path = self._prepare_debug_directory()

try:
self.encountered_query_exception = False
self.connection = _set_up_duckdb_connection(tmp_dir_path=self.tmp_dir_path)
@@ -382,6 +390,9 @@ def convert_geometry_filter_to_gpq(
with tempfile.TemporaryDirectory(
dir=self.working_directory.resolve()
) as tmp_dir_name:
if self.debug:
tmp_dir_name = self._prepare_debug_directory() # type: ignore[assignment] # noqa: PLW2901

try:
joined_parquet_table = self._drop_duplicated_features_in_pyarrow_table(
parsed_geoparquet_files
@@ -492,6 +503,9 @@ def get_features_gdf(

if parsed_geoparquet_files:
with tempfile.TemporaryDirectory(dir=self.working_directory.resolve()) as tmp_dir_name:
if self.debug:
tmp_dir_name = self._prepare_debug_directory() # type: ignore[assignment] # noqa: PLW2901

try:
joined_parquet_table = self._drop_duplicated_features_in_pyarrow_table(
parsed_geoparquet_files
@@ -620,6 +634,8 @@ def _drop_duplicated_features_in_joined_table(
COMPRESSION '{self.parquet_compression}'
)
"""
if self.debug:
log_message(f"Saved to directory: {output_file_name}")
self._run_query(query, run_in_separate_process=True, tmp_dir_path=tmp_dir_path)
return pq.read_table(output_file_name)

@@ -655,6 +671,8 @@ def _drop_duplicated_features_in_joined_table_one_by_one(
COMPRESSION '{self.parquet_compression}'
)
"""
if self.debug:
log_message(f"Saved to directory: {filtered_result_parquet_file}")
connection.sql(query)
result_parquet_files.extend(filtered_result_parquet_file.glob("*.parquet"))
return pq.read_table(result_parquet_files)
@@ -1282,7 +1300,12 @@ def _prefilter_elements_ids(
relations_filtered_ids=relations_filtered_ids,
)

def _delete_directories(self, directories: Union[str, Path, list[Union[str, Path]]]) -> None:
def _delete_directories(
self, directories: Union[str, Path, list[Union[str, Path]]], override_debug: bool = False
) -> None:
if self.debug and not override_debug:
return

_directories = []
if isinstance(directories, (str, Path)):
_directories = [directories]
@@ -1304,6 +1327,14 @@ def _delete_directories(self, directories: Union[str, Path, list[Union[str, Path
finally:
tries -= 1

def _prepare_debug_directory(self) -> Path:
if self.debug:
dir_path = Path(self.working_directory) / "debug" / secrets.token_hex(16)
self._delete_directories(dir_path, override_debug=True)
dir_path.mkdir(exist_ok=True, parents=True)
return dir_path
raise RuntimeError("Cannot prepare debug directory when debug mode is not activated.")

def _generate_osm_tags_sql_filter(self) -> str:
"""Prepare features filter clauses based on tags filter."""
positive_filter_clauses: list[str] = []
@@ -1422,6 +1453,8 @@ def _save_parquet_file(
)
"""
self._run_query(query, run_in_separate_process)
if self.debug:
log_message(f"Saved to directory: {file_path}")
return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')")

def _run_query(
Expand Down Expand Up @@ -1483,6 +1516,8 @@ def _calculate_unique_ids_to_parquet(
)
"""
)
if self.debug:
log_message(f"Saved to directory: {result_path}")

return self.connection.sql(f"SELECT * FROM read_parquet('{result_path}/**')")

@@ -1775,6 +1810,8 @@ def _group_ways(
)
"""
)
if self.debug:
log_message(f"Saved to directory: {grouped_ways_path}")

return groups

@@ -1815,6 +1852,8 @@ def _construct_ways_linestrings(
self.rows_per_group > PbfFileReader.ROWS_PER_GROUP_MEMORY_CONFIG[0]
),
)
if self.debug:
log_message(f"Saved to directory: {current_destination_path}")

self._delete_directories(current_ways_group_path)

@@ -1884,8 +1923,8 @@ def _get_filtered_ways_with_proper_geometry(
tags,
(CASE
WHEN is_polygon
THEN linestring_to_polygon_wkt(linestring)
ELSE linestring_to_linestring_wkt(linestring)
THEN linestring_to_polygon_geometry(linestring)
ELSE linestring_to_linestring_geometry(linestring)
END)::GEOMETRY AS geometry
FROM
required_ways_with_linestrings w
@@ -1913,7 +1952,7 @@ def _get_filtered_relations_with_geometry(
r.id,
COALESCE(r.ref_role, 'outer') as ref_role,
r.ref,
linestring_to_linestring_wkt(w.linestring)::GEOMETRY as geometry
linestring_to_linestring_geometry(w.linestring)::GEOMETRY as geometry
FROM ({osm_parquet_files.relations_with_unnested_way_refs.sql_query()}) r
SEMI JOIN ({osm_parquet_files.relations_filtered_ids.sql_query()}) fr
ON r.id = fr.id
@@ -2076,6 +2115,8 @@ def _save_parquet_file_with_geometry(
)
"""
)
if self.debug:
log_message(f"Saved to directory: {file_path}")

return self.connection.sql(
f"""
@@ -2438,18 +2479,14 @@ def _set_up_duckdb_connection(
connection.install_extension(extension_name)
connection.load_extension(extension_name)

connection.sql(
"""
CREATE OR REPLACE MACRO linestring_to_linestring_wkt(ls) AS
'LINESTRING (' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || ')';
"""
)
connection.sql(
"""
CREATE OR REPLACE MACRO linestring_to_polygon_wkt(ls) AS
'POLYGON ((' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || '))';
"""
)
connection.sql("""
CREATE OR REPLACE MACRO linestring_to_linestring_geometry(ls) AS
ls::struct(x DECIMAL(10, 7), y DECIMAL(10, 7))[]::LINESTRING_2D::GEOMETRY;
""")
connection.sql("""
CREATE OR REPLACE MACRO linestring_to_polygon_geometry(ls) AS
[ls::struct(x DECIMAL(10, 7), y DECIMAL(10, 7))[]]::POLYGON_2D::GEOMETRY;
""")

return connection

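A standalone sketch of what the new macros do, reusing the `linestring_to_linestring_geometry` definition from `_set_up_duckdb_connection` above; the sample coordinates and the direct `duckdb` session are illustrative assumptions:

```python
import duckdb

con = duckdb.connect()
con.install_extension("spatial")
con.load_extension("spatial")

# Same definition as in the diff above: cast the list of vertex structs to the native
# LINESTRING_2D type and then to GEOMETRY, instead of assembling a WKT string by hand.
con.sql("""
    CREATE OR REPLACE MACRO linestring_to_linestring_geometry(ls) AS
    ls::struct(x DECIMAL(10, 7), y DECIMAL(10, 7))[]::LINESTRING_2D::GEOMETRY;
""")

# Hypothetical two-vertex linestring, shaped like the `linestring` column read back
# from the intermediate parquet files.
wkt = con.sql("""
    SELECT ST_AsText(linestring_to_linestring_geometry(
        [{'x': 7.4202, 'y': 43.7311}, {'x': 7.4265, 'y': 43.7396}]
    ))
""").fetchone()[0]
print(wkt)  # e.g. LINESTRING (7.4202 43.7311, 7.4265 43.7396)
```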
4 changes: 2 additions & 2 deletions tests/base/test_pbf_file_reader.py
@@ -161,7 +161,7 @@ def test_antwerpen_and_brussels_invalid_linear_ring() -> None:
def test_combining_files_different_techniques(
mocker: MockerFixture, operation_mode: str, patch_methods: int
) -> None:
"""Test if all files merging techniques work as expected."""
"""Test if all files merging techniques work as expected in debug mode."""
if patch_methods > 0:
# Leave _drop_duplicated_features_in_joined_table as backup
mocker.patch(
@@ -185,7 +185,7 @@ def test_combining_files_different_techniques(
],
ignore_cache=True,
)
single_result_gdf = PbfFileReader().get_features_gdf(
single_result_gdf = PbfFileReader(debug=True).get_features_gdf(
file_paths=[monaco_file_path], ignore_cache=True
)
