Skip to content

Commit

Permalink
Port java.util.concurrent.Semaphore from JSR-166 (#3164)
Browse files Browse the repository at this point in the history
  • Loading branch information
WojciechMazur committed Feb 15, 2023
1 parent 735e674 commit 1263a66
Show file tree
Hide file tree
Showing 2 changed files with 877 additions and 75 deletions.
193 changes: 137 additions & 56 deletions javalib/src/main/scala/java/util/concurrent/Semaphore.scala
Original file line number Diff line number Diff line change
@@ -1,85 +1,166 @@
// Ported from Scala.js commit: 9dc4d5b dated: 11 Oct 2018
/*
* Scala.js (https://www.scala-js.org/)
*
* Copyright EPFL.
*
* Licensed under Apache License 2.0
* (https://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent
import java.util.Collection
import java.util.concurrent.locks.AbstractQueuedSynchronizer
import scala.annotation.tailrec

import java.util.{Collection, Collections}
object Semaphore {

class Semaphore(private[this] var permits: Int, fairness: Boolean)
extends java.io.Serializable {
/** Synchronization implementation for semaphore. Uses AQS state to represent
* permits. Subclassed into fair and nonfair versions.
*/
@SerialVersionUID(1192457210091910933L)
abstract private[concurrent] class Sync private[concurrent] (val permits: Int)
extends AbstractQueuedSynchronizer {
setState(permits)
final private[concurrent] def getPermits(): Int = getState()
@tailrec
final private[concurrent] def nonfairTryAcquireShared(
acquires: Int
): Int = {
val available: Int = getState()
val remaining: Int = available - acquires
if (remaining < 0 || compareAndSetState(available, remaining)) remaining
else nonfairTryAcquireShared(acquires)
}

def this(permits: Int) = this(permits, false)
@tailrec
override final protected def tryReleaseShared(releases: Int): Boolean = {
val current: Int = getState()
val next: Int = current + releases
if (next < current) { // overflow
throw new Error("Maximum permit count exceeded")
}
if (compareAndSetState(current, next)) true
else tryReleaseShared(releases)
}

// These methods can’t be implemented because they block
// def acquire(): Unit
// def acquire(permits: Int): Unit
// def acquireUninterruptibly(): Unit
// def acquireUninterruptibly(permits: Int): Unit
// def tryAcquire(permits: Int, timeout: Long, unit: TimeUnit): Boolean
// def tryAcquire(timeout: Long, unit: TimeUnit): Boolean
@tailrec
final private[concurrent] def reducePermits(reductions: Int): Unit = {
val current: Int = getState()
val next: Int = current - reductions
if (next > current) { // underflow
throw new Error("Permit count underflow")
}
if (!compareAndSetState(current, next))
reducePermits(reductions)
}

def availablePermits(): Int = permits
@tailrec
final private[concurrent] def drainPermits(): Int = {
val current: Int = getState()
if (current == 0 || compareAndSetState(current, 0)) current
else drainPermits()
}
}

def drainPermits(): Int = {
val old = permits
permits = 0
old
/** NonFair version
*/
@SerialVersionUID(-2694183684443567898L)
final private[concurrent] class NonfairSync private[concurrent] (
override val permits: Int
) extends Semaphore.Sync(permits) {
override protected def tryAcquireShared(acquires: Int): Int =
nonfairTryAcquireShared(acquires)
}

/* One would expect that the accessor methods delegate to `getQueuedThreads`,
* but that is not the JDK behavior. In the absence of a specification, we
* replicate the JDK behavior. Notably, because the documentation of
* `getQueuedThreads` mentions that it is intended for extensive monitoring,
* not overriding. The fact that the method is not final is hence likely an
* oversight.
/** Fair version
*/
@SerialVersionUID(2014338818796000944L)
final private[concurrent] class FairSync private[concurrent] (
override val permits: Int
) extends Semaphore.Sync(permits) {
override protected def tryAcquireShared(acquires: Int): Int = {
if (hasQueuedPredecessors()) -1
else {
val available: Int = getState()
val remaining: Int = available - acquires
if (remaining < 0 || compareAndSetState(available, remaining))
remaining
else tryAcquireShared(acquires)
}
}
}
}

@SerialVersionUID(-3222578661600680210L)
class Semaphore private (sync: Semaphore.Sync) extends Serializable {

protected def getQueuedThreads(): Collection[Thread] = Collections.emptySet()
def this(permits: Int) = {
this(sync = new Semaphore.NonfairSync(permits))
}

def this(permits: Int, fair: Boolean) = {
this(
sync =
if (fair) new Semaphore.FairSync(permits)
else new Semaphore.NonfairSync(permits)
)
}

final def getQueueLength(): Int = 0
@throws[InterruptedException]
def acquire(): Unit = sync.acquireSharedInterruptibly(1)

final def hasQueuedThreads(): Boolean = false
def acquireUninterruptibly(): Unit = sync.acquireShared(1)

def isFair(): Boolean = fairness
def tryAcquire(): Boolean = sync.nonfairTryAcquireShared(1) >= 0

protected def reducePermits(reduction: Int): Unit = {
requireNonNegative(reduction)
permits -= reduction
@throws[InterruptedException]
def tryAcquire(timeout: Long, unit: TimeUnit): Boolean =
sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))

def release(): Unit = sync.releaseShared(1)

@throws[InterruptedException]
def acquire(permits: Int): Unit = {
if (permits < 0) throw new IllegalArgumentException
sync.acquireSharedInterruptibly(permits)
}

def acquireUninterruptibly(permits: Int): Unit = {
if (permits < 0) throw new IllegalArgumentException
sync.acquireShared(permits)
}

def release(): Unit = release(1)
def tryAcquire(permits: Int): Boolean = {
if (permits < 0) { throw new IllegalArgumentException }
sync.nonfairTryAcquireShared(permits) >= 0
}

@throws[InterruptedException]
def tryAcquire(permits: Int, timeout: Long, unit: TimeUnit): Boolean = {
if (permits < 0) throw new IllegalArgumentException
sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout))
}

def release(permits: Int): Unit = {
requireNonNegative(permits)
this.permits += permits
if (permits < 0) throw new IllegalArgumentException
sync.releaseShared(permits)
}

override def toString: String =
s"${super.toString}[Permits = ${permits}]"
def availablePermits(): Int = sync.getPermits()

def tryAcquire(): Boolean = tryAcquire(1)
def drainPermits(): Int = sync.drainPermits()

def tryAcquire(permits: Int): Boolean = {
requireNonNegative(permits)
if (this.permits >= permits) {
this.permits -= permits
true
} else {
false
}
protected def reducePermits(reduction: Int): Unit = {
if (reduction < 0) { throw new IllegalArgumentException }
sync.reducePermits(reduction)
}

@inline private def requireNonNegative(n: Int): Unit = {
if (n < 0)
throw new IllegalArgumentException
def isFair(): Boolean = sync.isInstanceOf[Semaphore.FairSync]

final def hasQueuedThreads(): Boolean = sync.hasQueuedThreads()

final def getQueueLength(): Int = sync.getQueueLength()

protected def getQueuedThreads(): Collection[Thread] = sync.getQueuedThreads()

override def toString(): String = {
super.toString() + "[Permits = " + sync.getPermits() + "]"
}
}

0 comments on commit 1263a66

Please sign in to comment.