[SPARK-8688][YARN] Bug fix: disable the cache fs to gain the HDFS connection.
SaintBacchus committed Jun 28, 2015
1 parent f510045 commit cf776a1
Showing 3 changed files with 19 additions and 5 deletions.
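Background for the fix: FileSystem.get() caches FileSystem instances per scheme, authority, and user, so after the AM renews delegation tokens, a cached DFSClient keeps talking to the NameNode with the old token and fails once that token expires. Copying the configuration and setting fs.<scheme>.impl.disable.cache forces a fresh connection. A minimal sketch of the cache behavior follows; it is illustrative and not part of the commit, and it uses the local "file" scheme where the commit derives "hdfs" from the credentials path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()          // default fs is file:/// here
    val a = FileSystem.get(conf)
    val b = FileSystem.get(conf)
    println(a eq b)                         // true: FileSystem.get returns the cached instance

    val uncached = new Configuration(conf)  // copy, so the original conf is untouched
    // Same key the commit builds per scheme; "file" stands in for "hdfs".
    uncached.setBoolean("fs.file.impl.disable.cache", true)
    val c = FileSystem.get(uncached)
    println(c eq a)                         // false: cache bypassed, a brand-new instance
  }
}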
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (10 additions, 0 deletions)
@@ -334,6 +334,16 @@ class SparkHadoopUtil extends Logging {
   * Stop the thread that does the delegation token updates.
   */
  private[spark] def stopExecutorDelegationTokenRenewer() {}
+
+  /**
+   * Disable the Hadoop FileSystem cache for the scheme of the given path; otherwise the
+   * cached DFSClient will keep using an old token when connecting to the NameNode.
+   */
+  private[spark] def getDiscachedConf(hadoopConf: Configuration, path: Path): Configuration = {
+    val newConf = new Configuration(hadoopConf)
+    val confKey = s"fs.${path.toUri.getScheme}.impl.disable.cache"
+    newConf.setBoolean(confKey, true)
+    newConf
+  }
}

object SparkHadoopUtil {
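How a caller would use the new helper — a hypothetical snippet, not part of the diff; the path is made up, and hadoopConf is assumed to be in scope. For an hdfs:// path the derived key is fs.hdfs.impl.disable.cache, so only that scheme's cache is disabled, and only in the copied configuration:

// Hypothetical usage of getDiscachedConf (path is illustrative).
val credentialsPath = new Path("hdfs://nn:8020/user/spark/credentials")
val discached = SparkHadoopUtil.get.getDiscachedConf(hadoopConf, credentialsPath)
// Every get() against this conf now builds a fresh FileSystem/DFSClient,
// picking up whatever tokens are currently in the caller's UGI.
val remoteFs = FileSystem.get(credentialsPath.toUri, discached)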
yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala (6 additions, 4 deletions)
@@ -65,6 +65,8 @@ private[yarn] class AMDelegationTokenRenewer(
    sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
  private val numFilesToKeep =
    sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+  private val discachedConfiguration =
+    hadoopUtil.getDiscachedConf(hadoopConf, new Path(credentialsFile))

  /**
   * Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -123,7 +125,7 @@ private[yarn] class AMDelegationTokenRenewer(
  private def cleanupOldFiles(): Unit = {
    import scala.concurrent.duration._
    try {
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(discachedConfiguration)
      val credentialsPath = new Path(credentialsFile)
      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
      hadoopUtil.listFilesSorted(
@@ -169,13 +171,13 @@ private[yarn] class AMDelegationTokenRenewer(
      // Get a copy of the credentials
      override def run(): Void = {
        val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+        hadoopUtil.obtainTokensForNamenodes(nns, discachedConfiguration, tempCreds)
        null
      }
    })
    // Add the temp credentials back to the original ones.
    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(hadoopConf)
+    val remoteFs = FileSystem.get(discachedConfiguration)
    // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
    // and update the lastCredentialsFileSuffix.
@@ -194,7 +196,7 @@ private[yarn] class AMDelegationTokenRenewer(
    val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+    credentials.writeTokenStorageFile(tempTokenPath, discachedConfiguration)
    logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
    remoteFs.rename(tempTokenPath, tokenPath)
    logInfo("Delegation token file rename complete.")
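Note on the writeTokenStorageFile change above: the renewer writes the tokens to a temp path and then renames it into place, so executors polling the directory never observe a half-written credentials file; with the uncached configuration the write also goes through a DFSClient holding the freshly obtained tokens. A condensed sketch of that pattern — tokenPathStr, remoteFs, and discachedConfiguration come from the surrounding code:

// Condensed sketch of the write-then-rename pattern used above.
val tokenPath = new Path(tokenPathStr)
val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
val credentials = UserGroupInformation.getCurrentUser.getCredentials
credentials.writeTokenStorageFile(tempTokenPath, discachedConfiguration)  // write to temp first
remoteFs.rename(tempTokenPath, tokenPath)  // rename on HDFS, so readers see all or nothing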
yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala (3 additions, 1 deletion)
@@ -35,6 +35,8 @@ private[spark] class ExecutorDelegationTokenUpdater(
  @volatile private var lastCredentialsFileSuffix = 0

  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+  private val discachedConfiguration =
+    SparkHadoopUtil.get.getDiscachedConf(hadoopConf, new Path(credentialsFile))

  private val delegationTokenRenewer =
    Executors.newSingleThreadScheduledExecutor(
@@ -49,7 +51,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
  def updateCredentialsIfRequired(): Unit = {
    try {
      val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(discachedConfiguration)
      SparkHadoopUtil.get.listFilesSorted(
        remoteFs, credentialsFilePath.getParent,
        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
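For context, the read side of this class (not shown in the hunk) picks the newest credentials file from the sorted listing and merges its tokens into the current user. A sketch under the assumption that a newestTokenPath has already been selected; the name is hypothetical:

// Sketch of the executor-side read (newestTokenPath is assumed to be the
// last element of the sorted listing above).
val newCredentials = Credentials.readTokenStorageFile(newestTokenPath, discachedConfiguration)
UserGroupInformation.getCurrentUser.addCredentials(newCredentials)  // merge renewed tokens into the UGI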
