Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
compare: org=com.twitter,name=libkestrel,version=1.2.0
Checking mergeability… Don't worry, you can still create the pull request.
  • 4 commits
  • 13 files changed
  • 0 commit comments
  • 1 contributor
View
6 CHANGELOG
@@ -1,3 +1,9 @@
+1.2.0
+-----
+release: 6 july 2012
+
+- avoid creation of timer tasks when caller is willing to wait "forever"
+
1.1.0
-----
release: 29 may 2012
View
2  project/Build.scala
@@ -15,7 +15,7 @@ object Libkestrel extends Build {
).settings(
name := "libkestrel",
organization := "com.twitter",
- version := "1.1.0-SNAPSHOT",
+ version := "1.2.0",
scalaVersion := "2.9.1",
// time-based tests cannot be run in parallel
View
2  project/plugins.sbt
@@ -21,6 +21,6 @@ resolvers <<= (resolvers) { r =>
externalResolvers <<= (resolvers) map identity
-addSbtPlugin("com.twitter" % "sbt-package-dist" % "1.0.5")
+addSbtPlugin("com.twitter" % "sbt-package-dist" % "1.0.6")
addSbtPlugin("com.twitter" %% "sbt-scalatest-runner" % "1.0.2")
View
10 src/main/scala/com/twitter/libkestrel/BlockingQueue.scala
@@ -18,17 +18,22 @@ package com.twitter.libkestrel
import com.twitter.util.{Future, Time}
+sealed abstract class Deadline
+case class Before(deadline: Time) extends Deadline
+case object Forever extends Deadline
+
trait BlockingQueue[A <: AnyRef] {
def put(item: A): Boolean
def putHead(item: A)
def size: Int
def get(): Future[Option[A]]
- def get(deadline: Time): Future[Option[A]]
+ def get(deadline: Deadline): Future[Option[A]]
def poll(): Future[Option[A]]
def pollIf(predicate: A => Boolean): Future[Option[A]]
def flush()
def toDebug: String
def close()
+ def waiterCount: Int
}
trait Transaction[A <: AnyRef] {
@@ -41,9 +46,10 @@ trait TransactionalBlockingQueue[A <: AnyRef] {
def put(item: A): Boolean
def size: Int
def get(): Future[Option[Transaction[A]]]
- def get(deadline: Time): Future[Option[Transaction[A]]]
+ def get(deadline: Deadline): Future[Option[Transaction[A]]]
def poll(): Future[Option[Transaction[A]]]
def flush()
def toDebug: String
def close()
+ def waiterCount: Int
}
View
48 src/main/scala/com/twitter/libkestrel/ConcurrentBlockingQueue.scala
@@ -97,7 +97,7 @@ final class ConcurrentBlockingQueue[A <: AnyRef](
/**
* A queue of readers, some waiting with a timeout, others polling.
* `consumers` tracks the order for fairness, but `waiterSet` and `pollerSet` are
- * the definitive sets: a waiter/poller may be the queue, but not in the set, which
+ * the definitive sets: a waiter/poller may be in the queue, but not in the set, which
* just means that they had a timeout set and gave up or were rejected due to an
* empty queue.
*/
@@ -191,12 +191,35 @@ final class ConcurrentBlockingQueue[A <: AnyRef](
/**
* Get the next item from the queue, waiting forever if necessary.
*/
- def get(): Future[Option[A]] = get(None)
+ def get(): Future[Option[A]] = get(Forever)
/**
* Get the next item from the queue if it arrives before a timeout.
*/
- def get(deadline: Time): Future[Option[A]] = get(Some(deadline))
+ def get(deadline: Deadline): Future[Option[A]] = {
+ val promise = new Promise[Option[A]]
+ waiterSet.put(promise, promise)
+ val timerTask =
+ deadline match {
+ case Before(time) =>
+ val timerTask = timer.schedule(time) {
+ if (waiterSet.remove(promise) ne null) {
+ promise.setValue(None)
+ }
+ }
+ Some(timerTask)
+ case Forever => None
+ }
+ consumers.add(Waiter(promise, timerTask))
+ promise.onCancellation {
+ waiterSet.remove(promise)
+ timerTask.foreach { _.cancel() }
+ }
+ if (!queue.isEmpty || !headQueue.isEmpty) handoff()
+ promise
+ }
+
+
/**
* Get the next item from the queue if one is immediately available.
@@ -223,25 +246,6 @@ final class ConcurrentBlockingQueue[A <: AnyRef](
headQueue.clear()
}
- private def get(deadline: Option[Time]): Future[Option[A]] = {
- val promise = new Promise[Option[A]]
- waiterSet.put(promise, promise)
- val timerTask = deadline.map { d =>
- timer.schedule(d) {
- if (waiterSet.remove(promise) ne null) {
- promise.setValue(None)
- }
- }
- }
- consumers.add(Waiter(promise, timerTask))
- promise.onCancellation {
- waiterSet.remove(promise)
- timerTask.foreach { _.cancel() }
- }
- if (!queue.isEmpty || !headQueue.isEmpty) handoff()
- promise
- }
-
/**
* This is the only code path allowed to remove an item from `queue` or `consumers`.
*/
View
11 src/main/scala/com/twitter/libkestrel/JournaledBlockingQueue.scala
@@ -16,7 +16,6 @@
package com.twitter.libkestrel
-import com.twitter.conversions.time._
import com.twitter.util.{Future, Time}
import java.nio.ByteBuffer
@@ -51,14 +50,16 @@ private trait JournaledBlockingQueueMixin[A] {
def close() {
queue.close()
}
+
+ def waiterCount = reader.waiterCount
}
private[libkestrel] class JournaledBlockingQueue[A <: AnyRef](val queue: JournaledQueue, val codec: Codec[A])
extends BlockingQueue[A] with JournaledBlockingQueueMixin[A] {
- def get(): Future[Option[A]] = get(100.days.fromNow)
+ def get(): Future[Option[A]] = get(Forever)
- def get(deadline: Time): Future[Option[A]] = {
+ def get(deadline: Deadline): Future[Option[A]] = {
reader.get(Some(deadline)).map { optItem =>
optItem.map { item =>
reader.commit(item.id)
@@ -85,9 +86,9 @@ private[libkestrel] class TransactionalJournaledBlockingQueue[A <: AnyRef](
val queue: JournaledQueue, val codec: Codec[A])
extends TransactionalBlockingQueue[A] with JournaledBlockingQueueMixin[A] {
- def get(): Future[Option[Transaction[A]]] = get(100.days.fromNow)
+ def get(): Future[Option[Transaction[A]]] = get(Forever)
- def get(deadline: Time): Future[Option[Transaction[A]]] = {
+ def get(deadline: Deadline): Future[Option[Transaction[A]]] = {
reader.get(Some(deadline)).map { optItem =>
optItem.map { queueItem =>
new Transaction[A] {
View
4 src/main/scala/com/twitter/libkestrel/JournaledQueue.scala
@@ -514,7 +514,7 @@ class JournaledQueue(
* Remove and return an item from the queue, if there is one.
* If no deadline is given, an item is only returned if one is immediately available.
*/
- def get(deadline: Option[Time], peeking: Boolean = false): Future[Option[QueueItem]] = {
+ def get(deadline: Option[Deadline], peeking: Boolean = false): Future[Option[QueueItem]] = {
if (closed) return Future.value(None)
discardExpired()
val startTime = Time.now
@@ -601,7 +601,7 @@ class JournaledQueue(
/**
* Peek at the head item in the queue, if there is one.
*/
- def peek(deadline: Option[Time]): Future[Option[QueueItem]] = {
+ def peek(deadline: Option[Deadline]): Future[Option[QueueItem]] = {
get(deadline, true)
}
View
34 src/main/scala/com/twitter/libkestrel/SimpleBlockingQueue.scala
@@ -3,7 +3,6 @@ package com.twitter.libkestrel
import java.util.LinkedHashSet
import scala.collection.mutable
import scala.collection.JavaConverters._
-import com.twitter.conversions.time._
import com.twitter.util.{Duration, Future, Promise, Time, TimeoutException, Timer, TimerTask}
object SimpleBlockingQueue {
@@ -54,15 +53,15 @@ final class SimpleBlockingQueue[A <: AnyRef](
def size: Int = queue.size
- def get(): Future[Option[A]] = get(500.days.fromNow)
+ def get(): Future[Option[A]] = get(Forever)
- def get(deadline: Time): Future[Option[A]] = {
+ def get(deadline: Deadline): Future[Option[A]] = {
val promise = new Promise[Option[A]]
waitFor(promise, deadline)
promise
}
- private def waitFor(promise: Promise[Option[A]], deadline: Time) {
+ private def waitFor(promise: Promise[Option[A]], deadline: Deadline) {
val item = poll()()
item match {
case s @ Some(x) => promise.setValue(s)
@@ -105,6 +104,11 @@ final class SimpleBlockingQueue[A <: AnyRef](
queue.clear()
waiters.triggerAll()
}
+
+ /**
+ * Return the number of consumers waiting for an item.
+ */
+ def waiterCount: Int = waiters.size
}
/**
@@ -114,15 +118,19 @@ final class SimpleBlockingQueue[A <: AnyRef](
* exactly one of the functions will be called, never both.
*/
final class DeadlineWaitQueue(timer: Timer) {
- case class Waiter(var timerTask: TimerTask, awaken: () => Unit)
+ case class Waiter(var timerTask: Option[TimerTask], awaken: () => Unit)
private val queue = new LinkedHashSet[Waiter].asScala
- def add(deadline: Time, awaken: () => Unit, onTimeout: () => Unit) = {
- val waiter = Waiter(null, awaken)
- val timerTask = timer.schedule(deadline) {
- if (synchronized { queue.remove(waiter) }) onTimeout()
+ def add(deadline: Deadline, awaken: () => Unit, onTimeout: () => Unit) = {
+ val waiter = Waiter(None, awaken)
+ deadline match {
+ case Before(time) =>
+ val timerTask = timer.schedule(time) {
+ if (synchronized { queue.remove(waiter) }) onTimeout()
+ }
+ waiter.timerTask = Some(timerTask)
+ case Forever => ()
}
- waiter.timerTask = timerTask
synchronized { queue.add(waiter) }
waiter
}
@@ -134,7 +142,7 @@ final class DeadlineWaitQueue(timer: Timer) {
waiter
}
}.foreach { waiter =>
- waiter.timerTask.cancel()
+ waiter.timerTask.foreach { _.cancel() }
waiter.awaken()
}
}
@@ -145,14 +153,14 @@ final class DeadlineWaitQueue(timer: Timer) {
queue.clear()
rv
}.foreach { waiter =>
- waiter.timerTask.cancel()
+ waiter.timerTask.foreach { _.cancel() }
waiter.awaken()
}
}
def remove(waiter: Waiter) {
synchronized { queue.remove(waiter) }
- waiter.timerTask.cancel()
+ waiter.timerTask.foreach { _.cancel() }
}
def size = {
View
36 src/test/scala/com/twitter/libkestrel/ConcurrentBlockingQueueSpec.scala
@@ -149,7 +149,7 @@ class ConcurrentBlockingQueueSpec extends Spec with ResourceCheckingSuite with S
it("timeout") {
Time.withCurrentTimeFrozen { timeMutator =>
val queue = newQueue()
- val future = queue.get(10.milliseconds.fromNow)
+ val future = queue.get(Before(10.milliseconds.fromNow))
timeMutator.advance(10.milliseconds)
timer.tick()
@@ -162,8 +162,8 @@ class ConcurrentBlockingQueueSpec extends Spec with ResourceCheckingSuite with S
it("fulfill gets before they timeout") {
Time.withCurrentTimeFrozen { timeMutator =>
val queue = newQueue()
- val future1 = queue.get(10.milliseconds.fromNow)
- val future2 = queue.get(10.milliseconds.fromNow)
+ val future1 = queue.get(Before(10.milliseconds.fromNow))
+ val future2 = queue.get(Before(10.milliseconds.fromNow))
queue.put("surprise!")
timeMutator.advance(10.milliseconds)
@@ -176,8 +176,8 @@ class ConcurrentBlockingQueueSpec extends Spec with ResourceCheckingSuite with S
}
}
- describe("really long timeout is cancelled") {
- val deadline = 7.days.fromNow
+ describe("really long timeout is canceled") {
+ val deadline = Before(7.days.fromNow)
it("when an item arrives") {
val queue = newQueue()
@@ -190,7 +190,7 @@ class ConcurrentBlockingQueueSpec extends Spec with ResourceCheckingSuite with S
assert(timer.tasks.size === 0)
}
- it("when the future is cancelled") {
+ it("when the future is canceled") {
val queue = newQueue()
val future = queue.get(deadline)
assert(timer.tasks.size === 1)
@@ -198,6 +198,30 @@ class ConcurrentBlockingQueueSpec extends Spec with ResourceCheckingSuite with S
future.cancel()
timer.tick()
assert(timer.tasks.size === 0)
+ assert(queue.waiterCount === 0)
+ }
+ }
+
+ describe("infinitely long timeout is never created") {
+ val deadline = Forever
+
+ it("but allows items to be retrieved") {
+ val queue = newQueue()
+ val future = queue.get(deadline)
+ assert(timer.tasks.size === 0)
+
+ queue.put("hello!")
+ assert(future() === Some("hello!"))
+ assert(queue.waiterCount === 0)
+ }
+
+ it("but allows future to be canceled") {
+ val queue = newQueue()
+ val future = queue.get(deadline)
+ assert(timer.tasks.size === 0)
+
+ future.cancel()
+ assert(queue.waiterCount === 0)
}
}
View
6 src/test/scala/com/twitter/libkestrel/JournaledBlockingQueueSpec.scala
@@ -70,7 +70,7 @@ class JournaledBlockingQueueSpec extends Spec with ResourceCheckingSuite with Sh
assert(reader.items === 3)
assert(reader.openItems === 0)
- assert(blockingQueue.get(100.seconds.fromNow)() === Some("first"))
+ assert(blockingQueue.get(Before(100.seconds.fromNow))() === Some("first"))
assert(blockingQueue.get()() === Some("second"))
assert(reader.items === 1)
@@ -115,7 +115,7 @@ class JournaledBlockingQueueSpec extends Spec with ResourceCheckingSuite with Sh
assert(reader.items === 3)
assert(reader.openItems === 0)
- val txn1 = blockingQueue.get(100.seconds.fromNow)().get
+ val txn1 = blockingQueue.get(Before(100.seconds.fromNow))().get
val txn2 = blockingQueue.get()().get
assert(txn1.item === "first")
@@ -145,7 +145,7 @@ class JournaledBlockingQueueSpec extends Spec with ResourceCheckingSuite with Sh
assert(reader.items === 3)
assert(reader.openItems === 0)
- val txn1 = blockingQueue.get(100.seconds.fromNow)().get
+ val txn1 = blockingQueue.get(Before(100.seconds.fromNow))().get
val txn2 = blockingQueue.get()().get
assert(txn1.item === "first")
View
2  src/test/scala/com/twitter/libkestrel/JournaledQueueSpec.scala
@@ -372,7 +372,7 @@ class JournaledQueueSpec extends Spec with ResourceCheckingSuite with ShouldMatc
assert(item.get.id === 4L)
assert(reader.get(None)() === None)
- val future = reader.get(Some(1.second.fromNow))
+ val future = reader.get(Some(Before(1.second.fromNow)))
reader.unget(item.get.id)
assert(future.isDefined)
assert(future().get.id === 4L)
View
2  src/test/scala/com/twitter/libkestrel/load/FloodTest.scala
@@ -120,7 +120,7 @@ object FloodTest extends LoadTesting {
polls += 1
queue.poll()()
} else {
- queue.get(1.millisecond.fromNow)()
+ queue.get(Before(1.millisecond.fromNow))()
}
if (item.isDefined) count += 1
if (validate) {
View
2  src/test/scala/com/twitter/libkestrel/load/TimeoutTest.scala
@@ -95,7 +95,7 @@ object TimeoutTest extends LoadTesting {
override def run() {
while (readerDeadline > Time.now) {
val timeout = readTimeoutHigh + random() % (range + 1)
- val optItem = queue.get(timeout.milliseconds.fromNow)()
+ val optItem = queue.get(Before(timeout.milliseconds.fromNow))()
optItem match {
case None =>
case Some(item) => {

No commit comments for this range

Something went wrong with that request. Please try again.