diff --git a/akkeeper/src/main/resources/reference.conf b/akkeeper/src/main/resources/reference.conf index 3568832..b084433 100644 --- a/akkeeper/src/main/resources/reference.conf +++ b/akkeeper/src/main/resources/reference.conf @@ -18,7 +18,7 @@ akka { akkeeper { akka { - port = 2550 + port = 0 system-name = "AkkeeperSystem" } @@ -33,7 +33,7 @@ akkeeper { application-name = "AkkeeperApplication" master { cpus = 1 - memory = 1048 + memory = 1024 jvm.args = ["-Xmx1g"] } max-attempts = 2 diff --git a/akkeeper/src/main/scala/akkeeper/common/InstanceInfo.scala b/akkeeper/src/main/scala/akkeeper/common/InstanceInfo.scala index 69873f8..2d3c417 100644 --- a/akkeeper/src/main/scala/akkeeper/common/InstanceInfo.scala +++ b/akkeeper/src/main/scala/akkeeper/common/InstanceInfo.scala @@ -16,6 +16,7 @@ package akkeeper.common import akka.actor.Address +import akka.cluster.UniqueAddress import akkeeper.AkkeeperException import spray.json._ @@ -50,10 +51,12 @@ object InstanceStatus { * @param roles the list of the instance's Akka roles. * @param address the address of the instance. * @param actors the list of user actors that are available on this instance. + * @param extra the key value properties with additional instance specific data. */ case class InstanceInfo private[akkeeper] (instanceId: InstanceId, status: InstanceStatus, containerName: String, roles: Set[String], - address: Option[Address], actors: Set[String]) + address: Option[UniqueAddress], actors: Set[String], + extra: Map[String, String] = Map.empty) object InstanceInfo { private def createWithStatus(instanceId: InstanceId, status: InstanceStatus): InstanceInfo = { @@ -104,6 +107,9 @@ trait InstanceStatusJsonProtocol extends DefaultJsonProtocol with InstanceIdJson } } + implicit val uniqueAddressFormat = + jsonFormat[Address, Long, UniqueAddress]((a, u) => UniqueAddress(a, u), "address", "longUid") + implicit val instanceStatusFormat = new JsonFormat[InstanceStatus] { override def read(json: JsValue): InstanceStatus = { InstanceStatus.fromString(json.convertTo[String]) @@ -113,7 +119,7 @@ trait InstanceStatusJsonProtocol extends DefaultJsonProtocol with InstanceIdJson } } - implicit val instanceInfoFormat = jsonFormat6(InstanceInfo.apply) + implicit val instanceInfoFormat = jsonFormat7(InstanceInfo.apply) } object InstanceStatusJsonProtocol extends InstanceStatusJsonProtocol diff --git a/akkeeper/src/main/scala/akkeeper/common/StopInstance.scala b/akkeeper/src/main/scala/akkeeper/common/StopInstance.scala index 7567581..98d9d87 100644 --- a/akkeeper/src/main/scala/akkeeper/common/StopInstance.scala +++ b/akkeeper/src/main/scala/akkeeper/common/StopInstance.scala @@ -15,4 +15,4 @@ */ package akkeeper.common -private[akkeeper] case object StopInstance +case object StopInstance diff --git a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala index 5ada2fe..5db1c8e 100644 --- a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala +++ b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala @@ -39,7 +39,7 @@ import scala.concurrent.duration.Duration object ContainerInstanceMain extends App { val optParser = new OptionParser[ContainerInstanceArguments]("akkeeperInstance") { - head("akkeeperInstance", "0.1") + head("akkeeperInstance", "0.2.0") opt[String](AppIdArg).required().action((v, c) => { c.copy(appId = v) diff --git a/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala b/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala index fedebe2..7a275a4 100644 --- a/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala +++ b/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala @@ -30,7 +30,8 @@ import ContainerInstanceService._ class ContainerInstanceService(instanceStorage: InstanceStorage.Async, instanceId: InstanceId, masterAddress: Address, - retryInterval: FiniteDuration) + registrationRetryInterval: FiniteDuration, + joinClusterTimeout: FiniteDuration) extends Actor with ActorLogging { private implicit val dispatcher = context.dispatcher @@ -77,7 +78,7 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async, status = InstanceUp, containerName = instanceId.containerName, roles = cluster.selfRoles, - address = Some(cluster.selfAddress), + address = Some(cluster.selfUniqueAddress), actors = actors.toSet ) thisInstance = Some(info) @@ -98,9 +99,11 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async, notifyMonitoringService case OperationFailed(_, e) => // Failed to save the record to a storage. - log.error(e, s"Failed to store this instance information. Retrying in $retryInterval") + log.error(e, "Failed to store this instance information. " + + s"Retrying in $registrationRetryInterval") // Scheduling retry. - context.system.scheduler.scheduleOnce(retryInterval, self, RetryRegistration) + context.system.scheduler.scheduleOnce(registrationRetryInterval, + self, RetryRegistration) case RetryRegistration => log.info("Retrying instance registration process") registerThisInstance @@ -108,6 +111,8 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async, log.info("Termination command received. Stopping this instance") cluster.leave(cluster.selfAddress) context.system.terminate() + case JoinClusterTimeout => + // Safely ignore the timeout command. } private def joiningTheClusterReceive: Receive = { @@ -118,6 +123,10 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async, context.become(initializedReceive) registerThisInstance } + case JoinClusterTimeout => + log.error(s"Couldn't join the cluster during ${joinClusterTimeout.toSeconds} seconds. " + + "Terminating this instance...") + context.system.terminate() } private def waitingForActorsReceive: Receive = { @@ -127,6 +136,8 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async, log.debug(s"Joining the cluster (master: $masterAddress)") cluster.join(masterAddress) cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp]) + // Scheduling a timeout command. + context.system.scheduler.scheduleOnce(joinClusterTimeout, self, JoinClusterTimeout) } override def receive: Receive = waitingForActorsReceive @@ -135,7 +146,9 @@ class ContainerInstanceService(instanceStorage: InstanceStorage.Async, object ContainerInstanceService { private[akkeeper] case class LaunchActors(actors: Seq[ActorLaunchContext]) private case object RetryRegistration - private val DefaultRetryInterval = 30 seconds + private case object JoinClusterTimeout + private val DefaultRegistrationRetryInterval = 30 seconds + private val DefaultJoinClusterTimeout = 120 seconds val ActorName = "akkeeperInstance" @@ -143,9 +156,10 @@ object ContainerInstanceService { instanceStorage: InstanceStorage.Async, instanceId: InstanceId, masterAddress: Address, - retryInterval: FiniteDuration = DefaultRetryInterval): ActorRef = { + registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval, + joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout): ActorRef = { val props = Props(classOf[ContainerInstanceService], instanceStorage, - instanceId, masterAddress, retryInterval) + instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout) factory.actorOf(props, ActorName) } } diff --git a/akkeeper/src/main/scala/akkeeper/launcher/LauncherMain.scala b/akkeeper/src/main/scala/akkeeper/launcher/LauncherMain.scala index 3b21116..70746e7 100644 --- a/akkeeper/src/main/scala/akkeeper/launcher/LauncherMain.scala +++ b/akkeeper/src/main/scala/akkeeper/launcher/LauncherMain.scala @@ -28,7 +28,7 @@ import scala.concurrent.duration._ object LauncherMain extends App { val optParser = new OptionParser[LaunchArguments]("akkeeper") { - head("akkeeper", "0.1") + head("akkeeper", "0.2.0") opt[File]("akkeeperJar").required().action((v, c) => { c.copy(akkeeperJarPath = v) diff --git a/akkeeper/src/main/scala/akkeeper/master/MasterMain.scala b/akkeeper/src/main/scala/akkeeper/master/MasterMain.scala index 8fdb6fe..69ac6f4 100644 --- a/akkeeper/src/main/scala/akkeeper/master/MasterMain.scala +++ b/akkeeper/src/main/scala/akkeeper/master/MasterMain.scala @@ -23,7 +23,7 @@ import scala.util.control.NonFatal object MasterMain extends App { val optParser = new OptionParser[MasterArguments]("akkeeperMaster") { - head("akkeeperMaster", "0.1") + head("akkeeperMaster", "0.2.0") opt[String](AppIdArg).required().action((v, c) => { c.copy(appId = v) diff --git a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala index efec3a0..42fc95c 100644 --- a/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala +++ b/akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala @@ -15,6 +15,7 @@ */ package akkeeper.master +import java.util.UUID import java.util.concurrent.TimeUnit import akka.actor.{ActorRef, ActorSystem} @@ -23,6 +24,7 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import akka.util.Timeout +import akkeeper.common.{InstanceId, InstanceInfo, InstanceUp} import akkeeper.deploy.{DeployClient, DeployClientFactory} import akkeeper.deploy.yarn.YarnApplicationMasterConfig import akkeeper.master.route._ @@ -86,6 +88,17 @@ private[master] class YarnMasterRunner extends MasterRunner { )).route } + private def registerMasterInstance[F[_]](storage: InstanceStorage[F], + actorSystem: ActorSystem, + restPort: Int): F[InstanceId] = { + val cluster = Cluster(actorSystem) + val instanceId = InstanceId(MasterService.MasterServiceName, UUID.randomUUID()) + val extra = Map("apiPort" -> restPort.toString) + val info = InstanceInfo(instanceId, InstanceUp, MasterService.MasterServiceName, + cluster.selfRoles, Some(cluster.selfUniqueAddress), Set.empty, extra) + storage.registerInstance(info) + } + def run(masterArgs: MasterArguments): Unit = { val config = masterArgs.config .map(c => ConfigFactory.parseFile(c).withFallback(ConfigFactory.load())) @@ -115,6 +128,8 @@ private[master] class YarnMasterRunner extends MasterRunner { logger.error(s"Failed to bind to port $restPort", ex) } + registerMasterInstance(instanceStorage, actorSystem, restPort) + Await.result(actorSystem.whenTerminated, Duration.Inf) materializer.shutdown() ticketRenewer.foreach(_.stop()) diff --git a/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala b/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala index d32d396..e5910b3 100644 --- a/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala +++ b/akkeeper/src/main/scala/akkeeper/master/service/MasterService.scala @@ -119,7 +119,7 @@ private[akkeeper] class MasterService(deployClient: DeployClient.Async, log.debug(s"Received instance info. ${numOfRequiredInstances - seedInstances.size} " + "more needed to proceed") if (seedInstances.size >= numOfRequiredInstances) { - val seedAddrs = immutable.Seq(seedInstances.map(_.address.get).toSeq: _*) + val seedAddrs = immutable.Seq(seedInstances.map(_.address.get.address).toSeq: _*) joinCluster(seedAddrs) } diff --git a/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala b/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala index c6d0174..47ea00b 100644 --- a/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala +++ b/akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala @@ -16,13 +16,14 @@ package akkeeper.master.service import akka.actor._ -import akka.cluster.Cluster +import akka.cluster.{Cluster, UniqueAddress} import akka.cluster.ClusterEvent._ import akka.pattern.pipe import akkeeper.api._ import akkeeper.common._ import akkeeper.container.service.ContainerInstanceService import akkeeper.storage._ + import scala.collection.mutable import scala.concurrent.Future import scala.util.control.NonFatal @@ -63,23 +64,23 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async sender() ! InstanceNotFound(requestId, instanceId) } - private def findInstanceByAddr(addr: Address): Option[InstanceInfo] = { + private def findInstanceByAddr(addr: UniqueAddress): Option[InstanceInfo] = { instances.collectFirst { - case (id, Some(info)) if info.address.exists(_ == addr) => info + case (_, Some(info)) if info.address.exists(_ == addr) => info } } - private def updateInstanceStatusByAddr(addr: Address, status: InstanceStatus): Unit = { + private def updateInstanceStatusByAddr(addr: UniqueAddress, status: InstanceStatus): Unit = { findInstanceByAddr(addr) .foreach(i => instances.put(i.instanceId, Some(i.copy(status = status)))) } private def terminateInstance(instance: InstanceInfo): Unit = { val addr = instance.address.get - val path = RootActorPath(addr) / "user" / ContainerInstanceService.ActorName + val path = RootActorPath(addr.address) / "user" / ContainerInstanceService.ActorName val selection = context.actorSelection(path) selection ! StopInstance - cluster.down(addr) + cluster.down(addr.address) log.info(s"Instance ${instance.instanceId} terminated successfully") } @@ -115,7 +116,7 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async private def onInstanceInfoUpdate(info: InstanceInfo): Unit = { if (info.status != InstanceDeploying) { val updatedInfo = - if (info.address.exists(!cluster.failureDetector.isAvailable(_))) { + if (info.address.exists(a => !cluster.failureDetector.isAvailable(a.address))) { info.copy(status = InstanceUnreachable) } else { info @@ -256,11 +257,11 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async private def clusterEventReceive: Receive = { case UnreachableMember(member) => - updateInstanceStatusByAddr(member.address, InstanceUnreachable) + updateInstanceStatusByAddr(member.uniqueAddress, InstanceUnreachable) case ReachableMember(member) => - updateInstanceStatusByAddr(member.address, InstanceUp) + updateInstanceStatusByAddr(member.uniqueAddress, InstanceUp) case MemberRemoved(member, _) => - val info = findInstanceByAddr(member.address) + val info = findInstanceByAddr(member.uniqueAddress) info.foreach(i => instances.remove(i.instanceId)) } diff --git a/akkeeper/src/test/scala/akkeeper/ActorTestUtils.scala b/akkeeper/src/test/scala/akkeeper/ActorTestUtils.scala index 617b0be..75832db 100644 --- a/akkeeper/src/test/scala/akkeeper/ActorTestUtils.scala +++ b/akkeeper/src/test/scala/akkeeper/ActorTestUtils.scala @@ -17,10 +17,13 @@ package akkeeper import akka.actor.ActorRef import akka.pattern.gracefulStop +import scala.concurrent.duration._ trait ActorTestUtils extends AwaitMixin { + protected val gracefulStopTimeout: FiniteDuration = 6 seconds + protected def gracefulActorStop(actor: ActorRef): Unit = { - await(gracefulStop(actor, awaitTimeout)) + await(gracefulStop(actor, gracefulStopTimeout)) } } diff --git a/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala index ffbe20e..9275b6c 100644 --- a/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala @@ -16,9 +16,9 @@ package akkeeper.container.service import akka.actor._ -import akka.cluster.Cluster +import akka.cluster.{Cluster, UniqueAddress} import akka.testkit.{ImplicitSender, TestKit} -import akkeeper.{AkkeeperException, ActorTestUtils} +import akkeeper.{ActorTestUtils, AkkeeperException} import akkeeper.common._ import akkeeper.master.service._ import akkeeper.storage.InstanceStorage @@ -26,6 +26,7 @@ import akkeeper.utils.ConfigUtils._ import com.typesafe.config.ConfigFactory import org.scalamock.scalatest.MockFactory import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} + import scala.concurrent.Future import scala.concurrent.duration._ import ContainerInstanceService._ @@ -40,7 +41,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) ConfigFactory.load().withMasterPort.withMasterRole)) private def createExpectedInstanceInfo(instanceId: InstanceId, - addr: Address): InstanceInfo = { + addr: UniqueAddress): InstanceInfo = { InstanceInfo( instanceId = instanceId, status = InstanceUp, @@ -52,7 +53,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) } "A Container Instance service" should "register itself successfully" in { - val selfAddr = Cluster(system).selfAddress + val selfAddr = Cluster(system).selfUniqueAddress val instanceId = InstanceId("container") val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr) @@ -64,7 +65,8 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) .returns(Future successful instanceId) val masterServiceMock = createMasterServiceMock(system, self) - val service = ContainerInstanceService.createLocal(system, storage, instanceId, selfAddr) + val service = ContainerInstanceService + .createLocal(system, storage, instanceId, selfAddr.address) service ! LaunchActors(Seq(ActorLaunchContext("testActor", classOf[TestUserActor].getName))) @@ -81,7 +83,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) } it should "retry if the registration failed" in { - val selfAddr = Cluster(system).selfAddress + val selfAddr = Cluster(system).selfUniqueAddress val instanceId = InstanceId("container") val expectedInstanceInfo = createExpectedInstanceInfo(instanceId, selfAddr) @@ -96,7 +98,7 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) val masterServiceMock = createMasterServiceMock(system, self) val service = ContainerInstanceService.createLocal(system, storage, - instanceId, selfAddr, retryInterval = 1 second) + instanceId, selfAddr.address, registrationRetryInterval = 1 second) service ! LaunchActors(Seq(ActorLaunchContext("testActor", classOf[TestUserActor].getName))) @@ -107,6 +109,24 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) gracefulActorStop(masterServiceMock) gracefulActorStop(service) } + + it should "terminate if the join timeout occurred" in { + val newSystem = ActorSystem("ContainerInstanceServiceSpecTemp") + val instanceId = InstanceId("container") + + val storage = mock[InstanceStorage.Async] + (storage.start _).expects() + (storage.stop _).expects() + + val seedPort = 12345 + val service = ContainerInstanceService.createLocal(newSystem, storage, instanceId, + Address("akka.tcp", "ContainerInstanceServiceSpecTemp", "127.0.0.1", seedPort), + joinClusterTimeout = 1 second) + + service ! LaunchActors(Seq(ActorLaunchContext("testActor", classOf[TestUserActor].getName))) + + await(newSystem.whenTerminated) + } } object ContainerInstanceServiceSpec { diff --git a/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala index b62a4ab..642bac3 100644 --- a/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/master/service/MasterServiceSpec.scala @@ -114,7 +114,7 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { it should "initialize successfully and join the existing cluster" in { new MasterServiceTestRunner() { override def test(): Unit = { - val selfAddr = Cluster(system).selfAddress + val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) val storage = mock[InstanceStorage.Async] (storage.start _).expects() @@ -180,7 +180,7 @@ class MasterServiceSpec extends FlatSpecLike with Matchers with MockFactory { it should "shutdown the Actor system if the init process fails" in { new MasterServiceTestRunner() { override def test(): Unit = { - val selfAddr = Cluster(system).selfAddress + val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) val storage = mock[InstanceStorage.Async] (storage.start _).expects() diff --git a/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala index fef5099..37481e7 100644 --- a/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/master/service/MonitoringServiceSpec.scala @@ -40,10 +40,10 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) super.afterAll() } - private def createTestMember(addr: Address): Member = { + private def createTestMember(addr: UniqueAddress): Member = { val ctr = classOf[Member].getDeclaredConstructor(classOf[UniqueAddress], classOf[Int], classOf[MemberStatus], classOf[Set[String]]) - ctr.newInstance(UniqueAddress(addr, 1), new Integer(1), MemberStatus.Up, Set.empty[String]) + ctr.newInstance(addr, new Integer(1), MemberStatus.Up, Set.empty[String]) } "A Monitoring Service" should "not respond if it's not initialized" in { @@ -327,7 +327,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "terminate instance successfully (instance from storage)" in { - val selfAddr = Cluster(system).selfAddress + val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) val storage = mock[InstanceStorage.Async] (storage.start _).expects() @@ -349,7 +349,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "fail to terminate the instance (instance from local cache)" in { - val selfAddr = Cluster(system).selfAddress + val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) val storage = mock[InstanceStorage.Async] (storage.start _).expects() @@ -375,7 +375,7 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) } it should "fail to terminate the instance" in { - val selfAddr = Cluster(system).selfAddress + val selfAddr = Cluster(system).selfUniqueAddress val instance = createInstanceInfo("container").copy(address = Some(selfAddr)) val storage = mock[InstanceStorage.Async] val exception = new AkkeeperException("fail") @@ -432,9 +432,10 @@ class MonitoringServiceSpec(system: ActorSystem) extends TestKit(system) val port = 12345 val addr = Address("akka.tcp", system.name, "localhost", port) - val member = createTestMember(addr) + val uniqueAddr = UniqueAddress(addr, 1L) + val member = createTestMember(uniqueAddr) - val instance = createInstanceInfo("container").copy(address = Some(addr)) + val instance = createInstanceInfo("container").copy(address = Some(uniqueAddr)) (storage.start _).expects() (storage.stop _).expects() (storage.getInstances _) diff --git a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala index e920ca0..457c73f 100644 --- a/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/storage/zookeeper/async/ZookeeperInstanceStorageSpec.scala @@ -16,6 +16,7 @@ package akkeeper.storage.zookeeper.async import akka.actor.Address +import akka.cluster.UniqueAddress import akkeeper.AwaitMixin import akkeeper.common._ import akkeeper.storage._ @@ -43,7 +44,7 @@ class ZookeeperInstanceStorageSpec extends FlatSpec status = InstanceUp, containerName = container, roles = Set("testRole"), - address = Some(Address("akka.tcp", "AkkaSystem", "localhost", port)), + address = Some(UniqueAddress(Address("akka.tcp", "AkkaSystem", "localhost", port), 1L)), actors = Set("/user/actor1") ) } diff --git a/bin/akkeeper-submit b/bin/akkeeper-submit index 0fff099..221f94d 100755 --- a/bin/akkeeper-submit +++ b/bin/akkeeper-submit @@ -27,6 +27,8 @@ export YARN_CONF_DIR="${YARN_CONF_DIR:-$HADOOP_HOME/etc/hadoop}" YARN_BIN="yarn" if [[ "$HADOOP_HOME" != "" ]]; then YARN_BIN="$HADOOP_HOME/bin/yarn" +elif [[ "$HADOOP_PREFIX" != "" ]]; then + YARN_BIN="$HADOOP_PREFIX/bin/yarn" fi AKKEEPER_HOME="${AKKEEPER_HOME:-$(dirname $(dirname $(realpath $0)))}" diff --git a/build.sbt b/build.sbt index 3af569c..c49ec09 100644 --- a/build.sbt +++ b/build.sbt @@ -99,12 +99,24 @@ val AkkeeperSettings = CommonSettings ++ Seq( new File(baseDirectory.value.getAbsolutePath, "../bin/akkeeper-submit") -> "bin/akkeeper-submit", fatJar -> ("lib/" + fatJar.getName) ) - } + }, + + publishMavenStyle := true, + publishArtifact in Test := false, + pomIncludeRepository := (_ => false), + publishTo := Some( + if (version.value.trim.endsWith("SNAPSHOT")) { + "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots" + } else { + "releases" at "https://oss.sonatype.org/service/local/staging/deploy/maven2" + } + ) ) val NoPublishSettings = CommonSettings ++ Seq( publishArtifact := false, publish := {}, + skip in publish := true, coverageEnabled := false )