Skip to content

Commit

Permalink
Introduce support for retry attempts when binding to the REST API por…
Browse files Browse the repository at this point in the history
…t in Application Master
  • Loading branch information
izeigerman committed Dec 5, 2018
1 parent 2161824 commit a31d8b8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 22 deletions.
3 changes: 3 additions & 0 deletions akkeeper/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ akkeeper {
# The REST API port.
port = 5050

# The maximum number of retry attempts, each on the next consecutive port,
# made when binding to the configured port fails.
port-max-attempts = 8

# The client request timeout for the REST API.
request-timeout = 30s
}
Expand Down
66 changes: 44 additions & 22 deletions akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
*/
package akkeeper.master

import java.util.UUID
import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem}
import akka.actor._
import akka.cluster.Cluster
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.util.Timeout
import akkeeper.common.{InstanceId, InstanceInfo, InstanceUp}
import akkeeper.deploy.{DeployClient, DeployClientFactory}
import akkeeper.deploy.yarn.YarnApplicationMasterConfig
import akkeeper.master.route._
Expand All @@ -35,13 +33,33 @@ import akkeeper.utils.yarn._
import com.typesafe.config.{Config, ConfigFactory}
import org.slf4j.LoggerFactory

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

/**
 * Abstraction over the way the master process is launched.
 * `YarnMasterRunner` in this file is an implementation of this trait.
 */
private[master] trait MasterRunner {
// Starts the master using the supplied arguments.
// NOTE(review): the YARN implementation appears to block until the actor
// system terminates - confirm before relying on this for other runners.
def run(masterArgs: MasterArguments): Unit
}

private[master] object MasterRunner {
  /**
   * A placeholder actor that stands in for a service which has not been created yet.
   *
   * While no target is known, every incoming message is stashed. The first
   * `ActorRef` received is taken as the target service: the proxy switches to
   * forwarding mode and replays all stashed messages. From then on every message
   * (including any further `ActorRef`) is forwarded to the target with the
   * original sender preserved.
   */
  final class ServiceProxy extends Actor with Stash {
    override def receive: Receive = waitingForService

    // Initial behaviour: buffer everything until the target service is announced.
    private def waitingForService: Receive = {
      case newService: ActorRef =>
        // The target is captured by the new behaviour, so no mutable field
        // (and no Option.get) is needed to reach it later.
        context.become(proxying(newService))
        unstashAll()
      case _ => stash()
    }

    // Final behaviour: relay every message to the target, keeping the original sender.
    private def proxying(service: ActorRef): Receive = {
      case message => service.forward(message)
    }
  }
}

private[master] class YarnMasterRunner extends MasterRunner {

private val logger = LoggerFactory.getLogger(classOf[YarnMasterRunner])
Expand Down Expand Up @@ -89,15 +107,21 @@ private[master] class YarnMasterRunner extends MasterRunner {
)).route
}

private def registerMasterInstance[F[_]](storage: InstanceStorage[F],
actorSystem: ActorSystem,
restPort: Int): F[InstanceId] = {
val cluster = Cluster(actorSystem)
val instanceId = InstanceId(MasterService.MasterServiceName, UUID.randomUUID())
val extra = Map("apiPort" -> restPort.toString)
val info = InstanceInfo(instanceId, InstanceUp, MasterService.MasterServiceName,
cluster.selfRoles, Some(cluster.selfUniqueAddress), Set.empty, extra)
storage.registerInstance(info)
/**
 * Binds the REST handler to `0.0.0.0:port`. If the bind fails, retries on the
 * next consecutive port.
 *
 * @param handler      the route to serve.
 * @param port         the port to try first.
 * @param attemptsLeft the number of retries still allowed after this attempt fails;
 *                     zero or a negative value means "do not retry".
 * @return a future holding the port that was actually bound, or failed with the
 *         error of the last attempt when no retries remain or the port range is
 *         exhausted.
 */
private def bindHttp(handler: Route, port: Int, attemptsLeft: Int)
                    (implicit system: ActorSystem, context: ExecutionContext,
                     materializer: ActorMaterializer): Future[Int] = {
  // Highest valid TCP port; going past it would only produce an argument error
  // that hides the real bind failure.
  val maxPort = 65535
  Http().bindAndHandle(handler, "0.0.0.0", port)
    .map(_.localAddress.getPort)
    .recoverWith { case scala.util.control.NonFatal(ex) =>
      logger.error(s"Failed to bind to port $port", ex)
      // `<= 0` (rather than `== 0`) keeps a negative configured value from
      // turning into an unbounded retry chain.
      if (attemptsLeft <= 0 || port >= maxPort) {
        Future.failed(ex)
      } else {
        bindHttp(handler, port + 1, attemptsLeft - 1)
      }
    }
}

def run(masterArgs: MasterArguments): Unit = {
Expand All @@ -113,23 +137,21 @@ private[master] class YarnMasterRunner extends MasterRunner {

val restConfig = config.getRestConfig
val restPort = restConfig.getInt("port")
val restPortMaxAttempts = restConfig.getInt("port-max-attempts")

val masterConfig = config.withMasterPort.withMasterRole
implicit val actorSystem = ActorSystem(config.getActorSystemName, masterConfig)
implicit val materializer = ActorMaterializer()
implicit val dispatcher = actorSystem.dispatcher

val masterServiceProxy = actorSystem.actorOf(Props(classOf[MasterRunner.ServiceProxy]))
val restHandler = createRestHandler(restConfig, masterServiceProxy)
val actualRestPort = Await.result(bindHttp(restHandler, restPort, restPortMaxAttempts), Duration.Inf)

val instanceStorage = createInstanceStorage(actorSystem, masterArgs.appId)
val deployClient = createDeployClient(actorSystem, masterArgs, restPort)
val deployClient = createDeployClient(actorSystem, masterArgs, actualRestPort)
val masterService = MasterService.createLocal(actorSystem, deployClient, instanceStorage)

val restHandler = createRestHandler(restConfig, masterService)
Http().bindAndHandle(restHandler, "0.0.0.0", restPort).onFailure {
case ex: Exception =>
logger.error(s"Failed to bind to port $restPort", ex)
}

registerMasterInstance(instanceStorage, actorSystem, restPort)
masterServiceProxy ! masterService

Await.result(actorSystem.whenTerminated, Duration.Inf)
materializer.shutdown()
Expand Down

0 comments on commit a31d8b8

Please sign in to comment.