Skip to content

Commit

Permalink
Merge 27b512b into b7fa2c0
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman committed Oct 31, 2018
2 parents b7fa2c0 + 27b512b commit 0caaf26
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
Expand Up @@ -32,7 +32,8 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration,
joinClusterTimeout: FiniteDuration)
joinClusterTimeout: FiniteDuration,
leaveClusterTimeout: FiniteDuration)
extends Actor with ActorLogging {

private implicit val dispatcher = context.dispatcher
Expand Down Expand Up @@ -95,6 +96,9 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
private def terminateThisInstance(): Unit = {
cluster.leave(cluster.selfAddress)
cluster.registerOnMemberRemoved(context.system.terminate())
// Scheduling a timeout command.
context.become(leavingClusterReceive)
context.system.scheduler.scheduleOnce(leaveClusterTimeout, self, LeaveClusterTimeout)
}

private def initializedReceive: Receive = {
Expand Down Expand Up @@ -124,7 +128,14 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
// Safely ignore the timeout command.
}

private def joiningTheClusterReceive: Receive = {
private def leavingClusterReceive: Receive = {
case LeaveClusterTimeout =>
log.warning(s"Couldn't leave the cluster after ${leaveClusterTimeout.toSeconds} seconds. " +
"Terminating this instance...")
context.system.terminate()
}

private def joiningClusterReceive: Receive = {
case InstanceJoinedCluster =>
log.debug("Successfully joined the cluster")
launchUserActors()
Expand All @@ -138,7 +149,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],

private def waitingForJoinCommandReceive: Receive = {
case JoinCluster =>
context.become(joiningTheClusterReceive)
context.become(joiningClusterReceive)
log.debug(s"Joining the cluster (master: $masterAddress)")
cluster.join(masterAddress)
cluster.registerOnMemberUp(self ! InstanceJoinedCluster)
Expand All @@ -153,9 +164,11 @@ object ContainerInstanceService {
private case object JoinCluster
private case object RetryRegistration
private case object JoinClusterTimeout
private case object LeaveClusterTimeout
private case object InstanceJoinedCluster
private[akkeeper] val DefaultRegistrationRetryInterval = 30 seconds
private[akkeeper] val DefaultJoinClusterTimeout = 120 seconds
private[akkeeper] val DefaultLeaveClusterTimeout = 30 seconds

val ActorName = "akkeeperInstance"

Expand All @@ -165,9 +178,10 @@ object ContainerInstanceService {
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout): ActorRef = {
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout,
leaveClusterTimeout: FiniteDuration = DefaultLeaveClusterTimeout): ActorRef = {
val props = Props(classOf[ContainerInstanceService], userActors, instanceStorage,
instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout)
instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout, leaveClusterTimeout)
factory.actorOf(props, ActorName)
}
}
Expand Up @@ -68,10 +68,11 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
instanceId: InstanceId,
masterAddress: Address,
retryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout,
leaveClusterTimeout: FiniteDuration = DefaultLeaveClusterTimeout
): ActorRef = {
val props = Props(classOf[ContainerInstanceService], userActors, instanceStorage,
instanceId, masterAddress, retryInterval, joinClusterTimeout)
instanceId, masterAddress, retryInterval, joinClusterTimeout, leaveClusterTimeout)
childActorOf(props, ContainerInstanceService.ActorName)
}

Expand Down

0 comments on commit 0caaf26

Please sign in to comment.