Skip to content

Commit

Permalink
Merge pull request #32 from zonyitoo/master
Browse files Browse the repository at this point in the history
Do not context switch inside the `with` of thread local environment
  • Loading branch information
zonyitoo committed May 13, 2015
2 parents 632a665 + 9992c50 commit 094760c
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 92 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "coroutine"
version = "0.1.2"
version = "0.1.5"
authors = [
"Y. T. Chung <zonyitoo@gmail.com>",
"Young.Wu <doomsplayer@gmail.com>",
Expand Down
194 changes: 103 additions & 91 deletions src/coroutine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,17 @@ use context::Context;
use stack::{StackPool, Stack};

/// State of a Coroutine
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum State {
/// Waiting its child to return
Normal,

/// Suspended. Can be waked up by `resume`
Suspended,

/// Blocked. Can be waked up by `resume`
Blocked,

/// Running
Running,

Expand Down Expand Up @@ -122,33 +125,31 @@ impl Handle {
_ => {}
}

COROUTINE_ENVIRONMENT.with(|env| {
let env: &mut Environment = unsafe { transmute(env.get()) };
let env = Environment::current();

let from_coro_hdl = Coroutine::current();
let from_coro: &mut Coroutine = unsafe {
let c: &mut Box<Coroutine> = transmute(from_coro_hdl.as_unsafe_cell().get());
c.deref_mut()
};
let from_coro_hdl = Coroutine::current();
let from_coro: &mut Coroutine = unsafe {
let c: &mut Box<Coroutine> = &mut *from_coro_hdl.as_unsafe_cell().get();
c.deref_mut()
};

let to_coro: &mut Box<Coroutine> = unsafe {
transmute(self.as_unsafe_cell().get())
};
let to_coro: &mut Box<Coroutine> = unsafe {
&mut *self.as_unsafe_cell().get()
};

// Save state
to_coro.set_state(State::Running);
to_coro.parent = from_coro;
from_coro.set_state(State::Normal);
// Save state
to_coro.set_state(State::Running);
to_coro.parent = from_coro;
from_coro.set_state(State::Normal);

env.current_running = self.clone();
Context::swap(&mut from_coro.saved_context, &to_coro.saved_context);
env.current_running = from_coro_hdl;
env.current_running = self.clone();
Context::swap(&mut from_coro.saved_context, &to_coro.saved_context);
env.current_running = from_coro_hdl;

match env.running_state.take() {
Some(err) => Err(err),
None => Ok(()),
}
})
match env.running_state.take() {
Some(err) => Err(err),
None => Ok(()),
}
}

/// Join this Coroutine.
Expand Down Expand Up @@ -178,7 +179,7 @@ impl Handle {
#[inline]
pub fn state(&self) -> State {
unsafe {
let c: &mut Box<Coroutine> = transmute(self.as_unsafe_cell().get());
let c: &mut Box<Coroutine> = &mut *self.as_unsafe_cell().get();
c.state()
}
}
Expand All @@ -187,7 +188,7 @@ impl Handle {
#[inline]
pub fn set_state(&self, state: State) {
unsafe {
let c: &mut Box<Coroutine> = transmute(self.as_unsafe_cell().get());
let c: &mut Box<Coroutine> = &mut *self.as_unsafe_cell().get();
c.set_state(state)
}
}
Expand Down Expand Up @@ -233,10 +234,8 @@ impl Drop for Coroutine {
fn drop(&mut self) {
match self.current_stack_segment.take() {
Some(stack) => {
COROUTINE_ENVIRONMENT.with(|env| {
let env: &mut Environment = unsafe { transmute(env.get()) };
env.stack_pool.give_stack(stack);
});
let env = Environment::current();
env.stack_pool.give_stack(stack);
},
None => {}
}
Expand All @@ -249,44 +248,43 @@ extern "C" fn coroutine_initialize(_: usize, f: *mut ()) -> ! {

let ret = unsafe { try(move|| func.invoke(())) };

COROUTINE_ENVIRONMENT.with(move|env| {
let env: &mut Environment = unsafe { transmute(env.get()) };
let env = Environment::current();

let cur: &mut Box<Coroutine> = unsafe {
transmute(env.current_running.as_unsafe_cell().get())
};
let cur: &mut Box<Coroutine> = unsafe {
&mut *env.current_running.as_unsafe_cell().get()
};

match ret {
Ok(..) => {
env.running_state = None;
cur.set_state(State::Finished);
}
Err(err) => {
cur.set_state(State::Panicked);

{
use std::io::stderr;
use std::io::Write;
let msg = match err.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match err.downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<Any>",
}
};

let name = cur.name().unwrap_or("<unnamed>");

let _ = writeln!(&mut stderr(), "Coroutine '{}' panicked at '{}'", name, msg);
}
let state = match ret {
Ok(..) => {
env.running_state = None;

env.running_state = Some(err);
State::Finished
}
Err(err) => {
{
use std::io::stderr;
use std::io::Write;
let msg = match err.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match err.downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<Any>",
}
};

let name = cur.name().unwrap_or("<unnamed>");

let _ = writeln!(&mut stderr(), "Coroutine '{}' panicked at '{}'", name, msg);
}

env.running_state = Some(err);

State::Panicked
}
});
};

loop {
Coroutine::sched();
Coroutine::yield_now(state);
}
}

Expand Down Expand Up @@ -315,20 +313,15 @@ impl Coroutine {
pub fn spawn_opts<F>(f: F, opts: Options) -> Handle
where F: FnOnce() + Send + 'static {

COROUTINE_ENVIRONMENT.with(move|env| {
unsafe {
let env: &mut Environment = transmute(env.get());

let mut stack = env.stack_pool.take_stack(opts.stack_size);
let env = Environment::current();
let mut stack = env.stack_pool.take_stack(opts.stack_size);

let ctx = Context::new(coroutine_initialize,
0,
f,
&mut stack);
let ctx = Context::new(coroutine_initialize,
0,
f,
&mut stack);

Coroutine::new(opts.name, stack, ctx, State::Suspended)
}
})
Coroutine::new(opts.name, stack, ctx, State::Suspended)
}

/// Spawn a Coroutine with default options
Expand All @@ -339,22 +332,31 @@ impl Coroutine {

/// Yield the current running Coroutine
#[inline]
pub fn sched() {
COROUTINE_ENVIRONMENT.with(|env| unsafe {
let env: &mut Environment = transmute(env.get());

let from_coro: &mut Box<Coroutine> = {
transmute(env.current_running.as_unsafe_cell().get())
};
pub fn yield_now(state: State) {
// Cannot yield with Running state
assert!(state != State::Running);

match from_coro.state() {
State::Finished | State::Panicked => {},
_ => from_coro.set_state(State::Suspended),
}
let env = Environment::current();
unsafe {
let from_coro: &mut Box<Coroutine> =
&mut *env.current_running.as_unsafe_cell().get();
from_coro.set_state(state);

let to_coro: &mut Coroutine = transmute(from_coro.parent);
let to_coro: &mut Coroutine = &mut *from_coro.parent;
Context::swap(&mut from_coro.saved_context, &to_coro.saved_context);
})
}
}

/// Yield the current running Coroutine with `Suspended` state
#[inline]
pub fn sched() {
Coroutine::yield_now(State::Suspended)
}

/// Yield the current running Coroutine with `Blocked` state
#[inline]
pub fn block() {
Coroutine::yield_now(State::Blocked)
}

/// Get a Handle to the current running Coroutine.
Expand All @@ -363,10 +365,7 @@ impl Coroutine {
/// in more than one native thread.
#[inline]
pub fn current() -> Handle {
COROUTINE_ENVIRONMENT.with(move|env| unsafe {
let env: &mut Environment = transmute(env.get());
env.current_running.clone()
})
Environment::current().current_running.clone()
}

#[inline(always)]
Expand Down Expand Up @@ -405,7 +404,7 @@ impl Environment {
let coro = unsafe {
let coro = Coroutine::empty(Some("<Environment Root Coroutine>".to_string()), State::Running);
coro.borrow_mut().parent = {
let itself: &mut Box<Coroutine> = transmute(coro.as_unsafe_cell().get());
let itself: &mut Box<Coroutine> = &mut *coro.as_unsafe_cell().get();
itself.deref_mut()
}; // Point to itself
coro
Expand All @@ -419,6 +418,12 @@ impl Environment {
running_state: None,
}
}

fn current() -> &'static mut Environment {
COROUTINE_ENVIRONMENT.with(|env| unsafe {
&mut *env.get()
})
}
}

/// Coroutine configuration. Provides detailed control over the properties and behavior of new Coroutines.
Expand Down Expand Up @@ -472,10 +477,17 @@ pub fn spawn<F>(f: F) -> Handle
/// Get the current Coroutine
///
/// Equavalent to `Coroutine::current`.
pub unsafe fn current() -> Handle {
pub fn current() -> Handle {
Coroutine::current()
}

/// Resume a Coroutine
///
/// Equavalent to `Coroutine::resume`.
pub fn resume(coro: &Handle) -> ResumeResult<()> {
coro.resume()
}

/// Yield the current Coroutine
///
/// Equavalent to `Coroutine::sched`.
Expand Down

0 comments on commit 094760c

Please sign in to comment.