Skip to content

Commit

Permalink
fix null pointer exception in the retrying broker when the passed-in
Browse files Browse the repository at this point in the history
channel is null (eg. when we use services).

also test this in the service specs, additionally verifying that we
DON'T retry on close-after-write.  add a big fat TODO with a whiny
note about vague semantics here.
  • Loading branch information
mariusae committed Dec 16, 2010
1 parent 40e4528 commit 9fa4fd7
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/main/scala/com/twitter/finagle/channel/RetryingBroker.scala
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit

import org.jboss.netty.channel.{
Channels, Channel, DownstreamMessageEvent,
MessageEvent, ChannelFuture}
MessageEvent, ChannelFuture, DefaultChannelFuture}
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}

import com.twitter.finagle.util.{Cancelled, Error, Ok}
Expand All @@ -31,9 +31,16 @@ trait RetryingBrokerBase extends WrappingBroker {
override def getChannel = channel
}

// TODO: should we treat the case where a server closes the
// connection immediately as retriable? it's not clear what's going
// on from a protocol point of view. currently "retriable" events
// are ones in which the write fails. does this cover all applicable
// cases where there is a lingering closed server socket?

override def dispatch(e: MessageEvent): ReplyFuture = {
val incomingFuture = e.getFuture
val interceptErrors = Channels.future(e.getChannel)

interceptErrors {
case Ok(channel) =>
incomingFuture.setSuccess()
Expand Down Expand Up @@ -62,10 +69,13 @@ class RetryingBroker(val underlying: Broker, tries: Int) extends RetryingBrokerB
@volatile var triesLeft = tries
def retryFuture(channel: Channel) = {
triesLeft -= 1
val future = new DefaultChannelFuture(channel, false)
if (triesLeft > 0)
Channels.succeededFuture(channel)
future.setSuccess()
else
Channels.failedFuture(channel, new RetryFailureException)
future.setFailure(new RetryFailureException)

future
}
}

Expand Down
80 changes: 80 additions & 0 deletions src/test/scala/com/twitter/finagle/service/Client.scala
@@ -0,0 +1,80 @@
package com.twitter.finagle.service

import scala.collection.JavaConversions._

import org.specs.Specification

import org.jboss.netty.channel.local._
import org.jboss.netty.channel._
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.handler.codec.http._

import com.twitter.util.TimeConversions._
import com.twitter.util.Throw
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder, Http}
import com.twitter.finagle.channel.ChannelClosedException

object ClientSpec extends Specification {
def withServer(handler: ChannelHandler)(spec: ClientBuilder => Unit) {
val cf = new DefaultLocalServerChannelFactory()

val bs = new ServerBootstrap(cf)
bs.setPipelineFactory(new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("theHandler", handler)
pipeline
}
})

val serverAddress = new LocalAddress("server")
val serverChannel = bs.bind(serverAddress)

val builder =
ClientBuilder()
.channelFactory(new DefaultLocalClientChannelFactory)
.hosts(Seq(serverAddress))
.codec(Http)

try {
spec(builder)
} finally {
serverChannel.close().awaitUninterruptibly()
}
}

"client service" should {
var counter = 0
val closingHandler = new SimpleChannelUpstreamHandler {
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
counter += 1
Channels.close(ctx.getChannel)
}
}

"report a closed connection when the server doesn't reply" in {
withServer(closingHandler) { clientBuilder =>
val client = clientBuilder.buildService[HttpRequest, HttpResponse]
val future = client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
val resolved = future within(1.second)
resolved.isThrow must beTrue
val Throw(cause) = resolved
cause must haveClass[ChannelClosedException]
}
}

"report a closed connection when the server doesn't reply, without retrying" in {
withServer(closingHandler) { clientBuilder =>
val client = clientBuilder
.retries(10)
.buildService[HttpRequest, HttpResponse]
val future = client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
val resolved = future within(1.second)
resolved.isThrow must beTrue
val Throw(cause) = resolved
cause must haveClass[ChannelClosedException]
counter must be_==(1)
}
}
}
}

0 comments on commit 9fa4fd7

Please sign in to comment.