Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix LatchedChannelSource to maintain ordering; bump minor version (in…

…terface of same changed)
  • Loading branch information...
commit ec9a9af2e059270ab03cc382e7e51ea8c3d5aeac 1 parent 673aa7b
@zuercher zuercher authored
View
2  project/Build.scala
@@ -12,7 +12,7 @@ object Naggati extends Build {
).settings(
name := "naggati",
organization := "com.twitter",
- version := "4.0.2-SNAPSHOT",
+ version := "4.1.0-SNAPSHOT",
scalaVersion := "2.9.2",
libraryDependencies ++= Seq(
View
127 src/main/scala/com/twitter/naggati/LatchedChannelSource.scala
@@ -16,132 +16,69 @@
package com.twitter.naggati
-import com.twitter.concurrent.{Broker, Offer, Serialized}
+import com.twitter.concurrent.{Broker, Offer}
import com.twitter.util.{Future, Promise}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.collection.JavaConversions._
+sealed abstract trait Notification
+case object Sent extends Notification
+case object Done extends Notification
+
/**
* LatchedChannelSource buffers all posted messages until there is at least one receiver.
* As soon as there is at least one receiver, it latches open and never buffers again.
*/
-class LatchedChannelSource[A] extends Serialized {
- @volatile private[this] var open = true
- private[this] val channelBroker = new Broker[A]
- private[this] val closeBroker = new Broker[Unit]
+class LatchedChannelSource[A] {
+ private[this] val channelBroker = new Broker[Notification]
private[this] val observersAdded = new AtomicInteger(0)
private[this] val observers =
- new JConcurrentMapWrapper(new ConcurrentHashMap[ConcreteObserver[A], ConcreteObserver[A]])
+ new JConcurrentMapWrapper(new ConcurrentHashMap[Observer[A], Observer[A]])
- protected[naggati] var buffer = new ListBuffer[A]
- @volatile protected[naggati] var ready = false
- @volatile protected[naggati] var closed = false
+ protected[naggati] val buffer = new Queue[A]
- /**
- * An object representing the lifecycle of subscribing to a LatchedChannelSource.
- * This object can be used to unsubscribe.
- */
- trait Observer[A] {
- /**
- * Indicates that the Observer is no longer interested in receiving
- * messages.
- */
- def dispose()
- }
+ private[this] val closesPromise = new Promise[Unit]
- private[this] class ConcreteObserver[A](listener: A => Future[Unit]) extends Observer[A] {
+ private[this] class Observer[A](listener: A => Future[Unit]) {
def apply(a: A) = { listener(a) }
- def dispose() {
- LatchedChannelSource.this.serialized {
- observers.remove(this)
- }
- }
}
- private[this] val _closes = new Promise[Unit]
- val closes: Future[Unit] = _closes
-
- def isOpen = open
-
- def respond(listener: A => Future[Unit]): Observer[A] = {
- val observer = new ConcreteObserver(listener)
+ def respond(listener: A => Future[Unit]) {
+ val observer = new Observer(listener)
def loop() {
- val closeOffer = closeBroker.recv
-
- val channelOffer = channelBroker.recv { a =>
- val observersCopy = new ArrayBuffer[ConcreteObserver[A]]
- observers.keys.copyToBuffer(observersCopy)
- observersCopy.foreach { _(a) }
- loop()
+ val channelOffer = channelBroker.recv {
+ case Sent =>
+ val item = synchronized { buffer.dequeue }
+ val observersCopy = new ArrayBuffer[Observer[A]]
+ observers.keys.copyToBuffer(observersCopy)
+ observersCopy.foreach { _(item) }
+ loop()
+ case Done =>
+ closesPromise.setValue(())
}
- // sequence to ensure channel gets priority over close
- val offer = channelOffer orElse {
- Offer.choose(channelOffer, closeOffer)
- }
- offer.sync()
- }
-
- serialized {
- if (open) {
- observers += observer -> observer
- if (observersAdded.incrementAndGet == 1) {
- // first observer -- deliver the buffer and handle delayed close
- deliverBuffer()
- if (!closed) loop()
- }
- }
+ channelOffer.sync()
}
- observer
- }
-
- // if the channel isn't ready yet, execute some code inside the lock.
- // return whether the channel was ready: true = channel ready; false = code was executed
- private def checkReady(ifNot: => Unit): Boolean = {
- synchronized {
- if (!ready) ifNot
- ready
+ observers += observer -> observer
+ if (observersAdded.incrementAndGet == 1) {
+ loop()
}
}
- private[this] def closeInternal() {
- serialized {
- if (open) {
- open = false
- _closes.setValue(())
- observers.clear()
- }
- }
- }
+ def closes: Future[Unit] = closesPromise
def close() {
- // don't allow a close() to take effect until after we latch.
- if (checkReady { closed = true }) {
- closeInternal()
- closeBroker.send(()).sync()
- }
+ channelBroker.send(Done).sync()
}
- def send(a: A): Seq[Future[Unit]] = {
- if (checkReady { buffer += a }) {
- Seq(channelBroker.send(a).sync())
- } else {
- Seq()
- }
- }
-
- private[this] def deliverBuffer() {
+ def send(a: A): Future[Unit] = {
synchronized {
- if (!ready) {
- buffer.foreach { item => observers.keys.foreach { _(item) } }
- buffer.clear()
- ready = true
- if (closed) closeInternal()
- }
+ buffer.enqueue(a)
}
+ channelBroker.send(Sent).sync()
}
}
View
17 src/test/scala/com/twitter/naggati/LatchedChannelSourceSpec.scala
@@ -25,10 +25,8 @@ class LatchedChannelSourceSpec extends Specification {
"buffer when there are no receivers" in {
val channel = new LatchedChannelSource[String]
channel.send("hello")
- channel.ready mustEqual false
channel.buffer.size mustEqual 1
channel.send("goodbye")
- channel.ready mustEqual false
channel.buffer.size mustEqual 2
}
@@ -36,7 +34,6 @@ class LatchedChannelSourceSpec extends Specification {
val channel = new LatchedChannelSource[String]
channel.send("hello")
channel.send("kitty")
- channel.ready mustEqual false
channel.buffer.size mustEqual 2
var received = new mutable.ListBuffer[String]
@@ -45,13 +42,11 @@ class LatchedChannelSourceSpec extends Specification {
Future.Done
}
received.toList mustEqual List("hello", "kitty")
- channel.ready mustEqual true
}
- "not buffer after the channel is latched" in {
+ "not buffer after the channel is observed" in {
val channel = new LatchedChannelSource[String]
channel.send("hello")
- channel.ready mustEqual false
channel.buffer.size mustEqual 1
var received = new mutable.ListBuffer[String]
@@ -60,20 +55,18 @@ class LatchedChannelSourceSpec extends Specification {
Future.Done
}
received.toList mustEqual List("hello")
- channel.ready mustEqual true
+ channel.buffer.size mustEqual 0
channel.send("kitty")
received.toList mustEqual List("hello", "kitty")
- channel.ready mustEqual true
+ channel.buffer.size mustEqual 0
}
- "not actually close until the channel is latched" in {
+ "not actually close until the channel is observed" in {
val channel = new LatchedChannelSource[String]
channel.send("hello")
channel.close()
- channel.ready mustEqual false
channel.buffer.size mustEqual 1
- channel.isOpen mustEqual true
var received = new mutable.ListBuffer[String]
channel.respond { s =>
@@ -81,7 +74,6 @@ class LatchedChannelSourceSpec extends Specification {
Future.Done
}
received.toList mustEqual List("hello")
- channel.isOpen mustEqual false
}
"keep items in order after latching" in {
@@ -100,7 +92,6 @@ class LatchedChannelSourceSpec extends Specification {
channel.respond { s =>
received += s
- Thread.sleep(100)
Future.Done
}
View
5 src/test/scala/com/twitter/naggati/codec/MemcacheCodecSpec.scala
@@ -101,10 +101,11 @@ class MemcacheCodecSpec extends Specification with JMocker {
codec.send(new MemcacheResponse("OK") then Codec.Stream(channel)) mustEqual List("OK\r\n")
- Future.join(channel.send(new MemcacheResponse("VALUE foo 0 5", Some(ByteBuffer.wrap("kitty".getBytes)))))()
+ channel.send(new MemcacheResponse("VALUE foo 0 5", Some(ByteBuffer.wrap("kitty".getBytes))))()
+
codec.getDownstream mustEqual List("OK\r\n", "VALUE foo 0 5\r\nkitty\r\nEND\r\n")
- Future.join(channel.send(new MemcacheResponse("END")))()
+ channel.send(new MemcacheResponse("END"))()
codec.getDownstream mustEqual List("OK\r\n", "VALUE foo 0 5\r\nkitty\r\nEND\r\n", "END\r\n")
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.