Skip to content

Commit

Permalink
finagle-serversets: Ensure ZkSession#retrying is resilient to ZK host…
Browse files Browse the repository at this point in the history
… resolution failure

**Problem**
Currently, there is no exception handling in `ZkSession#retrying`,
since we assume `newZkSession` will always return normally.  However, it's
possible that `newZkSession` throws `UnknownHostException` if DNS fails and we
cannot resolve hosts.

This causes the thread responsible for re-establishing the `ZkSession` to
terminate and the corresponding `ServiceDiscoverer` to remain unhealthy forever.

**Solution**
To solve this, we add a new `WatchState` to indicate that a
`ZkSession` could not be created (`WatchState.FailedToInitialize`). Then we
adapt the reconnect logic in `ZkSession#retrying` to retry on
`WatchState.FailedToInitialize` (in addition to session expiry).

**Result**
`ZkSession#retrying` can recover from intermittent DNS failures.

JIRA Issues: COORD-11929

Differential Revision: https://phabricator.twitter.biz/D403895
  • Loading branch information
Shakeel Rao authored and jenkins committed Dec 10, 2019
1 parent e333c83 commit 7125026
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 19 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ Runtime Behavior Changes
* finagle-netty4: Change the 'connection_requests' metric to debug verbosity.
``PHAB_ID=D391289``

* finagle-serversets: Ensure `ZkSession#retrying` is resilient to ZK host resolution failure.
``PHAB_ID=D403895``

* finagle-thrift: Per-method metrics are now created lazily, so if you have methods on a Thrift
service that you don't use, the associated metrics won't be exported. ``PHAB_ID=D400382``

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.twitter.finagle.stats._
import com.twitter.io.Buf
import com.twitter.logging.Logger
import com.twitter.util._
import java.net.UnknownHostException
import scala.collection.concurrent

object zkConcurrentOperations
Expand Down Expand Up @@ -297,8 +298,13 @@ private[serverset2] object ZkSession {
/**
* Produce a new `ZkSession`.
*
* Note: if `ClientBuilder` throws `UnknownHostException`, we return a no-op `ZkSession`
* in a failed state to trigger a retry (see `ZkSession#retrying`).
*
* @param retryStream The backoff schedule for reconnecting on session expiry and retrying operations.
* @param hosts A comma-separated "host:port" string for a ZooKeeper server.
* @param sessionTimeout The ZooKeeper session timeout to use.
* @param statsReceiver A [[StatsReceiver]] for the ZooKeeper session.
*/
private[serverset2] def apply(
retryStream: RetryStream,
Expand All @@ -307,22 +313,37 @@ private[serverset2] object ZkSession {
statsReceiver: StatsReceiver
)(
implicit timer: Timer
): ZkSession =
new ZkSession(
retryStream,
): ZkSession = {
val watchedZkReader = try {
ClientBuilder()
.hosts(hosts)
.sessionTimeout(sessionTimeout)
.statsReceiver(DefaultStatsReceiver.scope("zkclient").scope(Zk2Resolver.statsOf(hosts)))
.readOnlyOK()
.reader(),
statsReceiver.scope(Zk2Resolver.statsOf(hosts))
)
.reader()
} catch {
case exc: UnknownHostException =>
Watched(NullZooKeeperReader, Var(WatchState.FailedToInitialize(exc)))
}

new ZkSession(retryStream, watchedZkReader, statsReceiver.scope(Zk2Resolver.statsOf(hosts)))
}

/**
* Produce a `Var[ZkSession]` representing a ZooKeeper session that automatically
* reconnects upon session expiry. Reconnect attempts cease when any
* observation of the returned `Var[ZkSession]` is closed.
* Determine if the given `WatchState` requires a reconnect (see `ZkSession#retrying`).
*/
private[serverset2] def needsReconnect(state: WatchState): Boolean = {
state match {
case WatchState.FailedToInitialize(_) | WatchState.SessionState(SessionState.Expired) => true
case _ => false
}
}

/**
* Produce a `Var[ZkSession]` representing a ZooKeeper session that automatically retries
* when a session fails to initialize _or_ when the current session expires.
*
* Reconnect attempts cease when any observation of the returned Var[ZkSession] is closed.
*/
def retrying(
backoff: RetryStream,
Expand Down Expand Up @@ -360,21 +381,25 @@ private[serverset2] object ZkSession {
case _ =>
}

// Kick off a delayed reconnection on session expiration.
// Kick off a delayed reconnection if the new session failed to initialize _or_ the current session expired.
zkSession.state.changes
.filter {
_ == WatchState.SessionState(SessionState.Expired)
}
.filter(needsReconnect)
.toFuture()
.unit
.before {
.flatMap { state =>
val jitter = backoff.next()
logger.error(
s"Zookeeper session ${zkSession.sessionIdAsHex} has expired. Reconnecting in $jitter"
)
state match {
case WatchState.FailedToInitialize(exc) =>
logger.error(
s"Zookeeper session failed to initialize with exception: $exc. Retrying in $jitter"
)
case WatchState.SessionState(SessionState.Expired) =>
logger.error(
s"Zookeeper session ${zkSession.sessionIdAsHex} has expired. Reconnecting in $jitter"
)
}
Future.sleep(jitter)
}
.ensure { reconnect() }
.ensure(reconnect())
}

reconnect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ private[serverset2] sealed trait WatchState

private[serverset2] object WatchState {
object Pending extends WatchState
case class FailedToInitialize(exc: Throwable) extends WatchState
case class Determined(event: NodeEvent) extends WatchState
case class SessionState(state: com.twitter.finagle.serverset2.client.SessionState)
extends WatchState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,56 @@ class ZkSessionTest extends FunSuite with Eventually with IntegrationPatience {
}
}

test("factory retries when ZK session fails to initialize") {
Time.withCurrentTimeFrozen { tc =>
val identity = Identities.get().head
val authInfo = "%s:%s".format(identity, identity)
implicit val timer = new MockTimer

// A normal `ZkSession` with updatable state.
val zkState: Var[WatchState] with Updatable[WatchState] = Var(WatchState.Pending)
val watchedZk = Watched(new OpqueueZkReader(), zkState)

// A failed `ZkSession`.
val failedZk = Watched(
new OpqueueZkReader(),
Var(WatchState.FailedToInitialize(new Exception("failed")))
)

// Return failed session on the first invocation.
var failed = true
val zk = ZkSession.retrying(
retryStream,
() => {
if (failed) {
failed = false
new ZkSession(retryStream, failedZk, NullStatsReceiver)
} else new ZkSession(retryStream, watchedZk, NullStatsReceiver)
}
)

zk.changes.respond {
case _ => ()
}

// The underlying session is in a failed state here. Ensure we haven't updated the `Var` yet.
assert(zk.sample() == ZkSession.nil)

// Advance the timer to allow `reconnect` to run.
tc.advance(10.seconds)
timer.tick()

// The underlying `ZkSession` should be set to `watchedZk` now.
// Update the session state to connected -- we should receive auth info.
zkState() = WatchState.SessionState(SessionState.SyncConnected)
eventually {
assert(
watchedZk.value.opq == Seq(AddAuthInfo("digest", Buf.Utf8(authInfo)))
)
}
}
}

test("factory authenticates and closes on expiry") {
Time.withCurrentTimeFrozen { tc =>
val identity = Identities.get().head
Expand Down

0 comments on commit 7125026

Please sign in to comment.