From 2433de2c65cdf4097ed9c19abd324492708664d8 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Wed, 29 Apr 2026 10:40:01 -0700 Subject: [PATCH] Guard stream_link_dinf() against unbounded memory allocations (#1343) Add memory guards to the eager numpy and cupy paths of stream_link_dinf, mirroring the pattern from #1335 (stream_link_d8) and #1341 (stream_link_mfd). D-inf encodes one continuous downstream angle per cell, so there is no (8, H, W) per-neighbor weight buffer like in MFD -- working set matches the d8 budget at 32 B/px (CPU) and 40 B/px (GPU). Guard raises MemoryError when projected usage exceeds 50% of available host or device memory, with the dimensions and a pointer to the dask path. Dask and dask+cupy paths bypass the guard since they process per-tile within chunk-bounded memory. --- xrspatial/hydro/stream_link_dinf.py | 95 +++++++++++++++++++ .../hydro/tests/test_stream_link_dinf.py | 77 +++++++++++++++ 2 files changed, 172 insertions(+) diff --git a/xrspatial/hydro/stream_link_dinf.py b/xrspatial/hydro/stream_link_dinf.py index 0aa497da..e5222ab8 100644 --- a/xrspatial/hydro/stream_link_dinf.py +++ b/xrspatial/hydro/stream_link_dinf.py @@ -60,6 +60,99 @@ class cupy: # type: ignore[no-redef] ) +# ===================================================================== +# Memory guards +# ===================================================================== +# +# CPU peak working set per pixel for ``_stream_link_dinf_cpu``: +# link_id : float64 -> 8 +# in_degree : int32 -> 4 +# orig_indeg : int32 -> 4 +# queue_r : int64 -> 8 +# queue_c : int64 -> 8 +# Total ~32 bytes/pixel. D-inf encodes one continuous downstream angle +# per cell, so there is no (8, H, W) per-neighbor weight buffer like in +# the MFD variant -- the working set matches the d8 budget. The +# caller-provided ``flow_dir`` and ``flow_accum`` arrays already live in +# RAM before the kernel runs and are not double-counted here. +_BYTES_PER_PIXEL = 32 + +# GPU peak working set per pixel for ``_stream_link_dinf_cupy``: +# angles_f64 : float64 -> 8 +# stream_mask_i8 : int8 -> 1 +# in_degree : int32 -> 4 +# orig_indeg : int32 -> 4 +# state : int32 -> 4 +# link_id : float64 -> 8 +# Total ~29 bytes/pixel. The ``fa_cp`` input copy adds another 8 B/px +# on the device. Use 40 B/px as a conservative budget covering both. +_GPU_BYTES_PER_PIXEL = 40 + + +def _available_memory_bytes(): + """Best-effort estimate of available host memory in bytes.""" + try: + with open('/proc/meminfo', 'r') as f: + for line in f: + if line.startswith('MemAvailable:'): + return int(line.split()[1]) * 1024 # kB -> bytes + except (OSError, ValueError, IndexError): + pass + try: + import psutil + return psutil.virtual_memory().available + except (ImportError, AttributeError): + pass + return 2 * 1024 ** 3 + + +def _available_gpu_memory_bytes(): + """Best-effort estimate of free GPU memory in bytes. + + Returns 0 if CuPy / CUDA is unavailable or the query fails -- callers + use that as a sentinel meaning "no GPU info, skip the guard". + """ + try: + import cupy as _cp + free, _total = _cp.cuda.runtime.memGetInfo() + return int(free) + except Exception: + return 0 + + +def _check_memory(height, width): + """Raise MemoryError if the kernel would exceed 50% of available RAM.""" + required = int(height) * int(width) * _BYTES_PER_PIXEL + available = _available_memory_bytes() + if required > 0.5 * available: + raise MemoryError( + f"stream_link_dinf on a {height}x{width} grid requires " + f"~{required / 1e9:.1f} GB of working memory but only " + f"~{available / 1e9:.1f} GB is available. Use a " + f"dask-backed DataArray for out-of-core processing." + ) + + +def _check_gpu_memory(height, width): + """Raise MemoryError if the CuPy kernel would exceed 50% of free GPU RAM. + + Skips the check (returns silently) when ``_available_gpu_memory_bytes`` + cannot determine the free memory -- e.g. on hosts without CUDA, where + the kernel will fail at the cupy.asarray boundary anyway. + """ + available = _available_gpu_memory_bytes() + if available <= 0: + return + required = int(height) * int(width) * _GPU_BYTES_PER_PIXEL + if required > 0.5 * available: + raise MemoryError( + f"stream_link_dinf on a {height}x{width} grid requires " + f"~{required / 1e9:.1f} GB of GPU working memory but only " + f"~{available / 1e9:.1f} GB is free on the active device. " + f"Use a dask+cupy DataArray for out-of-core processing." + ) + + # ===================================================================== # CPU kernel # ===================================================================== @@ -1192,6 +1285,7 @@ def stream_link_dinf(flow_dir_dinf: xr.DataArray, fa_data = flow_accum.data if isinstance(fd_data, np.ndarray): + _check_memory(*fd_data.shape) fd = fd_data.astype(np.float64) fa = np.asarray(fa_data, dtype=np.float64) stream_mask = np.where(fa >= threshold, 1, 0).astype(np.int8) @@ -1201,6 +1295,7 @@ def stream_link_dinf(flow_dir_dinf: xr.DataArray, out = _stream_link_dinf_cpu(fd, stream_mask, h, w) elif has_cuda_and_cupy() and is_cupy_array(fd_data): + _check_gpu_memory(*fd_data.shape) import cupy as cp fa_cp = cp.asarray(fa_data, dtype=cp.float64) fd_cp = fd_data.astype(cp.float64) diff --git a/xrspatial/hydro/tests/test_stream_link_dinf.py b/xrspatial/hydro/tests/test_stream_link_dinf.py index 40dc5d6e..aecf5c9d 100644 --- a/xrspatial/hydro/tests/test_stream_link_dinf.py +++ b/xrspatial/hydro/tests/test_stream_link_dinf.py @@ -166,3 +166,80 @@ def test_dask_matches_numpy(): np.testing.assert_array_equal( np.nan_to_num(np_result.values, nan=-999), np.nan_to_num(dask_result.values, nan=-999)) + + +# ==================================================================== +# Memory guard tests +# ==================================================================== + +class TestMemoryGuard: + """Memory guard on the eager numpy / cupy backends.""" + + def test_numpy_huge_raster_raises(self): + """Numpy backend raises MemoryError when projected RAM exceeds budget.""" + from unittest.mock import patch + + angles = np.zeros((4, 4), dtype=np.float64) + accum = np.ones((4, 4), dtype=np.float64) + + with patch( + "xrspatial.hydro.stream_link_dinf._available_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match="working memory"): + _call(angles, accum, threshold=1) + + def test_numpy_normal_input_succeeds(self): + """Normal-size raster passes the guard with real memory.""" + angles = np.array([[ANGLE_E, ANGLE_E, PIT]], dtype=np.float64) + accum = np.array([[1.0, 2.0, 3.0]], dtype=np.float64) + result = _call(angles, accum, threshold=1) + assert result.shape == (1, 3) + + @dask_array_available + def test_dask_path_skips_guard(self): + """Dask backend bypasses the guard -- per-tile allocations are bounded.""" + from unittest.mock import patch + + angles = np.zeros((6, 6), dtype=np.float64) + accum = np.ones((6, 6), dtype=np.float64) + da_a = create_test_raster(angles, backend='dask', chunks=(3, 3)) + da_fa = create_test_raster(accum, backend='dask', chunks=(3, 3)) + + with patch( + "xrspatial.hydro.stream_link_dinf._available_memory_bytes", + return_value=1, + ): + result = stream_link_dinf(da_a, da_fa, threshold=1) + _ = result.data[:3, :3].compute() + + def test_error_message_mentions_dimensions(self): + """The error message should mention the grid dimensions and dask.""" + from unittest.mock import patch + + angles = np.zeros((7, 9), dtype=np.float64) + accum = np.ones((7, 9), dtype=np.float64) + + with patch( + "xrspatial.hydro.stream_link_dinf._available_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match=r"7x9.*dask"): + _call(angles, accum, threshold=1) + + @cuda_and_cupy_available + def test_cupy_huge_raster_raises(self): + """CuPy backend raises MemoryError when projected GPU RAM exceeds budget.""" + from unittest.mock import patch + + angles = np.zeros((4, 4), dtype=np.float64) + accum = np.ones((4, 4), dtype=np.float64) + cp_a = create_test_raster(angles, backend='cupy') + cp_fa = create_test_raster(accum, backend='cupy') + + with patch( + "xrspatial.hydro.stream_link_dinf._available_gpu_memory_bytes", + return_value=1, + ): + with pytest.raises(MemoryError, match="GPU working memory"): + stream_link_dinf(cp_a, cp_fa, threshold=1)