From 5512b5e791d3997de9baf67ecce0576654a49c9f Mon Sep 17 00:00:00 2001
From: Brendan Collins
Date: Wed, 29 Apr 2026 10:50:50 -0700
Subject: [PATCH] Guard sink_d8() against unbounded memory allocations (#1356)

The numpy and cupy paths allocate H*W working buffers (labels and BFS
queues on CPU, labels grid on GPU) before any sanity check on the input
size. Passing a sufficiently large in-memory raster can OOM the host or
device.

Add per-module _BYTES_PER_PIXEL (24) and _GPU_BYTES_PER_PIXEL (8)
constants and _check_memory / _check_gpu_memory helpers that raise
MemoryError when the projected working set exceeds 50% of available
RAM / free GPU memory. Wire the guards into the eager numpy and cupy
branches of sink_d8(); dask paths skip the guard since per-tile
allocations are bounded.

Mirrors the pattern from #1318/#1319 and the rest of the hydro guard
series.
---
 xrspatial/hydro/sink_d8.py            | 85 +++++++++++++++++++++++++++
 xrspatial/hydro/tests/test_sink_d8.py | 66 +++++++++++++++++++++
 2 files changed, 151 insertions(+)

diff --git a/xrspatial/hydro/sink_d8.py b/xrspatial/hydro/sink_d8.py
index 1a139d31..54baa3c1 100644
--- a/xrspatial/hydro/sink_d8.py
+++ b/xrspatial/hydro/sink_d8.py
@@ -32,6 +32,89 @@ class cupy: # type: ignore[no-redef]
 
 from xrspatial.dataset_support import supports_dataset
 
+# =====================================================================
+# Memory guards
+# =====================================================================
+#
+# CPU peak working set per pixel for ``_sink_cpu``:
+#   labels  : float64 -> 8
+#   queue_r : int64   -> 8
+#   queue_c : int64   -> 8
+# Total ~24 bytes/pixel. The caller-provided ``flow_dir`` array already
+# lives in RAM before the kernel runs and is not double-counted here.
+_BYTES_PER_PIXEL = 24
+
+# GPU peak working set per pixel for ``_sink_cupy``:
+#   labels : float64 -> 8
+# Total ~8 bytes/pixel. ``flow_dir_data`` already lives on the device
+# before the kernel runs and is not double-counted here.
+_GPU_BYTES_PER_PIXEL = 8
+
+
+def _available_memory_bytes():
+    """Best-effort estimate of available host memory in bytes."""
+    try:
+        with open('/proc/meminfo', 'r') as f:
+            for line in f:
+                if line.startswith('MemAvailable:'):
+                    return int(line.split()[1]) * 1024  # kB -> bytes
+    except (OSError, ValueError, IndexError):
+        pass
+    try:
+        import psutil
+        return psutil.virtual_memory().available
+    except (ImportError, AttributeError):
+        pass
+    return 2 * 1024 ** 3  # conservative fallback: assume 2 GiB
+
+
+def _available_gpu_memory_bytes():
+    """Best-effort estimate of free GPU memory in bytes.
+
+    Returns 0 if CuPy / CUDA is unavailable or the query fails -- callers
+    use that as a sentinel meaning "no GPU info, skip the guard".
+    """
+    try:
+        import cupy as _cp
+        free, _total = _cp.cuda.runtime.memGetInfo()
+        return int(free)
+    except Exception:
+        return 0
+
+
+def _check_memory(height, width):
+    """Raise MemoryError if the BFS kernel would exceed 50% of RAM."""
+    required = int(height) * int(width) * _BYTES_PER_PIXEL
+    available = _available_memory_bytes()
+    if required > 0.5 * available:
+        raise MemoryError(
+            f"sink_d8 on a {height}x{width} grid requires "
+            f"~{required / 1e9:.1f} GB of working memory but only "
+            f"~{available / 1e9:.1f} GB is available. Use a "
+            f"dask-backed DataArray for out-of-core processing."
+        )
+
+
+def _check_gpu_memory(height, width):
+    """Raise MemoryError if the CuPy kernel would exceed 50% of free GPU RAM.
+
+    Skips the check (returns silently) when ``_available_gpu_memory_bytes``
+    cannot determine the free memory -- e.g. on hosts without CUDA, where
+    the kernel will fail at the cupy.asarray boundary anyway.
+    """
+    available = _available_gpu_memory_bytes()
+    if available <= 0:
+        return
+    required = int(height) * int(width) * _GPU_BYTES_PER_PIXEL
+    if required > 0.5 * available:
+        raise MemoryError(
+            f"sink_d8 on a {height}x{width} grid requires "
+            f"~{required / 1e9:.1f} GB of GPU working memory but only "
+            f"~{available / 1e9:.1f} GB is free on the active device. "
+            f"Use a dask+cupy DataArray for out-of-core processing."
+        )
+
+
 # =====================================================================
 # CPU kernel
 # =====================================================================
@@ -255,9 +338,11 @@ def sink_d8(flow_dir: xr.DataArray,
     data = flow_dir.data
 
     if isinstance(data, np.ndarray):
+        _check_memory(*data.shape)
         out = _run_numpy(data)
 
     elif has_cuda_and_cupy() and is_cupy_array(data):
+        _check_gpu_memory(*data.shape)
         out = _sink_cupy(data)
 
     elif has_cuda_and_cupy() and is_dask_cupy(flow_dir):
diff --git a/xrspatial/hydro/tests/test_sink_d8.py b/xrspatial/hydro/tests/test_sink_d8.py
index cee78f87..174d095e 100644
--- a/xrspatial/hydro/tests/test_sink_d8.py
+++ b/xrspatial/hydro/tests/test_sink_d8.py
@@ -175,3 +175,69 @@ def test_numpy_equals_dask_cupy():
     dcp_result = sink(dcp_agg)
     np.testing.assert_allclose(
         np_result.data, dcp_result.data.compute().get(), equal_nan=True)
+
+
+# ---------------------------------------------------------------------------
+# Memory guard
+# ---------------------------------------------------------------------------
+
+class TestMemoryGuard:
+    """Memory guard on the eager numpy / cupy backends."""
+
+    def test_numpy_huge_raster_raises(self):
+        """Numpy backend raises MemoryError when projected RAM exceeds budget."""
+        from unittest.mock import patch
+
+        flow_dir = np.zeros((4, 4), dtype=np.float64)
+        agg = create_test_raster(flow_dir, backend='numpy')
+
+        with patch(
+            "xrspatial.hydro.sink_d8._available_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match="working memory"):
+                sink(agg)
+
+    def test_numpy_normal_input_succeeds(self):
+        """Normal-size raster passes the guard with real memory."""
+        flow_dir = np.zeros((10, 10), dtype=np.float64)
+        agg = create_test_raster(flow_dir, backend='numpy')
+        result = sink(agg)
+        assert result.shape == (10, 10)
+
+    @dask_array_available
+    def test_dask_path_skips_guard(self):
+        """Dask backend bypasses the guard -- per-tile allocations are bounded."""
+        from unittest.mock import patch
+
+        flow_dir = np.zeros((20, 20), dtype=np.float64)
+        agg = create_test_raster(flow_dir, backend='dask+numpy', chunks=(5, 5))
+
+        with patch(
+            "xrspatial.hydro.sink_d8._available_memory_bytes",
+            return_value=1,
+        ):
+            result = sink(agg)
+            _ = result.data[:4, :4].compute()
+
+    def test_error_message_mentions_dask(self):
+        """The error message should suggest the dask alternative."""
+        from unittest.mock import patch
+
+        flow_dir = np.zeros((4, 4), dtype=np.float64)
+        agg = create_test_raster(flow_dir, backend='numpy')
+
+        with patch(
+            "xrspatial.hydro.sink_d8._available_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match="dask"):
+                sink(agg)
+
+    def test_bytes_per_pixel_constants(self):
+        """Pin the documented per-pixel costs so refactors flag accidental changes."""
+        import importlib
+        mod = importlib.import_module("xrspatial.hydro.sink_d8")
+
+        assert mod._BYTES_PER_PIXEL == 24
+        assert mod._GPU_BYTES_PER_PIXEL == 8
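
For reviewers who want to sanity-check the budget arithmetic outside the
test suite, here is a minimal standalone sketch. _BYTES_PER_PIXEL, the
50% budget, and the MemoryError behaviour mirror the patch; the grid
size and the 64 GiB free-memory figure are arbitrary examples, and
nothing below imports xrspatial:

# Standalone sketch of the guard math in sink_d8.py (illustrative only).
BYTES_PER_PIXEL = 24  # labels + queue_r + queue_c, 8 bytes each

def projected_working_set(height, width):
    """Peak CPU BFS working set in bytes for an H x W grid."""
    return height * width * BYTES_PER_PIXEL

height, width = 120_000, 80_000              # 9.6e9 pixels
required = projected_working_set(height, width)
print(f"~{required / 1e9:.1f} GB required")  # ~230.4 GB

# The guard trips when required > 0.5 * available:
available = 64 * 1024 ** 3                   # pretend 64 GiB is free
assert required > 0.5 * available            # sink_d8 would raise MemoryError

The dask escape hatch works because each tile's BFS buffers scale with
the chunk, not the full raster: chunking the same grid at 2048 x 2048
(e.g. flow_dir.chunk({'y': 2048, 'x': 2048}), assuming those dimension
names) bounds the per-tile working set at roughly 2048 * 2048 * 24,
about 0.1 GB.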