Merge branch 'master' of https://github.com/apache/spark into master_nravi

Conflicts:
	yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
nishkamravi2 committed Sep 25, 2014
2 parents f00fa31 + 74fb2ec commit c726bd9
Showing 63 changed files with 1,711 additions and 823 deletions.
4 changes: 3 additions & 1 deletion assembly/pom.xml
@@ -141,7 +141,9 @@
<include>com.google.common.**</include>
</includes>
<excludes>
<exclude>com.google.common.base.Optional**</exclude>
<exclude>com/google/common/base/Absent*</exclude>
<exclude>com/google/common/base/Optional*</exclude>
<exclude>com/google/common/base/Present*</exclude>
</excludes>
</relocation>
</relocations>
2 changes: 2 additions & 0 deletions core/pom.xml
@@ -343,7 +343,9 @@
<filter>
<artifact>com.google.guava:guava</artifact>
<includes>
<include>com/google/common/base/Absent*</include>
<include>com/google/common/base/Optional*</include>
<include>com/google/common/base/Present*</include>
</includes>
</filter>
</filters>
32 changes: 22 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* Support function for API backtraces.
* Set the thread-local property for overriding the call sites
* of actions and RDDs.
*/
def setCallSite(site: String) {
setLocalProperty("externalCallSite", site)
def setCallSite(shortCallSite: String) {
setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
}

/**
* Support function for API backtraces.
* Set the thread-local property for overriding the call sites
* of actions and RDDs.
*/
private[spark] def setCallSite(callSite: CallSite) {
setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
}

/**
* Clear the thread-local property for overriding the call sites
* of actions and RDDs.
*/
def clearCallSite() {
setLocalProperty("externalCallSite", null)
setLocalProperty(CallSite.SHORT_FORM, null)
setLocalProperty(CallSite.LONG_FORM, null)
}

/**
* Capture the current user callsite and return a formatted version for printing. If the user
* has overridden the call site, this will return the user's version.
* has overridden the call site using `setCallSite()`, this will return the user's version.
*/
private[spark] def getCallSite(): CallSite = {
Option(getLocalProperty("externalCallSite")) match {
case Some(callSite) => CallSite(callSite, longForm = "")
case None => Utils.getCallSite
}
Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
CallSite(shortCallSite, longCallSite)
}.getOrElse(Utils.getCallSite())
}

/**
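
For context, a minimal sketch of how the new call-site overrides behave from user code (illustrative only, not part of this commit; assumes a local SparkContext):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: the short-form string passed to setCallSite is what getCallSite
// (and hence the UI and RDD.creationSite) reports until it is cleared.
val sc = new SparkContext(new SparkConf().setAppName("callsite-demo").setMaster("local[2]"))

sc.setCallSite("demo: build input")
val nums = sc.parallelize(1 to 100, 4)   // creationSite now uses the override

sc.setCallSite("demo: count")
nums.count()

sc.clearCallSite()                       // later actions fall back to Utils.getCallSite()
sc.stop()
```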
@@ -75,6 +75,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
defaultProperties
}

// Respect SPARK_*_MEMORY for cluster mode
driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull

parseOpts(args.toList)
mergeSparkProperties()
checkRequiredArguments()
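
Since the environment variables are read before `parseOpts(args.toList)`, an explicit `--driver-memory` or `--executor-memory` flag still overrides them. A simplified, hypothetical sketch of that precedence pattern (not the actual SparkSubmitArguments code):

```scala
object MemoryArgsSketch {
  // Environment variable supplies the default; may be null if unset.
  var driverMemory: String = sys.env.get("SPARK_DRIVER_MEMORY").orNull

  // A later pass over the command line overwrites the default when a flag is given.
  def parse(args: List[String]): Unit = args match {
    case "--driver-memory" :: value :: rest => driverMemory = value; parse(rest)
    case _ :: rest                          => parse(rest)
    case Nil                                => // done
  }
}
```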
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -375,12 +376,17 @@ private[spark] class Executor(
}

val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
retryAttempts, retryIntervalMs, timeout)
if (response.reregisterBlockManager) {
logWarning("Told to re-register on heartbeat")
env.blockManager.reregister()
try {
val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
retryAttempts, retryIntervalMs, timeout)
if (response.reregisterBlockManager) {
logWarning("Told to re-register on heartbeat")
env.blockManager.reregister()
}
} catch {
case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
}

Thread.sleep(interval)
}
}
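
The new guard relies on `scala.util.control.NonFatal`; a small, self-contained sketch of the pattern with stand-in names (not the executor code itself):

```scala
import scala.util.control.NonFatal

object HeartbeatSketch {
  // Hypothetical stand-in for the AkkaUtils.askWithReply call above.
  private def sendHeartbeat(): Unit = throw new RuntimeException("driver unreachable")

  def beatOnce(): Unit = {
    try {
      sendHeartbeat()
    } catch {
      // NonFatal matches ordinary exceptions but lets fatal JVM errors
      // (e.g. OutOfMemoryError) propagate, so transient driver hiccups are
      // logged without hiding serious failures.
      case NonFatal(t) => println(s"Issue communicating with driver in heartbeater: $t")
    }
  }
}
```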
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.rdd

import java.util.Random
import java.util.{Properties, Random}

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE

/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSite = Utils.getCallSite
@transient private[spark] val creationSite = sc.getCallSite()

private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")

private[spark] def elementClassTag: ClassTag[T] = classTag[T]
27 changes: 20 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
/** CallSite represents a place in user code. It can have a short and a long form. */
private[spark] case class CallSite(shortForm: String, longForm: String)

private[spark] object CallSite {
val SHORT_FORM = "callSite.short"
val LONG_FORM = "callSite.long"
}

/**
* Various utility methods used by Spark.
*/
@@ -859,18 +864,26 @@
}
}

/**
* A regular expression to match classes of the "core" Spark API that we want to skip when
* finding the call site of a method.
*/
private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
/** Default filtering function for finding call sites using `getCallSite`. */
private def coreExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the "core" Spark API that we want to skip when
// finding the call site of a method.
val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
val SCALA_CLASS_REGEX = """^scala""".r
val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
// If the class is a Spark internal class or a Scala class, then exclude.
isSparkCoreClass || isScalaClass
}

/**
* When called inside a class in the spark package, returns the name of the user code class
* (outside the spark package) that called into Spark, as well as which Spark method they called.
* This is used, for example, to tell users where in their code each RDD got created.
*
* @param skipClass Function that is used to exclude non-user-code classes.
*/
def getCallSite: CallSite = {
def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
val trace = Thread.currentThread.getStackTrace()
.filterNot { ste:StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
@@ -891,7 +904,7 @@

for (el <- trace) {
if (insideSpark) {
if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
if (skipClass(el.getClassName)) {
lastSparkMethod = if (el.getMethodName == "<init>") {
// Spark method is a constructor; get its class name
el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
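
A tiny illustrative check of the default exclusion logic introduced above (regexes copied from the diff; `true` means the stack frame is treated as Spark/Scala internals and skipped when locating user code):

```scala
// Illustrative only: mirrors coreExclusionFunction from the diff.
val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
val SCALA_CLASS_REGEX = """^scala""".r

def excluded(className: String): Boolean =
  SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
    SCALA_CLASS_REGEX.findFirstIn(className).isDefined

excluded("org.apache.spark.rdd.RDD")    // true  -> keep walking the stack
excluded("scala.collection.Iterator")   // true
excluded("com.example.MyDriverApp")     // false -> reported as the user call site
```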
26 changes: 26 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1307,4 +1307,30 @@ public void collectUnderlyingScalaRDD() {
SomeCustomClass[] collected = (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
Assert.assertEquals(data.size(), collected.length);
}

/**
* Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
* since that's the only artifact where Guava classes have been relocated.
*/
@Test
public void testGuavaOptional() {
// Stop the context created in setUp() and start a local-cluster one, to force usage of the
// assembly.
sc.stop();
JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,512]", "JavaAPISuite");
try {
JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
new Function<Integer, Optional<Integer>>() {
@Override
public Optional<Integer> call(Integer i) {
return Optional.fromNullable(i);
}
});
rdd2.collect();
} finally {
localCluster.stop();
}
}

}
4 changes: 2 additions & 2 deletions dev/run-tests-jenkins
@@ -92,13 +92,13 @@ function post_message () {
merge_note=" * This patch merges cleanly."

source_files=$(
git diff master --name-only \
git diff master... --name-only `# diff patch against master from branch point` \
| grep -v -e "\/test" `# ignore files in test directories` \
| grep -e "\.py$" -e "\.java$" -e "\.scala$" `# include only code files` \
| tr "\n" " "
)
new_public_classes=$(
git diff master ${source_files} `# diff this patch against master and...` \
git diff master... ${source_files} `# diff patch against master from branch point` \
| grep "^\+" `# filter in only added lines` \
| sed -r -e "s/^\+//g" `# remove the leading +` \
| grep -e "trait " -e "class " `# filter in lines with these key words` \
2 changes: 1 addition & 1 deletion docs/mllib-feature-extraction.md
@@ -68,7 +68,7 @@ val sc: SparkContext = ...
val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hasingTF.transform(documents)
val tf: RDD[Vector] = hashingTF.transform(documents)
{% endhighlight %}

While applying `HashingTF` only needs a single pass to the data, applying `IDF` needs two passes:
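
The elided continuation covers the IDF step; as a rough sketch of that two-pass flow using the MLlib 1.1 API (reusing `tf` from the snippet above and caching it, since it is traversed twice):

```scala
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sketch only, not the elided documentation text: IDF.fit makes the first
// pass to collect document frequencies, transform makes the second.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
```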
2 changes: 2 additions & 0 deletions docs/running-on-yarn.md
@@ -205,6 +205,8 @@ Note that for the first option, both executors and the application master will s
log4j configuration, which may cause issues when they run on the same node (e.g. trying to write
to the same log file).

If you need a reference to the proper location to put log files in YARN so that YARN can properly display and aggregate them, use `${spark.yarn.app.container.log.dir}` in your `log4j.properties`, for example `log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log`. For streaming applications, configuring `RollingFileAppender` and setting the file location to YARN's log directory avoids disk overflow caused by large log files, and the logs can be accessed with YARN's log utility.

# Important notes

- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
6 changes: 3 additions & 3 deletions docs/sql-programming-guide.md
@@ -605,7 +605,7 @@ Spark SQL can automatically infer the schema of a JSON dataset and load it as a
This conversion can be done using one of two methods in a SQLContext:

* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRdd` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

{% highlight scala %}
// sc is an existing SparkContext.
@@ -643,7 +643,7 @@ Spark SQL can automatically infer the schema of a JSON dataset and load it as a
This conversion can be done using one of two methods in a JavaSQLContext :

* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRdd` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

{% highlight java %}
// sc is an existing JavaSparkContext.
@@ -681,7 +681,7 @@ Spark SQL can automatically infer the schema of a JSON dataset and load it as a
This conversion can be done using one of two methods in a SQLContext:

* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRdd` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.

{% highlight python %}
# sc is an existing SparkContext.
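
As a hedged sketch of the `jsonRDD` path described above (Spark 1.1 Scala API; assumes an existing SparkContext `sc`):

```scala
import org.apache.spark.sql.SQLContext

// Illustrative only: each element of the input RDD must be a complete JSON object.
val sqlContext = new SQLContext(sc)
val jsonStrings = sc.parallelize(
  """{"name":"Alice","age":30}""" :: """{"name":"Bob","age":25}""" :: Nil)
val people = sqlContext.jsonRDD(jsonStrings)
people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age >= 28").collect()
```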
4 changes: 2 additions & 2 deletions ec2/spark_ec2.py
@@ -38,7 +38,7 @@
from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
from boto import ec2

DEFAULT_SPARK_VERSION = "1.0.0"
DEFAULT_SPARK_VERSION = "1.1.0"

# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
@@ -218,7 +218,7 @@ def is_active(instance):
def get_spark_shark_version(opts):
spark_shark_map = {
"0.7.3": "0.7.1", "0.8.0": "0.8.0", "0.8.1": "0.8.1", "0.9.0": "0.9.0", "0.9.1": "0.9.1",
"1.0.0": "1.0.0"
"1.0.0": "1.0.0", "1.0.1": "1.0.1", "1.0.2": "1.0.2", "1.1.0": "1.1.0"
}
version = opts.spark_version.replace("v", "")
if version not in spark_shark_map:
@@ -81,7 +81,7 @@ public void testKafkaStream() throws InterruptedException {
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
