Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion benchmark_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import argparse
from typing import Optional
from src.domain.enums import DatasetSize

import yaml

from src import Config
from src.application.common import logger
from src.application.common.monitor_utils import _save_run_metadata
from src.domain.enums import DatasetSize, StopReason
from src.presentation.configuration import initialize_dependencies
from src.presentation.entrypoints import (
setup_benchmarking_framework,
Expand Down Expand Up @@ -39,6 +45,21 @@ def benchmark_runner() -> None:
run_id=run_id, benchmark_run=benchmark_run, dataset_size=dataset_size
)

if _is_skipped(script_id):
logger.info(f"Experiment '{script_id}' marked skip=true in benchmarks.yml. Recording as skipped.")
_save_run_metadata(
query_id=script_id,
run_id=run_id,
achieved_iterations=0,
failed_iterations=0,
stop_reason=StopReason.FAILED,
ci_half_width_seconds=None,
ci_half_width_relative=None,
mean_elapsed_seconds=None,
median_elapsed_seconds=None,
)
return

match _strip_dataset_size_suffix(script_id):
case "bbox-filtering-duckdb":
bbox_filtering_duckdb()
Expand Down Expand Up @@ -118,6 +139,15 @@ def _strip_dataset_size_suffix(script_id: str) -> str:
return script_id


def _is_skipped(script_id: str) -> bool:
    """Return whether ``script_id`` is flagged ``skip`` in the benchmark config.

    Loads the YAML file at ``Config.BENCHMARK_FILE`` and scans its top-level
    ``experiments`` list for an entry whose ``id`` equals ``script_id`` and
    whose ``skip`` value is truthy.

    Malformed or incomplete configuration is treated as "not skipped" rather
    than raising: an empty file, a missing or null ``experiments`` key, an
    ``experiments`` value that is not a list, and list entries that are not
    mappings are all ignored.

    Args:
        script_id: Experiment identifier, compared verbatim against each
            entry's ``id`` field.

    Returns:
        True if a matching experiment entry has a truthy ``skip`` value,
        otherwise False.
    """
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's locale encoding.
    with open(Config.BENCHMARK_FILE, encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    # safe_load() yields None for an empty document and may yield a scalar or
    # list for unexpected content; only a mapping can carry "experiments".
    if not isinstance(cfg, dict):
        return False

    # `experiments:` with no value parses to None, which .get()'s default
    # would not replace, so validate the type instead of relying on it.
    experiments = cfg.get("experiments")
    if not isinstance(experiments, list):
        return False

    return any(
        isinstance(exp, dict) and exp.get("id") == script_id and bool(exp.get("skip"))
        for exp in experiments
    )


def _get_args() -> tuple[str, int, Optional[str], DatasetSize]:
parser = argparse.ArgumentParser("doppa-data")
parser.add_argument(
Expand Down
8 changes: 8 additions & 0 deletions benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: large
skip: true # OOM on Azure PostgreSQL Flex Server at large scale
related_script_ids:
- national-scale-spatial-join-duckdb-large

Expand Down Expand Up @@ -292,6 +293,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: medium
skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at medium scale
related_script_ids:
- national-scale-spatial-join-databricks-broadcast-2-nodes-medium

Expand All @@ -300,6 +302,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: large
skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale
related_script_ids:
- national-scale-spatial-join-databricks-partitioned-4-nodes-large
- national-scale-spatial-join-databricks-partitioned-16-nodes-large
Expand All @@ -317,6 +320,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: medium
skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at medium scale
related_script_ids:
- national-scale-spatial-join-databricks-broadcast-4-nodes-medium

Expand All @@ -325,6 +329,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: large
skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale
related_script_ids:
- national-scale-spatial-join-databricks-partitioned-2-nodes-large
- national-scale-spatial-join-databricks-partitioned-16-nodes-large
Expand All @@ -344,6 +349,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: medium
skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at medium scale
related_script_ids:
- national-scale-spatial-join-databricks-broadcast-8-nodes-medium
- national-scale-spatial-join-duckdb-medium
Expand All @@ -354,6 +360,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: large
skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale
related_script_ids:
- national-scale-spatial-join-databricks-broadcast-8-nodes-large

Expand All @@ -362,6 +369,7 @@ experiments:
cpu: 4
memory_gb: 16
dataset_size: large
skip: true # executor OOM: RangeJoin spatial index exceeds Standard_D4s_v3 memory at large scale
related_script_ids:
- national-scale-spatial-join-databricks-partitioned-2-nodes-large
- national-scale-spatial-join-databricks-partitioned-4-nodes-large
Expand Down
8 changes: 8 additions & 0 deletions src/infra/infrastructure/services/databricks_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ def _create_cluster(
# the classic Spark planner respects.
"spark.databricks.photon.enabled": "false",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
# AQE can rewrite Sedona's spatial join plans (RangeJoin,
# BroadcastIndexJoin) back into BroadcastNestedLoopJoin at
# execution time, even when the static plan is correct.
# Disabling AQE at the cluster level ensures the setting is in
# effect before any notebook code runs — notebook-level
# spark.conf.set() may not take effect for job submissions on
# DBR 15.x.
"spark.sql.adaptive.enabled": "false",
"spark.sql.autoBroadcastJoinThreshold": "-1",
},
}
response = requests.post(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@
.orderBy(F.desc("building_count"))
)

cardinality = result.count()
collected = result.collect()
cardinality = len(collected)
elapsed_seconds = time.perf_counter() - start_time

spark.conf.set("spark.sql.adaptive.enabled", _original_aqe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@
.agg(F.count(F.col("b.geometry")).alias("building_count"))
.orderBy(F.desc("building_count"))
)
cardinality = result.count()
collected = result.collect()
cardinality = len(collected)
elapsed_seconds = time.perf_counter() - start_time

print(f"Spatial join complete. Regions with matched buildings: {cardinality}")
Expand Down
Loading