Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions webgraph/src/utils/par_sort_pairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use anyhow::{ensure, Context, Result};
use dsi_bitstream::traits::NE;
use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
use rayon::prelude::*;
use rayon::Yield;
use rdst::RadixSort;
use thread_local::ThreadLocal;

Expand Down Expand Up @@ -292,7 +293,8 @@ impl<L> ParSortPairs<L> {
|| {
(
pl.clone(),
sorter_thread_states
loop {
if let Ok(state) = sorter_thread_states
.get_or(|| {
RefCell::new(SorterThreadState {
worker_id: worker_id.fetch_add(1, Ordering::Relaxed),
Expand All @@ -302,7 +304,21 @@ impl<L> ParSortPairs<L> {
sorted_pairs: (0..num_partitions).map(|_| Vec::new()).collect(),
})
})
.borrow_mut(),
.try_borrow_mut() {
// usually succeeds in the first attempt
break state;
}
// This thread is already borrowing its state higher in the call stack,
// but rayon is calling us again because of work stealing.
// But we cannot work right now (without allocating a new state, that is)
// so we yield back to rayon so it can resume the task that is already
// running in this thread.
match rayon::yield_now() {
None => panic!("rayon::yield_now() claims we are not running in a thread pool"),
Some(Yield::Idle) => panic!("Thread state is already borrowed, but there are no other tasks running"),
Some(Yield::Executed) => (),
}
}
)
},
|(pl, thread_state), pair| -> Result<_> {
Expand Down