Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port java.util.concurrent.locks types from JSR-166 #3130

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent

@SerialVersionUID(7117394618823254244L)
class BrokenBarrierException(message: String) extends Exception(message) {
def this() = this(null)
}
66 changes: 60 additions & 6 deletions javalib/src/main/scala/java/util/concurrent/CountDownLatch.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,67 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent

// No-op stub defined to allow usage of lazy vals with -Ylightweight-lazy-vals
class CountDownLatch(count: Int) {
import java.util.concurrent.locks.AbstractQueuedSynchronizer
import scala.annotation.tailrec

def await(): Unit = ()
object CountDownLatch {

def await(timeout: Long, unit: TimeUnit): Boolean = true
/** Synchronization control For CountDownLatch. Uses AQS state to represent
* count.
*/
@SerialVersionUID(4982264981922014374L)
final private class Sync(val count: Int) extends AbstractQueuedSynchronizer {
setState(count)

def countDown(): Unit = ()
private[concurrent] def getCount() = getState()

def getCount(): Long = 0L
override protected def tryAcquireShared(acquires: Int): Int = {
if (getState() == 0) 1
else -1
}

override protected def tryReleaseShared(releases: Int): Boolean = { // Decrement count; signal when transition to zero
@tailrec
def loop(): Boolean = getState() match {
case 0 => false
case state =>
val nextState = state - 1
if (compareAndSetState(state, nextState)) {
nextState == 0
} else loop()
}
loop()
}
}
}

class CountDownLatch private (sync: CountDownLatch.Sync) {
def this(count: Int) = {
this(sync = {
if (count < 0) throw new IllegalArgumentException("count < 0")
new CountDownLatch.Sync(count)
})
}

@throws[InterruptedException]
def await(): Unit = {
sync.acquireSharedInterruptibly(1)
}

@throws[InterruptedException]
def await(timeout: Long, unit: TimeUnit): Boolean =
sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))

def countDown(): Unit = {
sync.releaseShared(1)
}

def getCount(): Long = sync.getCount()

override def toString(): String =
super.toString + "[Count = " + sync.getCount() + "]"
}
155 changes: 155 additions & 0 deletions javalib/src/main/scala/java/util/concurrent/CyclicBarrier.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent

import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock

object CyclicBarrier {
private[concurrent] class Generation() {
var broken = false // initially false
}
}
class CyclicBarrier(
/* The number of parties */
val parties: Int,
/* The command to run when tripped */
val barrierCommand: Runnable
) {

def this(parties: Int) = this(parties, null)

/* Number of parties still waiting. Counts down from parties to 0 on each
* generation. It is reset to parties on each new generation or when broken.
*/
private var count: Int = parties
if (count <= 0) throw new IllegalArgumentException

/* The lock for guarding barrier entry */
final private val lock = new ReentrantLock

/* Condition to wait on until tripped */
final private val trip = lock.newCondition()

/* The current generation */
private var generation = new CyclicBarrier.Generation

/* Updates state on barrier trip and wakes up everyone. Called only while
* holding lock.
*/
private def nextGeneration(): Unit = { // signal completion of last generation
trip.signalAll()
// set up next generation
count = parties
generation = new CyclicBarrier.Generation
}

/* Sets current barrier generation as broken and wakes up everyone. Called
* only while holding lock.
*/
private def breakBarrier(): Unit = {
generation.broken = true
count = parties
trip.signalAll()
}

/* Main barrier code, covering the various policies.*/
@throws[InterruptedException]
@throws[BrokenBarrierException]
@throws[TimeoutException]
private def dowait(timed: Boolean, _nanos: Long): Int = {
var nanos = _nanos
val lock = this.lock
lock.lock()
try {
val g = generation
if (g.broken) throw new BrokenBarrierException
if (Thread.interrupted()) {
breakBarrier()
throw new InterruptedException
}
count -= 1
val index = count
if (index == 0) { // tripped
val command = barrierCommand
if (command != null)
try command.run()
catch {
case ex: Throwable =>
breakBarrier()
throw ex
}
nextGeneration()
return 0
}
// loop until tripped, broken, interrupted, or timed out

while (true) {
try
if (!timed) trip.await()
else if (nanos > 0L) nanos = trip.awaitNanos(nanos)
catch {
case ie: InterruptedException =>
if ((g eq generation) && !g.broken) {
breakBarrier()
throw ie
} else { // We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt()
}
}
if (g.broken) throw new BrokenBarrierException
if (g ne generation) return index
if (timed && nanos <= 0L) {
breakBarrier()
throw new TimeoutException
}
}
} finally lock.unlock()
-1 // unreachable
}

def getParties(): Int = parties

@throws[InterruptedException]
@throws[BrokenBarrierException]
def await(): Int =
try dowait(false, 0L)
catch {
case toe: TimeoutException => throw new Error(toe) // cannot happen
}

@throws[InterruptedException]
@throws[BrokenBarrierException]
@throws[TimeoutException]
def await(timeout: Long, unit: TimeUnit): Int =
dowait(true, unit.toNanos(timeout))

def isBroken(): Boolean = {
val lock = this.lock
lock.lock()
try generation.broken
finally lock.unlock()
}

def reset(): Unit = {
val lock = this.lock
lock.lock()
try {
breakBarrier() // break the current generation
nextGeneration() // start a new generation
} finally lock.unlock()
}

def getNumberWaiting(): Int = {
val lock = this.lock
lock.lock()
try parties - count
finally lock.unlock()
}
}
16 changes: 16 additions & 0 deletions javalib/src/main/scala/java/util/concurrent/ForkJoinPool.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent

object ForkJoinPool {

trait ManagedBlocker {
@throws[InterruptedException]
def block(): Boolean
def isReleasable(): Boolean
}
def managedBlock(blocker: ManagedBlocker): Unit = () // TODO: ForkJoinPool
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package java.util.concurrent.locks
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

import java.io.Serializable
package java.util.concurrent
package locks

abstract class AbstractOwnableSynchronizer protected () extends Serializable {
private var exclusiveOwner: Thread = _
import java.util.concurrent.atomic.AtomicReference

abstract class AbstractOwnableSynchronizer protected ()
extends java.io.Serializable {

private var exclusiveOwnerThread: Thread = _

protected final def setExclusiveOwnerThread(t: Thread): Unit =
exclusiveOwnerThread = t

protected final def setExclusiveOwnerThread(thread: Thread): Unit =
exclusiveOwner = thread
protected final def getExclusiveOwnerThread(): Thread =
exclusiveOwner
exclusiveOwnerThread
}