Commit
Revert "Added a few tests that measure the receiver’s rate."
This reverts commit 0c51959.
dragos committed Jul 21, 2015
1 parent 0c51959 commit 210f495
Showing 3 changed files with 75 additions and 130 deletions.
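For orientation, the reverted tests check that a throttled BlockGenerator admits records at roughly the configured rate, within a proportional error margin. Below is a minimal sketch of that bounds check in plain Scala (no Spark dependencies; the object and method names are illustrative, not taken from the commit):

// Minimal sketch (plain Scala) of the tolerance check the reverted tests rely on:
// records admitted over waitTimeMs should fall within +/-5% of maxRate * waitTimeMs / 1000.
object ThrottlingBoundsSketch {
  // Expected record count plus its accepted lower/upper bounds for a given rate and window.
  def expectedBounds(maxRate: Long, waitTimeMs: Long): (Double, Double, Double) = {
    val expected = maxRate * waitTimeMs / 1000.0
    (expected, expected * 0.95, expected * 1.05)
  }

  def main(args: Array[String]): Unit = {
    val (expected, lo, hi) = expectedBounds(maxRate = 1000, waitTimeMs = 2000)
    // An observed record count would be asserted to fall inside [lo, hi].
    println(s"expecting ~$expected records, accepting counts in [$lo, $hi]")
  }
}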
@@ -155,6 +155,63 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
assert(recordedData.toSet === generatedData.toSet)
}

ignore("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener
val blockIntervalMs = 100
val maxRate = 1001
val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
val expectedBlocks = 20
val waitTime = expectedBlocks * blockIntervalMs
val expectedMessages = maxRate * waitTime / 1000
val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
val generatedData = new ArrayBuffer[Int]

// Generate blocks
val startTime = System.currentTimeMillis()
blockGenerator.start()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
blockGenerator.addData(count)
generatedData += count
count += 1
}
blockGenerator.stop()

val recordedBlocks = blockGeneratorListener.arrayBuffers
val recordedData = recordedBlocks.flatten
assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
assert(recordedData.toSet === generatedData.toSet, "Received data not same")

// recordedData size should be close to the expected rate; use an error margin proportional to
// the value, so that rate changes don't cause a brittle test
val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
val numMessages = recordedData.size
assert(
numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
)

// XXX Checking every block would require an even distribution of messages across blocks,
// which throttling code does not control. Therefore, test against the average.
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")

// the first and last block may be incomplete, so we slice them out
val validBlocks = recordedBlocks.drop(1).dropRight(1)
val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size

assert(
averageBlockSize >= minExpectedMessagesPerBlock &&
averageBlockSize <= maxExpectedMessagesPerBlock,
s"# records in received blocks = [$receivedBlockSizes], not between " +
s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
)
}

/**
* Test whether write ahead logs are generated by received,
* and automatically cleaned up. The clean up must be aware of the
@@ -290,33 +347,28 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
errors += throwable
}
}
}

/**
* An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
*/
class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
// buffer of data received as ArrayBuffers
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]

def onAddData(data: Any, metadata: Any) {}
/**
* An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
*/
class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
// buffer of data received as ArrayBuffers
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]

def onGenerateBlock(blockId: StreamBlockId) {}
def onAddData(data: Any, metadata: Any) { }

def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
arrayBuffers += bufferOfInts
Thread.sleep(0)
}
def onGenerateBlock(blockId: StreamBlockId) { }

def onError(message: String, throwable: Throwable) {
errors += throwable
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
arrayBuffers += bufferOfInts
Thread.sleep(0)
}

def reset(): Unit = {
arrayBuffers.clear()
errors.clear()
def onError(message: String, throwable: Throwable) {
errors += throwable
}
}
}

@@ -17,12 +17,8 @@

package org.apache.spark.streaming.receiver

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.FakeBlockGeneratorListener

/** Testsuite for testing the network receiver behavior */
class RateLimiterSuite extends SparkFunSuite {
@@ -47,108 +43,4 @@ class RateLimiterSuite extends SparkFunSuite {
rateLimiter.updateRate(105)
assert(rateLimiter.getCurrentLimit === 100)
}

def setupGenerator(blockInterval: Int): (BlockGenerator, FakeBlockGeneratorListener) = {
val blockGeneratorListener = new FakeBlockGeneratorListener
val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockInterval}ms")
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
(blockGenerator, blockGeneratorListener)
}

test("throttling block generator") {
val blockIntervalMs = 100
val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
val maxRate = 1000
blockGenerator.updateRate(maxRate)
blockGenerator.start()
throttlingTest(maxRate, blockGenerator, blockGeneratorListener, blockIntervalMs)
blockGenerator.stop()
}

test("throttling block generator changes rate up") {
val blockIntervalMs = 100
val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
val maxRate1 = 1000
blockGenerator.start()
blockGenerator.updateRate(maxRate1)
throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)

blockGeneratorListener.reset()
val maxRate2 = 5000
blockGenerator.updateRate(maxRate2)
throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)
blockGenerator.stop()
}

test("throttling block generator changes rate up and down") {
val blockIntervalMs = 100
val (blockGenerator, blockGeneratorListener) = setupGenerator(blockIntervalMs)
val maxRate1 = 1000
blockGenerator.updateRate(maxRate1)
blockGenerator.start()
throttlingTest(maxRate1, blockGenerator, blockGeneratorListener, blockIntervalMs)

blockGeneratorListener.reset()
val maxRate2 = 5000
blockGenerator.updateRate(maxRate2)
throttlingTest(maxRate2, blockGenerator, blockGeneratorListener, blockIntervalMs)

blockGeneratorListener.reset()
val maxRate3 = 1000
blockGenerator.updateRate(maxRate3)
throttlingTest(maxRate3, blockGenerator, blockGeneratorListener, blockIntervalMs)
blockGenerator.stop()
}

def throttlingTest(
maxRate: Long,
blockGenerator: BlockGenerator,
blockGeneratorListener: FakeBlockGeneratorListener,
blockIntervalMs: Int) {
val expectedBlocks = 20
val waitTime = expectedBlocks * blockIntervalMs
val expectedMessages = maxRate * waitTime / 1000
val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
val generatedData = new ArrayBuffer[Int]

// Generate blocks
val startTime = System.currentTimeMillis()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
blockGenerator.addData(count)
generatedData += count
count += 1
}

val recordedBlocks = blockGeneratorListener.arrayBuffers
val recordedData = recordedBlocks.flatten
assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")

// recordedData size should be close to the expected rate; use an error margin proportional to
// the value, so that rate changes don't cause a brittle test
val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
val numMessages = recordedData.size
assert(
numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
)

// XXX Checking every block would require an even distribution of messages across blocks,
// which throttling code does not control. Therefore, test against the average.
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")

// the first and last block may be incomplete, so we slice them out
val validBlocks = recordedBlocks.drop(1).dropRight(1)
val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size

assert(
averageBlockSize >= minExpectedMessagesPerBlock &&
averageBlockSize <= maxExpectedMessagesPerBlock,
s"# records in received blocks = [$receivedBlockSizes], not between " +
s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
)
}
}
@@ -26,6 +26,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver._
import org.apache.spark.util.Utils
import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.ReceiverInputDStream

/** Testsuite for receiver scheduling */
