Commit 968ca9d: merged master

manishamde committed May 7, 2014
2 parents 7fc9545 + 5200872
Showing 333 changed files with 5,575 additions and 1,513 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -49,3 +49,9 @@ unit-tests.log
/lib/
rat-results.txt
scalastyle.txt

# For Hive
metastore_db/
metastore/
warehouse/
TempStatsStore/
13 changes: 0 additions & 13 deletions assembly/pom.xml
@@ -40,14 +40,6 @@
<deb.user>root</deb.user>
</properties>

<repositories>
<!-- A repository in the local filesystem for the Py4J JAR, which is not in Maven central -->
<repository>
<id>lib</id>
<url>file://${project.basedir}/lib</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -84,11 +76,6 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.1</version>
</dependency>
</dependencies>

<build>
53 changes: 38 additions & 15 deletions bin/compute-classpath.sh
@@ -32,6 +32,12 @@ CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"

ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

if [ -n "$JAVA_HOME" ]; then
JAR_CMD="$JAVA_HOME/bin/jar"
else
JAR_CMD="jar"
fi

# First check if we have a dependencies jar. If so, include binary classes with the deps jar
if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
@@ -44,33 +50,50 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar`
CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar 2>/dev/null)
else
# Else use spark-assembly jar from either RELEASE or assembly directory
if [ -f "$FWDIR/RELEASE" ]; then
ASSEMBLY_JAR=`ls "$FWDIR"/lib/spark-assembly*hadoop*.jar`
ASSEMBLY_JAR=$(ls "$FWDIR"/lib/spark-assembly*hadoop*.jar 2>/dev/null)
else
ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar`
ASSEMBLY_JAR=$(ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar 2>/dev/null)
fi
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi

# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
echo "Loading Spark jar with '$JAR_CMD' failed. "
echo "This is likely because Spark was compiled with Java 7 and run "
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark "
echo "or build Spark with Java 6."
exit 1
fi

CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"

# When Hive support is needed, Datanucleus jars must be included on the classpath.
# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ 2>/dev/null | grep "datanucleus-.*\\.jar" | wc -l)
if [ $num_datanucleus_jars -gt 0 ]; then
AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
if [ $num_hive_files -gt 0 ]; then
if [ -f "$FWDIR/RELEASE" ]; then
datanucleus_dir="$FWDIR"/lib
else
datanucleus_dir="$FWDIR"/lib_managed/jars
fi

datanucleus_jars=$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")
datanucleus_jars=$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)

if [ -n "$datanucleus_jars" ]; then
hive_files=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null)
if [ -n "$hive_files" ]; then
echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
DATANUCLEUSJARS=$(echo "$FWDIR/lib_managed/jars"/datanucleus-*.jar | tr " " :)
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS
CLASSPATH="$CLASSPATH:$datanucleus_jars"
fi
fi

@@ -90,10 +113,10 @@ fi
# Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail !
# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
# the configuration files.
if [ "x" != "x$HADOOP_CONF_DIR" ]; then
if [ -n "$HADOOP_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$HADOOP_CONF_DIR"
fi
if [ "x" != "x$YARN_CONF_DIR" ]; then
if [ -n "$YARN_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$YARN_CONF_DIR"
fi

1 change: 1 addition & 0 deletions bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
1 change: 1 addition & 0 deletions bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
9 changes: 8 additions & 1 deletion bin/spark-class
@@ -138,7 +138,14 @@ if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then
fi

# Compute classpath using external script
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
classpath_output=$($FWDIR/bin/compute-classpath.sh)
if [[ "$?" != "0" ]]; then
echo "$classpath_output"
exit 1
else
CLASSPATH=$classpath_output
fi

if [[ "$1" =~ org.apache.spark.tools.* ]]; then
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
fi
4 changes: 2 additions & 2 deletions bin/spark-submit
@@ -18,7 +18,7 @@
#

export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=$@
ORIG_ARGS=("$@")

while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
@@ -39,5 +39,5 @@ if [ ! -z $DRIVER_MEMORY ] && [ ! -z $DEPLOY_MODE ] && [ $DEPLOY_MODE = "client"
export SPARK_MEM=$DRIVER_MEMORY
fi

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit $ORIG_ARGS
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

57 changes: 51 additions & 6 deletions core/pom.xml
@@ -38,12 +38,6 @@
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
@@ -69,6 +63,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -249,6 +247,11 @@
<artifactId>pyrolite</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.1</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -294,6 +297,48 @@
</environmentVariables>
</configuration>
</plugin>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -146,8 +146,9 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))

private val secretKey = generateSecretKey()
logInfo("SecurityManager, is authentication enabled: " + authOn +
" are ui acls enabled: " + uiAclsOn + " users with view permissions: " + viewAcls.toString())
logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (uiAclsOn) "enabled" else "disabled") +
"; users with view permissions: " + viewAcls.toString())

// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,6 +17,8 @@

package org.apache.spark

import java.io.File

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.concurrent.Await
@@ -156,13 +158,11 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", boundPort.toString)
}

val classLoader = Thread.currentThread.getContextClassLoader

// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
val name = conf.get(propertyName, defaultClassName)
val cls = Class.forName(name, true, classLoader)
val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
// First try with the constructor that takes SparkConf. If we can't find one,
// use a no-arg constructor instead.
try {
@@ -306,7 +306,7 @@ object SparkEnv extends Logging {
k == "java.class.path"
}.getOrElse(("", ""))
val classPathEntries = classPathProperty._2
.split(conf.get("path.separator", ":"))
.split(File.pathSeparator)
.filterNot(e => e.isEmpty)
.map(e => (e, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -42,16 +42,21 @@ class TaskContext(
// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]

// Set to true when the task is completed, before the onCompleteCallbacks are executed.
@volatile var completed: Boolean = false

/**
* Add a callback function to be executed on task completion. An example use
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
* @param f Callback function.
*/
def addOnCompleteCallback(f: () => Unit) {
onCompleteCallbacks += f
}

def executeOnCompleteCallbacks() {
completed = true
// Process complete callbacks in the reverse order of registration
onCompleteCallbacks.reverse.foreach{_()}
}
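
The docstring above notes that HadoopRDD registers a callback to close its input stream; a minimal sketch of that pattern follows (the compute method, reader, and file path are illustrative assumptions, not part of this patch):

    // Register a cleanup callback that runs on task completion,
    // whether the task succeeds, fails, or is cancelled.
    def compute(context: TaskContext): Iterator[String] = {
      val reader = new java.io.BufferedReader(new java.io.FileReader("/tmp/example-input.txt"))
      context.addOnCompleteCallback(() => reader.close())
      Iterator.continually(reader.readLine()).takeWhile(_ != null)
    }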
26 changes: 23 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.java

import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable}
import java.lang.{Iterable => JIterable, Long => JLong}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -75,11 +75,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
def mapPartitionsWithIndex[R: ClassTag](
def mapPartitionsWithIndex[R](
f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
preservesPartitioning))
preservesPartitioning)(fakeClassTag))(fakeClassTag)
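
To illustrate what the wrapper above exposes, here is a small sketch against the underlying Scala RDD.mapPartitionsWithIndex (the data and partition count are illustrative assumptions):

    // Tag every element with the index of the partition it came from.
    val rdd = sc.parallelize(1 to 6, 3)   // partitions: (1,2), (3,4), (5,6)
    rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x))).collect()
    // Array((0,1), (0,2), (1,3), (1,4), (2,5), (2,6))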

/**
* Return a new RDD by applying a function to all elements of this RDD.
@@ -264,6 +264,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}

/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*/
def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
}

/**
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
* partition gets index 0, and the last item in the last partition receives the largest index.
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a Spark job when this RDD contains more than one partition.
*/
def zipWithIndex(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
}
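
To make the two id schemes documented above concrete, here is a brief sketch using the Scala RDD methods these Java wrappers delegate to (the data and partition count are illustrative assumptions):

    val rdd = sc.parallelize(Seq("a", "b", "c", "d", "e", "f"), 3)   // partitions: (a,b), (c,d), (e,f)

    // zipWithIndex: sequential indices, ordered by partition and then by position
    // within the partition; needs a job to compute partition sizes when there is
    // more than one partition.
    rdd.zipWithIndex().collect()
    // Array((a,0), (b,1), (c,2), (d,3), (e,4), (f,5))

    // zipWithUniqueId: element i of partition k gets id i * numPartitions + k,
    // so ids are unique but may contain gaps; no job is triggered.
    rdd.zipWithUniqueId().collect()
    // Array((a,0), (b,3), (c,1), (d,4), (e,2), (f,5))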

// Actions (launch a job to return a value to the user program)

/**
(Diff for the remaining changed files not shown.)