Skip to content

Commit

Permalink
Merge 9aea218 into ea5f49a
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman committed Jan 17, 2019
2 parents ea5f49a + 9aea218 commit 79e2e51
Show file tree
Hide file tree
Showing 19 changed files with 92 additions and 104 deletions.
Expand Up @@ -82,7 +82,7 @@ object ContainerInstanceMain extends App {
val actorSystem = ActorSystem(instanceConfig.akkeeperAkka.actorSystemName, instanceConfig)

val zkConfig = actorSystem.settings.config.zookeeper.clientConfig
val instanceStorage = InstanceStorageFactory.createAsync(zkConfig.child(instanceArgs.appId))
val instanceStorage = InstanceStorageFactory(zkConfig.child(instanceArgs.appId))

val actorsJsonStr = Source.fromFile(instanceArgs.actors).getLines().mkString("\n")
val actors = actorsJsonStr.parseJson.convertTo[Seq[ActorLaunchContext]]
Expand Down
Expand Up @@ -27,7 +27,7 @@ import scala.util.control.NonFatal
import ContainerInstanceService._

class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration,
Expand Down Expand Up @@ -173,7 +173,7 @@ object ContainerInstanceService {

def createLocal(factory: ActorRefFactory,
userActors: Seq[ActorLaunchContext],
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
Expand Down
17 changes: 6 additions & 11 deletions akkeeper/src/main/scala/akkeeper/deploy/DeployClient.scala
Expand Up @@ -20,7 +20,7 @@ import akkeeper.deploy.yarn._
import scala.concurrent.Future

/** A client that is responsible for deploying new container instances. */
private[akkeeper] trait DeployClient[F[_]] {
private[akkeeper] trait DeployClient {

/** Starts the client. */
def start(): Unit
Expand All @@ -44,11 +44,7 @@ private[akkeeper] trait DeployClient[F[_]] {
* Each item in this list represents a result for a one particular instance.
* See [[DeployResult]].
*/
def deploy(container: ContainerDefinition, instances: Seq[InstanceId]): Seq[F[DeployResult]]
}

private[akkeeper] object DeployClient {
type Async = DeployClient[Future]
def deploy(container: ContainerDefinition, instances: Seq[InstanceId]): Seq[Future[DeployResult]]
}

/** A result of the deployment operation. Contains the ID of the instance to which this
Expand All @@ -65,20 +61,19 @@ private[akkeeper] case class DeploySuccessful(instanceId: InstanceId) extends De
private[akkeeper] case class DeployFailed(instanceId: InstanceId,
e: Throwable) extends DeployResult

private[akkeeper] trait DeployClientFactory[F[_], T] extends (T => DeployClient[F])
private[akkeeper] trait DeployClientFactory[T] extends (T => DeployClient)

private[akkeeper] object DeployClientFactory {
type AsyncDeployClientFactory[T] = DeployClientFactory[Future, T]

implicit object YarnDeployClientFactory
extends AsyncDeployClientFactory[YarnApplicationMasterConfig] {
extends DeployClientFactory[YarnApplicationMasterConfig] {

override def apply(config: YarnApplicationMasterConfig): DeployClient.Async = {
override def apply(config: YarnApplicationMasterConfig): DeployClient = {
new YarnApplicationMaster(config, new YarnMasterClient)
}
}

def createAsync[T: AsyncDeployClientFactory](config: T): DeployClient.Async = {
def apply[T: DeployClientFactory](config: T): DeployClient = {
implicitly[T](config)
}
}
Expand Up @@ -31,20 +31,21 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util._
import scala.util.control.NonFatal
import scala.collection.JavaConverters._
import YarnApplicationMaster._

private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfig,
yarnClient: YarnMasterClient)
extends DeployClient.Async {
extends DeployClient {

private val logger = LoggerFactory.getLogger(classOf[YarnApplicationMaster])

private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(
config.config.yarn.clientThreads)
private implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executorService)

private val stagingDirectory: String = config.config.yarn.stagingDirectory(config.yarnConf, config.appId)
private val localResourceManager: YarnLocalResourceManager =
Expand Down Expand Up @@ -227,8 +228,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi
val request = buildContainerRequest(container)
pendingInstances.put(request.getPriority.getPriority, container -> id)

yarnClient.addContainerRequest(request)
promise.future
Future(yarnClient.addContainerRequest(request)).flatMap(_ => promise.future)
})
}

Expand Down
10 changes: 4 additions & 6 deletions akkeeper/src/main/scala/akkeeper/launcher/Launcher.scala
Expand Up @@ -26,7 +26,7 @@ import scala.concurrent.{ExecutionContext, Future}
final case class LaunchResult(appId: String, masterAddress: Address)

/** Launcher for the Akkeeper application. */
trait Launcher[F[_]] {
trait Launcher {

/** Launches the Akkeeper application and returns a launch result.
*
Expand All @@ -36,19 +36,17 @@ trait Launcher[F[_]] {
* submitted application and the address of the node where
* Akkeeper Master is running.
*/
def launch(config: Config, args: LaunchArguments): F[LaunchResult]
def launch(config: Config, args: LaunchArguments): Future[LaunchResult]
}

object Launcher {
def apply[F[_]](implicit l: Launcher[F]): Launcher[F] = l

def createYarnLauncher(yarnConfig: YarnConfiguration)
(implicit context: ExecutionContext): Launcher[Future] = {
(implicit context: ExecutionContext): Launcher = {
new YarnLauncher(yarnConfig, () => new YarnLauncherClient)
}

def createYarnLauncher(yarnConfig: Configuration)
(implicit context: ExecutionContext): Launcher[Future] = {
(implicit context: ExecutionContext): Launcher = {
createYarnLauncher(new YarnConfiguration(yarnConfig))
}
}
Expand Up @@ -38,7 +38,7 @@ import scala.concurrent.{ExecutionContext, Future}

final class YarnLauncher(yarnConf: YarnConfiguration,
yarnClientCreator: () => YarnLauncherClient)
(implicit context: ExecutionContext)extends Launcher[Future] {
(implicit context: ExecutionContext) extends Launcher {

private val logger = LoggerFactory.getLogger(classOf[YarnLauncher])

Expand Down
8 changes: 4 additions & 4 deletions akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala
Expand Up @@ -65,14 +65,14 @@ private[master] class YarnMasterRunner extends MasterRunner {
private val logger = LoggerFactory.getLogger(classOf[YarnMasterRunner])

private def createInstanceStorage(actorSystem: ActorSystem,
appId: String): InstanceStorage.Async = {
appId: String): InstanceStorage = {
val zkConfig = actorSystem.settings.config.zookeeper.clientConfig
InstanceStorageFactory.createAsync(zkConfig.child(appId))
InstanceStorageFactory(zkConfig.child(appId))
}

private def createDeployClient(actorSystem: ActorSystem,
masterArgs: MasterArguments,
restApiPort: Int): DeployClient.Async = {
restApiPort: Int): DeployClient = {
val yarnConf = YarnUtils.getYarnConfiguration
val config = actorSystem.settings.config
val selfAddr = Cluster(actorSystem).selfAddress
Expand All @@ -83,7 +83,7 @@ private[master] class YarnMasterRunner extends MasterRunner {
config = config, yarnConf = yarnConf,
appId = masterArgs.appId, selfAddress = selfAddr, trackingUrl = trackingUrl,
principal = principal)
DeployClientFactory.createAsync(yarnConfig)
DeployClientFactory(yarnConfig)
}

private def loginAndGetRenewer(config: Config, principal: String): KerberosTicketRenewer = {
Expand Down
Expand Up @@ -22,7 +22,7 @@ import akkeeper.common._
import akkeeper.deploy._
import MonitoringService._

private[akkeeper] class DeployService(deployClient: DeployClient.Async,
private[akkeeper] class DeployService(deployClient: DeployClient,
containerService: ActorRef,
monitoringService: ActorRef) extends RequestTrackingService {

Expand Down Expand Up @@ -92,7 +92,7 @@ object DeployService extends RemoteServiceFactory {
override val actorName = "deployService"

private[akkeeper] def createLocal(factory: ActorRefFactory,
deployClient: DeployClient.Async,
deployClient: DeployClient,
containerService: ActorRef,
monitoringService: ActorRef): ActorRef = {
factory.actorOf(Props(classOf[DeployService], deployClient,
Expand Down
Expand Up @@ -29,8 +29,8 @@ import com.typesafe.config.Config
import scala.collection.{immutable, mutable}


private[akkeeper] class MasterService(deployClient: DeployClient.Async,
instanceStorage: InstanceStorage.Async)
private[akkeeper] class MasterService(deployClient: DeployClient,
instanceStorage: InstanceStorage)
extends Actor with ActorLogging with Stash {

private val config: Config = context.system.settings.config
Expand Down Expand Up @@ -168,8 +168,8 @@ object MasterService extends RemoteServiceFactory {

override val actorName = MasterServiceName

private[akkeeper] def createLocal(factory: ActorRefFactory, deployClient: DeployClient.Async,
instanceStorage: InstanceStorage.Async): ActorRef = {
private[akkeeper] def createLocal(factory: ActorRefFactory, deployClient: DeployClient,
instanceStorage: InstanceStorage): ActorRef = {
factory.actorOf(Props(classOf[MasterService], deployClient, instanceStorage), actorName)
}
}
Expand Up @@ -39,7 +39,7 @@ import MemberAutoDownService._
*/
class MemberAutoDownService(targetAddress: UniqueAddress,
targetInstanceId: InstanceId,
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
pollInterval: FiniteDuration)
extends Actor with ActorLogging {

Expand Down Expand Up @@ -103,7 +103,7 @@ object MemberAutoDownService {
private[akkeeper] def createLocal(factory: ActorRefFactory,
targetAddress: UniqueAddress,
targetInstanceId: InstanceId,
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
pollInterval: FiniteDuration = DefaultPollInterval): ActorRef = {
factory.actorOf(Props(classOf[MemberAutoDownService], targetAddress,
targetInstanceId, instanceStorage, pollInterval), s"autoDown-$targetInstanceId")
Expand Down
Expand Up @@ -30,7 +30,7 @@ import scala.util.control.NonFatal
import scala.concurrent.duration.FiniteDuration
import MonitoringService._

private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async)
private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage)
extends RequestTrackingService with Stash {

private val instanceLaunchTimeout: FiniteDuration = context.system.settings.config.monitoring.launchTimeout
Expand Down Expand Up @@ -366,7 +366,7 @@ object MonitoringService extends RemoteServiceFactory {
override val actorName = "monitoringService"

private[akkeeper] def createLocal(factory: ActorRefFactory,
instanceStorage: InstanceStorage.Async): ActorRef = {
instanceStorage: InstanceStorage): ActorRef = {
factory.actorOf(Props(classOf[MonitoringService], instanceStorage), actorName)
}
}
23 changes: 9 additions & 14 deletions akkeeper/src/main/scala/akkeeper/storage/InstanceStorage.scala
Expand Up @@ -21,7 +21,7 @@ import akkeeper.storage.zookeeper.async.ZookeeperInstanceStorage
import scala.concurrent.Future

/** A persistent storage that stores information about existing instances. */
private[akkeeper] trait InstanceStorage[F[_]] extends Storage {
private[akkeeper] trait InstanceStorage extends Storage {

/** Registers a new instance. Same instance can't be registered
* more than once. The instance record must be removed automatically
Expand All @@ -31,49 +31,44 @@ private[akkeeper] trait InstanceStorage[F[_]] extends Storage {
* @param info the instance info. See [[InstanceInfo]].
* @return a container object with the registered instance ID.
*/
def registerInstance(info: InstanceInfo): F[InstanceId]
def registerInstance(info: InstanceInfo): Future[InstanceId]

/** Retrieves the information about the instance by its ID.
*
* @param instanceId the ID of the instance.
* @return a container object with the instance's information.
* See [[InstanceInfo]].
*/
def getInstance(instanceId: InstanceId): F[InstanceInfo]
def getInstance(instanceId: InstanceId): Future[InstanceInfo]

/** Retrieves all instances that belong to the specified
* container.
*
* @param containerName the name of the container.
* @return a container object with the list of instance IDs.
*/
def getInstancesByContainer(containerName: String): F[Seq[InstanceId]]
def getInstancesByContainer(containerName: String): Future[Seq[InstanceId]]

/** Retrieves all existing instances.
*
* @return a container object with the list of instance IDs.
*/
def getInstances: F[Seq[InstanceId]]
def getInstances: Future[Seq[InstanceId]]
}

private[akkeeper] object InstanceStorage {
type Async = InstanceStorage[Future]
}

private[akkeeper] trait InstanceStorageFactory[F[_], T] extends (T => InstanceStorage[F])
private[akkeeper] trait InstanceStorageFactory[T] extends (T => InstanceStorage)

private[akkeeper] object InstanceStorageFactory {
type AsyncInstanceStorageFactory[T] = InstanceStorageFactory[Future, T]

implicit object ZookeeperInstanceStorageFactory
extends AsyncInstanceStorageFactory[ZookeeperClientConfig] {
extends InstanceStorageFactory[ZookeeperClientConfig] {

override def apply(config: ZookeeperClientConfig): InstanceStorage.Async = {
override def apply(config: ZookeeperClientConfig): InstanceStorage = {
new ZookeeperInstanceStorage(config.child("instances"))
}
}

def createAsync[T: AsyncInstanceStorageFactory](config: T): InstanceStorage.Async = {
def apply[T: InstanceStorageFactory](config: T): InstanceStorage = {
implicitly[T](config)
}
}
Expand Up @@ -24,7 +24,7 @@ import ZookeeperInstanceStorage._
import InstanceStatusJsonProtocol._

private[akkeeper] class ZookeeperInstanceStorage(config: ZookeeperClientConfig)
extends BaseZookeeperStorage with InstanceStorage.Async {
extends BaseZookeeperStorage with InstanceStorage {

protected override val zookeeperClient =
new AsyncZookeeperClient(config, CreateMode.EPHEMERAL)
Expand Down
Expand Up @@ -64,7 +64,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
}

private def createContainerInstanceService(userActors: Seq[ActorLaunchContext],
instanceStorage: InstanceStorage.Async,
instanceStorage: InstanceStorage,
instanceId: InstanceId,
masterAddress: Address,
retryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
Expand All @@ -81,7 +81,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val instanceId = InstanceId("container")
val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr)

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()
(storage.registerInstance _)
Expand All @@ -108,7 +108,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val instanceId = InstanceId("container")
val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr)

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()
val numberOfAttempts = 3
Expand All @@ -133,7 +133,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val newSystem = ActorSystem("ContainerInstanceServiceSpecTemp")
val instanceId = InstanceId("container")

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()

Expand All @@ -155,7 +155,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
val expectedInstanceInfo = createExpectedInstanceInfo(instanceId,
newCluster.selfUniqueAddress, actorPath = "/user/akkeeperInstance/testActor")

val storage = mock[InstanceStorage.Async]
val storage = mock[InstanceStorage]
(storage.start _).expects()
(storage.stop _).expects()
(storage.registerInstance _)
Expand Down

0 comments on commit 79e2e51

Please sign in to comment.