Flux/Mono.usingWhen late resource memory leak #3695

Closed
mlozbin opened this issue Jan 16, 2024 · 14 comments
Labels
status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor

Comments

@mlozbin

mlozbin commented Jan 16, 2024

Depending on the resource acquisition mechanism, the usingWhen method may lead to a memory or resource leak. The latest related fixes ensured that either the provided resource is closed or the resource subscription is cancelled. The problem is that the cancellation process is most often not deterministic, and is either unintuitive or impossible to implement (side effects during resource creation). Yes, cancelling the resource subscription may sometimes save some resources, but in most cases it leads to resource leaks that are very hard to find and fix (the number of existing issues for this method is good evidence of this).

This issue is a result of investigation of previously created issues (#1233, #2661, #124, #2836) and should meet all the requirements.

Expected Behavior

asyncCancel (or, respectively, asyncCleanup) should always be called.

Actual Behavior

The resource creation process was started, but the pipeline was cancelled, so cleanup never happened.

Steps to Reproduce

  1. Ensure some delay in the resource allocation.
  2. Cancel the main pipeline before the resource is emitted.

Below is a modified version of an existing test that shows that the problem still exists if resource allocation takes a long time. Existing test passes because resource allocation is delayed (so can be cancelled) but not long.
Random is not needed since the problem is 100% reproducible.

	@Test
	void cancelEarlyDoesNotLeak() {
		Releaseable releaseable = new Releaseable();
		Mono<Long> mono = Mono.usingWhen(Mono.fromSupplier(() -> {
			LockSupport.parkNanos(Duration.ofMillis(150).toNanos());
			releaseable.allocate();
			return releaseable;
		}), it -> Mono.just(1L), it -> Mono.fromRunnable(releaseable::release));

		StepVerifier.create(mono, 1)
		            .thenAwait(Duration.ofMillis(50))
		            .thenCancel()
		            .verify();

		Awaitility.await()
		          .atMost(Duration.ofSeconds(10))
		          .until(releaseable::wasAcquired);
		Awaitility.await()
		          .atMost(Duration.ofSeconds(10))
		          .until(releaseable::wasReleased);
	}

	static class Releaseable {

		private final AtomicBoolean allocated = new AtomicBoolean();
		private final AtomicBoolean released = new AtomicBoolean();

		void allocate() {
			System.out.println("alloc");
			allocated.set(true);
		}

		void release() {
			System.out.println("release");
			released.set(true);
		}

		boolean wasReleased() {
			return released.get();
		}

		boolean wasAcquired() {
			return released.get();
		}
	}

Possible Solution

Remove the following cancel method override from ResourceSubscriber, which forces early cancellation of the resource pipeline. That would allow the allocation to finish gracefully, emit the resource, and do a cleanup.

		@Override
		public void cancel() {
			if (!resourceProvided) {
				resourceSubscription.cancel();
			}

			super.cancel();
		}

Your Environment

Reactor version(s) used: 3.6.3
Other relevant libraries versions (eg. netty, ...): -
JVM version (java -version): Java 17
OS and version (eg uname -a): Windows 11

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jan 16, 2024
mlozbin pushed a commit to mlozbin/reactor-core that referenced this issue Jan 16, 2024
UsingWhen method may lead to a memory or resource leak if allocation finished after the main pipeline was cancelled. This change ensures that asyncCancel is always called even if deferred.

Fixes reactor#3695.
@Kindrat

Kindrat commented Feb 19, 2024

Anyone? We consider this to be pretty critical since it prevents proper resource clean up.

@chemicL
Member

chemicL commented Feb 20, 2024

I'll have a look at it this week. @mlozbin I see you submitted a PR but failed to sign the CLA till this point. Are you planning to take care of that? I've delayed looking at this contribution and checked occasionally the CLA status, but I see there's more interest in this subject now so let's gauge the status quo and try to make some progress.

@chemicL
Member

chemicL commented Feb 20, 2024

Just FYI I pasted the provided reproducer and the test seems to pass (ran it 1k times) in the current main branch of reactor-core (3.6.4 snapshot). Am I missing something here?

@chemicL
Member

chemicL commented Feb 20, 2024

I updated the reproducer as it had two issues:

  1. Using the same Thread caused the validation to coordinate with the subscription - the reproducer needs an asynchronous execution.
  2. Releaseable#wasAcquired used the wrong variable (released); it should use allocated.
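
The first point can be illustrated without Reactor. The following is a minimal JDK-only sketch (the class and method names are hypothetical, and CompletableFuture with an executor stands in for subscribeOn as an analogy only): a supplier invoked inline runs on the caller's thread, so the caller cannot do anything else, including cancelling, until it returns, whereas a supplier handed to another thread leaves the caller free to cancel while allocation is still in progress.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch: plain JDK code, not Reactor's implementation.
final class SameThreadSketch {

	/** An inline supplier call executes on the thread that invokes it. */
	static boolean inlineRunsOnCaller() {
		Supplier<Thread> allocate = Thread::currentThread;
		return allocate.get() == Thread.currentThread();
	}

	/** A supplier submitted to an executor executes on a different thread. */
	static boolean asyncRunsElsewhere() {
		ExecutorService worker = Executors.newSingleThreadExecutor();
		try {
			Thread allocatingThread =
					CompletableFuture.supplyAsync(Thread::currentThread, worker).join();
			return allocatingThread != Thread.currentThread();
		}
		finally {
			worker.shutdown();
		}
	}
}
```

Only in the second case can a cancellation issued by the caller overlap with the allocation, which is the situation the reproducer needs.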

I changed the class/variable names to make it possible to paste into MonoUsingWhenTest and indeed it fails consistently now:

@Test
void cancelEarlyDoesNotLeak2() {
	Releaseable2 releaseable = new Releaseable2();
	Mono<Long> mono = Mono.usingWhen(Mono.fromSupplier(() -> {
		LockSupport.parkNanos(Duration.ofMillis(150).toNanos());
		releaseable.allocate();
		return releaseable;
	}).subscribeOn(Schedulers.boundedElastic()), // added
			it -> Mono.just(1L),
			it -> Mono.fromRunnable(releaseable::release));

	StepVerifier.create(mono, 1)
	            .thenAwait(Duration.ofMillis(50))
	            .thenCancel()
	            .verify();

	Awaitility.await()
	          .atMost(Duration.ofSeconds(10))
	          .until(releaseable::wasAcquired);
	Awaitility.await()
	          .atMost(Duration.ofSeconds(10))
	          .until(releaseable::wasReleased);
}

static class Releaseable2 {

	private final AtomicBoolean allocated = new AtomicBoolean();
	private final AtomicBoolean released = new AtomicBoolean();

	void allocate() {
		System.out.println("alloc");
		allocated.set(true);
	}

	void release() {
		System.out.println("release");
		released.set(true);
	}

	boolean wasReleased() {
		return released.get();
	}

	boolean wasAcquired() {
		return allocated.get(); // was released mistakenly
	}
}

@mlozbin
Author

mlozbin commented Feb 20, 2024

> I'll have a look at it this week. @mlozbin I see you submitted a PR but failed to sign the CLA till this point. Are you planning to take care of that? I've delayed looking at this contribution and checked occasionally the CLA status, but I see there's more interest in this subject now so let's gauge the status quo and try to make some progress.

Hello, @chemicL. Did not realize that I had to sign it for a minor change. Already signed.

> Existing test passes because resource allocation is delayed (so can be cancelled) but not long.

Sorry, I made a mistake in the description (the tests from the pull request will fail on the old code). The missing part was subscribeOn on the resource allocation. Here is the fixed version.

	@Test
	void cancelEarlyDoesNotLeak() {
		Releaseable releaseable = new Releaseable();
		Mono<Long> mono = Mono.usingWhen(Mono.fromSupplier(() -> {
			LockSupport.parkNanos(Duration.ofMillis(150).toNanos());
			releaseable.allocate();
			return releaseable;
		}).subscribeOn(Schedulers.single()), it -> Mono.just(1L), it -> Mono.fromRunnable(releaseable::release));

		StepVerifier.create(mono, 1)
		            .thenAwait(Duration.ofMillis(50))
		            .thenCancel()
		            .verify();

		Awaitility.await()
		          .atMost(Duration.ofSeconds(10))
		          .until(releaseable::wasAcquired);
		Awaitility.await()
		          .atMost(Duration.ofSeconds(10))
		          .until(releaseable::wasReleased);
	}

	static class Releaseable {

		private final AtomicBoolean allocated = new AtomicBoolean();
		private final AtomicBoolean released = new AtomicBoolean();

		void allocate() {
			System.out.println("alloc");
			allocated.set(true);
		}

		void release() {
			System.out.println("release");
			released.set(true);
		}

		boolean wasReleased() {
			return released.get();
		}

		boolean wasAcquired() {
			return allocated.get();
		}
	}

@chemicL
Member

chemicL commented Feb 20, 2024

Thanks! It doesn't look to me like a "minor change" though. It is a breaking change in terms of behaviour and requires some design in my opinion.

Take as an example the following alteration of the reproducer you provided. Assume my code relied on the cancellation to be propagated to the resource generation to avoid some costly accumulation. Without any changes, this test fails assertion <1> but passes <2>. However, with your PR, <1> is successfully asserted, while <2> fails.

@Test
void cancelEarlyDoesNotLeak2() {
	Releaseable2 releaseable = new Releaseable2();
	Mono<Long> mono = Mono.usingWhen(Mono.fromSupplier(() -> {
		LockSupport.parkNanos(Duration.ofMillis(150).toNanos());
		releaseable.allocate();
		return releaseable;
	}).doOnCancel(releaseable::cancel) // stops some heavy and lengthy process
	  .subscribeOn(Schedulers.boundedElastic()), // added
			it -> Mono.just(1L),
			it -> Mono.fromRunnable(releaseable::release));

	StepVerifier.create(mono, 1)
	            .thenAwait(Duration.ofMillis(50))
	            .thenCancel()
	            .verify();

	Awaitility.await()
	          .atMost(Duration.ofSeconds(10))
	          .until(releaseable::wasAcquired);
	Awaitility.await() // <1>
	          .atMost(Duration.ofSeconds(10))
	          .until(releaseable::wasReleased);
	Awaitility.await() // <2>
	          .atMost(Duration.ofSeconds(10))
	          .until(releaseable::wasCanceled);
}

static class Releaseable2 {

	private final AtomicBoolean allocated = new AtomicBoolean();
	private final AtomicBoolean released = new AtomicBoolean();
	private final AtomicBoolean canceled = new AtomicBoolean();

	void allocate() {
		System.out.println("alloc");
		allocated.set(true);
	}

	void release() {
		System.out.println("release");
		released.set(true);
	}

	void cancel() {
		System.out.println("cancel");
		canceled.set(true);
	}

	boolean wasReleased() {
		return released.get();
	}

	boolean wasAcquired() {
		return allocated.get(); // was released mistakenly
	}

	boolean wasCanceled() {
		return canceled.get();
	}
}

Before making any further suggestions about the design, for the reproducer that was provided in the initial report, there is a trivial fix that can be accomplished with the existing API:

Mono.fromSupplier(...).doOnCancel(releaseable::release)

I do see the highlighted issue, however the solution must not break existing usage of the API. At the same time, if you are able to provide an example that is difficult to find a workaround for then it would be a good material for discussion. Thanks in advance.

@chemicL chemicL added status/need-design This needs more in depth design work status/need-user-input This needs user input to proceed and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Feb 20, 2024
@Kindrat

Kindrat commented Feb 21, 2024

@chemicL it's frustrating to see that the fix is a breaking change. The documentation does not mention that cancelling the pipeline skips the cleanup publisher; it only mentions empty pipelines and error signals. So the method does not work according to its own description and can potentially lead to major problems for its users, since resources are not released due to this implicit shortcut.

You rely on the same dangerous speculation about costly allocation. The thing is that you can't just cancel an allocation process or terminate it mid-execution. A simple example is an external service call: termination can drop the communication and break the protocol, so corrupted packets get sent, which causes the external service to disconnect the broken client, which in turn requires a more time-consuming and expensive re-connection and synchronization. For this exact reason, the Tomcat websocket client, for example, does not drop an active send task on cancellation but finishes it; it provides the caller with a special wrapper over Future that can be safely cancelled without any impact on the actual communication.

This assumption of yours is not only wrong (we faced this problem ourselves, as you can see from the linked issues and the discussions linked there), it also goes against common usage patterns, in which a cleanup callback is guaranteed to be called, leaving the user able to handle it properly according to application logic.

@chemicL
Member

chemicL commented Feb 22, 2024

@Kindrat thank you for the feedback.

First, I'd encourage you to look up Hyrum's Law and consider it in the context of this proposed change.

Second, I'm absolutely open to proposals which don't break the existing behaviour and also am open to documentation updates. Let's just not break existing tests and the usage of the API that we don't have insight into.

To me, personally, it is completely acceptable to cancel resource allocation in certain scenarios, while it might be a bad or impossible solution in others. That's why the API is flexible enough - allowing for the .doOnCancel() hook to be used together with the resourceSupplier's Publisher as in the tests that were removed in the PR!

Let me paste the test under consideration with inlined comments:

@Test
public void lateMonoResourcePublisherIsCancelledOnCancel() {
	AtomicBoolean resourceCancelled = new AtomicBoolean();
	AtomicBoolean commitDone = new AtomicBoolean();
	AtomicBoolean rollbackDone = new AtomicBoolean();
	AtomicBoolean cancelDone = new AtomicBoolean();

	Mono<String> resourcePublisher = Mono.<String>never()
			// <1>
			.doOnCancel(() -> resourceCancelled.set(true));

	Mono<String> usingWhen = Mono.usingWhen(resourcePublisher,
			Mono::just,
			tr -> Mono.fromRunnable(() -> commitDone.set(true)),
			(tr, err) -> Mono.fromRunnable(() -> rollbackDone.set(true)),
			tr -> Mono.fromRunnable(() -> cancelDone.set(true)));

	StepVerifier.create(usingWhen)
	            .expectSubscription()
	            .expectNoEvent(Duration.ofMillis(100))
	            .thenCancel()
	            .verify(Duration.ofSeconds(1));

	assertThat(commitDone).as("commitDone").isFalse();
	assertThat(rollbackDone).as("rollbackDone").isFalse();
	assertThat(cancelDone).as("cancelDone").isFalse();

	// <2>
	assertThat(resourceCancelled).as("resource cancelled").isTrue();
}

<1> Note, here you are able to intercept the cancellation and delay the actual resource cleanup as you wish until the resource is actually obtained.
<2> This validation actually makes sure that those users who have relied on this pattern are protected.
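
As a rough illustration of the idea in <1>, here is a JDK-only sketch that uses CompletableFuture as a stand-in for the resource Publisher. It is an analogy under stated assumptions, not Reactor's implementation: Resource, cancelDuringAllocationStillReleases and the latch are hypothetical names. The in-flight allocation is not aborted when the caller loses interest; instead, the release is chained onto the allocation so that it runs once the resource actually exists.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: plain JDK code, not Reactor's implementation.
final class DeferredReleaseSketch {

	static final class Resource {
		final AtomicBoolean released = new AtomicBoolean();

		void release() {
			released.set(true);
		}
	}

	/** Returns true if a resource that finished allocating after "cancel" was still released. */
	static boolean cancelDuringAllocationStillReleases() {
		ExecutorService allocator = Executors.newSingleThreadExecutor();
		CountDownLatch mayFinish = new CountDownLatch(1);
		try {
			// Slow allocation that cannot be safely aborted half-way.
			CompletableFuture<Resource> allocation = CompletableFuture.supplyAsync(() -> {
				try {
					mayFinish.await();
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
				return new Resource();
			}, allocator);

			// "Cancel" arrives while allocation is in flight: do not abort it,
			// only register the cleanup to run once the resource is available.
			CompletableFuture<Resource> cleanedUp = allocation.thenApply(resource -> {
				resource.release();
				return resource;
			});

			mayFinish.countDown(); // the allocation is now allowed to complete
			return cleanedUp.get(5, TimeUnit.SECONDS).released.get();
		}
		catch (Exception e) {
			throw new IllegalStateException(e);
		}
		finally {
			allocator.shutdownNow();
		}
	}
}
```

Whether deferring the release like this is appropriate depends on the resource; for allocations that can be safely aborted, cancelling them outright may still be the cheaper choice.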

To me it looks like a completely deliberate design which has a documented behaviour using tests that were added in ec8a16b.

Now on to the documentation then. First, Mono's javadoc:

Unlike in the Flux counterpart, ALL signals are deferred until the Mono terminates and the relevant Function generates and invokes a "cleanup" Publisher.

To me, this makes it super clear that you should not expect the asyncCleanup to run if the main Publisher is cancelled before terminating.

For Flux:

Whenever the resulting sequence terminates, a provided Function generates a "cleanup" Publisher that is invoked but doesn't change the content of the main sequence. Instead it just defers the termination (unless it errors, in which case the error suppresses the original termination signal).

Again, I read this as a clear indication that the asyncCleanup is only called in case of successful termination.

I'm looking forward to your point of view and suggestions. Most probably to have a constructive outcome, please provide a depictive, concrete example where the current design falls short and makes it impossible to clean up.

@Kindrat

Kindrat commented Feb 24, 2024

@chemicL the arguments are good except for the fact that if the resource init publisher has already started its execution, you need guarantees for cleanup. Otherwise a construct like usingWhen is useless: you can easily replace it with doWork().doFinally(). What's the purpose of having it as one of the default operators then?

It's reasonable to skip cleanup when init was also completely skipped, but every allocation requires its disposal.

@SimoneGiusso

I'm not 100% sure, but it seems that this issue is causing the following in r2dbc-pool, making it too risky for use in production systems.

@chemicL
Member

chemicL commented Feb 26, 2024

Hey, @SimoneGiusso 👋 Thanks for the link and a concrete example. I made a comment to highlight the relationship that I see (or fail to see) with our issue: r2dbc/r2dbc-pool#198 (comment)

However, that example is very useful, thanks! It made me realize that perhaps there is some actual bug in case of cancellation that happens when the resource has been successfully allocated, but processing a result obtained via that resource is in progress. However, this case is also supported properly using the asyncCancel callback.

Please have a look at the following code sample:

@Test
public void cancelAfterAllocation() {
	AtomicBoolean resourceCancelled = new AtomicBoolean();
	AtomicBoolean commitDone = new AtomicBoolean();
	AtomicBoolean rollbackDone = new AtomicBoolean();
	AtomicBoolean cancelDone = new AtomicBoolean();

	Flux<String> resourcePublisher = Flux.just("Resource")
	                                     .doOnCancel(() -> resourceCancelled.set(true));

	StepVerifier.create(Flux.usingWhen(resourcePublisher,
			            resource -> Flux.never(), // <0>
			            tr -> Mono.fromRunnable(() -> commitDone.set(true)),
			            (tr, err) -> Mono.fromRunnable(() -> rollbackDone.set(true)),
			            tr -> Mono.fromRunnable(() -> cancelDone.set(true))))
	            .expectSubscription()
	            .expectNoEvent(Duration.ofMillis(100))
	            .thenCancel()
	            .verify(Duration.ofSeconds(1));

	assertThat(commitDone).as("commitDone").isFalse();
	assertThat(rollbackDone).as("rollbackDone").isFalse();
	// <1>
	assertThat(cancelDone).as("cancelDone").isTrue();

	// <2>
	assertThat(resourceCancelled).as("resource cancelled").isTrue();
}

<0> We simulate working with the resource indefinitely using Flux.never().
<1> Shows that in this scenario, when the processing is ongoing, the cancellation callback is invoked properly and the resource can be released.
<2> This might be surprising, but the cancellation on the resourcePublisher is coming from the fact that we expect exactly 1 resource and once it is obtained, the Publisher is cancelled in the usingWhen operator.

@Kindrat I am sorry but I fail to see your points. They are quite vague and philosophical in a sense. Please follow up with a code sample to prove your point. You can use the above as the base. In my view, there are currently 3 situations for cancellation:

  1. resource allocation has not started yet -> that's fine, no leaks.
  2. resource allocation has started, but has not been finalized -> use .doOnCancel() on the resourcePublisher to intercept the cancellation and free up the resource.
  3. resource allocation has completed successfully/errored -> the asyncCancel cleanup is guaranteed to be called - no leaks.
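
The three cases above can be summarized as a small decision table. The sketch below is plain Java with hypothetical names (AllocationState, cleanupOwner); it only encodes the reasoning from the list and does not call into Reactor.

```java
// Hypothetical sketch: encodes the three cancellation cases from the list above.
final class CancellationCasesSketch {

	enum AllocationState { NOT_STARTED, IN_FLIGHT, COMPLETED }

	/** Who is expected to clean up when cancellation arrives in the given state. */
	static String cleanupOwner(AllocationState state) {
		switch (state) {
			case NOT_STARTED:
				return "nobody: nothing was allocated";
			case IN_FLIGHT:
				return "doOnCancel on the resource Publisher";
			case COMPLETED:
				return "asyncCancel callback of usingWhen";
			default:
				throw new IllegalArgumentException("unknown state: " + state);
		}
	}
}
```

The disagreement in this thread is about the IN_FLIGHT row: the operator leaves that cleanup to the resource Publisher rather than to asyncCancel.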

I might be wrong of course, so please do prove me wrong with code or just suggest documentation improvements if the documentation is misleading.

@mlozbin I'd love to learn your perspective as well.

@Kindrat

Kindrat commented Feb 26, 2024

@chemicL please, let's keep to the point rather than encouraging philosophy around the topic. You tried to cover this specific scenario with tests of your own. Your tests are false negatives because they rely on execution in a single thread with an explicit wait before verification. Max did exactly this little thing to show the logic is broken: he changed the test to run in a multi-threaded way. The test failed, which shows that the logic is broken as well. It has nothing to do with breaking changes or any other sophistry here. Please review carefully what the test was doing before and how it was changed in the PR. It's not healthy to claim everything is working and request more and more examples when there is a clear scenario with a broken test showing it to be a valid one.

I don't know how to explain it any more simply.

@chemicL
Member

chemicL commented Feb 26, 2024

@Kindrat perhaps code will better depict what I'm trying to explain here. I took the test added by @mlozbin in the PR for MonoUsingWhenTest and modified it to show how the API would be properly used. It's below with comments and alterations done by commenting out certain bits.

Regarding the "false negative" comment – again, looking forward to a code example to falsify my claims.

Here's the modified test from the PR which retains the assertions, but introduces some cancellation handling. I'm not saying this is the best way to do it by any means. It will depend on the use case, but this is what I came up with for the test at hand as that's what was called out above.

@Test
public void lateResourcePublisherCleanupIsDeferredOnCancel()
		throws InterruptedException {
	// Let's not use ref but use a Sink instead
	// AtomicReference<FluxUsingWhenTest.TestResource> ref = new AtomicReference<>();
	Sinks.One<FluxUsingWhenTest.TestResource> resource = Sinks.one();
	Sinks.One<Boolean> cancellation = Sinks.one();
	
	AtomicReference<FluxUsingWhenTest.TestResource> ref = new AtomicReference<>();
	CountDownLatch resourceSubscribeLatch = new CountDownLatch(1);
	CountDownLatch resourceCancelLatch = new CountDownLatch(1);
	Mono<Integer> mono = Mono.usingWhen(Mono.fromCallable(() -> {
				LockSupport.parkNanos(Duration.ofMillis(100).toNanos());
				FluxUsingWhenTest.TestResource testResource
						= new FluxUsingWhenTest.TestResource();
				// Replace the below with sink
				// ref.set(testResource);
				// Emit to sink instead
				resource.tryEmitValue(testResource);
				resourceSubscribeLatch.countDown();
				return testResource;
			})
	                                        // Will be called if cancellation
	                                        // happens before allocation finishes
	                                        .doOnCancel(() -> {
		                                        cancellation.tryEmitValue(true);
												resourceCancelLatch.countDown();
											})
	                                        // If terminated either with completion
	                                        // or error
			                                .doOnTerminate(() -> {
												cancellation.tryEmitValue(false);
											})
			                                .subscribeOn(Schedulers.single()),
			d -> Mono.just(1),
			FluxUsingWhenTest.TestResource::commit,
			FluxUsingWhenTest.TestResource::rollback,
			// Will be called if testResource allocation finishes successfully (not
			// cancelled during allocation)
			testResource -> testResource.cancel()
			                            .doOnSubscribe(unused -> resourceCancelLatch.countDown()));

	StepVerifier.create(mono.take(Duration.ofMillis(10)), 1)
	            .verifyComplete();

	assertThat(resourceSubscribeLatch.await(1, TimeUnit.SECONDS))
			.as("Resource create subscribed")
			.isTrue();
	assertThat(resourceCancelLatch.await(1, TimeUnit.SECONDS))
			.as("Resource cancel subscribed")
			.isTrue();

	// Instead of the below, replace with Sink and perform cleanup in case of 
	// cancellation
	// assertThat(ref.get())

	FluxUsingWhenTest.TestResource result =
			cancellation.asMono().flatMap(cancelled -> {
				return resource.asMono().flatMap(r -> {
					Mono<FluxUsingWhenTest.TestResource> rMono = Mono.just(r);
					return cancelled ?
							r.cancel().doOnSubscribe(unused -> resourceCancelLatch.countDown()).then(rMono) : rMono;
				});
			}).block();

	// assert that the cancellation led to freeing the resource
	assertThat(result)
			.isNotNull()
			.matches(tr -> !tr.commitProbe.wasSubscribed(), "no commit")
			.matches(tr -> !tr.rollbackProbe.wasSubscribed(), "no rollback")
			.matches(tr -> tr.cancelProbe.wasSubscribed(), "cancel method used");
}

@chemicL
Member

chemicL commented Feb 27, 2024

Alright, after giving it a chance and looking at the related bug in r2dbc, I have decided that this is a non-issue. Feel free to open a PR that improves the documentation if you find wording that better explains what's going on. I think @simonbasle's response sums it up fully. Let me quote it just in case:

The asyncCancel handler is a Function, actually deriving "how to release" from the value. If the value hasn't been emitted, then there is nothing to process. I don't think the bug really lies here
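
The quoted point can be made concrete with a small JDK-only sketch. The names here (CancelHandlerSketch, runCancelHandler, cleanupCalls) are hypothetical and the code does not mirror Reactor's internals; it only shows that a handler of the shape "resource -> cleanup action" has nothing to be applied to when no resource value was ever emitted.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch: plain JDK code, not Reactor's implementation.
final class CancelHandlerSketch {

	/** Derives the cleanup from the resource value; returns whether a cleanup ran. */
	static <R> boolean runCancelHandler(Optional<R> emitted, Function<R, Runnable> asyncCancel) {
		if (!emitted.isPresent()) {
			return false; // cancelled before any resource was emitted
		}
		asyncCancel.apply(emitted.get()).run();
		return true;
	}

	/** Counts how many times cleanup ran for the given (possibly absent) resource. */
	static int cleanupCalls(Optional<String> emitted) {
		AtomicInteger calls = new AtomicInteger();
		runCancelHandler(emitted, resource -> () -> calls.incrementAndGet());
		return calls.get();
	}
}
```

With an emitted resource the handler runs once; with an empty Optional it is never invoked, which matches the behaviour described in the quote.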

The related bug called out by @SimoneGiusso is in my view caused by reactor/reactor-pool#124 and should be addressed in reactor-pool.

Thanks everyone for dedicating the time to this discussion.

@chemicL chemicL closed this as not planned Feb 27, 2024
@chemicL chemicL added status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor and removed status/need-design This needs more in depth design work status/need-user-input This needs user input to proceed labels Feb 27, 2024