Permalink
Browse files

Merge branch 'master' into hive

Conflicts:
	project/build/SparkProject.scala
  • Loading branch information...
2 parents 3391fb9 + 2142500 commit 83a14f834061d4b08483fe4e5ce2620f654e308a @mateiz mateiz committed Jun 21, 2011
@@ -69,9 +69,9 @@ class Executor extends mesos.Executor with Logging {
d.sendStatusUpdate(new TaskStatus(
taskId, TaskState.TASK_FAILED, Utils.serialize(reason)))
}
- case e: Exception => {
+ case t: Throwable => {
// TODO: Handle errors in tasks less dramatically
- logError("Exception in task ID " + taskId, e)
+ logError("Exception in task ID " + taskId, t)
System.exit(1)
}
}
@@ -37,9 +37,9 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
logInfo("Finished task " + i)
taskEnded(tasks(i), Success, result, accumUpdates)
} catch {
- case e: Exception => {
+ case t: Throwable => {
// TODO: Do something nicer here
- logError("Exception in task " + i, e)
+ logError("Exception in task " + i, t)
System.exit(1)
null
}
@@ -46,4 +46,8 @@ trait Logging {
def logError(msg: => String, throwable: Throwable) =
if (log.isErrorEnabled) log.error(msg, throwable)
+
+ // Method for ensuring that logging is initialized, to avoid having multiple
+ // threads do it concurrently (as SLF4J initialization is not thread safe).
+ def initLogging() { log }
}
@@ -0,0 +1,61 @@
+package spark
+
+import java.io.PrintWriter
+import java.util.StringTokenizer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+
+/**
+ * An RDD that pipes the contents of each parent partition through an external command
+ * (printing them one per line) and returns the output as a collection of strings.
+ */
+class PipedRDD[T: ClassManifest](parent: RDD[T], command: Seq[String])
+extends RDD[String](parent.context) {
+ // Similar to Runtime.exec(), if we are given a single string, split it into words
+ // using a standard StringTokenizer (i.e. by spaces)
+ def this(parent: RDD[T], command: String) = this(parent, PipedRDD.tokenize(command))
+
+ override def splits = parent.splits
+
+ override val dependencies = List(new OneToOneDependency(parent))
+
+ override def compute(split: Split): Iterator[String] = {
+ val proc = Runtime.getRuntime.exec(command.toArray)
+ val env = SparkEnv.get
+
+ // Start a thread to print the process's stderr to ours
+ new Thread("stderr reader for " + command) {
+ override def run() {
+ for(line <- Source.fromInputStream(proc.getErrorStream).getLines)
+ System.err.println(line)
+ }
+ }.start()
+
+ // Start a thread to feed the process input from our parent's iterator
+ new Thread("stdin writer for " + command) {
+ override def run() {
+ SparkEnv.set(env)
+ val out = new PrintWriter(proc.getOutputStream)
+ for(elem <- parent.iterator(split))
+ out.println(elem)
+ out.close()
+ }
+ }.start()
+
+ // Return an iterator that read lines from the process's stdout
+ Source.fromInputStream(proc.getInputStream).getLines
+ }
+}
+
+object PipedRDD {
+ // Split a string into words using a standard StringTokenizer
+ def tokenize(command: String): Seq[String] = {
+ val buf = new ArrayBuffer[String]
+ val tok = new StringTokenizer(command)
+ while(tok.hasMoreElements)
+ buf += tok.nextToken()
+ buf
+ }
+}
@@ -78,6 +78,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
groupBy[K](func, sc.numCores)
+ def pipe(command: String): RDD[String] =
+ new PipedRDD(this, command)
+
+ def pipe(command: Seq[String]): RDD[String] =
+ new PipedRDD(this, command)
+
// Parallel operations
def foreach(f: T => Unit) {
@@ -295,6 +301,23 @@ extends RDD[Array[T]](prev.context) {
(k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
}
}
+
+ def lookup(key: K): Seq[V] = {
+ self.partitioner match {
+ case Some(p) =>
+ val index = p.getPartition(key)
+ def process(it: Iterator[(K, V)]): Seq[V] = {
+ val buf = new ArrayBuffer[V]
+ for ((k, v) <- it if k == key)
+ buf += v
+ buf
+ }
+ val res = self.context.runJob(self, process, Array(index))
+ res(0)
+ case None =>
+ throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
+ }
+ }
}
class MappedValuesRDD[K, V, U](
@@ -15,6 +15,9 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil)
extends Logging {
+ // Ensure logging is initialized before we spawn any threads
+ initLogging()
+
// Set Spark master host and port system properties
if (System.getProperty("spark.master.host") == null)
System.setProperty("spark.master.host", Utils.localHostName)
@@ -145,7 +148,11 @@ extends Logging {
None
}
- private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+ /**
+ * Run a function on a given set of partitions in an RDD and return the results.
+ * This is the main entry point to the scheduler, by which all actions get launched.
+ */
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
(implicit m: ClassManifest[U])
: Array[U] = {
logInfo("Starting job...")
@@ -155,7 +162,10 @@ extends Logging {
result
}
- private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
+ /**
+ * Run a job on all partitions in an RDD and return the results in an array.
+ */
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
(implicit m: ClassManifest[U])
: Array[U] = {
runJob(rdd, func, 0 until rdd.splits.size)
@@ -51,10 +51,11 @@ class SparkProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
val colt = "colt" % "colt" % "1.2.0"
}
- class HiveProject(info: ProjectInfo)
- extends DefaultProject(info) with BaseProject with DepJar
+ class HiveProject(info: ProjectInfo) extends DefaultProject(info) with BaseProject with DepJar
class BagelProject(info: ProjectInfo) extends DefaultProject(info) with BaseProject with DepJar with XmlTestReport
+
+ override def managedStyle = ManagedStyle.Maven
}
@@ -107,6 +108,4 @@ trait DepJar extends AssemblyBuilder {
depJarOutputPath,
packageOptions)
}.dependsOn(compile).describedAs("Bundle project's dependencies into a JAR.")
-
- override def managedStyle = ManagedStyle.Maven
}
View
2 run
@@ -80,4 +80,4 @@ else
SCALA=scala
fi
-exec $SCALA -cp $CLASSPATH $@
+exec $SCALA -cp $CLASSPATH "$@"
View
@@ -4,4 +4,4 @@ if [ "$MESOS_HOME" != "" ]; then
EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java"
fi
export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
-java -Xmx700M $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx800M -XX:MaxPermSize=150m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"

0 comments on commit 83a14f8

Please sign in to comment.