Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
	project/SparkBuild.scala
MLnick committed Jan 20, 2014
2 parents 1c8efbc + 792d908 commit 619c0fa
Showing 242 changed files with 10,345 additions and 1,934 deletions.
116 changes: 115 additions & 1 deletion assembly/pom.xml
@@ -30,6 +30,13 @@
<name>Spark Project Assembly</name>
<url>http://spark.incubator.apache.org/</url>

<properties>
<spark.jar>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</spark.jar>
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
<deb.user>root</deb.user>
</properties>

<repositories>
<!-- A repository in the local filesystem for the Py4J JAR, which is not in Maven central -->
<repository>
@@ -79,7 +86,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</outputFile>
<outputFile>${spark.jar}</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
@@ -171,5 +178,112 @@
</plugins>
</build>
</profile>
<profile>
<id>deb</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<version>1.1</version>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>create</goal>
</goals>
<configuration>
<shortRevisionLength>8</shortRevisionLength>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.vafer</groupId>
<artifactId>jdeb</artifactId>
<version>0.11</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jdeb</goal>
</goals>
<configuration>
<deb>${project.build.directory}/${deb.pkg.name}_${project.version}-${buildNumber}_all.deb</deb>
<attach>false</attach>
<compression>gzip</compression>
<dataSet>
<data>
<src>${spark.jar}</src>
<type>file</type>
<mapper>
<type>perm</type>
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/jars</prefix>
</mapper>
</data>
<data>
<src>${basedir}/src/deb/RELEASE</src>
<type>file</type>
<mapper>
<type>perm</type>
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}</prefix>
</mapper>
</data>
<data>
<src>${basedir}/../conf</src>
<type>directory</type>
<mapper>
<type>perm</type>
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/conf</prefix>
<filemode>744</filemode>
</mapper>
</data>
<data>
<src>${basedir}/../bin</src>
<type>directory</type>
<mapper>
<type>perm</type>
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/bin</prefix>
<filemode>744</filemode>
</mapper>
</data>
<data>
<src>${basedir}/../sbin</src>
<type>directory</type>
<mapper>
<type>perm</type>
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/sbin</prefix>
<filemode>744</filemode>
</mapper>
</data>
<data>
<src>${basedir}/../python</src>
<type>directory</type>
<mapper>
<type>perm</type>
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/python</prefix>
<filemode>744</filemode>
</mapper>
</data>
</dataSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
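Assuming standard Maven profile activation, the Debian package defined by the profile above would be produced with something like mvn -Pdeb package: buildnumber-maven-plugin runs at the validate phase to derive the 8-character ${buildNumber} embedded in the .deb file name, and jdeb runs at the package phase to lay out the assembly jar (via the new ${spark.jar} property), the RELEASE marker, and the conf, bin, sbin and python trees under ${deb.install.path} (/usr/share/spark by default).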
2 changes: 2 additions & 0 deletions assembly/src/deb/RELEASE
@@ -0,0 +1,2 @@
compute-classpath.sh uses the existence of this file to decide whether to put the assembly jar on the
classpath or instead to use classfiles in the source tree.
File renamed without changes.
2 changes: 2 additions & 0 deletions bin/compute-classpath.sh
@@ -39,6 +39,7 @@ if [ -f "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-dep
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar`
@@ -59,6 +60,7 @@ if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes"
fi

2 changes: 1 addition & 1 deletion bin/spark-class2.cmd 100644 → 100755
@@ -73,7 +73,7 @@ for %%d in ("%TOOLS_DIR%\target\scala-%SCALA_VERSION%\spark-tools*assembly*.jar"

rem Compute classpath using external script
set DONT_PRINT_CLASSPATH=1
call "%FWDIR%sbin\compute-classpath.cmd"
call "%FWDIR%bin\compute-classpath.cmd"
set DONT_PRINT_CLASSPATH=0
set CLASSPATH=%CLASSPATH%;%SPARK_TOOLS_JAR%

4 changes: 2 additions & 2 deletions bin/spark-shell.cmd 100644 → 100755
@@ -18,6 +18,6 @@ rem limitations under the License.
rem

rem Find the path of sbin
set SBIN=%~dp0..\sbin\
set BIN=%~dp0..\bin\

cmd /V /E /C %SBIN%spark-class2.cmd org.apache.spark.repl.Main %*
cmd /V /E /C %BIN%spark-class2.cmd org.apache.spark.repl.Main %*
42 changes: 32 additions & 10 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -17,17 +17,17 @@

package org.apache.spark

import java.io._
import java.io.{ObjectInputStream, Serializable}

import scala.collection.mutable.Map
import scala.collection.generic.Growable
import org.apache.spark.serializer.JavaSerializer

/**
* A datatype that can be accumulated, i.e. has a commutative and associative "add" operation,
* A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
*
* You must define how to add data, and how to merge two of these together. For some datatypes,
* You must define how to add data, and how to merge two of these together. For some data types,
* such as a counter, these might be the same operation. In that case, you can use the simpler
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
@@ -45,7 +45,7 @@ class Accumulable[R, T] (
val id = Accumulators.newId
@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
var deserialized = false
private var deserialized = false

Accumulators.register(this, true)

@@ -127,7 +127,7 @@

/**
* Helper object defining how to accumulate values of a particular type. An implicit
* AccumulableParam needs to be available when you create Accumulables of a specific type.
* AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
*
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
@@ -185,8 +185,30 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
}
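The scaladoc above motivates Accumulable with accumulating a set; a minimal sketch of that pattern on the driver, assuming a live SparkContext sc and the SparkContext#accumulableCollection helper that pairs with GrowableAccumulableParam:

import scala.collection.mutable

// Tasks "add" individual elements; sets built by different tasks are then
// unioned together, exactly as the scaladoc describes.
val setAcc = sc.accumulableCollection(mutable.HashSet.empty[String])
sc.parallelize(Seq("a", "b", "a")).foreach(x => setAcc += x)
// Only the driver may read the result; setAcc.value here would be Set("a", "b").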

/**
* A simpler version of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same
* as the type of the elements being merged.
* A simpler version of [[Accumulable]] where the result type being accumulated is the same
* as the type of the elements being merged, i.e. variables that are only "added" to through an
* associative operation and can therefore be efficiently supported in parallel. They can be used
* to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type
* `Int` and `Double`, and programmers can add support for new types.
*
* An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
* Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
* However, they cannot read its value. Only the driver program can read the accumulator's value,
* using its value method.
*
* The interpreter session below shows an accumulator being used to add up the elements of an array:
*
* {{{
* scala> val accum = sc.accumulator(0)
* accum: spark.Accumulator[Int] = 0
*
* scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
* ...
* 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
*
* scala> accum.value
* res2: Int = 10
* }}}
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
@@ -196,9 +218,9 @@ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
extends Accumulable[T,T](initialValue, param)

/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only datatype you can add in is the same type
* as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
* Accumulators of a specific type.
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
* in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
* available when you create Accumulators of a specific type.
*
* @tparam T type of value to accumulate
*/
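The new scaladoc says Spark natively supports Int and Double accumulators and that programmers can add support for new types. A minimal sketch of that extension point against the 0.9-era API in this diff (MaxAccumulatorParam is a hypothetical name, not part of Spark):

// A running maximum: the merge operation must be commutative and associative,
// as the Accumulable scaladoc requires, and max satisfies both.
object MaxAccumulatorParam extends AccumulatorParam[Long] {
  def addInPlace(t1: Long, t2: Long): Long = math.max(t1, t2)
  // The zero value shipped to workers: the identity element for max.
  def zero(initialValue: Long): Long = Long.MinValue
}

// Driver-side usage, assuming a live SparkContext sc:
//   val maxAcc = sc.accumulator(Long.MinValue)(MaxAccumulatorParam)
//   sc.parallelize(Seq(3L, 9L, 4L)).foreach(x => maxAcc += x)   // maxAcc.value == 9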
27 changes: 21 additions & 6 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,6 +17,8 @@

package org.apache.spark

import scala.{Option, deprecated}

import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
@@ -31,10 +33,14 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
@@ -47,17 +53,23 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
val combiners =
new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
combiners.iterator
}
}

def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
@@ -75,6 +87,9 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
combiners.iterator
}
}
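Two details in this hunk are easy to miss: the spill flag is now read from spark.shuffle.spill rather than spark.shuffle.externalSorting, and Option(context) maps the null passed by the deprecated no-context overloads to None, so the spill-metrics update is simply skipped on that path (hence the TODO). For reference, a minimal sketch of the Aggregator contract itself, using word counting as the combiner; it assumes a live SparkEnv, since the constructor reads the spill flag from it:

val agg = Aggregator[String, Int, Int](
  createCombiner = (v: Int) => v,                  // the first value for a key seeds its combiner
  mergeValue = (c: Int, v: Int) => c + v,          // fold one more value into a combiner
  mergeCombiners = (c1: Int, c2: Int) => c1 + c2)  // merge combiners from different partitions

// agg.combineValuesByKey(Iterator("a" -> 1, "a" -> 1, "b" -> 1), context) would
// yield ("a", 2) and ("b", 1); with a real TaskContext, memoryBytesSpilled and
// diskBytesSpilled are recorded whenever the external map spills.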
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD


/**
* A future for the result of an action. This is an extension of the Scala Future interface to
* support cancellation.
* A future for the result of an action, extending the Scala Future interface to support
* cancellation.
*/
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
@@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] {


/**
* The future holding the result of an action that triggers a single job. Examples include
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
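A minimal sketch of consuming a FutureAction on the driver, assuming a live SparkContext sc and the async actions of this era (RDD#countAsync via the implicit conversions in the SparkContext companion object):

import org.apache.spark.SparkContext._             // brings rddToAsyncRDDActions into scope
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val f = sc.parallelize(1 to 1000000).countAsync()  // a SimpleFutureAction[Long]
f.onComplete {
  case Success(n) => println("count = " + n)
  case Failure(e) => println("action failed: " + e)
}
// cancel() is the extension point this trait adds over scala.concurrent.Future:
// it attempts to kill the running Spark job(s) backing the action.
f.cancel()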
@@ -19,7 +19,7 @@ package org.apache.spark

/**
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in TaskContext.
* It works by checking the interrupted flag in [[TaskContext]].
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {
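The class body is folded out of this diff, but the pattern the scaladoc describes is small: poll the kill flag before delegating. A hedged sketch, assuming TaskContext exposes the flag as a boolean named interrupted (as it did in this era); the exception is a placeholder, not what the real class throws:

// Not the actual implementation -- a sketch of the wrapping pattern only.
class InterruptibleIteratorSketch[+T](context: TaskContext, delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = {
    // Checking on every hasNext gives a long-running task the chance to observe
    // cancellation between records instead of running to completion.
    if (context.interrupted) {
      throw new RuntimeException("task killed")  // placeholder signal
    }
    delegate.hasNext
  }

  def next(): T = delegate.next()
}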
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Logging.scala
@@ -122,7 +122,7 @@ trait Logging {
}
}

object Logging {
private object Logging {
@volatile private var initialized = false
val initLock = new Object()
}
