8196106: Support nested infinite or recursive flat mapped streams #18625

Closed


@viktorklang-ora viktorklang-ora commented Apr 4, 2024

This PR implements a Gatherer-inspired encoding of flatMap and shows that it is both competitive performance-wise and an improvement in correctness.

Below is the performance of Stream::flatMap (for reference types):

Before this PR (on my MacBook, aarch64):

Non-short-circuiting:

Benchmark                 (size)   Mode  Cnt        Score       Error  Units
FlatMap.par_array             10  thrpt   12   257582,480 ± 31360,242  ops/s
FlatMap.par_array            100  thrpt   12    55202,015 ± 14011,668  ops/s
FlatMap.par_array           1000  thrpt   12     6546,252 ±  3675,764  ops/s
FlatMap.par_doublestream      10  thrpt   12   267423,410 ± 37338,089  ops/s
FlatMap.par_doublestream     100  thrpt   12    27140,703 ±  4979,878  ops/s
FlatMap.par_doublestream    1000  thrpt   12     2978,178 ±  1890,250  ops/s
FlatMap.par_intstream         10  thrpt   12   268194,530 ± 37295,092  ops/s
FlatMap.par_intstream        100  thrpt   12    30897,034 ±  5388,245  ops/s
FlatMap.par_intstream       1000  thrpt   12     3969,043 ±  2494,641  ops/s
FlatMap.par_longstream        10  thrpt   12   279756,861 ± 19519,497  ops/s
FlatMap.par_longstream       100  thrpt   12    45733,955 ± 18385,144  ops/s
FlatMap.par_longstream      1000  thrpt   12     5115,615 ±  4147,237  ops/s
FlatMap.seq_array             10  thrpt   12  2937192,934 ± 58605,583  ops/s
FlatMap.seq_array            100  thrpt   12    41859,462 ±   139,021  ops/s
FlatMap.seq_array           1000  thrpt   12      442,677 ±     1,041  ops/s
FlatMap.seq_doublestream      10  thrpt   12  4972651,093 ± 35997,267  ops/s
FlatMap.seq_doublestream     100  thrpt   12    99265,005 ±   193,497  ops/s
FlatMap.seq_doublestream    1000  thrpt   12     1037,030 ±     3,254  ops/s
FlatMap.seq_intstream         10  thrpt   12  5133751,888 ± 23516,294  ops/s
FlatMap.seq_intstream        100  thrpt   12   145166,206 ±   399,263  ops/s
FlatMap.seq_intstream       1000  thrpt   12     1565,004 ±     3,207  ops/s
FlatMap.seq_longstream        10  thrpt   12  5047029,363 ± 24742,300  ops/s
FlatMap.seq_longstream       100  thrpt   12   233225,307 ±  7162,604  ops/s
FlatMap.seq_longstream      1000  thrpt   12     2999,926 ±     9,945  ops/s

Short-circuiting:

Benchmark                   (size)   Mode  Cnt       Score       Error  Units
FlatMap.par_iterate_double      10  thrpt   12   46336,834 ±  6803,751  ops/s
FlatMap.par_iterate_double     100  thrpt   12   15365,884 ±  2750,656  ops/s
FlatMap.par_iterate_double    1000  thrpt   12    1677,618 ±   174,397  ops/s
FlatMap.par_iterate_int         10  thrpt   12   50275,901 ±  4321,430  ops/s
FlatMap.par_iterate_int        100  thrpt   12   16688,084 ±  2977,145  ops/s
FlatMap.par_iterate_int       1000  thrpt   12    1963,556 ±   384,075  ops/s
FlatMap.par_iterate_long        10  thrpt   12   50923,900 ±  3732,781  ops/s
FlatMap.par_iterate_long       100  thrpt   12   16660,676 ±  2086,627  ops/s
FlatMap.par_iterate_long      1000  thrpt   12    1949,490 ±   414,274  ops/s
FlatMap.par_iterate_ref         10  thrpt   12   26493,083 ±  5345,774  ops/s
FlatMap.par_iterate_ref        100  thrpt   12   10526,834 ±   165,435  ops/s
FlatMap.par_iterate_ref       1000  thrpt   12     923,592 ±     6,473  ops/s
FlatMap.seq_iterate_double      10  thrpt   12  457939,954 ± 21836,434  ops/s
FlatMap.seq_iterate_double     100  thrpt   12   10362,293 ±    79,699  ops/s
FlatMap.seq_iterate_double    1000  thrpt   12     101,664 ±     0,869  ops/s
FlatMap.seq_iterate_int         10  thrpt   12  438688,302 ±  8519,874  ops/s
FlatMap.seq_iterate_int        100  thrpt   12   10280,163 ±    94,136  ops/s
FlatMap.seq_iterate_int       1000  thrpt   12     100,687 ±     1,199  ops/s
FlatMap.seq_iterate_long        10  thrpt   12  437124,344 ± 39209,828  ops/s
FlatMap.seq_iterate_long       100  thrpt   12   10218,305 ±    80,571  ops/s
FlatMap.seq_iterate_long      1000  thrpt   12     100,560 ±     1,228  ops/s
FlatMap.seq_iterate_ref         10  thrpt   12  338190,816 ±  7213,549  ops/s
FlatMap.seq_iterate_ref        100  thrpt   12    8453,383 ±    44,707  ops/s
FlatMap.seq_iterate_ref       1000  thrpt   12      77,174 ±     0,701  ops/s

After this PR (on my MacBook, aarch64):

Non-short-circuiting:

Benchmark                 (size)   Mode  Cnt        Score       Error  Units
FlatMap.par_array             10  thrpt   12   309043,346 ± 22267,788  ops/s
FlatMap.par_array            100  thrpt   12    63773,183 ± 18772,809  ops/s
FlatMap.par_array           1000  thrpt   12     8454,577 ±   223,855  ops/s
FlatMap.par_doublestream      10  thrpt   12   287825,910 ± 40801,718  ops/s
FlatMap.par_doublestream     100  thrpt   12    29102,473 ±  4706,576  ops/s
FlatMap.par_doublestream    1000  thrpt   12     3966,528 ±  1915,358  ops/s
FlatMap.par_intstream         10  thrpt   12   291119,116 ± 32542,028  ops/s
FlatMap.par_intstream        100  thrpt   12    37945,217 ±  6986,065  ops/s
FlatMap.par_intstream       1000  thrpt   12     5004,935 ±  2325,120  ops/s
FlatMap.par_longstream        10  thrpt   12   306691,186 ± 30836,677  ops/s
FlatMap.par_longstream       100  thrpt   12    39204,098 ±  6159,602  ops/s
FlatMap.par_longstream      1000  thrpt   12     5197,154 ±  4293,582  ops/s
FlatMap.seq_array             10  thrpt   12  2852595,288 ± 19117,935  ops/s
FlatMap.seq_array            100  thrpt   12    41792,834 ±   125,631  ops/s
FlatMap.seq_array           1000  thrpt   12     1085,509 ±  1226,692  ops/s
FlatMap.seq_doublestream      10  thrpt   12  5405153,919 ± 25352,824  ops/s
FlatMap.seq_doublestream     100  thrpt   12    99297,136 ±   170,830  ops/s
FlatMap.seq_doublestream    1000  thrpt   12     1038,508 ±     1,066  ops/s
FlatMap.seq_intstream         10  thrpt   12  5558941,679 ± 25526,270  ops/s
FlatMap.seq_intstream        100  thrpt   12   145225,641 ±   334,990  ops/s
FlatMap.seq_intstream       1000  thrpt   12     1567,861 ±     2,790  ops/s
FlatMap.seq_longstream        10  thrpt   12  5592000,353 ± 20322,575  ops/s
FlatMap.seq_longstream       100  thrpt   12   241101,180 ±  9170,915  ops/s
FlatMap.seq_longstream      1000  thrpt   12     2132,999 ±  1589,173  ops/s

Short-circuiting:

Benchmark                   (size)   Mode  Cnt        Score        Error  Units
FlatMap.par_iterate_double      10  thrpt   12    46354,848 ±   6372,823  ops/s
FlatMap.par_iterate_double     100  thrpt   12    15851,059 ±   2942,830  ops/s
FlatMap.par_iterate_double    1000  thrpt   12     1778,464 ±     27,042  ops/s
FlatMap.par_iterate_int         10  thrpt   12    51879,241 ±   6759,903  ops/s
FlatMap.par_iterate_int        100  thrpt   12    16458,214 ±   3206,146  ops/s
FlatMap.par_iterate_int       1000  thrpt   12     2421,501 ±      5,582  ops/s
FlatMap.par_iterate_long        10  thrpt   12    47412,155 ±   9957,661  ops/s
FlatMap.par_iterate_long       100  thrpt   12    17745,066 ±   3519,614  ops/s
FlatMap.par_iterate_long      1000  thrpt   12     2413,274 ±     27,152  ops/s
FlatMap.par_iterate_ref         10  thrpt   12    24754,662 ±   5144,389  ops/s
FlatMap.par_iterate_ref        100  thrpt   12    10535,374 ±    273,802  ops/s
FlatMap.par_iterate_ref       1000  thrpt   12      990,926 ±    153,386  ops/s
FlatMap.seq_iterate_double      10  thrpt   12   941441,571 ±  72849,560  ops/s
FlatMap.seq_iterate_double     100  thrpt   12    26116,346 ±    225,451  ops/s
FlatMap.seq_iterate_double    1000  thrpt   12      285,873 ±      1,924  ops/s
FlatMap.seq_iterate_int         10  thrpt   12  1097970,188 ± 365428,186  ops/s
FlatMap.seq_iterate_int        100  thrpt   12    28136,050 ±    136,249  ops/s
FlatMap.seq_iterate_int       1000  thrpt   12      316,555 ±      0,687  ops/s
FlatMap.seq_iterate_long        10  thrpt   12  1021716,173 ±   8729,180  ops/s
FlatMap.seq_iterate_long       100  thrpt   12    28290,057 ±    264,404  ops/s
FlatMap.seq_iterate_long      1000  thrpt   12      316,620 ±      1,906  ops/s
FlatMap.seq_iterate_ref         10  thrpt   12   820498,644 ± 167233,794  ops/s
FlatMap.seq_iterate_ref        100  thrpt   12    13583,900 ±    116,079  ops/s
FlatMap.seq_iterate_ref       1000  thrpt   12      158,580 ±      1,977  ops/s
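
To make the before/after comparison easier to read, the following sketch computes the throughput ratio for the sequential short-circuiting rows at size 1000. The scores are copied from the two short-circuiting tables above (the comma is a decimal separator, so `101,664` is about 101.7 ops/s); the class and method names are hypothetical and not part of the PR, and the error margins are ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FlatMapSpeedup {
    /** Ratio of after/before throughput; a value above 1.0 means the PR build is faster. */
    static double speedup(double beforeOpsPerSec, double afterOpsPerSec) {
        return afterOpsPerSec / beforeOpsPerSec;
    }

    public static void main(String[] args) {
        // {before, after} scores in ops/s for size = 1000, taken from the tables above.
        Map<String, double[]> rows = new LinkedHashMap<>();
        rows.put("seq_iterate_double", new double[] {101.664, 285.873});
        rows.put("seq_iterate_int", new double[] {100.687, 316.555});
        rows.put("seq_iterate_long", new double[] {100.560, 316.620});
        rows.put("seq_iterate_ref", new double[] {77.174, 158.580});

        // Print one line per benchmark with the ratio rounded to two decimals.
        rows.forEach((name, r) ->
                System.out.printf("%-20s %.2fx%n", name, speedup(r[0], r[1])));
    }
}
```

On these numbers the sequential short-circuiting cases come out roughly two to three times faster, while the parallel rows have error margins large enough that a simple ratio says little.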

Progress

  • Change must be properly reviewed (1 review required, with at least 1 Reviewer)
  • Change must not contain extraneous whitespace
  • Commit message must refer to an issue

Issue

  • JDK-8196106: Support nested infinite or recursive flat mapped streams (Bug - P4)

Reviewers

Reviewing

Using git

Checkout this PR locally:
$ git fetch https://git.openjdk.org/jdk.git pull/18625/head:pull/18625
$ git checkout pull/18625

Update a local copy of the PR:
$ git checkout pull/18625
$ git pull https://git.openjdk.org/jdk.git pull/18625/head

Using Skara CLI tools

Checkout this PR locally:
$ git pr checkout 18625

View PR using the GUI difftool:
$ git pr show -t 18625

Using diff file

Download this PR as a diff file:
https://git.openjdk.org/jdk/pull/18625.diff

Webrev

Link to Webrev Comment

@bridgekeeper

bridgekeeper bot commented Apr 4, 2024

👋 Welcome back vklang! A progress list of the required criteria for merging this PR into master will be added to the body of your pull request. There are additional pull request commands available for use with this pull request.

@openjdk

openjdk bot commented Apr 4, 2024

@viktorklang-ora This change now passes all automated pre-integration checks.

ℹ️ This project also has non-automated pre-integration requirements. Please see the file CONTRIBUTING.md for details.

After integration, the commit message for the final commit will be:

8196106: Support nested infinite or recursive flat mapped streams

Reviewed-by: psandoz

You can use pull request commands such as /summary, /contributor and /issue to adjust it as needed.

At the time when this comment was updated there had been 174 new commits pushed to the master branch:

  • 140f567: 8323220: Reassociate loop invariants involved in Cmps and Add/Subs
  • a293bdf: 8330197: Make javac/diags/example release agnostic
  • da75e01: 8330196: Make java/lang/invoke/defineHiddenClass/BasicTest release agnostic
  • ddc3921: 8330103: Compiler memory statistics should keep separate records for C1 and C2
  • 273df62: 8328792: Parallel: Refactor PSParallelCompact::summary_phase
  • a3fecdb: 8329764: G1: Handle null references during verification first
  • 60d88b7: 8330176: Typo in Linker javadoc
  • 3f1d9c4: 8329257: AIX: Switch HOTSPOT_TOOLCHAIN_TYPE from xlc to gcc
  • 5404b4e: 8330105: SharedRuntime::resolve* should respect interpreter-only mode
  • d22d560: 8329864: TestLibGraal.java still crashes with assert(_stack_base != nullptr)
  • ... and 164 more: https://git.openjdk.org/jdk/compare/db159149c1c13a98ee9a85750741c6a3cd39f408...master

As there are no conflicts, your changes will automatically be rebased on top of these commits when integrating. If you prefer to avoid this automatic rebasing, please check the documentation for the /integrate command for further details.

➡️ To integrate this PR with the above commit message to the master branch, type /integrate in a new comment.

@openjdk

openjdk bot commented Apr 4, 2024

@viktorklang-ora The following label will be automatically applied to this pull request:

  • core-libs

When this pull request is ready to be reviewed, an "RFR" email will be sent to the corresponding mailing list. If you would like to change these labels, use the /label pull request command.

@openjdk openjdk bot added the core-libs (core-libs-dev@openjdk.org) label Apr 4, 2024
@viktorklang-ora
Contributor Author

@PaulSandoz @AlanBateman I've added a commit to this PR which removes the use of Gatherer for Stream::flatMap and instead implements flatMap for all of the pipelines using the same encoding that Gatherer would use. It seems very competitive performance-wise, and it resolves at least one open JBS issue with flatMap (I'll check whether it resolves more than that).

@PaulSandoz
Member

> @PaulSandoz @AlanBateman I've added a commit to this PR which removes the use of Gatherer for Stream::flatMap, but instead implements flatMap for all of the pipelines using the same encoding which Gatherer would use. It seems very competitive performance-wise, and resolves at least one open JBS-issue with flatMap (will look to see if it resolves more than that)

That's a rather clever use of allMatch!

Did you observe performance improvements using @Stable on the cancel field? It's really hard to predict in the abstract (since the default value of the field will be read in proportion to the number of elements until the stream is cancelled).

@viktorklang-ora
Contributor Author

> > @PaulSandoz @AlanBateman I've added a commit to this PR which removes the use of Gatherer for Stream::flatMap, but instead implements flatMap for all of the pipelines using the same encoding which Gatherer would use. It seems very competitive performance-wise, and resolves at least one open JBS-issue with flatMap (will look to see if it resolves more than that)

> That's a rather clever use of allMatch!

Thank you :)

> Did you observe performance improvements using @Stable on the cancel field? It's really hard to predict in the abstract (since the default value of the field will be read in proportion to the number of elements until the stream is cancelled).

I'm currently exploring some different avenues for optimization. Stay tuned :)

@viktorklang-ora viktorklang-ora marked this pull request as ready for review April 9, 2024 09:56
@openjdk openjdk bot added the rfr (Pull request is ready for review) label Apr 9, 2024
@mlbridge

mlbridge bot commented Apr 9, 2024

@viktorklang-ora
Contributor Author

@AlanBateman @PaulSandoz Moving this to a candidate PR; see the updated description for performance numbers. The sequential improvements are very encouraging, as are the improvements that address https://bugs.openjdk.org/browse/JDK-8196106

DoubleConsumer downstreamAsDouble = downstream::accept;
class FlatMap implements Sink.OfDouble, DoublePredicate {
    boolean cancel;
    private final boolean shorts = isShortCircuitingPipeline();
Contributor Author

If there's a known short-circuiting operation in the pipeline, we have to use allMatch, but when we know that nothing can short-circuit we can take a fast path through forEach.
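
As a rough illustration of that comment, here is a minimal, self-contained sketch of the idea. It is not the JDK's actual code: `Downstream`, `FlatMapper` and `firstN` are hypothetical names, `Downstream` only loosely models the JDK-internal sink, and the `shortCircuiting` flag is assumed to be known when the pipeline is built. The mapper object doubles as the predicate handed to `allMatch`, so the inner stream stops being traversed as soon as the downstream asks to cancel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

final class FlatMapSketch {
    /** Hypothetical stand-in for a downstream sink: a consumer that can ask to stop. */
    interface Downstream<R> extends Consumer<R> {
        boolean cancellationRequested();
    }

    /** Flat-maps each element into the downstream; also serves as the allMatch predicate. */
    static final class FlatMapper<T, R> implements Consumer<T>, Predicate<R> {
        private final Function<? super T, ? extends Stream<? extends R>> mapper;
        private final Downstream<R> downstream;
        private final boolean shortCircuiting; // assumption: decided once, up front
        private boolean cancel;

        FlatMapper(Function<? super T, ? extends Stream<? extends R>> mapper,
                   Downstream<R> downstream, boolean shortCircuiting) {
            this.mapper = mapper;
            this.downstream = downstream;
            this.shortCircuiting = shortCircuiting;
        }

        @Override
        public void accept(T element) {
            try (Stream<? extends R> inner = mapper.apply(element)) {
                if (inner == null) {
                    return;
                }
                if (shortCircuiting) {
                    // allMatch stops traversing inner as soon as test(...) returns false.
                    inner.sequential().allMatch(this);
                } else {
                    // Nothing can cancel, so push every element without per-element checks.
                    inner.sequential().forEach(downstream);
                }
            }
        }

        @Override
        public boolean test(R value) {
            if (!cancel) {
                downstream.accept(value);
                cancel = downstream.cancellationRequested();
            }
            return !cancel;
        }
    }

    /** Feeds one element whose sub-stream is infinite into a sink that cancels after n values. */
    static List<Integer> firstN(int n) {
        List<Integer> out = new ArrayList<>();
        Downstream<Integer> sink = new Downstream<Integer>() {
            @Override public void accept(Integer value) { out.add(value); }
            @Override public boolean cancellationRequested() { return out.size() >= n; }
        };
        new FlatMapper<Integer, Integer>(start -> Stream.iterate(start, i -> i + 1), sink, true)
                .accept(10);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(firstN(3)); // prints [10, 11, 12]
    }
}
```

With `shortCircuiting` set to `false`, the same call would never return for this infinite sub-stream, which is why the `forEach` fast path is only safe when nothing downstream can cancel.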

@openjdk

openjdk bot commented Apr 12, 2024

@viktorklang-ora Please do not rebase or force-push to an active PR as it invalidates existing review comments. Note for future reference, the bots always squash all changes into a single commit automatically as part of the integration. See OpenJDK Developers’ Guide for more information.

@viktorklang-ora
Contributor Author

@PaulSandoz @AlanBateman This should be ready for review now, if you find some time :)

Member

@PaulSandoz PaulSandoz left a comment

Very nice work.

@openjdk openjdk bot added the ready (Pull request is ready to be integrated) label Apr 12, 2024
@forax
Member

forax commented Apr 13, 2024

Hello, I will not pretend I fully understand the code, but I think it's important to keep a consistent code style.
(And no, I do not want to start a holy war about the code style; I just think we should keep it consistent.)

@viktorklang-ora
Contributor Author

Just pushed another commit addressing feedback from @forax. (FYI @PaulSandoz & @AlanBateman)

@viktorklang-ora
Contributor Author

/integrate

@openjdk

openjdk bot commented Apr 16, 2024

Going to push as commit 8a5b86c.
Since your change was applied there have been 180 commits pushed to the master branch:

  • 58911cc: 8188784: javax/management/notification/BroadcasterSupportDeadlockTest.java - TEST FAILED: deadlock
  • 97c1808: 8329099: Undocumented exception thrown by Instruction factory methods accepting Opcode
  • def2577: 8330215: Trim working set for OldObjectSamples
  • 2f11afd: 8330172: G1: Consolidate update_bot_for_block and update_bot_for_obj in HeapRegion
  • 31a1f9c: 8307143: CredentialsCache.cacheName should not be static
  • 274c805: 8327743: JVM crash in hotspot/share/runtime/javaThread.cpp - failed: held monitor count should be equal to jni: 0 != 1
  • 140f567: 8323220: Reassociate loop invariants involved in Cmps and Add/Subs
  • a293bdf: 8330197: Make javac/diags/example release agnostic
  • da75e01: 8330196: Make java/lang/invoke/defineHiddenClass/BasicTest release agnostic
  • ddc3921: 8330103: Compiler memory statistics should keep separate records for C1 and C2
  • ... and 170 more: https://git.openjdk.org/jdk/compare/db159149c1c13a98ee9a85750741c6a3cd39f408...master

Your commit was automatically rebased without conflicts.

@openjdk openjdk bot added the integrated (Pull request has been integrated) label Apr 16, 2024
@openjdk openjdk bot closed this Apr 16, 2024
@openjdk openjdk bot removed the ready (Pull request is ready to be integrated) and rfr (Pull request is ready for review) labels Apr 16, 2024
@openjdk

openjdk bot commented Apr 16, 2024

@viktorklang-ora Pushed as commit 8a5b86c.

💡 You may see a message that your pull request was closed with unmerged commits. This can be safely ignored.

@charpov

charpov commented Nov 27, 2024

I just noticed this improved performance in Java 23. However, it still doesn't fit my needs because this doesn't work:

Stream.of(1)
    .flatMap(c -> Stream.generate(() -> 1).flatMap(x -> Stream.generate(() -> x)))
    .iterator()
    .next()

Should it work? And how else can one use some stream elements in a non-terminal way?
(See my forum question here: https://forums.oracle.com/ords/apexds/post/laziness-of-stream-iterators-3439.)

Labels
core-libs (core-libs-dev@openjdk.org), integrated (Pull request has been integrated)

4 participants