Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fix race condition when giving channels back from the BrokerAdapter.

  • Loading branch information...
commit 445ef3f80e75eaaf53b99b8eace623e6efc3b1ca 1 parent 1312c6c
@mariusae mariusae authored
View
3  finagle-core/project/build.properties
@@ -1,3 +0,0 @@
-#Project properties
-#Fri Jan 07 12:57:30 PST 2011
-project.name=finagle-core
View
23 finagle-core/src/main/scala/com/twitter/finagle/channel/BrokerAdapter.scala
@@ -12,15 +12,19 @@ import com.twitter.util.{Future, Promise, Return, Throw, Try}
class BrokerAdapter extends SimpleChannelUpstreamHandler {
@volatile private[this] var replyFuture: Promise[AnyRef] = null
- def writeAndRegisterReply(to: Channel, message: AnyRef, replyFuture: Promise[AnyRef]) {
- if (this.replyFuture ne null) {
- done(Throw(new TooManyConcurrentRequestsException))
+ def writeAndRegisterReply(channel: Channel, message: AnyRef,
+ incomingReplyFuture: Promise[AnyRef]) {
+ // If there is an outstanding request, something up the stack has
+ // fucked up. We currently just fail this request immediately, and
+ // let the current request complete.
+ if (replyFuture ne null) {
+ incomingReplyFuture.updateIfEmpty(Throw(new TooManyConcurrentRequestsException))
} else {
- this.replyFuture = replyFuture
- Channels.write(to, message) {
+ replyFuture = incomingReplyFuture
+ Channels.write(channel, message) {
case Error(cause) =>
// Always close on error.
- fail(to, new WriteException(cause))
+ fail(channel, new WriteException(cause))
case _ => ()
}
}
@@ -62,8 +66,13 @@ class BrokerAdapter extends SimpleChannelUpstreamHandler {
private[this] def done(answer: Try[AnyRef]) {
if (replyFuture ne null) {
- replyFuture.updateIfEmpty(answer)
+ // The order of operations here is important: the callback from
+ // the future could invoke another request immediately, and
+ // since the stack upstream knows we're done when the reply
+ // future has been satisfied, it may reuse us immediately.
+ val currentReplyFuture = replyFuture
replyFuture = null
+ currentReplyFuture.updateIfEmpty(answer)
}
}
}
View
9 finagle-core/src/main/scala/com/twitter/finagle/channel/ConnectingChannelBroker.scala
@@ -39,10 +39,13 @@ trait ConnectingChannelBroker extends Broker {
replyFuture
}
- private[this] def connectChannel(to: Channel, message: AnyRef, replyFuture: Promise[AnyRef]) {
- to.getPipeline.getLast match {
+ private[this] def connectChannel(
+ channel: Channel,
+ message: AnyRef,
+ replyFuture: Promise[AnyRef]) {
+ channel.getPipeline.getLast match {
case adapter: BrokerAdapter =>
- adapter.writeAndRegisterReply(to, message, replyFuture)
+ adapter.writeAndRegisterReply(channel, message, replyFuture)
case _ =>
replyFuture.updateIfEmpty(Throw(new InvalidPipelineException))
}
View
5 finagle-core/src/main/scala/com/twitter/finagle/channel/Exceptions.scala
@@ -11,9 +11,12 @@ class NoBrokersAvailableException extends RequestException
class ChannelException extends Exception
class ConnectionFailedException extends ChannelException
class ChannelClosedException extends ChannelException
-class WriteException(e: Throwable) extends ChannelException
class SpuriousMessageException extends ChannelException
class UnknownChannelException(e: Throwable) extends ChannelException
+class WriteException(e: Throwable) extends ChannelException {
+ override def toString = "%s: %s".format(super.toString, e.toString)
+}
+
// Subclass this for application exceptions
class ApplicationException extends Exception
View
4 finagle-ostrich/project/build.properties
@@ -1,4 +0,0 @@
-#Project properties
-#Fri Jan 07 12:51:17 PST 2011
-project.name=finagle-ostrich
-
View
4 finagle-thrift/project/build.properties
@@ -1,4 +0,0 @@
-#Project properties
-#Fri Jan 07 12:57:30 PST 2011
-project.name=finagle-thrift
-
Please sign in to comment.
Something went wrong with that request. Please try again.