Skip to content

Commit

Permalink
Add a test to verify no memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 30, 2015
1 parent 4a8f886 commit fd03ad0
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,51 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
}

test("detect memory leak") {
val ssc = setupStreams(input, operation)
val listener = new StreamingJobProgressListener(ssc)

val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)

for (_ <- 0 until 2 * limit) {
val receivedBlockInfo = Map(
0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
1 -> Array(ReceivedBlockInfo(1, 300, null))
)

// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))

// onBatchStarted
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))

// onJobStart
val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
listener.onJobStart(jobStart1)

val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
listener.onJobStart(jobStart2)

val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
listener.onJobStart(jobStart3)

val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
listener.onJobStart(jobStart4)

// onBatchCompleted
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}

listener.waitingBatches.size should be (0)
listener.runningBatches.size should be (0)
listener.retainedCompletedBatches.size should be (limit)
listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
}

}

0 comments on commit fd03ad0

Please sign in to comment.