From 4905d8e55bc541284b5f390904b66862b5eaab9e Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 17 Jan 2019 10:24:01 -0800 Subject: [PATCH] Refactor Instance Storage and Deploy Client. Drop unnecessary F[_] usage, always use Futures instead (#61) --- .../container/ContainerInstanceMain.scala | 2 +- .../service/ContainerInstanceService.scala | 4 +- .../scala/akkeeper/deploy/DeployClient.scala | 17 +++----- .../deploy/yarn/YarnApplicationMaster.scala | 8 ++-- .../scala/akkeeper/launcher/Launcher.scala | 10 ++--- .../akkeeper/launcher/yarn/YarnLauncher.scala | 2 +- .../scala/akkeeper/master/MasterRunner.scala | 8 ++-- .../master/service/DeployService.scala | 4 +- .../master/service/MasterService.scala | 8 ++-- .../service/MemberAutoDownService.scala | 4 +- .../master/service/MonitoringService.scala | 4 +- .../akkeeper/storage/InstanceStorage.scala | 23 +++++------ .../async/ZookeeperInstanceStorage.scala | 2 +- .../ContainerInstanceServiceSpec.scala | 10 ++--- .../master/service/DeployServiceSpec.scala | 10 ++--- .../master/service/MasterServiceSpec.scala | 24 +++++------ .../service/MemberAutoDownServiceSpec.scala | 12 +++--- .../service/MonitoringServiceSpec.scala | 40 +++++++++---------- .../async/ZookeeperInstanceStorageSpec.scala | 4 +- 19 files changed, 92 insertions(+), 104 deletions(-) diff --git a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala index 5cc6728..9d25bd7 100644 --- a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala +++ b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala @@ -82,7 +82,7 @@ object ContainerInstanceMain extends App { val actorSystem = ActorSystem(instanceConfig.akkeeperAkka.actorSystemName, instanceConfig) val zkConfig = actorSystem.settings.config.zookeeper.clientConfig - val instanceStorage = InstanceStorageFactory.createAsync(zkConfig.child(instanceArgs.appId)) + val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId)) val actorsJsonStr = Source.fromFile(instanceArgs.actors).getLines().mkString("\n") val actors = actorsJsonStr.parseJson.convertTo[Seq[ActorLaunchContext]] diff --git a/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala b/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala index c990ae9..c8b649d 100644 --- a/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala +++ b/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala @@ -27,7 +27,7 @@ import scala.util.control.NonFatal import ContainerInstanceService._ class ContainerInstanceService(userActors: Seq[ActorLaunchContext], - instanceStorage: InstanceStorage.Async, + instanceStorage: InstanceStorage, instanceId: InstanceId, masterAddress: Address, registrationRetryInterval: FiniteDuration, @@ -173,7 +173,7 @@ object ContainerInstanceService { def createLocal(factory: ActorRefFactory, userActors: Seq[ActorLaunchContext], - instanceStorage: InstanceStorage.Async, + instanceStorage: InstanceStorage, instanceId: InstanceId, masterAddress: Address, registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval, diff --git a/akkeeper/src/main/scala/akkeeper/deploy/DeployClient.scala b/akkeeper/src/main/scala/akkeeper/deploy/DeployClient.scala index 3e7d32a..d629f6f 100644 --- a/akkeeper/src/main/scala/akkeeper/deploy/DeployClient.scala +++ b/akkeeper/src/main/scala/akkeeper/deploy/DeployClient.scala @@ -20,7 +20,7 @@ import akkeeper.deploy.yarn._ import scala.concurrent.Future /** A client that is responsible for deploying new container instances. */ -private[akkeeper] trait DeployClient[F[_]] { +private[akkeeper] trait DeployClient { /** Starts the client. */ def start(): Unit @@ -44,11 +44,7 @@ private[akkeeper] trait DeployClient[F[_]] { * Each item in this list represents a result for a one particular instance. * See [[DeployResult]]. */ - def deploy(container: ContainerDefinition, instances: Seq[InstanceId]): Seq[F[DeployResult]] -} - -private[akkeeper] object DeployClient { - type Async = DeployClient[Future] + def deploy(container: ContainerDefinition, instances: Seq[InstanceId]): Seq[Future[DeployResult]] } /** A result of the deployment operation. Contains the ID of the instance to which this @@ -65,20 +61,19 @@ private[akkeeper] case class DeploySuccessful(instanceId: InstanceId) extends De private[akkeeper] case class DeployFailed(instanceId: InstanceId, e: Throwable) extends DeployResult -private[akkeeper] trait DeployClientFactory[F[_], T] extends (T => DeployClient[F]) +private[akkeeper] trait DeployClientFactory[T] extends (T => DeployClient) private[akkeeper] object DeployClientFactory { - type AsyncDeployClientFactory[T] = DeployClientFactory[Future, T] implicit object YarnDeployClientFactory - extends AsyncDeployClientFactory[YarnApplicationMasterConfig] { + extends DeployClientFactory[YarnApplicationMasterConfig] { - override def apply(config: YarnApplicationMasterConfig): DeployClient.Async = { + override def apply(config: YarnApplicationMasterConfig): DeployClient = { new YarnApplicationMaster(config, new YarnMasterClient) } } - def createAsync[T: AsyncDeployClientFactory](config: T): DeployClient.Async = { + def apply[T: DeployClientFactory](config: T): DeployClient = { implicitly[T](config) } } diff --git a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala index 0812fb9..6c5cee5 100644 --- a/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala +++ b/akkeeper/src/main/scala/akkeeper/deploy/yarn/YarnApplicationMaster.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.slf4j.LoggerFactory import scala.collection.mutable -import scala.concurrent.{Future, Promise} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util._ import scala.util.control.NonFatal import scala.collection.JavaConverters._ @@ -39,12 +39,13 @@ import YarnApplicationMaster._ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfig, yarnClient: YarnMasterClient) - extends DeployClient.Async { + extends DeployClient { private val logger = LoggerFactory.getLogger(classOf[YarnApplicationMaster]) private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool( config.config.yarn.clientThreads) + private implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executorService) private val stagingDirectory: String = config.config.yarn.stagingDirectory(config.yarnConf, config.appId) private val localResourceManager: YarnLocalResourceManager = @@ -227,8 +228,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi val request = buildContainerRequest(container) pendingInstances.put(request.getPriority.getPriority, container -> id) - yarnClient.addContainerRequest(request) - promise.future + Future(yarnClient.addContainerRequest(request)).flatMap(_ => promise.future) }) } diff --git a/akkeeper/src/main/scala/akkeeper/launcher/Launcher.scala b/akkeeper/src/main/scala/akkeeper/launcher/Launcher.scala index dc91537..e15e143 100644 --- a/akkeeper/src/main/scala/akkeeper/launcher/Launcher.scala +++ b/akkeeper/src/main/scala/akkeeper/launcher/Launcher.scala @@ -26,7 +26,7 @@ import scala.concurrent.{ExecutionContext, Future} final case class LaunchResult(appId: String, masterAddress: Address) /** Launcher for the Akkeeper application. */ -trait Launcher[F[_]] { +trait Launcher { /** Launches the Akkeeper application and returns a launch result. * @@ -36,19 +36,17 @@ trait Launcher[F[_]] { * submitted application and the address of the node where * Akkeeper Master is running. */ - def launch(config: Config, args: LaunchArguments): F[LaunchResult] + def launch(config: Config, args: LaunchArguments): Future[LaunchResult] } object Launcher { - def apply[F[_]](implicit l: Launcher[F]): Launcher[F] = l - def createYarnLauncher(yarnConfig: YarnConfiguration) - (implicit context: ExecutionContext): Launcher[Future] = { + (implicit context: ExecutionContext): Launcher = { new YarnLauncher(yarnConfig, () => new YarnLauncherClient) } def createYarnLauncher(yarnConfig: Configuration) - (implicit context: ExecutionContext): Launcher[Future] = { + (implicit context: ExecutionContext): Launcher = { createYarnLauncher(new YarnConfiguration(yarnConfig)) } } diff --git a/akkeeper/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala b/akkeeper/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala index edfd373..83f5cfe 100644 --- a/akkeeper/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala +++ b/akkeeper/src/main/scala/akkeeper/launcher/yarn/YarnLauncher.scala @@ -38,7 +38,7 @@ import scala.concurrent.{ExecutionContext, Future} final class YarnLauncher(yarnConf: YarnConfiguration, yarnClientCreator: () => YarnLauncherClient) - (implicit context: ExecutionContext)extends Launcher[Future] { + (implicit context: ExecutionContext) extends Launcher { private val logger = LoggerFactory.getLogger(classOf[YarnLauncher]) diff --git a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala index f27c317..d0db1bb 100644 --- a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala +++ b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala @@ -65,14 +65,14 @@ private[master] class YarnMasterRunner extends MasterRunner { private val logger = LoggerFactory.getLogger(classOf[YarnMasterRunner]) private def createInstanceStorage(actorSystem: ActorSystem, - appId: String): InstanceStorage.Async = { + appId: String): InstanceStorage = { val zkConfig = actorSystem.settings.config.zookeeper.clientConfig - InstanceStorageFactory.createAsync(zkConfig.child(appId)) + InstanceStorageFactory(zkConfig.child(appId)) } private def createDeployClient(actorSystem: ActorSystem, masterArgs: MasterArguments, - restApiPort: Int): DeployClient.Async = { + restApiPort: Int): DeployClient = { val yarnConf = YarnUtils.getYarnConfiguration val config = actorSystem.settings.config val selfAddr = Cluster(actorSystem).selfAddress @@ -83,7 +83,7 @@ private[master] class YarnMasterRunner extends MasterRunner { config = config, yarnConf = yarnConf, appId = masterArgs.appId, selfAddress = selfAddr, trackingUrl = trackingUrl, principal = principal) - DeployClientFactory.createAsync(yarnConfig) + DeployClientFactory(yarnConfig) } private def loginAndGetRenewer(config: Config, principal: String): KerberosTicketRenewer = { diff --git a/akkeeper/src/main/scala/akkeeper/master/service/DeployService.scala b/akkeeper/src/main/scala/akkeeper/master/service/DeployService.scala index 08f013a..cd3bf7c 100644 --- a/akkeeper/src/main/scala/akkeeper/master/service/DeployService.scala +++ b/akkeeper/src/main/scala/akkeeper/master/service/DeployService.scala @@ -22,7 +22,7 @@ import akkeeper.common._ import akkeeper.deploy._ import MonitoringService._ -private[akkeeper] class DeployService(deployClient: DeployClient.Async, +private[akkeeper] class DeployService(deployClient: DeployClient, containerService: ActorRef, monitoringService: ActorRef) extends RequestTrackingService { @@ -92,7 +92,7 @@ object DeployService extends RemoteServiceFactory { override val actorName = "deployService" private[akkeeper] def createLocal(factory: ActorRefFactory, - deployClient: DeployClient.Async, + deployClient: DeployClient, containerService: ActorRef, monitoringService: ActorRef): ActorRef = { factory.actorOf(Props(classOf[DeployService], deployClient, diff --git a/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala b/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala index 5856982..2f0ce6b 100644 --- a/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala +++ b/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala @@ -29,8 +29,8 @@ import com.typesafe.config.Config import scala.collection.{immutable, mutable} -private[akkeeper] class MasterService(deployClient: DeployClient.Async, - instanceStorage: InstanceStorage.Async) +private[akkeeper] class MasterService(deployClient: DeployClient, + instanceStorage: InstanceStorage) extends Actor with ActorLogging with Stash { private val config: Config = context.system.settings.config @@ -168,8 +168,8 @@ object MasterService extends RemoteServiceFactory { override val actorName = MasterServiceName - private[akkeeper] def createLocal(factory: ActorRefFactory, deployClient: DeployClient.Async, - instanceStorage: InstanceStorage.Async): ActorRef = { + private[akkeeper] def createLocal(factory: ActorRefFactory, deployClient: DeployClient, + instanceStorage: InstanceStorage): ActorRef = { factory.actorOf(Props(classOf[MasterService], deployClient, instanceStorage), actorName) } } diff --git a/akkeeper/src/main/scala/akkeeper/master/service/MemberAutoDownService.scala b/akkeeper/src/main/scala/akkeeper/master/service/MemberAutoDownService.scala index aceb015..91c091c 100644 --- a/akkeeper/src/main/scala/akkeeper/master/service/MemberAutoDownService.scala +++ b/akkeeper/src/main/scala/akkeeper/master/service/MemberAutoDownService.scala @@ -39,7 +39,7 @@ import MemberAutoDownService._ */ class MemberAutoDownService(targetAddress: UniqueAddress, targetInstanceId: InstanceId, - instanceStorage: InstanceStorage.Async, + instanceStorage: InstanceStorage, pollInterval: FiniteDuration) extends Actor with ActorLogging { @@ -103,7 +103,7 @@ object MemberAutoDownService { private[akkeeper] def createLocal(factory: ActorRefFactory, targetAddress: UniqueAddress, targetInstanceId: InstanceId, - instanceStorage: InstanceStorage.Async, + instanceStorage: InstanceStorage, pollInterval: FiniteDuration = DefaultPollInterval): ActorRef = { factory.actorOf(Props(classOf[MemberAutoDownService], targetAddress, targetInstanceId, instanceStorage, pollInterval), s"autoDown-$targetInstanceId") diff --git a/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala b/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala index 70c12e8..42740a4 100644 --- a/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala +++ b/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal import scala.concurrent.duration.FiniteDuration import MonitoringService._ -private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async) +private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage) extends RequestTrackingService with Stash { private val instanceLaunchTimeout: FiniteDuration = context.system.settings.config.monitoring.launchTimeout @@ -366,7 +366,7 @@ object MonitoringService extends RemoteServiceFactory { override val actorName = "monitoringService" private[akkeeper] def createLocal(factory: ActorRefFactory, - instanceStorage: InstanceStorage.Async): ActorRef = { + instanceStorage: InstanceStorage): ActorRef = { factory.actorOf(Props(classOf[MonitoringService], instanceStorage), actorName) } } diff --git a/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala b/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala index 585a94b..b35936c 100644 --- a/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala +++ b/akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala @@ -21,7 +21,7 @@ import akkeeper.storage.zookeeper.async.ZookeeperInstanceStorage import scala.concurrent.Future /** A persistent storage that stores information about existing instances. */ -private[akkeeper] trait InstanceStorage[F[_]] extends Storage { +private[akkeeper] trait InstanceStorage extends Storage { /** Registers a new instance. Same instance can't be registered * more than once. The instance record must be removed automatically @@ -31,7 +31,7 @@ private[akkeeper] trait InstanceStorage[F[_]] extends Storage { * @param info the instance info. See [[InstanceInfo]]. * @return a container object with the registered instance ID. */ - def registerInstance(info: InstanceInfo): F[InstanceId] + def registerInstance(info: InstanceInfo): Future[InstanceId] /** Retrieves the information about the instance by its ID. * @@ -39,7 +39,7 @@ private[akkeeper] trait InstanceStorage[F[_]] extends Storage { * @return a container object with the instance's information. * See [[InstanceInfo]]. */ - def getInstance(instanceId: InstanceId): F[InstanceInfo] + def getInstance(instanceId: InstanceId): Future[InstanceInfo] /** Retrieves all instances that belong to the specified * container. @@ -47,33 +47,28 @@ private[akkeeper] trait InstanceStorage[F[_]] extends Storage { * @param containerName the name of the container. * @return a container object with the list of instance IDs. */ - def getInstancesByContainer(containerName: String): F[Seq[InstanceId]] + def getInstancesByContainer(containerName: String): Future[Seq[InstanceId]] /** Retrieves all existing instances. * * @return a container object with the list of instance IDs. */ - def getInstances: F[Seq[InstanceId]] + def getInstances: Future[Seq[InstanceId]] } -private[akkeeper] object InstanceStorage { - type Async = InstanceStorage[Future] -} - -private[akkeeper] trait InstanceStorageFactory[F[_], T] extends (T => InstanceStorage[F]) +private[akkeeper] trait InstanceStorageFactory[T] extends (T => InstanceStorage) private[akkeeper] object InstanceStorageFactory { - type AsyncInstanceStorageFactory[T] = InstanceStorageFactory[Future, T] implicit object ZookeeperInstanceStorageFactory - extends AsyncInstanceStorageFactory[ZookeeperClientConfig] { + extends InstanceStorageFactory[ZookeeperClientConfig] { - override def apply(config: ZookeeperClientConfig): InstanceStorage.Async = { + override def apply(config: ZookeeperClientConfig): InstanceStorage = { new ZookeeperInstanceStorage(config.child("instances")) } } - def createAsync[T: AsyncInstanceStorageFactory](config: T): InstanceStorage.Async = { + def apply[T: InstanceStorageFactory](config: T): InstanceStorage = { implicitly[T](config) } } diff --git a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala index 5874ce5..56b9ec7 100644 --- a/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala +++ b/akkeeper/src/main/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorage.scala @@ -24,7 +24,7 @@ import ZookeeperInstanceStorage._ import InstanceStatusJsonProtocol._ private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig) - extends BaseZookeeperStorage with InstanceStorage.Async { + extends BaseZookeeperStorage with InstanceStorage { protected override val zookeeperClient = new AsyncZookeeperClient(config, CreateMode.EPHEMERAL) diff --git a/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala index 09b65e5..ff33ea1 100644 --- a/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala @@ -64,7 +64,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) } private def createContainerInstanceService(userActors: Seq[ActorLaunchContext], - instanceStorage: InstanceStorage.Async, + instanceStorage: InstanceStorage, instanceId: InstanceId, masterAddress: Address, retryInterval: FiniteDuration = DefaultRegistrationRetryInterval, @@ -81,7 +81,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) val instanceId = InstanceId("container") val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.registerInstance _) @@ -108,7 +108,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) val instanceId = InstanceId("container") val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() val numberOfAttempts = 3 @@ -133,7 +133,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) val newSystem = ActorSystem("ContainerInstanceServiceSpecTemp") val instanceId = InstanceId("container") - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() @@ -155,7 +155,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, newCluster.selfUniqueAddress, actorPath = "/user/akkeeperInstance/testActor") - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.registerInstance _) diff --git a/akkeeper/src/test/scala/akkeeper/master/service/DeployServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/master/service/DeployServiceSpec.scala index 332aacc..42ed8e9 100644 --- a/akkeeper/src/test/scala/akkeeper/master/service/DeployServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/master/service/DeployServiceSpec.scala @@ -45,7 +45,7 @@ class DeployServiceSpec(system: ActorSystem) extends TestKit(system) ContainerDefinition(name, cpus = cpus, memory = memory, actors = Seq(actor)) } - private def createDeployService(deployClient: DeployClient.Async, + private def createDeployService(deployClient: DeployClient, containerService: ActorRef, monitoringService: ActorRef): ActorRef = { childActorOf(Props(classOf[DeployService], deployClient, @@ -59,7 +59,7 @@ class DeployServiceSpec(system: ActorSystem) extends TestKit(system) jvmProperties = Map("property" -> "other_value")) val ids = (0 until 2).map(_ => InstanceId("container")) - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() val deployResult = ids.map(id => Future successful DeploySuccessful(id)) @@ -112,7 +112,7 @@ class DeployServiceSpec(system: ActorSystem) extends TestKit(system) } it should "return an error if the specified container is invalid" in { - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() @@ -135,7 +135,7 @@ class DeployServiceSpec(system: ActorSystem) extends TestKit(system) it should "fail the container deployment" in { val container = createContainer("container") val id = InstanceId("container") - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() val deployResult = Future successful DeployFailed(id, new AkkeeperException("")) @@ -187,7 +187,7 @@ class DeployServiceSpec(system: ActorSystem) extends TestKit(system) it should "stop with an error" in { val exception = new AkkeeperException("fail") - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() (deployClient.stopWithError _).expects(exception) diff --git a/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala index 3301d25..9a5f872 100644 --- a/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala @@ -37,12 +37,12 @@ import MonitoringServiceSpec._ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { "A Master Service" should "initialize successfully and create a new cluster" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq.empty) - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() @@ -62,12 +62,12 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { } ignore should "deploy initial instances" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq.empty) - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() (deployClient.deploy _) @@ -116,13 +116,13 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { override def test(): Unit = { val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq(instance.instanceId)) (storage.getInstance _).expects(instance.instanceId).returns(Future successful instance) - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() @@ -140,14 +140,14 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { } it should "proxy deploy and container requests" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq.empty) val instanceIds = (0 until 2).map(_ => InstanceId("container1")) val deployFutures = instanceIds.map(id => Future successful DeploySuccessful(id)) - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() (deployClient.deploy _) @@ -182,7 +182,7 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { override def test(): Unit = { val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq(instance.instanceId)) @@ -190,7 +190,7 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { .expects(instance.instanceId) .returns(Future failed new AkkeeperException("")) - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() (deployClient.stopWithError _).expects(*) @@ -205,12 +205,12 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { it should "shutdown the Actor system if the termination request has been received" in { new MasterServiceTestRunner() { override def test(): Unit = { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq.empty) - val deployClient = mock[DeployClient.Async] + val deployClient = mock[DeployClient] (deployClient.start _).expects() (deployClient.stop _).expects() diff --git a/akkeeper/src/test/scala/akkeeper/master/service/MemberAutoDownServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/master/service/MemberAutoDownServiceSpec.scala index 123e436..86a54cc 100644 --- a/akkeeper/src/test/scala/akkeeper/master/service/MemberAutoDownServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/master/service/MemberAutoDownServiceSpec.scala @@ -41,7 +41,7 @@ class MemberAutoDownServiceSpec(system: ActorSystem) extends TestKit(system) private def createMemberAutdownService(targetAddress: UniqueAddress, targetInstanceId: InstanceId, - instanceStorage: InstanceStorage.Async, + instanceStorage: InstanceStorage, pollInterval: FiniteDuration = 30 seconds): ActorRef = { childActorOf(Props(classOf[MemberAutoDownService], targetAddress, targetInstanceId, instanceStorage, pollInterval), s"autoDown-$targetInstanceId") @@ -52,7 +52,7 @@ class MemberAutoDownServiceSpec(system: ActorSystem) extends TestKit(system) val address = UniqueAddress(Address("akka.tcp", "MemberAutoDownServiceSpec", "localhost", port), 1L) val instanceId = InstanceId("container") - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.getInstance _).expects(instanceId).returns(Future failed RecordNotFoundException("")) val service = createMemberAutdownService(address, instanceId, storage) @@ -68,7 +68,7 @@ class MemberAutoDownServiceSpec(system: ActorSystem) extends TestKit(system) val instanceId = InstanceId("container") val info = InstanceInfo(instanceId, InstanceUp, "", Set.empty, None, Set.empty) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.getInstance _).expects(instanceId).returns(Future successful info).atLeastTwice() val service = createMemberAutdownService(address, instanceId, storage, 1 second) @@ -86,7 +86,7 @@ class MemberAutoDownServiceSpec(system: ActorSystem) extends TestKit(system) val instanceId = InstanceId("container") val info = InstanceInfo(instanceId, InstanceUp, "", Set.empty, None, Set.empty) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.getInstance _).expects(instanceId).returns(Future successful info) val service = createMemberAutdownService(address, instanceId, storage) @@ -104,7 +104,7 @@ class MemberAutoDownServiceSpec(system: ActorSystem) extends TestKit(system) val instanceId = InstanceId("container") val info = InstanceInfo(instanceId, InstanceUp, "", Set.empty, None, Set.empty) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.getInstance _).expects(instanceId).returns(Future successful info) val service = createMemberAutdownService(address, instanceId, storage) @@ -120,7 +120,7 @@ class MemberAutoDownServiceSpec(system: ActorSystem) extends TestKit(system) val address = UniqueAddress(Address("akka.tcp", "MemberAutoDownServiceSpec", "localhost", port), 1L) val instanceId = InstanceId("container") - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.getInstance _).expects(instanceId).returns(Future failed new Exception("")).atLeastTwice() val service = createMemberAutdownService(address, instanceId, storage, 1 second) diff --git a/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala index d120725..bca58b3 100644 --- a/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala @@ -43,7 +43,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) super.afterAll() } - private def createMonitoringService(instanceStorage: InstanceStorage.Async): ActorRef = { + private def createMonitoringService(instanceStorage: InstanceStorage): ActorRef = { childActorOf(Props(classOf[MonitoringService], instanceStorage), MonitoringService.actorName) } @@ -54,7 +54,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) Thread.sleep(sleepMs) throw new AkkeeperException("") } - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(delayedResponse) @@ -68,7 +68,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "become initialized successfully when there is no active instances" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq.empty) @@ -86,7 +86,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) it should "become initialized successfully and return the instance info" in { val instance = createInstanceInfo() - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq(instance.instanceId)) @@ -113,7 +113,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) it should "respond properly if the instance is not found" in { val instance = createInstanceInfo() - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq(instance.instanceId)) @@ -137,7 +137,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) it should "respond properly if the remote storage error occurs" in { val instance = createInstanceInfo() - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq(instance.instanceId)) @@ -184,7 +184,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) val instance2 = createInstanceInfo() val ids = Seq(instance1.instanceId, instance2.instanceId) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful ids) @@ -212,7 +212,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) val instance2 = createInstanceInfo("container2") val ids = Seq(instance1.instanceId, instance2.instanceId) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful ids) @@ -250,7 +250,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) val ids = Seq(instance1.instanceId, instance2.instanceId, instance3.instanceId, instance4.instanceId) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful ids) @@ -297,7 +297,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) val instance2 = createInstanceInfo("container1") val ids = Seq(instance1.instanceId, instance2.instanceId) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful ids) @@ -330,7 +330,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) it should "terminate instance successfully (instance from storage)" in { val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq(instance.instanceId)) @@ -352,7 +352,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) it should "fail to terminate the instance (instance from local cache)" in { val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq(instance.instanceId)) @@ -378,7 +378,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) it should "fail to terminate the instance" in { val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] val exception = new AkkeeperException("fail") (storage.start _).expects() (storage.stop _).expects() @@ -399,7 +399,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "update instance's info" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] val instanceId1 = InstanceId("container") val instance1 = InstanceInfo.deploying(instanceId1) val instanceId2 = InstanceId("container") @@ -429,7 +429,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "handle cluster events" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] val uniqueAddr = Cluster(system).selfUniqueAddress val member = createTestMember(uniqueAddr) @@ -468,7 +468,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "stop with an error" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _).expects().returns(Future successful Seq.empty) @@ -484,7 +484,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "remove instances which hasn't been transitioned from the launching state" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] val instanceId1 = InstanceId("container") val instance1 = InstanceInfo.launching(instanceId1) val instanceId2 = InstanceId("container") @@ -516,7 +516,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "remove instances whose deployment has failed" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] val instanceId1 = InstanceId("container") val instance1 = InstanceInfo.deploying(instanceId1) (storage.start _).expects() @@ -539,7 +539,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "terminate instance that has been considered dead previously" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] val instanceId1 = InstanceId("container") val instance1 = InstanceInfo.launching(instanceId1) (storage.start _).expects() @@ -568,7 +568,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "immediately remove instances which are not members of the cluster" in { - val storage = mock[InstanceStorage.Async] + val storage = mock[InstanceStorage] val port = 12345 val addr = Address("akka.tcp", system.name, "localhost", port) diff --git a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala index 457c73f..97565b1 100644 --- a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala @@ -27,13 +27,13 @@ import org.scalatest.{FlatSpec, Matchers} class ZookeeperInstanceStorageSpec extends FlatSpec with Matchers with AwaitMixin with ZookeeperBaseSpec { - private def withStorage[T](f: InstanceStorage.Async => T): T = { + private def withStorage[T](f: InstanceStorage => T): T = { val zookeeper = new ZookeeperServer() val connectionInterval = 1000 val clientConfig = ZookeeperClientConfig(zookeeper.getConnectString, connectionInterval, 1, None, "akkeeper") val storage = new ZookeeperInstanceStorage(clientConfig) - withStorage[InstanceStorage.Async, T](storage, zookeeper)(f) + withStorage[InstanceStorage, T](storage, zookeeper)(f) } private def createInstanceStatus(container: String): InstanceInfo = {