
Commit

Merge branch 'master' of https://github.com/apache/spark into master_nravi
nishkamravi2 committed Nov 19, 2014
2 parents 5c9a4cb + f9adda9 commit b446edc
Showing 86 changed files with 1,291 additions and 321 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

3 changes: 3 additions & 0 deletions bin/spark-submit
@@ -22,6 +22,9 @@
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
ORIG_ARGS=("$@")

# Set COLUMNS for progress bar
export COLUMNS=`tput cols`

while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
SPARK_SUBMIT_DEPLOY_MODE=$2
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

1 change: 1 addition & 0 deletions core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -26,6 +26,7 @@
public interface SparkStageInfo {
int stageId();
int currentAttemptId();
long submissionTime();
String name();
int numTasks();
int numActiveTasks();
@@ -26,13 +26,6 @@ $(function() {
// Switch the class of the arrow from open to closed.
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open');
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');

// If clicking caused the metrics to expand, automatically check all options for additional
// metrics (don't trigger a click when collapsing metrics, because it leads to weird
// toggling behavior).
if (!$(additionalMetricsDiv).hasClass('collapsed')) {
$(this).parent().find('input:checkbox:not(:checked)').trigger('click');
}
});

$("input:checkbox:not(:checked)").each(function() {
@@ -48,6 +41,16 @@ $(function() {
stripeTables();
});

$("#select-all-metrics").click(function() {
if (this.checked) {
// Toggle all un-checked options.
$('input:checkbox:not(:checked)').trigger('click');
} else {
// Toggle all checked options.
$('input:checkbox:checked').trigger('click');
}
});

// Trigger a click on the checkbox if a user clicks the label next to it.
$("span.additional-metric-title").click(function() {
$(this).parent().find('input:checkbox').trigger('click');
13 changes: 12 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,7 +50,7 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util._

@@ -245,6 +245,13 @@ class SparkContext(config: SparkConf) extends Logging {

val statusTracker = new SparkStatusTracker(this)

private[spark] val progressBar: Option[ConsoleProgressBar] =
if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}

// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
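For context, a minimal sketch of controlling the new console progress bar from a driver program. The spark.ui.showConsoleProgress key and its default come from the hunk above; the app name and master are illustrative, and the bar is also suppressed automatically whenever INFO logging is enabled.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative driver setup: explicitly turn the console progress bar off.
// (Leaving the key unset shows the bar by default when INFO logging is off.)
val conf = new SparkConf()
  .setAppName("progress-bar-demo")              // hypothetical app name
  .setMaster("local[4]")                        // hypothetical master
  .set("spark.ui.showConsoleProgress", "false") // key read in the hunk above
val sc = new SparkContext(conf)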
@@ -1274,6 +1281,7 @@ class SparkContext(config: SparkConf) extends Logging {
logInfo("Starting job: " + callSite.shortForm)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

@@ -1805,6 +1813,9 @@ object SparkContext extends Logging {
def localCpuCount = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
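A quick sketch of the new local-mode guard in action; the constructor call is illustrative, and the message matches the exception thrown above.

// With this change a non-positive thread count fails fast instead of silently
// building a LocalBackend with no threads (illustrative call):
new SparkContext("local[0]", "bad-local-demo")
// => org.apache.spark.SparkException: Asked to run locally with 0 threads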
@@ -96,6 +96,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
new SparkStageInfoImpl(
stageId,
info.attemptId,
info.submissionTime.getOrElse(0),
info.name,
info.numTasks,
data.numActiveTasks,
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -26,6 +26,7 @@ private class SparkJobInfoImpl (
private class SparkStageInfoImpl(
val stageId: Int,
val currentAttemptId: Int,
val submissionTime: Long,
val name: String,
val numTasks: Int,
val numActiveTasks: Int,
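The new submissionTime value flows through SparkStageInfoImpl into the public SparkStageInfo interface. A hedged sketch of reading it on the driver; the polling snippet is illustrative and assumes the 1.2-era SparkStatusTracker methods getActiveStageIds and getStageInfo.

// Illustrative monitoring snippet: print when each active stage was submitted.
sc.statusTracker.getActiveStageIds().foreach { stageId =>
  sc.statusTracker.getStageInfo(stageId).foreach { stage =>
    println(s"stage ${stage.stageId} (attempt ${stage.currentAttemptId}) " +
      s"submitted at ${stage.submissionTime} ms, ${stage.numActiveTasks} active tasks")
  }
}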
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -47,7 +47,7 @@ private[spark] class PythonRDD(
pythonIncludes: JList[String],
preservePartitoning: Boolean,
pythonExec: String,
broadcastVars: JList[Broadcast[Array[Byte]]],
broadcastVars: JList[Broadcast[Array[Array[Byte]]]],
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {

@@ -230,8 +230,8 @@ private[spark] class PythonRDD(
if (!oldBids.contains(broadcast.id)) {
// send new broadcast
dataOut.writeLong(broadcast.id)
dataOut.writeInt(broadcast.value.length)
dataOut.write(broadcast.value)
dataOut.writeLong(broadcast.value.map(_.length.toLong).sum)
broadcast.value.foreach(dataOut.write)
oldBids.add(broadcast.id)
}
}
@@ -368,16 +368,24 @@ private[spark] object PythonRDD extends Logging {
}
}

def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
def readBroadcastFromFile(
sc: JavaSparkContext,
filename: String): Broadcast[Array[Array[Byte]]] = {
val size = new File(filename).length()
val file = new DataInputStream(new FileInputStream(filename))
val blockSize = 1 << 20
val n = ((size + blockSize - 1) / blockSize).toInt
val obj = new Array[Array[Byte]](n)
try {
val length = file.readInt()
val obj = new Array[Byte](length)
file.readFully(obj)
sc.broadcast(obj)
for (i <- 0 until n) {
val length = if (i < (n - 1)) blockSize else (size % blockSize).toInt
obj(i) = new Array[Byte](length)
file.readFully(obj(i))
}
} finally {
file.close()
}
sc.broadcast(obj)
}

def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
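To make the block math in readBroadcastFromFile concrete, a small worked example with a hypothetical file size (the 1 MiB block size is the 1 << 20 constant from the patch):

// Worked example of the chunking arithmetic (file size is illustrative):
val blockSize = 1 << 20                             // 1 MiB per block
val size = 2621440L                                 // e.g. a 2.5 MiB serialized broadcast file
val n = ((size + blockSize - 1) / blockSize).toInt  // ceil(2.5 MiB / 1 MiB) = 3 blocks
val lastLen = (size % blockSize).toInt              // last block holds the remaining 524288 bytes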
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -17,6 +17,8 @@

package org.apache.spark.deploy

import java.net.{URI, URISyntaxException}

import scala.collection.mutable.ListBuffer

import org.apache.log4j.Level
@@ -73,7 +75,8 @@ private[spark] class ClientArguments(args: Array[String]) {

if (!ClientArguments.isValidJarUrl(_jarUrl)) {
println(s"Jar url '${_jarUrl}' is not in valid format.")
println(s"Must be a jar file path in URL format (e.g. hdfs://XX.jar, file://XX.jar)")
println(s"Must be a jar file path in URL format " +
"(e.g. hdfs://host:port/XX.jar, file:///XX.jar)")
printUsageAndExit(-1)
}

@@ -114,5 +117,12 @@ private[spark] class ClientArguments(args: Array[String]) {
}

object ClientArguments {
def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
def isValidJarUrl(s: String): Boolean = {
try {
val uri = new URI(s)
uri.getScheme != null && uri.getPath != null && uri.getPath.endsWith(".jar")
} catch {
case _: URISyntaxException => false
}
}
}
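A few illustrative inputs against the URI-based check (example values, not taken from the patch's test suite):

ClientArguments.isValidJarUrl("hdfs://host:9000/apps/app.jar") // true: scheme plus a path ending in .jar
ClientArguments.isValidJarUrl("file:///tmp/app.jar")           // true
ClientArguments.isValidJarUrl("/tmp/app.jar")                  // false: no URI scheme
ClientArguments.isValidJarUrl("hdfs://host:9000/apps/app.zip") // false: path does not end in .jar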
@@ -134,7 +134,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
override def run() = {
if (process != null) {
process.destroy()
sys.exit(process.waitFor())
process.waitFor()
}
}
})
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/package.scala
@@ -44,5 +44,5 @@ package org.apache

package object spark {
// For package docs only
val SPARK_VERSION = "1.2.0-SNAPSHOT"
val SPARK_VERSION = "1.3.0-SNAPSHOT"
}
31 changes: 17 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -39,21 +39,24 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
private[spark]
class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {

override def getPartitions: Array[Partition] = {
/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
val n = prev.partitions.size
val startIndices: Array[Long] =
if (n == 0) {
Array[Long]()
} else if (n == 1) {
Array(0L)
} else {
prev.context.runJob(
prev,
Utils.getIteratorSize _,
0 until n - 1, // do not need to count the last partition
false
).scanLeft(0L)(_ + _)
}
if (n == 0) {
Array[Long]()
} else if (n == 1) {
Array(0L)
} else {
prev.context.runJob(
prev,
Utils.getIteratorSize _,
0 until n - 1, // do not need to count the last partition
allowLocal = false
).scanLeft(0L)(_ + _)
}
}

override def getPartitions: Array[Partition] = {
firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
}

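The refactor above moves the computation of per-partition start offsets into a @transient field; the indices produced by zipWithIndex are unchanged. A small illustrative example, assuming a SparkContext sc:

// Indices are assigned per partition, then by position within each partition.
val indexed = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2).zipWithIndex()
indexed.collect()  // => Array((a,0), (b,1), (c,2), (d,3))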
@@ -148,6 +148,7 @@ private[spark] class TachyonBlockManager(
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
client.close()
}
})
}
@@ -116,6 +116,8 @@ private[spark] class TachyonStore(
case ioe: IOException =>
logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
None
} finally {
is.close()
}
}

