Skip to content

Commit

Permalink
Add concept of global resources which can be shared across applicatio…
Browse files Browse the repository at this point in the history
…ns (#80)
  • Loading branch information
lceleban-dd authored and izeigerman committed Dec 24, 2019
1 parent ad4feb0 commit 547f64e
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 52 deletions.
@@ -0,0 +1,6 @@
package akkeeper.common.config

import java.net.URI

final case class AkkeeperResource(uri: URI, localPath: String, archive: Boolean = false)

Expand Up @@ -15,6 +15,7 @@
*/
package akkeeper.common.config

import java.net.URI
import java.time.{Duration => JavaDuration}
import java.util.concurrent.TimeUnit

Expand All @@ -34,6 +35,14 @@ private[akkeeper] final class AkkeeperConfig(akkeeperConfig: Config) {
Seq.empty
}
}

lazy val globalResources: Seq[AkkeeperResource] = {
if (akkeeperConfig.hasPath("global-resources")) {
akkeeperConfig.getConfigList("global-resources").asScala.map(akkeeperResourceFromConfig)
} else {
Seq.empty
}
}
}

private[akkeeper] final class AkkeeperAkkaConfig(akkeeperAkkaConfig: Config) {
Expand Down Expand Up @@ -120,4 +129,12 @@ object ConfigUtils {
environment = config.getMapOfStrings("environment")
)
}

private[config] def akkeeperResourceFromConfig(config: Config): AkkeeperResource = {
AkkeeperResource(
new URI(config.getString("uri")),
config.getString("local-path"),
config.getBoolean("archive")
)
}
}
Expand Up @@ -17,13 +17,15 @@ package akkeeper.launcher

import java.net.URI

import akkeeper.common.config.AkkeeperResource
import akkeeper.launcher.LaunchArguments._
import com.typesafe.config.Config
import LaunchArguments._

