Scheduler + I/O #7265

wants to merge 128 commits into

7 participants


r? @graydon, @nikomatsakis, @pcwalton, or @catamorphism

Sorry this is so huge, but it's been accumulating for about a month. There's lots of stuff here, mostly oriented toward enabling multithreaded scheduling and improving compatibility between the old and new runtimes. Adds task pinning so that we can create the 'platform thread' in servo.

Here is the current runtime setup code.

About half of this has already been reviewed.

brson and others added some commits May 20, 2013
@brson brson core::rt: Move uv idle tests to idle mod 29d8300
@brson brson core::rt: Add bindings for async uv handles 8072690
@Aatch Aatch Add AtomicUint newtype 6d8d73c
@brson brson core: Add AtomicInt and cleanup 8f77a6f
@brson brson core::rt: Add RemoteCallback trait and uv implementation
This is used for signalling the event loop from other threads.
@brson brson core::rt: Add SchedHandle type 41c2168
@brson brson core::rt: Scheduler takes a WorkQueue
This will be for implementing a work-sharing strategy
@brson brson core::rt: Remove UvEventLoop::new_scheduler function 7f107c4
@brson brson core::rt: Add a very basic multi-threaded scheduling test 3f8095e
@brson brson core::rt: Add SleeperList
Just a simple place to stuff handles to sleeping schedulers.
@brson brson core::rt: Add SleeperList to Scheduler ed8c359
@brson brson core::rt: Add run_in_mt_newsched_task test function 5043ea2
@brson brson core::rt: Outline the full multithreaded scheduling algo. Implement s…
@brson brson core::rt: Fix an infinite recursion bug f343e61
@brson brson core::rt: Change the signature of context switching methods to avoid …
…infinite recursion
@brson brson Merge remote-tracking branch 'brson/io' into incoming
@brson brson core::rt: Add some notes about optimizations ca2eebd
@brson brson core::rt: Begin recording scheduler metrics 8eb358b
@brson brson core::rt: Fix two multithreading bugs and add a threadring test
This properly distributes the load now
@brson brson core::rt: deny(unused_imports, unused_mut, unused_variable) ea633b4
@brson brson core: Make atomic methods public e2bedb1
@brson brson std::rt: Use AtomicUint instead of intrinsics in comm 2e6d51f
@brson brson std::rt: Destroy the task start closure while in task context f7e242a
@brson brson std::rt: Remove an incorrect assert 1507df8
@brson brson core::rt: Implement SharedChan 422f663
@brson brson core::rt: Add SharedPort 51d257f
@brson brson core::rt: Add `MegaPipe`, an unbounded, multiple producer/consumer, l…
…ock-free queue
@brson brson std: Fix stage0 build
@brson brson rt: Add rust_get_num_cpus f9a5005
@brson brson std::rt: Configure test threads with RUST_TEST_THREADS. Default is nc…
…ores x2
@brson brson std::rt: Fix stream test to be parallel d6ccc6b
@brson brson std::rt: Fix a race in the UvRemoteCallback dtor d4de99a
@brson brson std::rt: Reduce task stack size to 1MB d83d38c
@toddaaro toddaaro debugged a compiler ICE when merging local::borrow changes into the m…
…ain io branch and modified the incoming new file to be api-compatible
@brson brson std::rt: Work around a dynamic borrowck bug 84d2695
@toddaaro toddaaro A basic implementation of pinning tasks to schedulers. No IO interact…
…ions have been planned for, and no forwarding of tasks off special schedulers is supported.
@brson brson std::rt: Remove old files e7213aa
Eric Reed Removing redundant libuv bindings eb11274
@toddaaro toddaaro added functionality to tell schedulers to refuse to run tasks that ar…
…e not pinned to them
Eric Reed Added libuv UDP function bindings. 39a575f
Eric Reed Corrected libuv UDP bindings. 5393e43
@brson brson std::rt: Add JoinLatch
This is supposed to be an efficient way to link the lifetimes
of tasks into a tree. JoinLatches form a tree and when `release`
is called they wait on children then signal the parent.

This structure creates zombie tasks which currently keep the entire
task allocated. Zombie tasks are supposed to be tombstoned but that
code does not work correctly.
@brson brson std::rt: Change the Task constructors to reflect a tree fd148cd
@brson brson std::rt: Tasks must have an unwinder. Simpler 90fbe38
Eric Reed Added a utility function to extract the udp handle from udp send requ…
Eric Reed added bindings to extract udp handle from udp send requests 03fe59a
Eric Reed Added a UdpWatcher and UdpSendRequest with associated callbacks a7f92c9
@toddaaro toddaaro redesigned the pinning to deal with things on dequeue, not on enq…
@brson brson Merge remote-tracking branch 'toddaaro/io' into io b08c446
@brson brson std::rt: Tasks contain a JoinLatch 505ef7e
@brson brson Merge remote-tracking branch 'brson/io-wip' into io
@brson brson Merge remote-tracking branch 'brson/io'
Eric Reed added wrappers about uv_ip{4,6}_{port,name} 9687437
Eric Reed Added a RtioUdpStream trait b51d188
Eric Reed added a function to convert C's ipv4 data structure into the Rust ipv…
…4 data structure.
Eric Reed added Eq and TotalEq instances for IpAddr 4744375
Eric Reed Started to implement UdpStream e42f28c
Eric Reed Started to implement UdpStream 33ae193
Eric Reed Merge remote-tracking branch 'upstream/io' into io
@brson brson std::rt: Add util mod and num_cpus function 3281f5b
@brson brson std::rt: Check exchange count on exit 9ef4c41
@brson brson std::rt: move abort function to util module 021e81f
@brson brson std: Rename `abort!` to `rtabort!` to match other macros b5fbec9
@brson brson std::rt: Turn on multithreaded scheduling 5b2dc52
@brson brson std::rt: Improve the rtabort! macro 29ad8e1
@brson brson std::rt: Set the process exit code 915aaa7
@brson brson std::rt: Correct the numbers of default cores 5722c95
@brson brson std::rt: Document and cleanup the run function e1555f9
Eric Reed Wrote the Eq instance of IpAddr in a slightly different way. d777ba0
@toddaaro toddaaro Modified a match in resume_task_from_queue that was returning an int …
…that was then matched on to instead use an enum.
@brson brson Merge pull request #3 from toddaaro/io
Updated a match expression to use an enum instead of dispatching on an integer
Eric Reed Changed visibility from being on the impl to being on methods per lan…
…guage syntax change.
@brson brson std::rt: Update GC metadata in init 5086c08
Eric Reed socket based UDP io ac49b74
Eric Reed derived instances of Eq and TotalEq for IpAddr rather than implement …
…them manually.
@brson brson std: Make newsched failures log correctly 391bb0b
Eric Reed Merge remote-tracking branch 'upstream/io' into io 55dda46
@brson brson Merge remote-tracking branch 'brson/io-wip' into io bbf5469
@brson brson std::rt: Whitespace 4d39253
@brson brson Merge remote-tracking branch 'brson/io' into io-upstream

Note this doesn't turn on the new scheduler by default yet.


I pushed a number of new commits that port minor runtime features. rustc and the test runner both work with newsched now.


@nikomatsakis This moves the dynamic borrow check code from unstable::lang to rt::borrowck.


don't think this needs to be in a cell

thanks. fixed


I can't find the corresponding commit for this code, so I'll put a comment here. In

322     // XXX: Hitting TLS twice to check if the scheduler exists
323     // then to check for the task is not good for perf

This could be fixed with a borrow function analogous to option map_default. Something like do Local::borrow_or_default::<Scheduler>(GlobalContext) |sched| { ... }.

324     if unsafe { rust_try_get_task().is_not_null() } {
325         return OldTaskContext;
326     } else {
327         if Local::exists::<Scheduler>() {
328             let context = Cell::new_empty();

This cell appears to be unnecessary. Just turn the put_back calls into return statements.

329             do Local::borrow::<Scheduler, ()> |sched| {
330                 if sched.in_task_context() {
331                     context.put_back(TaskContext);
332                 } else {
333                     context.put_back(SchedulerContext);
334                 }
335             }
336             return context.take();
337         } else {
338             return GlobalContext;
339         }
340     }
Eric Reed Merge remote-tracking branch 'upstream/io' into io

Confusing comment. Really should say per-scheduler-thread queue, and "shared queue" refers to global-across-all-cpus, right?

yeah, 'per-task' wasn't right

and 'shared queue' means global, yes


I'm skeptical about doing an atomic-free read here:

  • On a seq-cst architecture (x86), the only advantage over atomic ops is that atomic ops restrict compiler reordering -- if a nonatomic write causes a cache-line conflict, it will be just as expensive at runtime as an atomic write.
  • On a non-seq-cst architecture (ARM), removing atomics will help performance, but also introduces dangers depending on how the message queue is implemented. If a message sender writes into a buffer and then changes a pointer, a nonatomic reader might see the pointer before they see the buffer. See rust-lang#7021.

(A release barrier on the sender's end and an acquire barrier on the checker's end would probably be the best you can do safely.)

Regarding the second point, the non-atomic read would only be sufficient to know when you can skip the dequeue. If the non-atomic read results in a pointer then you would still have to go back and do it atomically to be correct.
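The "cheap peek, then go back and do it atomically" pattern described above can be sketched in modern Rust atomics (the `Slot` type and its method names here are hypothetical, not part of the runtime under review; `Release`/`Acquire` play the role of the sender-side and checker-side barriers suggested):

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

// A sender "publishes" a boxed value by storing its pointer with Release;
// the reader peeks cheaply first, then takes the value with Acquire so the
// buffer contents are visible before the pointer is dereferenced.
struct Slot<T> {
    ptr: AtomicPtr<T>,
}

impl<T> Slot<T> {
    fn new() -> Self {
        Slot { ptr: AtomicPtr::new(ptr::null_mut()) }
    }

    fn publish(&self, value: T) {
        let raw = Box::into_raw(Box::new(value));
        // Release: writes into the box happen-before any Acquire load
        // that observes `raw`.
        self.ptr.store(raw, Ordering::Release);
    }

    fn try_take(&self) -> Option<T> {
        // Cheap check: Relaxed is enough just to decide whether to bother.
        if self.ptr.load(Ordering::Relaxed).is_null() {
            return None;
        }
        // Go back and do it properly: Acquire pairs with the Release store.
        let raw = self.ptr.swap(ptr::null_mut(), Ordering::Acquire);
        if raw.is_null() {
            None
        } else {
            // Safety: `raw` came from Box::into_raw and is taken exactly once.
            let boxed = unsafe { Box::from_raw(raw) };
            Some(*boxed)
        }
    }
}
```

The Relaxed peek only answers "is there possibly something here?"; correctness still rests entirely on the Acquire swap that follows.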


should be Local::borrow perhaps?

The double-take was confusing until I noticed that interpret_message_queue puts it back. This must be one of the things aaron was saying he wanted effects for so much. I might prefer to make it return the sched, something like let (msg_exists, sched) = sched.interpret_message_queue().

Yeah, this should be borrow. I'll change it. Making interpret_message_queue return the scheduler is fine but I'll leave that for later.


should be Local::borrow


likewise as above; probably at least need acquire/read barrier


doesn't need to be a Cell


I'm sure you're looking for a better way to structure this ugly pattern:

unsafe {
    let sched = Local::unsafe_borrow::<Scheduler>();
    Context::swap(...ctx1..., ...ctx2...);

    // We could be executing in a different thread now 
    let sched = Local::unsafe_borrow::<Scheduler>();

Consider using a third-order function like this:

fn borrow_scheduler_with_ctx_swap(blk: fn(Scheduler,
                                          swap: fn(Scheduler, Context, Context) -> Scheduler)
                                         -> U) -> U

Used like:

do borrow_scheduler_with_ctx_swap |sched, swap_fn| {
    ... stuff ...
    let reborrowed_sched = swap_fn(sched, ...ctx1..., ...ctx2...);
    (*reborrowed_sched).run_cleanup_job(); // swap() could perhaps do this for you

Actually I think maybe it doesn't need to be 3rd-order, just 2nd-order, but Context::swap can require the scheduler as an argument, and give the new scheduler as the return value.
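The second-order shape suggested above can be sketched in modern Rust. Everything here is a hypothetical stand-in (the `Scheduler`, `Context`, `swap`, and `with_sched_swap` names are illustrative, not the runtime's real API); the point is only that the post-switch scheduler comes back as a return value instead of being re-borrowed from TLS:

```rust
struct Scheduler {
    switches: usize,
}
struct Context;

// Stand-in for Context::swap: takes ownership of the scheduler, performs
// the switch, and returns the scheduler that is current *after* the switch.
fn swap(mut sched: Box<Scheduler>, _out: &mut Context, _in: &Context) -> Box<Scheduler> {
    sched.switches += 1; // pretend we context-switched
    sched
}

// Second-order helper: hands the caller both the scheduler and the swap
// function, so the "different thread now" re-borrow disappears.
fn with_sched_swap<U>(
    sched: Box<Scheduler>,
    blk: impl FnOnce(
        Box<Scheduler>,
        &dyn Fn(Box<Scheduler>, &mut Context, &Context) -> Box<Scheduler>,
    ) -> U,
) -> U {
    blk(sched, &swap)
}

fn demo() -> usize {
    let sched = Box::new(Scheduler { switches: 0 });
    let mut c1 = Context;
    let c2 = Context;
    with_sched_swap(sched, |s, swap_fn| {
        // No unsafe re-borrow: the post-switch scheduler is handed back.
        let s = swap_fn(s, &mut c1, &c2);
        s.switches
    })
}
```

Threading the scheduler through the switch makes ownership explicit, which is exactly what the unsafe double-borrow obscures.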

 635             let last_task_context = match last_task {
 636                 Some(t) => Some(&mut t.saved_context), None => None
 637             };
 638             let next_task_context = match self.current_task {
 639                 Some(ref mut t) => Some(&mut t.saved_context), None => None
 640             };

These could be written with the not-yet-existing map_mut; c.f. #7394


rustc doesn't run with the newrt for me, segfaults (linux amd64)


I think this will be pretty unfriendly on valgrind. Maybe should be in a bench test instead.


Some comments on tests which aren't in this PR:

1139     fn test_simple_scheduling() {
1140         do run_in_bare_thread {
1141             let mut task_ran = false;
1142             let task_ran_ptr: *mut bool = &mut task_ran;
1144             let mut sched = ~new_test_uv_sched();
1145             let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
1146                 unsafe { *task_ran_ptr = true; }
1147             };
1148             sched.enqueue_task(task);
1149             sched.run();
1150             assert!(task_ran);
1151         }
1152     }

Seems fragile. I feel like this wants a port/channel blocking mechanism, and also task_ran should be an atomic bool with release/acquire in child/parent respectively.

1163                 let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
1164                     unsafe { *task_count_ptr = *task_count_ptr + 1; }
1165                 };

Is there some guarantee that these tasks will all run on the same core? Otherwise this will race, and will need to use atomicint and xadd. Same comment for lines 1181 and 1220.
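The two fixes suggested above (a release/acquire flag and an xadd-style counter) look like this in modern Rust; this is a sketch of the reviewer's suggestion, not the test as it exists, and `run_tasks` is a hypothetical harness using OS threads in place of scheduler tasks:

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Each "task" bumps a shared counter with fetch_add (an xadd), so the
// increments cannot race even if tasks run on different cores; the child
// sets the flag with Release and the parent reads it with Acquire.
fn run_tasks(n: usize) -> (bool, usize) {
    let ran = Arc::new(AtomicBool::new(false));
    let count = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let (ran, count) = (ran.clone(), count.clone());
            thread::spawn(move || {
                count.fetch_add(1, Ordering::SeqCst);
                ran.store(true, Ordering::Release);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    (ran.load(Ordering::Acquire), count.load(Ordering::SeqCst))
}
```

With plain `*ptr += 1` instead of `fetch_add`, two concurrent increments can both read the old value and one update is lost; the xadd makes the read-modify-write a single atomic step.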

1188                     let task1 = Cell::new(task1);
1189                     sched.enqueue_task(task1.take());

(from test_swap_tasks_then) -- doesn't need to be in a cell


Is there a plan for making the rust_try call in impl Unwinder not rely on / call out to C++?

In unstable/

 42 pub fn run_in_bare_thread(f: ~fn()) {
 43     let (port, chan) = comm::stream();
 44     // FIXME #4525: Unfortunate that this creates an extra scheduler but it's
 45     // necessary since rust_raw_thread_join_delete is blocking

I think pthread_detach can be used to avoid needing to join.
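In modern Rust the same idea falls out for free: dropping a `JoinHandle` detaches the thread (the moral equivalent of `pthread_detach`), so completion can be signalled over a channel instead of blocking in a join. The `run_detached` helper below is a hypothetical sketch, not the `run_in_bare_thread` implementation:

```rust
use std::sync::mpsc;
use std::thread;

// Spawn a detached thread and wait for it via a channel rather than join.
fn run_detached(f: impl FnOnce() + Send + 'static) {
    let (tx, rx) = mpsc::channel();
    // The JoinHandle is dropped immediately, detaching the thread.
    thread::spawn(move || {
        f();
        let _ = tx.send(());
    });
    rx.recv().unwrap(); // block on the channel, not on a thread join
}
```

This keeps the caller's blocking point inside Rust's channel machinery, sidestepping the "blocking join creates an extra scheduler" problem noted in the FIXME.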


"THIS IS AWFUL." so awful even the indenting is messed up


 20 // XXX: It would be nice to define regs as `~Option<Registers>` since
 21 // the registers are sometimes empty, but the discriminant would
 22 // then misalign the regs again.

Why not Option<~Registers>?

209 // XXX: ptr::offset is positive ints only
210 #[inline]
211 pub fn mut_offset<T>(ptr: *mut T, count: int) -> *mut T {
212     use core::sys::size_of;
213     (ptr as int + count * (size_of::<T>() as int)) as *mut T
214 }

Seems to work just fine with negative ints. If it's not meant to it should assert?
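For reference, modern Rust resolves this exact wart: raw-pointer offsets take an `isize` and are defined for negative counts. A minimal modern equivalent of the quoted `mut_offset` (keeping its name, as an illustrative sketch):

```rust
// wrapping_offset is used because this sketch, like the original, does no
// bounds checking; a plain `offset` would be UB if the result strayed
// outside the allocation.
fn mut_offset<T>(ptr: *mut T, count: isize) -> *mut T {
    ptr.wrapping_offset(count)
}
```

With a signed count, stepping backward through a buffer needs no casts at all.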


I don't see a reason this needs to be AtomicOption. AtomicPtr should work.

The AtomicPtr interface is for unsafe pointers and doesn't automatically handle Drop correctly. I don't trust it.

oh good point


Using return to get rid of the cell in rt::context doesn't work because you can't return from inside do. Regarding the context switching functions, I agree it would be better to pass the scheduler through the context switch. To do it without TLS requires modifying a bunch of assembly.

Re: your comments on test_simple_scheduling, it doesn't need atomics because it is a single-threaded test (run_in_newsched_task vs run_in_mt_newsched_task).

re: rust_try we'll eventually use an intrinsic for catch but llvm provides no way to throw so we'll either continue calling C++ or open-code the appropriate call to __cxa_throw.

re: Option<~Registers> because it would contain an allocation and the goal is to not.

re: mut_offset. It's defined for uint. I think you're saying I could just cast my negative int to a uint, which is true.


@bblum thanks for the review


suggest additions:
"Sometimes the biggest disasters aren't noticed at all - no one's around to write horror stories."
"If you avoided all other threats, the complexity of your own successes would eventually get you."
"Look out on the universe. It does not care, and even with all our science there are some disasters that we can not avert. All evil and good is petty before Nature."


I don't understand the need for dropped_child. Seems like on every exit path, the condition

if !child_link.dropped_child {
    child_link.dropped_child = true;

will always be true, because (a) children tasks don't modify this field in the parent's data, and (b) each task will only go through this code path once.

The release code path executes multiple times for tombstoned parents, but the child ref is only dropped once.


this pattern appears 4 times; wants a helper function


More minimal:

        let orders = ~[ Order { // 0 0
            immediate: true, // not important
            succeed: true,
            orders: ~[ Order { // 1 0
                immediate: true, // important
                succeed: true,
                orders: ~[ Order { // 2 0
                    immediate: false, // important
                    succeed: true,
                    orders: ~[] 

...and none of the 'succeed' values matter. Still looking...

add a comment saying something like "this test case makes sure to cover line 195."


do tombstones.consume |_, t| { t.release(true); }

in fact this is the(?) bug; we are running afoul of rust-lang#4355 -- if you write let x = tombstones.pop(); x.release(); it works fine.

toddaaro and others added some commits Jun 26, 2013
@toddaaro toddaaro Refactored the runtime to view coroutines as a component of tasks, in…
…stead of tasks as a component of coroutines.
@brson brson rt: Add global_args_lock functions to 0e07c8d
@toddaaro toddaaro merging task/coroutine refactoring back into upstream 062bfd3
@toddaaro toddaaro removed unnecessary import that slipped in during merge 27818ea
@brson brson std::rt: Ignore homed task tests 6fd15ff
@toddaaro toddaaro A missing ! made it so that the testcase schedule_home_states was thr…
…owing spurious assert failures. Why this did not result in the test case failing previously is beyond me.
@brson brson Merge remote-tracking branch 'toddaaro/niots'
@brson brson std: Use the same task failure message as C++ rt f8a4d09
Eric Reed IPv6 support for UDP and TCP. e6c5779
Eric Reed Merge remote-tracking branch 'upstream/io' into io
Eric Reed converted TODOs into XXXs b60cf0c
@brson brson Merge remote-tracking branch 'mozilla/master'

Can this run concurrently with the below tube.recv() call? If so, they can race and the receive can block forever.

Likewise with the accesses to incoming_streams in fn accept() in impl RtioTcpListener for UvTcpListener. Seems to me like the server_tcp_watcher callback can preempt an accept call (or even itself?).

Is there some reason a pipe isn't suitable, perhaps with a sharedchan?

This is all single-threaded. run_in_newsched_task creates a single scheduler. It can't use a pipe because the send takes place in scheduler context. Similarly, all the I/O code is single-threaded. It's assumed that, under a multithreaded scheduler, any I/O interactions will first arrange to be running on the scheduler to which the I/O is bound.

hm ok, I see


uv/ has the following redundant cell:

313         let incoming_streams_cell = Cell::new(self.incoming_streams.clone());
315         let incoming_streams_cell = Cell::new(incoming_streams_cell.take());

It seems to me that everywhere this is called, the argument closure always does only sched.enqueue_task(task). Could this function just be switch_running_tasks(~self, next_task: ~Coroutine)? It would make it easier to reason about the set of places tasks could become blocked.

At the moment it could. I'm not sure off hand whether we'll need the closure in the future.

Eric Reed and others added some commits Jul 8, 2013
@bors bors added a commit that referenced this pull request Jul 10, 2013
@bors bors auto merge of #7265 : brson/rust/io-upstream, r=brson
r? @graydon, @nikomatsakis, @pcwalton, or @catamorphism

Sorry this is so huge, but it's been accumulating for about a month. There's lots of stuff here, mostly oriented toward enabling multithreaded scheduling and improving compatibility between the old and new runtimes. Adds task pinning so that we can create the 'platform thread' in servo.

Here is the current runtime setup code.

About half of this has already been reviewed.
@bors bors closed this Jul 10, 2013





The exit status could be an atomic uint, which would eliminate the need for a separate lock. On x86 this would even avoid generating bus-locking instructions.

Also, since we have unsafe global variables now, can this be implemented in rust?

It should probably be an atomic, yes. It's not performance critical though. I'm not sure whether unsafe globals are snapshotted, but the plan is indeed to rewrite all these bits of C++ global state in Rust.
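The atomic-exit-status idea, ported to Rust rather than C++ global state, can be sketched like this (the names `EXIT_STATUS`, `set_exit_status`, and `get_exit_status` are hypothetical, not the runtime's actual symbols):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// A process-global exit status kept in an atomic: no separate lock needed.
static EXIT_STATUS: AtomicUsize = AtomicUsize::new(0);

fn set_exit_status(code: usize) {
    // SeqCst is overkill here but, as noted, this path is not perf-critical.
    EXIT_STATUS.store(code, Ordering::SeqCst);
}

fn get_exit_status() -> usize {
    EXIT_STATUS.load(Ordering::SeqCst)
}
```

Since the status is only read after all writers have finished, even relaxed ordering would arguably do; on x86 a plain atomic store compiles to an ordinary mov, with no bus-locking instruction.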


I feel like the check for whether there's any work left to do and the decision to go to sleep need to be atomic with respect to the this.work_queue.push(task); this.sleeper_list.pop() sequence in enqueue_task. Otherwise two schedulers trying to run both code sequences can race, and the no-work-to-do scheduler can go to sleep and miss the signal from the enqueue-task scheduler.

This won't lead to complete hangs, since at least the enqueue-task scheduler will still be awake, but it can cause slowdowns, since the enqueued task will have to wait for the only awake scheduler to finish its current job, even though others are sleeping and waiting for work.

Suggested fix: open-code the access to the sleeper list's exclusive, and put the task resume checks inside, like so:

let task = do sched.sleeper_list.with |sleeper_list_ptr| {
    match sched.work_queue.pop() {
        Some(task) => task,
        None => {

And in enqueue_task:

do sched.sleeper_list.with |sleeper_list_ptr| {
    do sleeper_list_ptr.pop().map_consume |mut handle| {

Using a pthread condvar (like the sched_loops in oldsched used to) would also work, but would have to replace the event_loop model (which, aiui, needs run_sched_once to return if it wants to block).

My intent here has been for the scheduler going to sleep to push to the sleeper list, then check the message queue and work queue again before actually going to sleep. If I understand you right, that prevents the race between going to sleep and receiving messages. The sleeper list needs to be lock free since it's a global resource.

Hmm, because of the pipe-like nature of message-queues, I think that would solve the two-scheduler race. However, (a) that would be extra overhead all the time, so maybe there could be a "double-checking" optimistic path, and (b) I think that could still have problems with three schedulers -- if A and B are on the sleeper queue, but A actually has work to do and doesn't need to be on the sleeper queue, then if C creates work and sends a message to A, B could still stay asleep for too long (3 pieces of work and only 2 schedulers awake).

I think there might be a way around this by having there be an acknowledgement step, where A sends back a message saying whether or not C should try to keep waking up more sleeping schedulers... but I haven't thought it through enough to know if it'll work for sure.

Another approach might be to do a broadcast-based approach, where whenever new work is created, all sleeping schedulers wake up briefly (and have most of them go back to sleep). This would cost more cpu cycles, but only ones that would be idle otherwise, so the woken scheduler that wins the race would get to start doing real work sooner.


The SharedPort::try_recv implementation wants some heavy commenting. I like it, but I had to puzzle over it for a while to figure out all the cases.
