Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

core: New task API

  • Loading branch information...
commit 4220dcf1e9de2c2d2c329ecefa80108b63a69145 1 parent fbc95ba
@brson brson authored
Showing with 959 additions and 765 deletions.
  1. +40 −44 doc/tutorial.md
  2. +0 −2  src/comp/driver/rustc.rs
  3. +2 −2 src/compiletest/procsrv.rs
  4. +5 −3 src/libcore/comm.rs
  5. +767 −416 src/libcore/task.rs
  6. +2 −8 src/libcore/vec.rs
  7. +6 −14 src/libstd/test.rs
  8. +9 −0 src/rt/rust_builtin.cpp
  9. +14 −8 src/test/bench/msgsend.rs
  10. +5 −3 src/test/bench/shootout-pfib.rs
  11. +1 −1  src/test/bench/shootout-threadring.rs
  12. +1 −1  src/test/bench/task-perf-spawnalot.rs
  13. +11 −9 src/test/bench/task-perf-word-count.rs
  14. +0 −11 src/test/run-pass/binops.rs
  15. +2 −6 src/test/run-pass/issue-507.rs
  16. +3 −2 src/test/run-pass/issue-783.rs
  17. +0 −15 src/test/run-pass/join.rs
  18. +0 −21 src/test/run-pass/linked-failure.rs
  19. +3 −2 src/test/run-pass/lots-a-fail.rs
  20. +1 −1  src/test/run-pass/morestack5.rs
  21. +1 −1  src/test/run-pass/morestack6.rs
  22. +3 −2 src/test/run-pass/send-iloop.rs
  23. +0 −9 src/test/run-pass/spawn-module-qualified.rs
  24. +1 −2  src/test/run-pass/spawn.rs
  25. +1 −7 src/test/run-pass/task-comm-1.rs
  26. +4 −2 src/test/run-pass/task-comm-12.rs
  27. +1 −2  src/test/run-pass/task-comm-13.rs
  28. +0 −31 src/test/run-pass/task-comm-2.rs
  29. +7 −5 src/test/run-pass/task-comm-3.rs
  30. +14 −22 src/test/run-pass/task-comm-7.rs
  31. +0 −51 src/test/run-pass/task-comm-8.rs
  32. +5 −3 src/test/run-pass/task-comm-9.rs
  33. +11 −9 src/test/run-pass/task-comm.rs
  34. +12 −14 src/test/run-pass/task-killjoin.rs
  35. +0 −18 src/test/run-pass/task-spawn-connected.rs
  36. +4 −4 src/test/run-pass/terminate-in-initializer.rs
  37. +3 −2 src/test/run-pass/too-much-recursion.rs
  38. +3 −2 src/test/run-pass/unwind-box.rs
  39. +3 −2 src/test/run-pass/unwind-resource.rs
  40. +3 −2 src/test/run-pass/unwind-resource2.rs
  41. +3 −2 src/test/run-pass/unwind-unique.rs
  42. +4 −2 src/test/run-pass/yield.rs
  43. +4 −2 src/test/run-pass/yield1.rs
