
[SPARK-8979][Streaming] Implements a PIDRateEstimator #17

Conversation

huitseeker
@huitseeker huitseeker force-pushed the SPARK-8979 branch 3 times, most recently from 94c2225 to 2144472 Compare July 15, 2015 00:34
@huitseeker huitseeker changed the title [SPARK-8979][Streaming] Implement a PIDRateEstimator [SPARK-8979][Streaming] Implements a PIDRateEstimator Jul 15, 2015
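For context, a PID estimator derives a new ingestion rate from the rate the system actually sustained and the current error. A minimal sketch of the control law follows, assuming the standard proportional-integral-derivative formulation; the class name, method signature, and term weighting here are illustrative, not the PR's code:

```scala
// Minimal sketch of a PID (proportional-integral-derivative) rate estimator.
// Illustration of the control law only; the PR's PIDRateEstimator may choose
// different terms and weights. All names here are hypothetical.
class SimplePidEstimator(proportional: Double, integral: Double, derivative: Double) {
  private var latestTime = -1L
  private var latestRate = -1.0
  private var latestError = -1.0

  /** New rate in elements/sec, or None until enough history has accumulated. */
  def compute(timeMs: Long, elements: Long,
              processingDelayMs: Long, schedulingDelayMs: Long): Option[Double] = {
    if (latestTime >= 0 && timeMs > latestTime && processingDelayMs > 0) {
      val deltaSecs = (timeMs - latestTime) / 1000.0
      // Rate the system actually sustained over the last batch.
      val processingRate = elements / (processingDelayMs / 1000.0)
      val error = latestRate - processingRate                        // P input
      // Accumulated backlog, expressed as a rate over this interval:    I input
      val historicalError = (schedulingDelayMs / 1000.0) * processingRate / deltaSecs
      val dError = (error - latestError) / deltaSecs                 // D input
      val newRate =
        latestRate - proportional * error - integral * historicalError - derivative * dError
      latestTime = timeMs
      latestError = error
      if (newRate > 0) { latestRate = newRate; Some(newRate) } else None
    } else {
      // First sample: just record state, produce no estimate yet.
      latestTime = timeMs
      if (processingDelayMs > 0) latestRate = elements / (processingDelayMs / 1000.0)
      None
    }
  }
}
```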
```scala
 */
protected[streaming] val rateEstimator = ssc.conf
  .getOption("spark.streaming.RateEstimator")
  .getOrElse("noop") match {
```
I'd break this logic into its own method. The list of implementations might grow.

@huitseeker (author):
What do you suggest the signature of that method would be?

How about String => RateEstimator?
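A sketch of the suggested `String => RateEstimator` factory. The trait and class bodies are minimal stand-ins rather than the PR's code, and the method name `create` is an assumption:

```scala
// Minimal stand-ins for the PR's estimator types, just enough to
// illustrate the suggested String => RateEstimator factory.
trait RateEstimator
class NoopRateEstimator extends RateEstimator
class PIDRateEstimator extends RateEstimator

object RateEstimator {
  // Hypothetical factory: maps a conf value to an estimator instance,
  // so the list of implementations can grow in one place.
  def create(name: String): RateEstimator = name.toLowerCase match {
    case "noop" => new NoopRateEstimator
    case "pid"  => new PIDRateEstimator
    case other  => throw new IllegalArgumentException(s"Unknown RateEstimator: $other")
  }
}
```

With this shape, the `ssc.conf.getOption(...)` lookup above would reduce to a single `RateEstimator.create(confValue)` call.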

@huitseeker huitseeker force-pushed the SPARK-8979 branch 6 times, most recently from 219e1bd to ef7b0a4 Compare July 15, 2015 18:51
```scala
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
@DeveloperApi
```
This must be `private[streaming]`.

@huitseeker (author):
I assume the same goes for RateEstimator, NoopRateEstimator, and PIDRateEstimator, at least for now, right?

Yep.

@huitseeker (author):
Done.

```scala
@transient private val executionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))

private val speedLimit: AtomicLong = new AtomicLong(-1L)
```
`speedLimit` --> `rateLimit`
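The `AtomicLong` above lets the limit be read on the receiving path and updated from the rate-update thread without locking. A minimal sketch of that pattern, with hypothetical class and method names (not the PR's code), using -1 as the "no limit" sentinel as in the field above:

```scala
import java.util.concurrent.atomic.AtomicLong

// Thread-safe, dynamically updatable rate limit. -1 means "no limit".
// Class and method names are illustrative, not the PR's API.
class DynamicRateLimiter {
  private val rateLimit = new AtomicLong(-1L)

  /** Atomically publish a new positive rate; non-positive values are ignored. */
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) rateLimit.set(newRate)

  /** Current limit, or None while unlimited. */
  def currentRate: Option[Long] = rateLimit.get() match {
    case -1L => None
    case r   => Some(r)
  }
}
```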

tdas commented Jul 15, 2015

Here is the test plan:

  1. RateLimiterSuite - add a unit test for rate limit updates (mentioned in the inline comments)
  2. ReceiverTrackerSuite - add a unit test for the communication with the receiver: (i) run a StreamingContext + receiverStream + fake receiver that, when started, gives us access to the receiver's internal rate limiter (may have to use objects); (ii) directly change the rate by calling receiverTracker.updateRate; (iii) check the current rate in the receiver's rate limiter (not by sending data, but directly)

These should be part of subtask 1 (dynamic rate limiting). Additionally, for the rate controller subtask:

  1. RateControllerSuite -

    a. add unit test that uses a mock RateController to test whether publish() gets called when onBatchCompleted is called

    b. add unit test that uses a fake RateEstimator (something that just doubles the rate) to test whether the rate limit is being set correctly in the Receiver. Use the technique explained above in 2.

  2. PIDRateEstimatorSuite - already there; it may need changes, but I will review it later.
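Test-plan item 1b mentions a fake RateEstimator that just doubles the rate. A minimal sketch of such a fake follows; the trait signature mimics the general shape discussed in the PR but is an assumption here:

```scala
// Assumed shape of the estimator interface, for illustration only.
trait RateEstimator {
  def compute(time: Long, elements: Long,
              processingDelay: Long, schedulingDelay: Long): Option[Double]
}

// Fake estimator for tests: ignores its inputs and doubles the previous
// rate on every call, so the rate a mocked RateController should publish
// is trivially predictable.
class DoublingRateEstimator(initialRate: Double) extends RateEstimator {
  private var lastRate = initialRate

  override def compute(time: Long, elements: Long,
                       processingDelay: Long, schedulingDelay: Long): Option[Double] = {
    lastRate *= 2
    Some(lastRate)
  }
}
```

A test would drive a batch completion, then assert that the receiver's rate limiter ended up at exactly double the previous rate.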

tdas commented Jul 15, 2015

Overall, this whole PR looks quite good and is ready to be submitted as PRs to the main Spark repo. I suggest breaking it down into smaller PRs according to the subtasks.
@huitseeker @dragos
Thank you very much for your efforts. :)

@typesafe-tools (collaborator):
Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/52/

Build Log
last 10 lines:

```
[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/17/merge^{commit} # timeout=10
Checking out Revision 16437f83d698c6f2d2a7572a87a4ba0a91e72636 (refs/remotes/origin/pr/17/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 16437f83d698c6f2d2a7572a87a4ba0a91e72636
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 48c869c75acf5c613e3e737b3913a189af0dada5 to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/52/ and message: Merged build finished.
```

Test FAILed.

@typesafe-tools (collaborator):
Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/54/

Build Log
last 10 lines:

```
[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/17/merge^{commit} # timeout=10
Checking out Revision 16437f83d698c6f2d2a7572a87a4ba0a91e72636 (refs/remotes/origin/pr/17/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 16437f83d698c6f2d2a7572a87a4ba0a91e72636
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 94c2225bb60e800194cdacde35b6916678bc676d to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/54/ and message: Merged build finished.
```

Test FAILed.

@dragos dragos force-pushed the topic/streaming-bp/dynamic-rate branch from 13ada97 to 0c51959 Compare July 20, 2015 14:45
@dragos dragos closed this Sep 9, 2015
typesafe-tools pushed a commit that referenced this pull request May 27, 2016
…onfig option.

## What changes were proposed in this pull request?

Currently, the `OptimizeIn` optimizer replaces an `In` expression with an `InSet` expression if the size of the value set is greater than a constant, 10.
This issue aims to add a configuration, `spark.sql.optimizer.inSetConversionThreshold`, for that.

After this PR, `OptimizeIn` is configurable.
```scala
scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#7 IN (1,2,3) AS (a IN (1, 2, 3))#8]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#7]
   +- Scan OneRowRelation[]

scala> sqlContext.setConf("spark.sql.optimizer.inSetConversionThreshold", "2")

scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#16 INSET (1,2,3) AS (a IN (1, 2, 3))#17]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#16]
   +- Scan OneRowRelation[]
```

## How was this patch tested?

Passes the Jenkins tests (with a new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#12562 from dongjoon-hyun/SPARK-14796.