Skip to content

Commit

Permalink
sync: Make Lock more similar to std::sync::Mutex (#1573)
Browse files Browse the repository at this point in the history
This renames `Lock` to `Mutex`, and brings the API more in line with `std::sync::Mutex`.

In partcular, locking now only takes `&self`, with the expectation that you place the `Mutex` in an `Arc` (or something similar) to share it between threads.

Fixes #1544.
Part of #1210.
  • Loading branch information
jonhoo committed Sep 19, 2019
1 parent d1f60ac commit e3415d8
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 195 deletions.
4 changes: 2 additions & 2 deletions tokio-sync/src/lib.rs
Expand Up @@ -29,13 +29,13 @@ macro_rules! if_fuzz {
}}
}

mod lock;
mod loom;
pub mod mpsc;
mod mutex;
pub mod oneshot;
pub mod semaphore;
mod task;
pub mod watch;

pub use lock::{Lock, LockGuard};
pub use mutex::{Mutex, MutexGuard};
pub use task::AtomicWaker;
173 changes: 0 additions & 173 deletions tokio-sync/src/lock.rs

This file was deleted.

148 changes: 148 additions & 0 deletions tokio-sync/src/mutex.rs
@@ -0,0 +1,148 @@
//! An asynchronous `Mutex`-like type.
//!
//! This module provides [`Mutex`], a type that acts similarly to an asynchronous `Mutex`, with one
//! major difference: the [`MutexGuard`] returned by `lock` is not tied to the lifetime of the
//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
//! release it at some later point in time.
//!
//! This allows you to do something along the lines of:
//!
//! ```rust,no_run
//! use tokio::sync::Mutex;
//! use std::sync::Arc;
//!
//! #[tokio::main]
//! async fn main() {
//! let data1 = Arc::new(Mutex::new(0));
//! let data2 = Arc::clone(&data1);
//!
//! tokio::spawn(async move {
//! let mut lock = data2.lock().await;
//! *lock += 1;
//! });
//!
//! let mut lock = data1.lock().await;
//! *lock += 1;
//! }
//! ```
//!
//! [`Mutex`]: struct.Mutex.html
//! [`MutexGuard`]: struct.MutexGuard.html

use crate::semaphore;

use futures_util::future::poll_fn;
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};

/// An asynchronous mutual exclusion primitive useful for protecting shared data
///
/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
/// can only be accessed through the RAII guards returned from `lock`, which
/// guarantees that the data is only ever accessed when the mutex is locked.
#[derive(Debug)]
pub struct Mutex<T> {
c: UnsafeCell<T>,
s: semaphore::Semaphore,
}

/// A handle to a held `Mutex`.
///
/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
/// internally keeps a reference-couned pointer to the original `Mutex`, so even if the lock goes
/// away, the guard remains valid.
///
/// The lock is automatically released whenever the guard is dropped, at which point `lock`
/// will succeed yet again.
#[derive(Debug)]
pub struct MutexGuard<'a, T> {
lock: &'a Mutex<T>,
permit: semaphore::Permit,
}

// As long as T: Send, it's fine to send and share Mutex<T> between threads.
// If T was not Send, sending and sharing a Mutex<T> would be bad, since you can access T through
// Mutex<T>.
unsafe impl<T> Send for Mutex<T> where T: Send {}
unsafe impl<T> Sync for Mutex<T> where T: Send {}
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Send + Sync {}

#[test]
fn bounds() {
fn check<T: Send>() {}
check::<MutexGuard<'_, u32>>();
}

impl<T> Mutex<T> {
/// Creates a new lock in an unlocked state ready for use.
pub fn new(t: T) -> Self {
Self {
c: UnsafeCell::new(t),
s: semaphore::Semaphore::new(1),
}
}

/// A future that resolves on acquiring the lock and returns the `MutexGuard`.
pub async fn lock(&self) -> MutexGuard<'_, T> {
let mut permit = semaphore::Permit::new();
poll_fn(|cx| permit.poll_acquire(cx, &self.s))
.await
.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});

MutexGuard { lock: self, permit }
}
}

impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
if self.permit.is_acquired() {
self.permit.release(&self.lock.s);
} else if ::std::thread::panicking() {
// A guard _should_ always hold its permit, but if the thread is already panicking,
// we don't want to generate a panic-while-panicing, since that's just unhelpful!
} else {
unreachable!("Permit not held when MutexGuard was dropped")
}
}
}

impl<T> From<T> for Mutex<T> {
fn from(s: T) -> Self {
Self::new(s)
}
}

impl<T> Default for Mutex<T>
where
T: Default,
{
fn default() -> Self {
Self::new(T::default())
}
}

impl<'a, T> Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
assert!(self.permit.is_acquired());
unsafe { &*self.lock.c.get() }
}
}

impl<'a, T> DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
assert!(self.permit.is_acquired());
unsafe { &mut *self.lock.c.get() }
}
}

impl<'a, T: fmt::Display> fmt::Display for MutexGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&**self, f)
}
}
19 changes: 7 additions & 12 deletions tokio-sync/tests/lock.rs → tokio-sync/tests/mutex.rs
@@ -1,12 +1,13 @@
#![warn(rust_2018_idioms)]

use tokio_sync::Lock;
use std::sync::Arc;
use tokio_sync::Mutex;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};

#[test]
fn straight_execution() {
let mut l = Lock::new(100);
let l = Mutex::new(100);

{
let mut t = spawn(l.lock());
Expand All @@ -22,21 +23,15 @@ fn straight_execution() {
}
{
let mut t = spawn(l.lock());
let mut g = assert_ready!(t.poll());
let g = assert_ready!(t.poll());
assert_eq!(&*g, &98);

// We can continue to access the guard even if the lock is dropped
drop(t);
drop(l);
*g = 97;
assert_eq!(&*g, &97);
}
}

#[test]
fn readiness() {
let mut l1 = Lock::new(100);
let mut l2 = l1.clone();
let l1 = Arc::new(Mutex::new(100));
let l2 = Arc::clone(&l1);
let mut t1 = spawn(l1.lock());
let mut t2 = spawn(l2.lock());

Expand All @@ -55,7 +50,7 @@ fn readiness() {
#[test]
#[ignore]
fn lock() {
let mut lock = Lock::new(false);
let mut lock = Mutex::new(false);
let mut lock2 = lock.clone();
std::thread::spawn(move || {
Expand Down

0 comments on commit e3415d8

Please sign in to comment.