From 26d4308c6acbbde642be4621f5e3487b24ed8bd1 Mon Sep 17 00:00:00 2001 From: Andrew Paseltiner Date: Fri, 16 Dec 2016 18:10:09 -0500 Subject: [PATCH] Replace invalid use of `&mut` with `UnsafeCell` in `std::sync::mpsc` Closes #36934 --- src/libstd/sync/mpsc/mod.rs | 192 +++++++++------------ src/libstd/sync/mpsc/oneshot.rs | 293 +++++++++++++++++--------------- src/libstd/sync/mpsc/shared.rs | 106 ++++++------ src/libstd/sync/mpsc/stream.rs | 65 +++---- 4 files changed, 328 insertions(+), 328 deletions(-) diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 9f51d3e87f3f7..0c3f31455cc41 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -348,7 +348,7 @@ impl !Sync for Sender { } /// owned by one thread, but it can be cloned to send to other threads. #[stable(feature = "rust1", since = "1.0.0")] pub struct SyncSender { - inner: Arc>>, + inner: Arc>, } #[stable(feature = "rust1", since = "1.0.0")] @@ -426,10 +426,10 @@ pub enum TrySendError { } enum Flavor { - Oneshot(Arc>>), - Stream(Arc>>), - Shared(Arc>>), - Sync(Arc>>), + Oneshot(Arc>), + Stream(Arc>), + Shared(Arc>), + Sync(Arc>), } #[doc(hidden)] @@ -487,7 +487,7 @@ impl UnsafeFlavor for Receiver { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn channel() -> (Sender, Receiver) { - let a = Arc::new(UnsafeCell::new(oneshot::Packet::new())); + let a = Arc::new(oneshot::Packet::new()); (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) } @@ -532,7 +532,7 @@ pub fn channel() -> (Sender, Receiver) { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { - let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound))); + let a = Arc::new(sync::Packet::new(bound)); (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) } @@ -578,38 +578,30 @@ impl Sender { pub fn send(&self, t: T) -> Result<(), SendError> { let (new_inner, ret) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - unsafe { - let p = p.get(); - if !(*p).sent() { - return (*p).send(t).map_err(SendError); - } else { - let a = - Arc::new(UnsafeCell::new(stream::Packet::new())); - let rx = Receiver::new(Flavor::Stream(a.clone())); - match (*p).upgrade(rx) { - oneshot::UpSuccess => { - let ret = (*a.get()).send(t); - (a, ret) - } - oneshot::UpDisconnected => (a, Err(t)), - oneshot::UpWoke(token) => { - // This send cannot panic because the thread is - // asleep (we're looking at it), so the receiver - // can't go away. - (*a.get()).send(t).ok().unwrap(); - token.signal(); - (a, Ok(())) - } + if !p.sent() { + return p.send(t).map_err(SendError); + } else { + let a = Arc::new(stream::Packet::new()); + let rx = Receiver::new(Flavor::Stream(a.clone())); + match p.upgrade(rx) { + oneshot::UpSuccess => { + let ret = a.send(t); + (a, ret) + } + oneshot::UpDisconnected => (a, Err(t)), + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is + // asleep (we're looking at it), so the receiver + // can't go away. + a.send(t).ok().unwrap(); + token.signal(); + (a, Ok(())) } } } } - Flavor::Stream(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, - Flavor::Shared(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, + Flavor::Stream(ref p) => return p.send(t).map_err(SendError), + Flavor::Shared(ref p) => return p.send(t).map_err(SendError), Flavor::Sync(..) => unreachable!(), }; @@ -624,41 +616,43 @@ impl Sender { #[stable(feature = "rust1", since = "1.0.0")] impl Clone for Sender { fn clone(&self) -> Sender { - let (packet, sleeper, guard) = match *unsafe { self.inner() } { + let packet = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - let a = Arc::new(UnsafeCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + let sleeper = match p.upgrade(rx) { oneshot::UpSuccess | - oneshot::UpDisconnected => (a, None, guard), - oneshot::UpWoke(task) => (a, Some(task), guard) - } + oneshot::UpDisconnected => None, + oneshot::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); } + a } Flavor::Stream(ref p) => { - let a = Arc::new(UnsafeCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + let sleeper = match p.upgrade(rx) { stream::UpSuccess | - stream::UpDisconnected => (a, None, guard), - stream::UpWoke(task) => (a, Some(task), guard), - } + stream::UpDisconnected => None, + stream::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); } + a } Flavor::Shared(ref p) => { - unsafe { (*p.get()).clone_chan(); } + p.clone_chan(); return Sender::new(Flavor::Shared(p.clone())); } Flavor::Sync(..) => unreachable!(), }; unsafe { - (*packet.get()).inherit_blocker(sleeper, guard); - let tmp = Sender::new(Flavor::Shared(packet.clone())); mem::swap(self.inner_mut(), tmp.inner_mut()); } @@ -669,10 +663,10 @@ impl Clone for Sender { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for Sender { fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_chan(), + Flavor::Stream(ref p) => p.drop_chan(), + Flavor::Shared(ref p) => p.drop_chan(), Flavor::Sync(..) => unreachable!(), } } @@ -690,7 +684,7 @@ impl fmt::Debug for Sender { //////////////////////////////////////////////////////////////////////////////// impl SyncSender { - fn new(inner: Arc>>) -> SyncSender { + fn new(inner: Arc>) -> SyncSender { SyncSender { inner: inner } } @@ -710,7 +704,7 @@ impl SyncSender { /// information. #[stable(feature = "rust1", since = "1.0.0")] pub fn send(&self, t: T) -> Result<(), SendError> { - unsafe { (*self.inner.get()).send(t).map_err(SendError) } + self.inner.send(t).map_err(SendError) } /// Attempts to send a value on this channel without blocking. @@ -724,14 +718,14 @@ impl SyncSender { /// receiver has received the data or not if this function is successful. #[stable(feature = "rust1", since = "1.0.0")] pub fn try_send(&self, t: T) -> Result<(), TrySendError> { - unsafe { (*self.inner.get()).try_send(t) } + self.inner.try_send(t) } } #[stable(feature = "rust1", since = "1.0.0")] impl Clone for SyncSender { fn clone(&self) -> SyncSender { - unsafe { (*self.inner.get()).clone_chan(); } + self.inner.clone_chan(); SyncSender::new(self.inner.clone()) } } @@ -739,7 +733,7 @@ impl Clone for SyncSender { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for SyncSender { fn drop(&mut self) { - unsafe { (*self.inner.get()).drop_chan(); } + self.inner.drop_chan(); } } @@ -772,7 +766,7 @@ impl Receiver { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(oneshot::Empty) => return Err(TryRecvError::Empty), Err(oneshot::Disconnected) => { @@ -782,7 +776,7 @@ impl Receiver { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(stream::Empty) => return Err(TryRecvError::Empty), Err(stream::Disconnected) => { @@ -792,7 +786,7 @@ impl Receiver { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(shared::Empty) => return Err(TryRecvError::Empty), Err(shared::Disconnected) => { @@ -801,7 +795,7 @@ impl Receiver { } } Flavor::Sync(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(sync::Empty) => return Err(TryRecvError::Empty), Err(sync::Disconnected) => { @@ -875,7 +869,7 @@ impl Receiver { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(oneshot::Disconnected) => return Err(RecvError), Err(oneshot::Upgraded(rx)) => rx, @@ -883,7 +877,7 @@ impl Receiver { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(stream::Disconnected) => return Err(RecvError), Err(stream::Upgraded(rx)) => rx, @@ -891,15 +885,13 @@ impl Receiver { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(shared::Disconnected) => return Err(RecvError), Err(shared::Empty) => unreachable!(), } } - Flavor::Sync(ref p) => return unsafe { - (*p.get()).recv(None).map_err(|_| RecvError) - } + Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), }; unsafe { mem::swap(self.inner_mut(), new_port.inner_mut()); @@ -952,7 +944,7 @@ impl Receiver { loop { let port_or_empty = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(oneshot::Disconnected) => return Err(Disconnected), Err(oneshot::Upgraded(rx)) => Some(rx), @@ -960,7 +952,7 @@ impl Receiver { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(stream::Disconnected) => return Err(Disconnected), Err(stream::Upgraded(rx)) => Some(rx), @@ -968,14 +960,14 @@ impl Receiver { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(shared::Disconnected) => return Err(Disconnected), Err(shared::Empty) => None, } } Flavor::Sync(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(sync::Disconnected) => return Err(Disconnected), Err(sync::Empty) => None, @@ -1020,23 +1012,19 @@ impl select::Packet for Receiver { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).can_recv() } { + match p.can_recv() { Ok(ret) => return ret, Err(upgrade) => upgrade, } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).can_recv() } { + match p.can_recv() { Ok(ret) => return ret, Err(upgrade) => upgrade, } } - Flavor::Shared(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } - Flavor::Sync(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } + Flavor::Shared(ref p) => return p.can_recv(), + Flavor::Sync(ref p) => return p.can_recv(), }; unsafe { mem::swap(self.inner_mut(), @@ -1049,25 +1037,21 @@ impl select::Packet for Receiver { loop { let (t, new_port) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { + match p.start_selection(token) { oneshot::SelSuccess => return Installed, oneshot::SelCanceled => return Abort, oneshot::SelUpgraded(t, rx) => (t, rx), } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { + match p.start_selection(token) { stream::SelSuccess => return Installed, stream::SelCanceled => return Abort, stream::SelUpgraded(t, rx) => (t, rx), } } - Flavor::Shared(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } - Flavor::Sync(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } + Flavor::Shared(ref p) => return p.start_selection(token), + Flavor::Sync(ref p) => return p.start_selection(token), }; token = t; unsafe { @@ -1080,16 +1064,10 @@ impl select::Packet for Receiver { let mut was_upgrade = false; loop { let result = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, - Flavor::Stream(ref p) => unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Flavor::Shared(ref p) => return unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Flavor::Sync(ref p) => return unsafe { - (*p.get()).abort_selection() - }, + Flavor::Oneshot(ref p) => p.abort_selection(), + Flavor::Stream(ref p) => p.abort_selection(was_upgrade), + Flavor::Shared(ref p) => return p.abort_selection(was_upgrade), + Flavor::Sync(ref p) => return p.abort_selection(), }; let new_port = match result { Ok(b) => return b, Err(p) => p }; was_upgrade = true; @@ -1142,11 +1120,11 @@ impl IntoIterator for Receiver { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for Receiver { fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_port(), + Flavor::Stream(ref p) => p.drop_port(), + Flavor::Shared(ref p) => p.drop_port(), + Flavor::Sync(ref p) => p.drop_port(), } } } diff --git a/src/libstd/sync/mpsc/oneshot.rs b/src/libstd/sync/mpsc/oneshot.rs index 767e9f96ac8e4..b8e50c9297b64 100644 --- a/src/libstd/sync/mpsc/oneshot.rs +++ b/src/libstd/sync/mpsc/oneshot.rs @@ -39,7 +39,8 @@ use self::MyUpgrade::*; use sync::mpsc::Receiver; use sync::mpsc::blocking::{self, SignalToken}; -use core::mem; +use cell::UnsafeCell; +use ptr; use sync::atomic::{AtomicUsize, Ordering}; use time::Instant; @@ -57,10 +58,10 @@ pub struct Packet { // Internal state of the chan/port pair (stores the blocked thread as well) state: AtomicUsize, // One-shot data slot location - data: Option, + data: UnsafeCell>, // when used for the second time, a oneshot channel must be upgraded, and // this contains the slot for the upgrade - upgrade: MyUpgrade, + upgrade: UnsafeCell>, } pub enum Failure { @@ -90,42 +91,44 @@ enum MyUpgrade { impl Packet { pub fn new() -> Packet { Packet { - data: None, - upgrade: NothingSent, + data: UnsafeCell::new(None), + upgrade: UnsafeCell::new(NothingSent), state: AtomicUsize::new(EMPTY), } } - pub fn send(&mut self, t: T) -> Result<(), T> { - // Sanity check - match self.upgrade { - NothingSent => {} - _ => panic!("sending on a oneshot that's already sent on "), - } - assert!(self.data.is_none()); - self.data = Some(t); - self.upgrade = SendUsed; - - match self.state.swap(DATA, Ordering::SeqCst) { - // Sent the data, no one was waiting - EMPTY => Ok(()), - - // Couldn't send the data, the port hung up first. Return the data - // back up the stack. - DISCONNECTED => { - self.state.swap(DISCONNECTED, Ordering::SeqCst); - self.upgrade = NothingSent; - Err(self.data.take().unwrap()) + pub fn send(&self, t: T) -> Result<(), T> { + unsafe { + // Sanity check + match *self.upgrade.get() { + NothingSent => {} + _ => panic!("sending on a oneshot that's already sent on "), } + assert!((*self.data.get()).is_none()); + ptr::write(self.data.get(), Some(t)); + ptr::write(self.upgrade.get(), SendUsed); + + match self.state.swap(DATA, Ordering::SeqCst) { + // Sent the data, no one was waiting + EMPTY => Ok(()), + + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. + DISCONNECTED => { + self.state.swap(DISCONNECTED, Ordering::SeqCst); + ptr::write(self.upgrade.get(), NothingSent); + Err((&mut *self.data.get()).take().unwrap()) + } - // Not possible, these are one-use channels - DATA => unreachable!(), + // Not possible, these are one-use channels + DATA => unreachable!(), - // There is a thread waiting on the other end. We leave the 'DATA' - // state inside so it'll pick it up on the other end. - ptr => unsafe { - SignalToken::cast_from_usize(ptr).signal(); - Ok(()) + // There is a thread waiting on the other end. We leave the 'DATA' + // state inside so it'll pick it up on the other end. + ptr => { + SignalToken::cast_from_usize(ptr).signal(); + Ok(()) + } } } } @@ -133,13 +136,15 @@ impl Packet { // Just tests whether this channel has been sent on or not, this is only // safe to use from the sender. pub fn sent(&self) -> bool { - match self.upgrade { - NothingSent => false, - _ => true, + unsafe { + match *self.upgrade.get() { + NothingSent => false, + _ => true, + } } } - pub fn recv(&mut self, deadline: Option) -> Result> { + pub fn recv(&self, deadline: Option) -> Result> { // Attempt to not block the thread (it's a little expensive). If it looks // like we're not empty, then immediately go through to `try_recv`. if self.state.load(Ordering::SeqCst) == EMPTY { @@ -167,73 +172,77 @@ impl Packet { self.try_recv() } - pub fn try_recv(&mut self) -> Result> { - match self.state.load(Ordering::SeqCst) { - EMPTY => Err(Empty), - - // We saw some data on the channel, but the channel can be used - // again to send us an upgrade. As a result, we need to re-insert - // into the channel that there's no data available (otherwise we'll - // just see DATA next time). This is done as a cmpxchg because if - // the state changes under our feet we'd rather just see that state - // change. - DATA => { - self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); - match self.data.take() { - Some(data) => Ok(data), - None => unreachable!(), + pub fn try_recv(&self) -> Result> { + unsafe { + match self.state.load(Ordering::SeqCst) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); + match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => unreachable!(), + } } - } - // There's no guarantee that we receive before an upgrade happens, - // and an upgrade flags the channel as disconnected, so when we see - // this we first need to check if there's data available and *then* - // we go through and process the upgrade. - DISCONNECTED => { - match self.data.take() { - Some(data) => Ok(data), - None => { - match mem::replace(&mut self.upgrade, SendUsed) { - SendUsed | NothingSent => Err(Disconnected), - GoUp(upgrade) => Err(Upgraded(upgrade)) + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => { + match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => { + match ptr::replace(self.upgrade.get(), SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)) + } } } } - } - // We are the sole receiver; there cannot be a blocking - // receiver already. - _ => unreachable!() + // We are the sole receiver; there cannot be a blocking + // receiver already. + _ => unreachable!() + } } } // Returns whether the upgrade was completed. If the upgrade wasn't // completed, then the port couldn't get sent to the other half (it will // never receive it). - pub fn upgrade(&mut self, up: Receiver) -> UpgradeResult { - let prev = match self.upgrade { - NothingSent => NothingSent, - SendUsed => SendUsed, - _ => panic!("upgrading again"), - }; - self.upgrade = GoUp(up); - - match self.state.swap(DISCONNECTED, Ordering::SeqCst) { - // If the channel is empty or has data on it, then we're good to go. - // Senders will check the data before the upgrade (in case we - // plastered over the DATA state). - DATA | EMPTY => UpSuccess, - - // If the other end is already disconnected, then we failed the - // upgrade. Be sure to trash the port we were given. - DISCONNECTED => { self.upgrade = prev; UpDisconnected } - - // If someone's waiting, we gotta wake them up - ptr => UpWoke(unsafe { SignalToken::cast_from_usize(ptr) }) + pub fn upgrade(&self, up: Receiver) -> UpgradeResult { + unsafe { + let prev = match *self.upgrade.get() { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => panic!("upgrading again"), + }; + ptr::write(self.upgrade.get(), GoUp(up)); + + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. + DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected } + + // If someone's waiting, we gotta wake them up + ptr => UpWoke(SignalToken::cast_from_usize(ptr)) + } } } - pub fn drop_chan(&mut self) { + pub fn drop_chan(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { DATA | DISCONNECTED | EMPTY => {} @@ -244,7 +253,7 @@ impl Packet { } } - pub fn drop_port(&mut self) { + pub fn drop_port(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { // An empty channel has nothing to do, and a remotely disconnected // channel also has nothing to do b/c we're about to run the drop @@ -254,7 +263,7 @@ impl Packet { // There's data on the channel, so make sure we destroy it promptly. // This is why not using an arc is a little difficult (need the box // to stay valid while we take the data). - DATA => { self.data.take().unwrap(); } + DATA => unsafe { (&mut *self.data.get()).take().unwrap(); }, // We're the only ones that can block on this port _ => unreachable!() @@ -267,62 +276,66 @@ impl Packet { // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. - pub fn can_recv(&mut self) -> Result> { - match self.state.load(Ordering::SeqCst) { - EMPTY => Ok(false), // Welp, we tried - DATA => Ok(true), // we have some un-acquired data - DISCONNECTED if self.data.is_some() => Ok(true), // we have data - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => Err(upgrade), - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { self.upgrade = up; Ok(true) } + pub fn can_recv(&self) -> Result> { + unsafe { + match self.state.load(Ordering::SeqCst) { + EMPTY => Ok(false), // Welp, we tried + DATA => Ok(true), // we have some un-acquired data + DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data + DISCONNECTED => { + match ptr::replace(self.upgrade.get(), SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => Err(upgrade), + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { ptr::write(self.upgrade.get(), up); Ok(true) } + } } + _ => unreachable!(), // we're the "one blocker" } - _ => unreachable!(), // we're the "one blocker" } } // Attempts to start selection on this port. This can either succeed, fail // because there is data, or fail because there is an upgrade pending. - pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult { - let ptr = unsafe { token.cast_to_usize() }; - match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { - EMPTY => SelSuccess, - DATA => { - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - SelCanceled - } - DISCONNECTED if self.data.is_some() => { - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - SelCanceled - } - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => { - SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }, upgrade) - } + pub fn start_selection(&self, token: SignalToken) -> SelectionResult { + unsafe { + let ptr = token.cast_to_usize(); + match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { + EMPTY => SelSuccess, + DATA => { + drop(SignalToken::cast_from_usize(ptr)); + SelCanceled + } + DISCONNECTED if (*self.data.get()).is_some() => { + drop(SignalToken::cast_from_usize(ptr)); + SelCanceled + } + DISCONNECTED => { + match ptr::replace(self.upgrade.get(), SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => { + SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade) + } - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { - self.upgrade = up; - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - SelCanceled + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { + ptr::write(self.upgrade.get(), up); + drop(SignalToken::cast_from_usize(ptr)); + SelCanceled + } } } + _ => unreachable!(), // we're the "one blocker" } - _ => unreachable!(), // we're the "one blocker" } } @@ -330,7 +343,7 @@ impl Packet { // blocked thread will no longer be visible to any other threads. // // The return value indicates whether there's data on this port. - pub fn abort_selection(&mut self) -> Result> { + pub fn abort_selection(&self) -> Result> { let state = match self.state.load(Ordering::SeqCst) { // Each of these states means that no further activity will happen // with regard to abortion selection @@ -356,16 +369,16 @@ impl Packet { // // We then need to check to see if there was an upgrade requested, // and if so, the upgraded port needs to have its selection aborted. - DISCONNECTED => { - if self.data.is_some() { + DISCONNECTED => unsafe { + if (*self.data.get()).is_some() { Ok(true) } else { - match mem::replace(&mut self.upgrade, SendUsed) { + match ptr::replace(self.upgrade.get(), SendUsed) { GoUp(port) => Err(port), _ => Ok(true), } } - } + }, // We woke ourselves up from select. ptr => unsafe { diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs index 2a9618251ff52..f9e0290416432 100644 --- a/src/libstd/sync/mpsc/shared.rs +++ b/src/libstd/sync/mpsc/shared.rs @@ -24,6 +24,8 @@ use core::cmp; use core::intrinsics::abort; use core::isize; +use cell::UnsafeCell; +use ptr; use sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering}; use sync::mpsc::blocking::{self, SignalToken}; use sync::mpsc::mpsc_queue as mpsc; @@ -44,7 +46,7 @@ const MAX_STEALS: isize = 1 << 20; pub struct Packet { queue: mpsc::Queue, cnt: AtomicIsize, // How many items are on this channel - steals: isize, // How many times has a port received without blocking? + steals: UnsafeCell, // How many times has a port received without blocking? to_wake: AtomicUsize, // SignalToken for wake up // The number of channels which are currently using this packet. @@ -72,7 +74,7 @@ impl Packet { Packet { queue: mpsc::Queue::new(), cnt: AtomicIsize::new(0), - steals: 0, + steals: UnsafeCell::new(0), to_wake: AtomicUsize::new(0), channels: AtomicUsize::new(2), port_dropped: AtomicBool::new(false), @@ -95,7 +97,7 @@ impl Packet { // threads in select(). // // This can only be called at channel-creation time - pub fn inherit_blocker(&mut self, + pub fn inherit_blocker(&self, token: Option, guard: MutexGuard<()>) { token.map(|token| { @@ -122,7 +124,7 @@ impl Packet { // To offset this bad increment, we initially set the steal count to // -1. You'll find some special code in abort_selection() as well to // ensure that this -1 steal count doesn't escape too far. - self.steals = -1; + unsafe { *self.steals.get() = -1; } }); // When the shared packet is constructed, we grabbed this lock. The @@ -133,7 +135,7 @@ impl Packet { drop(guard); } - pub fn send(&mut self, t: T) -> Result<(), T> { + pub fn send(&self, t: T) -> Result<(), T> { // See Port::drop for what's going on if self.port_dropped.load(Ordering::SeqCst) { return Err(t) } @@ -218,7 +220,7 @@ impl Packet { Ok(()) } - pub fn recv(&mut self, deadline: Option) -> Result { + pub fn recv(&self, deadline: Option) -> Result { // This code is essentially the exact same as that found in the stream // case (see stream.rs) match self.try_recv() { @@ -239,37 +241,38 @@ impl Packet { } match self.try_recv() { - data @ Ok(..) => { self.steals -= 1; data } + data @ Ok(..) => unsafe { *self.steals.get() -= 1; data }, data => data, } } // Essentially the exact same thing as the stream decrement function. // Returns true if blocking should proceed. - fn decrement(&mut self, token: SignalToken) -> StartResult { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); - let ptr = unsafe { token.cast_to_usize() }; - self.to_wake.store(ptr, Ordering::SeqCst); - - let steals = self.steals; - self.steals = 0; - - match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } - // If we factor in our steals and notice that the channel has no - // data, we successfully sleep - n => { - assert!(n >= 0); - if n - steals <= 0 { return Installed } + fn decrement(&self, token: SignalToken) -> StartResult { + unsafe { + assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + let ptr = token.cast_to_usize(); + self.to_wake.store(ptr, Ordering::SeqCst); + + let steals = ptr::replace(self.steals.get(), 0); + + match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Installed } + } } - } - self.to_wake.store(0, Ordering::SeqCst); - drop(unsafe { SignalToken::cast_from_usize(ptr) }); - Abort + self.to_wake.store(0, Ordering::SeqCst); + drop(SignalToken::cast_from_usize(ptr)); + Abort + } } - pub fn try_recv(&mut self) -> Result { + pub fn try_recv(&self) -> Result { let ret = match self.queue.pop() { mpsc::Data(t) => Some(t), mpsc::Empty => None, @@ -303,23 +306,23 @@ impl Packet { match ret { // See the discussion in the stream implementation for why we // might decrement steals. - Some(data) => { - if self.steals > MAX_STEALS { + Some(data) => unsafe { + if *self.steals.get() > MAX_STEALS { match self.cnt.swap(0, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } n => { - let m = cmp::min(n, self.steals); - self.steals -= m; + let m = cmp::min(n, *self.steals.get()); + *self.steals.get() -= m; self.bump(n - m); } } - assert!(self.steals >= 0); + assert!(*self.steals.get() >= 0); } - self.steals += 1; + *self.steals.get() += 1; Ok(data) - } + }, // See the discussion in the stream implementation for why we try // again. @@ -341,7 +344,7 @@ impl Packet { // Prepares this shared packet for a channel clone, essentially just bumping // a refcount. - pub fn clone_chan(&mut self) { + pub fn clone_chan(&self) { let old_count = self.channels.fetch_add(1, Ordering::SeqCst); // See comments on Arc::clone() on why we do this (for `mem::forget`). @@ -355,7 +358,7 @@ impl Packet { // Decrement the reference count on a channel. This is called whenever a // Chan is dropped and may end up waking up a receiver. It's the receiver's // responsibility on the other end to figure out that we've disconnected. - pub fn drop_chan(&mut self) { + pub fn drop_chan(&self) { match self.channels.fetch_sub(1, Ordering::SeqCst) { 1 => {} n if n > 1 => return, @@ -371,9 +374,9 @@ impl Packet { // See the long discussion inside of stream.rs for why the queue is drained, // and why it is done in this fashion. - pub fn drop_port(&mut self) { + pub fn drop_port(&self) { self.port_dropped.store(true, Ordering::SeqCst); - let mut steals = self.steals; + let mut steals = unsafe { *self.steals.get() }; while { let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst); cnt != DISCONNECTED && cnt != steals @@ -390,7 +393,7 @@ impl Packet { } // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> SignalToken { + fn take_to_wake(&self) -> SignalToken { let ptr = self.to_wake.load(Ordering::SeqCst); self.to_wake.store(0, Ordering::SeqCst); assert!(ptr != 0); @@ -406,13 +409,13 @@ impl Packet { // // This is different than the stream version because there's no need to peek // at the queue, we can just look at the local count. - pub fn can_recv(&mut self) -> bool { + pub fn can_recv(&self) -> bool { let cnt = self.cnt.load(Ordering::SeqCst); - cnt == DISCONNECTED || cnt - self.steals > 0 + cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0 } // increment the count on the channel (used for selection) - fn bump(&mut self, amt: isize) -> isize { + fn bump(&self, amt: isize) -> isize { match self.cnt.fetch_add(amt, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); @@ -427,7 +430,7 @@ impl Packet { // // The code here is the same as in stream.rs, except that it doesn't need to // peek at the channel to see if an upgrade is pending. - pub fn start_selection(&mut self, token: SignalToken) -> StartResult { + pub fn start_selection(&self, token: SignalToken) -> StartResult { match self.decrement(token) { Installed => Installed, Abort => { @@ -443,7 +446,7 @@ impl Packet { // // This is similar to the stream implementation (hence fewer comments), but // uses a different value for the "steals" variable. - pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { + pub fn abort_selection(&self, _was_upgrade: bool) -> bool { // Before we do anything else, we bounce on this lock. The reason for // doing this is to ensure that any upgrade-in-progress is gone and // done with. Without this bounce, we can race with inherit_blocker @@ -477,12 +480,15 @@ impl Packet { thread::yield_now(); } } - // if the number of steals is -1, it was the pre-emptive -1 steal - // count from when we inherited a blocker. This is fine because - // we're just going to overwrite it with a real value. - assert!(self.steals == 0 || self.steals == -1); - self.steals = steals; - prev >= 0 + unsafe { + // if the number of steals is -1, it was the pre-emptive -1 steal + // count from when we inherited a blocker. This is fine because + // we're just going to overwrite it with a real value. + let old = self.steals.get(); + assert!(*old == 0 || *old == -1); + *old = steals; + prev >= 0 + } } } } diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs index 61c8316467d9a..47cd8977fda23 100644 --- a/src/libstd/sync/mpsc/stream.rs +++ b/src/libstd/sync/mpsc/stream.rs @@ -22,8 +22,10 @@ pub use self::UpgradeResult::*; pub use self::SelectionResult::*; use self::Message::*; +use cell::UnsafeCell; use core::cmp; use core::isize; +use ptr; use thread; use time::Instant; @@ -42,7 +44,7 @@ pub struct Packet { queue: spsc::Queue>, // internal queue for all message cnt: AtomicIsize, // How many items are on this channel - steals: isize, // How many times has a port received without blocking? + steals: UnsafeCell, // How many times has a port received without blocking? to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up port_dropped: AtomicBool, // flag if the channel has been destroyed. @@ -79,14 +81,14 @@ impl Packet { queue: unsafe { spsc::Queue::new(128) }, cnt: AtomicIsize::new(0), - steals: 0, + steals: UnsafeCell::new(0), to_wake: AtomicUsize::new(0), port_dropped: AtomicBool::new(false), } } - pub fn send(&mut self, t: T) -> Result<(), T> { + pub fn send(&self, t: T) -> Result<(), T> { // If the other port has deterministically gone away, then definitely // must return the data back up the stack. Otherwise, the data is // considered as being sent. @@ -99,7 +101,7 @@ impl Packet { Ok(()) } - pub fn upgrade(&mut self, up: Receiver) -> UpgradeResult { + pub fn upgrade(&self, up: Receiver) -> UpgradeResult { // If the port has gone away, then there's no need to proceed any // further. if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected } @@ -107,7 +109,7 @@ impl Packet { self.do_send(GoUp(up)) } - fn do_send(&mut self, t: Message) -> UpgradeResult { + fn do_send(&self, t: Message) -> UpgradeResult { self.queue.push(t); match self.cnt.fetch_add(1, Ordering::SeqCst) { // As described in the mod's doc comment, -1 == wakeup @@ -141,7 +143,7 @@ impl Packet { } // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> SignalToken { + fn take_to_wake(&self) -> SignalToken { let ptr = self.to_wake.load(Ordering::SeqCst); self.to_wake.store(0, Ordering::SeqCst); assert!(ptr != 0); @@ -151,13 +153,12 @@ impl Packet { // Decrements the count on the channel for a sleeper, returning the sleeper // back if it shouldn't sleep. Note that this is the location where we take // steals into account. - fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> { + fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); let ptr = unsafe { token.cast_to_usize() }; self.to_wake.store(ptr, Ordering::SeqCst); - let steals = self.steals; - self.steals = 0; + let steals = unsafe { ptr::replace(self.steals.get(), 0) }; match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } @@ -173,7 +174,7 @@ impl Packet { Err(unsafe { SignalToken::cast_from_usize(ptr) }) } - pub fn recv(&mut self, deadline: Option) -> Result> { + pub fn recv(&self, deadline: Option) -> Result> { // Optimistic preflight check (scheduling is expensive). match self.try_recv() { Err(Empty) => {} @@ -199,16 +200,16 @@ impl Packet { // a steal, so offset the decrement here (we already have our // "steal" factored into the channel count above). data @ Ok(..) | - data @ Err(Upgraded(..)) => { - self.steals -= 1; + data @ Err(Upgraded(..)) => unsafe { + *self.steals.get() -= 1; data - } + }, data => data, } } - pub fn try_recv(&mut self) -> Result> { + pub fn try_recv(&self) -> Result> { match self.queue.pop() { // If we stole some data, record to that effect (this will be // factored into cnt later on). @@ -221,26 +222,26 @@ impl Packet { // a pretty slow operation, of swapping 0 into cnt, taking steals // down as much as possible (without going negative), and then // adding back in whatever we couldn't factor into steals. - Some(data) => { - if self.steals > MAX_STEALS { + Some(data) => unsafe { + if *self.steals.get() > MAX_STEALS { match self.cnt.swap(0, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } n => { - let m = cmp::min(n, self.steals); - self.steals -= m; + let m = cmp::min(n, *self.steals.get()); + *self.steals.get() -= m; self.bump(n - m); } } - assert!(self.steals >= 0); + assert!(*self.steals.get() >= 0); } - self.steals += 1; + *self.steals.get() += 1; match data { Data(t) => Ok(t), GoUp(up) => Err(Upgraded(up)), } - } + }, None => { match self.cnt.load(Ordering::SeqCst) { @@ -269,7 +270,7 @@ impl Packet { } } - pub fn drop_chan(&mut self) { + pub fn drop_chan(&self) { // Dropping a channel is pretty simple, we just flag it as disconnected // and then wakeup a blocker if there is one. match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { @@ -279,7 +280,7 @@ impl Packet { } } - pub fn drop_port(&mut self) { + pub fn drop_port(&self) { // Dropping a port seems like a fairly trivial thing. In theory all we // need to do is flag that we're disconnected and then everything else // can take over (we don't have anyone to wake up). @@ -309,7 +310,7 @@ impl Packet { // continue to fail while active senders send data while we're dropping // data, but eventually we're guaranteed to break out of this loop // (because there is a bounded number of senders). - let mut steals = self.steals; + let mut steals = unsafe { *self.steals.get() }; while { let cnt = self.cnt.compare_and_swap( steals, DISCONNECTED, Ordering::SeqCst); @@ -332,7 +333,7 @@ impl Packet { // Tests to see whether this port can receive without blocking. If Ok is // returned, then that's the answer. If Err is returned, then the returned // port needs to be queried instead (an upgrade happened) - pub fn can_recv(&mut self) -> Result> { + pub fn can_recv(&self) -> Result> { // We peek at the queue to see if there's anything on it, and we use // this return value to determine if we should pop from the queue and // upgrade this channel immediately. If it looks like we've got an @@ -351,7 +352,7 @@ impl Packet { } // increment the count on the channel (used for selection) - fn bump(&mut self, amt: isize) -> isize { + fn bump(&self, amt: isize) -> isize { match self.cnt.fetch_add(amt, Ordering::SeqCst) { DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); @@ -363,7 +364,7 @@ impl Packet { // Attempts to start selecting on this port. Like a oneshot, this can fail // immediately because of an upgrade. - pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult { + pub fn start_selection(&self, token: SignalToken) -> SelectionResult { match self.decrement(token) { Ok(()) => SelSuccess, Err(token) => { @@ -387,7 +388,7 @@ impl Packet { } // Removes a previous thread from being blocked in this port - pub fn abort_selection(&mut self, + pub fn abort_selection(&self, was_upgrade: bool) -> Result> { // If we're aborting selection after upgrading from a oneshot, then // we're guarantee that no one is waiting. The only way that we could @@ -403,7 +404,7 @@ impl Packet { // this end. This is fine because we know it's a small bounded windows // of time until the data is actually sent. if was_upgrade { - assert_eq!(self.steals, 0); + assert_eq!(unsafe { *self.steals.get() }, 0); assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); return Ok(true) } @@ -444,8 +445,10 @@ impl Packet { thread::yield_now(); } } - assert_eq!(self.steals, 0); - self.steals = steals; + unsafe { + assert_eq!(*self.steals.get(), 0); + *self.steals.get() = steals; + } // if we were previously positive, then there's surely data to // receive