diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index c096a251374b6..5d7127627eea5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -155,6 +155,63 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { assert(recordedData.toSet === generatedData.toSet) } + ignore("block generator throttling") { + val blockGeneratorListener = new FakeBlockGeneratorListener + val blockIntervalMs = 100 + val maxRate = 1001 + val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). + set("spark.streaming.receiver.maxRate", maxRate.toString) + val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) + val expectedBlocks = 20 + val waitTime = expectedBlocks * blockIntervalMs + val expectedMessages = maxRate * waitTime / 1000 + val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 + val generatedData = new ArrayBuffer[Int] + + // Generate blocks + val startTime = System.currentTimeMillis() + blockGenerator.start() + var count = 0 + while(System.currentTimeMillis - startTime < waitTime) { + blockGenerator.addData(count) + generatedData += count + count += 1 + } + blockGenerator.stop() + + val recordedBlocks = blockGeneratorListener.arrayBuffers + val recordedData = recordedBlocks.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") + assert(recordedData.toSet === generatedData.toSet, "Received data not same") + + // recordedData size should be close to the expected rate; use an error margin proportional to + // the value, so that rate changes don't cause a brittle test + val minExpectedMessages = expectedMessages - 0.05 * expectedMessages + val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages + val numMessages = recordedData.size + assert( + numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, + s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" + ) + + // XXX Checking every block would require an even distribution of messages across blocks, + // which throttling code does not control. Therefore, test against the average. + val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock + val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") + + // the first and last block may be incomplete, so we slice them out + val validBlocks = recordedBlocks.drop(1).dropRight(1) + val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size + + assert( + averageBlockSize >= minExpectedMessagesPerBlock && + averageBlockSize <= maxExpectedMessagesPerBlock, + s"# records in received blocks = [$receivedBlockSizes], not between " + + s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" + ) + } + /** * Test whether write ahead logs are generated by received, * and automatically cleaned up. The clean up must be aware of the @@ -290,33 +347,28 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { errors += throwable } } -} -/** - * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. - */ -class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { - // buffer of data received as ArrayBuffers - val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] - val errors = new ArrayBuffer[Throwable] - - def onAddData(data: Any, metadata: Any) {} + /** + * An implementation of BlockGeneratorListener that is used to test the BlockGenerator. + */ + class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener { + // buffer of data received as ArrayBuffers + val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]] + val errors = new ArrayBuffer[Throwable] - def onGenerateBlock(blockId: StreamBlockId) {} + def onAddData(data: Any, metadata: Any) { } - def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { - val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) - arrayBuffers += bufferOfInts - Thread.sleep(0) - } + def onGenerateBlock(blockId: StreamBlockId) { } - def onError(message: String, throwable: Throwable) { - errors += throwable - } + def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) { + val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int]) + arrayBuffers += bufferOfInts + Thread.sleep(0) + } - def reset(): Unit = { - arrayBuffers.clear() - errors.clear() + def onError(message: String, throwable: Throwable) { + errors += throwable + } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala index e58baed5f205a..c6330eb3673fb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/RateLimiterSuite.scala @@ -17,12 +17,8 @@ package org.apache.spark.streaming.receiver -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.FakeBlockGeneratorListener /** Testsuite for testing the network receiver behavior */ class RateLimiterSuite extends SparkFunSuite { @@ -47,108 +43,4 @@ class RateLimiterSuite extends SparkFunSuite { rateLimiter.updateRate(105) assert(rateLimiter.getCurrentLimit === 100) } - - def setupGenerator(blockInterval: Int): (BlockGenerator, FakeBlockGeneratorListener) = { - val blockGeneratorListener = new FakeBlockGeneratorListener - val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockInterval}ms") - val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) - (blockGenerator, blockGeneratorListener) - } - - test("throttling block generator") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate = 1000 - blockGenerator.updateRate(maxRate) - blockGenerator.start() - throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - test("throttling block generator changes rate up") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate1 = 1000 - blockGenerator.start() - blockGenerator.updateRate(maxRate1) - throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) - - blockGeneratorListener.reset() - val maxRate2 = 5000 - blockGenerator.updateRate(maxRate2) - throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - test("throttling block generator changes rate up and down") { - val blockIntervalMs = 100 - val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs) - val maxRate1 = 1000 - blockGenerator.updateRate(maxRate1) - blockGenerator.start() - throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs) - - blockGeneratorListener.reset() - val maxRate2 = 5000 - blockGenerator.updateRate(maxRate2) - throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs) - - blockGeneratorListener.reset() - val maxRate3 = 1000 - blockGenerator.updateRate(maxRate3) - throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs) - blockGenerator.stop() - } - - def throttlingTest( - maxRate: Long, - blockGenerator: BlockGenerator, - blockGeneratorListener: FakeBlockGeneratorListener, - blockIntervalMs: Int) { - val expectedBlocks = 20 - val waitTime = expectedBlocks * blockIntervalMs - val expectedMessages = maxRate * waitTime / 1000 - val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000 - val generatedData = new ArrayBuffer[Int] - - // Generate blocks - val startTime = System.currentTimeMillis() - var count = 0 - while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator.addData(count) - generatedData += count - count += 1 - } - - val recordedBlocks = blockGeneratorListener.arrayBuffers - val recordedData = recordedBlocks.flatten - assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") - - // recordedData size should be close to the expected rate; use an error margin proportional to - // the value, so that rate changes don't cause a brittle test - val minExpectedMessages = expectedMessages - 0.05 * expectedMessages - val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages - val numMessages = recordedData.size - assert( - numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, - s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" - ) - - // XXX Checking every block would require an even distribution of messages across blocks, - // which throttling code does not control. Therefore, test against the average. - val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock - val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock - val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") - - // the first and last block may be incomplete, so we slice them out - val validBlocks = recordedBlocks.drop(1).dropRight(1) - val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size - - assert( - averageBlockSize >= minExpectedMessagesPerBlock && - averageBlockSize <= maxExpectedMessagesPerBlock, - s"# records in received blocks = [$receivedBlockSizes], not between " + - s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average" - ) - } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 46d7bc479b5ff..41d92fb5db32f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver._ import org.apache.spark.util.Utils import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag import org.apache.spark.streaming.dstream.ReceiverInputDStream /** Testsuite for receiver scheduling */