
Commit

Merge branch 'master' into hive
Conflicts:
	project/build/SparkProject.scala
mateiz committed Jun 21, 2011
2 parents 3391fb9 + 2142500 commit 83a14f8
Showing 9 changed files with 109 additions and 12 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/Executor.scala
@@ -69,9 +69,9 @@ class Executor extends mesos.Executor with Logging {
d.sendStatusUpdate(new TaskStatus(
taskId, TaskState.TASK_FAILED, Utils.serialize(reason)))
}
- case e: Exception => {
+ case t: Throwable => {
// TODO: Handle errors in tasks less dramatically
- logError("Exception in task ID " + taskId, e)
+ logError("Exception in task ID " + taskId, t)
System.exit(1)
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/LocalScheduler.scala
@@ -37,9 +37,9 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
logInfo("Finished task " + i)
taskEnded(tasks(i), Success, result, accumUpdates)
} catch {
- case e: Exception => {
+ case t: Throwable => {
// TODO: Do something nicer here
- logError("Exception in task " + i, e)
+ logError("Exception in task " + i, t)
System.exit(1)
null
}
4 changes: 4 additions & 0 deletions core/src/main/scala/spark/Logging.scala
@@ -46,4 +46,8 @@ trait Logging {

def logError(msg: => String, throwable: Throwable) =
if (log.isErrorEnabled) log.error(msg, throwable)

// Method for ensuring that logging is initialized, to avoid having multiple
// threads do it concurrently (as SLF4J initialization is not thread safe).
def initLogging() { log }
}
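
The initLogging() hook above is meant to be called from a constructor before any worker threads start, so that SLF4J gets initialized on a single thread. A minimal sketch of that pattern (the Worker class and its thread are illustrative, not part of this commit; it mirrors the call that SparkContext adds later in this diff):

import spark.Logging

// Illustrative only: mix in Logging and force initialization on the
// constructing thread before spawning anything that might log.
class Worker extends Logging {
  initLogging()  // SLF4J initialization is not thread safe, so do it here

  def start() {
    new Thread("worker loop") {
      override def run() {
        logInfo("worker thread started")  // safe: logging already initialized
      }
    }.start()
  }
}
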
61 changes: 61 additions & 0 deletions core/src/main/scala/spark/PipedRDD.scala
@@ -0,0 +1,61 @@
package spark

import java.io.PrintWriter
import java.util.StringTokenizer

import scala.collection.mutable.ArrayBuffer
import scala.io.Source


/**
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
class PipedRDD[T: ClassManifest](parent: RDD[T], command: Seq[String])
extends RDD[String](parent.context) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
def this(parent: RDD[T], command: String) = this(parent, PipedRDD.tokenize(command))

override def splits = parent.splits

override val dependencies = List(new OneToOneDependency(parent))

override def compute(split: Split): Iterator[String] = {
val proc = Runtime.getRuntime.exec(command.toArray)
val env = SparkEnv.get

// Start a thread to print the process's stderr to ours
new Thread("stderr reader for " + command) {
override def run() {
for(line <- Source.fromInputStream(proc.getErrorStream).getLines)
System.err.println(line)
}
}.start()

// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + command) {
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
for(elem <- parent.iterator(split))
out.println(elem)
out.close()
}
}.start()

// Return an iterator that read lines from the process's stdout
Source.fromInputStream(proc.getInputStream).getLines
}
}

object PipedRDD {
// Split a string into words using a standard StringTokenizer
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
val tok = new StringTokenizer(command)
while(tok.hasMoreElements)
buf += tok.nextToken()
buf
}
}
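
A rough usage sketch for the new class (not part of the commit): each element of a parent partition is printed on its own line to the external command's stdin, and every line the command writes to stdout becomes an element of the resulting RDD[String]. Note that the single-string constructor only splits on whitespace via tokenize(), so shell features such as quoting or pipes are not interpreted. The example assumes a local SparkContext with the usual two-argument constructor (master and job name) and the existing parallelize and foreach operations:

import spark.{PipedRDD, SparkContext}

// Illustrative sketch: pipe an RDD's elements through "grep s".
object PipedRDDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PipedRDDExample")
    val words = sc.parallelize(Seq("spark", "mesos", "hive", "scala"), 2)

    // "grep s" is tokenized into Seq("grep", "s"); each partition's elements
    // are fed to grep's stdin one per line, and grep's stdout lines come back
    // as the elements of an RDD[String].
    val matched = new PipedRDD(words, "grep s")

    matched.foreach(println)  // prints the words containing the letter "s"
  }
}
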
23 changes: 23 additions & 0 deletions core/src/main/scala/spark/RDD.scala
@@ -78,6 +78,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
groupBy[K](func, sc.numCores)

def pipe(command: String): RDD[String] =
new PipedRDD(this, command)

def pipe(command: Seq[String]): RDD[String] =
new PipedRDD(this, command)

// Parallel operations

def foreach(f: T => Unit) {
@@ -295,6 +301,23 @@ extends RDD[Array[T]](prev.context) {
(k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
}
}

def lookup(key: K): Seq[V] = {
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
def process(it: Iterator[(K, V)]): Seq[V] = {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key)
buf += v
buf
}
val res = self.context.runJob(self, process, Array(index))
res(0)
case None =>
throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner")
}
}
}

class MappedValuesRDD[K, V, U](
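
Two of the additions above are user-facing: pipe() is a thin wrapper around the new PipedRDD, and lookup() uses the partitioner to run a job on only the partition that owns the requested key. A rough sketch of both (illustrative, not part of the commit; it assumes that import spark.SparkContext._ brings the pair-RDD operations such as groupByKey and lookup into scope, and that the grouped RDD keeps the partitioner from its shuffle, since otherwise lookup() throws the UnsupportedOperationException shown above):

import spark.SparkContext
import spark.SparkContext._  // assumed home of the pair-RDD implicits

object RDDAdditionsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "RDDAdditionsExample")

    // pipe(): stream elements through an external command, one per line.
    val nums = sc.parallelize(1 to 10, 2)
    nums.pipe(Seq("grep", "1")).foreach(println)  // lines containing "1": 1 and 10

    // lookup(): collect all values for one key by running a job on the single
    // partition that the key hashes to.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val grouped = pairs.groupByKey(2)             // RDD[(String, Seq[Int])]
    println(grouped.lookup("a"))                  // a Seq holding Seq(1, 3)
  }
}
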
14 changes: 12 additions & 2 deletions core/src/main/scala/spark/SparkContext.scala
@@ -15,6 +15,9 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil)
extends Logging {
// Ensure logging is initialized before we spawn any threads
initLogging()

// Set Spark master host and port system properties
if (System.getProperty("spark.master.host") == null)
System.setProperty("spark.master.host", Utils.localHostName)
@@ -145,7 +148,11 @@ extends Logging {
None
}

- private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
+ /**
+  * Run a function on a given set of partitions in an RDD and return the results.
+  * This is the main entry point to the scheduler, by which all actions get launched.
+  */
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int])
(implicit m: ClassManifest[U])
: Array[U] = {
logInfo("Starting job...")
@@ -155,7 +162,10 @@
result
}

- private[spark] def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
+ /**
+  * Run a job on all partitions in an RDD and return the results in an array.
+  */
+ def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U)
(implicit m: ClassManifest[U])
: Array[U] = {
runJob(rdd, func, 0 until rdd.splits.size)
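
With runJob now public, callers can run an arbitrary function over a chosen subset of partitions, which is the entry point that lookup() above relies on. An illustrative sketch, not part of the commit, assuming a local SparkContext:

import spark.SparkContext

object RunJobExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "RunJobExample")
    val data = sc.parallelize(1 to 100, 4)

    // Count the elements of partitions 0 and 2 only; one result per partition.
    val partialCounts: Array[Int] =
      sc.runJob(data, (it: Iterator[Int]) => it.size, Seq(0, 2))

    // The two-argument overload runs the function on every partition.
    val allCounts: Array[Int] = sc.runJob(data, (it: Iterator[Int]) => it.size)

    println(partialCounts.mkString(",") + " / " + allCounts.mkString(","))
  }
}
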
7 changes: 3 additions & 4 deletions project/build/SparkProject.scala
@@ -51,10 +51,11 @@ class SparkProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
val colt = "colt" % "colt" % "1.2.0"
}

- class HiveProject(info: ProjectInfo)
- extends DefaultProject(info) with BaseProject with DepJar
+ class HiveProject(info: ProjectInfo) extends DefaultProject(info) with BaseProject with DepJar

+ class BagelProject(info: ProjectInfo) extends DefaultProject(info) with BaseProject with DepJar with XmlTestReport

+ override def managedStyle = ManagedStyle.Maven
}


@@ -107,6 +108,4 @@ trait DepJar extends AssemblyBuilder {
depJarOutputPath,
packageOptions)
}.dependsOn(compile).describedAs("Bundle project's dependencies into a JAR.")

- override def managedStyle = ManagedStyle.Maven
}
2 changes: 1 addition & 1 deletion run
@@ -80,4 +80,4 @@ else
SCALA=scala
fi

- exec $SCALA -cp $CLASSPATH $@
+ exec $SCALA -cp $CLASSPATH "$@"
2 changes: 1 addition & 1 deletion sbt/sbt
@@ -4,4 +4,4 @@ if [ "$MESOS_HOME" != "" ]; then
EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java"
fi
export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
- java -Xmx700M $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+ java -Xmx800M -XX:MaxPermSize=150m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
