Merge remote-tracking branch 'origin/master' into SPARK-4180
Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
JoshRosen committed Nov 10, 2014
2 parents 7ba6db8 + bd86cb1 commit a1cba65
Showing 126 changed files with 3,043 additions and 617 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -13,7 +13,8 @@ and Spark Streaming for stream processing.
## Online Documentation

You can find the latest Spark documentation, including a programming
guide, on the [project web page](http://spark.apache.org/documentation.html).
guide, on the [project web page](http://spark.apache.org/documentation.html)
and [project wiki](https://cwiki.apache.org/confluence/display/SPARK).
This README file only contains basic setup instructions.

## Building Spark
7 changes: 7 additions & 0 deletions core/pom.xml
@@ -204,6 +204,13 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon</artifactId>
<version>0.5.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
@@ -39,6 +39,8 @@ $(function() {
var column = "table ." + $(this).attr("name");
$(column).hide();
});
// Stripe table rows after rows have been hidden to ensure correct striping.
stripeTables();

$("input:checkbox").click(function() {
var column = "table ." + $(this).attr("name");
5 changes: 0 additions & 5 deletions core/src/main/resources/org/apache/spark/ui/static/table.js
@@ -28,8 +28,3 @@ function stripeTables() {
});
});
}

/* Stripe all tables after pages finish loading. */
$(function() {
stripeTables();
});
14 changes: 14 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -120,6 +120,14 @@ pre {
border: none;
}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stacktrace-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}

span.expand-additional-metrics {
cursor: pointer;
}
14 changes: 3 additions & 11 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -343,15 +343,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
*/
def getSecretKey(): String = secretKey

override def getSaslUser(appId: String): String = {
val myAppId = sparkConf.getAppId
require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
getSaslUser()
}

override def getSecretKey(appId: String): String = {
val myAppId = sparkConf.getAppId
require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
getSecretKey()
}
// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()
}
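
For context, a minimal Scala sketch of the per-application secret contract these overrides satisfy. The trait and class names here are hypothetical illustrations; only the getSaslUser(appId)/getSecretKey(appId) signatures come from the diff above.

trait KeyHolder {
  def getSaslUser(appId: String): String
  def getSecretKey(appId: String): String
}

// Hypothetical multi-application holder (e.g. a shared shuffle service)
// that genuinely needs the appId to select the right secret. The default
// SecurityManager holds one secret for its single application, so it can
// ignore appId as above.
class MultiAppKeyHolder(secrets: Map[String, String]) extends KeyHolder {
  override def getSaslUser(appId: String): String = "sparkSaslUser"
  override def getSecretKey(appId: String): String =
    secrets.getOrElse(appId, sys.error(s"unknown appId: $appId"))
}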
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -568,6 +568,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {


/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
@@ -610,6 +612,8 @@
}

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
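
A minimal usage sketch of the two experimental methods documented above, assuming an existing SparkContext named sc; the input paths are hypothetical, and binaryRecords is shown with 8-byte records decoded as big-endian Longs.

import java.nio.ByteBuffer

// binaryFiles: one (path, stream) record per file; streams are read lazily.
val files = sc.binaryFiles("hdfs:///data/blobs")
val sizes = files.map { case (path, stream) => (path, stream.toArray().length) }

// binaryRecords: every element is a fixed-length byte array.
val records = sc.binaryRecords("hdfs:///data/fixed", 8)
val longs = records.map(bytes => ByteBuffer.wrap(bytes).getLong)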
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD

/**
35 changes: 34 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -83,15 +83,48 @@ case class FetchFailed(
* :: DeveloperApi ::
* Task failed due to a runtime exception. This is the most common failure case and also captures
* user program exceptions.
*
* `stackTrace` contains the stack trace of the exception itself. It still exists for backward
* compatibility. It's better to use `this(e: Throwable, metrics: Option[TaskMetrics])` to
* create `ExceptionFailure` as it will handle the backward compatibility properly.
*
* `fullStackTrace` is a better representation of the stack trace because it contains the whole
* stack trace, including the exception and its causes.
*/
@DeveloperApi
case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
metrics: Option[TaskMetrics])
extends TaskFailedReason {
override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)

private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics)
}

override def toErrorString: String =
if (fullStackTrace == null) {
// fullStackTrace is added in 1.2.0
// If fullStackTrace is null, use the old error string for backward compatibility
exceptionString(className, description, stackTrace)
} else {
fullStackTrace
}

/**
* Return a nice string representation of the exception, including the stack trace.
* Note: It does not include the exception's causes, and is only used for backward compatibility.
*/
private def exceptionString(
className: String,
description: String,
stackTrace: Array[StackTraceElement]): String = {
val desc = if (description == null) "" else description
val st = if (stackTrace == null) "" else stackTrace.map(" " + _).mkString("\n")
s"$className: $desc\n$st"
}
}

/**
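A short sketch of the compatibility behavior described above. The Throwable constructor is private[spark], so this would only compile inside Spark itself; the values are illustrative.

val e = new RuntimeException("boom")

// Preferred path: the Throwable constructor fills fullStackTrace via
// Utils.exceptionString(e), with the exception's causes included.
val failure = new ExceptionFailure(e, None)
failure.toErrorString // full stack trace, causes included

// Backward compatibility: data written by a pre-1.2 version deserializes
// with a null fullStackTrace, so toErrorString rebuilds the old format.
val legacy = ExceptionFailure(
  e.getClass.getName, e.getMessage, e.getStackTrace,
  fullStackTrace = null, metrics = None)
legacy.toErrorString // "java.lang.RuntimeException: boom" plus the frames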
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -493,9 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}

/**
* Returns the top K elements from this RDD as defined by
* Returns the top k (largest) elements from this RDD as defined by
* the specified Comparator[T].
* @param num the number of top elements to return
* @param num k, the number of top elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
*/
@@ -507,9 +507,9 @@
}

/**
* Returns the top K elements from this RDD using the
* Returns the top k (largest) elements from this RDD using the
* natural ordering for T.
* @param num the number of top elements to return
* @param num k, the number of top elements to return
* @return an array of top elements
*/
def top(num: Int): JList[T] = {
@@ -518,9 +518,9 @@
}

/**
* Returns the first K elements from this RDD as defined by
* Returns the first k (smallest) elements from this RDD as defined by
* the specified Comparator[T] and maintains the order.
* @param num the number of top elements to return
* @param num k, the number of elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
*/
@@ -552,9 +552,9 @@
}

/**
* Returns the first K elements from this RDD using the
* Returns the first k (smallest) elements from this RDD using the
* natural ordering for T while maintaining the order.
* @param num the number of top elements to return
* @param num k, the number of top elements to return
* @return an array of top elements
*/
def takeOrdered(num: Int): JList[T] = {
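To make the reworded contract concrete, a quick sketch using the Scala RDD equivalents of these Java methods (assuming a SparkContext named sc):

val nums = sc.parallelize(Seq(5, 1, 4, 2, 3))

nums.top(2)         // Array(5, 4): the k largest, in descending order
nums.takeOrdered(2) // Array(1, 2): the k smallest, in ascending order

// With an explicit ordering, "largest" follows the comparator: reversing
// it makes top return what takeOrdered would.
nums.top(2)(Ordering[Int].reverse) // Array(1, 2)
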
@@ -21,18 +21,14 @@ import java.io.Closeable
import java.util
import java.util.{Map => JMap}

import java.io.DataInputStream

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.spark.input.PortableDataStream
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

@@ -289,6 +285,8 @@ class JavaSparkContext(val sc: SparkContext)
new JavaPairRDD(sc.binaryFiles(path, minPartitions))

/**
* :: Experimental ::
*
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a byte array. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
@@ -315,15 +313,19 @@
*
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.binaryRecords(path, recordLength))
}
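Because these methods hand back PortableDataStream values rather than materialized bytes, a file can also be consumed incrementally. A brief sketch, assuming a SparkContext named sc and a hypothetical path; open() follows the stream-on-demand design shown in these diffs.

val pairs = sc.binaryFiles("hdfs:///data/images")
val magics = pairs.map { case (path, stream) =>
  val in = stream.open() // opens the underlying DataInputStream on demand
  try {
    (path, in.readInt()) // e.g. read only a 4-byte magic number per file
  } finally {
    in.close()
  }
}
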
45 changes: 29 additions & 16 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -21,6 +21,8 @@ import java.io._
import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

import org.apache.spark.input.PortableDataStream

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
@@ -395,22 +397,33 @@
newIter.asInstanceOf[Iterator[String]].foreach { str =>
writeUTF(str, dataOut)
}
case pair: Tuple2[_, _] =>
pair._1 match {
case bytePair: Array[Byte] =>
newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
dataOut.writeInt(pair._1.length)
dataOut.write(pair._1)
dataOut.writeInt(pair._2.length)
dataOut.write(pair._2)
}
case stringPair: String =>
newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
writeUTF(pair._1, dataOut)
writeUTF(pair._2, dataOut)
}
case other =>
throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
case stream: PortableDataStream =>
newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, stream: PortableDataStream) =>
newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
case (key, stream) =>
writeUTF(key, dataOut)
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, value: String) =>
newIter.asInstanceOf[Iterator[(String, String)]].foreach {
case (key, value) =>
writeUTF(key, dataOut)
writeUTF(value, dataOut)
}
case (key: Array[Byte], value: Array[Byte]) =>
newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
case (key, value) =>
dataOut.writeInt(key.length)
dataOut.write(key)
dataOut.writeInt(value.length)
dataOut.write(value)
}
case other =>
throw new SparkException("Unexpected element type " + first.getClass)
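The rewrite above flattens the old nested Tuple2 match into direct patterns on the first element. A self-contained sketch of that dispatch style, with simplified types; this is an illustration, not the actual PythonRDD writer.

import java.io.DataOutputStream

// Inspect the first element's shape once, then cast the whole iterator
// and serialize every element the same way.
def writeAll(first: Any, iter: Iterator[Any], out: DataOutputStream): Unit =
  first match {
    case _: String =>
      iter.asInstanceOf[Iterator[String]].foreach(out.writeUTF)
    case (_: String, _: String) =>
      iter.asInstanceOf[Iterator[(String, String)]].foreach { case (k, v) =>
        out.writeUTF(k); out.writeUTF(v)
      }
    case (_: Array[Byte], _: Array[Byte]) =>
      // Length-prefix each byte array so the reader knows where it ends.
      iter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
        case (k, v) =>
          out.writeInt(k.length); out.write(k)
          out.writeInt(v.length); out.write(v)
      }
    case other =>
      throw new IllegalArgumentException(s"Unexpected type ${other.getClass}")
  }
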
46 changes: 39 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,6 +17,7 @@

package org.apache.spark.deploy

import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@
*/
private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
try {
val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
val statisticsDataClass =
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
val threadStats = getFileSystemThreadStatistics(path, conf)
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@
}
}
}

/**
* Returns a function that can be called to find Hadoop FileSystem bytes written. If
* getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes written on r since t. Reflection is required because thread-level FileSystem
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
*/
private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
: Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics(path, conf)
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
case e: NoSuchMethodException => {
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
}
}
}

private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}

private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
val statisticsDataClass =
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
}
}

object SparkHadoopUtil {
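
A sketch of how one of these thread-level callbacks might be consumed from Spark-internal code (both methods are private[spark]); the Configuration and output path are placeholders, and None is expected on Hadoop versions before 2.5.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
val bytesWritten = SparkHadoopUtil.get
  .getFSBytesWrittenOnThreadCallback(new Path("/tmp/out"), conf)

// ... perform filesystem writes on this same thread ...

// Each invocation reports bytes written on this thread since the callback
// was created; None means the reflective lookup failed.
bytesWritten.foreach(f => println(s"wrote ${f()} bytes"))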