Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Fix LatchedChannelSource to maintain ordering; bump minor version (in…
Browse files Browse the repository at this point in the history
…terface of same changed)
  • Loading branch information
Stephan Zuercher committed Sep 27, 2012
1 parent 673aa7b commit ec9a9af
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 111 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Expand Up @@ -12,7 +12,7 @@ object Naggati extends Build {
).settings(
name := "naggati",
organization := "com.twitter",
version := "4.0.2-SNAPSHOT",
version := "4.1.0-SNAPSHOT",
scalaVersion := "2.9.2",

libraryDependencies ++= Seq(
Expand Down
127 changes: 32 additions & 95 deletions src/main/scala/com/twitter/naggati/LatchedChannelSource.scala
Expand Up @@ -16,132 +16,69 @@

package com.twitter.naggati

import com.twitter.concurrent.{Broker, Offer, Serialized}
import com.twitter.concurrent.{Broker, Offer}
import com.twitter.util.{Future, Promise}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.collection.JavaConversions._

sealed abstract trait Notification
case object Sent extends Notification
case object Done extends Notification

/**
* LatchedChannelSource buffers all posted messages until there is at least one receiver.
* As soon as there is at least one receiver, it latches open and never buffers again.
*/
class LatchedChannelSource[A] extends Serialized {
@volatile private[this] var open = true
private[this] val channelBroker = new Broker[A]
private[this] val closeBroker = new Broker[Unit]
class LatchedChannelSource[A] {
private[this] val channelBroker = new Broker[Notification]
private[this] val observersAdded = new AtomicInteger(0)

private[this] val observers =
new JConcurrentMapWrapper(new ConcurrentHashMap[ConcreteObserver[A], ConcreteObserver[A]])
new JConcurrentMapWrapper(new ConcurrentHashMap[Observer[A], Observer[A]])

protected[naggati] var buffer = new ListBuffer[A]
@volatile protected[naggati] var ready = false
@volatile protected[naggati] var closed = false
protected[naggati] val buffer = new Queue[A]

/**
* An object representing the lifecycle of subscribing to a LatchedChannelSource.
* This object can be used to unsubscribe.
*/
trait Observer[A] {
/**
* Indicates that the Observer is no longer interested in receiving
* messages.
*/
def dispose()
}
private[this] val closesPromise = new Promise[Unit]

private[this] class ConcreteObserver[A](listener: A => Future[Unit]) extends Observer[A] {
private[this] class Observer[A](listener: A => Future[Unit]) {
def apply(a: A) = { listener(a) }
def dispose() {
LatchedChannelSource.this.serialized {
observers.remove(this)
}
}
}

private[this] val _closes = new Promise[Unit]
val closes: Future[Unit] = _closes

def isOpen = open

def respond(listener: A => Future[Unit]): Observer[A] = {
val observer = new ConcreteObserver(listener)
def respond(listener: A => Future[Unit]) {
val observer = new Observer(listener)
def loop() {
val closeOffer = closeBroker.recv

val channelOffer = channelBroker.recv { a =>
val observersCopy = new ArrayBuffer[ConcreteObserver[A]]
observers.keys.copyToBuffer(observersCopy)
observersCopy.foreach { _(a) }
loop()
val channelOffer = channelBroker.recv {
case Sent =>
val item = synchronized { buffer.dequeue }
val observersCopy = new ArrayBuffer[Observer[A]]
observers.keys.copyToBuffer(observersCopy)
observersCopy.foreach { _(item) }
loop()
case Done =>
closesPromise.setValue(())
}

// sequence to ensure channel gets priority over close
val offer = channelOffer orElse {
Offer.choose(channelOffer, closeOffer)
}
offer.sync()
}

serialized {
if (open) {
observers += observer -> observer
if (observersAdded.incrementAndGet == 1) {
// first observer -- deliver the buffer and handle delayed close
deliverBuffer()
if (!closed) loop()
}
}
channelOffer.sync()
}

observer
}

// if the channel isn't ready yet, execute some code inside the lock.
// return whether the channel was ready: true = channel ready; false = code was executed
private def checkReady(ifNot: => Unit): Boolean = {
synchronized {
if (!ready) ifNot
ready
observers += observer -> observer
if (observersAdded.incrementAndGet == 1) {
loop()
}
}

private[this] def closeInternal() {
serialized {
if (open) {
open = false
_closes.setValue(())
observers.clear()
}
}
}
def closes: Future[Unit] = closesPromise

def close() {
// don't allow a close() to take effect until after we latch.
if (checkReady { closed = true }) {
closeInternal()
closeBroker.send(()).sync()
}
channelBroker.send(Done).sync()
}

def send(a: A): Seq[Future[Unit]] = {
if (checkReady { buffer += a }) {
Seq(channelBroker.send(a).sync())
} else {
Seq()
}
}

private[this] def deliverBuffer() {
def send(a: A): Future[Unit] = {
synchronized {
if (!ready) {
buffer.foreach { item => observers.keys.foreach { _(item) } }
buffer.clear()
ready = true
if (closed) closeInternal()
}
buffer.enqueue(a)
}
channelBroker.send(Sent).sync()
}
}
17 changes: 4 additions & 13 deletions src/test/scala/com/twitter/naggati/LatchedChannelSourceSpec.scala
Expand Up @@ -25,18 +25,15 @@ class LatchedChannelSourceSpec extends Specification {
"buffer when there are no receivers" in {
val channel = new LatchedChannelSource[String]
channel.send("hello")
channel.ready mustEqual false
channel.buffer.size mustEqual 1
channel.send("goodbye")
channel.ready mustEqual false
channel.buffer.size mustEqual 2
}

"send pent-up messages when a receiver is added" in {
val channel = new LatchedChannelSource[String]
channel.send("hello")
channel.send("kitty")
channel.ready mustEqual false
channel.buffer.size mustEqual 2

var received = new mutable.ListBuffer[String]
Expand All @@ -45,13 +42,11 @@ class LatchedChannelSourceSpec extends Specification {
Future.Done
}
received.toList mustEqual List("hello", "kitty")
channel.ready mustEqual true
}

"not buffer after the channel is latched" in {
"not buffer after the channel is observed" in {
val channel = new LatchedChannelSource[String]
channel.send("hello")
channel.ready mustEqual false
channel.buffer.size mustEqual 1

var received = new mutable.ListBuffer[String]
Expand All @@ -60,28 +55,25 @@ class LatchedChannelSourceSpec extends Specification {
Future.Done
}
received.toList mustEqual List("hello")
channel.ready mustEqual true
channel.buffer.size mustEqual 0

channel.send("kitty")
received.toList mustEqual List("hello", "kitty")
channel.ready mustEqual true
channel.buffer.size mustEqual 0
}

"not actually close until the channel is latched" in {
"not actually close until the channel is observed" in {
val channel = new LatchedChannelSource[String]
channel.send("hello")
channel.close()
channel.ready mustEqual false
channel.buffer.size mustEqual 1
channel.isOpen mustEqual true

var received = new mutable.ListBuffer[String]
channel.respond { s =>
received += s
Future.Done
}
received.toList mustEqual List("hello")
channel.isOpen mustEqual false
}

"keep items in order after latching" in {
Expand All @@ -100,7 +92,6 @@ class LatchedChannelSourceSpec extends Specification {

channel.respond { s =>
received += s
Thread.sleep(100)
Future.Done
}

Expand Down
Expand Up @@ -101,10 +101,11 @@ class MemcacheCodecSpec extends Specification with JMocker {

codec.send(new MemcacheResponse("OK") then Codec.Stream(channel)) mustEqual List("OK\r\n")

Future.join(channel.send(new MemcacheResponse("VALUE foo 0 5", Some(ByteBuffer.wrap("kitty".getBytes)))))()
channel.send(new MemcacheResponse("VALUE foo 0 5", Some(ByteBuffer.wrap("kitty".getBytes))))()

codec.getDownstream mustEqual List("OK\r\n", "VALUE foo 0 5\r\nkitty\r\nEND\r\n")

Future.join(channel.send(new MemcacheResponse("END")))()
channel.send(new MemcacheResponse("END"))()
codec.getDownstream mustEqual List("OK\r\n", "VALUE foo 0 5\r\nkitty\r\nEND\r\n", "END\r\n")
}
}
Expand Down

0 comments on commit ec9a9af

Please sign in to comment.