
Tweaks to debug output

1 parent 3ffbf87 commit 5e2cac8cb0ac4c998e6c94b4d9f634884f18ee0a @mateiz committed Feb 6, 2012
@@ -1,6 +1,6 @@
package spark
-import java.io.{File, FileOutputStream}
+import java.io.{File, FileOutputStream, BufferedOutputStream, ObjectOutputStream}
import java.net.{URI, URL, URLClassLoader}
import java.util.concurrent._
@@ -72,8 +72,11 @@ class Executor extends org.apache.mesos.Executor with Logging {
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
+ // TODO: This is currently a hack to transfer the result to the master through HTTP:
+ // we put it in a shuffle directory for shuffle ID "0" and pass back a URL. It will
+ // be replaced when we have a unified block server.
val file = LocalFileShuffle.getOutputFile(0, desc.getTaskId.getValue.toInt, 0)
- val out = new java.io.ObjectOutputStream(new java.io.BufferedOutputStream(new java.io.FileOutputStream(file)))
+ val out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
out.writeObject(result)
out.close()
val url = LocalFileShuffle.getServerUri + "/shuffle/0/" + desc.getTaskId.getValue.toInt + "/0"
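
The TODO above covers only the executor side of the hack: the serialized TaskResult lands in a shuffle output file, and only its URL travels back to the master. As a rough sketch of the receiving end, assuming the master fetches the file over plain HTTP (the helper name fetchTaskResult is hypothetical and not part of this commit):

    import java.io.ObjectInputStream
    import java.net.URL

    // Hypothetical consumer of the URL produced above: pull the serialized
    // TaskResult back over HTTP and deserialize it.
    def fetchTaskResult(url: String): AnyRef = {
      val in = new ObjectInputStream(new URL(url).openStream())
      try {
        in.readObject()
      } finally {
        in.close()
      }
    }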
@@ -31,7 +31,7 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
}
def runTask(task: Task[_], idInJob: Int, attemptId: Int) {
- logInfo("Running task " + idInJob)
+ logInfo("Running task " + idInJob + " attempt " + attemptId)
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
try {
@@ -45,7 +45,7 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
bytes, Thread.currentThread.getContextClassLoader)
val result: Any = deserializedTask.run(attemptId)
val accumUpdates = Accumulators.values
- logInfo("Finished task " + idInJob)
+ logInfo("Finished task " + idInJob + " attempt " + attemptId)
taskEnded(task, Success, result, accumUpdates)
} catch {
case t: Throwable => {
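
The two log lines above now carry the attempt number, which matters because LocalScheduler retries failed tasks up to maxFailures times: the same idInJob can appear repeatedly, and without attemptId the retries are indistinguishable in the log. An illustrative sketch of the retry shape (submitWithRetries is hypothetical; in the real LocalScheduler the failure handling and resubmission live inside runTask itself):

    // Illustrative only: one task index can run several times, and each
    // run shows up in the log with a distinct attemptId.
    def submitWithRetries(task: Task[_], idInJob: Int, maxFailures: Int): Unit = {
      var attemptId = 0
      var succeeded = false
      while (!succeeded && attemptId <= maxFailures) {
        try {
          runTask(task, idInJob, attemptId) // logs "Running task N attempt M"
          succeeded = true
        } catch {
          case _: Throwable => attemptId += 1
        }
      }
    }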
@@ -154,7 +154,6 @@ extends MScheduler with DAGScheduler with Logging
def jobFinished(job: Job) {
this.synchronized {
- logInfo("Job finished: " + job.getId)
activeJobs -= job.getId
activeJobsQueue.dequeueAll(x => (x == job))
taskIdToJobId --= jobTasks(job.getId)
@@ -223,7 +223,7 @@ extends Job(jobId) with Logging
// Mark finished and stop if we've finished all the tasks
finished(index) = true
if (tasksFinished == numTasks) {
- logInfo("Finishing jobs because all tasks are done")
+ logDebug("Finishing job because all tasks are done")
sched.jobFinished(SimpleJob.this)
}
}
@@ -253,7 +253,7 @@ extends Job(jobId) with Logging
finished(index) = true
tasksFinished += 1
if (tasksFinished == numTasks) {
- logInfo("Finishing jobs because of fetch failure")
+ logDebug("Finishing job because of fetch failure")
sched.jobFinished(this)
}
return
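
Both "Finishing job" messages drop from INFO to DEBUG, so they disappear from default output after this commit. A minimal sketch for surfacing them again, assuming the log4j 1.x backend Spark used at the time:

    import org.apache.log4j.{Level, Logger}

    // Dial the spark logger down to DEBUG so logDebug output appears.
    Logger.getLogger("spark").setLevel(Level.DEBUG)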
