From 11907b34e4b70a640944164b9db66cf507f3918c Mon Sep 17 00:00:00 2001 From: Calvin Jia Date: Tue, 3 Mar 2015 19:18:46 -0800 Subject: [PATCH] Use TachyonURI for tachyon paths instead of strings. --- .../spark/storage/TachyonBlockManager.scala | 27 +++++++++---------- .../scala/org/apache/spark/util/Utils.scala | 4 ++- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index af873034215a9..2ab6a8f3ec1d4 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -20,8 +20,8 @@ package org.apache.spark.storage import java.text.SimpleDateFormat import java.util.{Date, Random} -import tachyon.client.TachyonFS -import tachyon.client.TachyonFile +import tachyon.TachyonURI +import tachyon.client.{TachyonFile, TachyonFS} import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode @@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager( val master: String) extends Logging { - val client = if (master != null && master != "") TachyonFS.get(master) else null + val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null if (client == null) { logError("Failed to connect to the Tachyon as the master address is not configured") @@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager( addShutdownHook() def removeFile(file: TachyonFile): Boolean = { - client.delete(file.getPath(), false) + client.delete(new TachyonURI(file.getPath()), false) } def fileExists(file: TachyonFile): Boolean = { - client.exist(file.getPath()) + client.exist(new TachyonURI(file.getPath())) } def getFile(filename: String): TachyonFile = { @@ -81,7 +81,7 @@ private[spark] class TachyonBlockManager( if (old != null) { old } else { - val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) + val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}") client.mkdir(path) val newDir = client.getFile(path) subDirs(dirId)(subDirId) = newDir @@ -89,7 +89,7 @@ private[spark] class TachyonBlockManager( } } } - val filePath = subDir + "/" + filename + val filePath = new TachyonURI(s"$subDir/$filename") if(!client.exist(filePath)) { client.createFile(filePath) } @@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager( // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. private def createTachyonDirs(): Array[TachyonFile] = { - logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") + logDebug(s"Creating tachyon directories at root dirs '$rootDirs'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") rootDirs.split(",").map { rootDir => var foundLocalDir = false @@ -113,22 +113,21 @@ private[spark] class TachyonBlockManager( tries += 1 try { tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId + val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId") if (!client.exist(path)) { foundLocalDir = client.mkdir(path) tachyonDir = client.getFile(path) } } catch { case e: Exception => - logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e) + logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e) } } if (!foundLocalDir) { - logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " + - rootDir) + logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir") System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR) } - logInfo("Created tachyon directory at " + tachyonDir) + logInfo(s"Created tachyon directory at $tachyonDir") tachyonDir } } @@ -145,7 +144,7 @@ private[spark] class TachyonBlockManager( } } catch { case e: Exception => - logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) + logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e) } } client.close() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4644088f19f4b..308b929188e42 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -42,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ + +import tachyon.TachyonURI import tachyon.client.{TachyonFS, TachyonFile} import org.apache.spark._ @@ -965,7 +967,7 @@ private[spark] object Utils extends Logging { * Delete a file or directory and its contents recursively. */ def deleteRecursively(dir: TachyonFile, client: TachyonFS) { - if (!client.delete(dir.getPath(), true)) { + if (!client.delete(new TachyonURI(dir.getPath()), true)) { throw new IOException("Failed to delete the tachyon dir: " + dir) } }