From 4721c7d9394b7917b2be8fb7df5e4eb1c31d68df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Mon, 13 Jul 2015 13:26:57 +0200 Subject: [PATCH] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator --- .../streaming/receiver/RateLimiter.scala | 24 ++++++++++++++++--- .../streaming/receiver/ReceiverMessage.scala | 3 ++- .../receiver/ReceiverSupervisorImpl.scala | 2 ++ .../streaming/scheduler/ReceiverTracker.scala | 8 ++++++- 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index 8df542b367d27..356ae340387df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.receiver +import java.util.concurrent.atomic.AtomicInteger + import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} import org.apache.spark.{Logging, SparkConf} @@ -34,12 +36,28 @@ import org.apache.spark.{Logging, SparkConf} */ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { - private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0) - private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate) + // treated as an upper limit + private val maxRateLimit = conf.getInt("spark.streaming.receiver.maxRate", 0) + private[receiver] var currentRateLimit = new AtomicInteger(maxRateLimit) + private lazy val rateLimiter = GuavaRateLimiter.create(currentRateLimit.get()) def waitToPush() { - if (desiredRate > 0) { + if (currentRateLimit.get() > 0) { rateLimiter.acquire() } } + + private[receiver] def updateRate(newRate: Int): Unit = + if (newRate > 0) { + try { + if (maxRateLimit > 0) { + currentRateLimit.set(newRate.min(maxRateLimit)) + } + else { + currentRateLimit.set(newRate) + } + } finally { + rateLimiter.setRate(currentRateLimit.get()) + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala index 7bf3c33319491..1eb55affaa9d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala @@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time private[streaming] sealed trait ReceiverMessage extends Serializable private[streaming] object StopReceiver extends ReceiverMessage private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage - +private[streaming] case class UpdateRateLimit(elementsPerSecond: Long) + extends ReceiverMessage diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 6078cdf8f8790..6e819460b1b23 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -77,6 +77,8 @@ private[streaming] class ReceiverSupervisorImpl( case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) + case UpdateRateLimit(eps) => + blockGenerator.updateRate(eps.toInt) } }) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 644e581cd8279..604d1a0dae289 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.rpc._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, - StopReceiver} + StopReceiver, UpdateRateLimit} import org.apache.spark.util.SerializableConfiguration /** @@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logError(s"Deregistered receiver for stream $streamId: $messageWithError") } + /** Update a receiver's maximum rate from an estimator's update */ + def sendRateUpdate(streamUID: Int, newRate: Long): Unit = { + for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) + eP.send(UpdateRateLimit(newRate)) + } + /** Add new blocks for the given stream */ private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { receivedBlockTracker.addBlock(receivedBlockInfo)