Merge branch 'master' into pyspark-inputformats
Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
MLnick committed Mar 19, 2014
2 parents 97ef708 + 4d88030 commit 41856a5
Showing 425 changed files with 4,726 additions and 2,226 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

56 changes: 33 additions & 23 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -28,21 +28,23 @@ object Bagel extends Logging {
/**
* Runs a Bagel program.
* @param sc [[org.apache.spark.SparkContext]] to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
* the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
* empty array, i.e. sc.parallelize(Array[K, Message]()).
* @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
* message before sending (which often involves network I/O).
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
* and provides the result to each vertex in the next superstep.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
* Key will be the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
* this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
* @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
* given vertex into one message before sending (which often involves network
* I/O).
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
* @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
* @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
* Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
* optional Aggregator and the current superstep,
* @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
* intermediate RDDs in each superstep. Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to
* the Vertex, optional Aggregator and the current superstep,
* and returns a set of (Vertex, outgoing Messages) pairs
* @tparam K key
* @tparam V vertex type
@@ -71,7 +73,7 @@ object Bagel extends Logging {
var msgs = messages
var noActivity = false
do {
logInfo("Starting superstep "+superstep+".")
logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis

val aggregated = agg(verts, aggregator)
@@ -97,7 +99,8 @@
verts
}

/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
* storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -106,8 +109,8 @@
partitioner: Partitioner,
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -127,8 +130,8 @@
}

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
* and default storage level
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
* [[org.apache.spark.HashPartitioner]] and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -138,9 +141,13 @@
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
DEFAULT_STORAGE_LEVEL)(compute)

/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
* default [[org.apache.spark.HashPartitioner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -158,7 +165,8 @@
}

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
* default [[org.apache.spark.HashPartitioner]],
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -171,7 +179,8 @@
): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
* the default [[org.apache.spark.HashPartitioner]]
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -227,8 +236,9 @@ object Bagel extends Logging {
})

numMsgs += newMsgs.size
if (newVert.active)
if (newVert.active) {
numActiveVerts += 1
}

Some((newVert, newMsgs))
}.persist(storageLevel)
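The reflowed scaladoc above documents the Bagel.run overloads, but no call site appears in the hunks. A minimal, hypothetical sketch of the simplest overload (no Aggregator, default HashPartitioner, DefaultCombiner, default storage level) is shown below; the CountVertex and CountMessage types, the two-element graph, and the five-superstep cutoff are illustrative assumptions, not part of the commit.

    import org.apache.spark.SparkContext
    import org.apache.spark.bagel.{Bagel, Message, Vertex}

    // Illustrative types: `active` and `targetId` satisfy the Vertex and
    // Message[K] traits referenced in the scaladoc above.
    case class CountVertex(value: Int, active: Boolean) extends Vertex
    case class CountMessage(targetId: String, value: Int) extends Message[String]

    val sc    = new SparkContext("local", "BagelSketch")
    val verts = sc.parallelize(Seq(("a", CountVertex(0, active = true)),
                                   ("b", CountVertex(0, active = true))))
    // Initial messages: an empty RDD of (Key, Message) pairs, as the scaladoc suggests.
    val msgs  = sc.parallelize(Array[(String, CountMessage)]())

    val result = Bagel.run(sc, verts, msgs, 2) {
      (self: CountVertex, inbox: Option[Array[CountMessage]], superstep: Int) =>
        val received = inbox.getOrElse(Array[CountMessage]()).map(_.value).sum
        // Deactivate after a fixed number of supersteps so the do/while loop terminates.
        (CountVertex(self.value + received, active = superstep < 5), Array[CountMessage]())
    }
    result.collect().foreach(println)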
48 changes: 42 additions & 6 deletions bin/spark-shell
@@ -21,8 +21,6 @@
# Shell script for starting the Spark Shell REPL
# Note that it will set MASTER to spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
# if those two env vars are set in spark-env.sh but MASTER is not.
# Options:
# -c <cores> Set the number of cores for REPL to use

cygwin=false
case "`uname`" in
@@ -32,14 +30,52 @@ esac
# Enter posix mode for bash
set -o posix

CORE_PATTERN="^[0-9]+$"
MEM_PATTERN="^[0-9]+[m|g|M|G]$"

FWDIR="$(cd `dirname $0`/..; pwd)"

if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then
echo "Usage: spark-shell [OPTIONS]"
echo "OPTIONS:"
echo "-c --cores num, the maximum number of cores to be used by the spark shell"
echo "-em --execmem num[m|g], the memory used by each executor of spark shell"
echo "-dm --drivermem num[m|g], the memory used by the spark shell and driver"
echo "-h --help, print this help information"
exit
fi

SPARK_SHELL_OPTS=""

for o in "$@"; do
if [ "$1" = "-c" -o "$1" = "--cores" ]; then
shift
if [ -n "$1" ]; then
OPTIONS="-Dspark.cores.max=$1"
if [[ "$1" =~ $CORE_PATTERN ]]; then
SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.cores.max=$1"
shift
else
echo "ERROR: wrong format for -c/--cores"
exit 1
fi
fi
if [ "$1" = "-em" -o "$1" = "--execmem" ]; then
shift
if [[ $1 =~ $MEM_PATTERN ]]; then
SPARK_SHELL_OPTS="$SPARK_SHELL_OPTS -Dspark.executor.memory=$1"
shift
else
echo "ERROR: wrong format for --execmem/-em"
exit 1
fi
fi
if [ "$1" = "-dm" -o "$1" = "--drivermem" ]; then
shift
if [[ $1 =~ $MEM_PATTERN ]]; then
export SPARK_MEM=$1
shift
else
echo "ERROR: wrong format for --drivermem/-dm"
exit 1
fi
fi
done
@@ -95,10 +131,10 @@ if $cygwin; then
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
$FWDIR/bin/spark-class -Djline.terminal=unix $OPTIONS org.apache.spark.repl.Main "$@"
$FWDIR/bin/spark-class -Djline.terminal=unix $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
stty icanon echo > /dev/null 2>&1
else
$FWDIR/bin/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
$FWDIR/bin/spark-class $SPARK_SHELL_OPTS org.apache.spark.repl.Main "$@"
fi

# record the exit status lest it be overwritten:
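With the option parsing added above, a hypothetical invocation along the lines of bin/spark-shell --cores 8 --execmem 4g --drivermem 2g would set spark.cores.max and spark.executor.memory as system properties via SPARK_SHELL_OPTS and export SPARK_MEM for the driver. Only the option names and the num / num[m|g] formats come from the script itself; the specific values here are illustrative.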
2 changes: 1 addition & 1 deletion conf/metrics.properties.template
@@ -67,7 +67,7 @@
# period 10 Poll period
# unit seconds Units of poll period
# ttl 1 TTL of messages sent by Ganglia
# mode multicast Ganglia network mode ('unicast' or 'mulitcast')
# mode multicast Ganglia network mode ('unicast' or 'multicast')

# org.apache.spark.metrics.sink.JmxSink

1 change: 1 addition & 0 deletions conf/spark-env.sh.template
@@ -19,3 +19,4 @@
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_PUBLIC_DNS, to set the public dns name of the master
33 changes: 23 additions & 10 deletions core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.9.0-incubating-SNAPSHOT</version>
<version>1.0.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -39,6 +39,12 @@
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
@@ -68,6 +74,22 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
@@ -127,10 +149,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
@@ -190,11 +208,6 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -23,17 +23,17 @@
* Expose some commonly useful storage level constants.
*/
public class StorageLevels {
public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
public static final StorageLevel NONE = create(false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);

/**
* Create a new StorageLevel object.
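The hunk above swaps direct StorageLevel construction for a create(useDisk, useMemory, deserialized, replication) factory (argument order inferred from the constants above); from a caller's point of view the public constants are unchanged. A small sketch, under the assumption that this is the Java-API helper class org.apache.spark.api.java.StorageLevels:

    import org.apache.spark.api.java.{JavaRDD, StorageLevels}  // assumed package for this helper

    // The constants keep their names and semantics after the factory change.
    def cacheTwice(rdd: JavaRDD[String]): JavaRDD[String] =
      rdd.persist(StorageLevels.MEMORY_AND_DISK_SER_2)

    // A custom level built through the same factory; per the table above this
    // is equivalent to MEMORY_ONLY_2.
    val twoInMemoryCopies = StorageLevels.create(false, true, true, 2)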
@@ -17,6 +17,8 @@

package org.apache.spark.network.netty;

import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
@@ -27,8 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

class FileClient {

private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());
@@ -23,11 +23,11 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.DefaultFileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FileServerHandler extends SimpleChannelInboundHandler<String> {

@@ -17,7 +17,8 @@

package org.apache.hadoop.mapreduce

import java.lang.{Integer => JInteger, Boolean => JBoolean}
import java.lang.{Boolean => JBoolean, Integer => JInteger}

import org.apache.hadoop.conf.Configuration

private[apache]
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -19,8 +19,9 @@ package org.apache.spark

import java.io.{ObjectInputStream, Serializable}

import scala.collection.mutable.Map
import scala.collection.generic.Growable
import scala.collection.mutable.Map

import org.apache.spark.serializer.JavaSerializer

/**
@@ -188,8 +189,8 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged, i.e. variables that are only "added" to through an
* associative operation and can therefore be efficiently supported in parallel. They can be used
* to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type
* `Int` and `Double`, and programmers can add support for new types.
* to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
* value types, and programmers can add support for new types.
*
* An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
* Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
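The reworded scaladoc above points at SparkContext#accumulator and Accumulable#+=; a minimal sketch of the pattern it describes (app name and values are illustrative):

    import org.apache.spark.SparkContext

    val sc  = new SparkContext("local", "AccumulatorSketch")
    // Spark supplies implicit AccumulatorParams for numeric value types, per the updated doc.
    val sum = sc.accumulator(0)
    sc.parallelize(1 to 100).foreach(n => sum += n)  // tasks can only add to it
    println(sum.value)                               // 5050, readable only on the driver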
