Correctly replay the WAL log when recovering from failure
jerryshao committed Jan 22, 2015
1 parent 3027f06 commit 558bdc3
Showing 3 changed files with 19 additions and 7 deletions.
@@ -76,7 +76,7 @@ object KafkaWordCountProducer
 
     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
 
-    // Zookeper connection properties
+    // Zookeeper connection properties
     val props = new Properties()
     props.put("metadata.broker.list", brokers)
     props.put("serializer.class", "kafka.serializer.StringEncoder")
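For readers unfamiliar with the surrounding example, KafkaWordCountProducer builds a legacy Scala Kafka producer from these properties. A minimal sketch of that setup, assuming the old kafka.producer API; the broker address and topic below are placeholders, not values from the commit:

// Sketch only: build a producer from the same kind of properties and send one message.
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("wordcount", "hello streaming"))
    producer.close()
  }
}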
@@ -17,12 +17,14 @@
 
 package org.apache.spark.streaming.scheduler
 
-import akka.actor.{ActorRef, ActorSystem, Props, Actor}
-import org.apache.spark.{SparkException, SparkEnv, Logging}
-import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
-import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 import scala.util.{Failure, Success, Try}
 
+import akka.actor.{ActorRef, Props, Actor}
+
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}
+
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
 private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
@@ -206,9 +208,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
     logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
       timesToReschedule.mkString(", "))
-    timesToReschedule.foreach(time =>
+    timesToReschedule.foreach { time =>
+      // Allocate the related blocks when recovering from failure: blocks that were added but
+      // never allocated are left dangling in the queue after recovery, so we have to insert
+      // block allocation events to group them up and get the right behavior.
+      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
       jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
-    )
+    }
 
     // Restart the timer
     timer.start(restartTime.milliseconds)
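The comment above is the core of the fix: blocks that were recorded as added, but never allocated to a batch before the failure, would otherwise stay in the queue and never appear in any regenerated job. A toy sketch (hypothetical names, not Spark's actual classes) of that failure mode and the replayed allocation step:

import scala.collection.mutable

// Toy model: a tracker with an unallocated-block queue, as described in the comment above.
class TrackerSketch {
  private val unallocated = mutable.Queue[String]()
  private val byBatch = mutable.Map[Long, List[String]]()

  def addBlock(blockId: String): Unit = unallocated += blockId

  // Replayed for every rescheduled batch time during recovery.
  def allocateBlocksToBatch(time: Long): Unit = {
    byBatch(time) = unallocated.toList
    unallocated.clear()
  }

  def blocksForBatch(time: Long): List[String] = byBatch.getOrElse(time, Nil)
}

object RecoverySketch {
  def main(args: Array[String]): Unit = {
    val tracker = new TrackerSketch
    tracker.addBlock("block-1") // added before the failure, never allocated to a batch

    val timesToReschedule = Seq(1000L)
    timesToReschedule.foreach { time =>
      tracker.allocateBlocksToBatch(time) // without this call, block-1 stays dangling
      println(s"batch $time gets ${tracker.blocksForBatch(time)}") // List(block-1)
    }
  }
}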
@@ -106,6 +106,12 @@ private[streaming] class ReceivedBlockTracker(
       timeToAllocatedBlocks(batchTime) = allocatedBlocks
       lastAllocatedBatchTime = batchTime
       allocatedBlocks
+    } else if (batchTime == lastAllocatedBatchTime) {
+      // This situation occurs when the WAL ends with a BatchAllocationEvent but no
+      // BatchCleanupEvent: a processed or half-processed batch job may need to be processed
+      // again, so batchTime will equal lastAllocatedBatchTime. This can only happen
+      // during recovery.
+      logWarning(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
     } else {
       throw new SparkException(s"Unexpected allocation of blocks, " +
         s"last batch = $lastAllocatedBatchTime, batch time to allocate = $batchTime ")
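To summarize the control flow this hunk produces, here is a condensed sketch (simplified types and hypothetical names, not the actual ReceivedBlockTracker) of the three cases allocateBlocksToBatch now distinguishes during WAL replay:

import scala.collection.mutable

// Condensed sketch: Long stands in for Time, List[String] for AllocatedBlocks,
// and println for logWarning; the write-ahead log itself is elided.
class AllocationSketch {
  private var lastAllocatedBatchTime: Long = -1L
  private val timeToAllocatedBlocks = mutable.Map[Long, List[String]]()
  private val unallocated = mutable.Queue[String]()

  def allocateBlocksToBatch(batchTime: Long): Unit = {
    if (lastAllocatedBatchTime < 0 || batchTime > lastAllocatedBatchTime) {
      // Normal case: a new batch claims everything received since the last allocation.
      timeToAllocatedBlocks(batchTime) = unallocated.toList
      unallocated.clear()
      lastAllocatedBatchTime = batchTime
    } else if (batchTime == lastAllocatedBatchTime) {
      // Recovery case added by this commit: the WAL ended with a BatchAllocationEvent but
      // no BatchCleanupEvent, so the same batch is replayed; warn instead of failing.
      println(s"Possibly processed batch $batchTime needs to be processed again")
    } else {
      // Allocating for an older batch is still a genuine error.
      throw new IllegalStateException(s"Unexpected allocation of blocks for batch $batchTime")
    }
  }
}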
