Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use streaming aggregation for a correlated scalar subquery #10731

Merged
merged 10 commits into from Jun 20, 2018

Commits on Jun 20, 2018

  1. Configuration menu
    Copy the full SHA
    d31cb2a View commit details
    Browse the repository at this point in the history
  2. Update AddLocalExchanges to plan streaming aggregations

    Also, fixed dormant bug in local property derivations for cross joins.
    mbasmanova committed Jun 20, 2018
    Configuration menu
    Copy the full SHA
    6c97d9f View commit details
    Browse the repository at this point in the history
  3. Configuration menu
    Copy the full SHA
    4c31b76 View commit details
    Browse the repository at this point in the history
  4. Configuration menu
    Copy the full SHA
    3a28de7 View commit details
    Browse the repository at this point in the history
  5. Configuration menu
    Copy the full SHA
    ac5d37d View commit details
    Browse the repository at this point in the history
  6. Enable streaming aggregation for pre-grouped inputs

    Scalar subqueries are often rewritten as follows:
    
    - Aggregation(unique, a, b)
      - LeftJoin
        - AssignUniqueId(unique)
          - probe (a, b)
        - build
    
    In this plan, aggregation input is grouped on all grouping keys and allows for
    single-step streaming execution. This commit updates PropertyDerivations and
    StreamPropertyDerivations to derive partitioned_on(unique) global property and
    {grouped(unique), constant(a), constant(b)} local properties for aggregation
    input. These changes allow AddLocalExchange to plan streaming aggregations.
    
    Specific changes are:
    
    - Update PropertyDerivations to
      - set {grouped(unique), constant(a), constant(b)} local properties for the
        output of AssignUniqueId node;
    
    - Update StreamPropertyDerivations to
      - set partitioned_on(unique) for the output of AssignUniqueId node;
    mbasmanova committed Jun 20, 2018
    Configuration menu
    Copy the full SHA
    c33a1a3 View commit details
    Browse the repository at this point in the history
  7. Identify streaming aggregations in EXPLAIN

    Append (STREAMING) to Aggregate node when printing query plan.
    
    For example,
    
    EXPLAIN (TYPE DISTRIBUTED)
    SELECT (SELECT count(*) FROM region r2 WHERE r2.regionkey > r1.regionkey)
    FROM region r1;
    
    - Aggregate(STREAMING)[regionkey, unique] => [regionkey:bigint, unique:bigint, count:bigint]
           count := "count"("non_null")
       - LeftJoin[("regionkey_0" > "regionkey")] => [regionkey:bigint, unique:bigint, non_null:boolean]
               Distribution: REPLICATED
               SortExpression["regionkey_0"]
           - AssignUniqueId => [regionkey:bigint, unique:bigint]
               - TableScan[tpch:tpch:region:sf0.01, originalConstraint = true] => [regionkey:bigint]
    mbasmanova committed Jun 20, 2018
    Configuration menu
    Copy the full SHA
    6666a7b View commit details
    Browse the repository at this point in the history
  8. Push up AssignUniqueId through remote exchange

    Scalar subqueries are often rewritten as follows:
    
    - Aggregation(unique, a, b)
      - LeftJoin
        - RemoteExchange (REPARTITION)
          - AssignUniqueId(unique)
            - probe (a, b)
        - RemoteExchange (REPARTITION)
          - build
    
    In these plans, streaming aggregation is not applied because
    grouped(unique, a, b) local property generated by the AssignUniqueId node
    can't reach the Aggregation node as it gets dropped when passing through the
    RemoteExchange node.
    
    This commit adds an optimizer rule to push up AssignUniqueId node through the
    RemoteExchange to allow for streaming aggregation. It also updates
    HashGenerationOptimizer to avoid generating unnecessary hashes for streaming
    aggregations. The modified plan looks like this:
    
    - Aggregation(unique, a, b)
      - LeftJoin
        - AssignUniqueId(unique)
          - RemoteExchange (REPARTITION)
            - probe (a, b)
        - RemoteExchange (REPARTITION)
          - build
    mbasmanova committed Jun 20, 2018
    Configuration menu
    Copy the full SHA
    445d110 View commit details
    Browse the repository at this point in the history
  9. Configuration menu
    Copy the full SHA
    2d4f084 View commit details
    Browse the repository at this point in the history
  10. Add StreamingAggregationOperator

    StreamingAggregationOperator applies only when aggregation source is already
    grouped on the grouping keys, e.g. aggregations over joins generated for the
    scalar subqueries. In these cases, StreamingAggregationOperator is
    more efficient than HashAggregationOperator as it uses less CPU and a lot less
    memory.
    
    Add a benchmark to compare StreamingAggregationOperator and
    HashAggregationOperator for pre-grouped data sources.
    
      (operatorType)  (rowsPerGroup)  Mode  Cnt    Score    Error  Units
           streaming               1  avgt   30  379.203 ±  9.431  ms/op
           streaming              10  avgt   30   68.950 ±  3.164  ms/op
           streaming            1000  avgt   30   27.623 ±  0.776  ms/op
                hash               1  avgt   30  451.901 ± 15.327  ms/op
                hash              10  avgt   30   94.244 ±  2.437  ms/op
                hash            1000  avgt   30   53.038 ±  0.980  ms/op
    mbasmanova committed Jun 20, 2018
    Configuration menu
    Copy the full SHA
    cc916c4 View commit details
    Browse the repository at this point in the history