
Scheduling should be topology/NUMA aware #319

Open
nagisa opened this issue Apr 17, 2017 · 24 comments

nagisa commented Apr 17, 2017

Currently rayon spawns as many threads as there are logical PUs (processing units) on the machine and, as far as I can tell, does nothing more. This is subpar. Instead, it should look at the hardware topology and make an informed decision about how many threads to spawn and where to schedule them. Even more so now that rayon-core will share the same pool of workers across all the different users of rayon.

cuviper (Member) commented Apr 17, 2017

I agree we should have something, but it's a hard problem in general. To really be NUMA-optimized, programs ought to use a NUMA-aware allocator and/or migrate pages between nodes as needed, both of which are really outside the scope of rayon. We don't have any idea of what data might be accessed by a job, let alone where it lives, except perhaps the immediate collection being iterated.

As a first step, I think we could do better just by offering a node-restricted threadpool. You can already accomplish this manually, by setting num_threads and using a start_handler to set the scheduling affinity to the appropriate CPU mask. I think wrapping this in a nice API would get us an "80%" solution, where experts can use other means to allocate their memory on a specific node, then launch rayon into that node.
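For illustration, a minimal sketch of that manual approach using today's ThreadPoolBuilder API; the `core_affinity` crate is assumed here purely for pinning (any affinity mechanism would do), and the helper name is made up:

```rust
// Sketch only: a node-restricted pool built by hand, pinning each worker to
// one core of the chosen NUMA node. Pinning uses the `core_affinity` crate,
// which is an assumption of this example, not something rayon provides.
use rayon::ThreadPoolBuilder;

fn node_restricted_pool(cores: Vec<core_affinity::CoreId>) -> rayon::ThreadPool {
    ThreadPoolBuilder::new()
        .num_threads(cores.len())
        .start_handler(move |worker_idx| {
            // Called on each worker thread as it starts: pin it to "its" core.
            core_affinity::set_for_current(cores[worker_idx]);
        })
        .build()
        .expect("failed to build node-restricted pool")
}
```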

For deeper integration, I think the job stealing would need to be aware of the NUMA hierarchy, which can span multiple levels. Start by only trying to steal jobs from your own node, then look one level out, etc. But this is still a fairly crude way to attempt to keep memory locality, as again we don't really know the way data is allocated.

We should also research what other threading libraries in other languages do for NUMA.

cuviper (Member) commented Apr 17, 2017

> Even more so now that rayon-core will share the same pool of workers across all the different users of rayon.

FWIW, the workers were also shared before rayon-core, except when your project resolved to multiple distinct rayon versions. The point of rayon-core is to make sure we have that common pool even when there are multiple rayon versions.

There's ThreadPool::new() if you really want your own private pool, and I'm imagining that setting affinity to a NUMA node could be just another part of the Configuration.

nikomatsakis (Member) commented Apr 19, 2017

I basically agree with @cuviper =) -- solid goal, difficult to achieve well. Giving people the ability to configure things would be an obvious starting point.

farnoy commented Aug 20, 2018

Another thing to consider would be a split-LLC topology. I think all of this can be detected using hwloc; there seem to be Rust bindings available.

This issue affects me greatly, so I would like to help with implementation. I have a NUMA and split-LLC (two L3 caches per NUMA node) CPU, so I can test it myself.

I am no expert on the matter and have not researched work-stealing queues and algorithms, but my naive idea would be as follows.

  1. Keep a list of queues to steal from for each locality bucket (think NUMA node)
  2. Have each worker update the location of its own queue in that data structure periodically, to reflect the latest scheduling information from the OS
  3. Prioritize stealing work-items from the same bucket as the worker, going to other buckets if there is nothing to steal
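A rough sketch of what steps 1 and 3 might look like on top of crossbeam-deque; the `LocalityBuckets` type and its layout are made up for illustration and are not part of rayon:

```rust
// Hypothetical structure for steps 1 and 3 above; not an existing rayon type.
use crossbeam_deque::{Steal, Stealer};

struct LocalityBuckets<T> {
    /// buckets[b] holds the stealers of all workers currently in locality
    /// bucket b (think: one bucket per NUMA node).
    buckets: Vec<Vec<Stealer<T>>>,
}

impl<T> LocalityBuckets<T> {
    /// Try the worker's own bucket first, then fall back to the other buckets.
    fn steal_for(&self, my_bucket: usize) -> Option<T> {
        let order = std::iter::once(my_bucket)
            .chain((0..self.buckets.len()).filter(move |&b| b != my_bucket));
        for b in order {
            for stealer in &self.buckets[b] {
                if let Steal::Success(task) = stealer.steal() {
                    return Some(task);
                }
            }
        }
        None
    }
}
```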

As far as memory locality is concerned, I think that if work-stealing queues are allocated in unique pages of memory, the OS will migrate them automatically. We could always try to do things like that explicitly.

Would this solution introduce expensive locking to maintain these lists of queues? Is there a clever way to reduce contention? My intuition is that on a loaded system, worker threads would eventually settle down on specific CPUs and this migration would be very rare, unless external pressure forces the OS to move rayon threads around. This initial cost seems similar to the one exerted by work-stealing operations that hammer the one queue where the work originated. Atomics and cache coherency can't be cheap at that level of contention.

Any feedback and guidance is appreciated.

cuviper changed the title from "Schedulling should be NUMA aware" to "Scheduling should be NUMA aware" on May 31, 2019
cuviper (Member) commented May 31, 2019

Here's a user's experience where OpenMP fared much better than Rayon:

https://www.reddit.com/r/rust/comments/bto10h/update_a_scaling_comparison_between_rustrayon_and/

In my own comments and experiments on a 128-thread POWER9, I found that while Rayon fared worse when using all threads, Rayon and OpenMP actually perform pretty similarly when pinned to a single NUMA node. In fact, Rayon showed higher throughput on 4 of 5 tests, with an overall elapsed time of 6.17s vs. 6.62s.

@jack-pappas

Intel's OpenMP library is implemented on top of their open-source, Apache2-licensed Threading Building Blocks (TBB) library; perhaps there are some ideas there that can be ported into Rayon to improve performance (on NUMA architectures or otherwise)?

https://github.com/intel/tbb

@cuviper to your comment above (from a couple years back): TBB includes a custom allocator, tbbmalloc, which is also part of their open-source repo (under the src/tbbmalloc folder). Maybe that could be used with Rust's custom allocator functionality? (If so, I'd guess linking+invoking tbbmalloc would be the easiest way at first; then maybe it could be ported later if it proves fruitful.)

cuviper (Member) commented Jun 3, 2019

@jack-pappas My impression is that tbbmalloc is about scaling the allocator itself. That may be beneficial if you're allocating from different threads, but I know that glibc has made its own scalability improvements in recent years too.

Also, I don't see any mention of NUMA in the TBB source. From this forum post, it sounds like they also punt on this issue:

> TBB task scheduler does not control threads distribution across available cores. This completely belongs to OS scheduler domain.

It might be an interesting data point to port that Babel Stream code to TBB though.

cuviper (Member) commented Jun 3, 2019

My wild guess is that OpenMP just has more predictable memory access than Rayon's work-stealing pool, so memory that's first touched from a thread on one particular NUMA node will probably continue to be used only on that thread/node.
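To make the first-touch point concrete, here is a sketch assuming Linux's default first-touch NUMA policy and a pinned pool; the function name is illustrative only:

```rust
// Rough illustration of the "first touch" effect: a page is placed on the
// NUMA node of the thread that first writes to it. Filling a buffer in
// parallel therefore spreads its pages across the nodes of the workers that
// wrote them, which only helps if later accesses come from those same
// threads -- the predictability OpenMP's static scheduling tends to give and
// work stealing does not.
use rayon::prelude::*;

fn parallel_first_touch(n: usize) -> Vec<f64> {
    // With an indexed parallel iterator, rayon writes directly into the final
    // buffer from its worker threads, so those threads perform the first touch.
    (0..n).into_par_iter().map(|i| i as f64).collect()
}
```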

@wagnerf42 (Contributor)

Are we sure the vectors are not reallocated in the motivating example? I've been bitten by that several times.

Also, I'll be doing a bit of NUMA-aware scheduling in rayon-adaptive, but not right now.

nagisa changed the title from "Scheduling should be NUMA aware" to "Scheduling should be topology/NUMA aware" on Feb 16, 2020
nagisa (Author) commented Feb 16, 2020

Re-titling the issue to be more general than just "NUMA", as nowadays even within a single NUMA node certain logical cores are a better choice than others when deciding where to send work.

@retep998

Consider using the CPU Set API and/or the NUMA stuff for Windows.

gdonval commented Nov 11, 2022

Has there been work going on regarding this issue?

HadrienG2 (Contributor) commented Oct 28, 2023

Since all idiomatic Rust bindings to hwloc are pretty incomplete, wrong in places (e.g. they assume every object has a CpuSet, which is not true of I/O objects), and abandoned, I've been forking hwloc2-rs into an hwlocality successor. It is now close-ish to being released; I just want to add a lot more test coverage, do a final general API review, and add more examples to the docs. That should be done in a couple of months.

Once that is done, and assuming a sufficient spare time budget, I would like to use it to write a locality-aware work-stealing scheduler demonstrator that uses a pinned thread pool with a hierarchical work-stealing architecture: steal from your sibling hyperthread, then from your L3 cache shard, then from your Die, then from your Package, then from anywhere in the Machine. The actual hierarchy would not be hardcoded like this, but automatically generated from the hwloc topology tree by translating each node with multiple children into a crossbeam deque (or something like it).
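Purely as an illustration of that translation (the `TopologyQueue` name is made up here, not from hwlocality or any existing scheduler), each multi-child topology node could own a shared queue visible to every worker beneath it:

```rust
// Illustrative only: one shared injector queue per multi-child topology node;
// stealing would proceed from the leaf upward (hyperthread -> L3 shard -> die -> ...).
use crossbeam_deque::Injector;

struct TopologyQueue<T> {
    /// Work visible to every worker underneath this topology node.
    shared: Injector<T>,
    /// Child topology levels, down to individual processing units.
    children: Vec<TopologyQueue<T>>,
}
```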

I'd implement a join() entry point and a scheduling overhead microbenchmark (most likely good old stupid recursive fibonacci), then compare it to a simple flat thread pool (threads are pinned, but there is a single top-level crossbeam deque spanning all of them), and to rayon. This would allow me to answer the following questions:

  • How beneficial is it to take locality into account at the scheduling level?
  • How much code complexity does it add?
  • Does rayon's join() have reasonable scheduling overhead compared to a simple flat pinned thread pool, or are there other inefficiencies to take care of there?

Of course, the answer is likely to be hardware dependent, but thankfully I have access to a nice range of x86 hardware:

  • Mostly-symmetric machines where the only form of inner locality is L1d cache sharing between hyperthreads.
  • A Zen 2 machine where the L3 cache is sharded between several sets of CPU cores within a single socket.
  • A big NUMA server with two sockets addressing completely unrelated RAM.
  • ...so the main flavor of x86 locality weirdness that I'm missing is a post-Alder Lake Intel CPU with P-cores and E-cores. Maybe someone else can run the benchmark on one for me if I get there?
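For reference, the scheduling-overhead microbenchmark mentioned above (naive recursive Fibonacci over join()) is typically just a few lines; each task does almost no real work, so elapsed time is dominated by scheduling cost:

```rust
// Naive parallel Fibonacci: each call does O(1) work, so the runtime is
// essentially a measure of join()/scheduling overhead.
fn fib(n: u64) -> u64 {
    if n < 2 {
        return n;
    }
    let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2));
    a + b
}
```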

Then I could run another test which actually manipulates some data to demonstrate the cache locality benefits. I need to think a bit about a good guinea pig task, but the general idea is that thread A should spawn a task that is best processed by thread A itself, or by another thread close to it, because the data is hot in thread A's cache and in the cache levels above it. Most likely some variation of map-reduce would do the trick?
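One candidate guinea pig along those lines (purely a sketch, not taken from any prototype): a two-pass map-reduce where the second pass benefits from the first pass's data still being hot in the caches of the workers that produced it.

```rust
// Hypothetical locality-sensitive benchmark: pass 1 warms the caches of
// whichever workers ran it, pass 2 rewards a scheduler that sends the same
// slices back to the same (or nearby) workers.
use rayon::prelude::*;

fn map_then_reduce(data: &mut [f64]) -> f64 {
    // Pass 1: transform in place.
    data.par_iter_mut().for_each(|x| *x = x.sqrt());
    // Pass 2: reduce over the freshly touched data.
    data.par_iter().sum()
}
```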

Assuming I manage to do all of this, it would provide good input to this discussion: how much benefit can we hope to get from adding NUMA-aware scheduling to rayon, and how much code complexity would it cost in return?

@Alnaimi-

@HadrienG2, I have been following your work at https://github.com/HadrienG2/hwlocality. Any updates on progress?

Thanks!

HadrienG2 (Contributor) commented Jan 19, 2024

Hi @Alnaimi-,

Like all plans, the above plan needed a number of adjustments when meeting the real world 😉

First, producing an hwlocality v1 that is up to the quality standards that I want is taking longer than I expected, for a number of reasons: other stuff competing for my work hours; people starting to use it (which is super cool) and reporting tons of issues with unusual build configurations (which is time-consuming to fix); and some features, like hwloc group object creation, being unexpectedly hard to test because I actually need to reverse-engineer some undocumented parts of the hwloc API contract.

For those who can't wait and don't mind running untested code that may have a few bugs and/or still get a few API adjustments in the future, my recommendation remains to use a cargo git dependency if you can, but a v1.0.0-alpha.1 is also available for those for whom going through crates.io is a must (which, as a nice bonus, also allows me to have a reasonably up-to-date version of the docs on docs.rs).

In parallel, I had some spare time at home at the end of last year and I decided to use it to start exploring the thread pool problem without waiting for hwlocality to be 100% finished. The current product of this ongoing exploration is https://github.com/hadrienG2/viscose .


Here are the rules of the game that I set for myself:

  • I did not focus on faithfully reproducing the complete Rayon feature set, which would take ages, but only on getting a good join() implementation. My idea is that once you have a solid join() almost everything else in Rayon can be built on top of it.
  • My main figure of merit for design decisions was the performance of workloads that are limited by join performance (as stressed by the classic fibonacci microbenchmark) or by memory/cache access performance (as stressed by a bunch of STREAM-style microbenchmarks: squaring each element of an array, summing them...), since I believe this is where locality optimizations are most likely to have a positive or negative impact.
  • I tried for a while to strictly mimic rayon's scheduling-hint-free user experience, but am increasingly leaning towards having optional scheduling hints, because these can help a lot for some workloads (e.g. having a way to dynamically switch to a sequential join() implementation once enough work has been spawned, for some workload-dependent definition of enough, enormously helps workloads that spawn lots of tiny tasks).
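For the last point, a sketch of what such an optional hint could look like; this is a hypothetical helper written against plain rayon::join, not viscose's or rayon's actual API:

```rust
// Hypothetical scheduling hint: below `max_parallel_depth` we use rayon::join,
// past it we run the two closures sequentially, avoiding join overhead once
// enough parallelism has already been exposed.
fn join_with_depth<A, B, RA, RB>(depth: usize, max_parallel_depth: usize, a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    if depth >= max_parallel_depth {
        (a(), b()) // enough tasks already spawned: stop paying for parallel join
    } else {
        rayon::join(a, b)
    }
}
```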

And here are some preliminary observations (short of time to do proper benchmark plots/stats).


First, at small CPU counts, the current rayon join implementation is very fast (a few µs per join from memory, I don't have the bench numbers at hand), and my desire not to regress with respect to this sets a strong constraint on how complex task scheduling algorithms can be. For example, I initially wanted to rigorously follow the tree-like organization of hardware in my scheduling algorithm (first try to distribute tasks within the current L3 cache shard, then if all workers there are busy try within the current NUMA node, then across the whole machine...), but in spite of hours of optimization work the locality benefits of this never recouped the cost of traversing a tree, so now I'm sticking with nearest neighbor search on a carefully ordered 1D worker list with a couple of tricks.

So far, my locality optimizations have mostly seemed to benefit smaller tasks, i.e. I've not seen a big difference in the performance of cache-bound jobs whose working set covers the full L3 cache, but the performance of jobs whose working set only covers a bit more than one L2 cache improves significantly. I suspect that's because for large enough jobs, each worker eventually ends up with a stable, sizeable work queue and doesn't need to exchange work with others anymore, so the only scheduler performance criterion that matters for larger jobs once they reach the steady state is work queue push/pop performance, which is something that crossbeam does reasonably well.

Overall, I'm doing as well as or better than rayon for most problem sizes, but I still take quite a hit from locality issues in my current implementation (big throughput drop when going from one L3 cache shard to two on Zen 2). I've reached the conclusion that this happens because in a binary fork-join model, there is no single locality-aware work-stealing algorithm that is good both for (1) distributing work across workers at the start of a job, and for (2) rebalancing the workload once the job is running on all CPU cores.

To see why, recall that recursive join() generates big chunks of work in the first iterations and small chunks of work in the last iterations. At the beginning, the remote part of the first join() should contain 1/2 of the work, then the next join() recursion level spawns 1/4 of the work, the next one spawns 1/8 of the work, etc.

Because low-locality interconnects (between different NUMA nodes, Zen 2 L3 cache shards...) are not as fast as higher-locality interconnects (between cores in an L3 cache shard), it is better to send a few big chunks of work across the lower-locality interconnects than many small chunks of work. So when you're initially spawning a job, you want the first join() iterations to send lots of work to very remote CPU cores, then the next join() iterations to send increasingly less work to increasingly closer CPU cores, until all CPU cores are busy.

Once all CPU cores are busy, your goal changes: now you have hopefully spread the work evenly across the CPU, and the goal is only to feed CPUs that have run out of work. The best way to feed them is for them to steal work from their nearest (by locality criterion) neighbor whenever needed, as this is the most efficient form of communication and, if the workload is balanced well, shouldn't be a frequent event.

The way I hope to resolve this tension between sending work far away at the start of a job and exchanging work with nearest neighbors in the steady state is to switch to full-blown double-ended scheduling queues that, unlike crossbeam::deque, allow workers to give each other work, not just steal from each other. This will allow workers that handle new jobs to deliberately give jobs to remote workers, while keeping the work-stealing logic local. I have written one such deque that seems to perform reasonably well, and the next step, when I find the time, is to implement the actual work-giving scheme, which will require some extra plumbing that I don't have yet.
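As a purely illustrative interface for that "work-giving" idea (none of these names exist in crossbeam, viscose, or rayon; this is only what such a queue's contract might look like):

```rust
// Hypothetical interface for a work-giving deque, for illustration only.
trait GivingDeque<T> {
    /// The owning worker pushes/pops its own work (LIFO end), as with crossbeam::deque.
    fn push(&self, task: T);
    fn pop(&self) -> Option<T>;
    /// Remote workers may steal from the FIFO end...
    fn steal(&self) -> Option<T>;
    /// ...and, unlike crossbeam::deque, may also *give* work to this worker,
    /// so a thread spawning a large job can deliberately place big chunks far away.
    fn give(&self, task: T) -> Result<(), T>;
}
```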

@Alnaimi-

You are a hero. Thanks for the detailed response. Really excited for this.

HoKim98 commented Feb 4, 2024


Thanks to @HadrienG2, I'm able to boost rayon performance by ~2x on an NVIDIA DGX-2, which has 8 NUMA nodes, by binding each rayon thread to a physical CPU in the same NUMA node. The implementation is very simple: https://github.com/ulagbulag/kiss-icp-rs/blob/cf6471e0091a41a2481f1400f8ea5aba1b0ce6ec/core/src/lib.rs#L101

HoKim98 commented Feb 4, 2024

I created a sample library that enables NUMA-aware thread placement for existing rayon projects. It speeds up a rayon sum benchmark by roughly 20x on my NVIDIA DGX-2 while using fewer CPU cores!

| Machine | OS | Metric | Elapsed time (OFF) | Elapsed time (ON) | Performance boost |
| --- | --- | --- | --- | --- | --- |
| NVIDIA DGX-2 | Ubuntu 22.04 | rayon_sum | 5,175,824 ns/iter (+/- 2,767,386) | 250,119 ns/iter (+/- 21,970) | 20.69x |

Project repository: https://github.com/ulagbulag/sas
Benchmark source code (rayon_sum): https://github.com/ulagbulag/sas/blob/master/benches/rayon_sum.rs

Usage

fn main() {
    // Call sas::init() at the very top of main, before any rayon work runs.
    sas::init();

    // ... your heavy work
}

@adamreichold (Collaborator)

@kerryeon Do I understand the library correctly insofar as it relies on

threads.into_par_iter().for_each(|idx| ...);

running each closure on exactly one of the worker threads? If so, I am not sure this is properly guaranteed, and you may want to look into using spawn_handler to inject your own worker thread setup code reliably.

HoKim98 commented Feb 4, 2024

> @kerryeon Do I understand the library correctly insofar as it relies on
>
> threads.into_par_iter().for_each(|idx| ...);
>
> running each closure on exactly one of the worker threads? If so, I am not sure this is properly guaranteed, and you may want to look into using spawn_handler to inject your own worker thread setup code reliably.

Thanks @adamreichold! I found the bug and resolved it by using spawn_broadcast, which spawns a task on every thread in the rayon thread pool.

::rayon::scope(|s| {
    // Runs the closure once on every worker thread in the pool;
    // ctx.index() is the index of the thread executing this broadcast.
    s.spawn_broadcast(move |_, ctx| {
        println!("{} => {:?}", ctx.index(), ::std::thread::current().id());
    });
});

Alnaimi- commented Jun 14, 2024

Hi all,

Just checking if there has been any progress on this.

While the direction here is unclear, I am exploring alternatives like Glommio. Unfortunately, Glommio tasks are executed on a single thread per core by design, which tends to favor I/O-bound work over CPU-bound work. It also means a significant amount of manual chunking/join work is required to parallelise the code.

Given my need for a join-based, work-stealing scheduler that is locality-aware, Glommio seems less favorable. I understand that there was an effort to develop a NUMA-aware extension for Rayon by @HadrienG2. However, it appears that progress has stalled since January.

Thanks!

@HadrienG2 (Contributor)

Yeah, I've been swamped with other things lately and it will be a while before I can get back to this. Meanwhile, you should perhaps check out @HoKim98's sas crate and see if it improves things for you already?

Alnaimi- commented Jun 14, 2024

Thanks for the update. I understand things get busy. I've been doing some tests with @HoKim98's SAS work over the past two days. My tests so far indicate that all tasks are currently scheduled exclusively onto a single, randomly selected node, meaning that on a 4-node, 192-core system, only 48 cores are ever used.

Interestingly, I've observed that the execution times with and without SAS appear to be similar. This is likely due to the overhead incurred from random access when using all 192 cores, whereas with only 48 cores the workload is confined to a single node, especially as the test includes allocating some large vectors. Other than that, I am not sure how to explain the observed behaviour.

@HadrienG2 (Contributor)

Status update: Sorry, I probably won't ever finish this. If anyone else wants to take over and explore this approach further, the prototype is at https://github.com/hadrienG2/viscose and I'm available to answer any question about how it works (just create an issue in the repo, it will ping my mailbox).
