Commit 0d67bc1

multi-tracker branch now compiles and runs; but it crashes right before the
end. The same problem is seen also in the master branch (in the
ChainedStreaming implementation)

Mosharaf Chowdhury committed Oct 12, 2010
1 parent 4fdd482 commit 0d67bc1

Showing 122 changed files with 1,479 additions and 16,160 deletions.
14 changes: 9 additions & 5 deletions Makefile
@@ -2,15 +2,19 @@ EMPTY =
SPACE = $(EMPTY) $(EMPTY)

# Build up classpath by concatenating some strings
JARS = third_party/nexus.jar
JARS = third_party/mesos.jar
JARS += third_party/asm-3.2/lib/all/asm-all-3.2.jar
JARS += third_party/colt.jar
JARS += third_party/google-collect-1.0-rc5/google-collect-1.0-rc5.jar
JARS += third_party/guava-r06/guava-r06.jar
JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
JARS += third_party/scalatest-1.2-for-scala-2.8.0.RC3-SNAPSHOT.jar
JARS += third_party/scalacheck_2.8.0.RC3-1.7.jar
JARS += third_party/FreePastry-2.1.jar
JARS += third_party/scalatest-1.2/scalatest-1.2.jar
JARS += third_party/scalacheck_2.8.0-1.7.jar
JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
JARS += third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
JARS += third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
JARS += third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
CLASSPATH = $(subst $(SPACE),:,$(JARS))

SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
29 changes: 27 additions & 2 deletions README
@@ -1,15 +1,40 @@
Spark requires Scala 2.8. This version has been tested with 2.8.0RC3.
BUILDING

Spark requires Scala 2.8. This version has been tested with 2.8.0.final.

To build and run Spark, you will need to have Scala's bin in your $PATH,
or you will need to set the SCALA_HOME environment variable to point
to where you've installed Scala. Scala must be accessible through one
of these methods on Nexus slave nodes as well as on the master.
of these methods on Mesos slave nodes as well as on the master.

To build Spark and the example programs, run make.

To run one of the examples, use ./run <class> <params>. For example,
./run SparkLR will run the Logistic Regression example. Each of the
example programs prints usage help if no params are given.

All of the Spark samples take a <host> parameter that is the Mesos master
to connect to. This can be a Mesos URL, or "local" to run locally with one
thread, or "local[N]" to run locally with N threads.
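
As a rough illustration (not one of the shipped examples), a program following this convention could look like the sketch below. It assumes the two-argument SparkContext(master, frameworkName) constructor that the bundled samples use; parallelize and foreach are the only operations invoked, and both appear elsewhere in this commit.

    import spark.SparkContext

    object HostParamExample {
      def main(args: Array[String]) {
        // First argument is the master to connect to: a Mesos URL,
        // "local", or "local[N]" as described above.
        val host = if (args.length > 0) args(0) else "local"
        val spark = new SparkContext(host, "HostParamExample")
        // Run a trivial job on 2 slices and print one line per element.
        spark.parallelize(1 to 10, 2).foreach(i => println("Hello from element " + i))
      }
    }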

Tip: If you are building Spark and examples repeatedly, export USE_FSC=1
to have the Makefile use the fsc compiler daemon instead of scalac.

CONFIGURATION

Spark can be configured through two files: conf/java-opts and conf/spark-env.sh.

In java-opts, you can add flags to be passed to the JVM when running Spark.

In spark-env.sh, you can set any environment variables you wish to be available
when running Spark programs, such as PATH, SCALA_HOME, etc. There are also
several Spark-specific variables you can set:
- SPARK_CLASSPATH: Extra entries to be added to the classpath, separated by ":".
- SPARK_MEM: Memory for Spark to use, in the format used by java's -Xmx option
  (for example, 200m means 200 MB, 1g means 1 GB, etc).
- SPARK_LIBRARY_PATH: Extra entries to add to java.library.path for locating
shared libraries.
- SPARK_JAVA_OPTS: Extra options to pass to JVM.

Note that spark-env.sh must be a shell script (it must be executable and start
with a #! header to specify the shell to use).
1 change: 1 addition & 0 deletions conf/java-opts
@@ -0,0 +1 @@
-Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false
8 changes: 8 additions & 0 deletions conf/log4j.properties
@@ -0,0 +1,8 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
13 changes: 13 additions & 0 deletions conf/spark-env.sh
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

# Set Spark environment variables for your site in this file. Some useful
# variables to set are:
# - MESOS_HOME, to point to your Mesos installation
# - SCALA_HOME, to point to your Scala installation
# - SPARK_CLASSPATH, to add elements to Spark's classpath
# - SPARK_JAVA_OPTS, to add JVM options
# - SPARK_MEM, to change the amount of memory used per node (this should
# be in the same format as the JVM's -Xmx option, e.g. 300m or 1g).
# - SPARK_LIBRARY_PATH, to add extra search paths for native libraries.

MESOS_HOME=/home/mosharaf/Work/mesos
40 changes: 31 additions & 9 deletions run
@@ -3,27 +3,49 @@
# Figure out where the Scala framework is installed
FWDIR=`dirname $0`

# Set JAVA_OPTS to be able to load libnexus.so and set various other misc options
export JAVA_OPTS="-Djava.library.path=$FWDIR/third_party:$FWDIR/src/native -Xms100m -Xmx750m"
# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ "x$MESOS_HOME" != "x" ] ; then
  SPARK_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar:$SPARK_CLASSPATH"
  SPARK_LIBRARY_PATH="$MESOS_HOME/lib/java:$SPARK_LIBRARY_PATH"
fi

if [ "x$SPARK_MEM" == "x" ] ; then
  SPARK_MEM="300m"
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$FWDIR/third_party:$FWDIR/src/native"
JAVA_OPTS+=" -Xms$SPARK_MEM -Xmx$SPARK_MEM"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e $FWDIR/conf/java-opts ] ; then
  JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

# Build up classpath
CLASSPATH=$FWDIR/build/classes
CLASSPATH+=:$FWDIR/third_party/nexus.jar
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
CLASSPATH+=:$FWDIR/conf
CLASSPATH+=:$FWDIR/third_party/mesos.jar
CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
CLASSPATH+=:$FWDIR/third_party/colt.jar
CLASSPATH+=:$FWDIR/third_party/google-collect-1.0-rc5/google-collect-1.0-rc5.jar
CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
CLASSPATH+=:third_party/scalatest-1.2-for-scala-2.8.0.RC3-SNAPSHOT.jar
CLASSPATH+=:third_party/scalacheck_2.8.0.RC3-1.7.jar
CLASSPATH+=:$FWDIR/third_party/FreePastry-2.1.jar
CLASSPATH+=:$FWDIR/third_party/scalatest-1.2/scalatest-1.2.jar
CLASSPATH+=:$FWDIR/third_party/scalacheck_2.8.0-1.7.jar
CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
CLASSPATH+=:$FWDIR/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
  CLASSPATH+=:$jar
done
export CLASSPATH
export CLASSPATH # Needed for spark-shell

if [ -n "$SCALA_HOME" ]; then
SCALA=${SCALA_HOME}/bin/scala
3 changes: 1 addition & 2 deletions spark-executor
@@ -1,5 +1,4 @@
#!/bin/sh
echo "In spark-executor"
FWDIR="`dirname $0`"
echo Framework dir: $FWDIR
echo "Running spark-executor with framework dir = $FWDIR"
exec $FWDIR/run spark.Executor
18 changes: 7 additions & 11 deletions src/examples/BroadcastTest.scala
@@ -10,19 +10,15 @@ object BroadcastTest {
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000

var arr1 = new Array[Int](num)
for (i <- 0 until arr1.length)
arr1(i) = i
var arr = new Array[Int](num)
for (i <- 0 until arr.length)
arr(i) = i

// var arr2 = new Array[Int](num * 2)
// for (i <- 0 until arr2.length)
// arr2(i) = i

val barr1 = spark.broadcast(arr1)
// val barr2 = spark.broadcast(arr2)
val barr = spark.broadcast(arr)
spark.parallelize(1 to 10, slices).foreach {
// i => println(barr1.value.size + barr2.value.size)
i => println(barr1.value.size)
println("in task: barr = " + barr)
i => println(barr.value.size)
}
}
}
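
Pieced together from the calls visible in this diff (spark.broadcast and spark.parallelize(...).foreach), a self-contained version of the same broadcast pattern might look roughly like this; the SparkContext construction and the BroadcastSketch name are assumptions, not part of the commit.

    import spark.SparkContext

    object BroadcastSketch {
      def main(args: Array[String]) {
        val host = if (args.length > 0) args(0) else "local"
        val spark = new SparkContext(host, "BroadcastSketch")
        // Build a large array once on the master ...
        val arr = Array.tabulate(1000000)(i => i)
        // ... broadcast it, and read it back through .value inside each task.
        val barr = spark.broadcast(arr)
        spark.parallelize(1 to 10, 2).foreach {
          i => println("task " + i + " sees " + barr.value.size + " elements")
        }
      }
    }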

5 changes: 0 additions & 5 deletions src/examples/SparkALS.scala
@@ -123,8 +123,6 @@ object SparkALS {
var msc = spark.broadcast(ms)
var usc = spark.broadcast(us)
for (iter <- 1 to ITERATIONS) {
val start = System.nanoTime

println("Iteration " + iter + ":")
ms = spark.parallelize(0 until M, slices)
.map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
@@ -136,9 +134,6 @@
usc = spark.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
println()

val time = (System.nanoTime - start) / 1e9
println( "This iteration took " + time + " s")
}
}
}
2 changes: 1 addition & 1 deletion src/examples/Vector.scala
@@ -57,7 +57,7 @@ object Vector {
implicit def doubleToMultiplier(num: Double) = new Multiplier(num)

implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
def add(t1: Vector, t2: Vector) = t1 + t2
def addInPlace(t1: Vector, t2: Vector) = t1 + t2
def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
}
}
12 changes: 7 additions & 5 deletions src/scala/spark/Accumulators.scala
@@ -4,15 +4,17 @@ import java.io._

import scala.collection.mutable.Map

@serializable class Accumulator[T](initialValue: T, param: AccumulatorParam[T])
@serializable class Accumulator[T](
@transient initialValue: T, param: AccumulatorParam[T])
{
val id = Accumulators.newId
@transient var value_ = initialValue
@transient var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
var deserialized = false

Accumulators.register(this)

def += (term: T) { value_ = param.add(value_, term) }
def += (term: T) { value_ = param.addInPlace(value_, term) }
def value = this.value_
def value_= (t: T) {
if (!deserialized) value_ = t
@@ -22,7 +24,7 @@ import scala.collection.mutable.Map
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream) {
in.defaultReadObject
value_ = param.zero(initialValue)
value_ = zero
deserialized = true
Accumulators.register(this)
}
@@ -31,7 +33,7 @@ import scala.collection.mutable.Map
}

@serializable trait AccumulatorParam[T] {
def add(t1: T, t2: T): T
def addInPlace(t1: T, t2: T): T
def zero(initialValue: T): T
}
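
To illustrate the add -> addInPlace rename, here is a minimal sketch of an accumulator built only against the pieces shown in this hunk (the Accumulator constructor, +=, and value). IntAccumParam and AccumulatorSketch are hypothetical names, and the driver-side wiring through SparkContext and task serialization are left out.

    import spark.{Accumulator, AccumulatorParam}

    // Sums Ints using the renamed addInPlace/zero contract.
    object IntAccumParam extends AccumulatorParam[Int] {
      def addInPlace(t1: Int, t2: Int) = t1 + t2
      def zero(initialValue: Int) = 0
    }

    object AccumulatorSketch {
      def main(args: Array[String]) {
        val counter = new Accumulator(0, IntAccumParam)
        counter += 5                // dispatches to IntAccumParam.addInPlace
        counter += 7
        println(counter.value)      // prints 12 on the master
      }
    }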
