Skip to content

Commit

Permalink
sync: support larger number of semaphore permits (#2607)
Browse files Browse the repository at this point in the history
  • Loading branch information
kornelski committed Jul 21, 2020
1 parent cbb4abc commit c344aac
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
25 changes: 16 additions & 9 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(crate) struct AcquireError(());
pub(crate) struct Acquire<'a> {
node: Waiter,
semaphore: &'a Semaphore,
num_permits: u16,
num_permits: u32,
queued: bool,
}

Expand Down Expand Up @@ -103,6 +103,8 @@ impl Semaphore {
const PERMIT_SHIFT: usize = 1;

/// Creates a new semaphore with the initial number of permits
///
/// Maximum number of permits on 32-bit platforms is `1<<29`.
pub(crate) fn new(permits: usize) -> Self {
assert!(
permits <= Self::MAX_PERMITS,
Expand Down Expand Up @@ -159,9 +161,14 @@ impl Semaphore {
}
}

pub(crate) fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> {
let mut curr = self.permits.load(Acquire);
pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
"a semaphore may not have more than MAX_PERMITS permits ({})",
Self::MAX_PERMITS
);
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
// Has the semaphore closed?git
if curr & Self::CLOSED > 0 {
Expand All @@ -182,7 +189,7 @@ impl Semaphore {
}
}

pub(crate) fn acquire(&self, num_permits: u16) -> Acquire<'_> {
pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
Acquire::new(self, num_permits)
}

Expand Down Expand Up @@ -247,7 +254,7 @@ impl Semaphore {
fn poll_acquire(
&self,
cx: &mut Context<'_>,
num_permits: u16,
num_permits: u32,
node: Pin<&mut Waiter>,
queued: bool,
) -> Poll<Result<(), AcquireError>> {
Expand Down Expand Up @@ -356,7 +363,7 @@ impl fmt::Debug for Semaphore {
}

impl Waiter {
fn new(num_permits: u16) -> Self {
fn new(num_permits: u32) -> Self {
Waiter {
waker: UnsafeCell::new(None),
state: AtomicUsize::new(num_permits as usize),
Expand Down Expand Up @@ -409,7 +416,7 @@ impl Future for Acquire<'_> {
}

impl<'a> Acquire<'a> {
fn new(semaphore: &'a Semaphore, num_permits: u16) -> Self {
fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
Self {
node: Waiter::new(num_permits),
semaphore,
Expand All @@ -418,14 +425,14 @@ impl<'a> Acquire<'a> {
}
}

fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u16, &mut bool) {
fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
fn is_unpin<T: Unpin>() {}
unsafe {
// Safety: all fields other than `node` are `Unpin`

is_unpin::<&Semaphore>();
is_unpin::<&mut bool>();
is_unpin::<u16>();
is_unpin::<u32>();

let this = self.get_unchecked_mut();
(
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<'a, T: ?Sized> ReleasingPermit<'a, T> {
lock: &'a RwLock<T>,
num_permits: u16,
) -> Result<ReleasingPermit<'a, T>, AcquireError> {
lock.s.acquire(num_permits).await?;
lock.s.acquire(num_permits.into()).await?;
Ok(Self { num_permits, lock })
}
}
Expand Down

0 comments on commit c344aac

Please sign in to comment.