Skip to content

Commit

Permalink
Merge 258007d into c4e9b33
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman committed Jan 14, 2019
2 parents c4e9b33 + 258007d commit 4f68ce4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 25 deletions.
Expand Up @@ -64,8 +64,18 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
.pipeTo(self)
}

/**
  * Drops all local bookkeeping kept for the given instance: its entry in `instances`,
  * its launch timeout task and its pending termination entry.
  *
  * The launch timeout task (if one is registered) is cancelled before it is discarded,
  * so it can't fire later for an instance that is no longer tracked.
  *
  * @param instanceId the ID of the instance to remove.
  */
private def removeInstance(instanceId: InstanceId): Unit = {
  instances.remove(instanceId)
  // Cancel the task instead of only dropping the reference to it. This mirrors the
  // cancel-then-remove sequence used when an instance leaves the deploying state.
  launchTimeoutTasks.remove(instanceId).foreach(_.cancel())
  pendingTermination.remove(instanceId)
}

/**
  * Checks whether the given unique address is either this node's own unique address
  * or the unique address of one of the members in the current cluster state.
  *
  * @param address the unique address to check.
  * @return `true` if the address is known to the cluster, `false` otherwise.
  */
private def isKnownAddress(address: UniqueAddress): Boolean = {
  val isSelf = address == cluster.selfUniqueAddress
  // The member scan is only evaluated when the address is not our own.
  isSelf || cluster.state.members.exists(member => member.uniqueAddress == address)
}

private def memberAutoDown(instanceId: InstanceId, instanceAddr: UniqueAddress): Unit = {
log.debug(s"Launching the auto down job for instance ${instanceAddr.address}")
log.info(s"Launching the auto down job for instance $instanceId (${instanceAddr.address})")
val autoDownService = MemberAutoDownService.createLocal(context,
instanceAddr, instanceId, instanceStorage)
autoDownService ! MemberAutoDownService.PollInstanceStatus
Expand Down Expand Up @@ -126,20 +136,19 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
}

private def onInstanceInfoUpdate(info: InstanceInfo): Unit = {
if (deadInstances.contains(info.instanceId)) {
val address = info.address
if (address.isDefined && !isKnownAddress(address.get)) {
log.error(s"Received update from the instance ${info.instanceId} (${address.get.address}) " +
"that doesn't belong to the cluster. Removing it from the local storage")
removeInstance(info.instanceId)
} else if (deadInstances.contains(info.instanceId)) {
log.warning(s"Received update from the dead instance ${info.instanceId}. " +
"Attempting to terminate this instance")
if (info.address.nonEmpty) {
terminateInstance(info)
}
} else if (info.status != InstanceDeploying) {
val updatedInfo =
if (info.address.exists(a => !cluster.failureDetector.isAvailable(a.address))) {
info.copy(status = InstanceUnreachable)
} else {
info
}
instances.put(info.instanceId, Some(updatedInfo))
} else {
instances.put(info.instanceId, Some(info))

if (info.status == InstanceDeployFailed) {
log.error(s"Instance ${info.instanceId} deployment failed")
Expand All @@ -149,12 +158,10 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
instanceLaunchTimeout, self,
InstanceLaunchTimeout(info.instanceId))
launchTimeoutTasks.put(info.instanceId, timeoutTask)
} else {
} else if (info.status != InstanceDeploying) {
launchTimeoutTasks.get(info.instanceId).foreach(_.cancel())
launchTimeoutTasks.remove(info.instanceId)
}
} else {
instances.put(info.instanceId, Some(info))
}
if (pendingTermination.contains(info.instanceId)) {
// This instance is pending termination.
Expand Down Expand Up @@ -296,13 +303,17 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
private def clusterEventReceive: Receive = {
case UnreachableMember(member) =>
if (member.status != MemberStatus.Exiting) {
findInstanceByAddr(member.uniqueAddress)
.foreach(i => {
findInstanceByAddr(member.uniqueAddress) match {
case Some(i) =>
// Update the instance's status and launch an actor which will automatically
// eliminate an unreachable member from the cluster.
instances.put(i.instanceId, Some(i.copy(status = InstanceUnreachable)))
memberAutoDown(i.instanceId, member.uniqueAddress)
})
case None =>
log.warning(s"Unknown unreachable cluster member ${member.address}. " +
"Removing it from the cluster")
cluster.down(member.address)
}
} else {
// Receiving the UNREACHABLE event for exiting members - is an expected behavior.
cluster.down(member.address)
Expand All @@ -312,11 +323,12 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
case MemberUp(member) =>
updateInstanceStatusByAddr(member.uniqueAddress, InstanceUp)
case MemberRemoved(member, _) =>
val info = findInstanceByAddr(member.uniqueAddress)
info.foreach { i =>
instances.remove(i.instanceId)
launchTimeoutTasks.remove(i.instanceId)
pendingTermination.remove(i.instanceId)
findInstanceByAddr(member.uniqueAddress) match {
case Some(i) =>
log.info(s"Removing instance ${i.instanceId} (${member.address}) from the cluster")
removeInstance(i.instanceId)
case None =>
log.warning(s"Unknown cluster member ${member.address} has been removed")
}
}

Expand Down
Expand Up @@ -22,7 +22,7 @@ import akka.testkit.{ImplicitSender, TestKit}
import akkeeper._
import akkeeper.api._
import akkeeper.common._
import akkeeper.storage.InstanceStorage
import akkeeper.storage.{InstanceStorage, RecordNotFoundException}
import org.scalamock.scalatest.MockFactory
import org.scalatest._

Expand Down Expand Up @@ -431,9 +431,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system)
it should "handle cluster events" in {
val storage = mock[InstanceStorage.Async]

val port = 12345
val addr = Address("akka.tcp", system.name, "localhost", port)
val uniqueAddr = UniqueAddress(addr, 1L)
val uniqueAddr = Cluster(system).selfUniqueAddress
val member = createTestMember(uniqueAddr)

val instance = createInstanceInfo("container").copy(address = Some(uniqueAddr))
Expand Down Expand Up @@ -568,6 +566,33 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system)

gracefulActorStop(service)
}

it should "immediately remove instances which are not members of the cluster" in {
  // A fabricated address; the test relies on it not belonging to the test cluster.
  val unknownUniqueAddr =
    UniqueAddress(Address("akka.tcp", system.name, "localhost", 12345), 1L)
  val instance = createInstanceInfo("container").copy(address = Some(unknownUniqueAddr))

  val storage = mock[InstanceStorage.Async]
  (storage.start _).expects()
  (storage.stop _).expects()
  (storage.getInstances _)
    .expects()
    .returns(Future.successful(Seq(instance.instanceId)))
  // Any lookup of this instance in the storage fails with "record not found".
  (storage.getInstance _)
    .expects(instance.instanceId)
    .returns(Future.failed(RecordNotFoundException("")))
    .anyNumberOfTimes()

  val service = createMonitoringService(storage)

  // Push an update for the instance with the unknown address, then query it back:
  // the service is expected to answer that the instance was not found.
  service ! instance
  service ! GetInstance(instance.instanceId)
  expectMsgClass(classOf[InstanceNotFound])

  gracefulActorStop(service)
}
}

object MonitoringServiceSpec {
Expand Down

0 comments on commit 4f68ce4

Please sign in to comment.