Skip to content

Commit

Permalink
[SPARK-8975][Streaming] Add a mechanism to send a new rate from the d…
Browse files Browse the repository at this point in the history
…river to the block generator
  • Loading branch information
huitseeker committed Jul 15, 2015
1 parent 6b89943 commit 4721c7d
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.streaming.receiver

import java.util.concurrent.atomic.AtomicInteger

import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

import org.apache.spark.{Logging, SparkConf}
Expand All @@ -34,12 +36,28 @@ import org.apache.spark.{Logging, SparkConf}
*/
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
// treated as an upper limit
private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0)
private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit)
private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get())

def waitToPush() {
if (desiredRate > 0) {
if (currentRateLimit.get() > 0) {
rateLimiter.acquire()
}
}

private[receiver] def updateRate(newRate: Int): Unit =
if (newRate > 0) {
try {
if (maxRateLimit > 0) {
currentRateLimit.set(newRate.min(maxRateLimit))
}
else {
currentRateLimit.set(newRate)
}
} finally {
rateLimiter.setRate(currentRateLimit.get())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time
private[streaming] sealed trait ReceiverMessage extends Serializable
private[streaming] object StopReceiver extends ReceiverMessage
private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage

private[streaming] case class UpdateRateLimit(elementsPerSecond: Long)
extends ReceiverMessage
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ private[streaming] class ReceiverSupervisorImpl(
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
blockGenerator.updateRate(eps.toInt)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
StopReceiver}
StopReceiver, UpdateRateLimit}
import org.apache.spark.util.SerializableConfiguration

/**
Expand Down Expand Up @@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logError(s"Deregistered receiver for stream $streamId: $messageWithError")
}

/** Update a receiver's maximum rate from an estimator's update */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint))
eP.send(UpdateRateLimit(newRate))
}

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
receivedBlockTracker.addBlock(receivedBlockInfo)
Expand Down

0 comments on commit 4721c7d

Please sign in to comment.