From cf776a14725940e888ec187d210b74e1cc24c191 Mon Sep 17 00:00:00 2001
From: huangzhaowei
Date: Sun, 28 Jun 2015 16:19:17 +0800
Subject: [PATCH] [SPARK-8688][YARN]Bug fix: disable the cache fs to gain the
 HDFS connection.

---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala    | 10 ++++++++++
 .../spark/deploy/yarn/AMDelegationTokenRenewer.scala | 10 ++++++----
 .../deploy/yarn/ExecutorDelegationTokenUpdater.scala |  4 +++-
 3 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 7fa75ac8c2b54..909c70b999490 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -334,6 +334,16 @@ class SparkHadoopUtil extends Logging {
    * Stop the thread that does the delegation token updates.
    */
   private[spark] def stopExecutorDelegationTokenRenewer() {}
+
+  /**
+   * Disable the hadoop fs cache mechanism, otherwise DFSClient will use old token to connect nn.
+   */
+  private[spark] def getDiscachedConf(hadoopConf: Configuration, path: Path): Configuration = {
+    val newConf = new Configuration(hadoopConf)
+    val confKey = s"fs.${path.toUri.getScheme}.impl.disable.cache"
+    newConf.setBoolean(confKey, true)
+    newConf
+  }
 }
 
 object SparkHadoopUtil {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 77af46c192cc2..08dc93dd01f2c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -65,6 +65,8 @@ private[yarn] class AMDelegationTokenRenewer(
     sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
   private val numFilesToKeep =
     sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+  private val discachedConfiguration =
+    hadoopUtil.getDiscachedConf(hadoopConf, new Path(credentialsFile))
 
   /**
    * Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -123,7 +125,7 @@ private[yarn] class AMDelegationTokenRenewer(
   private def cleanupOldFiles(): Unit = {
     import scala.concurrent.duration._
     try {
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(discachedConfiguration)
       val credentialsPath = new Path(credentialsFile)
       val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
       hadoopUtil.listFilesSorted(
@@ -169,13 +171,13 @@ private[yarn] class AMDelegationTokenRenewer(
       // Get a copy of the credentials
       override def run(): Void = {
         val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
+        hadoopUtil.obtainTokensForNamenodes(nns, discachedConfiguration, tempCreds)
         null
       }
     })
     // Add the temp credentials back to the original ones.
     UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(hadoopConf)
+    val remoteFs = FileSystem.get(discachedConfiguration)
     // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
     // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
     // and update the lastCredentialsFileSuffix.
@@ -194,7 +196,7 @@ private[yarn] class AMDelegationTokenRenewer(
     val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
     logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
+    credentials.writeTokenStorageFile(tempTokenPath, discachedConfiguration)
     logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
     remoteFs.rename(tempTokenPath, tokenPath)
     logInfo("Delegation token file rename complete.")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 229c2c4d5eb36..90d369bdb8830 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -35,6 +35,8 @@ private[spark] class ExecutorDelegationTokenUpdater(
   @volatile private var lastCredentialsFileSuffix = 0
 
   private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+  private val discachedConfiguration =
+    SparkHadoopUtil.get.getDiscachedConf(hadoopConf, new Path(credentialsFile))
 
   private val delegationTokenRenewer =
     Executors.newSingleThreadScheduledExecutor(
@@ -49,7 +51,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
   def updateCredentialsIfRequired(): Unit = {
     try {
       val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(hadoopConf)
+      val remoteFs = FileSystem.get(discachedConfiguration)
       SparkHadoopUtil.get.listFilesSorted(
         remoteFs, credentialsFilePath.getParent, credentialsFilePath.getName,
         SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)