diff --git a/benchmark_runner.py b/benchmark_runner.py index 7abf97d4..100e38b3 100644 --- a/benchmark_runner.py +++ b/benchmark_runner.py @@ -1,6 +1,12 @@ import argparse from typing import Optional -from src.domain.enums import DatasetSize + +import yaml + +from src import Config +from src.application.common import logger +from src.application.common.monitor_utils import _save_run_metadata +from src.domain.enums import DatasetSize, StopReason from src.presentation.configuration import initialize_dependencies from src.presentation.entrypoints import ( setup_benchmarking_framework, @@ -39,6 +45,21 @@ def benchmark_runner() -> None: run_id=run_id, benchmark_run=benchmark_run, dataset_size=dataset_size ) + if _is_skipped(script_id): + logger.info(f"Experiment '{script_id}' marked skip=true in benchmarks.yml. Recording as skipped.") + _save_run_metadata( + query_id=script_id, + run_id=run_id, + achieved_iterations=0, + failed_iterations=0, + stop_reason=StopReason.FAILED, + ci_half_width_seconds=None, + ci_half_width_relative=None, + mean_elapsed_seconds=None, + median_elapsed_seconds=None, + ) + return + match _strip_dataset_size_suffix(script_id): case "bbox-filtering-duckdb": bbox_filtering_duckdb() @@ -118,6 +139,19 @@ def _strip_dataset_size_suffix(script_id: str) -> str: return script_id +def _is_skipped(script_id: str) -> bool: + """Report whether ``script_id`` is marked ``skip`` in ``Config.BENCHMARK_FILE``. + + An empty file or a missing/null ``experiments`` key yields False. + """ + with open(Config.BENCHMARK_FILE, encoding="utf-8") as f: + cfg = yaml.safe_load(f) or {} + for exp in cfg.get("experiments") or []: + if exp.get("id") == script_id and exp.get("skip"): + return True + return False + + def _get_args() -> tuple[str, int, Optional[str], DatasetSize]: parser = argparse.ArgumentParser("doppa-data") parser.add_argument( diff --git a/benchmarks.yml b/benchmarks.yml index 61306815..6b1b6c87 100644 --- a/benchmarks.yml +++ b/benchmarks.yml @@ -203,6 +203,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: large + skip: true # OOM on Azure PostgreSQL Flex Server at large scale related_script_ids: - 
national-scale-spatial-join-duckdb-large @@ -292,6 +293,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: medium + skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at medium scale related_script_ids: - national-scale-spatial-join-databricks-broadcast-2-nodes-medium @@ -300,6 +302,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: large + skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale related_script_ids: - national-scale-spatial-join-databricks-partitioned-4-nodes-large - national-scale-spatial-join-databricks-partitioned-16-nodes-large @@ -317,6 +320,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: medium + skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at medium scale related_script_ids: - national-scale-spatial-join-databricks-broadcast-4-nodes-medium @@ -325,6 +329,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: large + skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale related_script_ids: - national-scale-spatial-join-databricks-partitioned-2-nodes-large - national-scale-spatial-join-databricks-partitioned-16-nodes-large @@ -344,6 +349,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: medium + skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at medium scale related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-medium - national-scale-spatial-join-duckdb-medium @@ -354,6 +360,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: large + skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale related_script_ids: - national-scale-spatial-join-databricks-broadcast-8-nodes-large @@ -362,6 +369,7 @@ experiments: cpu: 4 memory_gb: 16 dataset_size: large + skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale related_script_ids: - 
national-scale-spatial-join-databricks-partitioned-2-nodes-large - national-scale-spatial-join-databricks-partitioned-4-nodes-large diff --git a/src/infra/infrastructure/services/databricks_service.py b/src/infra/infrastructure/services/databricks_service.py index 08e4a35a..d7e82236 100644 --- a/src/infra/infrastructure/services/databricks_service.py +++ b/src/infra/infrastructure/services/databricks_service.py @@ -156,6 +156,14 @@ def _create_cluster( # the classic Spark planner respects. "spark.databricks.photon.enabled": "false", "spark.serializer": "org.apache.spark.serializer.KryoSerializer", + # AQE can rewrite Sedona's spatial join plans (RangeJoin, + # BroadcastIndexJoin) back into BroadcastNestedLoopJoin at + # execution time, even when the static plan is correct. + # Setting at cluster level ensures it's active before any + # notebook code runs — notebook-level spark.conf.set() may + # not take effect for job submissions on DBR 15.x. + "spark.sql.adaptive.enabled": "false", + "spark.sql.autoBroadcastJoinThreshold": "-1", }, } response = requests.post( diff --git a/src/presentation/databricks/national_scale_spatial_join_broadcast.py b/src/presentation/databricks/national_scale_spatial_join_broadcast.py index ccf75c53..5d155ba4 100644 --- a/src/presentation/databricks/national_scale_spatial_join_broadcast.py +++ b/src/presentation/databricks/national_scale_spatial_join_broadcast.py @@ -159,7 +159,8 @@ .orderBy(F.desc("building_count")) ) -cardinality = result.count() +collected = result.collect() +cardinality = len(collected) elapsed_seconds = time.perf_counter() - start_time spark.conf.set("spark.sql.adaptive.enabled", _original_aqe) diff --git a/src/presentation/databricks/national_scale_spatial_join_partitioned.py b/src/presentation/databricks/national_scale_spatial_join_partitioned.py index ed800683..6333195f 100644 --- a/src/presentation/databricks/national_scale_spatial_join_partitioned.py +++ 
b/src/presentation/databricks/national_scale_spatial_join_partitioned.py @@ -170,7 +170,8 @@ .agg(F.count(F.col("b.geometry")).alias("building_count")) .orderBy(F.desc("building_count")) ) - cardinality = result.count() + collected = result.collect() + cardinality = len(collected) elapsed_seconds = time.perf_counter() - start_time print(f"Spatial join complete. Regions with matched buildings: {cardinality}")