Skip to content

Commit

Permalink
Make sure that the staging directory gets removed when the Akkeeper A…
Browse files Browse the repository at this point in the history
…M terminates
  • Loading branch information
izeigerman committed Nov 20, 2018
1 parent 1b613e0 commit 18dbd17
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package akkeeper.deploy.yarn

import java.io.{FileNotFoundException, ByteArrayInputStream}
import java.io.{ByteArrayInputStream, FileNotFoundException}
import java.util
import java.util.concurrent.{TimeUnit, Executors, ScheduledExecutorService}
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import akkeeper.common.{ContainerDefinition, InstanceId}
import akkeeper.container.ContainerInstanceMain
Expand All @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util._
Expand Down Expand Up @@ -276,10 +277,22 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi
isRunning = false
}

private def cleanupStagingDirectory(): Unit = {
val path = new Path(stagingDirectory)
val fs = path.getFileSystem(config.yarnConf)
try {
fs.delete(path, true)
} catch {
case NonFatal(e) =>
logger.error("Failed to remove the staging directory", e)
}
}

override def stop(): Unit = {
if (isRunning) {
unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "")
stopClients()
cleanupStagingDirectory()
logger.info("YARN Application Master stopped")
}
}
Expand All @@ -288,6 +301,7 @@ private[akkeeper] class YarnApplicationMaster(config: YarnApplicationMasterConfi
if (isRunning) {
unregisterApplicationMaster(FinalApplicationStatus.FAILED, error.getMessage)
stopClients()
cleanupStagingDirectory()
logger.error("YARN Application Master stopped with errors", error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ class YarnApplicationMasterSpec extends FlatSpec with Matchers
val deployResult = await(actualResult(0))
deployResult shouldBe DeploySuccessful(instanceId)

val stagingDirectory = masterConfig.config.getString("akkeeper.yarn.staging-directory")
hadoopFs.exists(new Path(stagingDirectory, "appId")) shouldBe true
master.stop()
// Make sure that the staging directory has been cleaned up.
hadoopFs.exists(new Path(stagingDirectory, "appId")) shouldBe false
}

it should "handle failed deployment properly" in {
Expand Down

0 comments on commit 18dbd17

Please sign in to comment.