Fix OverflowException in MultiSubscriptionSubscriber #2576

Merged
merged 7 commits into from Feb 11, 2021

Conversation

koldat
Contributor

@koldat koldat commented Jan 27, 2021

Fixes:

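// Imports assumed from usage: SLF4J for logging, JUnit 4 for @Test
// (use org.junit.jupiter.api.Test instead on JUnit 5).
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FFTest {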
    private static final Logger log = LoggerFactory.getLogger(FFTest.class);

    @Test
    public void test() {
        int round = 0;
        try {
            while (true) {
                Flux.range(0, 10)
                    .concatWithValues(10, 11, 12, 13)
                    .concatWith(Flux.range(14, 100 - 14))
                    .limitRate(16, 2)
                    .publishOn(Schedulers.boundedElastic(), 16)
                    .subscribeOn(Schedulers.boundedElastic())
                    .blockLast();
                log.info("Round : " + (round++));
            }
        } catch(Exception e) {
            log.error("", e);
        }
    }
}
13:06:20.429 [main] INFO com.ca.apm.tas.FFTest - Round : 0
......
13:06:20.442 [main] INFO com.ca.apm.tas.FFTest - Round : 38
13:06:20.442 [main] INFO com.ca.apm.tas.FFTest - Round : 39
13:06:20.450 [main] ERROR com.ca.apm.tas.FFTest - 
reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:234)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:232)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:183)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)

The problem is that reactor.core.publisher.Operators.MultiSubscriptionSubscriber.drainLoop() double-counts missedRequested when a missedSubscription arrives during evaluation (so the loop runs two cycles). Example values:

First iteration in drainLoop:
missedRequested = 12, missedProduced = 0, requested = 2, missedSubscription = null,
requestTarget = log
requestAmount = 12

Second iteration in drainLoop:
missedRequested = 0, missedProduced = 0, requested = 14, missedSubscription = range
requestTarget = range
requestAmount = 26

See that missedRequested (12) was counted twice so requestAmount is 26 instead of 14.
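
The arithmetic above can be replayed with a small stand-alone model. This is only a sketch under assumptions: the class and method names are hypothetical, the model ignores missedProduced and cancellation, and it is neither the actual Operators.MultiSubscriptionSubscriber code nor necessarily the fix applied in this PR. It only shows how demand that was already added to requestAmount gets added again when the subscription switches on a later pass, and one possible way to avoid that by tracking what was already counted.

```java
// Hypothetical, simplified model of the accumulation described above.
final class DrainLoopDoubleCount {

    // Buggy variant: on the pass where the subscription switches, the whole
    // outstanding demand is added on top of what earlier passes accumulated.
    static long buggyRequestAmount(long requested, long[] missedRequestedPerPass, int switchPass) {
        long requestAmount = 0L;
        for (int pass = 0; pass < missedRequestedPerPass.length; pass++) {
            long mr = missedRequestedPerPass[pass];
            requested += mr;
            if (pass == switchPass) {
                requestAmount += requested; // includes demand counted in earlier passes
            } else if (mr != 0L) {
                requestAmount += mr;
            }
        }
        return requestAmount;
    }

    // One possible correction: remember how much demand is already part of
    // requestAmount and subtract it when the subscription switches.
    static long fixedRequestAmount(long requested, long[] missedRequestedPerPass, int switchPass) {
        long requestAmount = 0L;
        long alreadyCounted = 0L;
        for (int pass = 0; pass < missedRequestedPerPass.length; pass++) {
            long mr = missedRequestedPerPass[pass];
            requested += mr;
            if (pass == switchPass) {
                requestAmount += requested - alreadyCounted;
            } else if (mr != 0L) {
                requestAmount += mr;
                alreadyCounted += mr;
            }
        }
        return requestAmount;
    }

    public static void main(String[] args) {
        long[] missed = {12L, 0L};                                   // values from the example above
        System.out.println(buggyRequestAmount(2L, missed, 1));       // prints 26
        System.out.println(fixedRequestAmount(2L, missed, 1));       // prints 14
    }
}
```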

@koldat
Contributor Author

koldat commented Jan 28, 2021

Just pasting another test that this PR fixes (#2567) to be complete. It is about emitting one value twice from Flux.just:

    private static Scheduler reader = Schedulers.newBoundedElastic(32, Integer.MAX_VALUE, "tas-reader");

    public static void main(String[] args) throws InterruptedException {
        int round = 0;
        while (true) {
            Mono.just(0)
                .flatMapMany(i -> {
                    return Flux.range(0, 10)
                        .publishOn(reader)
                        .concatWithValues(10)
                        .concatWithValues(11)
                        .concatWithValues(12)
                        .concatWithValues(13)
                        .concatWithValues(14)
                        .concatWithValues(15)
                        .concatWithValues(16)
                        .concatWithValues(17)
                        .concatWith(Flux.range(18, 100 - 18));
                })
                //.log()
                .publishOn(reader, 16)
                //.log()
                .subscribeOn(reader)
                .blockLast();

            System.out.println("Round : " + (round++));
        }
    }
28: [tas-reader-2] - reactor.Flux.PublishOn.81446 | onNext(11)
29: [tas-reader-1] - reactor.Flux.MonoFlatMapMany.81445 onNext(13)
30: [tas-reader-2] - reactor.Flux.MonoFlatMapMany.81445 request(12)
31: [tas-reader-1] - reactor.Flux.MonoFlatMapMany.81445 onNext(14) <-- Here is duplicate
32: [tas-reader-2] - reactor.Flux.MonoFlatMapMany.81445 onNext(14) <-- Here is duplicate
33: [tas-reader-1] - reactor.Flux.MonoFlatMapMany.81445 onNext(15)
34: [tas-reader-1] - reactor.Flux.MonoFlatMapMany.81445 onNext(16)

@OlegDokuka
Contributor

OlegDokuka commented Jan 28, 2021

@koldat looking at the code now. Thinking if we can do it better. I'm not sure all the volatiles you added should be there. It could be a false positive: everything works only because you have added extra volatiles that should not be there. I believe that the root cause is in the MultiSubscriptionSubscriber.

@koldat
Contributor Author

koldat commented Jan 28, 2021

@OlegDokuka - all the volatiles I added need to be there, as there is no "synchronized" that would make a barrier. Threads have no way of finding out that memory was changed during a race in the places I updated. We only protect the area using WIP. Better safe than sorry. It took me 3 days to find the right place. It is very hard to find as it is extremely rare and heavily multithreaded. But if you want, we can take them one by one. There is always a possibility that two threads (one from upstream and one from downstream) will reach these variables, as we have onComplete (from upstream) and request (from downstream, or both as it transfers to onNext).

@OlegDokuka
Contributor

OlegDokuka commented Jan 28, 2021

@koldat it is a little tricky. There are guarantees provided by the Reactive Streams spec which already provide memory barriers. In addition, the execution itself is sequential, thus in many cases volatiles are just redundant.

@koldat
Contributor Author

koldat commented Jan 28, 2021

@OlegDokuka I would like to read the source, as the barrier guarantee I know from Java covers all changes made inside synchronized blocks over the same monitor. That would hold the lock. But maybe Reactor does it a different way, which I would like to see, as I did not find any memory barriers around the calls. Can you point me to one example in the sources?

@OlegDokuka
Contributor

OlegDokuka commented Jan 28, 2021

@koldat there is a so-called "work in progress" pattern (please have a look here, here, here, here and here), which uses a volatile field to increment and decrement a counter. As you may notice, there is a check before entering the critical section that verifies nobody is working now; if somebody is working, the counter is just incremented. According to the Java Memory Model, all the non-volatile writes that happen before the write memory barrier must be visible to the reader. In the same way, once the work is done, the wip field is decremented, hence all the actions done within the critical section MUST be visible to the next worker that reads the same volatile field. That said, if we have WIP guarding at any place in our code, we have a guarantee that this section will not have any racing at all.

Apart from that, feel free to check the places in the spec where the words serial and thread-safe are mentioned. The spec says that access to a subscription MUST be serial, and that a producer MUST call the subscriber in a serial fashion. Though we still have a possibility of onNext racing with Subscription#request, in many places we have the work-in-progress guard, which should (I say should since we may have a bug, as is the case in MultiSubscriptionSubscriber now) prevent racing in such scenarios.
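
For readers unfamiliar with the pattern, here is a minimal stand-alone sketch of a work-in-progress guard. It is an illustration under assumptions, not code taken from Reactor: the class WipDrainSketch, its queue and its sum field are hypothetical. The plain sum field is only touched by the thread that currently owns the drain loop, and the atomic operations on wip on entry and exit are what hand the state over from one owner to the next.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a work-in-progress (WIP) guarded drain loop.
final class WipDrainSketch {
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    // Plain (non-volatile) field: only the current owner of the drain loop touches it.
    private long sum;

    void offer(int value) {
        queue.offer(value);
        drain();
    }

    private void drain() {
        // Only the thread that moves wip from 0 to 1 enters the loop;
        // every other caller just records that there is more work.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                sum += v;
            }
            // Subtract the signals handled so far; a non-zero result means
            // another thread signalled in the meantime, so loop again.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    // Runs several producer threads and returns the accumulated sum after all have finished.
    static long runDemo(int threads, int perThread) {
        WipDrainSketch sketch = new WipDrainSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 1; i <= perThread; i++) {
                    sketch.offer(i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.sum;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // 4 * (1000 * 1001 / 2) = 2002000
    }
}
```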

@koldat
Contributor Author

koldat commented Jan 28, 2021

You probably refer to reactor.core.Fuseable.THREAD_BARRIER:

/**
 * Indicates that the queue will be drained from another thread
 * thus any queue-exit computation may be invalid at that point.
 */

It is just a flag for fusion. In this example two threads are racing there, so the code needs to follow the memory model. For example:
use PRODUCED.incrementAndGet(this); instead of produced++;

The produced variable is read under WIP, but onNext does not have WIP, so two threads can modify the field.

And even if onNext had WIP, no one guarantees that the produced field will be consistent with the changes, as it is not volatile. And even if it were volatile, one would need to use an atomic operation, as this would break it:

					long c = produced;
					if (c != 0L) {
						produced = 0L; // <-- another thread can increment right before this
						produced(c);
					}

The correct version is:

					long c = PRODUCED.getAndSet(this, 0);

etc. The situation is similar in MultiSubscriptionSubscriber. The subscription and requested fields are protected by WIP. But if two threads go for a change (get WIP access) microseconds apart, then the second thread can see an old value. The probability is very small, because caches will most likely be flushed, but I think the possibility is there, as the queue can be drained by two different threads.
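
As a stand-alone illustration of the atomic read-and-reset idiom mentioned above, here is a minimal sketch using AtomicLongFieldUpdater. It is independent of whether Reactor actually needs it at this spot; the class ProducedCounterSketch and its methods are hypothetical. Because increments and the reset are each a single atomic operation, no increment can be lost between reading the value and zeroing it.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical illustration of incrementAndGet / getAndSet on a volatile long field.
final class ProducedCounterSketch {
    private volatile long produced;

    private static final AtomicLongFieldUpdater<ProducedCounterSketch> PRODUCED =
            AtomicLongFieldUpdater.newUpdater(ProducedCounterSketch.class, "produced");

    // Called by producer threads: atomic replacement for produced++.
    void onNext() {
        PRODUCED.incrementAndGet(this);
    }

    // Called by the draining thread: reads the count and resets it in one atomic step.
    long takeProduced() {
        return PRODUCED.getAndSet(this, 0L);
    }

    // Producers increment while the caller drains concurrently; returns the total drained.
    static long runDemo(int producers, int perProducer) {
        ProducedCounterSketch counter = new ProducedCounterSketch();
        Thread[] workers = new Thread[producers];
        for (int t = 0; t < producers; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    counter.onNext();
                }
            });
            workers[t].start();
        }
        long drained = 0L;
        try {
            for (Thread w : workers) {
                while (w.isAlive()) {
                    drained += counter.takeProduced(); // drain while producers are still running
                }
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        drained += counter.takeProduced(); // pick up whatever is left
        return drained;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10_000)); // 40000: no increment is lost
    }
}
```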

@OlegDokuka
Contributor

OlegDokuka commented Jan 28, 2021

@koldat if you refer to FluxConcatArray, then onNext cannot race with another onXXX method (those are guarantees mandated by the spec, and that is how all the producers are implemented in Reactor, unless there is a bug somewhere), hence modifying non-volatile fields within those methods is safe.

@OlegDokuka
Contributor

OlegDokuka commented Jan 28, 2021

Also, when an operator is fused, we are under another guarantee (an internal agreement) which says that if the operator was fused in ASYNC or SYNC fashion, then the producer will not send any real signals over onNext, but will rather signal that a queue has values to be drained under the WIP guard. Hence, it will never happen (unless there is a bug) that onNext modifies a non-volatile field simultaneously with logic in the drain loop.

@koldat
Contributor Author

koldat commented Jan 28, 2021

@OlegDokuka I agree you are right about the FluxConcatArray produced volatile, because most likely the switched thread will not have that memory cached.

But I do not agree with your view on the JVM memory barrier, as it has nothing to do with the WIP pattern, which is based on atomics and protects you only in the sense that a single thread has access to the variables defined by the contract. It does not guarantee the contents of a variable that is not volatile in any way. Only the ones that are volatile have a guaranteed value under WIP.

But if you guarantee it will never happen, I can live with that. I would just not be that sure, because threading defects are mostly impossible to find, even with such a simple and strict unit test. If it were a money transaction, I would not be happy to count it twice, even if everyone said it cannot happen.

So should I revert volatile changes?

@OlegDokuka
Contributor

OlegDokuka commented Jan 28, 2021

If tests still pass without volatiles - then yes, please

@OlegDokuka
Contributor

OlegDokuka commented Jan 29, 2021

I mean, it does not pass for me. But I'm still trying to figure out what goes wrong there

@koldat
Contributor Author

koldat commented Jan 29, 2021

I have reverted the volatile changes. Only the fix for the double count and Flux.just is there. Tests pass.

@OlegDokuka
Contributor

@koldat can you please add the mentioned test to make sure the changes work as expected

Member

@simonbasle simonbasle left a comment


is there no way to reproduce the issue in a unit test or a JCStress test ? even a test that was proven to fail locally when using eg. the repeat until failure feature of IntelliJ, but now passes? I know, I know, heisenbugs, the lack of failure doesn't prove it can't happen, but it would still be useful to have something, especially to demonstrate what you think is the cause and example of a situation that can lead to the issue?

as a side note, and smaller nitpick, we could use this opportunity to get rid of the weird terminado wording (which I think was an early inside "easter egg")

@OlegDokuka I'll defer to you for the rest of the review

@koldat
Contributor Author

koldat commented Feb 2, 2021

I changed terminado -> terminated. Let me know if you have a better name.

I am just struggling with where to put the test. I can use 1000 cycles, as it was mostly failing before 200 cycles. Then the test would not take that much time. Are there already any tests like this (stress ones), so I can put it in the same place? Any pointers?

@simonbasle
Member

mmh there are stress tests like this inside ordinary test classes per operator (eg in this case FluxJustTest.java I guess). In master there are also separate stress tests using JCStress (but that has a learning curve, like JMH), but this targets 3.3.x anyway.

@koldat
Contributor Author

koldat commented Feb 10, 2021

I have added three tests that run for around 500ms. All of them fail randomly (every 3-5 runs) before the fix and do not fail after the fix.

Member

@simonbasle simonbasle left a comment


LGTM. @OlegDokuka ?

Contributor

@OlegDokuka OlegDokuka left a comment


Let's not edit WeakScalarSubscription for now, but rather use ScalarSubscription instead of WeakScalarSubscription there -> https://github.com/koldat/reactor-core/blob/fix_overflow_race/reactor-core/src/main/java/reactor/core/publisher/FluxJust.java#L70

@simonbasle
Member

Yes I just seen that as well. I can change it to make test passing. Is it ok?

yes as a stopgap measure, but we'll need to follow up on that, just is more useful as a step name than scalarSubscription(foo) I think. hopefully this doesn't break anybody, as stepName is intended for human consumption 🤞

@simonbasle
Member

simonbasle commented Feb 11, 2021

created #2611 for the follow up (update: this was addressed in this PR directly)

Contributor

@OlegDokuka OlegDokuka left a comment


LGTM

@koldat
Contributor Author

koldat commented Feb 11, 2021

Let me add that attribute to not break some user tests. Give me a minute.

@simonbasle
Member

Let me add that attribute to not break some user tests. Give me a minute.

As you wish. It can be a String field prepopulated with the current stepName with an additional constructor and corresponding Operators.scalarSubscription method that take a precomputed stepName instead. Here is the sketch of the additional scalarSubscription I just came up with:

	/**
	 * Represents a fuseable Subscription that emits a single constant value synchronously
	 * to a Subscriber or consumer. Also give the subscription a user-defined {@code stepName}
	 * for the purpose of {@link Scannable#stepName()}.
	 *
	 * @param subscriber the delegate {@link Subscriber} that will be requesting the value
	 * @param value the single value to be emitted
	 * @param stepName the {@link String} to represent the {@link Subscription} in {@link Scannable#stepName()}
	 * @param <T> the value type
	 * @return a new scalar {@link Subscription}
	 */
	public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber,
			T value, String stepName) {
		return new ScalarSubscription<>(subscriber, value, stepName);
	}

@koldat
Contributor Author

koldat commented Feb 11, 2021

Pushed, please check

Member

@simonbasle simonbasle left a comment


last couple of small adjustments, I promise 😆
this is looking good, thanks for your effort 👍

Member

@simonbasle simonbasle left a comment


thanks @koldat ! looks like I can scrap the follow up issue 👍 🙇 🎁

@koldat
Contributor Author

koldat commented Feb 11, 2021

yay! :) Thank you guys and sorry for so many turn arounds.

@simonbasle simonbasle changed the title from "Fixes OverflowException in MultiSubscriptionSubscriber #2567" to "Fix OverflowException in MultiSubscriptionSubscriber" Feb 11, 2021
@simonbasle simonbasle merged commit 31254c6 into reactor:3.3.x Feb 11, 2021
@reactorbot

@simonbasle this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to master 🙇

@simonbasle
Member

Forward-merge commit into 3.4.3 is d22f352

simonbasle added a commit that referenced this pull request Feb 11, 2021
 - ScalarSubscription scan of TERMINATED shouldn't take the cancelled
 state into account (none of the other operators do that when the two
 states can be distinguished)
 - tests have been amended accordingly (including FluxJustTest)
simonbasle added a commit that referenced this pull request Feb 11, 2021
This commit makes ScalarSubscription advertise a RunStyle.SYNC, which
can be safely assumed of any ScalarSubscription, and is congruent with
the RunStyle of `FluxJust` prior to it switching to the mutualized
implementation.
Successfully merging this pull request may close these issues.

concatWithValues produce sometimes two same values instead of one or out of order under heavy load
4 participants