
Commit

Merge remote-tracking branch 'upstream/master'
coderxiang committed Jun 22, 2015
2 parents 6ad4865 + 96aa013 commit f4fa534
Showing 458 changed files with 10,893 additions and 4,583 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -66,6 +66,7 @@ scalastyle-output.xml
R-unit-tests.log
R/unit-tests.out
python/lib/pyspark.zip
lint-r-report.log

# For Hive
metastore_db/
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -28,6 +28,7 @@ spark-env.sh
spark-env.cmd
spark-env.sh.template
log4j-defaults.properties
log4j-defaults-repl.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
d3.min.js
@@ -85,3 +86,4 @@ local-1430917381535_2
DESCRIPTION
NAMESPACE
test_support/*
.lintr
1 change: 1 addition & 0 deletions LICENSE
@@ -950,3 +950,4 @@ The following components are provided under the MIT License. See project link fo
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
2 changes: 1 addition & 1 deletion R/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=R-unit-tests.log
log4j.appender.file.file=R/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

2 changes: 2 additions & 0 deletions R/pkg/.lintr
@@ -0,0 +1,2 @@
linters: with_defaults(line_length_linter(100), camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE))
exclusions: list("inst/profile/general.R" = 1, "inst/profile/shell.R")
5 changes: 5 additions & 0 deletions R/pkg/NAMESPACE
@@ -10,6 +10,11 @@ export("sparkR.init")
export("sparkR.stop")
export("print.jobj")

# Job group lifecycle management methods
export("setJobGroup",
"clearJobGroup",
"cancelJobGroup")

exportClasses("DataFrame")

exportMethods("arrange",
44 changes: 44 additions & 0 deletions R/pkg/R/sparkR.R
@@ -278,3 +278,47 @@ sparkRHive.init <- function(jsc = NULL) {
  assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
  hiveCtx
}

#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
#' different value or cleared.
#'
#' @param sc existing spark context
#' @param groupId the ID to be assigned to job groups
#' @param description description for the job group ID
#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
#'}

setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
  callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
}

#' Clear current job group ID and its description
#'
#' @param sc existing spark context
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' clearJobGroup(sc)
#'}

clearJobGroup <- function(sc) {
  callJMethod(sc, "clearJobGroup")
}

#' Cancel active jobs for the specified group
#'
#' @param sc existing spark context
#' @param groupId the ID of job group to be cancelled
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' cancelJobGroup(sc, "myJobGroup")
#'}

cancelJobGroup <- function(sc, groupId) {
  callJMethod(sc, "cancelJobGroup", groupId)
}
7 changes: 7 additions & 0 deletions R/pkg/inst/tests/test_context.R
@@ -48,3 +48,10 @@ test_that("rdd GC across sparkR.stop", {
  count(rdd3)
  count(rdd4)
})

test_that("job group functions can be called", {
  sc <- sparkR.init()
  setJobGroup(sc, "groupId", "job description", TRUE)
  cancelJobGroup(sc, "groupId")
  clearJobGroup(sc)
})
35 changes: 13 additions & 22 deletions build/mvn
@@ -69,11 +69,14 @@ install_app() {

# Install maven under the build/ folder
install_mvn() {
  local MVN_VERSION="3.3.3"

  install_app \
    "http://archive.apache.org/dist/maven/maven-3/3.2.5/binaries" \
    "apache-maven-3.2.5-bin.tar.gz" \
    "apache-maven-3.2.5/bin/mvn"
  MVN_BIN="${_DIR}/apache-maven-3.2.5/bin/mvn"
    "http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
    "apache-maven-${MVN_VERSION}-bin.tar.gz" \
    "apache-maven-${MVN_VERSION}/bin/mvn"

  MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn"
}

# Install zinc under the build/ folder
@@ -105,28 +108,16 @@ install_scala() {
  SCALA_LIBRARY="$(cd "$(dirname ${scala_bin})/../lib" && pwd)/scala-library.jar"
}

# Determines if a given application is already installed. If not, will attempt
# to install
## Arg1 - application name
## Arg2 - Alternate path to local install under build/ dir
check_and_install_app() {
  # create the local environment variable in uppercase
  local app_bin="`echo $1 | awk '{print toupper(\$0)}'`_BIN"
  # some black magic to set the generated app variable (i.e. MVN_BIN) into the
  # environment
  eval "${app_bin}=`which $1 2>/dev/null`"

  if [ -z "`which $1 2>/dev/null`" ]; then
    install_$1
  fi
}

# Setup healthy defaults for the Zinc port if none were provided from
# the environment
ZINC_PORT=${ZINC_PORT:-"3030"}

# Check and install all applications necessary to build Spark
check_and_install_app "mvn"
# Install Maven if necessary
MVN_BIN="$(command -v mvn)"

if [ ! "$MVN_BIN" ]; then
  install_mvn
fi

# Install the proper version of Scala and Zinc for the build
install_zinc
@@ -0,0 +1,12 @@
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
@@ -140,7 +140,8 @@ function renderDagViz(forJob) {

  // Find cached RDDs and mark them as such
  metadataContainer().selectAll(".cached-rdd").each(function(v) {
    var nodeId = VizConstants.nodePrefix + d3.select(this).text();
    var rddId = d3.select(this).text().trim();
    var nodeId = VizConstants.nodePrefix + rddId;
    svg.selectAll("g." + nodeId).classed("cached", true);
  });

@@ -150,7 +151,7 @@
/* Render the RDD DAG visualization on the stage page. */
function renderDagVizForStage(svgContainer) {
  var metadata = metadataContainer().select(".stage-metadata");
  var dot = metadata.select(".dot-file").text();
  var dot = metadata.select(".dot-file").text().trim();
  var containerId = VizConstants.graphPrefix + metadata.attr("stage-id");
  var container = svgContainer.append("g").attr("id", containerId);
  renderDot(dot, container, false);
26 changes: 19 additions & 7 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -121,13 +121,25 @@ trait Logging {
    if (usingLog4j12) {
      val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
      if (!log4j12Initialized) {
        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
        Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
          case Some(url) =>
            PropertyConfigurator.configure(url)
            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
          case None =>
            System.err.println(s"Spark was unable to load $defaultLogProps")
        if (Utils.isInInterpreter) {
          val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
          Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
            case Some(url) =>
              PropertyConfigurator.configure(url)
              System.err.println(s"Using Spark's repl log4j profile: $replDefaultLogProps")
              System.err.println("To adjust logging level use sc.setLogLevel(\"INFO\")")
            case None =>
              System.err.println(s"Spark was unable to load $replDefaultLogProps")
          }
        } else {
          val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
          Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
            case Some(url) =>
              PropertyConfigurator.configure(url)
              System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
            case None =>
              System.err.println(s"Spark was unable to load $defaultLogProps")
          }
        }
      }
    }
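The practical effect in spark-shell: the repl profile above sets the root logger to WARN instead of INFO, and the startup hint points at the runtime override. A sketch (`sc` is the shell's SparkContext):

sc.setLogLevel("INFO")  // restore the pre-1.4 console verbosity
sc.setLogLevel("ERROR") // or quiet the console even further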
49 changes: 48 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.mutable.{HashSet, Map}
import scala.collection.mutable.{HashMap, HashSet, Map}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
    cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
  }

  /**
   * Return a list of locations that each have a fraction of the map output greater than the
   * specified threshold.
   *
   * @param shuffleId id of the shuffle
   * @param reducerId id of the reduce task
   * @param numReducers total number of reducers in the shuffle
   * @param fractionThreshold fraction of the total map output size that a location must have
   *                          for it to be considered large
   *
   * This method is not thread-safe.
   */
  def getLocationsWithLargestOutputs(
      shuffleId: Int,
      reducerId: Int,
      numReducers: Int,
      fractionThreshold: Double)
    : Option[Array[BlockManagerId]] = {

    if (mapStatuses.contains(shuffleId)) {
      val statuses = mapStatuses(shuffleId)
      if (statuses.nonEmpty) {
        // HashMap to add up sizes of all blocks at the same location
        val locs = new HashMap[BlockManagerId, Long]
        var totalOutputSize = 0L
        var mapIdx = 0
        while (mapIdx < statuses.length) {
          val status = statuses(mapIdx)
          val blockSize = status.getSizeForBlock(reducerId)
          if (blockSize > 0) {
            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
            totalOutputSize += blockSize
          }
          mapIdx = mapIdx + 1
        }
        val topLocs = locs.filter { case (loc, size) =>
          size.toDouble / totalOutputSize >= fractionThreshold
        }
        // Return if we have any locations that satisfy the required threshold
        if (topLocs.nonEmpty) {
          return Some(topLocs.map(_._1).toArray)
        }
      }
    }
    None
  }

  def incrementEpoch() {
    epochLock.synchronized {
      epoch += 1
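For illustration, a hedged sketch of how a scheduler-side caller might use the new method to pick preferred locations for a reduce task. The tracker reference, the IDs, and the 0.2 threshold are assumptions for the example, not values from this commit:

// Ask where at least 20% of reducer 0's input for shuffle 0 lives.
val topLocs: Option[Array[BlockManagerId]] =
  mapOutputTracker.getLocationsWithLargestOutputs(
    shuffleId = 0, reducerId = 0, numReducers = 1, fractionThreshold = 0.2)
topLocs.foreach(_.foreach(loc => println(s"prefer ${loc.host}:${loc.port}")))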
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
  // key used to store the spark secret in the Hadoop UGI
  private val sparkSecretLookupKey = "sparkCookie"

  private val authOn = sparkConf.getBoolean("spark.authenticate", false)
  private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
  // keep spark.ui.acls.enable for backwards compatibility with 1.0
  private var aclsOn =
    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
        cookie
      } else {
        // user must have set spark.authenticate.secret config
        sparkConf.getOption("spark.authenticate.secret") match {
        // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
        sys.env.get(SecurityManager.ENV_AUTH_SECRET)
          .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
          case Some(value) => value
          case None => throw new Exception("Error: a secret key must be specified via the " +
            "spark.authenticate.secret config")
            SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
        }
      }
      sCookie
@@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
  override def getSaslUser(appId: String): String = getSaslUser()
  override def getSecretKey(appId: String): String = getSecretKey()
}

private[spark] object SecurityManager {

  val SPARK_AUTH_CONF: String = "spark.authenticate"
  val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
  // This is used to set the auth secret in an executor's env variable. It should have the
  // same value as SPARK_AUTH_SECRET_CONF set in SparkConf
  val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
}
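With these constants, the driver holds both settings in SparkConf, while executors receive the secret through the _SPARK_AUTH_SECRET environment variable instead. A driver-side configuration sketch (the secret value is a placeholder):

val conf = new SparkConf()
  .set("spark.authenticate", "true")          // SecurityManager.SPARK_AUTH_CONF
  .set("spark.authenticate.secret", "s3cr3t") // SecurityManager.SPARK_AUTH_SECRET_CONF
val sc = new SparkContext(conf)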
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    val ow = new ObjectWritable()
    ow.setConf(new Configuration())
    ow.setConf(new Configuration(false))
    ow.readFields(in)
    t = ow.get().asInstanceOf[T]
  }
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging {
  def isExecutorStartupConf(name: String): Boolean = {
    isAkkaConf(name) ||
    name.startsWith("spark.akka") ||
    name.startsWith("spark.auth") ||
    (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
    name.startsWith("spark.ssl") ||
    isSparkPortConf(name)
  }
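The intent of the tightened predicate: spark.authenticate.* options still count as executor startup configs, except the secret itself, which now travels via the environment. A quick behavioral sketch (callable only from Spark-internal code, since the object is private[spark]):

SparkConf.isExecutorStartupConf("spark.authenticate")        // true
SparkConf.isExecutorStartupConf("spark.authenticate.secret") // now false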
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
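SerializableConfiguration lives in org.apache.spark.util and takes over from SerializableWritable for plain Hadoop Configurations. A minimal sketch of the wrapper idea, not the exact Spark source; note the new Configuration(false) on deserialization, which mirrors the SerializableWritable fix above by skipping default-resource loading on the receiving side:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Sketch: Configuration is a Writable, so its entries can be streamed manually.
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false) // don't reload cluster defaults here
    value.readFields(in)
  }
}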
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -20,6 +20,8 @@ package org.apache.spark
import java.io.File
import java.net.Socket

import akka.actor.ActorSystem

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Properties
@@ -75,7 +77,8 @@ class SparkEnv (
    val conf: SparkConf) extends Logging {

  // TODO Remove actorSystem
  val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
  @deprecated("Actor system is no longer supported as of 1.4")
  val actorSystem: ActorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

  private[spark] var isStopped = false
  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
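Existing callers keep compiling, but the annotation now nudges them off Akka; a sketch of what a user of the field sees:

// Compiling a reference such as this now emits a deprecation warning:
//   value actorSystem in class SparkEnv is deprecated: Actor system is no longer supported as of 1.4
val system: akka.actor.ActorSystem = SparkEnv.get.actorSystem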
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.SerializableJobConf

/**
 * Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
  with Serializable {

  private val now = new Date()
  private val conf = new SerializableWritable(jobConf)
  private val conf = new SerializableJobConf(jobConf)

  private var jobID = 0
  private var splitID = 0
(Diff truncated: the remaining changed files are not shown here.)
