Permalink
Browse files

Rename Suspendable* to NonBlocking*

  • Loading branch information...
1 parent a7a7ffa commit 55ef0fee8dd89d82875858748f6b77f09ca23c0e @chirino chirino committed Apr 11, 2012
@@ -1,6 +1,6 @@
package org.koolapp.stream
-import org.koolapp.stream.support.SuspendableCursorAdapter
+import org.koolapp.stream.support.NonBlockingCursorAdapter
import java.io.Closeable
/**
@@ -12,12 +12,12 @@ public trait Cursor: Closeable {
}
/**
- * Converts this [[Cursor]] to a [[SuspendableCursor]] if it not already
+ * Converts this [[Cursor]] to a [[NonBlockingCursorCursor]] if it not already
*/
-inline fun Cursor.toSuspendableCursor(): SuspendableCursor {
- return if (this is SuspendableCursor) {
+inline fun Cursor.toNonBlockingCursorCursor(): NonBlockingCursor {
+ return if (this is NonBlockingCursor) {
this
} else {
- SuspendableCursorAdapter(this)
+ NonBlockingCursorAdapter(this)
}
}
@@ -0,0 +1,14 @@
+package org.koolapp.stream
+
+/**
+ * A [[Cursor]] which can woken up by a [[NonBlockingHandler]]
+ * to resume event delivery.
+ */
+public trait NonBlockingCursor : Cursor {
+
+ /**
+ * Wakes up the cursor so that it continues to raise events. A cursor usually
+ * goes into a sleep state if the NonBlockingHandler cannot accept another event.
+ */
+ public fun wakeup(): Unit
+}
@@ -7,22 +7,24 @@ package org.koolapp.stream
* A [[Stream]] will invoke only one of these methods at a time from a single thread, so the handler does
* not have to worry about concurrent access.
*
- * The sequence of events is Open, Next*, (Complete|Error) so that there will always be an Open first
- * then zero to many Next events and finally either Complete or Error
+ * The sequence of events is onOpen, offerNext*, (onComplete|onError) so that there will always be an onOpen first
+ * then zero to many offerNext events and finally either onComplete or onError.
*/
-public abstract class SuspendableHandler<in T> {
+public abstract class NonBlockingHandler<in T> {
/**
- * Receives the [[Cursor]] when the stream is opened in case
- * the handler wishes to close the cursor itself
+ * Receives the [[NonBlockingCursorCursor]] when the stream is opened in case
+ * the handler wishes to close the cursor or if it needs to wake up the cursor
+ * at a later time.
*/
- public abstract fun onOpen(cursor: SuspendableCursor): Unit
+ public abstract fun onOpen(cursor: NonBlockingCursor): Unit
/**
* Receives the next value of a stream and attempts to process it, returning *true* if its processed
* or *false* if it cannot be processed right now.
*
- * If this method returns false then the stream should suspend itself
+ * If this method returns false then the cursor MAY not deliver the handler any
+ * more events until you wake it up by calling the [[NonBlockingCursorCursor]]'s `wakeup()` method.
*/
public abstract fun offerNext(next: T): Boolean
@@ -31,21 +31,21 @@ public abstract class Stream<out T> {
* returning *false* if the next event cannot be processed yet to allow flow control to
* kick in
*/
- public open fun openSuspendable(nextBlock: (T) -> Boolean): Cursor {
- return open(FunctionSuspendableHandler(nextBlock))
+ public open fun openNonBlockingCursor(nextBlock: (T) -> Boolean): Cursor {
+ return open(FunctionNonBlockingHandler(nextBlock))
}
/**
- * Opens the stream of events using a [[SuspendableHandler]] so that flow control
+ * Opens the stream of events using a [[NonBlockingHandler]] so that flow control
* can be used to suspend the stream if the handler cannot consume an offered next event.
*
* [[Stream]] implementation classes which can implement flow control should override this
- * function to provide support for the [[SuspendableCursor]]
+ * function to provide support for the [[NonBlockingCursorCursor]]
*/
- public open fun open(suspendableHandler: SuspendableHandler<T>): SuspendableCursor {
- val handler = SuspendableHandlerAdapter(suspendableHandler)
+ public open fun open(suspendableHandler: NonBlockingHandler<T>): NonBlockingCursor {
+ val handler = NonBlockingHandlerAdapter(suspendableHandler)
val cursor = open(handler)
- return cursor.toSuspendableCursor()
+ return cursor.toNonBlockingCursorCursor()
}
/**
@@ -1,25 +0,0 @@
-package org.koolapp.stream
-
-/**
- * A [[Cursor]] which can be suspended and resumed if a [[SuspendableHandler]]
- * needs some flow control.
- */
-public trait SuspendableCursor : Cursor {
-
-// /**
-// * Returns true if this cursor is currently suspended
-// * due to flow control
-// */
-// public fun isSuspended(): Boolean
-//
-// /**
-// * Suspends this cursor which should prevent it raising more events until
-// * the [[resume()]] function is called
-// */
-// public fun suspend(): Unit
-
- /**
- * Resumes this cursor to continue raising events again after its been suspended
- */
- public fun resume(): Unit
-}
@@ -7,16 +7,16 @@ import org.koolapp.stream.*
import java.io.Closeable
/**
- * Adapts a [[SuspendableHandler]] to work with a regular [[Stream]] which works with a [[Handler]]
+ * Adapts a [[NonBlockingHandler]] to work with a regular [[Stream]] which works with a [[Handler]]
* by buffering up any events which could not be offered so they can be retried before the next event is
* offered
*/
-open class SuspendableHandlerAdapter<T>(val delegate: SuspendableHandler<T>): Handler<T>() {
+open class NonBlockingHandlerAdapter<T>(val delegate: NonBlockingHandler<T>): Handler<T>() {
val buffer: Queue<T> = ArrayDeque<T>()
- var suspendableCursor: SuspendableCursor? = null
+ var suspendableCursor: NonBlockingCursor? = null
public override fun onOpen(cursor: Cursor) {
- val newCursor = cursor.toSuspendableCursor()
+ val newCursor = cursor.toNonBlockingCursorCursor()
suspendableCursor = newCursor
delegate.onOpen(newCursor)
}
@@ -55,21 +55,11 @@ open class SuspendableHandlerAdapter<T>(val delegate: SuspendableHandler<T>): Ha
}
}
-class SuspendableCursorAdapter(val delegate: Cursor): AbstractCursor(), SuspendableCursor {
-// val suspended = AtomicBoolean(false)
+class NonBlockingCursorAdapter(val delegate: Cursor): AbstractCursor(), NonBlockingCursor {
-// public override fun isSuspended(): Boolean {
-// return suspended.get()
-// }
-
- public override fun resume() {
-// suspended.set(false)
+ public override fun wakeup() {
}
-// public override fun suspend() {
-// suspended.set(true)
-// }
-
protected override fun doClose() {
delegate.close()
}
@@ -78,7 +68,7 @@ class SuspendableCursorAdapter(val delegate: Cursor): AbstractCursor(), Suspenda
/**
* Useful base class for [[Handler]] to avoid having to implement [[onComplete()]] or [[onError()]]
*/
-abstract class AbstractSuspendableHandler<T> : SuspendableHandler<T>(), Closeable {
+abstract class AbstractNonBlockingHandler<T> : NonBlockingHandler<T>(), Closeable {
private val closedFlag = AtomicBoolean(false)
public override fun close() {
@@ -94,9 +84,9 @@ abstract class AbstractSuspendableHandler<T> : SuspendableHandler<T>(), Closeabl
public fun isClosed(): Boolean = closedFlag.get()
- var cursor: SuspendableCursor? = null
+ var cursor: NonBlockingCursor? = null
- public override fun onOpen(cursor: SuspendableCursor) {
+ public override fun onOpen(cursor: NonBlockingCursor) {
$cursor = cursor
}
@@ -116,9 +106,9 @@ abstract class AbstractSuspendableHandler<T> : SuspendableHandler<T>(), Closeabl
}
/**
- * Allows a function to be converted into an [[SuspendableHandler]] so we can use a simple function to consume events
+ * Allows a function to be converted into an [[NonBlockingHandler]] so we can use a simple function to consume events
*/
-class FunctionSuspendableHandler<T>(val fn: (T) -> Boolean) : AbstractSuspendableHandler<T>() {
+class FunctionNonBlockingHandler<T>(val fn: (T) -> Boolean) : AbstractNonBlockingHandler<T>() {
public override fun offerNext(next: T): Boolean {
return (fn)(next)
}
Oops, something went wrong.

0 comments on commit 55ef0fe

Please sign in to comment.