
[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator #15

Merged

Conversation

@huitseeker
No description provided.

@huitseeker (Author)

Unrelated SQL tests are failing on https://ci.typesafe.com/job/ghprb-spark-multi-conf/39

```diff
@@ -23,4 +23,5 @@ import org.apache.spark.streaming.Time
 private[streaming] sealed trait ReceiverMessage extends Serializable
 private[streaming] object StopReceiver extends ReceiverMessage
 private[streaming] case class CleanupOldBlocks(threshTime: Time) extends ReceiverMessage
+private[streaming] case class RateLimitUpdate(elementsPerSecond: Long) extends ReceiverMessage
```

`RateLimitUpdate` --> `UpdateRateLimit`
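For context, the driver side would push this message to a receiver's RPC endpoint along these lines. This is only a sketch: the stand-in types below approximate the ReceiverTracker's internals, and `sendRateUpdate` is an assumed method name.

```scala
import scala.collection.mutable

// Stand-ins for Spark's RPC types, just enough for the sketch to compile.
trait RpcEndpointRef { def send(message: Any): Unit }
case class ReceiverInfo(streamId: Int, endpoint: RpcEndpointRef)
case class RateLimitUpdate(elementsPerSecond: Long)

class ReceiverTrackerSketch {
  // streamId -> info about the registered receiver, filled in on registration.
  private val receiverInfo = mutable.Map.empty[Int, ReceiverInfo]

  // Push a new rate to the receiver registered for `streamId`, if any.
  def sendRateUpdate(streamId: Int, newRate: Long): Unit =
    for (info <- receiverInfo.get(streamId)) {
      info.endpoint.send(RateLimitUpdate(newRate)) // UpdateRateLimit after the rename
    }
}
```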

@huitseeker huitseeker changed the title Add a mechanism to send a new rate from the driver to the block generator [SPARK-8975] Adds a mechanism to send a new rate from the driver to the block generator Jul 14, 2015
@huitseeker (Author)

retest this please

@typesafe-tools (Collaborator)

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/40/

Build log (last 10 lines):

```
[...truncated 18 lines...]
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 6fc5141959418c3909dd5db4a01681574cd98166
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark JDK-7 PV (i-cfb8ec1c)
Configuration ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 is still in the queue: Waiting for next available executor on Spark-Ora-JDK7-PV
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 21732cb7e1d5d0fbd024b2612a28d277af2abb7e to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/40/ and message: Merged build finished.
```

Test FAILed.

@huitseeker huitseeker changed the title [SPARK-8975] Adds a mechanism to send a new rate from the driver to the block generator [SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator Jul 15, 2015
@huitseeker (Author)

```scala
if (desiredRate > 0) {
  currentRate.set(newRate min desiredRate)
}
else currentRate.set(newRate)
```

Not sure this is the usual formatting of if expressions in Spark. I would use Java-style, as in the rest of the code base.
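For reference, the Java-style formatting the reviewer presumably means (same logic, braces on both branches):

```scala
if (desiredRate > 0) {
  currentRate.set(newRate min desiredRate)
} else {
  currentRate.set(newRate)
}
```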

@dragos commented Jul 15, 2015

Probably a few unit tests would be in order.

@typesafe-tools (Collaborator)

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/43/

Build log (last 10 lines):

```
[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/15/merge^{commit} # timeout=10
Checking out Revision 6fc5141959418c3909dd5db4a01681574cd98166 (refs/remotes/origin/pr/15/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 6fc5141959418c3909dd5db4a01681574cd98166
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 21732cb7e1d5d0fbd024b2612a28d277af2abb7e to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/43/ and message: Merged build finished.
```

Test FAILed.

@huitseeker (Author)

I've added a few unit tests.

@typesafe-tools (Collaborator)

Refer to this link for build results (access rights to CI server needed):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/46/

Build log (last 10 lines):

```
[...truncated 11 lines...]
 > git rev-parse refs/remotes/origin/origin/pr/15/merge^{commit} # timeout=10
Checking out Revision 60703aa99060db81ebd60b23034780134c6d83ba (refs/remotes/origin/pr/15/merge)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 60703aa99060db81ebd60b23034780134c6d83ba
First time build. Skipping changelog.
Triggering ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10
ghprb-spark-multi-conf » Spark-Ora-JDK7-PV,2.10 completed with result FAILURE
Notifying upstream projects of job completion
Setting status of 930a6c7baed4c53892efe40033860d2b5b6e3523 to FAILURE with url http://ci.typesafe.com/job/ghprb-spark-multi-conf/46/ and message: Merged build finished.
```

Test FAILed.

@huitseeker (Author)

Build 46: pyspark failure (a missing file on the test machine):
https://ci.typesafe.com/job/ghprb-spark-multi-conf/label=Spark-Ora-JDK7-PV,scala_version=2.10/46/console

```scala
    assert(rateLimiter.currentRateLimit.get == 105)
  }

  test("rate limiter stays below maxRate despite large updates") {
```

These tests are good, but I was thinking more about testing the path from the ReceiverTracker to the rate limiter.
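For context, a minimal, self-contained sketch of the kind of limiter these unit tests exercise, written against the update logic quoted earlier in this thread. The class shape and the `updateRate` name are assumptions, not the PR's exact code; the `spark.streaming.receiver.maxRate` key is the one named in the commit log.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkConf

// Stand-in for the PR's rate limiter; `updateRate` is an assumed name.
class RateLimiterSketch(conf: SparkConf) {
  // 0 (the default) means "no configured maximum".
  private val desiredRate = conf.getLong("spark.streaming.receiver.maxRate", 0L)
  val currentRateLimit = new AtomicLong(if (desiredRate > 0) desiredRate else Long.MaxValue)

  // Mirrors the logic quoted earlier: never exceed the configured maxRate.
  def updateRate(newRate: Long): Unit =
    if (desiredRate > 0) currentRateLimit.set(newRate min desiredRate)
    else currentRateLimit.set(newRate)
}

// Example assertions in the spirit of the quoted tests:
// val limiter = new RateLimiterSketch(new SparkConf().set("spark.streaming.receiver.maxRate", "110"))
// limiter.updateRate(105); assert(limiter.currentRateLimit.get == 105)
// limiter.updateRate(1000); assert(limiter.currentRateLimit.get == 110) // capped at maxRate
```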

@dragos commented Jul 17, 2015

I'll merge this so I can add the test I had in mind and open the first PR on apache/spark.
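For context, that propagation test (see cd1397d in the commit log below) presumably follows ScalaTest's standard `eventually` pattern rather than a custom `waitUntil`. A minimal sketch, with stand-in state in place of a real driver and receiver:

```scala
import java.util.concurrent.atomic.AtomicLong
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Stand-in for the receiver-side limit; in the real test this lives in the receiver.
val currentRateLimit = new AtomicLong(Long.MaxValue)

// Stand-in for the driver-side update; asynchronous in the real code path.
def sendRateUpdate(newRate: Long): Unit = currentRateLimit.set(newRate)

sendRateUpdate(100)
eventually(timeout(5.seconds)) {
  // Retried until it passes or the timeout expires.
  assert(currentRateLimit.get == 100)
}
```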

dragos added a commit that referenced this pull request Jul 17, 2015
[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator
@dragos dragos merged commit 6369b30 into lightbend:topic/streaming-bp/dynamic-rate Jul 17, 2015
dragos added a commit that referenced this pull request Jul 23, 2015
… driver to the block generator

First step for [SPARK-7398](https://issues.apache.org/jira/browse/SPARK-7398).

tdas huitseeker

Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes apache#7471 from dragos/topic/streaming-bp/dynamic-rate and squashes the following commits:

8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
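(Aside: the reflection workaround mentioned in 162d9e5 generally takes this shape. A generic sketch only, not the PR's actual test code:)

```scala
// Invoke a no-arg method that is truly private from test code,
// when no public accessor is visible from the test's package.
def invokePrivate[T](target: AnyRef, methodName: String): T = {
  val m = target.getClass.getDeclaredMethod(methodName)
  m.setAccessible(true) // lift the access restriction for this call
  m.invoke(target).asInstanceOf[T]
}
```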
dragos added a commit that referenced this pull request Jul 30, 2015
…ements the RateController

Based on apache#7471.

- [x] add a test that exercises the publish path from driver to receiver
- [ ] remove Serializable from `RateController` and `RateEstimator`

Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes apache#7600 from dragos/topic/streaming-bp/rate-controller and squashes the following commits:

f168c94 [Iulian Dragos] Latest review round.
5125e60 [Iulian Dragos] Fix style.
a2eb3b9 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
475e346 [Iulian Dragos] Latest round of reviews.
e9fb45e [Iulian Dragos] - Add a test for checkpointing - fixed serialization for RateController.executionContext
715437a [Iulian Dragos] Review comments and added a `reset` call in ReceiverTrackerTest.
e57c66b [Iulian Dragos] Added a couple of tests for the full scenario from driver to receivers, with several rate updates.
b425d32 [Iulian Dragos] Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator.
238cfc6 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
34a389d [Iulian Dragos] Various style changes and a first test for the rate controller.
d32ca36 [François Garillot] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController
8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
fdp-ci pushed a commit that referenced this pull request Aug 27, 2019
## What changes were proposed in this pull request?
This PR aims at improving the way physical plans are explained in Spark.

Currently, the explain output for a physical plan can look very cluttered: each operator's string representation can be very wide and wraps around in the display, making it a little hard to follow. This especially happens when explaining a query that 1) operates on wide tables or 2) has complex expressions.

This PR attempts to split the output into two sections. In the header section, we display the basic operator tree, with a number associated with each operator; in this section, we strictly control what we output for each operator. In the footer section, each operator is displayed verbosely. Based on the feedback from Maryann, the uncorrelated subqueries (SubqueryExecs) are not included in the main plan; they are printed separately after the main plan and can be
correlated with the parent plan by the originating expression id.

To illustrate, here is a simple plan displayed in the old way vs. the new way.

Example query 1:
```
EXPLAIN SELECT key, Max(val) FROM explain_temp1 WHERE key > 0 GROUP BY key HAVING max(val) > 0
```

Old:
```
*(2) Project [key#2, max(val)#15]
+- *(2) Filter (isnotnull(max(val#3)#18) AND (max(val#3)#18 > 0))
   +- *(2) HashAggregate(keys=[key#2], functions=[max(val#3)], output=[key#2, max(val)#15, max(val#3)#18])
      +- Exchange hashpartitioning(key#2, 200)
         +- *(1) HashAggregate(keys=[key#2], functions=[partial_max(val#3)], output=[key#2, max#21])
            +- *(1) Project [key#2, val#3]
               +- *(1) Filter (isnotnull(key#2) AND (key#2 > 0))
                  +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), (key#2 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), GreaterThan(key,0)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (8)
+- Filter (7)
   +- HashAggregate (6)
      +- Exchange (5)
         +- HashAggregate (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (isnotnull(key#2) AND (key#2 > 0))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]

(4) HashAggregate [codegen id : 1]
Input: [key#2, val#3]

(5) Exchange
Input: [key#2, max#11]

(6) HashAggregate [codegen id : 2]
Input: [key#2, max#11]

(7) Filter [codegen id : 2]
Input     : [key#2, max(val)#5, max(val#3)#8]
Condition : (isnotnull(max(val#3)#8) AND (max(val#3)#8 > 0))

(8) Project [codegen id : 2]
Output    : [key#2, max(val)#5]
Input     : [key#2, max(val)#5, max(val#3)#8]
```

Example query 2 (subquery):
```
SELECT * FROM explain_temp1 WHERE KEY = (SELECT Max(KEY) FROM explain_temp2 WHERE KEY = (SELECT Max(KEY) FROM explain_temp3 WHERE val > 0) AND val = 2) AND val > 3
```
Old:
```
*(1) Project [key#2, val#3]
+- *(1) Filter (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#39)) AND (val#3 > 3))
   :  +- Subquery scalar-subquery#39
   :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#26)], output=[max(KEY)#45])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#26)], output=[max#47])
   :              +- *(1) Project [key#26]
   :                 +- *(1) Filter (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#38)) AND (val#27 = 2))
   :                    :  +- Subquery scalar-subquery#38
   :                    :     +- *(2) HashAggregate(keys=[], functions=[max(KEY#28)], output=[max(KEY)#43])
   :                    :        +- Exchange SinglePartition
   :                    :           +- *(1) HashAggregate(keys=[], functions=[partial_max(KEY#28)], output=[max#49])
   :                    :              +- *(1) Project [key#28]
   :                    :                 +- *(1) Filter (isnotnull(val#29) AND (val#29 > 0))
   :                    :                    +- *(1) FileScan parquet default.explain_temp3[key#28,val#29] Batched: true, DataFilters: [isnotnull(val#29), (val#29 > 0)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp3], PartitionFilters: [], PushedFilters: [IsNotNull(val), GreaterThan(val,0)], ReadSchema: struct<key:int,val:int>
   :                    +- *(1) FileScan parquet default.explain_temp2[key#26,val#27] Batched: true, DataFilters: [isnotnull(key#26), isnotnull(val#27), (val#27 = 2)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)], ReadSchema: struct<key:int,val:int>
   +- *(1) FileScan parquet default.explain_temp1[key#2,val#3] Batched: true, DataFilters: [isnotnull(key#2), isnotnull(val#3), (val#3 > 3)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)], ReadSchema: struct<key:int,val:int>
```
New:
```
Project (3)
+- Filter (2)
   +- Scan parquet default.explain_temp1 (1)

(1) Scan parquet default.explain_temp1 [codegen id : 1]
Output: [key#2, val#3]

(2) Filter [codegen id : 1]
Input     : [key#2, val#3]
Condition : (((isnotnull(KEY#2) AND isnotnull(val#3)) AND (KEY#2 = Subquery scalar-subquery#23)) AND (val#3 > 3))

(3) Project [codegen id : 1]
Output    : [key#2, val#3]
Input     : [key#2, val#3]
===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#23
HashAggregate (9)
+- Exchange (8)
   +- HashAggregate (7)
      +- Project (6)
         +- Filter (5)
            +- Scan parquet default.explain_temp2 (4)

(4) Scan parquet default.explain_temp2 [codegen id : 1]
Output: [key#26, val#27]

(5) Filter [codegen id : 1]
Input     : [key#26, val#27]
Condition : (((isnotnull(KEY#26) AND isnotnull(val#27)) AND (KEY#26 = Subquery scalar-subquery#22)) AND (val#27 = 2))

(6) Project [codegen id : 1]
Output    : [key#26]
Input     : [key#26, val#27]

(7) HashAggregate [codegen id : 1]
Input: [key#26]

(8) Exchange
Input: [max#35]

(9) HashAggregate [codegen id : 2]
Input: [max#35]

Subquery:2 Hosting operator id = 5 Hosting Expression = Subquery scalar-subquery#22
HashAggregate (15)
+- Exchange (14)
   +- HashAggregate (13)
      +- Project (12)
         +- Filter (11)
            +- Scan parquet default.explain_temp3 (10)

(10) Scan parquet default.explain_temp3 [codegen id : 1]
Output: [key#28, val#29]

(11) Filter [codegen id : 1]
Input     : [key#28, val#29]
Condition : (isnotnull(val#29) AND (val#29 > 0))

(12) Project [codegen id : 1]
Output    : [key#28]
Input     : [key#28, val#29]

(13) HashAggregate [codegen id : 1]
Input: [key#28]

(14) Exchange
Input: [max#37]

(15) HashAggregate [codegen id : 2]
Input: [max#37]
```

Note:
I opened this PR as a WIP to start getting feedback. I will be on vacation starting tomorrow
and would not be able to immediately incorporate the feedback; I will start to
work on it as soon as I can. Also, currently this PR provides the basic infrastructure
for the explain enhancement. The details for individual operators will be implemented
in follow-up PRs.
## How was this patch tested?
Added a new test `explain.sql` that tests basic scenarios. Need to add more tests.

Closes apache#24759 from dilipbiswal/explain_feature.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>