Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into namedtuple
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Aug 2, 2014
2 parents f7b1bde + c281189 commit 98df6c6
Show file tree
Hide file tree
Showing 344 changed files with 14,600 additions and 2,721 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ dist/*
.*ipr
.*iws
logs
.*scalastyle-output.xml
4 changes: 2 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


========================================================================
For Py4J (python/lib/py4j0.7.egg and files in assembly/lib/net/sf/py4j):
For Py4J (python/lib/py4j-0.8.2.1-src.zip)
========================================================================

Copyright (c) 2009-2011, Barthelemy Dagenais All rights reserved.
Expand Down Expand Up @@ -532,7 +532,7 @@ The following components are provided under a BSD-style license. See project lin
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.1 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(ISC/BSD License) jbcrypt (org.mindrot:jbcrypt:0.3m - http://www.mindrot.org/)

Expand Down
2 changes: 1 addition & 1 deletion bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
Expand Down
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ rem Figure out which Python to use.
if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
Expand Down
11 changes: 8 additions & 3 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon</artifactId>
<version>0.4.1-thrift</version>
<artifactId>tachyon-client</artifactId>
<version>0.5.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
Expand Down Expand Up @@ -262,6 +262,11 @@
<artifactId>asm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
Expand All @@ -275,7 +280,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.1</version>
<version>0.8.2.1</version>
</dependency>
</dependencies>
<build>
Expand Down
24 changes: 16 additions & 8 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,23 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
// Update task metrics if context is not null
// TODO: Make context non optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
}
combiners.iterator
}
}

@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
{
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
Expand All @@ -85,9 +90,12 @@ case class Aggregator[K, V, C] (
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
}
combiners.iterator
}
}
Expand Down
32 changes: 20 additions & 12 deletions core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,50 +27,58 @@ import org.apache.spark.shuffle.ShuffleHandle
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}


/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the parent RDD is used by at most one
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]

override def rdd: RDD[T] = _rdd
}


/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
@transient rdd: RDD[_ <: Product2[K, V]],
@transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
extends Dependency[Product2[K, V]] {

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

val shuffleId: Int = rdd.context.newShuffleId()
val shuffleId: Int = _rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = rdd.context.env.shuffleManager.registerShuffle(
shuffleId, rdd.partitions.size, this)
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)

rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}


Expand Down
46 changes: 46 additions & 0 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import akka.actor.Actor
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
* components to convey liveness or execution information for in-progress tasks.
*/
private[spark] case class Heartbeat(
executorId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
override def receive = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
sender ! response
}
}
23 changes: 12 additions & 11 deletions core/src/main/scala/org/apache/spark/Logging.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ trait Logging {
initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
className = className.substring(0, className.length - 1)
}
log_ = LoggerFactory.getLogger(className)
log_ = LoggerFactory.getLogger(className.stripSuffix("$"))
}
log_
}
Expand Down Expand Up @@ -110,23 +107,27 @@ trait Logging {
}

private def initializeLogging() {
// If Log4j is being used, but is not initialized, load a default properties file
val binder = StaticLoggerBinder.getSingleton
val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized && usingLog4j) {
// Don't use a logger in here, as this is itself occurring during initialization of a logger
// If Log4j 1.2 is being used, but is not initialized, load a default properties file
val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
// This distinguishes the log4j 1.2 binding, currently
// org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
// org.apache.logging.slf4j.Log4jLoggerFactory
val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4j12Initialized && usingLog4j12) {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
log.info(s"Using Spark's default log4j profile: $defaultLogProps")
System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
Logging.initialized = true

// Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
// Force a call into slf4j to initialize it. Avoids this happening from multiple threads
// and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
log
}
Expand Down
Loading

0 comments on commit 98df6c6

Please sign in to comment.