From 28bcac306de7bab53ca2cf27ad8c804b5aecdcce Mon Sep 17 00:00:00 2001
From: Brendan Collins
Date: Wed, 29 Apr 2026 10:50:46 -0700
Subject: [PATCH] Guard basin_d8() against unbounded memory allocations (#1357)

basin_d8() on the numpy and cupy backends allocated H*W working arrays
(~33 B/pixel CPU, ~28 B/pixel GPU) without checking available memory.
A 50000x50000 input would request ~83 GB before failing.

Add module-local _BYTES_PER_PIXEL / _GPU_BYTES_PER_PIXEL,
_available_memory_bytes / _available_gpu_memory_bytes, and
_check_memory / _check_gpu_memory helpers (50% threshold), and call
them from the numpy and cupy dispatch branches. The dask paths delegate
to the watershed dask infrastructure and remain bounded per-tile, so
they skip the guard.

Tests cover the four guard outcomes plus a normal-sized success.
---
 xrspatial/hydro/basin_d8.py            | 89 ++++++++++++++++++++++++++
 xrspatial/hydro/tests/test_basin_d8.py | 73 +++++++++++++++++++++
 2 files changed, 162 insertions(+)

diff --git a/xrspatial/hydro/basin_d8.py b/xrspatial/hydro/basin_d8.py
index 11ed2fac..53988f35 100644
--- a/xrspatial/hydro/basin_d8.py
+++ b/xrspatial/hydro/basin_d8.py
@@ -37,6 +37,93 @@ class cupy: # type: ignore[no-redef]
 
 from xrspatial.dataset_support import supports_dataset
 
+# =====================================================================
+# Memory guards
+# =====================================================================
+#
+# CPU peak working set per pixel for the numpy dispatch + ``_watershed_cpu``:
+#   fd (float64 cast)  -> 8
+#   labels (float64)   -> 8
+#   state (int8)       -> 1
+#   path_r (int64)     -> 8
+#   path_c (int64)     -> 8
+# Total ~33 bytes/pixel. The caller's ``flow_dir`` array already lives in
+# RAM before dispatch and is not double-counted.
+_BYTES_PER_PIXEL = 33
+
+# GPU peak working set per pixel for ``_basins_cupy``:
+#   flow_dir_f64 (float64) -> 8
+#   labels (float64)       -> 8
+#   state (int32)          -> 4
+#   final where (float64)  -> 8
+# Total 28 bytes/pixel on the device.
+_GPU_BYTES_PER_PIXEL = 28
+
+
+def _available_memory_bytes():
+    """Best-effort estimate of available host memory in bytes."""
+    try:
+        with open('/proc/meminfo', 'r') as f:
+            for line in f:
+                if line.startswith('MemAvailable:'):
+                    return int(line.split()[1]) * 1024  # kB -> bytes
+    except (OSError, ValueError, IndexError):
+        pass
+    try:
+        import psutil
+        return psutil.virtual_memory().available
+    except (ImportError, AttributeError):
+        pass
+    return 2 * 1024 ** 3  # no /proc, no psutil: assume a conservative 2 GiB
+
+
+def _available_gpu_memory_bytes():
+    """Best-effort estimate of free GPU memory in bytes.
+
+    Returns 0 if CuPy / CUDA is unavailable or the query fails -- callers
+    use that as a sentinel meaning "no GPU info, skip the guard".
+    """
+    try:
+        import cupy as _cp
+        free, _total = _cp.cuda.runtime.memGetInfo()
+        return int(free)
+    except Exception:
+        return 0
+
+
+def _check_memory(height, width):
+    """Raise MemoryError if the basin kernel would exceed 50% of RAM."""
+    required = int(height) * int(width) * _BYTES_PER_PIXEL
+    available = _available_memory_bytes()
+    if required > 0.5 * available:
+        raise MemoryError(
+            f"basin_d8 on a {height}x{width} grid requires "
+            f"~{required / 1e9:.1f} GB of working memory but only "
+            f"~{available / 1e9:.1f} GB is available. Use a "
+            "dask-backed DataArray for out-of-core processing."
+        )
+
+
+def _check_gpu_memory(height, width):
+    """Raise MemoryError if the CuPy kernel would exceed 50% of free GPU RAM.
+
+    Skips the check (returns silently) when ``_available_gpu_memory_bytes``
+    cannot determine the free memory -- e.g. on hosts without CUDA, where
+    the kernel will fail at the cupy.asarray boundary anyway.
+    """
+    available = _available_gpu_memory_bytes()
+    if available <= 0:
+        return
+    required = int(height) * int(width) * _GPU_BYTES_PER_PIXEL
+    if required > 0.5 * available:
+        raise MemoryError(
+            f"basin_d8 on a {height}x{width} grid requires "
+            f"~{required / 1e9:.1f} GB of GPU working memory but only "
+            f"~{available / 1e9:.1f} GB is free on the active device. "
+            "Use a dask+cupy DataArray for out-of-core processing."
+        )
+
+
 # =====================================================================
 # CPU kernels
 # =====================================================================
@@ -360,6 +447,7 @@ def basin_d8(flow_dir: xr.DataArray,
     data = flow_dir.data
 
     if isinstance(data, np.ndarray):
+        _check_memory(*data.shape)
         from xrspatial.hydro.watershed_d8 import _watershed_cpu
         fd = data.astype(np.float64)
         h, w = fd.shape
@@ -370,6 +458,7 @@
         out = _watershed_cpu(fd, labels, state, h, w)
 
     elif has_cuda_and_cupy() and is_cupy_array(data):
+        _check_gpu_memory(*data.shape)
         out = _basins_cupy(data)
 
     elif has_cuda_and_cupy() and is_dask_cupy(flow_dir):
diff --git a/xrspatial/hydro/tests/test_basin_d8.py b/xrspatial/hydro/tests/test_basin_d8.py
index 1a3612f8..0216c884 100644
--- a/xrspatial/hydro/tests/test_basin_d8.py
+++ b/xrspatial/hydro/tests/test_basin_d8.py
@@ -261,3 +261,76 @@ def test_basin_dask_cupy_random():
         np.testing.assert_allclose(
             dk_result, dcp_result, equal_nan=True,
         ), f"Mismatch with chunks={chunks}"
+
+
+# ====================================================================
+# Memory guard tests
+# ====================================================================
+
+class TestMemoryGuard:
+    """Memory guard on the eager numpy / cupy backends."""
+
+    def test_numpy_huge_raster_raises(self):
+        """Numpy backend raises MemoryError when projected RAM exceeds budget."""
+        from unittest.mock import patch
+
+        flow_dir = np.zeros((4, 4), dtype=np.float64)
+        fd_da = create_test_raster(flow_dir, backend='numpy')
+
+        with patch(
+            "xrspatial.hydro.basin_d8._available_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match="working memory"):
+                basin(fd_da)
+
+    def test_numpy_normal_input_succeeds(self):
+        """Normal-size raster passes the guard with real memory."""
+        flow_dir = np.zeros((10, 10), dtype=np.float64)
+        fd_da = create_test_raster(flow_dir, backend='numpy')
+        result = basin(fd_da)
+        assert result.shape == (10, 10)
+
+    @dask_array_available
+    def test_dask_path_skips_guard(self):
+        """Dask backend bypasses the guard -- per-tile allocations are bounded."""
+        from unittest.mock import patch
+
+        flow_dir = np.zeros((6, 6), dtype=np.float64)
+        fd_da = create_test_raster(flow_dir, backend='dask', chunks=(3, 3))
+
+        with patch(
+            "xrspatial.hydro.basin_d8._available_memory_bytes",
+            return_value=1,
+        ):
+            result = basin(fd_da)
+            _ = result.data[:3, :3].compute()
+
+    def test_error_message_mentions_dimensions(self):
+        """The error message should mention the grid dimensions and dask."""
+        from unittest.mock import patch
+
+        flow_dir = np.zeros((7, 9), dtype=np.float64)
+        fd_da = create_test_raster(flow_dir, backend='numpy')
+
+        with patch(
+            "xrspatial.hydro.basin_d8._available_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match=r"7x9.*dask"):
+                basin(fd_da)
+
+    @cuda_and_cupy_available
+    def test_cupy_huge_raster_raises(self):
+        """CuPy backend raises MemoryError when projected GPU RAM exceeds budget."""
+        from unittest.mock import patch
+
+        flow_dir = np.zeros((4, 4), dtype=np.float64)
+        fd_da = create_test_raster(flow_dir, backend='cupy')
+
+        with patch(
+            "xrspatial.hydro.basin_d8._available_gpu_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match="GPU working memory"):
+                basin(fd_da)
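-- 
Reviewer note (kept after the signature delimiter so it is not applied
with the patch): a back-of-envelope check of the budget arithmetic the
guard performs. The snippet below is an illustrative sketch that mirrors
_check_memory's math rather than importing it; the 16 GiB "available"
figure is an assumed host budget, not something the patch queries.

    bytes_per_pixel = 33                # CPU working set per the module comment
    h = w = 50_000                      # the pathological input from the summary
    required = h * w * bytes_per_pixel  # 82_500_000_000 B ~= 82.5 GB (the ~83 GB cited)
    available = 16 * 1024 ** 3          # assumed 16 GiB of host RAM
    assert required > 0.5 * available   # guard trips: MemoryError before any allocation
    print(f"need ~{required / 1e9:.1f} GB, have ~{available / 1e9:.1f} GB")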