From ddbd7a6728c9af71538094fe0042ca2e35364d7f Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Mon, 6 Oct 2025 15:25:27 +0200 Subject: [PATCH] par_sort_pairs: Add support for parallel iterators of Result --- webgraph/src/utils/par_sort_pairs.rs | 48 ++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/webgraph/src/utils/par_sort_pairs.rs b/webgraph/src/utils/par_sort_pairs.rs index ac47709d..63bc8639 100644 --- a/webgraph/src/utils/par_sort_pairs.rs +++ b/webgraph/src/utils/par_sort_pairs.rs @@ -95,9 +95,23 @@ impl ParSortPairs<()> { pub fn par_sort_pairs( &self, pairs: impl ParallelIterator, + ) -> Result>> { + self.try_par_sort_pairs::(pairs.map(Ok)) + } + + pub fn try_par_sort_pairs>( + &self, + pairs: impl ParallelIterator>, ) -> Result>> { Ok(self - .par_sort_labeled_pairs(&(), (), pairs.map(|(src, dst)| (src, dst, ())))? + .try_par_sort_labeled_pairs( + &(), + (), + pairs.map(|pair| -> Result<_> { + let (src, dst) = pair.map_err(Into::into)?; + Ok((src, dst, ())) + }), + )? .into_iter() .map(|into_iter| into_iter.into_iter().map(|(src, dst, ())| (src, dst))) .collect()) @@ -163,6 +177,35 @@ impl ParSortPairs { >, >, > + where + L: Copy + Send + Sync, + S: Sync + BitSerializer, + D: Clone + Send + Sync + BitDeserializer, + { + self.try_par_sort_labeled_pairs::( + serializer, + deserializer, + pairs.map(Ok), + ) + } + + pub fn try_par_sort_labeled_pairs>( + &self, + serializer: &S, + deserializer: D, + pairs: impl ParallelIterator>, + ) -> Result< + Vec< + impl IntoIterator< + Item = ( + usize, + usize, + >::DeserType, + ), + IntoIter: Clone + Send + Sync, + >, + >, + > where L: Copy + Send + Sync, S: Sync + BitSerializer, @@ -221,7 +264,8 @@ impl ParSortPairs { .borrow_mut(), ) }, - |(pl, thread_state), (src, dst, label)| -> Result<_> { + |(pl, thread_state), pair| -> Result<_> { + let (src, dst, label) = pair.map_err(Into::into)?; ensure!( src < self.num_nodes, "Expected {}, but got {src}",