Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhzhan committed Sep 22, 2014
2 parents 57ea52e + 3ee3b2b commit d48bd18
Show file tree
Hide file tree
Showing 105 changed files with 2,465 additions and 1,681 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
*~
*.#*
*#*#
*.swp
*.ipr
*.iml
*.iws
.idea/
.idea_modules/
sbt/*.jar
.settings
.cache
Expand All @@ -16,6 +19,7 @@ third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/*.sh
conf/*.cmd
conf/*.properties
conf/*.conf
conf/*.xml
Expand Down
2 changes: 2 additions & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ log4j.properties.template
metrics.properties.template
slaves
spark-env.sh
spark-env.cmd
spark-env.sh.template
log4j-defaults.properties
bootstrap-tooltip.js
Expand Down Expand Up @@ -58,3 +59,4 @@ dist/*
.*iws
logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ submitting any copyrighted material via pull request, email, or other means
you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.

Please see [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
for more information.
2 changes: 1 addition & 1 deletion bin/spark-sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
CLASS_NOT_FOUND_EXIT_STATUS=1
CLASS_NOT_FOUND_EXIT_STATUS=101

# Figure out where Spark is installed
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}
} else {
logWarning ("No need to commit output of task: " + taID.value)
logInfo ("No need to commit output of task: " + taID.value)
}
}

Expand Down
31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
}.toJavaRDD()
}

private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
private val pickle = new Pickler()
private var batch = 1
private val buffer = new mutable.ArrayBuffer[Any]

override def hasNext(): Boolean = iter.hasNext

override def next(): Array[Byte] = {
while (iter.hasNext && buffer.length < batch) {
buffer += iter.next()
}
val bytes = pickle.dumps(buffer.toArray)
val size = bytes.length
// let 1M < size < 10M
if (size < 1024 * 1024) {
batch *= 2
} else if (size > 1024 * 1024 * 10 && batch > 1) {
batch /= 2
}
buffer.clear()
bytes
}
}

/**
* Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
* PySpark.
*/
def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
jRDD.rdd.mapPartitions { iter =>
val pickle = new Pickler
iter.map { row =>
pickle.dumps(row)
}
}
jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
val data: String = args(1).asInstanceOf[String]
construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
construct(typecode, machineCodes(typecode), data)
} else {
super.construct(args)
}
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object SparkSubmit {
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

private val CLASS_NOT_FOUND_EXIT_STATUS = 1
private val CLASS_NOT_FOUND_EXIT_STATUS = 101

// Exposed for testing
private[spark] var exitFn: () => Unit = () => System.exit(-1)
Expand Down Expand Up @@ -172,7 +172,7 @@ object SparkSubmit {
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
Expand All @@ -183,6 +183,7 @@ object SparkSubmit {
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

Expand Down Expand Up @@ -261,7 +262,7 @@ object SparkSubmit {
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
Expand All @@ -279,7 +280,7 @@ object SparkSubmit {
}

// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
for ((k, v) <- args.defaultSparkProperties) {
sysProps.getOrElseUpdate(k, v)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
var pyFiles: String = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

parseOpts(args.toList)
mergeSparkProperties()
checkRequiredArguments()

/** Return default present in the currently defined defaults file. */
def getDefaultSparkProperties = {
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
Expand All @@ -79,6 +75,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
defaultProperties
}

parseOpts(args.toList)
mergeSparkProperties()
checkRequiredArguments()

/**
* Fill in any undefined values based on the default properties file or options passed in through
* the '--conf' flag.
Expand Down Expand Up @@ -107,7 +107,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
}

val properties = getDefaultSparkProperties
val properties = HashMap[String, String]()
properties.putAll(defaultSparkProperties)
properties.putAll(sparkProperties)

// Use properties file as fallback for values which have a direct analog to
Expand Down Expand Up @@ -213,7 +214,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| verbose $verbose
|
|Default properties from $propertiesFile:
|${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
|${defaultSparkProperties.mkString(" ", "\n ", "\n")}
""".stripMargin
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
}

private val appHeader = Seq(
"App ID",
"App Name",
"Started",
"Completed",
Expand All @@ -81,7 +82,8 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{info.name}</a></td>
<td><a href={uiAddress}>{info.id}</a></td>
<td>{info.name}</td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -489,23 +489,24 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
// Randomization helps balance drivers
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val aliveWorkerNum = shuffledAliveWorkers.size
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
curPos = (curPos + 1) % aliveWorkerNum
val startPos = curPos
var launched = false
while (curPos != startPos && !launched) {
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % aliveWorkerNum
curPos = (curPos + 1) % numWorkersAlive
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ class TaskMetrics extends Serializable {
merged.localBlocksFetched += depMetrics.localBlocksFetched
merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
merged.remoteBytesRead += depMetrics.remoteBytesRead
merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
}
_shuffleReadMetrics = Some(merged)
}
Expand Down Expand Up @@ -177,11 +176,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
*/
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
* Absolute time when this task finished reading shuffle data
*/
var shuffleFinishTime: Long = -1

/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ private[spark] class ReceivingConnection(
if (currId != null) currId else super.getRemoteConnectionManagerId()
}

// The reciever's remote address is the local socket on remote side : which is NOT
// The receiver's remote address is the local socket on remote side : which is NOT
// the connection manager id of the receiver.
// We infer that from the messages we receive on the receiver socket.
private def processConnectionManagerId(header: MessageChunkHeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ private[nio] class ConnectionManager(

def changeConnectionKeyInterest(connection: Connection, ops: Int) {
keyInterestChangeRequests += ((connection.key, ops))
// so that registerations happen !
// so that registrations happen !
wakeupSelector()
}

Expand Down Expand Up @@ -832,7 +832,7 @@ private[nio] class ConnectionManager(
}

/**
* Send a message and block until an acknowldgment is received or an error occurs.
* Send a message and block until an acknowledgment is received or an error occurs.
* @param connectionManagerId the message's destination
* @param message the message being sent
* @return a Future that either returns the acknowledgment message or captures an exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])

// Use configured output committer if already set
if (conf.getOutputCommitter == null) {
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}

FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
}
val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private[spark] class SparkDeploySchedulerBackend(
var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
var appId: String = _
@volatile var appId: String = _

val registrationLock = new Object()
var registrationDone = false
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ private[spark] object JsonProtocol {
}

def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
Expand Down Expand Up @@ -590,7 +589,6 @@ private[spark] object JsonProtocol {

def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics
metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1382,15 +1382,15 @@ private[spark] object Utils extends Logging {
}

/**
* Default number of retries in binding to a port.
* Default maximum number of retries when binding to a port before giving up.
*/
val portMaxRetries: Int = {
if (sys.props.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.ports.maxRetries").map(_.toInt).getOrElse(100)
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
} else {
Option(SparkEnv.get)
.flatMap(_.conf.getOption("spark.ports.maxRetries"))
.flatMap(_.conf.getOption("spark.port.maxRetries"))
.map(_.toInt)
.getOrElse(16)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ class JsonProtocolSuite extends FunSuite {
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, ExecutorState.RUNNING)
}

def createDriverRunner(): DriverRunner = {
new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
createDriverDesc(), null, "akka://worker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
sysProps("spark.app.name") should be ("beauty")
sysProps("spark.shuffle.spill") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps.keys should not contain ("spark.jars")
}

test("handles YARN client mode") {
Expand Down
Loading

0 comments on commit d48bd18

Please sign in to comment.