diff --git a/xrspatial/hydro/flow_path_dinf.py b/xrspatial/hydro/flow_path_dinf.py
index d0423cd3..6e82fb88 100644
--- a/xrspatial/hydro/flow_path_dinf.py
+++ b/xrspatial/hydro/flow_path_dinf.py
@@ -42,6 +42,94 @@ class cupy:  # type: ignore[no-redef]
 
 from xrspatial.dataset_support import supports_dataset
 
+# =====================================================================
+# Memory guards
+# =====================================================================
+#
+# CPU peak working set per pixel for the eager ``flow_path_dinf`` branch:
+#   fd  float64 cast : 8
+#   sp  float64 cast : 8
+#   out float64      : 8
+# Total ~24 bytes/pixel. The caller-provided ``flow_dir`` and
+# ``start_points`` arrays already live in RAM before the kernel runs and
+# are not double-counted here.
_BYTES_PER_PIXEL = 24
+
+# GPU peak working set per pixel for ``_flow_path_dinf_cupy``: that path
+# copies ``flow_dir`` and ``start_points`` to host via
+# ``.get().astype()`` and runs the CPU kernel before converting the
+# float64 output back to device via ``cp.asarray``. Device-side
+# residency at peak is the two float64 inputs (8 B/px each) plus the
+# float64 output (8 B/px); host-side matches the 24 B/px CPU budget.
+# Use 32 B/px as a conservative device-side budget that covers all
+# three arrays plus headroom.
+_GPU_BYTES_PER_PIXEL = 32
+
+
+def _available_memory_bytes():
+    """Best-effort estimate of available host memory in bytes."""
+    try:
+        with open('/proc/meminfo', 'r') as f:
+            for line in f:
+                if line.startswith('MemAvailable:'):
+                    return int(line.split()[1]) * 1024  # kB -> bytes
+    except (OSError, ValueError, IndexError):
+        pass
+    try:
+        import psutil
+        return psutil.virtual_memory().available
+    except (ImportError, AttributeError):
+        pass
+    return 2 * 1024 ** 3  # conservative 2 GiB fallback when nothing answers
+
+
+def _available_gpu_memory_bytes():
+    """Best-effort estimate of free GPU memory in bytes.
+
+    Returns 0 if CuPy / CUDA is unavailable or the query fails -- callers
+    use that as a sentinel meaning "no GPU info, skip the guard".
+    """
+    try:
+        import cupy as _cp
+        free, _total = _cp.cuda.runtime.memGetInfo()
+        return int(free)
+    except Exception:
+        return 0
+
+
+def _check_memory(height, width):
+    """Raise MemoryError if the kernel would exceed 50% of available RAM."""
+    required = int(height) * int(width) * _BYTES_PER_PIXEL
+    available = _available_memory_bytes()
+    if required > 0.5 * available:
+        raise MemoryError(
+            f"flow_path_dinf on a {height}x{width} grid requires "
+            f"~{required / 1e9:.1f} GB of working memory but only "
+            f"~{available / 1e9:.1f} GB is available. Use a "
+            f"dask-backed DataArray for out-of-core processing."
+        )
+
+
+def _check_gpu_memory(height, width):
+    """Raise MemoryError if the CuPy kernel would exceed 50% of free GPU RAM.
+
+    Skips the check (returns silently) when ``_available_gpu_memory_bytes``
+    cannot determine the free memory -- e.g. on hosts without CUDA, where
+    the kernel will fail at the cupy.asarray boundary anyway.
+    """
+    available = _available_gpu_memory_bytes()
+    if available <= 0:
+        return
+    required = int(height) * int(width) * _GPU_BYTES_PER_PIXEL
+    if required > 0.5 * available:
+        raise MemoryError(
+            f"flow_path_dinf on a {height}x{width} grid requires "
+            f"~{required / 1e9:.1f} GB of GPU working memory but only "
+            f"~{available / 1e9:.1f} GB is free on the active device. "
+            f"Use a dask+cupy DataArray for out-of-core processing."
+        )
+
+
 # =====================================================================
 # Pure-Python angle decomposition (for dask tracing loop)
 # =====================================================================
@@ -340,12 +428,15 @@ def flow_path_dinf(flow_dir_dinf: xr.DataArray,
     sp_data = start_points.data
 
     if isinstance(fd_data, np.ndarray):
+        _check_memory(*fd_data.shape)
         fd = fd_data.astype(np.float64)
         sp = np.asarray(sp_data, dtype=np.float64)
         H, W = fd.shape
         out = _flow_path_dinf_cpu(fd, sp, H, W)
 
     elif has_cuda_and_cupy() and is_cupy_array(fd_data):
+        _check_gpu_memory(*fd_data.shape)
+        _check_memory(*fd_data.shape)
         out = _flow_path_dinf_cupy(fd_data, sp_data)
 
     elif has_cuda_and_cupy() and is_dask_cupy(flow_dir_dinf):
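
[Reviewer note, not part of the patch] To make the 24 B/px budget concrete, here is a standalone sketch of the guard arithmetic. The grid size and the available-memory figure are made up for illustration; the constant and the threshold mirror ``_BYTES_PER_PIXEL`` and the 50% check in ``_check_memory``:

    # Standalone illustration with hypothetical numbers (not part of the patch).
    BYTES_PER_PIXEL = 24                          # fd + sp + out, float64 each
    height, width = 60_000, 60_000                # hypothetical large raster
    required = height * width * BYTES_PER_PIXEL   # 3.6e9 px * 24 B = 86.4 GB
    available = 32 * 1024 ** 3                    # suppose ~34.4 GB available
    # _check_memory raises because required exceeds half of available:
    assert required > 0.5 * available
    print(f"~{required / 1e9:.1f} GB needed, ~{available / 1e9:.1f} GB free")

The halving presumably leaves headroom for the interpreter, transient copies, and everything else on the host; the patch itself does not spell out the rationale.
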
diff --git a/xrspatial/hydro/tests/test_flow_path_dinf.py b/xrspatial/hydro/tests/test_flow_path_dinf.py
index ab2e548f..8c199a3e 100644
--- a/xrspatial/hydro/tests/test_flow_path_dinf.py
+++ b/xrspatial/hydro/tests/test_flow_path_dinf.py
@@ -217,3 +217,91 @@ def test_numpy_equals_dask_cupy():
     np.testing.assert_allclose(
         np_result.data, dcp_result.data.compute().get(), equal_nan=True)
+
+
+# ====================================================================
+# Memory guard tests
+# ====================================================================
+
+class TestMemoryGuard:
+    """Memory guard on the eager numpy / cupy backends."""
+
+    def test_numpy_huge_raster_raises(self):
+        """Numpy backend raises MemoryError when projected RAM exceeds budget."""
+        from unittest.mock import patch
+
+        fd = np.full((4, 4), 0.0, dtype=np.float64)
+        fd[:, -1] = -1.0
+        sp = np.full((4, 4), np.nan)
+        sp[0, 0] = 1.0
+        fd_da, sp_da = _make_fd_and_sp(fd, sp)
+
+        with patch(
+            "xrspatial.hydro.flow_path_dinf._available_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match="working memory"):
+                flow_path_dinf(fd_da, sp_da)
+
+    def test_numpy_normal_input_succeeds(self):
+        """Normal-size raster passes the guard with real memory."""
+        fd = np.full((3, 5), 0.0, dtype=np.float64)
+        fd[:, -1] = -1.0
+        sp = np.full((3, 5), np.nan)
+        sp[0, 0] = 7.0
+        fd_da, sp_da = _make_fd_and_sp(fd, sp)
+        result = flow_path_dinf(fd_da, sp_da)
+        assert result.shape == (3, 5)
+
+    @dask_array_available
+    def test_dask_path_skips_guard(self):
+        """Dask backend bypasses the guard -- per-tile allocations are bounded."""
+        from unittest.mock import patch
+
+        fd = np.full((6, 6), 0.0, dtype=np.float64)
+        fd[:, -1] = -1.0
+        sp = np.full((6, 6), np.nan)
+        sp[0, 0] = 1.0
+        fd_da, sp_da = _make_fd_and_sp(fd, sp, backend='dask', chunks=(3, 3))
+
+        with patch(
+            "xrspatial.hydro.flow_path_dinf._available_memory_bytes",
+            return_value=1,
+        ):
+            result = flow_path_dinf(fd_da, sp_da)
+        assert result is not None
+
+    def test_error_message_mentions_dimensions(self):
+        """The error message should mention the grid dimensions and dask."""
+        from unittest.mock import patch
+
+        fd = np.full((7, 9), 0.0, dtype=np.float64)
+        fd[:, -1] = -1.0
+        sp = np.full((7, 9), np.nan)
+        sp[0, 0] = 3.0
+        fd_da, sp_da = _make_fd_and_sp(fd, sp)
+
+        with patch(
+            "xrspatial.hydro.flow_path_dinf._available_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match=r"7x9.*dask"):
+                flow_path_dinf(fd_da, sp_da)
+
+    @cuda_and_cupy_available
+    def test_cupy_huge_raster_raises(self):
+        """CuPy backend raises MemoryError when projected GPU RAM exceeds budget."""
+        from unittest.mock import patch
+
+        fd = np.full((4, 4), 0.0, dtype=np.float64)
+        fd[:, -1] = -1.0
+        sp = np.full((4, 4), np.nan)
+        sp[0, 0] = 1.0
+        fd_da, sp_da = _make_fd_and_sp(fd, sp, backend='cupy')
+
+        with patch(
+            "xrspatial.hydro.flow_path_dinf._available_gpu_memory_bytes",
+            return_value=1,
+        ):
+            with pytest.raises(MemoryError, match="GPU working memory"):
+                flow_path_dinf(fd_da, sp_da)
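
[Reviewer note, not part of the patch] A minimal sketch of the workaround both error messages point at: moving the inputs onto a dask backend so the eager guard never runs. The shapes, dims, and chunk size below are illustrative and mirror the test fixtures; real callers would keep whatever coords/attrs their arrays already carry, which may matter to flow_path_dinf:

    import numpy as np
    import xarray as xr

    from xrspatial.hydro.flow_path_dinf import flow_path_dinf

    # Hypothetical inputs mirroring the test fixtures (tiny here; imagine
    # a grid large enough to trip the guard).
    fd = np.zeros((8, 8), dtype=np.float64)
    fd[:, -1] = -1.0                    # outlet column, as in the tests
    sp = np.full((8, 8), np.nan)
    sp[0, 0] = 1.0                      # one start point
    flow_dir = xr.DataArray(fd, dims=("y", "x"))
    start_points = xr.DataArray(sp, dims=("y", "x"))

    # .chunk() swaps the numpy backing for dask, so flow_path_dinf takes
    # its dask branch and the eager memory guard is skipped.
    result = flow_path_dinf(flow_dir.chunk(4), start_points.chunk(4))
    out = result.compute()              # tile-by-tile, out-of-core trace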