From 93a075f93b4c9b7c05f7607011efedf66db6db5e Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Wed, 15 Apr 2015 02:48:55 +0800 Subject: [PATCH 01/13] add yield with state, add Blocked state --- src/coroutine.rs | 38 +++++++++++++++++++++++++++----------- src/lib.rs | 1 + 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/coroutine.rs b/src/coroutine.rs index 96eb544..a291b2f 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -60,7 +60,7 @@ use context::Context; use stack::{StackPool, Stack}; /// State of a Coroutine -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum State { /// Waiting its child to return Normal, @@ -68,6 +68,9 @@ pub enum State { /// Suspended. Can be waked up by `resume` Suspended, + /// Blocked. Can be waked up by `resume` + Blocked, + /// Running Running, @@ -249,7 +252,7 @@ extern "C" fn coroutine_initialize(_: usize, f: *mut ()) -> ! { let ret = unsafe { try(move|| func.invoke(())) }; - COROUTINE_ENVIRONMENT.with(move|env| { + let state = COROUTINE_ENVIRONMENT.with(move|env| { let env: &mut Environment = unsafe { transmute(env.get()) }; let cur: &mut Box = unsafe { @@ -259,11 +262,10 @@ extern "C" fn coroutine_initialize(_: usize, f: *mut ()) -> ! { match ret { Ok(..) => { env.running_state = None; - cur.set_state(State::Finished); + + State::Finished } Err(err) => { - cur.set_state(State::Panicked); - { use std::io::stderr; use std::io::Write; @@ -281,12 +283,14 @@ extern "C" fn coroutine_initialize(_: usize, f: *mut ()) -> ! { } env.running_state = Some(err); + + State::Panicked } } }); loop { - Coroutine::sched(); + Coroutine::yield_now(state); } } @@ -339,7 +343,10 @@ impl Coroutine { /// Yield the current running Coroutine #[inline] - pub fn sched() { + pub fn yield_now(state: State) { + // Cannot yield with Running state + assert!(state != State::Running); + COROUTINE_ENVIRONMENT.with(|env| unsafe { let env: &mut Environment = transmute(env.get()); @@ -347,16 +354,25 @@ impl Coroutine { transmute(env.current_running.as_unsafe_cell().get()) }; - match from_coro.state() { - State::Finished | State::Panicked => {}, - _ => from_coro.set_state(State::Suspended), - } + from_coro.set_state(state); let to_coro: &mut Coroutine = transmute(from_coro.parent); Context::swap(&mut from_coro.saved_context, &to_coro.saved_context); }) } + /// Yield the current running Coroutine with `Suspended` state + #[inline] + pub fn sched() { + Coroutine::yield_now(State::Suspended) + } + + /// Yield the current running Coroutine with `Blocked` state + #[inline] + pub fn block() { + Coroutine::yield_now(State::Blocked) + } + /// Get a Handle to the current running Coroutine. /// /// It is unsafe because it is an undefined behavior if you resume a Coroutine diff --git a/src/lib.rs b/src/lib.rs index 6e928de..2a95ee7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,5 +26,6 @@ pub use self::coroutine::Builder; pub mod context; pub mod coroutine; pub mod stack; +pub mod processor; mod thunk; // use self-maintained thunk, because std::thunk is temporary. mod sys; From 60c395936a6fa8c213b8c68e310bc0838a5c6866 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Mon, 20 Apr 2015 11:02:34 +0800 Subject: [PATCH 02/13] add sync module --- src/coroutine.rs | 2 +- src/lib.rs | 3 +- src/sync/mod.rs | 24 +++++++++++++ src/sync/mpsc.rs | 40 +++++++++++++++++++++ src/sync/mutex.rs | 86 ++++++++++++++++++++++++++++++++++++++++++++ src/sync/spinlock.rs | 64 +++++++++++++++++++++++++++++++++ 6 files changed, 217 insertions(+), 2 deletions(-) create mode 100644 src/sync/mod.rs create mode 100644 src/sync/mpsc.rs create mode 100644 src/sync/mutex.rs create mode 100644 src/sync/spinlock.rs diff --git a/src/coroutine.rs b/src/coroutine.rs index e6c3841..dda4183 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -488,7 +488,7 @@ pub fn spawn(f: F) -> Handle /// Get the current Coroutine /// /// Equavalent to `Coroutine::current`. -pub unsafe fn current() -> Handle { +pub fn current() -> Handle { Coroutine::current() } diff --git a/src/lib.rs b/src/lib.rs index 067ea10..9489c2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,7 @@ html_favicon_url = "http://www.rust-lang.org/favicon.ico")] #![feature(box_syntax, std_misc, libc, asm, core, alloc, test, unboxed_closures, page_size)] -#![feature(rustc_private)] +#![feature(rustc_private, optin_builtin_traits)] #[macro_use] extern crate log; extern crate libc; @@ -28,5 +28,6 @@ pub mod context; pub mod coroutine; pub mod stack; pub mod processor; +pub mod sync; mod thunk; // use self-maintained thunk, because std::thunk is temporary. mod sys; diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..2667ef8 --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,24 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Developers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +pub mod mpsc; +pub mod spinlock; +pub mod mutex; diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs new file mode 100644 index 0000000..c63d2f4 --- /dev/null +++ b/src/sync/mpsc.rs @@ -0,0 +1,40 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Develpers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use std::sync::mpsc; + +pub struct Sender { + inner: mpsc::Sender, +} + +pub struct SyncSender { + inner: mpsc::SyncSender, +} + +unsafe impl Send for SyncSender {} + +impl !Sync for SyncSender {} + +pub struct Receiver { + inner: mpsc::Receiver, +} + + diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs new file mode 100644 index 0000000..5f8615f --- /dev/null +++ b/src/sync/mutex.rs @@ -0,0 +1,86 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Developers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use std::collections::LinkedList; + +use sync::spinlock::SpinLock; +use coroutine::{self, Coroutine, Handle}; + +pub struct Mutex { + lock: SpinLock, + inner: T, + wait_list: LinkedList, +} + +impl Mutex { + pub fn new(inner: T) -> Mutex { + Mutex { + lock: SpinLock::new(), + inner: inner, + wait_list: LinkedList::new(), + } + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn lock<'a>(&'a mut self) -> LockGuard<'a, T> { + match self.try_lock() { + Some(lg) => lg, + None => { + + + LockGuard::new(self) + } + } + } + + pub fn try_lock<'a>(&'a self) -> Option> { + if self.lock.try_lock() { + Some(LockGuard::new(self)) + } else { + None + } + } + + fn unlock(&self) { + + } +} + +pub struct LockGuard<'a, T: 'a> { + mutex: &'a Mutex, +} + +impl<'a, T: 'a> LockGuard<'a, T> { + fn new(mutex: &'a Mutex) -> LockGuard<'a, T> { + LockGuard { + mutex: mutex, + } + } +} + +impl<'a, T: 'a> Drop for LockGuard<'a, T> { + fn drop(&mut self) { + self.mutex.unlock() + } +} diff --git a/src/sync/spinlock.rs b/src/sync/spinlock.rs new file mode 100644 index 0000000..4718c0f --- /dev/null +++ b/src/sync/spinlock.rs @@ -0,0 +1,64 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Developers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Spinlock +pub struct SpinLock { + flag: AtomicBool, +} + +impl SpinLock { + /// Create a new Spinlock + pub fn new() -> SpinLock { + SpinLock { + flag: AtomicBool::new(false), + } + } + + pub fn try_lock(&self) -> bool { + !self.flag.compare_and_swap(false, true, Ordering::Acquire) + } + + pub fn lock(&self) { + while !self.try_lock() {} + } + + pub fn unlock(&self) { + self.flag.store(false, Ordering::Release) + } +} + +#[cfg(test)] +mod test { + use super::SpinLock; + + #[test] + fn test_spinlock_basic() { + let lock = SpinLock::new(); + + assert!(lock.try_lock()); + + assert!(!lock.try_lock()); + + lock.unlock(); + } +} From 1c8b182bd510bf8b919bad5b5f4bf774c786d6d3 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Tue, 5 May 2015 23:07:23 +0800 Subject: [PATCH 03/13] basic implementation of mutex --- src/sync/mutex.rs | 98 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 82 insertions(+), 16 deletions(-) diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 5f8615f..227c243 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -19,62 +19,85 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -use std::collections::LinkedList; +use std::collections::VecDeque; +use std::ops::{Deref, DerefMut}; +use std::cell::UnsafeCell; use sync::spinlock::SpinLock; use coroutine::{self, Coroutine, Handle}; pub struct Mutex { lock: SpinLock, - inner: T, - wait_list: LinkedList, + inner: UnsafeCell, + wait_list: UnsafeCell>, } impl Mutex { pub fn new(inner: T) -> Mutex { Mutex { lock: SpinLock::new(), - inner: inner, - wait_list: LinkedList::new(), + inner: UnsafeCell::new(inner), + wait_list: UnsafeCell::new(VecDeque::new()), } } pub fn into_inner(self) -> T { - self.inner + unsafe { + self.inner.into_inner() + } } - pub fn lock<'a>(&'a mut self) -> LockGuard<'a, T> { - match self.try_lock() { - Some(lg) => lg, - None => { - - - LockGuard::new(self) + pub fn lock<'a>(&'a self) -> LockGuard<'a, T> { + if !self.lock.try_lock() { + let current = coroutine::current(); + unsafe { + let wait_list: &mut VecDeque = &mut *self.wait_list.get(); + wait_list.push_back(current); } + coroutine::sched(); } + + LockGuard::new(self, &self.inner) } pub fn try_lock<'a>(&'a self) -> Option> { if self.lock.try_lock() { - Some(LockGuard::new(self)) + Some(LockGuard::new(self, &self.inner)) } else { None } } fn unlock(&self) { - + self.lock.unlock(); + let front = unsafe { + let wait_list: &mut VecDeque = &mut *self.wait_list.get(); + wait_list.pop_front() + }; + + match front { + Some(hdl) => { + coroutine::resume(&hdl).unwrap(); + }, + None => {} + } } } +unsafe impl Send for Mutex {} + +unsafe impl Sync for Mutex {} + pub struct LockGuard<'a, T: 'a> { mutex: &'a Mutex, + data: &'a UnsafeCell, } impl<'a, T: 'a> LockGuard<'a, T> { - fn new(mutex: &'a Mutex) -> LockGuard<'a, T> { + fn new(mutex: &'a Mutex, data: &'a UnsafeCell) -> LockGuard<'a, T> { LockGuard { mutex: mutex, + data: data, } } } @@ -84,3 +107,46 @@ impl<'a, T: 'a> Drop for LockGuard<'a, T> { self.mutex.unlock() } } + +impl<'a, T: 'a> Deref for LockGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { + &*self.data.get() + } + } +} + +impl<'a, T: 'a> DerefMut for LockGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { + &mut *self.data.get() + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use coroutine::{spawn, sched}; + + use super::Mutex; + + #[test] + fn test_mutex_basic() { + let lock = Arc::new(Mutex::new(1)); + + for _ in 0..10 { + let lock = lock.clone(); + spawn(move|| { + let mut guard = lock.lock(); + + *guard += 1; + }).resume(); + } + + assert_eq!(*lock.lock(), 11); + } +} From 9ed5d19439144c6721ed67083bc3ac1e4a694ace Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Wed, 13 May 2015 20:20:13 +0800 Subject: [PATCH 04/13] refactor a little --- src/coroutine.rs | 56 ++++++++++++++++++++++------------------------- src/lib.rs | 1 + src/sync/mutex.rs | 43 ++++++++++++++++-------------------- 3 files changed, 46 insertions(+), 54 deletions(-) diff --git a/src/coroutine.rs b/src/coroutine.rs index a0c8682..52bff9e 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -125,33 +125,31 @@ impl Handle { _ => {} } - COROUTINE_ENVIRONMENT.with(|env| { - let env: &mut Environment = unsafe { transmute(env.get()) }; + let env = Environment::current(); - let from_coro_hdl = Coroutine::current(); - let from_coro: &mut Coroutine = unsafe { - let c: &mut Box = transmute(from_coro_hdl.as_unsafe_cell().get()); - c.deref_mut() - }; + let from_coro_hdl = Coroutine::current(); + let from_coro: &mut Coroutine = unsafe { + let c: &mut Box = &mut *from_coro_hdl.as_unsafe_cell().get(); + c.deref_mut() + }; - let to_coro: &mut Box = unsafe { - transmute(self.as_unsafe_cell().get()) - }; + let to_coro: &mut Box = unsafe { + &mut *self.as_unsafe_cell().get() + }; - // Save state - to_coro.set_state(State::Running); - to_coro.parent = from_coro; - from_coro.set_state(State::Normal); + // Save state + to_coro.set_state(State::Running); + to_coro.parent = from_coro; + from_coro.set_state(State::Normal); - env.current_running = self.clone(); - Context::swap(&mut from_coro.saved_context, &to_coro.saved_context); - env.current_running = from_coro_hdl; + env.current_running = self.clone(); + Context::swap(&mut from_coro.saved_context, &to_coro.saved_context); + env.current_running = from_coro_hdl; - match env.running_state.take() { - Some(err) => Err(err), - None => Ok(()), - } - }) + match env.running_state.take() { + Some(err) => Err(err), + None => Ok(()), + } } /// Join this Coroutine. @@ -181,7 +179,7 @@ impl Handle { #[inline] pub fn state(&self) -> State { unsafe { - let c: &mut Box = transmute(self.as_unsafe_cell().get()); + let c: &mut Box = &mut *self.as_unsafe_cell().get(); c.state() } } @@ -190,7 +188,7 @@ impl Handle { #[inline] pub fn set_state(&self, state: State) { unsafe { - let c: &mut Box = transmute(self.as_unsafe_cell().get()); + let c: &mut Box = &mut *self.as_unsafe_cell().get(); c.set_state(state) } } @@ -236,10 +234,8 @@ impl Drop for Coroutine { fn drop(&mut self) { match self.current_stack_segment.take() { Some(stack) => { - COROUTINE_ENVIRONMENT.with(|env| { - let env: &mut Environment = unsafe { transmute(env.get()) }; - env.stack_pool.give_stack(stack); - }); + let env = Environment::current(); + env.stack_pool.give_stack(stack); }, None => {} } @@ -346,7 +342,7 @@ impl Coroutine { &mut *env.current_running.as_unsafe_cell().get(); from_coro.set_state(state); - let to_coro: &mut Coroutine = transmute(from_coro.parent); + let to_coro: &mut Coroutine = &mut *from_coro.parent; Context::swap(&mut from_coro.saved_context, &to_coro.saved_context); } } @@ -408,7 +404,7 @@ impl Environment { let coro = unsafe { let coro = Coroutine::empty(Some("".to_string()), State::Running); coro.borrow_mut().parent = { - let itself: &mut Box = transmute(coro.as_unsafe_cell().get()); + let itself: &mut Box = &mut *coro.as_unsafe_cell().get(); itself.deref_mut() }; // Point to itself coro diff --git a/src/lib.rs b/src/lib.rs index 9489c2c..cd33e15 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ #![feature(box_syntax, std_misc, libc, asm, core, alloc, test, unboxed_closures, page_size)] #![feature(rustc_private, optin_builtin_traits)] +#![feature(scoped)] #[macro_use] extern crate log; extern crate libc; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 227c243..cb34981 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -29,7 +29,6 @@ use coroutine::{self, Coroutine, Handle}; pub struct Mutex { lock: SpinLock, inner: UnsafeCell, - wait_list: UnsafeCell>, } impl Mutex { @@ -37,7 +36,6 @@ impl Mutex { Mutex { lock: SpinLock::new(), inner: UnsafeCell::new(inner), - wait_list: UnsafeCell::new(VecDeque::new()), } } @@ -49,11 +47,6 @@ impl Mutex { pub fn lock<'a>(&'a self) -> LockGuard<'a, T> { if !self.lock.try_lock() { - let current = coroutine::current(); - unsafe { - let wait_list: &mut VecDeque = &mut *self.wait_list.get(); - wait_list.push_back(current); - } coroutine::sched(); } @@ -70,17 +63,6 @@ impl Mutex { fn unlock(&self) { self.lock.unlock(); - let front = unsafe { - let wait_list: &mut VecDeque = &mut *self.wait_list.get(); - wait_list.pop_front() - }; - - match front { - Some(hdl) => { - coroutine::resume(&hdl).unwrap(); - }, - None => {} - } } } @@ -129,6 +111,7 @@ impl<'a, T: 'a> DerefMut for LockGuard<'a, T> { #[cfg(test)] mod test { use std::sync::Arc; + use std::thread; use coroutine::{spawn, sched}; @@ -136,17 +119,29 @@ mod test { #[test] fn test_mutex_basic() { - let lock = Arc::new(Mutex::new(1)); + let lock = Arc::new(Mutex::new(0)); + + let mut futs = Vec::new(); for _ in 0..10 { + println!("??"); let lock = lock.clone(); - spawn(move|| { - let mut guard = lock.lock(); + let fut = thread::scoped(move|| { + spawn(move|| { + let mut guard = lock.lock(); + for _ in 0..100_0000 { + *guard += 1; + } + println!("HERE!!"); + }).resume().unwrap(); + }); + futs.push(fut); + } - *guard += 1; - }).resume(); + for fut in futs.into_iter() { + fut.join(); } - assert_eq!(*lock.lock(), 11); + assert_eq!(*lock.lock(), 100_0000 * 10); } } From 24ff67031090d6f34d39428fb5da07ecdfcc6e15 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sat, 23 May 2015 23:25:44 +0800 Subject: [PATCH 05/13] add scheduler and net io support --- Cargo.toml | 4 + src/lib.rs | 5 + src/net/mod.rs | 45 +++++ src/net/tcp.rs | 292 ++++++++++++++++++++++++++++++ src/net/udp.rs | 109 +++++++++++ src/scheduler.rs | 463 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 918 insertions(+) create mode 100644 src/net/mod.rs create mode 100644 src/net/tcp.rs create mode 100644 src/net/udp.rs create mode 100644 src/scheduler.rs diff --git a/Cargo.toml b/Cargo.toml index c1e1518..7673b0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,10 @@ path = "src/lib.rs" [dependencies] mmap = "*" +deque = "*" + +[dependencies.mio] +git = "https://github.com/carllerche/mio.git" [dev-dependencies] num_cpus = "*" diff --git a/src/lib.rs b/src/lib.rs index ea88184..5414347 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,12 +19,17 @@ extern crate libc; extern crate test; extern crate mmap; +extern crate deque; +extern crate mio; pub use coroutine::Builder; pub use coroutine::{Coroutine, spawn, sched, current}; +pub use scheduler::Scheduler; pub mod context; pub mod coroutine; pub mod stack; +pub mod scheduler; +pub mod net; mod thunk; // use self-maintained thunk, because std::thunk is temporary. May be replaced by FnBox in the future. mod sys; diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 0000000..da60929 --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,45 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Developers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +pub use self::tcp::{TcpListener, TcpStream, TcpSocket}; +pub use self::udp::UdpSocket; + +use std::io; +use std::net::{ToSocketAddrs, SocketAddr}; + +pub mod tcp; +pub mod udp; + +fn each_addr(addr: A, mut f: F) -> io::Result + where F: FnMut(&SocketAddr) -> io::Result +{ + let mut last_err = None; + for addr in try!(addr.to_socket_addrs()) { + match f(&addr) { + Ok(l) => return Ok(l), + Err(e) => last_err = Some(e), + } + } + Err(last_err.unwrap_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, + "could not resolve to any addresses") + })) +} diff --git a/src/net/tcp.rs b/src/net/tcp.rs new file mode 100644 index 0000000..28e87fd --- /dev/null +++ b/src/net/tcp.rs @@ -0,0 +1,292 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Developers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use std::io; +use std::net::{ToSocketAddrs, SocketAddr}; +use std::ops::{Deref, DerefMut}; + +use mio::{self, Interest}; +use mio::buf::{Buf, MutBuf, MutSliceBuf, SliceBuf}; + +use scheduler::Scheduler; + +pub struct TcpSocket(::mio::tcp::TcpSocket); + +impl TcpSocket { + /// Returns a new, unbound, non-blocking, IPv4 socket + pub fn v4() -> io::Result { + Ok(TcpSocket(try!(::mio::tcp::TcpSocket::v4()))) + } + + /// Returns a new, unbound, non-blocking, IPv6 socket + pub fn v6() -> io::Result { + Ok(TcpSocket(try!(::mio::tcp::TcpSocket::v6()))) + } + + pub fn connect(addr: A) -> io::Result<(TcpStream, bool)> { + // let (stream, complete) = try!(self.0.connect(addr)); + // Ok((TcpStream(stream), complete)) + + super::each_addr(addr, |a| { + match a { + &SocketAddr::V4(..) => try!(TcpSocket::v4()).0.connect(a), + &SocketAddr::V6(..) => try!(TcpSocket::v6()).0.connect(a), + } + }).map(|(stream, complete)| (TcpStream(stream), complete)) + } + + pub fn listen(self, backlog: usize) -> io::Result { + Ok(TcpListener(try!(self.0.listen(backlog)))) + } +} + +impl Deref for TcpSocket { + type Target = ::mio::tcp::TcpSocket; + + fn deref(&self) -> &::mio::tcp::TcpSocket { + &self.0 + } +} + +impl DerefMut for TcpSocket { + fn deref_mut(&mut self) -> &mut ::mio::tcp::TcpSocket { + &mut self.0 + } +} + +pub struct TcpListener(::mio::tcp::TcpListener); + +impl TcpListener { + pub fn bind(addr: A) -> io::Result { + // let listener = try!(::mio::tcp::TcpListener::bind(addr)); + + // Ok(TcpListener(listener)) + super::each_addr(addr, ::mio::tcp::TcpListener::bind).map(TcpListener) + } + + pub fn accept(&self) -> io::Result { + match self.0.accept() { + Ok(None) => { + debug!("accept WouldBlock; going to register into eventloop"); + }, + Ok(Some(stream)) => { + return Ok(TcpStream(stream)); + }, + Err(err) => { + return Err(err); + } + } + + try!(Scheduler::current().wait_event(&self.0, Interest::readable())); + + match self.0.accept() { + Ok(None) => { + panic!("accept WouldBlock; Coroutine was awaked by readable event"); + }, + Ok(Some(stream)) => { + Ok(TcpStream(stream)) + }, + Err(err) => { + Err(err) + } + } + } + + pub fn try_clone(&self) -> io::Result { + Ok(TcpListener(try!(self.0.try_clone()))) + } +} + +impl Deref for TcpListener { + type Target = ::mio::tcp::TcpListener; + + fn deref(&self) -> &::mio::tcp::TcpListener { + &self.0 + } +} + +impl DerefMut for TcpListener { + fn deref_mut(&mut self) -> &mut ::mio::tcp::TcpListener { + &mut self.0 + } +} + +pub struct TcpStream(mio::tcp::TcpStream); + +impl TcpStream { + pub fn connect(addr: A) -> io::Result { + // let stream = try!(mio::tcp::TcpStream::connect(addr)); + + // Ok(TcpStream(stream)) + super::each_addr(addr, ::mio::tcp::TcpStream::connect).map(TcpStream) + } + + pub fn peer_addr(&self) -> io::Result { + self.0.peer_addr() + } + + pub fn local_addr(&self) -> io::Result { + self.0.local_addr() + } + + pub fn try_clone(&self) -> io::Result { + let stream = try!(self.0.try_clone()); + + Ok(TcpStream(stream)) + } +} + +impl io::Read for TcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + use mio::TryRead; + + let mut buf = MutSliceBuf::wrap(buf); + let mut total_len = 0; + while buf.has_remaining() { + match self.0.read(&mut buf) { + Ok(None) => { + debug!("TcpStream read WouldBlock"); + break; + }, + Ok(Some(0)) => { + debug!("TcpStream read 0 bytes; may be EOF"); + return Ok(total_len); + }, + Ok(Some(len)) => { + debug!("TcpStream read {} bytes", len); + total_len += len; + }, + Err(err) => { + return Err(err); + } + } + } + + if total_len != 0 { + // We got something, just return! + return Ok(total_len); + } + + debug!("Read: Going to register event"); + try!(Scheduler::current().wait_event(&self.0, Interest::readable())); + debug!("Read: Got read event"); + + while buf.has_remaining() { + match self.0.read(&mut buf) { + Ok(None) => { + debug!("TcpStream read WouldBlock"); + break; + }, + Ok(Some(0)) => { + debug!("TcpStream read 0 bytes; may be EOF"); + break; + }, + Ok(Some(len)) => { + debug!("TcpStream read {} bytes", len); + total_len += len; + }, + Err(err) => { + return Err(err); + } + } + } + + Ok(total_len) + } +} + +impl io::Write for TcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + use mio::TryWrite; + + let mut buf = SliceBuf::wrap(buf); + let mut total_len = 0; + + while buf.has_remaining() { + match self.0.write(&mut buf) { + Ok(None) => { + debug!("TcpStream write WouldBlock"); + break; + }, + Ok(Some(0)) => { + debug!("TcpStream write 0 bytes; may be EOF"); + return Ok(total_len); + }, + Ok(Some(len)) => { + debug!("TcpStream written {} bytes", len); + total_len += len; + }, + Err(err) => { + return Err(err) + } + } + } + + if total_len != 0 { + // We have written something, return it! + return Ok(total_len) + } + + debug!("Write: Going to register event"); + try!(Scheduler::current().wait_event(&self.0, Interest::writable())); + debug!("Write: Got write event"); + + while buf.has_remaining() { + match self.0.write(&mut buf) { + Ok(None) => { + debug!("TcpStream write WouldBlock"); + break; + }, + Ok(Some(0)) => { + debug!("TcpStream write 0 bytes; may be EOF"); + break; + }, + Ok(Some(len)) => { + debug!("TcpStream written {} bytes", len); + total_len += len; + }, + Err(err) => { + return Err(err) + } + } + } + + Ok(total_len) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Deref for TcpStream { + type Target = ::mio::tcp::TcpStream; + + fn deref(&self) -> &::mio::tcp::TcpStream { + &self.0 + } +} + +impl DerefMut for TcpStream { + fn deref_mut(&mut self) -> &mut ::mio::tcp::TcpStream { + &mut self.0 + } +} diff --git a/src/net/udp.rs b/src/net/udp.rs new file mode 100644 index 0000000..88eb559 --- /dev/null +++ b/src/net/udp.rs @@ -0,0 +1,109 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Developers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use std::ops::{Deref, DerefMut}; +use std::io; +use std::net::SocketAddr; + +use mio::Interest; +use mio::buf::{Buf, MutBuf}; + +use scheduler::Scheduler; + +pub struct UdpSocket(::mio::udp::UdpSocket); + +impl UdpSocket { + /// Returns a new, unbound, non-blocking, IPv4 UDP socket + pub fn v4() -> io::Result { + Ok(UdpSocket(try!(::mio::udp::UdpSocket::v4()))) + } + + /// Returns a new, unbound, non-blocking, IPv6 UDP socket + pub fn v6() -> io::Result { + Ok(UdpSocket(try!(::mio::udp::UdpSocket::v6()))) + } + + pub fn bound(addr: &SocketAddr) -> io::Result { + Ok(UdpSocket(try!(::mio::udp::UdpSocket::bound(addr)))) + } + + pub fn try_clone(&self) -> io::Result { + Ok(UdpSocket(try!(self.0.try_clone()))) + } + + pub fn send_to(&self, buf: &mut B, target: &SocketAddr) -> io::Result> { + match try!(self.0.send_to(buf, target)) { + None => { + debug!("UdpSocket send_to WOULDBLOCK"); + }, + Some(..) => { + return Ok(Some(())); + } + } + + try!(Scheduler::current().wait_event(&self.0, Interest::writable())); + + match try!(self.0.send_to(buf, target)) { + None => { + panic!("UdpSocket send_to WOULDBLOCK"); + }, + Some(..) => { + return Ok(Some(())); + } + } + } + + pub fn recv_from(&self, buf: &mut B) -> io::Result> { + match try!(self.0.recv_from(buf)) { + None => { + debug!("UdpSocket recv_from WOULDBLOCK"); + }, + Some(addr) => { + return Ok(Some(addr)); + } + } + + try!(Scheduler::current().wait_event(&self.0, Interest::readable())); + + match try!(self.0.recv_from(buf)) { + None => { + panic!("UdpSocket recv_from WOULDBLOCK"); + }, + Some(addr) => { + return Ok(Some(addr)); + } + } + } +} + +impl Deref for UdpSocket { + type Target = ::mio::udp::UdpSocket; + + fn deref(&self) -> &::mio::udp::UdpSocket { + return &self.0 + } +} + +impl DerefMut for UdpSocket { + fn deref_mut(&mut self) -> &mut ::mio::udp::UdpSocket { + return &mut self.0 + } +} diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..7f9cf72 --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,463 @@ +// The MIT License (MIT) + +// Copyright (c) 2015 Rustcc Developers + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use std::thread; +use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError}; +use std::sync::{Mutex, Once, ONCE_INIT}; +use std::mem; +use std::cell::UnsafeCell; +use std::io; +#[cfg(target_os = "linux")] +use std::os::unix::io::AsRawFd; +#[cfg(target_os = "linux")] +use std::convert::From; +use std::sync::atomic::{ATOMIC_BOOL_INIT, AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::collections::VecDeque; + +use coroutine::{State, Handle, Coroutine, Options}; + +use deque::{BufferPool, Stealer, Worker, Stolen}; + +use mio::{EventLoop, Evented, Handler, Token, ReadHint, Interest, PollOpt}; +use mio::util::Slab; +#[cfg(target_os = "linux")] +use mio::Io; + +static mut THREAD_HANDLES: *const Mutex, Stealer)>> = + 0 as *const Mutex, Stealer)>>; +static THREAD_HANDLES_ONCE: Once = ONCE_INIT; +static SCHEDULER_HAS_STARTED: AtomicBool = ATOMIC_BOOL_INIT; + +fn schedulers() -> &'static Mutex, Stealer)>> { + unsafe { + THREAD_HANDLES_ONCE.call_once(|| { + let handles: Box, Stealer)>>> = + Box::new(Mutex::new(Vec::new())); + + THREAD_HANDLES = mem::transmute(handles); + }); + + & *THREAD_HANDLES + } +} + +thread_local!(static SCHEDULER: UnsafeCell = UnsafeCell::new(Scheduler::new())); + +pub enum SchedMessage { + NewNeighbor(Sender, Stealer), + Shutdown, +} + +const MAX_PRIVATE_WORK_NUM: usize = 10; + +pub struct Scheduler { + workqueue: Worker, + workstealer: Stealer, + + commchannel: Receiver, + + neighbors: Vec<(Sender, Stealer)>, + + eventloop: EventLoop, + handler: SchedulerHandler, + + private_work: VecDeque, +} + +impl Scheduler { + + fn new() -> Scheduler { + let bufpool = BufferPool::new(); + let (worker, stealer) = bufpool.deque(); + + let (tx, rx) = channel(); + + let scheds = schedulers(); + let mut guard = scheds.lock().unwrap(); + + for &(ref rtx, _) in guard.iter() { + let _ = rtx.send(SchedMessage::NewNeighbor(tx.clone(), stealer.clone())); + } + + let neighbors = guard.clone(); + guard.push((tx, stealer.clone())); + + Scheduler { + workqueue: worker, + workstealer: stealer, + + commchannel: rx, + + neighbors: neighbors, + + eventloop: EventLoop::new().unwrap(), + handler: SchedulerHandler::new(), + + private_work: VecDeque::new(), + } + } + + pub fn current() -> &'static mut Scheduler { + SCHEDULER.with(|s| unsafe { + &mut *s.get() + }) + } + + pub fn spawn(f: F) + where F: FnOnce() + Send + 'static { + let coro = Coroutine::spawn(f); + Scheduler::current().ready(coro); + + Coroutine::sched(); + } + + pub fn spawn_opts(f: F, opt: Options) + where F: FnOnce() + Send + 'static { + let coro = Coroutine::spawn_opts(f, opt); + Scheduler::current().ready(coro); + + Coroutine::sched(); + } + + pub fn ready(&mut self, work: Handle) { + if self.private_work.len() >= MAX_PRIVATE_WORK_NUM { + self.workqueue.push(work); + } else { + self.private_work.push_back(work); + } + } + + pub fn run(f: F, threads: usize) + where F: FnOnce() + Send + 'static { + + assert!(threads >= 1, "Threads must >= 1"); + if SCHEDULER_HAS_STARTED.compare_and_swap(false, true, Ordering::SeqCst) != false { + panic!("Schedulers are already running!"); + } + + // Start worker threads first + let counter = Arc::new(AtomicUsize::new(0)); + for tid in 0..threads - 1 { + let counter = counter.clone(); + thread::Builder::new().name(format!("Thread {}", tid)).spawn(move|| { + let current = Scheduler::current(); + counter.fetch_add(1, Ordering::SeqCst); + current.schedule(); + }).unwrap(); + } + + while counter.load(Ordering::SeqCst) != threads - 1 {} + + Scheduler::spawn(|| { + struct Guard; + + // Send Shutdown to all schedulers + impl Drop for Guard { + fn drop(&mut self) { + let guard = match schedulers().lock() { + Ok(g) => g, + Err(poisoned) => poisoned.into_inner() + }; + + for &(ref chan, _) in guard.iter() { + let _ = chan.send(SchedMessage::Shutdown); + } + } + } + + let _guard = Guard; + + f(); + }); + + Scheduler::current().schedule(); + + SCHEDULER_HAS_STARTED.store(false, Ordering::SeqCst); + } + + fn resume_coroutine(&mut self, work: Handle) { + match work.state() { + State::Suspended | State::Blocked => { + debug!("Resuming Coroutine: {:?}", work); + + if let Err(err) = work.resume() { + let msg = match err.downcast_ref::<&'static str>() { + Some(s) => *s, + None => match err.downcast_ref::() { + Some(s) => &s[..], + None => "Box", + } + }; + + error!("Coroutine panicked! {:?}", msg); + } + + match work.state() { + State::Normal | State::Running => { + unreachable!(); + }, + State::Suspended => { + debug!("Coroutine suspended, going to be resumed next round"); + self.ready(work); + }, + State::Blocked => { + debug!("Coroutine blocked, maybe waiting for I/O"); + }, + State::Finished | State::Panicked => { + debug!("Coroutine state: {:?}, will not be resumed automatically", work.state()); + } + } + }, + _ => { + error!("Trying to resume coroutine {:?}, but its state is {:?}", + work, work.state()); + } + } + } + + fn schedule(&mut self) { + loop { + match self.commchannel.try_recv() { + Ok(SchedMessage::NewNeighbor(tx, st)) => { + self.neighbors.push((tx, st)); + }, + Ok(SchedMessage::Shutdown) => { + info!("Shutting down"); + break; + }, + Err(TryRecvError::Empty) => {}, + _ => panic!("Receiving from channel: Unknown message") + } + + if !self.handler.slabs.is_empty() { + self.eventloop.run_once(&mut self.handler).unwrap(); + } + + debug!("Trying to resume all ready coroutines: {:?}", thread::current().name()); + // Run all ready coroutines + let mut need_steal = true; + // while let Some(work) = self.workqueue.pop() { + // while let Stolen::Data(work) = self.workstealer.steal() { + // need_steal = false; + // self.resume_coroutine(work); + // } + + while let Some(work) = self.private_work.pop_front() { + need_steal = false; + self.resume_coroutine(work); + } + + if need_steal { + if let Stolen::Data(work) = self.workstealer.steal() { + need_steal = false; + self.resume_coroutine(work); + } + } + + if !need_steal || !self.handler.slabs.is_empty() { + continue; + } + + debug!("Trying to steal from neighbors: {:?}", thread::current().name()); + + // if self.neighbors.len() > 0 { + // let neighbor_idx = ::rand::random::() % self.neighbors.len(); + // let stolen = { + // let &(_, ref neighbor_stealer) = &self.neighbors[neighbor_idx]; + // neighbor_stealer.steal() + // }; + + // if let Stolen::Data(coro) = stolen { + // self.resume_coroutine(coro); + // continue; + // } + // } + let mut has_stolen = false; + let stolen_works = self.neighbors.iter() + .filter_map(|&(_, ref st)| + if let Stolen::Data(w) = st.steal() { + Some(w) + } else { + None + }) + .collect::>(); + for work in stolen_works.into_iter() { + has_stolen = true; + self.resume_coroutine(work); + } + + if !has_stolen { + thread::sleep_ms(100); + } + } + } + + // fn resume(&mut self, handle: Handle) { + // self.workqueue.push(handle); + // } +} + +const MAX_TOKEN_NUM: usize = 102400; +impl SchedulerHandler { + fn new() -> SchedulerHandler { + SchedulerHandler { + // slabs: Slab::new_starting_at(Token(1), MAX_TOKEN_NUM), + slabs: Slab::new(MAX_TOKEN_NUM), + } + } +} + +#[cfg(any(target_os = "linux", + target_os = "android"))] +impl Scheduler { + pub fn wait_event(&mut self, fd: &E, interest: Interest) -> io::Result<()> { + let token = self.handler.slabs.insert((Coroutine::current(), From::from(fd.as_raw_fd()))).unwrap(); + try!(self.eventloop.register_opt(fd, token, interest, + PollOpt::level()|PollOpt::oneshot())); + + debug!("wait_event: Blocked current Coroutine ...; token={:?}", token); + Coroutine::block(); + debug!("wait_event: Waked up; token={:?}", token); + + Ok(()) + } +} + +#[cfg(any(target_os = "linux", + target_os = "android"))] +struct SchedulerHandler { + slabs: Slab<(Handle, Io)>, +} + +#[cfg(any(target_os = "linux", + target_os = "android"))] +impl Handler for SchedulerHandler { + type Timeout = (); + type Message = (); + + fn writable(&mut self, event_loop: &mut EventLoop, token: Token) { + + debug!("In writable, token {:?}", token); + + match self.slabs.remove(token) { + Some((hdl, fd)) => { + // Linux EPoll needs to explicit EPOLL_CTL_DEL the fd + event_loop.deregister(&fd).unwrap(); + mem::forget(fd); + Scheduler::current().ready(hdl); + }, + None => { + warn!("No coroutine is waiting on writable {:?}", token); + } + } + + } + + fn readable(&mut self, event_loop: &mut EventLoop, token: Token, hint: ReadHint) { + + debug!("In readable, token {:?}, hint {:?}", token, hint); + + match self.slabs.remove(token) { + Some((hdl, fd)) => { + // Linux EPoll needs to explicit EPOLL_CTL_DEL the fd + event_loop.deregister(&fd).unwrap(); + mem::forget(fd); + Scheduler::current().ready(hdl); + }, + None => { + warn!("No coroutine is waiting on readable {:?}", token); + } + } + + } +} + +#[cfg(any(target_os = "macos", + target_os = "freebsd", + target_os = "dragonfly", + target_os = "ios", + target_os = "bitrig", + target_os = "openbsd"))] +impl Scheduler { + pub fn wait_event(&mut self, fd: &E, interest: Interest) -> io::Result<()> { + let token = self.handler.slabs.insert(Coroutine::current()).unwrap(); + try!(self.eventloop.register_opt(fd, token, interest, + PollOpt::level()|PollOpt::oneshot())); + + debug!("wait_event: Blocked current Coroutine ...; token={:?}", token); + Coroutine::block(); + debug!("wait_event: Waked up; token={:?}", token); + + Ok(()) + } +} + +#[cfg(any(target_os = "macos", + target_os = "freebsd", + target_os = "dragonfly", + target_os = "ios", + target_os = "bitrig", + target_os = "openbsd"))] +struct SchedulerHandler { + slabs: Slab, +} + +#[cfg(any(target_os = "macos", + target_os = "freebsd", + target_os = "dragonfly", + target_os = "ios", + target_os = "bitrig", + target_os = "openbsd"))] +impl Handler for SchedulerHandler { + type Timeout = (); + type Message = (); + + fn writable(&mut self, _: &mut EventLoop, token: Token) { + + debug!("In writable, token {:?}", token); + + match self.slabs.remove(token) { + Some(hdl) => { + Scheduler::current().ready(hdl); + }, + None => { + warn!("No coroutine is waiting on writable {:?}", token); + } + } + + } + + fn readable(&mut self, _: &mut EventLoop, token: Token, hint: ReadHint) { + + debug!("In readable, token {:?}, hint {:?}", token, hint); + + match self.slabs.remove(token) { + Some(hdl) => { + Scheduler::current().ready(hdl); + }, + None => { + warn!("No coroutine is waiting on readable {:?}", token); + } + } + + } +} From 8333d3cc742aa01115ed5384fc6d1dc75a3c9803 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sat, 23 May 2015 23:39:13 +0800 Subject: [PATCH 06/13] add a barge, removed useless feature --- .travis.yml | 4 +++- Cargo.toml | 8 -------- README.md | 2 +- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/.travis.yml b/.travis.yml index a13c8de..31b94c5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,6 @@ language: rust +rust: nightly + os: - linux - osx @@ -6,4 +8,4 @@ script: - cargo build -v - cargo test -v - cargo run --example simple - - cargo doc --no-deps \ No newline at end of file + - cargo doc --no-deps diff --git a/Cargo.toml b/Cargo.toml index 7673b0a..4390fda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,14 +14,6 @@ homepage = "https://github.com/rustcc/coroutine-rs" build = "build.rs" keywords = ["coroutine", "green", "thread", "fiber"] -[features] - -default = [ - "enable-stack-recycle" -] - -enable-stack-recycle = [] - [build-dependencies] gcc = "*" log = "*" diff --git a/README.md b/README.md index ad9ddcb..464b323 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # coroutine-rs -[![Build Status](https://travis-ci.org/rustcc/coroutine-rs.png?branch=master)](https://travis-ci.org/rustcc/coroutine-rs) +[![Build Status](https://travis-ci.org/rustcc/coroutine-rs.png?branch=master)](https://travis-ci.org/rustcc/coroutine-rs) [![crates.io](https://img.shields.io/crates/v/coroutine.svg)](https://crates.io/crates/coroutine) Coroutine library in Rust From 4261a83a57567d1f20d40ee944aa3f9d7526c989 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sun, 24 May 2015 01:50:36 +0800 Subject: [PATCH 07/13] use stack instead of parent pointer --- src/coroutine.rs | 47 ++++++++++++++++++++++++----------------------- src/sync/mpsc.rs | 20 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/coroutine.rs b/src/coroutine.rs index f60884d..e36427b 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -83,7 +83,6 @@ use std::rt::unwind::try; use std::any::Any; use std::cell::UnsafeCell; use std::ops::Deref; -use std::ptr; use std::sync::Arc; use std::cell::RefCell; @@ -174,13 +173,11 @@ impl Handle { // Save state to_coro.set_state(State::Running); - to_coro.parent = from_coro; from_coro.set_state(State::Normal); - env.current_running = self.clone(); + env.coroutine_stack.push(self.clone()); Context::swap(&mut from_coro.saved_context, &to_coro.saved_context); } - env.current_running = from_coro_hdl; match env.running_state.take() { Some(err) => Err(err), @@ -245,9 +242,6 @@ pub struct Coroutine { /// Always valid if the task is alive and not running. saved_context: Context, - /// Parent coroutine, will always be valid - parent: *mut Coroutine, - /// State state: State, @@ -279,7 +273,9 @@ extern "C" fn coroutine_initialize(_: usize, f: *mut ()) -> ! { let env = Environment::current(); - let cur: &mut Coroutine = unsafe { env.current_running.get_inner_mut() }; + let cur: &mut Coroutine = unsafe { + env.coroutine_stack.last().expect("Impossible happened! No current coroutine!").get_inner_mut() + }; let state = match ret { Ok(..) => { @@ -320,7 +316,6 @@ impl Coroutine { Handle::new(Coroutine { current_stack_segment: None, saved_context: Context::empty(), - parent: ptr::null_mut(), state: state, name: name, }) @@ -330,7 +325,6 @@ impl Coroutine { Handle::new(Coroutine { current_stack_segment: Some(stack), saved_context: ctx, - parent: ptr::null_mut(), state: state, name: name, }) @@ -364,12 +358,19 @@ impl Coroutine { assert!(state != State::Running); let env = Environment::current(); - unsafe { - let from_coro = env.current_running.get_inner_mut(); - from_coro.set_state(state); + if env.coroutine_stack.len() == 1 { + // Environment root + return; + } - let to_coro: &mut Coroutine = &mut *from_coro.parent; - Context::swap(&mut from_coro.saved_context, &to_coro.saved_context); + unsafe { + match (env.coroutine_stack.pop(), env.coroutine_stack.last()) { + (Some(from_coro), Some(to_coro)) => { + from_coro.set_state(state); + Context::swap(&mut from_coro.get_inner_mut().saved_context, &to_coro.saved_context); + }, + _ => unreachable!() + } } } @@ -391,7 +392,8 @@ impl Coroutine { /// in more than one native thread. #[inline] pub fn current() -> Handle { - Environment::current().current_running.clone() + Environment::current().coroutine_stack.last().map(|hdl| hdl.clone()) + .expect("Impossible happened! No current coroutine!") } #[inline(always)] @@ -418,8 +420,7 @@ thread_local!(static COROUTINE_ENVIRONMENT: UnsafeCell = UnsafeCell struct Environment { stack_pool: StackPool, - current_running: Handle, - __main_coroutine: Handle, + coroutine_stack: Vec, running_state: Option>, } @@ -427,16 +428,16 @@ struct Environment { impl Environment { /// Initialize a new environment fn new() -> Environment { - let coro = unsafe { + let st = unsafe { + let mut st = Vec::new(); let coro = Coroutine::empty(Some("".to_string()), State::Running); - coro.0.borrow_mut().parent = coro.get_inner_mut(); // Point to itself - coro + st.push(coro); + st }; Environment { stack_pool: StackPool::new(), - current_running: coro.clone(), - __main_coroutine: coro, + coroutine_stack: st, running_state: None, } diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs index c63d2f4..91f5909 100644 --- a/src/sync/mpsc.rs +++ b/src/sync/mpsc.rs @@ -21,6 +21,9 @@ use std::sync::mpsc; +use coroutine::Coroutine; + +#[derive(Clone)] pub struct Sender { inner: mpsc::Sender, } @@ -37,4 +40,21 @@ pub struct Receiver { inner: mpsc::Receiver, } +impl Sender { + fn new(inner: mpsc::Sender) -> Sender { + Sender { + inner: inner, + } + } + + pub fn send(&self, data: T) -> Result<(), mpsc::SendError> { + try!(self.inner.send(data)); + Coroutine::sched(); + } + + pub fn try_send(&self, data: T) -> Result<(), mpsc::TrySendError> { + + } +} + From 2a4fd6725c788894d4cf4b83c525059c0661a9ab Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sun, 24 May 2015 11:21:04 +0800 Subject: [PATCH 08/13] use rustfmt, state normal cannot be resumed --- src/coroutine.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/coroutine.rs b/src/coroutine.rs index e36427b..cd63350 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -160,6 +160,8 @@ impl Handle { match self.state() { State::Finished | State::Running => return Ok(()), State::Panicked => panic!("Trying to resume a panicked coroutine"), + State::Normal => panic!("Coroutine {:?} is waiting for its child to return, cannot resume!", + self.name().unwrap_or("")), _ => {} } @@ -201,7 +203,7 @@ impl Handle { pub fn join(&self) -> ResumeResult<()> { loop { match self.state() { - State::Suspended => try!(self.resume()), + State::Suspended | State::Blocked => try!(self.resume()), _ => break, } } @@ -332,22 +334,21 @@ impl Coroutine { /// Spawn a Coroutine with options pub fn spawn_opts(f: F, opts: Options) -> Handle - where F: FnOnce() + Send + 'static { + where F: FnOnce() + Send + 'static + { let env = Environment::current(); let mut stack = env.stack_pool.take_stack(opts.stack_size); - let ctx = Context::new(coroutine_initialize, - 0, - f, - &mut stack); + let ctx = Context::new(coroutine_initialize, 0, f, &mut stack); Coroutine::new(opts.name, stack, ctx, State::Suspended) } /// Spawn a Coroutine with default options pub fn spawn(f: F) -> Handle - where F: FnOnce() + Send + 'static { + where F: FnOnce() + Send + 'static + { Coroutine::spawn_opts(f, Default::default()) } @@ -485,7 +486,8 @@ impl Builder { /// Spawn a new Coroutine, and return a handle for it. pub fn spawn(self, f: F) -> Handle - where F: FnOnce() + Send + 'static { + where F: FnOnce() + Send + 'static + { Coroutine::spawn_opts(f, self.opts) } } @@ -494,7 +496,8 @@ impl Builder { /// /// Equavalent to `Coroutine::spawn`. pub fn spawn(f: F) -> Handle - where F: FnOnce() + Send + 'static { + where F: FnOnce() + Send + 'static +{ Builder::new().spawn(f) } From 83b012a905c59cbe3bd797501ddce0bd9e5a32e0 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sun, 24 May 2015 11:32:05 +0800 Subject: [PATCH 09/13] add license, add one more badge --- LICENSE-MIT | 25 +++++++++++++++++++++++++ README.md | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 LICENSE-MIT diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..25bedb8 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2014 Rustcc Developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md index 464b323..cb86051 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # coroutine-rs -[![Build Status](https://travis-ci.org/rustcc/coroutine-rs.png?branch=master)](https://travis-ci.org/rustcc/coroutine-rs) [![crates.io](https://img.shields.io/crates/v/coroutine.svg)](https://crates.io/crates/coroutine) +[![Build Status](https://img.shields.io/travis/zonyitoo/coroutine-rs.svg)](https://travis-ci.org/rustcc/coroutine-rs) [![crates.io](https://img.shields.io/crates/v/coroutine.svg)](https://crates.io/crates/coroutine) [![crates.io](https://img.shields.io/crates/l/coroutine.svg)](https://crates.io/crates/coroutine) Coroutine library in Rust From 89a9bb9d2de717d50f1186f32ba7c63e9a80b36c Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sun, 24 May 2015 11:33:52 +0800 Subject: [PATCH 10/13] wrong url --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cb86051..d6d6bdf 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # coroutine-rs -[![Build Status](https://img.shields.io/travis/zonyitoo/coroutine-rs.svg)](https://travis-ci.org/rustcc/coroutine-rs) [![crates.io](https://img.shields.io/crates/v/coroutine.svg)](https://crates.io/crates/coroutine) [![crates.io](https://img.shields.io/crates/l/coroutine.svg)](https://crates.io/crates/coroutine) +[![Build Status](https://img.shields.io/travis/rustcc/coroutine-rs.svg)](https://travis-ci.org/rustcc/coroutine-rs) [![crates.io](https://img.shields.io/crates/v/coroutine.svg)](https://crates.io/crates/coroutine) [![crates.io](https://img.shields.io/crates/l/coroutine.svg)](https://crates.io/crates/coroutine) Coroutine library in Rust From 42acad2129cc244c2847a2b28164d0577d000c55 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sat, 30 May 2015 01:17:19 +0800 Subject: [PATCH 11/13] removed unfinished scheduler and io lib --- Cargo.toml | 4 ---- src/lib.rs | 5 ----- 2 files changed, 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4390fda..d59d808 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,10 +24,6 @@ path = "src/lib.rs" [dependencies] mmap = "*" -deque = "*" - -[dependencies.mio] -git = "https://github.com/carllerche/mio.git" [dev-dependencies] num_cpus = "*" diff --git a/src/lib.rs b/src/lib.rs index 5414347..ea88184 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,17 +19,12 @@ extern crate libc; extern crate test; extern crate mmap; -extern crate deque; -extern crate mio; pub use coroutine::Builder; pub use coroutine::{Coroutine, spawn, sched, current}; -pub use scheduler::Scheduler; pub mod context; pub mod coroutine; pub mod stack; -pub mod scheduler; -pub mod net; mod thunk; // use self-maintained thunk, because std::thunk is temporary. May be replaced by FnBox in the future. mod sys; From e7643f9dfff329890c97b24f678f34ba15898462 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sun, 31 May 2015 02:17:10 +0800 Subject: [PATCH 12/13] bench context switch --- src/coroutine.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/coroutine.rs b/src/coroutine.rs index cd63350..6e2df5b 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -637,9 +637,9 @@ mod bench { use super::Coroutine; #[bench] - fn bench_coroutine_spawning_with_recycle(b: &mut Bencher) { + fn bench_coroutine_spawning(b: &mut Bencher) { b.iter(|| { - let _ = Coroutine::spawn(move|| {}).join(); + let _ = Coroutine::spawn(move|| {}); }); } @@ -681,4 +681,21 @@ mod bench { assert_eq!(result, MAX_NUMBER); }); } + + #[bench] + fn bench_context_switch(b: &mut Bencher) { + let coro = Coroutine::spawn(|| { + loop { + // 3. Save current context + // 4. Switch + Coroutine::sched(); + } + }); + + b.iter(|| { + // 1. Save current context + // 2. Switch + let _ = coro.resume(); + }); + } } From d5f91f02c50d799fa6653d25cc96c4c7a1dc11be Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Sun, 31 May 2015 02:18:36 +0800 Subject: [PATCH 13/13] remove scheduler by now --- src/net/mod.rs | 45 ----- src/net/tcp.rs | 292 --------------------------- src/net/udp.rs | 109 ---------- src/scheduler.rs | 463 ------------------------------------------- src/sync/mod.rs | 24 --- src/sync/mpsc.rs | 60 ------ src/sync/mutex.rs | 147 -------------- src/sync/spinlock.rs | 64 ------ 8 files changed, 1204 deletions(-) delete mode 100644 src/net/mod.rs delete mode 100644 src/net/tcp.rs delete mode 100644 src/net/udp.rs delete mode 100644 src/scheduler.rs delete mode 100644 src/sync/mod.rs delete mode 100644 src/sync/mpsc.rs delete mode 100644 src/sync/mutex.rs delete mode 100644 src/sync/spinlock.rs diff --git a/src/net/mod.rs b/src/net/mod.rs deleted file mode 100644 index da60929..0000000 --- a/src/net/mod.rs +++ /dev/null @@ -1,45 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Developers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -pub use self::tcp::{TcpListener, TcpStream, TcpSocket}; -pub use self::udp::UdpSocket; - -use std::io; -use std::net::{ToSocketAddrs, SocketAddr}; - -pub mod tcp; -pub mod udp; - -fn each_addr(addr: A, mut f: F) -> io::Result - where F: FnMut(&SocketAddr) -> io::Result -{ - let mut last_err = None; - for addr in try!(addr.to_socket_addrs()) { - match f(&addr) { - Ok(l) => return Ok(l), - Err(e) => last_err = Some(e), - } - } - Err(last_err.unwrap_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, - "could not resolve to any addresses") - })) -} diff --git a/src/net/tcp.rs b/src/net/tcp.rs deleted file mode 100644 index 28e87fd..0000000 --- a/src/net/tcp.rs +++ /dev/null @@ -1,292 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Developers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -use std::io; -use std::net::{ToSocketAddrs, SocketAddr}; -use std::ops::{Deref, DerefMut}; - -use mio::{self, Interest}; -use mio::buf::{Buf, MutBuf, MutSliceBuf, SliceBuf}; - -use scheduler::Scheduler; - -pub struct TcpSocket(::mio::tcp::TcpSocket); - -impl TcpSocket { - /// Returns a new, unbound, non-blocking, IPv4 socket - pub fn v4() -> io::Result { - Ok(TcpSocket(try!(::mio::tcp::TcpSocket::v4()))) - } - - /// Returns a new, unbound, non-blocking, IPv6 socket - pub fn v6() -> io::Result { - Ok(TcpSocket(try!(::mio::tcp::TcpSocket::v6()))) - } - - pub fn connect(addr: A) -> io::Result<(TcpStream, bool)> { - // let (stream, complete) = try!(self.0.connect(addr)); - // Ok((TcpStream(stream), complete)) - - super::each_addr(addr, |a| { - match a { - &SocketAddr::V4(..) => try!(TcpSocket::v4()).0.connect(a), - &SocketAddr::V6(..) => try!(TcpSocket::v6()).0.connect(a), - } - }).map(|(stream, complete)| (TcpStream(stream), complete)) - } - - pub fn listen(self, backlog: usize) -> io::Result { - Ok(TcpListener(try!(self.0.listen(backlog)))) - } -} - -impl Deref for TcpSocket { - type Target = ::mio::tcp::TcpSocket; - - fn deref(&self) -> &::mio::tcp::TcpSocket { - &self.0 - } -} - -impl DerefMut for TcpSocket { - fn deref_mut(&mut self) -> &mut ::mio::tcp::TcpSocket { - &mut self.0 - } -} - -pub struct TcpListener(::mio::tcp::TcpListener); - -impl TcpListener { - pub fn bind(addr: A) -> io::Result { - // let listener = try!(::mio::tcp::TcpListener::bind(addr)); - - // Ok(TcpListener(listener)) - super::each_addr(addr, ::mio::tcp::TcpListener::bind).map(TcpListener) - } - - pub fn accept(&self) -> io::Result { - match self.0.accept() { - Ok(None) => { - debug!("accept WouldBlock; going to register into eventloop"); - }, - Ok(Some(stream)) => { - return Ok(TcpStream(stream)); - }, - Err(err) => { - return Err(err); - } - } - - try!(Scheduler::current().wait_event(&self.0, Interest::readable())); - - match self.0.accept() { - Ok(None) => { - panic!("accept WouldBlock; Coroutine was awaked by readable event"); - }, - Ok(Some(stream)) => { - Ok(TcpStream(stream)) - }, - Err(err) => { - Err(err) - } - } - } - - pub fn try_clone(&self) -> io::Result { - Ok(TcpListener(try!(self.0.try_clone()))) - } -} - -impl Deref for TcpListener { - type Target = ::mio::tcp::TcpListener; - - fn deref(&self) -> &::mio::tcp::TcpListener { - &self.0 - } -} - -impl DerefMut for TcpListener { - fn deref_mut(&mut self) -> &mut ::mio::tcp::TcpListener { - &mut self.0 - } -} - -pub struct TcpStream(mio::tcp::TcpStream); - -impl TcpStream { - pub fn connect(addr: A) -> io::Result { - // let stream = try!(mio::tcp::TcpStream::connect(addr)); - - // Ok(TcpStream(stream)) - super::each_addr(addr, ::mio::tcp::TcpStream::connect).map(TcpStream) - } - - pub fn peer_addr(&self) -> io::Result { - self.0.peer_addr() - } - - pub fn local_addr(&self) -> io::Result { - self.0.local_addr() - } - - pub fn try_clone(&self) -> io::Result { - let stream = try!(self.0.try_clone()); - - Ok(TcpStream(stream)) - } -} - -impl io::Read for TcpStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - use mio::TryRead; - - let mut buf = MutSliceBuf::wrap(buf); - let mut total_len = 0; - while buf.has_remaining() { - match self.0.read(&mut buf) { - Ok(None) => { - debug!("TcpStream read WouldBlock"); - break; - }, - Ok(Some(0)) => { - debug!("TcpStream read 0 bytes; may be EOF"); - return Ok(total_len); - }, - Ok(Some(len)) => { - debug!("TcpStream read {} bytes", len); - total_len += len; - }, - Err(err) => { - return Err(err); - } - } - } - - if total_len != 0 { - // We got something, just return! - return Ok(total_len); - } - - debug!("Read: Going to register event"); - try!(Scheduler::current().wait_event(&self.0, Interest::readable())); - debug!("Read: Got read event"); - - while buf.has_remaining() { - match self.0.read(&mut buf) { - Ok(None) => { - debug!("TcpStream read WouldBlock"); - break; - }, - Ok(Some(0)) => { - debug!("TcpStream read 0 bytes; may be EOF"); - break; - }, - Ok(Some(len)) => { - debug!("TcpStream read {} bytes", len); - total_len += len; - }, - Err(err) => { - return Err(err); - } - } - } - - Ok(total_len) - } -} - -impl io::Write for TcpStream { - fn write(&mut self, buf: &[u8]) -> io::Result { - use mio::TryWrite; - - let mut buf = SliceBuf::wrap(buf); - let mut total_len = 0; - - while buf.has_remaining() { - match self.0.write(&mut buf) { - Ok(None) => { - debug!("TcpStream write WouldBlock"); - break; - }, - Ok(Some(0)) => { - debug!("TcpStream write 0 bytes; may be EOF"); - return Ok(total_len); - }, - Ok(Some(len)) => { - debug!("TcpStream written {} bytes", len); - total_len += len; - }, - Err(err) => { - return Err(err) - } - } - } - - if total_len != 0 { - // We have written something, return it! - return Ok(total_len) - } - - debug!("Write: Going to register event"); - try!(Scheduler::current().wait_event(&self.0, Interest::writable())); - debug!("Write: Got write event"); - - while buf.has_remaining() { - match self.0.write(&mut buf) { - Ok(None) => { - debug!("TcpStream write WouldBlock"); - break; - }, - Ok(Some(0)) => { - debug!("TcpStream write 0 bytes; may be EOF"); - break; - }, - Ok(Some(len)) => { - debug!("TcpStream written {} bytes", len); - total_len += len; - }, - Err(err) => { - return Err(err) - } - } - } - - Ok(total_len) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Deref for TcpStream { - type Target = ::mio::tcp::TcpStream; - - fn deref(&self) -> &::mio::tcp::TcpStream { - &self.0 - } -} - -impl DerefMut for TcpStream { - fn deref_mut(&mut self) -> &mut ::mio::tcp::TcpStream { - &mut self.0 - } -} diff --git a/src/net/udp.rs b/src/net/udp.rs deleted file mode 100644 index 88eb559..0000000 --- a/src/net/udp.rs +++ /dev/null @@ -1,109 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Developers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -use std::ops::{Deref, DerefMut}; -use std::io; -use std::net::SocketAddr; - -use mio::Interest; -use mio::buf::{Buf, MutBuf}; - -use scheduler::Scheduler; - -pub struct UdpSocket(::mio::udp::UdpSocket); - -impl UdpSocket { - /// Returns a new, unbound, non-blocking, IPv4 UDP socket - pub fn v4() -> io::Result { - Ok(UdpSocket(try!(::mio::udp::UdpSocket::v4()))) - } - - /// Returns a new, unbound, non-blocking, IPv6 UDP socket - pub fn v6() -> io::Result { - Ok(UdpSocket(try!(::mio::udp::UdpSocket::v6()))) - } - - pub fn bound(addr: &SocketAddr) -> io::Result { - Ok(UdpSocket(try!(::mio::udp::UdpSocket::bound(addr)))) - } - - pub fn try_clone(&self) -> io::Result { - Ok(UdpSocket(try!(self.0.try_clone()))) - } - - pub fn send_to(&self, buf: &mut B, target: &SocketAddr) -> io::Result> { - match try!(self.0.send_to(buf, target)) { - None => { - debug!("UdpSocket send_to WOULDBLOCK"); - }, - Some(..) => { - return Ok(Some(())); - } - } - - try!(Scheduler::current().wait_event(&self.0, Interest::writable())); - - match try!(self.0.send_to(buf, target)) { - None => { - panic!("UdpSocket send_to WOULDBLOCK"); - }, - Some(..) => { - return Ok(Some(())); - } - } - } - - pub fn recv_from(&self, buf: &mut B) -> io::Result> { - match try!(self.0.recv_from(buf)) { - None => { - debug!("UdpSocket recv_from WOULDBLOCK"); - }, - Some(addr) => { - return Ok(Some(addr)); - } - } - - try!(Scheduler::current().wait_event(&self.0, Interest::readable())); - - match try!(self.0.recv_from(buf)) { - None => { - panic!("UdpSocket recv_from WOULDBLOCK"); - }, - Some(addr) => { - return Ok(Some(addr)); - } - } - } -} - -impl Deref for UdpSocket { - type Target = ::mio::udp::UdpSocket; - - fn deref(&self) -> &::mio::udp::UdpSocket { - return &self.0 - } -} - -impl DerefMut for UdpSocket { - fn deref_mut(&mut self) -> &mut ::mio::udp::UdpSocket { - return &mut self.0 - } -} diff --git a/src/scheduler.rs b/src/scheduler.rs deleted file mode 100644 index 7f9cf72..0000000 --- a/src/scheduler.rs +++ /dev/null @@ -1,463 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Developers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -use std::thread; -use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError}; -use std::sync::{Mutex, Once, ONCE_INIT}; -use std::mem; -use std::cell::UnsafeCell; -use std::io; -#[cfg(target_os = "linux")] -use std::os::unix::io::AsRawFd; -#[cfg(target_os = "linux")] -use std::convert::From; -use std::sync::atomic::{ATOMIC_BOOL_INIT, AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::collections::VecDeque; - -use coroutine::{State, Handle, Coroutine, Options}; - -use deque::{BufferPool, Stealer, Worker, Stolen}; - -use mio::{EventLoop, Evented, Handler, Token, ReadHint, Interest, PollOpt}; -use mio::util::Slab; -#[cfg(target_os = "linux")] -use mio::Io; - -static mut THREAD_HANDLES: *const Mutex, Stealer)>> = - 0 as *const Mutex, Stealer)>>; -static THREAD_HANDLES_ONCE: Once = ONCE_INIT; -static SCHEDULER_HAS_STARTED: AtomicBool = ATOMIC_BOOL_INIT; - -fn schedulers() -> &'static Mutex, Stealer)>> { - unsafe { - THREAD_HANDLES_ONCE.call_once(|| { - let handles: Box, Stealer)>>> = - Box::new(Mutex::new(Vec::new())); - - THREAD_HANDLES = mem::transmute(handles); - }); - - & *THREAD_HANDLES - } -} - -thread_local!(static SCHEDULER: UnsafeCell = UnsafeCell::new(Scheduler::new())); - -pub enum SchedMessage { - NewNeighbor(Sender, Stealer), - Shutdown, -} - -const MAX_PRIVATE_WORK_NUM: usize = 10; - -pub struct Scheduler { - workqueue: Worker, - workstealer: Stealer, - - commchannel: Receiver, - - neighbors: Vec<(Sender, Stealer)>, - - eventloop: EventLoop, - handler: SchedulerHandler, - - private_work: VecDeque, -} - -impl Scheduler { - - fn new() -> Scheduler { - let bufpool = BufferPool::new(); - let (worker, stealer) = bufpool.deque(); - - let (tx, rx) = channel(); - - let scheds = schedulers(); - let mut guard = scheds.lock().unwrap(); - - for &(ref rtx, _) in guard.iter() { - let _ = rtx.send(SchedMessage::NewNeighbor(tx.clone(), stealer.clone())); - } - - let neighbors = guard.clone(); - guard.push((tx, stealer.clone())); - - Scheduler { - workqueue: worker, - workstealer: stealer, - - commchannel: rx, - - neighbors: neighbors, - - eventloop: EventLoop::new().unwrap(), - handler: SchedulerHandler::new(), - - private_work: VecDeque::new(), - } - } - - pub fn current() -> &'static mut Scheduler { - SCHEDULER.with(|s| unsafe { - &mut *s.get() - }) - } - - pub fn spawn(f: F) - where F: FnOnce() + Send + 'static { - let coro = Coroutine::spawn(f); - Scheduler::current().ready(coro); - - Coroutine::sched(); - } - - pub fn spawn_opts(f: F, opt: Options) - where F: FnOnce() + Send + 'static { - let coro = Coroutine::spawn_opts(f, opt); - Scheduler::current().ready(coro); - - Coroutine::sched(); - } - - pub fn ready(&mut self, work: Handle) { - if self.private_work.len() >= MAX_PRIVATE_WORK_NUM { - self.workqueue.push(work); - } else { - self.private_work.push_back(work); - } - } - - pub fn run(f: F, threads: usize) - where F: FnOnce() + Send + 'static { - - assert!(threads >= 1, "Threads must >= 1"); - if SCHEDULER_HAS_STARTED.compare_and_swap(false, true, Ordering::SeqCst) != false { - panic!("Schedulers are already running!"); - } - - // Start worker threads first - let counter = Arc::new(AtomicUsize::new(0)); - for tid in 0..threads - 1 { - let counter = counter.clone(); - thread::Builder::new().name(format!("Thread {}", tid)).spawn(move|| { - let current = Scheduler::current(); - counter.fetch_add(1, Ordering::SeqCst); - current.schedule(); - }).unwrap(); - } - - while counter.load(Ordering::SeqCst) != threads - 1 {} - - Scheduler::spawn(|| { - struct Guard; - - // Send Shutdown to all schedulers - impl Drop for Guard { - fn drop(&mut self) { - let guard = match schedulers().lock() { - Ok(g) => g, - Err(poisoned) => poisoned.into_inner() - }; - - for &(ref chan, _) in guard.iter() { - let _ = chan.send(SchedMessage::Shutdown); - } - } - } - - let _guard = Guard; - - f(); - }); - - Scheduler::current().schedule(); - - SCHEDULER_HAS_STARTED.store(false, Ordering::SeqCst); - } - - fn resume_coroutine(&mut self, work: Handle) { - match work.state() { - State::Suspended | State::Blocked => { - debug!("Resuming Coroutine: {:?}", work); - - if let Err(err) = work.resume() { - let msg = match err.downcast_ref::<&'static str>() { - Some(s) => *s, - None => match err.downcast_ref::() { - Some(s) => &s[..], - None => "Box", - } - }; - - error!("Coroutine panicked! {:?}", msg); - } - - match work.state() { - State::Normal | State::Running => { - unreachable!(); - }, - State::Suspended => { - debug!("Coroutine suspended, going to be resumed next round"); - self.ready(work); - }, - State::Blocked => { - debug!("Coroutine blocked, maybe waiting for I/O"); - }, - State::Finished | State::Panicked => { - debug!("Coroutine state: {:?}, will not be resumed automatically", work.state()); - } - } - }, - _ => { - error!("Trying to resume coroutine {:?}, but its state is {:?}", - work, work.state()); - } - } - } - - fn schedule(&mut self) { - loop { - match self.commchannel.try_recv() { - Ok(SchedMessage::NewNeighbor(tx, st)) => { - self.neighbors.push((tx, st)); - }, - Ok(SchedMessage::Shutdown) => { - info!("Shutting down"); - break; - }, - Err(TryRecvError::Empty) => {}, - _ => panic!("Receiving from channel: Unknown message") - } - - if !self.handler.slabs.is_empty() { - self.eventloop.run_once(&mut self.handler).unwrap(); - } - - debug!("Trying to resume all ready coroutines: {:?}", thread::current().name()); - // Run all ready coroutines - let mut need_steal = true; - // while let Some(work) = self.workqueue.pop() { - // while let Stolen::Data(work) = self.workstealer.steal() { - // need_steal = false; - // self.resume_coroutine(work); - // } - - while let Some(work) = self.private_work.pop_front() { - need_steal = false; - self.resume_coroutine(work); - } - - if need_steal { - if let Stolen::Data(work) = self.workstealer.steal() { - need_steal = false; - self.resume_coroutine(work); - } - } - - if !need_steal || !self.handler.slabs.is_empty() { - continue; - } - - debug!("Trying to steal from neighbors: {:?}", thread::current().name()); - - // if self.neighbors.len() > 0 { - // let neighbor_idx = ::rand::random::() % self.neighbors.len(); - // let stolen = { - // let &(_, ref neighbor_stealer) = &self.neighbors[neighbor_idx]; - // neighbor_stealer.steal() - // }; - - // if let Stolen::Data(coro) = stolen { - // self.resume_coroutine(coro); - // continue; - // } - // } - let mut has_stolen = false; - let stolen_works = self.neighbors.iter() - .filter_map(|&(_, ref st)| - if let Stolen::Data(w) = st.steal() { - Some(w) - } else { - None - }) - .collect::>(); - for work in stolen_works.into_iter() { - has_stolen = true; - self.resume_coroutine(work); - } - - if !has_stolen { - thread::sleep_ms(100); - } - } - } - - // fn resume(&mut self, handle: Handle) { - // self.workqueue.push(handle); - // } -} - -const MAX_TOKEN_NUM: usize = 102400; -impl SchedulerHandler { - fn new() -> SchedulerHandler { - SchedulerHandler { - // slabs: Slab::new_starting_at(Token(1), MAX_TOKEN_NUM), - slabs: Slab::new(MAX_TOKEN_NUM), - } - } -} - -#[cfg(any(target_os = "linux", - target_os = "android"))] -impl Scheduler { - pub fn wait_event(&mut self, fd: &E, interest: Interest) -> io::Result<()> { - let token = self.handler.slabs.insert((Coroutine::current(), From::from(fd.as_raw_fd()))).unwrap(); - try!(self.eventloop.register_opt(fd, token, interest, - PollOpt::level()|PollOpt::oneshot())); - - debug!("wait_event: Blocked current Coroutine ...; token={:?}", token); - Coroutine::block(); - debug!("wait_event: Waked up; token={:?}", token); - - Ok(()) - } -} - -#[cfg(any(target_os = "linux", - target_os = "android"))] -struct SchedulerHandler { - slabs: Slab<(Handle, Io)>, -} - -#[cfg(any(target_os = "linux", - target_os = "android"))] -impl Handler for SchedulerHandler { - type Timeout = (); - type Message = (); - - fn writable(&mut self, event_loop: &mut EventLoop, token: Token) { - - debug!("In writable, token {:?}", token); - - match self.slabs.remove(token) { - Some((hdl, fd)) => { - // Linux EPoll needs to explicit EPOLL_CTL_DEL the fd - event_loop.deregister(&fd).unwrap(); - mem::forget(fd); - Scheduler::current().ready(hdl); - }, - None => { - warn!("No coroutine is waiting on writable {:?}", token); - } - } - - } - - fn readable(&mut self, event_loop: &mut EventLoop, token: Token, hint: ReadHint) { - - debug!("In readable, token {:?}, hint {:?}", token, hint); - - match self.slabs.remove(token) { - Some((hdl, fd)) => { - // Linux EPoll needs to explicit EPOLL_CTL_DEL the fd - event_loop.deregister(&fd).unwrap(); - mem::forget(fd); - Scheduler::current().ready(hdl); - }, - None => { - warn!("No coroutine is waiting on readable {:?}", token); - } - } - - } -} - -#[cfg(any(target_os = "macos", - target_os = "freebsd", - target_os = "dragonfly", - target_os = "ios", - target_os = "bitrig", - target_os = "openbsd"))] -impl Scheduler { - pub fn wait_event(&mut self, fd: &E, interest: Interest) -> io::Result<()> { - let token = self.handler.slabs.insert(Coroutine::current()).unwrap(); - try!(self.eventloop.register_opt(fd, token, interest, - PollOpt::level()|PollOpt::oneshot())); - - debug!("wait_event: Blocked current Coroutine ...; token={:?}", token); - Coroutine::block(); - debug!("wait_event: Waked up; token={:?}", token); - - Ok(()) - } -} - -#[cfg(any(target_os = "macos", - target_os = "freebsd", - target_os = "dragonfly", - target_os = "ios", - target_os = "bitrig", - target_os = "openbsd"))] -struct SchedulerHandler { - slabs: Slab, -} - -#[cfg(any(target_os = "macos", - target_os = "freebsd", - target_os = "dragonfly", - target_os = "ios", - target_os = "bitrig", - target_os = "openbsd"))] -impl Handler for SchedulerHandler { - type Timeout = (); - type Message = (); - - fn writable(&mut self, _: &mut EventLoop, token: Token) { - - debug!("In writable, token {:?}", token); - - match self.slabs.remove(token) { - Some(hdl) => { - Scheduler::current().ready(hdl); - }, - None => { - warn!("No coroutine is waiting on writable {:?}", token); - } - } - - } - - fn readable(&mut self, _: &mut EventLoop, token: Token, hint: ReadHint) { - - debug!("In readable, token {:?}, hint {:?}", token, hint); - - match self.slabs.remove(token) { - Some(hdl) => { - Scheduler::current().ready(hdl); - }, - None => { - warn!("No coroutine is waiting on readable {:?}", token); - } - } - - } -} diff --git a/src/sync/mod.rs b/src/sync/mod.rs deleted file mode 100644 index 2667ef8..0000000 --- a/src/sync/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Developers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -pub mod mpsc; -pub mod spinlock; -pub mod mutex; diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs deleted file mode 100644 index 91f5909..0000000 --- a/src/sync/mpsc.rs +++ /dev/null @@ -1,60 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Develpers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -use std::sync::mpsc; - -use coroutine::Coroutine; - -#[derive(Clone)] -pub struct Sender { - inner: mpsc::Sender, -} - -pub struct SyncSender { - inner: mpsc::SyncSender, -} - -unsafe impl Send for SyncSender {} - -impl !Sync for SyncSender {} - -pub struct Receiver { - inner: mpsc::Receiver, -} - -impl Sender { - fn new(inner: mpsc::Sender) -> Sender { - Sender { - inner: inner, - } - } - - pub fn send(&self, data: T) -> Result<(), mpsc::SendError> { - try!(self.inner.send(data)); - Coroutine::sched(); - } - - pub fn try_send(&self, data: T) -> Result<(), mpsc::TrySendError> { - - } -} - - diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs deleted file mode 100644 index cb34981..0000000 --- a/src/sync/mutex.rs +++ /dev/null @@ -1,147 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Developers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -use std::collections::VecDeque; -use std::ops::{Deref, DerefMut}; -use std::cell::UnsafeCell; - -use sync::spinlock::SpinLock; -use coroutine::{self, Coroutine, Handle}; - -pub struct Mutex { - lock: SpinLock, - inner: UnsafeCell, -} - -impl Mutex { - pub fn new(inner: T) -> Mutex { - Mutex { - lock: SpinLock::new(), - inner: UnsafeCell::new(inner), - } - } - - pub fn into_inner(self) -> T { - unsafe { - self.inner.into_inner() - } - } - - pub fn lock<'a>(&'a self) -> LockGuard<'a, T> { - if !self.lock.try_lock() { - coroutine::sched(); - } - - LockGuard::new(self, &self.inner) - } - - pub fn try_lock<'a>(&'a self) -> Option> { - if self.lock.try_lock() { - Some(LockGuard::new(self, &self.inner)) - } else { - None - } - } - - fn unlock(&self) { - self.lock.unlock(); - } -} - -unsafe impl Send for Mutex {} - -unsafe impl Sync for Mutex {} - -pub struct LockGuard<'a, T: 'a> { - mutex: &'a Mutex, - data: &'a UnsafeCell, -} - -impl<'a, T: 'a> LockGuard<'a, T> { - fn new(mutex: &'a Mutex, data: &'a UnsafeCell) -> LockGuard<'a, T> { - LockGuard { - mutex: mutex, - data: data, - } - } -} - -impl<'a, T: 'a> Drop for LockGuard<'a, T> { - fn drop(&mut self) { - self.mutex.unlock() - } -} - -impl<'a, T: 'a> Deref for LockGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { - &*self.data.get() - } - } -} - -impl<'a, T: 'a> DerefMut for LockGuard<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { - &mut *self.data.get() - } - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - use std::thread; - - use coroutine::{spawn, sched}; - - use super::Mutex; - - #[test] - fn test_mutex_basic() { - let lock = Arc::new(Mutex::new(0)); - - let mut futs = Vec::new(); - - for _ in 0..10 { - println!("??"); - let lock = lock.clone(); - let fut = thread::scoped(move|| { - spawn(move|| { - let mut guard = lock.lock(); - for _ in 0..100_0000 { - *guard += 1; - } - println!("HERE!!"); - }).resume().unwrap(); - }); - futs.push(fut); - } - - for fut in futs.into_iter() { - fut.join(); - } - - assert_eq!(*lock.lock(), 100_0000 * 10); - } -} diff --git a/src/sync/spinlock.rs b/src/sync/spinlock.rs deleted file mode 100644 index 4718c0f..0000000 --- a/src/sync/spinlock.rs +++ /dev/null @@ -1,64 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2015 Rustcc Developers - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -use std::sync::atomic::{AtomicBool, Ordering}; - -/// Spinlock -pub struct SpinLock { - flag: AtomicBool, -} - -impl SpinLock { - /// Create a new Spinlock - pub fn new() -> SpinLock { - SpinLock { - flag: AtomicBool::new(false), - } - } - - pub fn try_lock(&self) -> bool { - !self.flag.compare_and_swap(false, true, Ordering::Acquire) - } - - pub fn lock(&self) { - while !self.try_lock() {} - } - - pub fn unlock(&self) { - self.flag.store(false, Ordering::Release) - } -} - -#[cfg(test)] -mod test { - use super::SpinLock; - - #[test] - fn test_spinlock_basic() { - let lock = SpinLock::new(); - - assert!(lock.try_lock()); - - assert!(!lock.try_lock()); - - lock.unlock(); - } -}