Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make PageProcessor preserve input block laziness #1075

Merged
merged 8 commits into from
Jul 29, 2019

Conversation

sopel39
Copy link
Member

@sopel39 sopel39 commented Jul 3, 2019

Followup:

  • port semi-join to WorkProcessorOperator (@Praveen2112 ?)
  • port FilterAndProjectOperator to WorkProcessorOperator
    This way we get (node-local) dynamic filtering for semi joins and IN (subquery), NOT IN (subquery), x >= ALL (subquery), etc expressions

@sopel39 sopel39 added the WIP label Jul 3, 2019
@cla-bot cla-bot bot added the cla-signed label Jul 3, 2019
@sopel39 sopel39 force-pushed the ks/lazy_page_processor branch 2 times, most recently from fa91b33 to f49bddb Compare July 4, 2019 16:19
ScanFilterAndProject can have different source node ID
and plan node ID.
@sopel39 sopel39 force-pushed the ks/lazy_page_processor branch 5 times, most recently from 038b4f4 to 2cd3415 Compare July 6, 2019 22:21
@sopel39 sopel39 removed the WIP label Jul 8, 2019
@sopel39
Copy link
Member Author

sopel39 commented Jul 8, 2019

Benchmarks (no perf regression for non-lazy pages).
Benchmarks were done using plain input reference expressions:

AFTER:

Benchmark                                                      (columnCount)  (dictionaryBlocks)  (positionsPerPage)   (type)  Mode  Cnt    Score   Error  Units
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                  32  varchar  avgt   50  141.904 ± 0.821  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                  32   bigint  avgt   50   45.060 ± 0.567  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                1024  varchar  avgt   50   77.402 ± 0.540  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                1024   bigint  avgt   50    9.260 ± 0.039  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                  32  varchar  avgt   50   90.962 ± 0.611  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                  32   bigint  avgt   50   56.806 ± 0.832  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                1024  varchar  avgt   50   19.393 ± 0.171  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                1024   bigint  avgt   50    8.482 ± 0.017  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                  32  varchar  avgt   50  211.098 ± 0.377  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                  32   bigint  avgt   50   72.684 ± 0.713  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                1024  varchar  avgt   50   96.418 ± 0.502  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                1024   bigint  avgt   50   13.353 ± 0.073  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                  32  varchar  avgt   50  149.502 ± 0.675  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                  32   bigint  avgt   50   94.367 ± 1.635  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                1024  varchar  avgt   50   24.067 ± 0.242  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                1024   bigint  avgt   50   12.741 ± 0.066  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8               false                  32   bigint  avgt   50  125.515 ± 1.271  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8               false                1024  varchar  avgt   50  130.306 ± 0.741  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8               false                1024   bigint  avgt   50   21.516 ± 0.048  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                  32  varchar  avgt   50  256.790 ± 1.033  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                  32   bigint  avgt   50  162.861 ± 2.340  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                1024  varchar  avgt   50   33.036 ± 0.096  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                1024   bigint  avgt   50   21.007 ± 0.081  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16               false                  32   bigint  avgt   50  224.977 ± 1.399  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16               false                1024  varchar  avgt   50  200.095 ± 0.717  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16               false                1024   bigint  avgt   50   37.257 ± 0.113  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                  32  varchar  avgt   50  484.378 ± 1.285  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                  32   bigint  avgt   50  292.979 ± 1.243  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                1024  varchar  avgt   50   50.663 ± 0.503  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                1024   bigint  avgt   50   37.043 ± 0.152  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32               false                  32   bigint  avgt   50  420.169 ± 2.380  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32               false                1024  varchar  avgt   50  342.290 ± 1.434  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32               false                1024   bigint  avgt   50   69.130 ± 0.343  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32                true                  32   bigint  avgt   50  612.408 ± 4.062  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32                true                1024  varchar  avgt   50   87.303 ± 0.618  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32                true                1024   bigint  avgt   50   71.148 ± 0.465  ms/op

BEFORE:

