Skip to content

Commit

Permalink
Further address the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Jan 22, 2015
1 parent a237c75 commit f0b0c0b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
Expand Up @@ -209,9 +209,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach { time =>
// Allocate the related blocks when recovering from failure, because some added but not
// allocated block is dangled in the queue after recovering, we have to insert some block
// allocation event to group up them and get the right behavior.
// Allocate the related blocks when recovering from failure, because some blocks that were
// added but not allocated, are dangling in the queue after recovering, we have to allocate
// those blocks to the next batch, which is the batch they were supposed to go.
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
}
Expand Down
Expand Up @@ -67,7 +67,7 @@ private[streaming] class ReceivedBlockTracker(
extends Logging {

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
private val logManagerOption = createLogManager()
Expand Down Expand Up @@ -114,7 +114,7 @@ private[streaming] class ReceivedBlockTracker(
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
logWarning(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
}

Expand Down
Expand Up @@ -87,8 +87,10 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.allocateBlocksToBatch(1)
receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos

blockInfos.map(receivedBlockTracker.addBlock)
receivedBlockTracker.allocateBlocksToBatch(2)
receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}

test("block addition, block to batch allocation and cleanup with write ahead log") {
Expand Down Expand Up @@ -184,14 +186,14 @@ class ReceivedBlockTrackerSuite
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}

test("enabling write ahead log but not setting checkpoint dir") {
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
intercept[SparkException] {
createTracker(setCheckpointDir = false)
}
}

test("setting checkpoint dir but not enabling write ahead log") {
// When WAL config is not set, log manager should not be enabled
val tracker1 = createTracker(setCheckpointDir = true)
Expand Down

0 comments on commit f0b0c0b

Please sign in to comment.