From ecb4e7db817af6199642e96a9f6465b8f4b695d9 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 10 Jun 2015 23:58:40 -0500 Subject: [PATCH] debugging printlns --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +++ .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 2 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ca3f04b973479..b86e6278ca87e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1046,6 +1046,7 @@ class DAGScheduler( if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) { markStageAsFinished(shuffleStage) + println(s"marking $shuffleStage as finished") logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) logInfo("waiting: " + waitingStages) @@ -1072,6 +1073,7 @@ class DAGScheduler( .map(_._2).mkString(", ")) submitStage(shuffleStage) } else { + println(s"looking for newly runnable stage") val newlyRunnable = new ArrayBuffer[Stage] for (shuffleStage <- waitingStages) { logInfo("Missing parents for " + shuffleStage + ": " + @@ -1081,6 +1083,7 @@ class DAGScheduler( { newlyRunnable += shuffleStage } + println(s"newly runnable stages = $newlyRunnable") waitingStages --= newlyRunnable runningStages ++= newlyRunnable for { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 090352c3d8b95..0cd07656821c0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -88,6 +88,7 @@ class DAGSchedulerSuite // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) taskSets += taskSet + println(s"submitting taskSet $taskSet. taskSets = $taskSets") } override def cancelTasks(stageId: Int, interruptThread: Boolean) { cancelledStages += stageId