Browse files

looks like finish() could be called twice, messing up connection counts

  • Loading branch information...
1 parent eedb1b7 commit c554d1027bb32bc83c343b7d9961e3bf1ba59a7e Robey Pointer committed Sep 23, 2011
Showing with 6 additions and 2 deletions.
  1. +6 −2 src/main/scala/net/lag/kestrel/NettyHandler.scala
8 src/main/scala/net/lag/kestrel/NettyHandler.scala
@@ -19,6 +19,8 @@ package net.lag.kestrel
+import java.nio.channels.ClosedChannelException
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import com.twitter.conversions.time._
import com.twitter.logging.Logger
@@ -42,6 +44,7 @@ extends KestrelHandler(queueCollection, maxOpenTransactions) with ChannelUpstrea
private var remoteAddress: InetSocketAddress = null
var channel: Channel = null
+ val finished = new AtomicBoolean(false)
protected def clientDescription: String = {
"%s:%d".format(remoteAddress.getHostName, remoteAddress.getPort)
@@ -55,7 +58,8 @@ extends KestrelHandler(queueCollection, maxOpenTransactions) with ChannelUpstrea
e.getCause() match {
case _: ProtocolError =>
- case e: java.nio.channels.ClosedChannelException =>
+ case e: ClosedChannelException =>
+ if (finished.getAndSet(true) == false) finish()
jeffstyr added a line comment Sep 24, 2011

Heh actually the reason it still works is that the ClosedChannelException case now has finish() twice (once conditionalized and once not, just a typo I think). If you get rid of the unconditional one then the stuck transactions come back.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
case e: IOException =>
log.debug("I/O Exception on session %d: %s", sessionId, e.toString)
@@ -66,7 +70,7 @@ extends KestrelHandler(queueCollection, maxOpenTransactions) with ChannelUpstrea
case s: ChannelStateEvent =>
if ((s.getState() == ChannelState.CONNECTED) && (s.getValue() eq null)) {
- finish()
+ if (finished.getAndSet(true) == false) finish()
} else if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {
channel = s.getChannel()
remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]

3 comments on commit c554d10


Hmm I think maybe this check needs to be inside of finish() instead (around the count bookkeeping), because a Future can get fulfilled after the ChannelStateEvent has been processed, causing the ClosedChannelException, so I think abortAnyTransaction() does potentially need to get called twice. (That said, with the change in the revision I can't seem to get any stuck transactions, but since it's a race it's hard to be sure.)


yeah, this was stupid. i realized later in the day that this fix is totally broken, and that probably the whole point of your original patch was that finish may need to be called a second time when a dead session has queue items thrown at it. ... i just didn't get to sit at my desk for the rest of the day. :)

Please sign in to comment.