final case class LaunchArguments(akkeeperJarPath: URI = new URI("."),
userJar: URI = new URI("."),
otherJars: Seq[URI] = Seq.empty,
resources: Seq[URI] = Seq.empty,
globalResources: Seq[AkkeeperResource] = Seq.empty,
masterJvmArgs: Seq[String] = Seq.empty,
userConfig: Option[Config] = None,
pollInterval: Long = DefaultPollInterval,
Expand Down
Expand Up @@ -24,7 +24,7 @@ import akkeeper.common.config._
import akkeeper.launcher._
import akkeeper.yarn._
import akkeeper.yarn.client.YarnLauncherClient
import com.typesafe.config.{Config, ConfigRenderOptions, ConfigValueFactory}
import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
Expand Down Expand Up @@ -88,46 +88,49 @@ final class YarnLauncher(yarnConf: YarnConfiguration,
files.foreach(file => {
val fileName = FilenameUtils.getName(file.getPath)
val localPath = dstLocalDir + "/" + fileName
resourceManger.createLocalResource(file.toString, localPath)
resourceManger.uploadLocalResource(file.toString, localPath)
})
}

private def buildLocalResources(stagingDir: String,
args: LaunchArguments,
userConfig: Option[Config]): util.HashMap[String, LocalResource] = {
val resourceManger = new YarnLocalResourceManager(yarnConf, stagingDir)
launcherConfig: Config): util.HashMap[String, LocalResource] = {
val resourceManager = new YarnLocalResourceManager(yarnConf, stagingDir)
val localResources = new util.HashMap[String, LocalResource]()

val akkeeperJar = resourceManger.createLocalResource(args.akkeeperJarPath.toString,
def uploadConfig(localResourceName: String, config: Config): Unit = {
val userConfigString = config.root().render(ConfigRenderOptions.concise())
val configResource = resourceManager.uploadLocalResource(
new ByteArrayInputStream(userConfigString.getBytes("UTF-8")), localResourceName)
localResources.put(localResourceName, configResource)
}

val akkeeperJar = resourceManager.uploadLocalResource(args.akkeeperJarPath.toString,
LocalResourceNames.AkkeeperJarName)
localResources.put(LocalResourceNames.AkkeeperJarName, akkeeperJar)

// Add a user jar to the staging directory. No need to include it
// into master local resources.
resourceManger.createLocalResource(args.userJar.toString,
resourceManager.uploadLocalResource(args.userJar.toString,
LocalResourceNames.UserJarName)

// Just upload the third-party jars. No need to include them
// into master local resources.
uploadGenericFiles(args.otherJars, LocalResourceNames.ExtraJarsDirName, resourceManger)
uploadGenericFiles(args.otherJars, LocalResourceNames.ExtraJarsDirName, resourceManager)

// Distribute resources.
uploadGenericFiles(args.resources, LocalResourceNames.ResourcesDirName, resourceManger)
uploadGenericFiles(args.resources, LocalResourceNames.ResourcesDirName, resourceManager)

args.principal.foreach(_ => {
// Distribute keytab.
val keytabResource = resourceManger.createLocalResource(args.keytab.get.toString,
val keytabResource = resourceManager.uploadLocalResource(args.keytab.get.toString,
LocalResourceNames.KeytabName)
localResources.put(LocalResourceNames.KeytabName, keytabResource)
})

userConfig.foreach(config => {
val userConfigString = config.root().render(ConfigRenderOptions.concise())
val configResource = resourceManger.createLocalResource(
new ByteArrayInputStream(userConfigString.getBytes("UTF-8")),
LocalResourceNames.UserConfigName)
localResources.put(LocalResourceNames.UserConfigName, configResource)
})
args.userConfig.foreach(uploadConfig(LocalResourceNames.UserConfigName, _))
uploadConfig(LocalResourceNames.ApplicationConfigName, launcherConfig)

localResources
}

Expand All @@ -150,8 +153,16 @@ final class YarnLauncher(yarnConf: YarnConfiguration,
YarnUtils.buildCmd(mainClass, jvmArgs = jvmArgs, appArgs = appArgs)
}

private def addStagingDirToUserConfig(config: Config, stagingDir: String): Config = {
config.withValue("akkeeper.yarn.staging-directory", ConfigValueFactory.fromAnyRef(stagingDir))
private def buildLauncherConfig(stagingDir: String, resources: Seq[AkkeeperResource]): Config = {
val globalResourcesConfigValue = resources.map { r =>
Map("uri" -> r.uri.toString, "local-path" -> r.localPath, "archive" -> r.archive).asJava
}
ConfigFactory.parseMap(
Map(
"akkeeper.yarn.staging-directory" -> stagingDir,
"akkeeper.global-resources" -> globalResourcesConfigValue.asJava
).asJava
)
}

private def launchWithClient(yarnClient: YarnLauncherClient,
Expand All @@ -176,8 +187,8 @@ final class YarnLauncher(yarnConf: YarnConfiguration,

val baseStagingDir = config.yarn.stagingDirectory.getOrElse(YarnUtils.defaultStagingDirectory(yarnConf))
val stagingDir = YarnUtils.appStagingDirectory(yarnConf, Some(baseStagingDir), appId.toString)
val updatedUserConfig = args.userConfig.map(addStagingDirToUserConfig(_, baseStagingDir))
val localResources = buildLocalResources(stagingDir, args, updatedUserConfig)
val launcherConfig = buildLauncherConfig(baseStagingDir, args.globalResources)
val localResources = buildLocalResources(stagingDir, args, launcherConfig)
val cmd = buildCmd(appId, config, args)
logger.debug(s"Akkeeper Master command: ${cmd.mkString(" ")}")

Expand Down
Expand Up @@ -19,6 +19,7 @@ private[akkeeper] object LocalResourceNames {
val AkkeeperJarName = "akkeeper.jar"
val UserJarName = "user.jar"
val UserConfigName = "user_config.conf"
val ApplicationConfigName = "application.conf"
val ActorLaunchContextsName = "actors.json"
val KeytabName = "akkeeper.keytab"

Expand Down
Expand Up @@ -37,16 +37,19 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration,
}
}

private def create(fs: FileSystem, status: FileStatus): LocalResource = {
private def create(fs: FileSystem,
status: FileStatus,
localResourceType: LocalResourceType,
localResourceVisibility: LocalResourceVisibility): LocalResource = {
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromURI(fs.makeQualified(status.getPath).toUri),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
localResourceType, localResourceVisibility,
status.getLen, status.getModificationTime
)
}

private def copyResourceToStagingDir(dstFs: FileSystem, srcStream: InputStream,
dstPath: String): Path = {
private def copyResourceToStagingDir(srcStream: InputStream, dstPath: String): Path = {
val dstFs = stagingDirPath.getFileSystem(conf)
val dst = new Path(stagingDirPath, dstPath)
withStream(dstFs.create(dst)) { out =>
IOUtils.copy(srcStream, out)
Expand All @@ -55,28 +58,34 @@ private[akkeeper] final class YarnLocalResourceManager(conf: Configuration,
dst
}

def createLocalResource(srcStream: InputStream, dstPath: String): LocalResource = {
val dstFs = stagingDirPath.getFileSystem(conf)
val dst = copyResourceToStagingDir(dstFs, srcStream, dstPath)
val dstStatus = dstFs.getFileStatus(dst)
create(dstFs, dstStatus)
def getLocalResource(
dstPath: Path,
localResourceType: LocalResourceType = LocalResourceType.FILE,
localResourceVisibility: LocalResourceVisibility = LocalResourceVisibility.APPLICATION
): LocalResource = {
val dstFs = dstPath.getFileSystem(conf)
val dstStatus = dstFs.getFileStatus(dstPath)
create(dstFs, dstStatus, localResourceType, localResourceVisibility)
}

def uploadLocalResource(srcStream: InputStream, dstPath: String): LocalResource = {
val dst = copyResourceToStagingDir(srcStream, dstPath)
getLocalResource(dst)
}

def createLocalResource(srcPath: String, dstPath: String): LocalResource = {
def uploadLocalResource(srcPath: String, dstPath: String): LocalResource = {
val path = new Path(srcPath)
val srcFs = path.getFileSystem(conf)
withStream(srcFs.open(path)) { srcStream =>
createLocalResource(srcStream, dstPath)
uploadLocalResource(srcStream, dstPath)
}
}

def getExistingLocalResource(dstPath: Path): LocalResource = {
val fs = dstPath.getFileSystem(conf)
val dstStatus = fs.getFileStatus(new Path(stagingDirPath, dstPath))
create(fs, dstStatus)
def getLocalResourceFromStagingDir(dstPath: Path): LocalResource = {
getLocalResource(new Path(stagingDirPath, dstPath))
}

def getExistingLocalResource(dstPath: String): LocalResource = {
getExistingLocalResource(new Path(dstPath))
def getLocalResourceFromStagingDir(dstPath: String): LocalResource = {
getLocalResourceFromStagingDir(new Path(dstPath))
}
}
Expand Up @@ -63,7 +63,7 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd
val expectedFileName = UUID.randomUUID().toString
val expectedPath = new Path(stagingDir, expectedFileName).toString

val actualResult = manager.createLocalResource(resource, expectedFileName)
val actualResult = manager.uploadLocalResource(resource, expectedFileName)
validateLocalResource(actualResult, expectedPath)
validateResourcePayload("/application-container-test.conf", expectedPath)
}
Expand All @@ -74,7 +74,7 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd
val expectedFileName = UUID.randomUUID().toString
val expectedPath = new Path(stagingDir, expectedFileName).toString

val actualResult = manager.createLocalResource(resource, expectedFileName)
val actualResult = manager.uploadLocalResource(resource, expectedFileName)
validateLocalResource(actualResult, expectedPath)
validateResourcePayload("/application-container-test.conf", expectedPath)
resource.close()
Expand All @@ -86,10 +86,10 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd
val expectedFileName = UUID.randomUUID().toString
val expectedPath = new Path(stagingDir, expectedFileName).toString

manager.createLocalResource(resource, expectedFileName)
manager.uploadLocalResource(resource, expectedFileName)
val newExpectedFileName = UUID.randomUUID().toString
val newExpectedPath = new Path(stagingDir, newExpectedFileName).toString
val actualResult = manager.createLocalResource(expectedPath, newExpectedFileName)
val actualResult = manager.uploadLocalResource(expectedPath, newExpectedFileName)
validateLocalResource(actualResult, newExpectedPath)
validateResourcePayload("/application-container-test.conf", newExpectedPath)
}
Expand All @@ -100,8 +100,8 @@ class YarnLocalResourceManagerSpec extends FlatSpec with Matchers with BeforeAnd
val expectedFileName = UUID.randomUUID().toString
val expectedPath = new Path(stagingDir, expectedFileName).toString

manager.createLocalResource(resource, expectedFileName)
val actualResult = manager.getExistingLocalResource(expectedFileName)
manager.uploadLocalResource(resource, expectedFileName)
val actualResult = manager.getLocalResourceFromStagingDir(expectedFileName)
validateLocalResource(actualResult, expectedPath)
validateResourcePayload("/application-container-test.conf", expectedPath)
}
Expand Down
Expand Up @@ -76,7 +76,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi

// Distribute the user configuration.
try {
val instanceConfigResource = localResourceManager.getExistingLocalResource(
val instanceConfigResource = localResourceManager.getLocalResourceFromStagingDir(
LocalResourceNames.UserConfigName)
localResources.put(LocalResourceNames.UserConfigName, instanceConfigResource)
} catch {
Expand All @@ -86,18 +86,18 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi

// Retrieve the Akkeeper Assembly jar.
val akkeeperJarResource = localResourceManager
.getExistingLocalResource(LocalResourceNames.AkkeeperJarName)
.getLocalResourceFromStagingDir(LocalResourceNames.AkkeeperJarName)
localResources.put(LocalResourceNames.AkkeeperJarName, akkeeperJarResource)

// Retrieve the user jar.
val userJarResource = localResourceManager
.getExistingLocalResource(LocalResourceNames.UserJarName)
.getLocalResourceFromStagingDir(LocalResourceNames.UserJarName)
localResources.put(LocalResourceNames.UserJarName, userJarResource)

// Retrieve the keytab if present.
config.principal.foreach(_ => {
val keytabResource = localResourceManager
.getExistingLocalResource(LocalResourceNames.KeytabName)
.getLocalResourceFromStagingDir(LocalResourceNames.KeytabName)
localResources.put(LocalResourceNames.KeytabName, keytabResource)
})

Expand All @@ -107,7 +107,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi
val resources = fs.listStatus(new Path(stagingDirectory, directory))
resources.foreach(status => {
val fileName = directory + "/" + status.getPath.getName
val resource = localResourceManager.getExistingLocalResource(fileName)
val resource = localResourceManager.getLocalResourceFromStagingDir(fileName)
localResources.put(fileName, resource)
})
} catch {
Expand All @@ -119,15 +119,25 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi
// Retrieve a content of the resources/ directory.
addExistingResources(LocalResourceNames.ResourcesDirName)

localResources ++= getAkkeeperGlobalResources()
localResources.toMap
}

private def getAkkeeperGlobalResources(): Map[String, LocalResource] = {
config.config.akkeeper.globalResources.map { r =>
val resourceType = if (r.archive) LocalResourceType.ARCHIVE else LocalResourceType.FILE
val resource = localResourceManager.getLocalResource(
new Path(r.uri), resourceType, LocalResourceVisibility.PRIVATE)
(r.localPath, resource)
}.toMap
}

private def buildActorLaunchContextResource(containerDefinition: ContainerDefinition,
instanceId: InstanceId): LocalResource = {
import spray.json._
import akkeeper.api.ContainerDefinitionJsonProtocol._
val jsonStr = containerDefinition.actors.toJson.compactPrint
localResourceManager.createLocalResource(new ByteArrayInputStream(jsonStr.getBytes("UTF-8")),
localResourceManager.uploadLocalResource(new ByteArrayInputStream(jsonStr.getBytes("UTF-8")),
s"actors_$instanceId.json")
}

Expand Down
8 changes: 6 additions & 2 deletions akkeeper/src/main/scala/akkeeper/master/MasterRunner.scala
Expand Up @@ -15,6 +15,8 @@
*/
package akkeeper.master

import java.io.File

import akka.actor._
import akka.cluster.Cluster
import akka.http.scaladsl.Http
Expand Down Expand Up @@ -121,8 +123,10 @@ private[master] class YarnMasterRunner extends MasterRunner {

def run(masterArgs: MasterArguments): Unit = {
val config = masterArgs.config
.map(c => ConfigFactory.parseFile(c).withFallback(ConfigFactory.load()))
.getOrElse(ConfigFactory.load())
.map(c => ConfigFactory.parseFile(c))
.getOrElse(ConfigFactory.empty())
.withFallback(ConfigFactory.parseFile(new File(LocalResourceNames.ApplicationConfigName)))
.withFallback(ConfigFactory.load())

// Create and start the Kerberos ticket renewer if necessary.
val ticketRenewer = masterArgs.principal.map(principal => {
Expand Down

0 comments on commit 547f64e

Please sign in to comment.