Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/sweep-performance-state.csv
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ geodesic,2026-03-31T18:00:00Z,N/A,compute-bound,0,,
geotiff,2026-04-16T18:00:00Z,SAFE,IO-bound,0,fixed-in-tree,"Fixed-in-tree 2026-04-16: (1) read_geotiff_dask caps total chunk count at 1M and auto-scales chunks with a UserWarning when exceeded (linear graph scaling confirmed: 16384 tasks/1109ms vs 16 tasks/65ms baseline). (2) _nvcomp_batch_compress deflate path now batches GPU->CPU adler32 transfers via a single contiguous .get() and memoryview slicing, eliminating N per-tile sync points. 435 geotiff tests pass (2 pre-existing matplotlib deepcopy unrelated)."
glcm,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,"Downgraded to MEDIUM. da.stack without rechunk is scheduling overhead, not OOM risk."
hillshade,2026-04-16T12:00:00Z,SAFE,compute-bound,0,,"Re-audit after Horn's method rewrite (PR 1175): clean stencil, map_overlap depth=(1,1), no materialization. Zero findings."
hydro,2026-05-01,RISKY,memory-bound,0,1416,"Fixed-in-tree 2026-05-01: hand_mfd._hand_mfd_dask now assembles via da.map_blocks instead of eager da.block of pre-computed tiles (matches hand_dinf pattern). Remaining MEDIUM: sink_d8 CCL fully materializes labels (inherently global), flow_accumulation_mfd frac_bdry held in driver dict instead of memmap-backed BoundaryStore. D8 iterative paths (flow_accum/fill/watershed/basin/stream_*) use serial-tile sweep with memmap-backed boundary store -- per-tile RAM bounded but driver iterates O(diameter) times. flow_direction_*, flow_path/snap_pour_point/twi/hand_d8/hand_dinf are SAFE."
kde,2026-04-14T12:00:00Z,SAFE,compute-bound,0,,Graph construction serialized per-tile. _filter_points_to_tile scans all points per tile. No HIGH findings.
mahalanobis,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,False positive. Numpy path materializes by design. Dask path uses lazy reductions + map_blocks.
morphology,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,
Expand Down
72 changes: 43 additions & 29 deletions xrspatial/hydro/hand_mfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,35 +597,49 @@ def _hand_mfd_dask(fractions_da, flow_accum_da, elev_da, threshold,

boundaries = boundaries.snapshot()

# Assemble
rows = []
for iy in range(n_tile_y):
row = []
for ix in range(n_tile_x):
y_start = sum(chunks_y[:iy])
y_end = y_start + chunks_y[iy]
x_start = sum(chunks_x[:ix])
x_end = x_start + chunks_x[ix]

fr_chunk = np.asarray(
fractions_da[:, y_start:y_end, x_start:x_end].compute(),
dtype=np.float64)
fa_chunk = np.asarray(
flow_accum_da.blocks[iy, ix].compute(), dtype=np.float64)
el_chunk = np.asarray(
elev_da.blocks[iy, ix].compute(), dtype=np.float64)
_, h, w = fr_chunk.shape

exits = _compute_exit_labels_mfd(
iy, ix, boundaries, frac_bdry,
chunks_y, chunks_x, n_tile_y, n_tile_x)

tile = _hand_mfd_tile_kernel(
fr_chunk, fa_chunk, el_chunk, h, w, threshold, *exits)
row.append(da.from_array(tile, chunks=tile.shape))
rows.append(row)

return da.block(rows)
# Lazy assembly: each tile is recomputed on demand from the converged
# boundary state. Driver memory holds only the captured ``boundaries``
# / ``frac_bdry`` snapshots (boundary strips, not full tiles), so peak
# memory scales with chunk size rather than the full grid.
#
# ``fractions_da`` is 3D (8, H, W); we cannot align its chunks with the
# 2D output via map_blocks directly. We pre-compute the (y_start,
# y_end, x_start, x_end) offsets per tile so each map_blocks closure
# call only triggers its own fractions slice.
cum_y = np.zeros(n_tile_y + 1, dtype=np.int64)
np.cumsum(chunks_y, out=cum_y[1:])
cum_x = np.zeros(n_tile_x + 1, dtype=np.int64)
np.cumsum(chunks_x, out=cum_x[1:])

def _tile_fn(fa_block, el_block, block_info=None):
if block_info is None or 0 not in block_info:
return np.full(fa_block.shape, np.nan, dtype=np.float64)
iy, ix = block_info[0]['chunk-location']
y_start = int(cum_y[iy])
y_end = int(cum_y[iy + 1])
x_start = int(cum_x[ix])
x_end = int(cum_x[ix + 1])

fr_chunk = np.asarray(
fractions_da[:, y_start:y_end, x_start:x_end].compute(),
dtype=np.float64)
fa_chunk = np.asarray(fa_block, dtype=np.float64)
el_chunk = np.asarray(el_block, dtype=np.float64)
_, h, w = fr_chunk.shape

exits = _compute_exit_labels_mfd(
iy, ix, boundaries, frac_bdry,
chunks_y, chunks_x, n_tile_y, n_tile_x)

return _hand_mfd_tile_kernel(
fr_chunk, fa_chunk, el_chunk, h, w, threshold, *exits)

return da.map_blocks(
_tile_fn,
flow_accum_da, elev_da,
dtype=np.float64,
meta=np.array((), dtype=np.float64),
)


def _hand_mfd_dask_cupy(fractions_da, flow_accum_da, elev_da, threshold,
Expand Down
55 changes: 55 additions & 0 deletions xrspatial/hydro/tests/test_hand_mfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,61 @@ def test_numpy_equals_dask(self, chunks):
equal_nan=True, rtol=1e-10)


@dask_array_available
class TestHandMfdDaskLazyAssembly:
    """Regression tests for issue #1416.

    The dask assembly phase must go through ``map_blocks`` so every
    output tile is rebuilt on demand from the converged boundary state.
    A previous version pre-computed all tiles into a list-of-lists and
    fed it to ``da.block``, which kept every tile in driver memory at
    once and OOMed on large inputs.
    """

    def test_dask_result_is_lazy(self):
        import dask.array as da

        fraction_grid = _make_all_east(8, 8)
        accum = np.ones((8, 8), dtype=np.float64)
        terrain = np.zeros((8, 8), dtype=np.float64)
        fd_da, fa_da, el_da = _make_hand_mfd_rasters(
            fraction_grid, accum, terrain, backend='dask', chunks=(2, 2))

        hand = hand_mfd(fd_da, fa_da, el_da, threshold=1)

        # A dask-backed DataArray whose chunk layout survived assembly.
        assert isinstance(hand.data, da.Array)
        assert hand.data.chunks == ((2, 2, 2, 2), (2, 2, 2, 2))

    def test_dask_assembly_does_not_materialize_all_tiles(self):
        """The lazy graph must carry one kernel-call task per chunk.

        ``da.block`` over pre-computed arrays produces a single
        ``from_array`` task plus concatenation overhead, whereas
        ``map_blocks`` emits one task per output chunk — the property
        that keeps at most one tile in RAM during execution.
        """
        import dask.array as da

        fraction_grid = _make_all_east(6, 6)
        accum = np.ones((6, 6), dtype=np.float64)
        terrain = np.zeros((6, 6), dtype=np.float64)
        fd_da, fa_da, el_da = _make_hand_mfd_rasters(
            fraction_grid, accum, terrain, backend='dask', chunks=(2, 2))

        hand = hand_mfd(fd_da, fa_da, el_da, threshold=1)

        arr = hand.data
        assert isinstance(arr, da.Array)
        # A 6x6 grid chunked at (2, 2) yields 3x3 = 9 output chunks;
        # the graph must hold exactly one leaf key per chunk under the
        # result's own name.
        leaf_keys = [
            key for key in arr.__dask_graph__()
            if isinstance(key, tuple) and key[0] == arr.name
        ]
        assert len(leaf_keys) == 9


@cuda_and_cupy_available
class TestHandMfdCuPy:
def test_numpy_equals_cupy(self):
Expand Down
Loading