ArrayIndexOutOfBoundsException when using ParallelFlux #1293

Closed
blake-bauman opened this issue Aug 3, 2018 · 5 comments

@blake-bauman

We recently updated from 3.1.7 to 3.1.8 and code that was previously working now fails with an exception:

java.lang.ArrayIndexOutOfBoundsException: -1
	at reactor.core.publisher.ParallelLiftFuseable.subscribe(ParallelLiftFuseable.java:74) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.ParallelMergeReduce.subscribe(ParallelMergeReduce.java:63) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:3188) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:2965) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at me.MyClass$MyRunnable.run() ~[classes/:?]
	at reactor.core.scheduler.PeriodicSchedulerTask.call(PeriodicSchedulerTask.java:49) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.scheduler.PeriodicSchedulerTask.run(PeriodicSchedulerTask.java:63) [reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:62) [spring-cloud-sleuth-core-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_144]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_144]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_144]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

Looking at the code for ParallelLiftFuseable.java:74, I see:

		int i = 0;
		while (i < subscribers.length) {
			CoreSubscriber<? super O> actual = s[i - 1];

It looks like i is always 0 on the first iteration, so s[i - 1] is s[-1] and the lookup throws every time the loop is entered.
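To make the observation above concrete, here is a minimal standalone sketch in plain Java. It only mirrors the shape of the quoted loop. The class and method names are hypothetical, and the "fixed" variant (indexing with i instead of i - 1) is an assumption about the intended behavior, not the actual change made in Reactor.

```java
public class OffByOneSketch {

    // Mirrors the shape of the quoted loop: reads s[i - 1] while i starts at 0.
    static int countWithBuggyIndex(Object[] s) {
        int visited = 0;
        int i = 0;
        while (i < s.length) {
            Object actual = s[i - 1]; // i == 0 gives index -1, which throws
            visited++;
            i++;
        }
        return visited;
    }

    // Assumed intent (hypothetical fix): index with i so every slot is read once.
    static int countWithFixedIndex(Object[] s) {
        int visited = 0;
        int i = 0;
        while (i < s.length) {
            Object actual = s[i];
            if (actual != null) {
                visited++;
            }
            i++;
        }
        return visited;
    }

    // Reports whether the buggy loop fails for the given array.
    static boolean buggyLoopThrows(Object[] s) {
        try {
            countWithBuggyIndex(s);
            return false;
        } catch (ArrayIndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object[] rails = {"rail-0", "rail-1", "rail-2"};
        System.out.println("buggy loop throws: " + buggyLoopThrows(rails)); // true
        System.out.println("fixed loop visited: " + countWithFixedIndex(rails)); // 3
    }
}
```

Under these assumptions, the buggy variant fails for any non-empty array, which would match the exception showing up on every subscribe rather than intermittently.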

Steps to reproduce

The failure occurs in a somewhat complex parallel/reduce operation. I'm trying to come up with a simple example that demonstrates it, but haven't had any luck so far. However, looking at the above code, it's hard to see how this class could ever work.

Reactor Core version

Started failing in 3.1.8. Previous version (3.1.7) works fine.

JVM version (e.g. java -version)

JRE is 10.0.2, but compile target 1.8.

@simonbasle
Member

simonbasle commented Aug 3, 2018

🤦‍♂️ oh god yeah, that can't work...
Are you using some hooks, or a library that does (like Spring Sleuth)?
You are using collect to perform the reduce I assume?

simonbasle added this to the 3.1.9.RELEASE milestone Aug 3, 2018
simonbasle added the type/bug A general bug label Aug 3, 2018
simonbasle self-assigned this Aug 3, 2018

@blake-bauman
Author

Yes on Spring Sleuth.

Reducing using ParallelFlux.reduce(). Something similar to:

Flux.concat(tasks)
    .parallel(numRails)
    .runOn(scheduler)
    .reduce(HashMap::new, (map, value) -> {
        map.put(value, value);
        return map;
    })
    .reduce((a, b) -> {
        a.putAll(b);
        return a;
    })

@simonbasle
Member

As a possible workaround, can you try appending .hide() after the last parallel operator in the chain? It should still be a ParallelFlux at that point; in your example that'd be after the first reduce, I reckon.

@blake-bauman
Author

Didn't seem to change anything. Also tried adding hide() at other places, too.

@simonbasle
Member

Ah yeah, the hide() can't change anything, as Sleuth installs the offending hook on every operator, not just the last one in a chain. So it still sees the Fuseable ParallelReduce and wraps it in the (bugged) lift function 😢
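The reasoning above can be illustrated with a toy model in plain Java. This is not Reactor's or Sleuth's API: the Op class, the method names, and the fuseable flags on parallel and runOn are all hypothetical and chosen only for illustration. The sketch shows that a hook applied at every operator still reaches the fuseable reduce step whether or not hide() follows it, whereas a hook applied only to the last operator would not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EveryOperatorHookSketch {

    // Hypothetical stand-in for one operator in a chain; not a Reactor type.
    static final class Op {
        final String name;
        final boolean fuseable;

        Op(String name, boolean fuseable) {
            this.name = name;
            this.fuseable = fuseable;
        }
    }

    // Models a hook that runs at every operator: it wraps each fuseable one.
    static List<String> wrappedByEachOperatorHook(List<Op> chain) {
        List<String> wrapped = new ArrayList<>();
        for (Op op : chain) {
            if (op.fuseable) {
                wrapped.add(op.name);
            }
        }
        return wrapped;
    }

    // Models a hook that only looks at the last operator in the chain.
    static List<String> wrappedByLastOperatorHook(List<Op> chain) {
        List<String> wrapped = new ArrayList<>();
        if (!chain.isEmpty()) {
            Op last = chain.get(chain.size() - 1);
            if (last.fuseable) {
                wrapped.add(last.name);
            }
        }
        return wrapped;
    }

    // Builds the modeled chain; the fuseable flags are assumptions for the sketch.
    static List<Op> chain(boolean withHide) {
        List<Op> ops = new ArrayList<>(Arrays.asList(
                new Op("parallel", false),
                new Op("runOn", false),
                new Op("reduce", true)));
        if (withHide) {
            ops.add(new Op("hide", false)); // hide() is modeled as non-fuseable
        }
        return ops;
    }

    public static void main(String[] args) {
        System.out.println(wrappedByEachOperatorHook(chain(false))); // [reduce]
        System.out.println(wrappedByEachOperatorHook(chain(true)));  // [reduce]
        System.out.println(wrappedByLastOperatorHook(chain(true)));  // []
    }
}
```

In this model, adding hide() only changes the outcome for the last-operator hook, which is consistent with the workaround having no effect when the hook is installed on every operator.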

simonbasle added a commit that referenced this issue Aug 7, 2018