Skip to content

Commit

Permalink
Merge pull request #25 from izeigerman/random-akka-port-for-master
Browse files Browse the repository at this point in the history
Random akka port for master
  • Loading branch information
izeigerman committed Aug 10, 2018
2 parents d32657b + 911ff9f commit b9b07a8
Show file tree
Hide file tree
Showing 17 changed files with 120 additions and 45 deletions.
4 changes: 2 additions & 2 deletions akkeeper/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ akka {

akkeeper {
akka {
port = 2550
port = 0
system-name = "AkkeeperSystem"
}

Expand All @@ -33,7 +33,7 @@ akkeeper {
application-name = "AkkeeperApplication"
master {
cpus = 1
memory = 1048
memory = 1024
jvm.args = ["-Xmx1g"]
}
max-attempts = 2
Expand Down
10 changes: 8 additions & 2 deletions akkeeper/src/main/scala/akkeeper/common/InstanceInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package akkeeper.common

import akka.actor.Address
import akka.cluster.UniqueAddress
import akkeeper.AkkeeperException
import spray.json._

Expand Down Expand Up @@ -50,10 +51,12 @@ object InstanceStatus {
* @param roles the list of the instance's Akka roles.
* @param address the address of the instance.
* @param actors the list of user actors that are available on this instance.
* @param extra the key value properties with additional instance specific data.
*/
case class InstanceInfo private[akkeeper] (instanceId: InstanceId, status: InstanceStatus,
containerName: String, roles: Set[String],
address: Option[Address], actors: Set[String])
address: Option[UniqueAddress], actors: Set[String],
extra: Map[String, String] = Map.empty)

object InstanceInfo {
private def createWithStatus(instanceId: InstanceId, status: InstanceStatus): InstanceInfo = {
Expand Down Expand Up @@ -104,6 +107,9 @@ trait InstanceStatusJsonProtocol extends DefaultJsonProtocol with InstanceIdJson
}
}

implicit val uniqueAddressFormat =
jsonFormat[Address, Long, UniqueAddress]((a, u) => UniqueAddress(a, u), "address", "longUid")

implicit val instanceStatusFormat = new JsonFormat[InstanceStatus] {
override def read(json: JsValue): InstanceStatus = {
InstanceStatus.fromString(json.convertTo[String])
Expand All @@ -113,7 +119,7 @@ trait InstanceStatusJsonProtocol extends DefaultJsonProtocol with InstanceIdJson
}
}

implicit val instanceInfoFormat = jsonFormat6(InstanceInfo.apply)
implicit val instanceInfoFormat = jsonFormat7(InstanceInfo.apply)
}

object InstanceStatusJsonProtocol extends InstanceStatusJsonProtocol
2 changes: 1 addition & 1 deletion akkeeper/src/main/scala/akkeeper/common/StopInstance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
*/
package akkeeper.common

private[akkeeper] case object StopInstance
case object StopInstance
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import scala.concurrent.duration.Duration
object ContainerInstanceMain extends App {

val optParser = new OptionParser[ContainerInstanceArguments]("akkeeperInstance") {
head("akkeeperInstance", "0.1")
head("akkeeperInstance", "0.2.0")

opt[String](AppIdArg).required().action((v, c) => {
c.copy(appId = v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import ContainerInstanceService._
class ContainerInstanceService(instanceStorage: InstanceStorage.Async,
instanceId: InstanceId,
masterAddress: Address,
retryInterval: FiniteDuration)
registrationRetryInterval: FiniteDuration,
joinClusterTimeout: FiniteDuration)
extends Actor with ActorLogging {

private implicit val dispatcher = context.dispatcher
Expand Down Expand Up @@ -77,7 +78,7 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async,
status = InstanceUp,
containerName = instanceId.containerName,
roles = cluster.selfRoles,
address = Some(cluster.selfAddress),
address = Some(cluster.selfUniqueAddress),
actors = actors.toSet
)
thisInstance = Some(info)
Expand All @@ -98,16 +99,20 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async,
notifyMonitoringService
case OperationFailed(_, e) =>
// Failed to save the record to a storage.
log.error(e, s"Failed to store this instance information. Retrying in $retryInterval")
log.error(e, "Failed to store this instance information. " +
s"Retrying in $registrationRetryInterval")
// Scheduling retry.
context.system.scheduler.scheduleOnce(retryInterval, self, RetryRegistration)
context.system.scheduler.scheduleOnce(registrationRetryInterval,
self, RetryRegistration)
case RetryRegistration =>
log.info("Retrying instance registration process")
registerThisInstance
case StopInstance =>
log.info("Termination command received. Stopping this instance")
cluster.leave(cluster.selfAddress)
context.system.terminate()
case JoinClusterTimeout =>
// Safely ignore the timeout command.
}

private def joiningTheClusterReceive: Receive = {
Expand All @@ -118,6 +123,10 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async,
context.become(initializedReceive)
registerThisInstance
}
case JoinClusterTimeout =>
log.error(s"Couldn't join the cluster during ${joinClusterTimeout.toSeconds} seconds. " +
"Terminating this instance...")
context.system.terminate()
}

private def waitingForActorsReceive: Receive = {
Expand All @@ -127,6 +136,8 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async,
log.debug(s"Joining the cluster (master: $masterAddress)")
cluster.join(masterAddress)
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp])
// Scheduling a timeout command.
context.system.scheduler.scheduleOnce(joinClusterTimeout, self, JoinClusterTimeout)
}

override def receive: Receive = waitingForActorsReceive
Expand All @@ -135,17 +146,20 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async,
object ContainerInstanceService {
private[akkeeper] case class LaunchActors(actors: Seq[ActorLaunchContext])
private case object RetryRegistration
private val DefaultRetryInterval = 30 seconds
private case object JoinClusterTimeout
private val DefaultRegistrationRetryInterval = 30 seconds
private val DefaultJoinClusterTimeout = 120 seconds

val ActorName = "akkeeperInstance"

def createLocal(factory: ActorRefFactory,
instanceStorage: InstanceStorage.Async,
instanceId: InstanceId,
masterAddress: Address,
retryInterval: FiniteDuration = DefaultRetryInterval): ActorRef = {
registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout): ActorRef = {
val props = Props(classOf[ContainerInstanceService], instanceStorage,
instanceId, masterAddress, retryInterval)
instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout)
factory.actorOf(props, ActorName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.concurrent.duration._
object LauncherMain extends App {

val optParser = new OptionParser[LaunchArguments]("akkeeper") {
head("akkeeper", "0.1")
head("akkeeper", "0.2.0")

opt[File]("akkeeperJar").required().action((v, c) => {
c.copy(akkeeperJarPath = v)
Expand Down
2 changes: 1 addition & 1 deletion akkeeper/src/main/scala/akkeeper/master/MasterMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.control.NonFatal
object MasterMain extends App {

val optParser = new OptionParser[MasterArguments]("akkeeperMaster") {
head("akkeeperMaster", "0.1")
head("akkeeperMaster", "0.2.0")

opt[String](AppIdArg).required().action((v, c) => {
c.copy(appId = v)
Expand Down
15 changes: 15 additions & 0 deletions akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package akkeeper.master

import java.util.UUID
import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem}
Expand All @@ -23,6 +24,7 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.util.Timeout
import akkeeper.common.{InstanceId, InstanceInfo, InstanceUp}
import akkeeper.deploy.{DeployClient, DeployClientFactory}
import akkeeper.deploy.yarn.YarnApplicationMasterConfig
import akkeeper.master.route._
Expand Down Expand Up @@ -86,6 +88,17 @@ private[master] class YarnMasterRunner extends MasterRunner {
)).route
}

private def registerMasterInstance[F[_]](storage: InstanceStorage[F],
actorSystem: ActorSystem,
restPort: Int): F[InstanceId] = {
val cluster = Cluster(actorSystem)
val instanceId = InstanceId(MasterService.MasterServiceName, UUID.randomUUID())
val extra = Map("apiPort" -> restPort.toString)
val info = InstanceInfo(instanceId, InstanceUp, MasterService.MasterServiceName,
cluster.selfRoles, Some(cluster.selfUniqueAddress), Set.empty, extra)
storage.registerInstance(info)
}

def run(masterArgs: MasterArguments): Unit = {
val config = masterArgs.config
.map(c => ConfigFactory.parseFile(c).withFallback(ConfigFactory.load()))
Expand Down Expand Up @@ -115,6 +128,8 @@ private[master] class YarnMasterRunner extends MasterRunner {
logger.error(s"Failed to bind to port $restPort", ex)
}

registerMasterInstance(instanceStorage, actorSystem, restPort)

Await.result(actorSystem.whenTerminated, Duration.Inf)
materializer.shutdown()
ticketRenewer.foreach(_.stop())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private[akkeeper] class MasterService(deployClient: DeployClient.Async,
log.debug(s"Received instance info. ${numOfRequiredInstances - seedInstances.size} " +
"more needed to proceed")
if (seedInstances.size >= numOfRequiredInstances) {
val seedAddrs = immutable.Seq(seedInstances.map(_.address.get).toSeq: _*)
val seedAddrs = immutable.Seq(seedInstances.map(_.address.get.address).toSeq: _*)
joinCluster(seedAddrs)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package akkeeper.master.service

import akka.actor._
import akka.cluster.Cluster
import akka.cluster.{Cluster, UniqueAddress}
import akka.cluster.ClusterEvent._
import akka.pattern.pipe
import akkeeper.api._
import akkeeper.common._
import akkeeper.container.service.ContainerInstanceService
import akkeeper.storage._

import scala.collection.mutable
import scala.concurrent.Future
import scala.util.control.NonFatal
Expand Down Expand Up @@ -63,23 +64,23 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
sender() ! InstanceNotFound(requestId, instanceId)
}

private def findInstanceByAddr(addr: Address): Option[InstanceInfo] = {
private def findInstanceByAddr(addr: UniqueAddress): Option[InstanceInfo] = {
instances.collectFirst {
case (id, Some(info)) if info.address.exists(_ == addr) => info
case (_, Some(info)) if info.address.exists(_ == addr) => info
}
}

private def updateInstanceStatusByAddr(addr: Address, status: InstanceStatus): Unit = {
private def updateInstanceStatusByAddr(addr: UniqueAddress, status: InstanceStatus): Unit = {
findInstanceByAddr(addr)
.foreach(i => instances.put(i.instanceId, Some(i.copy(status = status))))
}

private def terminateInstance(instance: InstanceInfo): Unit = {
val addr = instance.address.get
val path = RootActorPath(addr) / "user" / ContainerInstanceService.ActorName
val path = RootActorPath(addr.address) / "user" / ContainerInstanceService.ActorName
val selection = context.actorSelection(path)
selection ! StopInstance
cluster.down(addr)
cluster.down(addr.address)
log.info(s"Instance ${instance.instanceId} terminated successfully")
}

Expand Down Expand Up @@ -115,7 +116,7 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
private def onInstanceInfoUpdate(info: InstanceInfo): Unit = {
if (info.status != InstanceDeploying) {
val updatedInfo =
if (info.address.exists(!cluster.failureDetector.isAvailable(_))) {
if (info.address.exists(a => !cluster.failureDetector.isAvailable(a.address))) {
info.copy(status = InstanceUnreachable)
} else {
info
Expand Down Expand Up @@ -256,11 +257,11 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async

private def clusterEventReceive: Receive = {
case UnreachableMember(member) =>
updateInstanceStatusByAddr(member.address, InstanceUnreachable)
updateInstanceStatusByAddr(member.uniqueAddress, InstanceUnreachable)
case ReachableMember(member) =>
updateInstanceStatusByAddr(member.address, InstanceUp)
updateInstanceStatusByAddr(member.uniqueAddress, InstanceUp)
case MemberRemoved(member, _) =>
val info = findInstanceByAddr(member.address)
val info = findInstanceByAddr(member.uniqueAddress)
info.foreach(i => instances.remove(i.instanceId))
}

Expand Down
5 changes: 4 additions & 1 deletion akkeeper/src/test/scala/akkeeper/ActorTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ package akkeeper

import akka.actor.ActorRef
import akka.pattern.gracefulStop
import scala.concurrent.duration._

trait ActorTestUtils extends AwaitMixin {

protected val gracefulStopTimeout: FiniteDuration = 6 seconds

protected def gracefulActorStop(actor: ActorRef): Unit = {
await(gracefulStop(actor, awaitTimeout))
await(gracefulStop(actor, gracefulStopTimeout))
}
}

0 comments on commit b9b07a8

Please sign in to comment.