View
84 doc/tutorial.md
@@ -2375,10 +2375,10 @@ module `task`. Let's begin with the simplest one, `task::spawn()`:
~~~~
let some_value = 22;
-let child_task = task::spawn {||
+task::spawn {||
std::io::println("This executes in the child task.");
std::io::println(#fmt("%d", some_value));
-};
+}
~~~~
The argument to `task::spawn()` is a [unique
@@ -2456,70 +2456,66 @@ let result = comm::recv(port);
## Creating a task with a bi-directional communication path
A very common thing to do is to spawn a child task where the parent
-and child both need to exchange messages with each other. The function
-`task::spawn_connected()` supports this pattern. We'll look briefly at
-how it is used.
+and child both need to exchange messages with each
+other. The function `task::spawn_listener()` supports this pattern. We'll look
+briefly at how it is used.
-To see how `spawn_connected()` works, we will create a child task
+To see how `spawn_listener()` works, we will create a child task
which receives `uint` messages, converts them to a string, and sends
the string in response. The child terminates when `0` is received.
Here is the function which implements the child task:
~~~~
-fn stringifier(from_par: comm::port<uint>,
- to_par: comm::chan<str>) {
+fn stringifier(from_parent: comm::port<uint>,
+ to_parent: comm::chan<str>) {
let value: uint;
do {
- value = comm::recv(from_par);
- comm::send(to_par, uint::to_str(value, 10u));
+ value = comm::recv(from_parent);
+ comm::send(to_parent, uint::to_str(value, 10u));
} while value != 0u;
}
~~~~
+
You can see that the function takes two parameters. The first is a
port used to receive messages from the parent, and the second is a
channel used to send messages to the parent. The body itself simply
loops, reading from the `from_par` port and then sending its response
to the `to_par` channel. The actual response itself is simply the
strified version of the received value, `uint::to_str(value)`.
-
+
Here is the code for the parent task:
+
~~~~
-# fn stringifier(from_par: comm::port<uint>,
-# to_par: comm::chan<str>) {
-# comm::send(to_par, "22");
-# comm::send(to_par, "23");
-# comm::send(to_par, "0");
+# fn stringifier(from_parent: comm::port<uint>,
+# to_parent: comm::chan<str>) {
+# comm::send(to_parent, "22");
+# comm::send(to_parent, "23");
+# comm::send(to_parent, "0");
# }
fn main() {
- let t = task::spawn_connected(stringifier);
- comm::send(t.to_child, 22u);
- assert comm::recv(t.from_child) == "22";
- comm::send(t.to_child, 23u);
- assert comm::recv(t.from_child) == "23";
- comm::send(t.to_child, 0u);
- assert comm::recv(t.from_child) == "0";
-}
-~~~~
-
-The call to `spawn_connected()` on the first line will instantiate the
-various ports and channels and startup the child task. The returned
-value, `t`, is a record of type `task::connected_task<uint,str>`. In
-addition to the task id of the child, this record defines two fields,
-`from_child` and `to_child`, which contain the port and channel
-respectively for communicating with the child. Those fields are used
-here to send and receive three messages from the child task.
-
-## Joining a task
-
-The function `spawn_joinable()` is used to spawn a task that can later
-be joined. This is implemented by having the child task send a message
-when it has completed (either successfully or by failing). Therefore,
-`spawn_joinable()` returns a structure containing both the task ID and
-the port where this message will be sent---this structure type is
-called `task::joinable_task`. The structure can be passed to
-`task::join()`, which simply blocks on the port, waiting to receive
-the message from the child task.
+ let from_child = comm::port();
+ let to_parent = comm::chan(from_child);
+ let to_child = task::spawn_listener {|from_parent|
+ stringifier(from_parent, to_parent);
+ };
+ comm::send(to_child, 22u);
+ assert comm::recv(from_child) == "22";
+ comm::send(to_child, 23u);
+ assert comm::recv(from_child) == "23";
+ comm::send(to_child, 0u);
+ assert comm::recv(from_child) == "0";
+}
+~~~~
+
+The parent first sets up a port to receive data from and a channel
+that the child can use to send data to that port. The call to
+`spawn_listener()` will spawn the child task, providing it with a port
+on which to receive data from its parent, and returning to the parent
+the associated channel. Finally, the closure passed to
+`spawn_listener()` that forms the body of the child task captures the
+`to_parent` channel in its environment, so both parent and child
+can send and receive data to and from the other.
## The supervisor relationship
View
2  src/comp/driver/rustc.rs
@@ -143,8 +143,6 @@ fn monitor(f: fn~(diagnostic::emitter)) {
alt task::try {||
- task::unsupervise();
-
// The 'diagnostics emitter'. Every error, warning, etc. should
// go through this function.
let demitter = fn@(cmsp: option<(codemap::codemap, codemap::span)>,
View
4 src/compiletest/procsrv.rs
@@ -54,11 +54,11 @@ fn run(lib_path: str, prog: str, args: [str],
writeclose(pipe_in.out, input);
let p = comm::port();
let ch = comm::chan(p);
- task::spawn_sched(1u) {||
+ task::spawn_sched(task::single_threaded) {||
let errput = readclose(pipe_err.in);
comm::send(ch, (2, errput));
};
- task::spawn_sched(1u) {||
+ task::spawn_sched(task::single_threaded) {||
let output = readclose(pipe_out.in);
comm::send(ch, (1, output));
};
View
8 src/libcore/comm.rs
@@ -35,8 +35,9 @@ enum rust_port {}
#[abi = "cdecl"]
native mod rustrt {
+ fn get_task_id() -> task_id;
fn chan_id_send<T: send>(t: *sys::type_desc,
- target_task: task::task, target_port: port_id,
+ target_task: task_id, target_port: port_id,
data: T) -> ctypes::uintptr_t;
fn new_port(unit_sz: ctypes::size_t) -> *rust_port;
@@ -58,6 +59,7 @@ native mod rusti {
fn call_with_retptr<T: send>(&&f: fn@(*uint)) -> T;
}
+type task_id = int;
type port_id = int;
// It's critical that this only have one variant, so it has a record
@@ -75,7 +77,7 @@ type port_id = int;
over other channels."
)]
enum chan<T: send> {
- chan_t(task::task, port_id)
+ chan_t(task_id, port_id)
}
resource port_ptr<T: send>(po: *rust_port) {
@@ -208,7 +210,7 @@ fn peek<T: send>(p: port<T>) -> bool {
port used to construct it."
)]
fn chan<T: send>(p: port<T>) -> chan<T> {
- chan_t(task::get_task(), rustrt::get_port_id(***p))
+ chan_t(rustrt::get_task_id(), rustrt::get_port_id(***p))
}
#[test]
View
1,183 src/libcore/task.rs
@@ -1,5 +1,4 @@
-/*
-Module: task
+#[doc = "
Task management.
@@ -13,564 +12,916 @@ true: when a parent task fails its children will continue executing. When
the root (main) task fails, all tasks fail, and then so does the entire
process.
-A task may remove itself from this failure propagation mechanism by
-calling the <unsupervise> function, after which failure will only
-result in the termination of that task.
-
Tasks may execute in parallel and are scheduled automatically by the runtime.
Example:
-> spawn {||
-> log(debug, "Hello, World!");
-> };
+ spawn {||
+ log(error, \"Hello, World!\");
+ }
-*/
-import cast = unsafe::reinterpret_cast;
-import comm;
-import ptr;
-import c = ctypes;
+"];
export task;
-export joinable_task;
-export yield;
-export task_notification;
-export join;
-export unsupervise;
export task_result;
-export tr_success;
-export tr_failure;
-export get_task;
+export notification;
+export sched_mode;
+export sched_opts;
+export task_opts;
+export task_builder::{};
+
+export default_task_opts;
+export mk_task_builder;
+export get_opts;
+export set_opts;
+export add_wrapper;
+export run;
+
+export future_result;
+export future_task;
+export unsupervise;
+export run_listener;
+
export spawn;
-export spawn_joinable;
-export spawn_connected;
+export spawn_listener;
export spawn_sched;
-export connected_fn;
-export connected_task;
-export currently_unwinding;
export try;
-#[abi = "rust-intrinsic"]
-native mod rusti {
- // these must run on the Rust stack so that they can swap stacks etc:
- fn task_yield(task: *rust_task, &killed: bool);
+export yield;
+export failing;
+export get_task;
+
+
+/* Data types */
+
+#[doc = "A handle to a task"]
+enum task = task_id;
+
+#[doc = "
+
+Indicates the manner in which a task exited.
+
+A task that completes without failing and whose supervised children complete
+without failing is considered to exit successfully.
+
+FIXME: This description does not indicate the current behavior for linked
+failure.
+
+"]
+enum task_result {
+ success,
+ failure,
}
-type rust_closure = {
- fnptr: c::intptr_t, envptr: c::intptr_t
+#[doc = "
+
+A message type for notifying of task lifecycle events
+
+"]
+enum notification {
+ #[doc = "Sent when a task exits with the task handle and result"]
+ exit(task, task_result)
+}
+
+#[doc = "Scheduler modes"]
+enum sched_mode {
+ #[doc = "All tasks run in the same OS thread"]
+ single_threaded,
+ #[doc = "Tasks are distributed among available CPUs"]
+ thread_per_core,
+ #[doc = "Each task runs in its own OS thread"]
+ thread_per_task,
+ #[doc = "Tasks are distributed among a fixed number of OS threads"]
+ manual_threads(uint),
+}
+
+#[doc = "
+
+Scheduler configuration options
+
+Fields:
+
+* sched_mode - The operating mode of the scheduler
+
+* native_stack_size - The size of the native stack, in bytes
+
+ Rust code runs on Rust-specific stacks. When Rust code calls native code
+ (via functions in native modules) it switches to a typical, large stack
+ appropriate for running code written in languages like C. By default these
+ native stacks have unspecified size, but with this option their size can
+ be precisely specified.
+
+"]
+type sched_opts = {
+ mode: sched_mode,
+ native_stack_size: option<uint>,
};
-#[link_name = "rustrt"]
-#[abi = "cdecl"]
-native mod rustrt {
- fn rust_get_sched_id() -> sched_id;
- fn rust_new_sched(num_threads: c::uintptr_t) -> sched_id;
+#[doc = "
- fn get_task_id() -> task_id;
- fn rust_get_task() -> *rust_task;
+Task configuration options
- fn new_task() -> task_id;
- fn rust_new_task_in_sched(id: sched_id) -> task_id;
+Fields:
- fn rust_task_config_notify(
- id: task_id, &&chan: comm::chan<task_notification>);
+* supervise - Do not propagate failure to the parent task
- fn start_task(id: task, closure: *rust_closure);
+ All tasks are linked together via a tree, from parents to children. By
+ default children are 'supervised' by their parent and when they fail
+ so too will their parents. Settings this flag to false disables that
+ behavior.
- fn rust_task_is_unwinding(rt: *rust_task) -> bool;
- fn unsupervise();
-}
+* notify_chan - Enable lifecycle notifications on the given channel
-/* Section: Types */
+* sched - Specify the configuration of a new scheduler to create the task in
-type rust_task = *ctypes::void;
+ By default, every task is created in the same scheduler as its
+ parent, where it is scheduled cooperatively with all other tasks
+ in that scheduler. Some specialized applications may want more
+ control over their scheduling, in which case they can be spawned
+ into a new scheduler with the specific properties required.
-type sched_id = int;
-type task_id = int;
+ This is of particular importance for libraries which want to call
+ into native code that blocks. Without doing so in a different
+ scheduler other tasks will be impeded or even blocked indefinitely.
-/*
-Type: task
+"]
+type task_opts = {
+ supervise: bool,
+ notify_chan: option<comm::chan<notification>>,
+ sched: option<sched_opts>,
+};
+
+#[doc = "
-A handle to a task
-*/
-type task = task_id;
+The task builder type.
-/*
-Function: spawn
+Provides detailed control over the properties and behavior of new tasks.
-Creates and executes a new child task
+"]
+// NB: Builders are designed to be single-use because they do stateful
+// things that get weird when reusing - e.g. if you create a result future
+// it only applies to a single task, so then you have to maintain some
+// potentially tricky state to ensure that everything behaves correctly
+// when you try to reuse the builder to spawn a new task. We'll just
+// sidestep that whole issue by making builder's uncopyable and making
+// the run function move them in.
+enum task_builder = {
+ mutable opts: task_opts,
+ mutable gen_body: fn@(+fn~()) -> fn~(),
+ can_not_copy: option<comm::port<()>>
+};
-Sets up a new task with its own call stack and schedules it to be
-executed. Upon execution, the closure `f()` will be invoked.
-Parameters:
+/* Task construction */
-f - A function to execute in the new task
+fn default_task_opts() -> task_opts {
+ #[doc = "
-Returns:
+ The default task options
-A handle to the new task
-*/
-fn spawn(+f: fn~()) -> task {
- spawn_inner(f, none, new_task_in_this_sched)
-}
+ By default all tasks are supervised by their parent, are spawned
+ into the same scheduler, and do not post lifecycle notifications.
-fn spawn_inner(
- -f: fn~(),
- notify: option<comm::chan<task_notification>>,
- new_task: fn() -> task_id
-) -> task unsafe {
- let closure: *rust_closure = unsafe::reinterpret_cast(ptr::addr_of(f));
- #debug("spawn: closure={%x,%x}", (*closure).fnptr, (*closure).envptr);
- let id = new_task();
+ "];
- // set up notifications if they are enabled.
- option::may(notify) {|c|
- rustrt::rust_task_config_notify(id, c);
+ {
+ supervise: true,
+ notify_chan: none,
+ sched: none
}
+}
- rustrt::start_task(id, closure);
- unsafe::leak(f);
- ret id;
+fn mk_task_builder() -> task_builder {
+ #[doc = "Construct a task_builder"];
+
+ let body_identity = fn@(+body: fn~()) -> fn~() { body };
+
+ task_builder({
+ mutable opts: default_task_opts(),
+ mutable gen_body: body_identity,
+ can_not_copy: none
+ })
+}
+
+fn get_opts(builder: task_builder) -> task_opts {
+ #[doc = "Get the task_opts associated with a task_builder"];
+
+ builder.opts
+}
+
+fn set_opts(builder: task_builder, opts: task_opts) {
+ #[doc = "
+
+ Set the task_opts associated with a task_builder
+
+ To update a single option use a pattern like the following:
+
+ set_opts(builder, {
+ supervise: false
+ with get_opts(builder)
+ });
+
+ "];
+
+ builder.opts = opts;
}
-fn new_task_in_this_sched() -> task_id {
- rustrt::new_task()
+fn add_wrapper(builder: task_builder, gen_body: fn@(+fn~()) -> fn~()) {
+ #[doc = "
+
+ Add a wrapper to the body of the spawned task.
+
+ Before the task is spawned it is passed through a 'body generator'
+ function that may perform local setup operations as well as wrap
+ the task body in remote setup operations. With this the behavior
+ of tasks can be extended in simple ways.
+
+ This function augments the current body generator with a new body
+ generator by applying the task body which results from the
+ existing body generator to the new body generator.
+
+ "];
+
+ let prev_gen_body = builder.gen_body;
+ builder.gen_body = fn@(+body: fn~()) -> fn~() {
+ gen_body(prev_gen_body(body))
+ };
}
-fn new_task_in_new_sched(num_threads: uint) -> task_id {
- let sched_id = rustrt::rust_new_sched(num_threads);
- rustrt::rust_new_task_in_sched(sched_id)
+fn run(-builder: task_builder, +f: fn~()) {
+ #[doc(desc = "
+
+ Creates and exucutes a new child task
+
+ Sets up a new task with its own call stack and schedules it to run
+ the provided unique closure. The task has the properties and behavior
+ specified by `builder`.
+
+ ", failure = "
+
+ When spawning into a new scheduler, the number of threads requested
+ must be greater than zero.
+
+ ")];
+
+ let body = builder.gen_body(f);
+ spawn_raw(builder.opts, body);
}
-/*
-Function: spawn_sched
-Creates a new scheduler and executes a task on it. Tasks subsequently
-spawned by that task will also execute on the new scheduler. When
-there are no more tasks to execute the scheduler terminates.
+/* Builder convenience functions */
+
+fn future_result(builder: task_builder) -> future::future<task_result> {
+ #[doc = "
+
+ Get a future representing the exit status of the task.
+
+ Taking the value of the future will block until the child task terminates.
-Arguments:
+ Note that the future returning by this function is only useful for
+ obtaining the value of the next task to be spawning with the
+ builder. If additional tasks are spawned with the same builder
+ then a new result future must be obtained prior to spawning each
+ task.
-num_threads - The number of OS threads to dedicate schedule tasks on
-f - A unique closure to execute as a task on the new scheduler
+ "];
-Failure:
+ // FIXME (1087, 1857): Once linked failure and notification are
+ // handled in the library, I can imagine implementing this by just
+ // registering an arbitrary number of task::on_exit handlers and
+ // sending out messages.
-The number of threads must be greater than 0
+ let po = comm::port();
+ let ch = comm::chan(po);
-*/
-fn spawn_sched(num_threads: uint, +f: fn~()) -> task {
- if num_threads < 1u {
- fail "Can not create a scheduler with no threads";
+ set_opts(builder, {
+ notify_chan: some(ch)
+ with get_opts(builder)
+ });
+
+ future::from_fn {||
+ alt comm::recv(po) {
+ exit(_, result) { result }
+ }
}
- spawn_inner(f, none, bind new_task_in_new_sched(num_threads))
-}
-
-/*
-Type: joinable_task
-
-A task that sends notification upon termination
-*/
-type joinable_task = (task, comm::port<task_notification>);
-
-fn spawn_joinable(+f: fn~()) -> joinable_task {
- let notify_port = comm::port();
- let notify_chan = comm::chan(notify_port);
- let task = spawn_inner(f, some(notify_chan), new_task_in_this_sched);
- ret (task, notify_port);
- /*
- resource notify_rsrc(data: (comm::chan<task_notification>,
- task,
- @mutable task_result)) {
- let (chan, task, tr) = data;
- let msg = exit(task, *tr);
- comm::send(chan, msg);
+}
+
+fn future_task(builder: task_builder) -> future::future<task> {
+ #[doc = "Get a future representing the handle to the new task"];
+
+ let po = comm::port();
+ let ch = comm::chan(po);
+ add_wrapper(builder) {|body|
+ fn~[move body]() {
+ comm::send(ch, get_task());
+ body();
+ }
}
+ future::from_port(po)
+}
- let notify_port = comm::port();
- let notify_chan = comm::chan(notify_port);
- let g = fn~[copy notify_chan; move f]() {
- let this_task = rustrt::get_task_id();
- let result = @mutable tr_failure;
- let _rsrc = notify_rsrc((notify_chan, this_task, result));
- f();
- *result = tr_success; // rsrc will fire msg when fn returns
- };
- let task = spawn(g);
- ret (task, notify_port);
- */
+fn unsupervise(builder: task_builder) {
+ #[doc = "Configures the new task to not propagate failure to its parent"];
+
+ set_opts(builder, {
+ supervise: false
+ with get_opts(builder)
+ });
}
-/*
-Tag: task_result
+fn run_listener<A:send>(-builder: task_builder,
+ +f: fn~(comm::port<A>)) -> comm::chan<A> {
+ #[doc = "
-Indicates the manner in which a task exited
-*/
-enum task_result {
- /* Variant: tr_success */
- tr_success,
- /* Variant: tr_failure */
- tr_failure,
+ Runs a new task while providing a channel from the parent to the child
+
+ Sets up a communication channel from the current task to the new
+ child task, passes the port to child's body, and returns a channel
+ linked to the port to the parent.
+
+ This encapsulates some boilerplate handshaking logic that would
+ otherwise be required to establish communication from the parent
+ to the child.
+ "];
+
+ let setup_po = comm::port();
+ let setup_ch = comm::chan(setup_po);
+
+ run(builder, fn~[move f]() {
+ let po = comm::port();
+ let ch = comm::chan(po);
+ comm::send(setup_ch, ch);
+ f(po);
+ });
+
+ comm::recv(setup_po)
}
-/*
-Tag: task_notification
-Message sent upon task exit to indicate normal or abnormal termination
-*/
-enum task_notification {
- /* Variant: exit */
- exit(task, task_result),
+/* Spawn convenience functions */
+
+fn spawn(+f: fn~()) {
+ #[doc = "
+
+ Creates and exucutes a new child task
+
+ Sets up a new task with its own call stack and schedules it to run
+ the provided unique closure.
+
+ This function is equivalent to `run(mk_task_builder(), f)`.
+ "];
+
+ run(mk_task_builder(), f);
}
-/*
-Type: connected_fn
+fn spawn_listener<A:send>(+f: fn~(comm::port<A>)) -> comm::chan<A> {
+ #[doc = "
-The prototype for a connected child task function. Such a function will be
-supplied with a channel to send messages to the parent and a port to receive
-messages from the parent. The type parameter `ToCh` is the type for messages
-sent from the parent to the child and `FrCh` is the type for messages sent
-from the child to the parent. */
-type connected_fn<ToCh, FrCh> = fn~(comm::port<ToCh>, comm::chan<FrCh>);
+ Runs a new task while providing a channel from the parent to the child
-/*
-Type: connected_fn
+ Sets up a communication channel from the current task to the new
+ child task, passes the port to child's body, and returns a channel
+ linked to the port to the parent.
-The result type of <spawn_connected>
-*/
-type connected_task<ToCh, FrCh> = {
- from_child: comm::port<FrCh>,
- to_child: comm::chan<ToCh>,
- task: task
-};
+ This encapsulates some boilerplate handshaking logic that would
+ otherwise be required to establish communication from the parent
+ to the child.
-/*
-Function: spawn_connected
+ The simplest way to establish bidirectional communication between
+ a parent in child is as follows:
-Spawns a child task along with a port/channel for exchanging messages
-with the parent task. The type `ToCh` represents messages sent to the child
-and `FrCh` messages received from the child.
+ let po = comm::port();
+ let ch = comm::chan(po);
+ let ch = spawn_listener {|po|
+ // Now the child has a port called 'po' to read from and
+ // an environment-captured channel called 'ch'.
+ };
+ // Likewise, the parent has both a 'po' and 'ch'
+
+ This function is equivalent to `run_listener(mk_task_builder(), f)`.
+
+ "];
+
+ run_listener(mk_task_builder(), f)
+}
-Parameters:
+fn spawn_sched(mode: sched_mode, +f: fn~()) {
+ #[doc(desc = "
-f - the child function to execute
+ Creates a new scheduler and executes a task on it
-Returns:
+ Tasks subsequently spawned by that task will also execute on
+ the new scheduler. When there are no more tasks to execute the
+ scheduler terminates.
-The new child task along with the port to receive messages and the channel
-to send messages.
-*/
-fn spawn_connected<ToCh:send, FrCh:send>(+f: connected_fn<ToCh, FrCh>)
- -> connected_task<ToCh,FrCh> {
- let from_child_port = comm::port::<FrCh>();
- let from_child_chan = comm::chan(from_child_port);
- let get_to_child_port = comm::port::<comm::chan<ToCh>>();
- let get_to_child_chan = comm::chan(get_to_child_port);
- let child_task = spawn(fn~[move f]() {
- let to_child_port = comm::port::<ToCh>();
- comm::send(get_to_child_chan, comm::chan(to_child_port));
- f(to_child_port, from_child_chan);
+ ", failure = "
+
+ In manual threads mode the number of threads requested must be
+ greater than zero.
+
+ ")];
+
+ let builder = mk_task_builder();
+ set_opts(builder, {
+ sched: some({
+ mode: mode,
+ native_stack_size: none
+ })
+ with get_opts(builder)
});
- let to_child_chan = comm::recv(get_to_child_port);
- ret {from_child: from_child_port,
- to_child: to_child_chan,
- task: child_task};
+ run(builder, f);
}
-/* Section: Operations */
+fn try<T:send>(+f: fn~() -> T) -> result::t<T,()> {
+ #[doc(desc = "
-/*
-Type: get_task
+ Execute a function in another task and return either the return value
+ of the function or result::err.
-Retreives a handle to the currently executing task
-*/
-fn get_task() -> task { rustrt::get_task_id() }
+ ", return = "
-/*
-Function: yield
+ If the function executed successfully then try returns result::ok
+ containing the value returned by the function. If the function fails
+ then try returns result::err containing nil.
-Yield control to the task scheduler
+ ")];
+
+ let po = comm::port();
+ let ch = comm::chan(po);
+ let builder = mk_task_builder();
+ unsupervise(builder);
+ let result = future_result(builder);
+ run(builder, fn~[move f]() {
+ comm::send(ch, f());
+ });
+ alt future::get(result) {
+ success { result::ok(comm::recv(po)) }
+ failure { result::err(()) }
+ }
+}
+
+
+/* Lifecycle functions */
-The scheduler may schedule another task to execute.
-*/
fn yield() {
- let task = rustrt::rust_get_task();
+ #[doc = "Yield control to the task scheduler"];
+
+ let task_ = rustrt::rust_get_task();
let killed = false;
- rusti::task_yield(task, killed);
- if killed && !currently_unwinding() {
+ rusti::task_yield(task_, killed);
+ if killed && !failing() {
fail "killed";
}
}
-/*
-Function: join
+fn failing() -> bool {
+ #[doc = "True if the running task has failed"];
-Wait for a child task to exit
+ rustrt::rust_task_is_unwinding(rustrt::rust_get_task())
+}
-The child task must have been spawned with <spawn_joinable>, which
-produces a notification port that the child uses to communicate its
-exit status.
+fn get_task() -> task {
+ #[doc = "Get a handle to the running task"];
-Returns:
+ task(rustrt::get_task_id())
+}
-A task_result indicating whether the task terminated normally or failed
-*/
-fn join(task_port: joinable_task) -> task_result {
- let (id, port) = task_port;
- alt comm::recv::<task_notification>(port) {
- exit(_id, res) {
- if _id == id {
- ret res
- } else {
- fail #fmt["join received id %d, expected %d", _id, id]
+
+/* Internal */
+
+type sched_id = int;
+type task_id = int;
+
+// These are both opaque runtime/compiler types that we don't know the
+// structure of and should only deal with via unsafe pointer
+type rust_task = ctypes::void;
+type rust_closure = ctypes::void;
+
+fn spawn_raw(opts: task_opts, +f: fn~()) unsafe {
+
+ let f = if opts.supervise {
+ f
+ } else {
+ // FIXME: The runtime supervision API is weird here because it
+ // was designed to let the child unsupervise itself, when what
+ // we actually want is for parents to unsupervise new
+ // children.
+ fn~[move f]() {
+ rustrt::unsupervise();
+ f();
}
+ };
+
+ let fptr = ptr::addr_of(f);
+ let closure: *rust_closure = unsafe::reinterpret_cast(fptr);
+
+ let task_id = alt opts.sched {
+ none {
+ rustrt::new_task()
+ }
+ some(sched_opts) {
+ new_task_in_new_sched(sched_opts)
}
+ };
+
+ option::may(opts.notify_chan) {|c|
+ // FIXME (1087): Would like to do notification in Rust
+ rustrt::rust_task_config_notify(task_id, c);
}
-}
-/*
-Function: unsupervise
+ rustrt::start_task(task_id, closure);
+ unsafe::leak(f);
-Detaches this task from its parent in the task tree
+ fn new_task_in_new_sched(opts: sched_opts) -> task_id {
+ if opts.native_stack_size != none {
+ fail "native_stack_size scheduler option unimplemented";
+ }
-An unsupervised task will not propagate its failure up the task tree
-*/
-fn unsupervise() {
- rustrt::unsupervise();
-}
+ let num_threads = alt opts.mode {
+ single_threaded { 1u }
+ thread_per_core {
+ fail "thread_per_core scheduling mode unimplemented"
+ }
+ thread_per_task {
+ fail "thread_per_task scheduling mode unimplemented"
+ }
+ manual_threads(threads) {
+ if threads == 0u {
+ fail "can not create a scheduler with no threads";
+ }
+ threads
+ }
+ };
-/*
-Function: currently_unwinding()
+ let sched_id = rustrt::rust_new_sched(num_threads);
+ rustrt::rust_new_task_in_sched(sched_id)
+ }
-True if we are currently unwinding after a failure.
-*/
-fn currently_unwinding() -> bool {
- rustrt::rust_task_is_unwinding(rustrt::rust_get_task())
}
-/*
-Function: try
+#[abi = "rust-intrinsic"]
+native mod rusti {
+ fn task_yield(task: *rust_task, &killed: bool);
+}
-Execute a function in another task and return either the return value
-of the function or result::err.
+native mod rustrt {
+ fn rust_get_sched_id() -> sched_id;
+ fn rust_new_sched(num_threads: ctypes::uintptr_t) -> sched_id;
-Returns:
+ fn get_task_id() -> task_id;
+ fn rust_get_task() -> *rust_task;
-If the function executed successfully then try returns result::ok
-containing the value returned by the function. If the function fails
-then try returns result::err containing nil.
-*/
-fn try<T:send>(+f: fn~() -> T) -> result::t<T,()> {
- let p = comm::port();
- let ch = comm::chan(p);
- alt join(spawn_joinable {||
- unsupervise();
- comm::send(ch, f());
- }) {
- tr_success { result::ok(comm::recv(p)) }
- tr_failure { result::err(()) }
+ fn new_task() -> task_id;
+ fn rust_new_task_in_sched(id: sched_id) -> task_id;
+
+ fn rust_task_config_notify(
+ id: task_id, &&chan: comm::chan<notification>);
+
+ fn start_task(id: task_id, closure: *rust_closure);
+
+ fn rust_task_is_unwinding(rt: *rust_task) -> bool;
+ fn unsupervise();
+}
+
+
+#[test]
+fn test_spawn_raw_simple() {
+ let po = comm::port();
+ let ch = comm::chan(po);
+ spawn_raw(default_task_opts()) {||
+ comm::send(ch, ());
}
+ comm::recv(po);
}
-#[cfg(test)]
-mod tests {
- // FIXME: Leaks on windows
- #[test]
- #[ignore(cfg(target_os = "win32"))]
- fn test_unsupervise() {
- fn f() { unsupervise(); fail; }
- spawn {|| f();};
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_spawn_raw_unsupervise() {
+ let opts = {
+ supervise: false
+ with default_task_opts()
+ };
+ spawn_raw(opts) {||
+ fail;
}
+}
- #[test]
- fn test_lib_spawn() {
- fn foo() { #error("Hello, World!"); }
- spawn {|| foo();};
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_spawn_raw_notify() {
+ let task_po = comm::port();
+ let task_ch = comm::chan(task_po);
+ let notify_po = comm::port();
+ let notify_ch = comm::chan(notify_po);
+
+ let opts = {
+ notify_chan: some(notify_ch)
+ with default_task_opts()
+ };
+ spawn_raw(opts) {||
+ comm::send(task_ch, get_task());
}
+ let task_ = comm::recv(task_po);
+ assert comm::recv(notify_po) == exit(task_, success);
- #[test]
- fn test_lib_spawn2() {
- fn foo(x: int) { assert (x == 42); }
- spawn {|| foo(42);};
+ let opts = {
+ supervise: false,
+ notify_chan: some(notify_ch)
+ with default_task_opts()
+ };
+ spawn_raw(opts) {||
+ comm::send(task_ch, get_task());
+ fail;
}
+ let task_ = comm::recv(task_po);
+ assert comm::recv(notify_po) == exit(task_, failure);
+}
- #[test]
- fn test_join_chan() {
- fn winner() { }
+#[test]
+fn test_run_basic() {
+ let po = comm::port();
+ let ch = comm::chan(po);
+ let builder = mk_task_builder();
+ run(builder) {||
+ comm::send(ch, ());
+ }
+ comm::recv(po);
+}
- let t = spawn_joinable {|| winner();};
- alt join(t) {
- tr_success {/* yay! */ }
- _ { fail "invalid task status received" }
+#[test]
+fn test_add_wrapper() {
+ let po = comm::port();
+ let ch = comm::chan(po);
+ let builder = mk_task_builder();
+ add_wrapper(builder) {|body|
+ fn~() {
+ body();
+ comm::send(ch, ());
}
}
+ run(builder) {||}
+ comm::recv(po);
+}
- // FIXME: Leaks on windows
- #[test]
- #[ignore(cfg(target_os = "win32"))]
- fn test_join_chan_fail() {
- fn failer() { unsupervise(); fail }
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_future_result() {
+ let builder = mk_task_builder();
+ let result = future_result(builder);
+ run(builder) {||}
+ assert future::get(result) == success;
+
+ let builder = mk_task_builder();
+ let result = future_result(builder);
+ unsupervise(builder);
+ run(builder) {|| fail }
+ assert future::get(result) == failure;
+}
- let t = spawn_joinable {|| failer();};
- alt join(t) {
- tr_failure {/* yay! */ }
- _ { fail "invalid task status received" }
- }
- }
+#[test]
+fn test_future_task() {
+ let po = comm::port();
+ let ch = comm::chan(po);
+ let builder = mk_task_builder();
+ let task1 = future_task(builder);
+ run(builder) {|| comm::send(ch, get_task()) }
+ assert future::get(task1) == comm::recv(po);
+}
- #[test]
- fn spawn_polymorphic() {
- fn foo<T:send>(x: T) { log(error, x); }
- spawn {|| foo(true);};
- spawn {|| foo(42);};
- }
+#[test]
+fn test_spawn_listiner_bidi() {
+ let po = comm::port();
+ let ch = comm::chan(po);
+ let ch = spawn_listener {|po|
+ // Now the child has a port called 'po' to read from and
+ // an environment-captured channel called 'ch'.
+ let res = comm::recv(po);
+ assert res == "ping";
+ comm::send(ch, "pong");
+ };
+ // Likewise, the parent has both a 'po' and 'ch'
+ comm::send(ch, "ping");
+ let res = comm::recv(po);
+ assert res == "pong";
+}
- #[test]
- fn try_success() {
- alt try {||
- "Success!"
- } {
- result::ok("Success!") { }
- _ { fail; }
- }
+#[test]
+fn test_try_success() {
+ alt try {||
+ "Success!"
+ } {
+ result::ok("Success!") { }
+ _ { fail; }
}
+}
- #[test]
- #[ignore(cfg(target_os = "win32"))]
- fn try_fail() {
- alt try {||
- fail
- } {
- result::err(()) { }
- _ { fail; }
- }
+#[test]
+#[ignore(cfg(target_os = "win32"))]
+fn test_try_fail() {
+ alt try {||
+ fail
+ } {
+ result::err(()) { }
+ _ { fail; }
}
+}
- #[test]
- #[should_fail]
- #[ignore(cfg(target_os = "win32"))]
- fn spawn_sched_no_threads() {
- spawn_sched(0u) {|| };
- }
+#[test]
+#[should_fail]
+#[ignore(cfg(target_os = "win32"))]
+fn test_spawn_sched_no_threads() {
+ spawn_sched(manual_threads(0u)) {|| };
+}
- #[test]
- fn spawn_sched_1() {
- let po = comm::port();
- let ch = comm::chan(po);
+#[test]
+fn test_spawn_sched() {
+ let po = comm::port();
+ let ch = comm::chan(po);
- fn f(i: int, ch: comm::chan<()>) {
- let parent_sched_id = rustrt::rust_get_sched_id();
+ fn f(i: int, ch: comm::chan<()>) {
+ let parent_sched_id = rustrt::rust_get_sched_id();
- spawn_sched(1u) {||
- let child_sched_id = rustrt::rust_get_sched_id();
- assert parent_sched_id != child_sched_id;
+ spawn_sched(single_threaded) {||
+ let child_sched_id = rustrt::rust_get_sched_id();
+ assert parent_sched_id != child_sched_id;
- if (i == 0) {
- comm::send(ch, ());
- } else {
- f(i - 1, ch);
- }
- };
+ if (i == 0) {
+ comm::send(ch, ());
+ } else {
+ f(i - 1, ch);
+ }
+ };
- }
- f(10, ch);
- comm::recv(po);
}
+ f(10, ch);
+ comm::recv(po);
+}
- #[test]
- fn spawn_sched_childs_on_same_sched() {
- let po = comm::port();
- let ch = comm::chan(po);
-
- spawn_sched(1u) {||
- let parent_sched_id = rustrt::rust_get_sched_id();
- spawn {||
- let child_sched_id = rustrt::rust_get_sched_id();
- // This should be on the same scheduler
- assert parent_sched_id == child_sched_id;
- comm::send(ch, ());
- };
+#[test]
+fn test_spawn_sched_childs_on_same_sched() {
+ let po = comm::port();
+ let ch = comm::chan(po);
+
+ spawn_sched(single_threaded) {||
+ let parent_sched_id = rustrt::rust_get_sched_id();
+ spawn {||
+ let child_sched_id = rustrt::rust_get_sched_id();
+ // This should be on the same scheduler
+ assert parent_sched_id == child_sched_id;
+ comm::send(ch, ());
};
+ };
- comm::recv(po);
- }
+ comm::recv(po);
+}
- #[nolink]
- native mod rt {
- fn rust_dbg_lock_create() -> *ctypes::void;
- fn rust_dbg_lock_destroy(lock: *ctypes::void);
- fn rust_dbg_lock_lock(lock: *ctypes::void);
- fn rust_dbg_lock_unlock(lock: *ctypes::void);
- fn rust_dbg_lock_wait(lock: *ctypes::void);
- fn rust_dbg_lock_signal(lock: *ctypes::void);
- }
+#[nolink]
+#[cfg(test)]
+native mod testrt {
+ fn rust_dbg_lock_create() -> *ctypes::void;
+ fn rust_dbg_lock_destroy(lock: *ctypes::void);
+ fn rust_dbg_lock_lock(lock: *ctypes::void);
+ fn rust_dbg_lock_unlock(lock: *ctypes::void);
+ fn rust_dbg_lock_wait(lock: *ctypes::void);
+ fn rust_dbg_lock_signal(lock: *ctypes::void);
+}
- #[test]
- fn spawn_sched_blocking() {
+#[test]
+fn test_spawn_sched_blocking() {
- // Testing that a task in one scheduler can block natively
- // without affecting other schedulers
- iter::repeat(20u) {||
+ // Testing that a task in one scheduler can block natively
+ // without affecting other schedulers
+ iter::repeat(20u) {||
- let start_po = comm::port();
- let start_ch = comm::chan(start_po);
- let fin_po = comm::port();
- let fin_ch = comm::chan(fin_po);
+ let start_po = comm::port();
+ let start_ch = comm::chan(start_po);
+ let fin_po = comm::port();
+ let fin_ch = comm::chan(fin_po);
- let lock = rt::rust_dbg_lock_create();
+ let lock = testrt::rust_dbg_lock_create();
- spawn_sched(1u) {||
- rt::rust_dbg_lock_lock(lock);
+ spawn_sched(single_threaded) {||
+ testrt::rust_dbg_lock_lock(lock);
- comm::send(start_ch, ());
+ comm::send(start_ch, ());
- // Block the scheduler thread
- rt::rust_dbg_lock_wait(lock);
- rt::rust_dbg_lock_unlock(lock);
+ // Block the scheduler thread
+ testrt::rust_dbg_lock_wait(lock);
+ testrt::rust_dbg_lock_unlock(lock);
- comm::send(fin_ch, ());
- };
+ comm::send(fin_ch, ());
+ };
- // Wait until the other task has its lock
- comm::recv(start_po);
+ // Wait until the other task has its lock
+ comm::recv(start_po);
- fn pingpong(po: comm::port<int>, ch: comm::chan<int>) {
- let val = 20;
- while val > 0 {
- val = comm::recv(po);
- comm::send(ch, val - 1);
- }
+ fn pingpong(po: comm::port<int>, ch: comm::chan<int>) {
+ let val = 20;
+ while val > 0 {
+ val = comm::recv(po);
+ comm::send(ch, val - 1);
}
-
- let setup_po = comm::port();
- let setup_ch = comm::chan(setup_po);
- let parent_po = comm::port();
- let parent_ch = comm::chan(parent_po);
- spawn {||
- let child_po = comm::port();
- comm::send(setup_ch, comm::chan(child_po));
- pingpong(child_po, parent_ch);
- };
-
- let child_ch = comm::recv(setup_po);
- comm::send(child_ch, 20);
- pingpong(parent_po, child_ch);
- rt::rust_dbg_lock_lock(lock);
- rt::rust_dbg_lock_signal(lock);
- rt::rust_dbg_lock_unlock(lock);
- comm::recv(fin_po);
- rt::rust_dbg_lock_destroy(lock);
}
+
+ let setup_po = comm::port();
+ let setup_ch = comm::chan(setup_po);
+ let parent_po = comm::port();
+ let parent_ch = comm::chan(parent_po);
+ spawn {||
+ let child_po = comm::port();
+ comm::send(setup_ch, comm::chan(child_po));
+ pingpong(child_po, parent_ch);
+ };
+
+ let child_ch = comm::recv(setup_po);
+ comm::send(child_ch, 20);
+ pingpong(parent_po, child_ch);
+ testrt::rust_dbg_lock_lock(lock);
+ testrt::rust_dbg_lock_signal(lock);
+ testrt::rust_dbg_lock_unlock(lock);
+ comm::recv(fin_po);
+ testrt::rust_dbg_lock_destroy(lock);
}
+}
+#[cfg(test)]
+fn avoid_copying_the_body(spawnfn: fn(+fn~())) {
+ let p = comm::port::<uint>();
+ let ch = comm::chan(p);
+
+ let x = ~1;
+ let x_in_parent = ptr::addr_of(*x) as uint;
+
+ spawnfn(fn~[move x]() {
+ let x_in_child = ptr::addr_of(*x) as uint;
+ comm::send(ch, x_in_child);
+ });
+
+ let x_in_child = comm::recv(p);
+ assert x_in_parent == x_in_child;
}
+#[test]
+fn test_avoid_copying_the_body_spawn() {
+ avoid_copying_the_body(spawn);
+}
-// Local Variables:
-// mode: rust;
-// fill-column: 78;
-// indent-tabs-mode: nil
-// c-basic-offset: 4
-// buffer-file-coding-system: utf-8-unix
-// End:
+#[test]
+fn test_avoid_copying_the_body_spawn_listener() {
+ avoid_copying_the_body {|f|
+ spawn_listener(fn~[move f](_po: comm::port<int>) {
+ f();
+ });
+ }
+}
+
+#[test]
+fn test_avoid_copying_the_body_run() {
+ avoid_copying_the_body {|f|
+ let builder = mk_task_builder();
+ run(builder, fn~[move f]() {
+ f();
+ });
+ }
+}
+
+#[test]
+fn test_avoid_copying_the_body_run_listener() {
+ avoid_copying_the_body {|f|
+ let builder = mk_task_builder();
+ run_listener(builder,fn~[move f](_po: comm::port<int>) {
+ f();
+ });
+ }
+}
+
+#[test]
+fn test_avoid_copying_the_body_try() {
+ avoid_copying_the_body {|f|
+ try(fn~[move f]() {
+ f();
+ });
+ }
+}
+
+#[test]
+fn test_avoid_copying_the_body_future_task() {
+ avoid_copying_the_body {|f|
+ let builder = mk_task_builder();
+ future_task(builder);
+ run(builder, fn~[move f]() {
+ f();
+ });
+ }
+}
+
+#[test]
+fn test_avoid_copying_the_body_unsupervise() {
+ avoid_copying_the_body {|f|
+ let builder = mk_task_builder();
+ unsupervise(builder);
+ run(builder, fn~[move f]() {
+ f();
+ });
+ }
+}
View
10 src/libcore/vec.rs
@@ -1929,16 +1929,10 @@ mod tests {
}
#[test]
- // FIXME: Windows can't undwind
+ #[should_fail]
#[ignore(cfg(target_os = "win32"))]
fn test_init_empty() {
-
- let r = task::join(
- task::spawn_joinable {||
- task::unsupervise();
- init::<int>([]);
- });
- assert r == task::tr_failure
+ init::<int>([]);
}
#[test]
View
20 src/libstd/test.rs
@@ -316,13 +316,12 @@ fn run_test(+test: test_desc, monitor_ch: comm::chan<monitor_msg>) {
task::spawn {||
let testfn = test.fn;
- let test_task = task::spawn_joinable {||
- configure_test_task();
- testfn();
- };
-
- let task_result = task::join(test_task);
- let test_result = calc_result(test, task_result == task::tr_success);
+ let builder = task::mk_task_builder();
+ let result_future = task::future_result(builder);
+ task::unsupervise(builder);
+ task::run(builder, testfn);
+ let task_result = future::get(result_future);
+ let test_result = calc_result(test, task_result == task::success);
comm::send(monitor_ch, (test, test_result));
};
}
@@ -337,13 +336,6 @@ fn calc_result(test: test_desc, task_succeeded: bool) -> test_result {
}
}
-// Call from within a test task to make sure it's set up correctly
-fn configure_test_task() {
- // If this task fails we don't want that failure to propagate to the
- // test runner or else we couldn't keep running tests
- task::unsupervise();
-}
-
#[cfg(test)]
mod tests {
View
9 src/rt/rust_builtin.cpp
@@ -539,6 +539,11 @@ chan_id_send(type_desc *t, rust_task_id target_task_id,
// FIXME: make sure this is thread-safe
bool sent = false;
rust_task *task = rust_task_thread::get_task();
+
+ LOG(task, comm, "chan_id_send task: 0x%" PRIxPTR
+ " port: 0x%" PRIxPTR, (uintptr_t) target_task_id,
+ (uintptr_t) target_port_id);
+
rust_task *target_task = task->kernel->get_task_by_id(target_task_id);
if(target_task) {
rust_port *port = target_task->get_port_by_id(target_port_id);
@@ -547,8 +552,12 @@ chan_id_send(type_desc *t, rust_task_id target_task_id,
scoped_lock with(target_task->lock);
port->deref();
sent = true;
+ } else {
+ LOG(task, comm, "didn't get the port");
}
target_task->deref();
+ } else {
+ LOG(task, comm, "didn't get the task");
}
return (uintptr_t)sent;
}
View
22 src/test/bench/msgsend.rs
@@ -28,22 +28,28 @@ fn server(requests: comm::port<request>, responses: comm::chan<uint>) {
}
fn run(args: [str]) {
- let server = task::spawn_connected(server);
+ let from_child = comm::port();
+ let to_parent = comm::chan(from_child);
+ let to_child = task::spawn_listener {|po|
+ server(po, to_parent);
+ };
let size = uint::from_str(args[1]);
let workers = uint::from_str(args[2]);
let start = std::time::precise_time_s();
- let to_child = server.to_child;
- let worker_tasks = [];
+ let to_child = to_child;
+ let worker_results = [];
uint::range(0u, workers) {|_i|
- worker_tasks += [task::spawn_joinable {||
+ let builder = task::mk_task_builder();
+ worker_results += [task::future_result(builder)];
+ task::run(builder) {||
uint::range(0u, size / workers) {|_i|
comm::send(to_child, bytes(100u));
}
- }];
+ };
}
- vec::iter(worker_tasks) {|t| task::join(t); }
- comm::send(server.to_child, stop);
- let result = comm::recv(server.from_child);
+ vec::iter(worker_results) {|r| future::get(r); }
+ comm::send(to_child, stop);
+ let result = comm::recv(from_child);
let end = std::time::precise_time_s();
let elapsed = end - start;
std::io::stdout().write_str(#fmt("Count is %?\n", result));
View
8 src/test/bench/shootout-pfib.rs
@@ -69,11 +69,13 @@ fn stress_task(&&id: int) {
}
fn stress(num_tasks: int) {
- let tasks = [];
+ let results = [];
range(0, num_tasks) {|i|
- tasks += [task::spawn_joinable {|| stress_task(i); }];
+ let builder = task::mk_task_builder();
+ results += [task::future_result(builder)];
+ task::run(builder) {|| stress_task(i); }
}
- for t in tasks { task::join(t); }
+ for r in results { future::get(r); }
}
fn main(argv: [str]) {
View
2  src/test/bench/shootout-threadring.rs
@@ -10,7 +10,7 @@ fn start(+token: int) {
let ch = iter::foldl(bind int::range(2, n_threads + 1, _),
comm::chan(p)) { |ch, i|
let id = n_threads + 2 - i;
- let {to_child, _} = task::spawn_connected::<int, int> {|p, _ch|
+ let to_child = task::spawn_listener::<int> {|p|
roundtrip(id, p, ch)
};
to_child
View
2  src/test/bench/task-perf-spawnalot.rs
@@ -7,7 +7,7 @@ import str;
fn f(&&n: uint) {
let i = 0u;
while i < n {
- task::join(task::spawn_joinable {|| g(); });
+ task::try {|| g() };
i += 1u;
}
}
View
20 src/test/bench/task-perf-word-count.rs
@@ -15,7 +15,6 @@ import option::{some, none};
import std::{map, io, time};
import io::reader_util;
-import task::joinable_task;
import comm::chan;
import comm::port;
import comm::recv;
@@ -59,12 +58,14 @@ mod map_reduce {
enum reduce_proto { emit_val(int), done, ref, release, }
fn start_mappers(ctrl: chan<ctrl_proto>, -inputs: [str]) ->
- [joinable_task] {
- let tasks = [];
+ [future::future<task::task_result>] {
+ let results = [];
for i: str in inputs {
- tasks += [task::spawn_joinable {|| map_task(ctrl, i)}];
+ let builder = task::mk_task_builder();
+ results += [task::future_result(builder)];
+ task::run(builder) {|| map_task(ctrl, i)}
}
- ret tasks;
+ ret results;
}
fn map_task(ctrl: chan<ctrl_proto>, input: str) {
@@ -137,7 +138,7 @@ mod map_reduce {
reducers = map::new_str_hash();
let num_mappers = vec::len(inputs) as int;
- let tasks = start_mappers(chan(ctrl), inputs);
+ let results = start_mappers(chan(ctrl), inputs);
while num_mappers > 0 {
alt recv(ctrl) {
@@ -158,8 +159,9 @@ mod map_reduce {
// log(error, "creating new reducer for " + k);
let p = port();
let ch = chan(p);
- tasks +=
- [task::spawn_joinable{||reduce_task(k, ch)}];
+ let builder = task::mk_task_builder();
+ results += [task::future_result(builder)];
+ task::run(builder) {||reduce_task(k, ch)}
c = recv(p);
reducers.insert(k, c);
}
@@ -171,7 +173,7 @@ mod map_reduce {
reducers.values {|v| send(v, done); }
- for t in tasks { task::join(t); }
+ for r in results { future::get(r); }
}
}
View
11 src/test/run-pass/binops.rs
@@ -96,16 +96,6 @@ fn test_ptr() unsafe {
assert p1 >= p2;
}
-fn test_task() {
- fn f() { }
- let f1 = f, f2 = f;
- let t1 = task::spawn {|| f1(); };
- let t2 = task::spawn {|| f2(); };
-
- assert (t1 == t1);
- assert (t1 != t2);
-}
-
fn test_fn() {
fn f() { }
fn g() { }
@@ -147,7 +137,6 @@ fn main() {
test_port();
test_chan();
test_ptr();
- test_task();
test_fn();
test_native_fn();
}
View
8 src/test/run-pass/issue-507.rs
@@ -8,7 +8,6 @@
use std;
import task;
-import task::join;
import comm;
import comm::chan;
import comm::send;
@@ -18,21 +17,18 @@ import comm::recv;
fn grandchild(c: chan<int>) { send(c, 42); }
fn child(c: chan<int>) {
- let _grandchild = task::spawn_joinable {|| grandchild(c); };
- join(_grandchild);
+ task::spawn {|| grandchild(c); }
}
fn main() {
let p = comm::port();
let ch = chan(p);
- let _child = task::spawn_joinable {|| child(ch); };
+ task::spawn {|| child(ch); }
let x: int = recv(p);
log(debug, x);
assert (x == 42);
-
- join(_child);
}
View
5 src/test/run-pass/issue-783.rs
@@ -21,6 +21,7 @@ fn a() {
}
fn main() {
- let t = spawn_joinable {|| a(); };
- join(t);
+ iter::repeat(100u) {||
+ spawn {|| a(); }
+ }
}
View
15 src/test/run-pass/join.rs
@@ -1,15 +0,0 @@
-// -*- rust -*-
-
-use std;
-
-import task::*;
-
-fn main() {
- let other = spawn_joinable {|| child(); };
- #error("1");
- yield();
- join(other);
- #error("3");
-}
-
-fn child() { #error("2"); }
View
21 src/test/run-pass/linked-failure.rs
@@ -1,21 +0,0 @@
-// -*- rust -*-
-// xfail-win32
-use std;
-import task;
-import comm::port;
-import comm::recv;
-
-fn child() { assert (1 == 2); }
-
-fn parent() {
- // Since this task isn't supervised it won't bring down the whole
- // process
- task::unsupervise();
- let p = port::<int>();
- task::spawn {|| child(); };
- let x = recv(p);
-}
-
-fn main() {
- task::spawn {|| parent(); };
-}
View
5 src/test/run-pass/lots-a-fail.rs
@@ -9,12 +9,13 @@ fn die() {
}
fn iloop() {
- task::unsupervise();
task::spawn {|| die(); };
}
fn main() {
uint::range(0u, 100u) {|_i|
- task::spawn {|| iloop(); };
+ let builder = task::mk_task_builder();
+ task::unsupervise(builder);
+ task::run(builder) {|| iloop(); };
}
}
View
2  src/test/run-pass/morestack5.rs
@@ -12,7 +12,7 @@ fn getbig(&&i: int) {
fn main() {
let sz = 400u;
while sz < 500u {
- task::join(task::spawn_joinable {|| getbig(200) });
+ task::try {|| getbig(200) };
sz += 1u;
}
}
View
2  src/test/run-pass/morestack6.rs
@@ -61,6 +61,6 @@ fn main() {
for f in fns {
let sz = rng.next() % 256u32 + 256u32;
let frame_backoff = rng.next() % 10u32 + 1u32;
- task::join(task::spawn_joinable {|| runtest(f, frame_backoff);});
+ task::try {|| runtest(f, frame_backoff) };
}
}
View
5 src/test/run-pass/send-iloop.rs
@@ -9,7 +9,6 @@ fn die() {
}
fn iloop() {
- task::unsupervise();
task::spawn {|| die(); };
let p = comm::port::<()>();
let c = comm::chan(p);
@@ -23,6 +22,8 @@ fn iloop() {
fn main() {
uint::range(0u, 16u) {|_i|
- task::spawn {|| iloop(); };
+ let builder = task::mk_task_builder();
+ task::unsupervise(builder);
+ task::run(builder) {|| iloop(); }
}
}
View
9 src/test/run-pass/spawn-module-qualified.rs
@@ -1,9 +0,0 @@
-use std;
-import task::join;
-import task::spawn_joinable;
-
-fn main() { let x = spawn_joinable {|| m::child(10); }; join(x); }
-
-mod m {
- fn child(&&i: int) { log(debug, i); }
-}
View
3  src/test/run-pass/spawn.rs
@@ -5,8 +5,7 @@ use std;
import task;
fn main() {
- let t = task::spawn_joinable {|| child(10); };
- task::join(t);
+ task::spawn {|| child(10); };
}
fn child(&&i: int) { log(error, i); assert (i == 10); }
View
8 src/test/run-pass/task-comm-1.rs
@@ -1,14 +1,8 @@
-use std;
-
-import task::spawn_joinable;
-import task::join;
-
fn main() { test00(); }
fn start() { #debug("Started / Finished task."); }
fn test00() {
- let t = spawn_joinable {|| start(); };
- join(t);
+ task::try {|| start() };
#debug("Completing.");
}
View
6 src/test/run-pass/task-comm-12.rs
@@ -7,7 +7,9 @@ fn start(&&task_number: int) { #debug("Started / Finished task."); }
fn test00() {
let i: int = 0;
- let t = task::spawn_joinable {|| start(i); };
+ let builder = task::mk_task_builder();
+ let r = task::future_result(builder);
+ task::run(builder) {|| start(i); };
// Sleep long enough for the task to finish.
let i = 0;
@@ -17,7 +19,7 @@ fn test00() {
}
// Try joining tasks that have already finished.
- task::join(t);
+ future::get(r);
#debug("Joined task.");
}
View
3  src/test/run-pass/task-comm-13.rs
@@ -12,7 +12,6 @@ fn main() {
#debug("Check that we don't deadlock.");
let p = comm::port::<int>();
let ch = comm::chan(p);
- let a = task::spawn_joinable {|| start(ch, 0, 10); };
- task::join(a);
+ task::try {|| start(ch, 0, 10) };
#debug("Joined task");
}
View
31 src/test/run-pass/task-comm-2.rs
@@ -1,31 +0,0 @@
-// xfail-win32
-use std;
-
-import task;
-
-fn main() {
- #debug("===== SPAWNING and JOINING THREAD TASKS =====");
- test00();
-}
-
-fn start(&&task_number: int) {
- #debug("Started task.");
- let i: int = 0;
- while i < 10000 { i = i + 1; }
- #debug("Finished task.");
-}
-
-fn test00() {
- let number_of_tasks: int = 8;
-
- let i: int = 0;
- let tasks = [];
- while i < number_of_tasks {
- i = i + 1;
- tasks += [task::spawn_joinable {|| start(i); }];
- }
-
- for t in tasks { task::join(t); }
-
- #debug("Joined all task.");
-}
View
12 src/test/run-pass/task-comm-3.rs
@@ -30,17 +30,19 @@ fn test00() {
let i: int = 0;
// Create and spawn tasks...
- let tasks = [];
+ let results = [];
while i < number_of_tasks {
- tasks += [task::spawn_joinable {||
+ let builder = task::mk_task_builder();
+ results += [task::future_result(builder)];
+ task::run(builder) {||
test00_start(ch, i, number_of_messages)
- }];
+ }
i = i + 1;
}
// Read from spawned tasks...
let sum = 0;
- for t in tasks {
+ for r in results {
i = 0;
while i < number_of_messages {
let value = recv(po);
@@ -50,7 +52,7 @@ fn test00() {
}
// Join spawned tasks...
- for t in tasks { task::join(t); }
+ for r in results { future::get(r); }
#debug("Completed: Final number is: ");
log(error, sum);
View
36 src/test/run-pass/task-comm-7.rs
@@ -1,9 +1,6 @@
use std;
import task;
import comm;
-import comm::chan;
-import comm::recv;
-import comm::port;
fn main() { test00(); }
@@ -15,40 +12,35 @@ fn test00_start(c: comm::chan<int>, start: int, number_of_messages: int) {
fn test00() {
let r: int = 0;
let sum: int = 0;
- let p = port();
+ let p = comm::port();
let number_of_messages: int = 10;
- let c = chan(p);
+ let c = comm::chan(p);
- let t0 = task::spawn_joinable {||
+ task::spawn {||
test00_start(c, number_of_messages * 0, number_of_messages);
- };
- let t1 = task::spawn_joinable {||
+ }
+ task::spawn {||
test00_start(c, number_of_messages * 1, number_of_messages);
- };
- let t2 = task::spawn_joinable {||
+ }
+ task::spawn {||
test00_start(c, number_of_messages * 2, number_of_messages);
- };
- let t3 = task::spawn_joinable {||
+ }
+ task::spawn {||
test00_start(c, number_of_messages * 3, number_of_messages);
- };
+ }
let i: int = 0;
while i < number_of_messages {
- r = recv(p);
+ r = comm::recv(p);
sum += r;
- r = recv(p);
+ r = comm::recv(p);
sum += r;
- r = recv(p);
+ r = comm::recv(p);
sum += r;
- r = recv(p);
+ r = comm::recv(p);
sum += r;
i += 1;
}
- task::join(t0);
- task::join(t1);
- task::join(t2);
- task::join(t3);
-
assert (sum == number_of_messages * 4 * (number_of_messages * 4 - 1) / 2);
}
View
51 src/test/run-pass/task-comm-8.rs
@@ -1,51 +0,0 @@
-use std;
-import task;
-import comm;
-
-fn main() { test00(); }
-
-fn test00_start(c: comm::chan<int>, start: int, number_of_messages: int) {
- let i: int = 0;
- while i < number_of_messages { comm::send(c, start + i); i += 1; }
-}
-
-fn test00() {
- let r: int = 0;
- let sum: int = 0;
- let p = comm::port();
- let c = comm::chan(p);
- let number_of_messages: int = 10;
-
- let t0 = task::spawn_joinable {||
- test00_start(c, number_of_messages * 0, number_of_messages);
- };
- let t1 = task::spawn_joinable {||
- test00_start(c, number_of_messages * 1, number_of_messages);
- };
- let t2 = task::spawn_joinable {||
- test00_start(c, number_of_messages * 2, number_of_messages);
- };
- let t3 = task::spawn_joinable {||
- test00_start