Skip to content

Commit

Permalink
[EXT] Fix repeated batch after graceful stop
Browse files Browse the repository at this point in the history
Fix repeated batch after graceful stop
  • Loading branch information
zzcclp committed Aug 31, 2019
1 parent 227f29e commit 32e5be4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import org.apache.spark.streaming.scheduler.JobGenerator
import org.apache.spark.util.Utils

private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time,
val afterBatchCompletion: Boolean = false)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val checkpointTime = if (ssc.initialCheckpoint.afterBatchCompletion) {
// If the checkpoint was taken after batch completion, start from the next batch
ssc.initialCheckpoint.checkpointTime + batchDuration
} else {
ssc.initialCheckpoint.checkpointTime
}

val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time (" + downTimes.size + " batches): "
Expand Down Expand Up @@ -295,6 +301,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
val afterBatchCompletion = clearCheckpointDataLater
checkpointWriter.write(new Checkpoint(ssc, time, afterBatchCompletion), clearCheckpointDataLater)
} else if (clearCheckpointDataLater) {
markBatchFullyProcessed(time)
}
Expand Down

0 comments on commit 32e5be4

Please sign in to comment.