Skip to content

Commit

Permalink
Refactor Instance Storage and Deploy Client. Drop unnecessary F[_] us…
Browse files Browse the repository at this point in the history
…age, always use Futures instead (#61)
  • Loading branch information
izeigerman committed Jan 17, 2019
1 parent ea5f49a commit 4905d8e
Show file tree
Hide file tree
Showing 19 changed files with 92 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object ContainerInstanceMain extends App {
val actorSystem = ActorSystem(instanceConfig.akkeeperAkka.actorSystemName, instanceConfig)

val zkConfig = actorSystem.settings.config.zookeeper.clientConfig
val instanceStorage = InstanceStorageFactory.createAsync(zkConfig.child(instanceArgs.appId))
val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId))

val actorsJsonStr = Source.fromFile(instanceArgs.actors).getLines().mkString("\n")
val actors = actorsJsonStr.parseJson.convertTo[Seq[ActorLaunchContext]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.util.control.NonFatal
import ContainerInstanceService._

class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration,
Expand Down Expand Up @@ -173,7 +173,7 @@ object ContainerInstanceService {

def createLocal(factory: ActorRefFactory,
userActors: Seq[ActorLaunchContext],
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
Expand Down
17 changes: 6 additions & 11 deletions akkeeper/src/main/scala/akkeeper/deploy/DeployClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import akkeeper.deploy.yarn._
import scala.concurrent.Future

/** A client that is responsible for deploying new container instances. */
private[akkeeper] trait DeployClient[F[_]] {
private[akkeeper] trait DeployClient {

/** Starts the client. */
def start(): Unit
Expand All @@ -44,11 +44,7 @@ private[akkeeper] trait DeployClient[F[_]] {
* Each item in this list represents a result for a one particular instance.
* See [[DeployResult]].
*/
def deploy(container: ContainerDefinition, instances: Seq[InstanceId]): Seq[F[DeployResult]]
}

private[akkeeper] object DeployClient {
type Async = DeployClient[Future]
def deploy(container: ContainerDefinition, instances: Seq[InstanceId]): Seq[Future[DeployResult]]
}

/** A result of the deployment operation. Contains the ID of the instance to which this
Expand All @@ -65,20 +61,19 @@ private[akkeeper] case class DeploySuccessful(instanceId: InstanceId) extends De
private[akkeeper] case class DeployFailed(instanceId: InstanceId,
e: Throwable) extends DeployResult

private[akkeeper] trait DeployClientFactory[F[_], T] extends (T => DeployClient[F])
private[akkeeper] trait DeployClientFactory[T] extends (T => DeployClient)

private[akkeeper] object DeployClientFactory {
type AsyncDeployClientFactory[T] = DeployClientFactory[Future, T]

implicit object YarnDeployClientFactory
extends AsyncDeployClientFactory[YarnApplicationMasterConfig] {
extends DeployClientFactory[YarnApplicationMasterConfig] {

override def apply(config: YarnApplicationMasterConfig): DeployClient.Async = {
override def apply(config: YarnApplicationMasterConfig): DeployClient = {
new YarnApplicationMaster(config, new YarnMasterClient)
}
}

def createAsync[T: AsyncDeployClientFactory](config: T): DeployClient.Async = {
def apply[T: DeployClientFactory](config: T): DeployClient = {
implicitly[T](config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util._
import scala.util.control.NonFatal
import scala.collection.JavaConverters._
import YarnApplicationMaster._

private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfig,
yarnClient: YarnMasterClient)
extends DeployClient.Async {
extends DeployClient {

private val logger = LoggerFactory.getLogger(classOf[YarnApplicationMaster])

private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(
config.config.yarn.clientThreads)
private implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executorService)

private val stagingDirectory: String = config.config.yarn.stagingDirectory(config.yarnConf, config.appId)
private val localResourceManager: YarnLocalResourceManager =
Expand Down Expand Up @@ -227,8 +228,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi
val request = buildContainerRequest(container)
pendingInstances.put(request.getPriority.getPriority, container -> id)

yarnClient.addContainerRequest(request)
promise.future
Future(yarnClient.addContainerRequest(request)).flatMap(_ => promise.future)
})
}

Expand Down
10 changes: 4 additions & 6 deletions akkeeper/src/main/scala/akkeeper/launcher/Launcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.concurrent.{ExecutionContext, Future}
final case class LaunchResult(appId: String, masterAddress: Address)

/** Launcher for the Akkeeper application. */
trait Launcher[F[_]] {
trait Launcher {

/** Launches the Akkeeper application and returns a launch result.
*
Expand All @@ -36,19 +36,17 @@ trait Launcher[F[_]] {
* submitted application and the address of the node where
* Akkeeper Master is running.
*/
def launch(config: Config, args: LaunchArguments): F[LaunchResult]
def launch(config: Config, args: LaunchArguments): Future[LaunchResult]
}

object Launcher {
def apply[F[_]](implicit l: Launcher[F]): Launcher[F] = l

def createYarnLauncher(yarnConfig: YarnConfiguration)
(implicit context: ExecutionContext): Launcher[Future] = {
(implicit context: ExecutionContext): Launcher = {
new YarnLauncher(yarnConfig, () => new YarnLauncherClient)
}

def createYarnLauncher(yarnConfig: Configuration)
(implicit context: ExecutionContext): Launcher[Future] = {
(implicit context: ExecutionContext): Launcher = {
createYarnLauncher(new YarnConfiguration(yarnConfig))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import scala.concurrent.{ExecutionContext, Future}

final class YarnLauncher(yarnConf: YarnConfiguration,
yarnClientCreator: () => YarnLauncherClient)
(implicit context: ExecutionContext)extends Launcher[Future] {
(implicit context: ExecutionContext) extends Launcher {

private val logger = LoggerFactory.getLogger(classOf[YarnLauncher])

Expand Down
8 changes: 4 additions & 4 deletions akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ private[master] class YarnMasterRunner extends MasterRunner {
private val logger = LoggerFactory.getLogger(classOf[YarnMasterRunner])

private def createInstanceStorage(actorSystem: ActorSystem,
appId: String): InstanceStorage.Async = {
appId: String): InstanceStorage = {
val zkConfig = actorSystem.settings.config.zookeeper.clientConfig
InstanceStorageFactory.createAsync(zkConfig.child(appId))
InstanceStorageFactory(zkConfig.child(appId))
}

private def createDeployClient(actorSystem: ActorSystem,
masterArgs: MasterArguments,
restApiPort: Int): DeployClient.Async = {
restApiPort: Int): DeployClient = {
val yarnConf = YarnUtils.getYarnConfiguration
val config = actorSystem.settings.config
val selfAddr = Cluster(actorSystem).selfAddress
Expand All @@ -83,7 +83,7 @@ private[master] class YarnMasterRunner extends MasterRunner {
config = config, yarnConf = yarnConf,
appId = masterArgs.appId, selfAddress = selfAddr, trackingUrl = trackingUrl,
principal = principal)
DeployClientFactory.createAsync(yarnConfig)
DeployClientFactory(yarnConfig)
}

private def loginAndGetRenewer(config: Config, principal: String): KerberosTicketRenewer = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import akkeeper.common._
import akkeeper.deploy._
import MonitoringService._

private[akkeeper] class DeployService(deployClient: DeployClient.Async,
private[akkeeper] class DeployService(deployClient: DeployClient,
containerService: ActorRef,
monitoringService: ActorRef) extends RequestTrackingService {

Expand Down Expand Up @@ -92,7 +92,7 @@ object DeployService extends RemoteServiceFactory {
override val actorName = "deployService"

private[akkeeper] def createLocal(factory: ActorRefFactory,
deployClient: DeployClient.Async,
deployClient: DeployClient,
containerService: ActorRef,
monitoringService: ActorRef): ActorRef = {
factory.actorOf(Props(classOf[DeployService], deployClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import com.typesafe.config.Config
import scala.collection.{immutable, mutable}


private[akkeeper] class MasterService(deployClient: DeployClient.Async,
instanceStorage: InstanceStorage.Async)
private[akkeeper] class MasterService(deployClient: DeployClient,
instanceStorage: InstanceStorage)
extends Actor with ActorLogging with Stash {

private val config: Config = context.system.settings.config
Expand Down Expand Up @@ -168,8 +168,8 @@ object MasterService extends RemoteServiceFactory {

override val actorName = MasterServiceName

private[akkeeper] def createLocal(factory: ActorRefFactory, deployClient: DeployClient.Async,
instanceStorage: InstanceStorage.Async): ActorRef = {
private[akkeeper] def createLocal(factory: ActorRefFactory, deployClient: DeployClient,
instanceStorage: InstanceStorage): ActorRef = {
factory.actorOf(Props(classOf[MasterService], deployClient, instanceStorage), actorName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import MemberAutoDownService._
*/
class MemberAutoDownService(targetAddress: UniqueAddress,
targetInstanceId: InstanceId,
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
pollInterval: FiniteDuration)
extends Actor with ActorLogging {

Expand Down Expand Up @@ -103,7 +103,7 @@ object MemberAutoDownService {
private[akkeeper] def createLocal(factory: ActorRefFactory,
targetAddress: UniqueAddress,
targetInstanceId: InstanceId,
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
pollInterval: FiniteDuration = DefaultPollInterval): ActorRef = {
factory.actorOf(Props(classOf[MemberAutoDownService], targetAddress,
targetInstanceId, instanceStorage, pollInterval), s"autoDown-$targetInstanceId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scala.util.control.NonFatal
import scala.concurrent.duration.FiniteDuration
import MonitoringService._

private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async)
private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage)
extends RequestTrackingService with Stash {

private val instanceLaunchTimeout: FiniteDuration = context.system.settings.config.monitoring.launchTimeout
Expand Down Expand Up @@ -366,7 +366,7 @@ object MonitoringService extends RemoteServiceFactory {
override val actorName = "monitoringService"

private[akkeeper] def createLocal(factory: ActorRefFactory,
instanceStorage: InstanceStorage.Async): ActorRef = {
instanceStorage: InstanceStorage): ActorRef = {
factory.actorOf(Props(classOf[MonitoringService], instanceStorage), actorName)
}
}
23 changes: 9 additions & 14 deletions akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akkeeper.storage.zookeeper.async.ZookeeperInstanceStorage
import scala.concurrent.Future

/** A persistent storage that stores information about existing instances. */
private[akkeeper] trait InstanceStorage[F[_]] extends Storage {
private[akkeeper] trait InstanceStorage extends Storage {

/** Registers a new instance. Same instance can't be registered
* more than once. The instance record must be removed automatically
Expand All @@ -31,49 +31,44 @@ private[akkeeper] trait InstanceStorage[F[_]] extends Storage {
* @param info the instance info. See [[InstanceInfo]].
* @return a container object with the registered instance ID.
*/
def registerInstance(info: InstanceInfo): F[InstanceId]
def registerInstance(info: InstanceInfo): Future[InstanceId]

/** Retrieves the information about the instance by its ID.
*
* @param instanceId the ID of the instance.
* @return a container object with the instance's information.
* See [[InstanceInfo]].
*/
def getInstance(instanceId: InstanceId): F[InstanceInfo]
def getInstance(instanceId: InstanceId): Future[InstanceInfo]

/** Retrieves all instances that belong to the specified
* container.
*
* @param containerName the name of the container.
* @return a container object with the list of instance IDs.
*/
def getInstancesByContainer(containerName: String): F[Seq[InstanceId]]
def getInstancesByContainer(containerName: String): Future[Seq[InstanceId]]

/** Retrieves all existing instances.
*
* @return a container object with the list of instance IDs.
*/
def getInstances: F[Seq[InstanceId]]
def getInstances: Future[Seq[InstanceId]]
}

private[akkeeper] object InstanceStorage {
type Async = InstanceStorage[Future]
}

private[akkeeper] trait InstanceStorageFactory[F[_], T] extends (T => InstanceStorage[F])
private[akkeeper] trait InstanceStorageFactory[T] extends (T => InstanceStorage)

private[akkeeper] object InstanceStorageFactory {
type AsyncInstanceStorageFactory[T] = InstanceStorageFactory[Future, T]

implicit object ZookeeperInstanceStorageFactory
extends AsyncInstanceStorageFactory[ZookeeperClientConfig] {
extends InstanceStorageFactory[ZookeeperClientConfig] {

override def apply(config: ZookeeperClientConfig): InstanceStorage.Async = {
override def apply(config: ZookeeperClientConfig): InstanceStorage = {
new ZookeeperInstanceStorage(config.child("instances"))
}
}

def createAsync[T: AsyncInstanceStorageFactory](config: T): InstanceStorage.Async = {
def apply[T: InstanceStorageFactory](config: T): InstanceStorage = {
implicitly[T](config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import ZookeeperInstanceStorage._
import InstanceStatusJsonProtocol._

private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)
extends BaseZookeeperStorage with InstanceStorage.Async {
extends BaseZookeeperStorage with InstanceStorage {

protected override val zookeeperClient =
new AsyncZookeeperClient(config, CreateMode.EPHEMERAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
}

private def createContainerInstanceService(userActors: Seq[ActorLaunchContext],
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
instanceId: InstanceId,
masterAddress: Address,
retryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
Expand All @@ -81,7 +81,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val instanceId = InstanceId("container")
val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr)

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()
(storage.registerInstance _)
Expand All @@ -108,7 +108,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val instanceId = InstanceId("container")
val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr)

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()
val numberOfAttempts = 3
Expand All @@ -133,7 +133,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val newSystem = ActorSystem("ContainerInstanceServiceSpecTemp")
val instanceId = InstanceId("container")

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()

Expand All @@ -155,7 +155,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val expectedInstanceInfo = createExpectedInstanceInfo(instanceId,
newCluster.selfUniqueAddress, actorPath = "/user/akkeeperInstance/testActor")

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()
(storage.registerInstance _)
Expand Down

0 comments on commit 4905d8e

Please sign in to comment.