diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b9f6786c10..aacbc668a2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -22,6 +22,9 @@ Runtime Behavior Changes * finagle-netty4: Change the 'connection_requests' metric to debug verbosity. ``PHAB_ID=D391289`` +* finagle-serversets: Ensure `ZkSession#retrying` is resilient to ZK host resolution failure. + ``PHAB_ID=D403895`` + * finagle-thrift: Per-method metrics are now created lazily, so if you have methods on a Thrift service that you don't use, the associated metrics won't be exported. ``PHAB_ID=D400382`` diff --git a/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/ZkSession.scala b/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/ZkSession.scala index bac22633ad..8af95aae6f 100644 --- a/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/ZkSession.scala +++ b/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/ZkSession.scala @@ -8,6 +8,7 @@ import com.twitter.finagle.stats._ import com.twitter.io.Buf import com.twitter.logging.Logger import com.twitter.util._ +import java.net.UnknownHostException import scala.collection.concurrent object zkConcurrentOperations @@ -297,8 +298,13 @@ private[serverset2] object ZkSession { /** * Produce a new `ZkSession`. * + * Note: if `ClientBuilder` throws `UnknownHostException`, we return a no-op `ZkSession` + * in a failed state to trigger a retry (see `ZkSession#retrying`). + * + * @param retryStream The backoff schedule for reconnecting on session expiry and retrying operations. * @param hosts A comma-separated "host:port" string for a ZooKeeper server. * @param sessionTimeout The ZooKeeper session timeout to use. + * @param statsReceiver A [[StatsReceiver]] for the ZooKeeper session. */ private[serverset2] def apply( retryStream: RetryStream, @@ -307,22 +313,37 @@ private[serverset2] object ZkSession { statsReceiver: StatsReceiver )( implicit timer: Timer - ): ZkSession = - new ZkSession( - retryStream, + ): ZkSession = { + val watchedZkReader = try { ClientBuilder() .hosts(hosts) .sessionTimeout(sessionTimeout) .statsReceiver(DefaultStatsReceiver.scope("zkclient").scope(Zk2Resolver.statsOf(hosts))) .readOnlyOK() - .reader(), - statsReceiver.scope(Zk2Resolver.statsOf(hosts)) - ) + .reader() + } catch { + case exc: UnknownHostException => + Watched(NullZooKeeperReader, Var(WatchState.FailedToInitialize(exc))) + } + + new ZkSession(retryStream, watchedZkReader, statsReceiver.scope(Zk2Resolver.statsOf(hosts))) + } /** - * Produce a `Var[ZkSession]` representing a ZooKeeper session that automatically - * reconnects upon session expiry. Reconnect attempts cease when any - * observation of the returned `Var[ZkSession]` is closed. + * Determine if the given `WatchState` requires a reconnect (see `ZkSession#retrying`). + */ + private[serverset2] def needsReconnect(state: WatchState): Boolean = { + state match { + case WatchState.FailedToInitialize(_) | WatchState.SessionState(SessionState.Expired) => true + case _ => false + } + } + + /** + * Produce a `Var[ZkSession]` representing a ZooKeeper session that automatically retries + * when a session fails to initialize _or_ when the current session expires. + * + * Reconnect attempts cease when any observation of the returned Var[ZkSession] is closed. */ def retrying( backoff: RetryStream, @@ -360,21 +381,25 @@ private[serverset2] object ZkSession { case _ => } - // Kick off a delayed reconnection on session expiration. + // Kick off a delayed reconnection if the new session failed to initialize _or_ the current session expired. zkSession.state.changes - .filter { - _ == WatchState.SessionState(SessionState.Expired) - } + .filter(needsReconnect) .toFuture() - .unit - .before { + .flatMap { state => val jitter = backoff.next() - logger.error( - s"Zookeeper session ${zkSession.sessionIdAsHex} has expired. Reconnecting in $jitter" - ) + state match { + case WatchState.FailedToInitialize(exc) => + logger.error( + s"Zookeeper session failed to initialize with exception: $exc. Retrying in $jitter" + ) + case WatchState.SessionState(SessionState.Expired) => + logger.error( + s"Zookeeper session ${zkSession.sessionIdAsHex} has expired. Reconnecting in $jitter" + ) + } Future.sleep(jitter) } - .ensure { reconnect() } + .ensure(reconnect()) } reconnect() diff --git a/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/client/types.scala b/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/client/types.scala index c8e2ba8654..4fd7f22e7b 100644 --- a/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/client/types.scala +++ b/finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/client/types.scala @@ -86,6 +86,7 @@ private[serverset2] sealed trait WatchState private[serverset2] object WatchState { object Pending extends WatchState + case class FailedToInitialize(exc: Throwable) extends WatchState case class Determined(event: NodeEvent) extends WatchState case class SessionState(state: com.twitter.finagle.serverset2.client.SessionState) extends WatchState diff --git a/finagle-serversets/src/test/scala/com/twitter/finagle/serverset2/ZkSessionTest.scala b/finagle-serversets/src/test/scala/com/twitter/finagle/serverset2/ZkSessionTest.scala index 05d42ad78f..5ced16ce31 100644 --- a/finagle-serversets/src/test/scala/com/twitter/finagle/serverset2/ZkSessionTest.scala +++ b/finagle-serversets/src/test/scala/com/twitter/finagle/serverset2/ZkSessionTest.scala @@ -167,6 +167,56 @@ class ZkSessionTest extends FunSuite with Eventually with IntegrationPatience { } } + test("factory retries when ZK session fails to initialize") { + Time.withCurrentTimeFrozen { tc => + val identity = Identities.get().head + val authInfo = "%s:%s".format(identity, identity) + implicit val timer = new MockTimer + + // A normal `ZkSession` with updatable state. + val zkState: Var[WatchState] with Updatable[WatchState] = Var(WatchState.Pending) + val watchedZk = Watched(new OpqueueZkReader(), zkState) + + // A failed `ZkSession`. + val failedZk = Watched( + new OpqueueZkReader(), + Var(WatchState.FailedToInitialize(new Exception("failed"))) + ) + + // Return failed session on the first invocation. + var failed = true + val zk = ZkSession.retrying( + retryStream, + () => { + if (failed) { + failed = false + new ZkSession(retryStream, failedZk, NullStatsReceiver) + } else new ZkSession(retryStream, watchedZk, NullStatsReceiver) + } + ) + + zk.changes.respond { + case _ => () + } + + // The underlying session is in a failed state here. Ensure we haven't updated the `Var` yet. + assert(zk.sample() == ZkSession.nil) + + // Advance the timer to allow `reconnect` to run. + tc.advance(10.seconds) + timer.tick() + + // The underlying `ZkSession` should be set to `watchedZk` now. + // Update the session state to connected -- we should receive auth info. + zkState() = WatchState.SessionState(SessionState.SyncConnected) + eventually { + assert( + watchedZk.value.opq == Seq(AddAuthInfo("digest", Buf.Utf8(authInfo))) + ) + } + } + } + test("factory authenticates and closes on expiry") { Time.withCurrentTimeFrozen { tc => val identity = Identities.get().head