Replace explicit Configuration instantiation with SparkHadoopUtil.
This is the basic grunt work; code doesn't fully compile yet, since
I'll do some of the more questionable changes in separate commits.
Marcelo Vanzin committed Aug 7, 2014
1 parent b9e9e53 commit 1e7003f
Showing 24 changed files with 79 additions and 71 deletions.
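Every change below follows one pattern: call sites that built a Hadoop Configuration by hand now ask SparkHadoopUtil for one seeded from a SparkConf. A minimal before/after sketch of the idiom (illustrative only; the names are taken from the diff):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    // Before: a bare Configuration silently ignores S3 credentials and any
    // "spark.hadoop.*" keys the user set on the SparkConf.
    val bare: Configuration = new Configuration()

    // After: the shared helper applies those settings uniformly.
    val sparkConf = new SparkConf()
    val seeded: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf)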
21 changes: 1 addition & 20 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging {
ui.bind()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
+  val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
25 changes: 22 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}

import scala.collection.JavaConversions._

@@ -68,7 +68,26 @@ class SparkHadoopUtil extends Logging {
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.
*/
-  def newConfiguration(): Configuration = new Configuration()
+  def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new Configuration()
+    // Explicitly check for S3 environment variables
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+    }
+    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      }
+    }
+    val bufferSize = conf.get("spark.buffer.size", "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
+  }

/**
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
@@ -86,7 +105,7 @@ class SparkHadoopUtil extends Logging {

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

-  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
+  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}

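As a usage sketch of the new overload (the fs.defaultFS value here is hypothetical): every "spark.hadoop."-prefixed key is copied into the resulting Configuration with the prefix stripped, and spark.buffer.size feeds io.file.buffer.size:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val sparkConf = new SparkConf()
      .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020") // hypothetical value
      .set("spark.buffer.size", "131072")

    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    assert(hadoopConf.get("fs.defaultFS") == "hdfs://namenode:8020") // prefix stripped
    assert(hadoopConf.get("io.file.buffer.size") == "131072")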
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -28,8 +28,8 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

-import org.apache.spark.Logging
-import org.apache.spark.deploy.{Command, DriverDescription}
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
* This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
+    val conf: SparkConf,
val driverId: String,
val workDir: File,
val sparkHome: File,
@@ -144,8 +145,8 @@ private[spark] class DriverRunner(

val jarPath = new Path(driverDesc.jarUrl)

-    val emptyConf = new Configuration()
-    val jarFileSystem = jarPath.getFileSystem(emptyConf)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val jarFileSystem = jarPath.getFileSystem(hadoopConf)

val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
@@ -154,7 +155,7 @@

if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
+      FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
}

if (!localJarFile.exists()) { // Verify copy succeeded
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -268,7 +268,7 @@ private[spark] class Worker(
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
-        case Some(executor) =>
+        case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -299,7 +299,7 @@

case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
-      val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+      val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()

6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -316,9 +316,9 @@ private[spark] class Executor(
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
-        val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
-          classOf[Boolean])
-        constructor.newInstance(classUri, parent, userClassPathFirst)
+        val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
+          classOf[ClassLoader], classOf[Boolean])
+        constructor.newInstance(conf, classUri, parent, userClassPathFirst)
} catch {
case _: ClassNotFoundException =>
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -157,7 +157,7 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
-    val conf = SparkHadoopUtil.get.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,11 +44,14 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
private[spark] class EventLoggingListener(
appName: String,
sparkConf: SparkConf,
-    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
+    hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

+  def this(appName: String, sparkConf: SparkConf) =
+    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
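The default argument is replaced by an auxiliary constructor, so existing two-argument call sites keep compiling while the Configuration is now derived from the SparkConf that was actually passed in; FileLogger below gets the same treatment. The pattern in isolation (Foo is a stand-in class, not Spark API):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    class Foo(name: String, sparkConf: SparkConf, hadoopConf: Configuration) {
      // Auxiliary constructor: two-argument callers are unchanged, but the
      // Hadoop side now sees the caller's SparkConf-derived settings.
      def this(name: String, sparkConf: SparkConf) =
        this(name, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
    }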
core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}

import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl

private[spark] class SimrSchedulerBackend(
@@ -42,7 +43,7 @@ private[spark] class SimrSchedulerBackend(
sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

-    val conf = new Configuration()
+    val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)

logInfo("Writing to HDFS file: " + driverFilePath)
@@ -61,7 +62,7 @@
}

override def stop() {
-    val conf = new Configuration()
+    val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
super.stop()
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -41,13 +41,22 @@ import org.apache.spark.io.CompressionCodec
private[spark] class FileLogger(
logDir: String,
sparkConf: SparkConf,
-    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
+    hadoopConf: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true,
dirPermissions: Option[FsPermission] = None)
extends Logging {

+  def this(
+      logDir: String,
+      sparkConf: SparkConf,
+      compress: Boolean = false,
+      overwrite: Boolean = true) = {
+    this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
+      overwrite = overwrite)
+  }
+
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -121,8 +121,8 @@ class JsonProtocolSuite extends FunSuite {
new SparkConf, ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
null, "akka://worker")
new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
createDriverDesc(), null, "akka://worker")
}

def assertValidJson(json: JValue) {
core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -31,8 +31,8 @@ class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
null, "akka://1.2.3.4/worker/")
new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
driverDescription, null, "akka://1.2.3.4/worker/")
}

private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -70,7 +70,7 @@ object SparkHdfsLR {

val sparkConf = new SparkConf().setAppName("SparkHdfsLR")
val inputPath = args(0)
-    val conf = SparkHadoopUtil.get.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val sc = new SparkContext(sparkConf,
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -52,8 +52,8 @@ object SparkTachyonHdfsLR {

def main(args: Array[String]) {
val inputPath = args(0)
-    val conf = SparkHadoopUtil.get.newConfiguration()
val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR")
+    val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val sc = new SparkContext(sparkConf,
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -21,10 +21,9 @@ import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{URI, URL, URLEncoder}
import java.util.concurrent.{Executors, ExecutorService}

-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.util.Utils
import org.apache.spark.util.ParentClassLoader

@@ -36,7 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
* used to load classes defined by the interpreter when the REPL is used.
* Allows the user to specify if user class path should be first
*/
-class ExecutorClassLoader(classUri: String, parent: ClassLoader,
+class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
userClassPathFirst: Boolean) extends ClassLoader {
val uri = new URI(classUri)
val directory = uri.getPath
@@ -48,7 +47,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader,
if (uri.getScheme() == "http") {
null
} else {
-      FileSystem.get(uri, new Configuration())
+      FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf))
}
}

repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.FunSuite

import com.google.common.io.Files

-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.util.Utils

class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
@@ -57,31 +57,31 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {

test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
}

test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, false)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, false)
val fakeClass = classLoader.loadClass("ReplFakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
}

test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
}

test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
-    val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
+    val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
}
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -46,7 +46,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf: SparkConf) extends Logging {

def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
+    this(args, SparkHadoopUtil.get.newConfiguration(sparkConf), sparkConf)

def this(args: ApplicationMasterArguments) = this(args, new SparkConf())

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
extends YarnClientImpl with ClientBase with Logging {

def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, new Configuration(), spConf)
+    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -45,7 +45,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
extends Logging {

def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
-    this(args, new Configuration(), sparkConf)
+    this(args, SparkHadoopUtil.get.newConfiguration(sparkConf), sparkConf)

def this(args: ApplicationMasterArguments) = this(args, new SparkConf())

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -49,7 +49,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {

// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
// Always create a new config, dont reuse yarnConf.
-  override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+  override def newConfiguration(conf: SparkConf): Configuration =
+    new YarnConfiguration(super.newConfiguration(conf))

// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
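The YARN override previously wrapped a bare new Configuration(), dropping the S3 and "spark.hadoop.*" handling entirely; chaining through super.newConfiguration(conf) keeps that behavior and layers the YARN resource files on top. A sketch of the composition (YarnAwareHadoopUtil is a stand-in name, not the full class):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    class YarnAwareHadoopUtil extends SparkHadoopUtil {
      // The base class applies the common Spark-side settings; YarnConfiguration
      // then loads yarn-site.xml and related resources on top of them.
      override def newConfiguration(conf: SparkConf): Configuration =
        new YarnConfiguration(super.newConfiguration(conf))
    }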
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler.cluster

import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.yarn.YarnAllocationHandler
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -27,14 +26,12 @@ import org.apache.spark.util.Utils
*
* This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
*/
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
-
-  def this(sc: SparkContext) = this(sc, new Configuration())
+private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnAllocationHandler.lookupRack(conf, host)
+    val retval = YarnAllocationHandler.lookupRack(sc.hadoopConfiguration, host)
if (retval != null) Some(retval) else None
}
}
… (remaining changed files not shown)
