Skip to content

Commit

Permalink
finagle-kestrel: Do not empty cluster when it becomes Pending
Browse files Browse the repository at this point in the history
Problem
--

Some service discovery implementations (most notably Zk2Resolver) use the Pending state to signal
that the underlying mechanism is currently unavailable (e.g. ZooKeeper is not reachable). Kestrel
MultiReader interprets it as an empty set of Addresses, causing the cluster to empty when service
discovery is not reachable.

Solution
--

Do not empty the server set when Addr goes to Pending.

RB_ID=832923
  • Loading branch information
atollena authored and jenkins committed May 23, 2016
1 parent 9089647 commit a4eddb5
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 9 deletions.
1 change: 1 addition & 0 deletions finagle-kestrel/src/main/scala/BUILD
Expand Up @@ -14,5 +14,6 @@ scala_library(name='scala',
'finagle/finagle-kestrel/src/main/thrift:thrift-scala',
'util/util-core',
],
fatal_warnings=True,
sources=rglobs('*.scala'),
)
@@ -1,7 +1,5 @@
package com.twitter.finagle.kestrel

import scala.collection.JavaConversions._

import com.twitter.concurrent.{Offer, Broker}
import com.twitter.conversions.time._
import com.twitter.finagle.kestrel.protocol._
Expand All @@ -12,6 +10,8 @@ import com.twitter.finagle.kestrel.net.lag.kestrel.thriftscala.Item
import com.twitter.finagle.kestrel.net.lag.kestrel.thriftscala.Kestrel.FinagledClient
import com.twitter.io.Buf
import com.twitter.util.{Command=>_, _}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

/**
* Indicates that a [[com.twitter.finagle.kestrel.ReadHandle]] has been closed.
Expand Down Expand Up @@ -159,7 +159,7 @@ object ReadHandle {
* A java-friendly interface to {{merged}}
*/
def merged(handles: _root_.java.util.Iterator[ReadHandle]): ReadHandle =
merged(handles.toSeq)
merged(handles.asScala.toSeq)
}

object Client {
Expand Down Expand Up @@ -333,7 +333,7 @@ abstract protected[kestrel] class CommandExecutorFactory[U]
* @tparam ItemId the type used by {{CommandExecutor}} to identify returned
* items
*/
abstract protected[kestrel] class ClientBase[CommandExecutor <: Closable, Reply, ItemId](
abstract protected[kestrel] class ClientBase[CommandExecutor <: Closable, Reply: ClassTag, ItemId](
underlying: CommandExecutorFactory[CommandExecutor])
extends Client
{
Expand Down
Expand Up @@ -475,8 +475,6 @@ abstract class MultiReaderBuilder[Req, Rep, Builder] private[kestrel](

private[this] val logger = DefaultLogger

private[this] val ReturnEmptySet = Return(Set.empty[ReadHandle])

protected[kestrel] def copy(config: MultiReaderConfig[Req, Rep]): Builder

protected[kestrel] def withConfig(
Expand Down Expand Up @@ -577,9 +575,19 @@ abstract class MultiReaderBuilder[Req, Rep, Builder] private[kestrel](
Return(currentHandles.values.toSet)
}

case Addr.Failed(t) => Throw(t)
case Addr.Neg =>
logger.info(s"Address could not be bound while trying to read from ${config.queueName}")
currentHandles.clear()
Return(currentHandles.values.toSet)

case _ => ReturnEmptySet
case Addr.Pending =>
// If resolution goes back to pending, it can mean there is a
// transient problem with service discovery. Keep the existing
// set.
logger.info(s"Pending name resolution for reading from ${config.queueName}")
Return(currentHandles.values.toSet)

case Addr.Failed(t) => Throw(t)
}

Var(Return(Set.empty), event)
Expand Down
Expand Up @@ -5,7 +5,7 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder

import com.twitter.finagle.memcached.protocol.text.{Decoding, Tokens, TokensWithData, ValueLines}
import com.twitter.io.Buf
import com.twitter.util.{Time, Duration}
import com.twitter.util.Duration

object ResponseToEncoding {
private val ZERO = "0"
Expand Down
Expand Up @@ -438,6 +438,23 @@ class MultiReaderTest extends FunSuite with MockitoSugar with Eventually with In
}
}

test("Var[Addr]-based cluster should use last known good state if addr becomes Pending") {
new AddrClusterHelper {
val va: Var[Addr] with Updatable[Addr] = Var(Addr.Bound(hosts: _*))
val handle = MultiReader(va, queueName).clientBuilder(mockClientBuilder).build()
va.update(Addr.Pending)

val messages = configureMessageReader(handle)
assert(messages.size == 0)

sendMessages()

eventually {
assert(messages == sentMessages.toSet)
}
}
}

test("dynamic SocketAddress cluster should read messages from a ready cluster") {
new DynamicClusterHelper {
val cluster = new DynamicCluster[SocketAddress](hosts)
Expand Down

0 comments on commit a4eddb5

Please sign in to comment.