Benchmark                                                      (columnCount)  (dictionaryBlocks)  (positionsPerPage)   (type)  Mode  Cnt    Score    Error  Units
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                  32  varchar  avgt   50  143.775 ±  1.108  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                  32   bigint  avgt   50   45.893 ±  0.651  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                1024  varchar  avgt   50   77.121 ±  0.609  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2               false                1024   bigint  avgt   50    9.616 ±  0.051  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                  32  varchar  avgt   50   92.151 ±  0.384  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                  32   bigint  avgt   50   56.643 ±  0.310  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                1024  varchar  avgt   50   19.985 ±  0.174  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              2                true                1024   bigint  avgt   50    8.833 ±  0.043  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                  32  varchar  avgt   50  214.515 ±  0.780  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                  32   bigint  avgt   50   75.181 ±  1.003  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                1024  varchar  avgt   50   95.147 ±  0.377  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4               false                1024   bigint  avgt   50   13.822 ±  0.051  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                  32  varchar  avgt   50  149.661 ±  0.613  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                  32   bigint  avgt   50   94.085 ±  0.839  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                1024  varchar  avgt   50   24.553 ±  0.174  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              4                true                1024   bigint  avgt   50   13.018 ±  0.051  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8               false                  32   bigint  avgt   50  127.073 ±  1.259  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8               false                1024  varchar  avgt   50  129.735 ±  0.541  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8               false                1024   bigint  avgt   50   21.865 ±  0.104  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                  32  varchar  avgt   50  258.460 ±  0.951  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                  32   bigint  avgt   50  165.347 ±  2.190  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                1024  varchar  avgt   50   33.486 ±  0.182  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented              8                true                1024   bigint  avgt   50   21.073 ±  0.075  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16               false                  32   bigint  avgt   50  221.195 ±  1.120  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16               false                1024  varchar  avgt   50  199.427 ±  0.476  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16               false                1024   bigint  avgt   50   37.843 ±  0.175  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                  32  varchar  avgt   50  485.161 ±  1.569  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                  32   bigint  avgt   50  295.203 ±  2.633  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                1024  varchar  avgt   50   52.209 ±  0.265  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             16                true                1024   bigint  avgt   50   37.172 ±  0.220  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32               false                  32   bigint  avgt   50  413.407 ±  1.834  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32               false                1024  varchar  avgt   50  339.468 ±  1.265  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32               false                1024   bigint  avgt   50   69.612 ±  0.218  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32                true                  32   bigint  avgt   50  584.004 ± 12.342  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32                true                1024  varchar  avgt   50   89.021 ±  0.305  ms/op
BenchmarkScanFilterAndProjectOperator.benchmarkColumnOriented             32                true                1024   bigint  avgt   50   71.113 ±  0.421  ms/op

@sopel39 sopel39 changed the title [WIP] Make PageProcessor preserve input block laziness Make PageProcessor preserve input block laziness Jul 8, 2019
@sopel39
Copy link
Member Author

sopel39 commented Jul 8, 2019

Some queries (significant CPU reduction for TopN queries with filter):

No work processor pipelines:

explain analyze select * from hive.tpch_sf10_orc.lineitem order by partkey limit 10;

 Fragment 2 [SOURCE]
     CPU: 38.48s, Scheduled: 4.65m, Input: 59986052 rows (7.30GB); per task: avg.: 59986052.00 std.dev.: 0.00, Output: 480 rows (71.47kB)
     Output layout: [orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate, commitdate, receiptdate, shipinstruct, shipmode, comment]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TopNPartial[10 by (partkey ASC_NULLS_LAST)]
     │   Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receiptdat
     │   CPU: 4.27s (0.86%), Scheduled: 6.74s (17.52%), Output: 480 rows (71.47kB)
     │   Input avg.: 1153577.92 rows, Input std.dev.: 30.50%
     └─ TableScan[hive:tpch_sf10_orc:lineitem, grouped = false]
            Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receipt
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 34.21s (6.86%), Scheduled: 8.20m (1278.45%), Output: 59986052 rows (7.30GB)

presto> explain analyze select * from hive.tpch_sf10_orc.lineitem where partkey < 100000 order by partkey limit 10;

 Fragment 2 [SOURCE]
     CPU: 35.97s, Scheduled: 4.60m, Input: 59986052 rows (7.30GB); per task: avg.: 59986052.00 std.dev.: 0.00, Output: 480 rows (71.47kB)
     Output layout: [orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate, commitdate, receiptdate, shipinstruct, shipmode, comment]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TopNPartial[10 by (partkey ASC_NULLS_LAST)]
     │   Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receiptdat
     │   CPU: 214.00ms (0.04%), Scheduled: 353.00ms (0.98%), Output: 480 rows (71.47kB)
     │   Input avg.: 57638.79 rows, Input std.dev.: 30.50%
     └─ ScanFilter[table = hive:tpch_sf10_orc:lineitem, grouped = false, filterPredicate = ("partkey" < BIGINT '100000')]
            Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receipt
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 35.75s (7.40%), Scheduled: 8.05m (1342.71%), Output: 2997217 rows (436.74MB)

