Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Enhanced Lift's future support

  • Loading branch information...
commit ce124b1468ecf0aaedd3f81fe8e96759015b3970 1 parent ce4bf6f
@dpp dpp authored
View
155 core/actor/src/main/scala/net/liftweb/actor/LAFuture.scala
@@ -19,29 +19,75 @@ package actor
import common._
+/**
+ * A container that contains a calculated value
+ * or may contain one in the future
+ */
class LAFuture[T] /*extends Future[T]*/ {
- @volatile private[this] var item: T = _
- @volatile private[this] var satisfied = false
+ private var item: T = _
+ private var satisfied = false
+ private var aborted = false
+ private var toDo: List[T => Unit] = Nil
- def satisfy(value: T): Unit = synchronized {
- if (!satisfied) {
- item = value
- satisfied = true
+ /**
+ * Satify the future... perform the calculation
+ * the results in setting a value for the future
+ */
+ def satisfy(value: T): Unit = {
+ val funcs = synchronized {
+ try {
+ if (!satisfied && !aborted) {
+ item = value
+ satisfied = true
+ toDo
+ } else Nil
+ } finally {
+ notifyAll()
+ }
}
- notifyAll()
+ funcs.foreach(f => LAScheduler.execute(() => f(value)))
}
+ /**
+ * Get the future value
+ */
def get: T = synchronized {
if (satisfied) item
else {
this.wait()
if (satisfied) item
+ else if (aborted) throw new AbortedFutureException()
else get
}
}
+ /**
+ * Execute the function with the value. If the
+ * value has not been satisfied, execute the function
+ * when the value is satified
+ */
+ def foreach(f: T => Unit) {
+ val todo = synchronized {
+ if (satisfied) {
+ val v = item
+ () => f(v)
+ } else {
+ toDo ::= f
+ () => ()
+ }
+ }
+
+ todo()
+ }
+
+ /**
+ * Get the future value or if the value is not
+ * satisfied after the timeout period, return an
+ * Empty
+ */
def get(timeout: Long): Box[T] = synchronized {
if (satisfied) Full(item)
+ else if (aborted) Empty
else {
try {
wait(timeout)
@@ -52,5 +98,100 @@ class LAFuture[T] /*extends Future[T]*/ {
}
}
}
+
+ /**
+ * Has the future been satisfied
+ */
+ def isSatisfied: Boolean = synchronized {satisfied}
+
+ /**
+ * Has the future been aborted
+ */
+ def isAborted: Boolean = synchronized {aborted}
+
+ /**
+ * Abort the future. It can never be satified
+ */
+ def abort() {
+ synchronized {
+ if (!satisfied && !aborted) {
+ aborted = true
+ notifyAll()
+ }
+ }
+ }
+}
+
+/**
+ * Thrown if an LAFuture is aborted during a get
+ */
+final class AbortedFutureException() extends Exception("Aborted Future")
+
+object LAFuture {
+ /**
+ * Collect all the future values into the aggregate future
+ * The returned future will be satisfied when all the
+ * collected futures are satisfied
+ */
+ def collect[T](future: LAFuture[T]*): LAFuture[List[T]] = {
+ val sync = new Object
+ val len = future.length
+ val vals = new collection.mutable.ArrayBuffer[T](len)
+ var gotCnt = 0
+ val ret = new LAFuture[List[T]]
+
+ future.toList.zipWithIndex.foreach {
+ case (f, idx) =>
+ f.foreach {
+ v => sync.synchronized {
+ vals(idx) = v
+ gotCnt += 1
+ if (gotCnt >= len) {
+ ret.satisfy(vals.toList)
+ }
+ }
+ }
+ }
+
+ ret
+ }
+
+ /**
+ * Collect all the future values into the aggregate future
+ * The returned future will be satisfied when all the
+ * collected futures are satisfied or if any of the
+ * futures is Empty, then immediately satisfy the
+ * returned future with an Empty
+ */
+ def collectAll[T](future: LAFuture[Box[T]]*): LAFuture[Box[List[T]]] = {
+ val sync = new Object
+ val len = future.length
+ val vals = new collection.mutable.ArrayBuffer[T](len)
+ var gotCnt = 0
+ val ret = new LAFuture[Box[List[T]]]
+
+ future.toList.zipWithIndex.foreach {
+ case (f, idx) =>
+ f.foreach {
+ vb => sync.synchronized {
+ vb match {
+ case Full(v) => {
+ vals(idx) = v
+ gotCnt += 1
+ if (gotCnt >= len) {
+ ret.satisfy(Full(vals.toList))
+ }
+ }
+
+ case eb: EmptyBox => {
+ ret.satisfy(eb)
+ }
+ }
+ }
+ }
+ }
+
+ ret
+ }
}
View
10 core/actor/src/main/scala/net/liftweb/actor/LiftActor.scala
@@ -24,7 +24,7 @@ trait ILAExecute {
def shutdown(): Unit
}
-object LAScheduler {
+object LAScheduler extends Loggable {
@volatile
var onSameThread = false
@@ -48,7 +48,13 @@ object LAScheduler {
new LinkedBlockingQueue)
def execute(f: () => Unit): Unit =
- es.execute(new Runnable{def run() {f()}})
+ es.execute(new Runnable{def run() {
+ try {
+ f()
+ } catch {
+ case e: Exception => logger.error("Lift Actor Scheduler", e)
+ }
+ }})
def shutdown(): Unit = {
es.shutdown()
Please sign in to comment.
Something went wrong with that request. Please try again.