with work processor pipelines:

explain analyze select * from hive.tpch_sf10_orc.lineitem order by partkey limit 10;

 Fragment 2 [SOURCE]
     CPU: 20.81s, Scheduled: 3.87m, Input: 59986052 rows (803.03MB); per task: avg.: 59986052.00 std.dev.: 0.00, Output: 480 rows (71.47kB)
     Output layout: [orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate, commitdate, receiptdate, shipinstruct, shipmode, comment]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TopNPartial[10 by (partkey ASC_NULLS_LAST)]
     │   Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receiptdat
     │   CPU: 19.00s (4.37%), Scheduled: 2.90m (836.25%), Output: 480 rows (71.47kB)
     │   Input avg.: 1153577.92 rows, Input std.dev.: 30.50%
     └─ TableScan[hive:tpch_sf10_orc:lineitem, grouped = false]
            Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receipt
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 1.80s (0.41%), Scheduled: 4.34m (1252.39%), Output: 59986052 rows (803.03MB)

explain analyze select * from hive.tpch_sf10_orc.lineitem where partkey < 1000000000000 order by partkey limit 10;

 Fragment 2 [SOURCE]
     CPU: 20.64s, Scheduled: 3.88m, Input: 59986052 rows (803.03MB); per task: avg.: 59986052.00 std.dev.: 0.00, Output: 480 rows (71.47kB)
     Output layout: [orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate, commitdate, receiptdate, shipinstruct, shipmode, comment]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TopNPartial[10 by (partkey ASC_NULLS_LAST)]
     │   Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receiptdat
     │   CPU: 16.63s (3.81%), Scheduled: 2.90m (843.18%), Output: 480 rows (71.47kB)
     │   Input avg.: 1153577.92 rows, Input std.dev.: 30.50%
     └─ ScanFilter[table = hive:tpch_sf10_orc:lineitem, grouped = false, filterPredicate = ("partkey" < 1000000000000)]
            Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receipt
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 4.00s (0.92%), Scheduled: 4.37m (1270.81%), Output: 59986052 rows (803.03MB)

presto> explain analyze select * from hive.tpch_sf10_orc.lineitem where partkey < 100000 order by partkey limit 10;

 Fragment 2 [SOURCE]
     CPU: 16.59s, Scheduled: 3.70m, Input: 59986052 rows (802.87MB); per task: avg.: 59986052.00 std.dev.: 0.00, Output: 480 rows (71.47kB)
     Output layout: [orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate, commitdate, receiptdate, shipinstruct, shipmode, comment]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TopNPartial[10 by (partkey ASC_NULLS_LAST)]
     │   Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receiptdat
     │   CPU: 12.80s (3.04%), Scheduled: 2.73m (987.65%), Output: 480 rows (71.47kB)
     │   Input avg.: 57638.79 rows, Input std.dev.: 30.50%
     └─ ScanFilter[table = hive:tpch_sf10_orc:lineitem, grouped = false, filterPredicate = ("partkey" < BIGINT '100000')]
            Layout: [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, extendedprice:double, discount:double, tax:double, returnflag:varchar(1), linestatus:varchar(1), shipdate:date, commitdate:date, receipt
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 3.78s (0.90%), Scheduled: 4.30m (1554.56%), Output: 2997217 rows (40.70MB)

@sopel39 sopel39 requested a review from dain July 8, 2019 10:30
@sopel39
Copy link
Member Author

sopel39 commented Jul 17, 2019

part of: #49

Copy link
Member

@dain dain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly just suggestions, and requests for more explanations (and code comments in one case).

I'm curious why we are only applying lazyness to some cases, how those were choose, and if we should expose it to more cases?

Copy link
Member Author

@sopel39 sopel39 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ac

@dain dain self-requested a review July 27, 2019 21:25
@dain dain assigned sopel39 and unassigned dain Jul 28, 2019
@sopel39 sopel39 merged commit 25da929 into trinodb:master Jul 29, 2019
@sopel39 sopel39 deleted the ks/lazy_page_processor branch July 29, 2019 08:41
@sopel39 sopel39 mentioned this pull request Jul 29, 2019
5 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

2 participants