Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IllegalStateException during inTransaction(…) with inner Mono.error(…) #48

Closed
mp911de opened this issue Jan 8, 2019 · 7 comments
Closed
Labels
type: bug A general bug

Comments

@mp911de
Copy link
Member

mp911de commented Jan 8, 2019

The following code causes an IllegalStateException:

client
    .inTransaction(
        dc ->
            passwordHistoryRepository
                .save(new PasswordHistory(1))
                .then(Mono.error(new RuntimeException("6")))
                .then(passwordHistoryRepository.save(new PasswordHistory(2))
    )

Trace:

RuntimeException, trace=java.lang.RuntimeException: Async resource cleanup failed after onComplete
	at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:519)
...
Caused by: java.lang.IllegalStateException: Connection is closed!
	at org.springframework.data.r2dbc.function.connectionfactory.SingletonConnectionFactory.create(SingletonConnectionFactory.java:54)
	at org.springframework.data.r2dbc.function.DefaultTransactionalDatabaseClient.lambda$cleanup$7(DefaultTransactionalDatabaseClient.java:142)

Related ticket: #44.

@mp911de mp911de added the type: bug A general bug label Jan 8, 2019
@ToffeeLu
Copy link

ToffeeLu commented Jan 9, 2019

One thing to correct is this IllegalStateException happens only after commenting the code of Mono.error(new RuntimeException("6")), which means it doesn't work when no error happens.

@mp911de
Copy link
Member Author

mp911de commented Jan 9, 2019

I wasn't able to reproduce the issue (see reproducer code. Can you provide a complete, minimal, verifiable sample that reproduces the problem, ideally as GitHub repository or Gist?

@ToffeeLu
Copy link

ToffeeLu commented Jan 10, 2019

@mp911de check this reproducer code
Just as I said above, error only happens without Mono.error(),
here is what's different between me and your reproduce code:

    LegoSet legoSet1 = new LegoSet(null, "SCHAUFELRADBAGGER", 12);
    LegoSet legoSet2 = new LegoSet(null, "SCHAUFELRADBAGGER", 13);

    Flux<Map<String, Object>> transactional = client.inTransaction(db ->
      legoSetRepository.save(legoSet1)
          .then(legoSetRepository.save(legoSet2))
          .then(Mono.empty()));

and the error - Caused by: java.lang.IllegalStateException: Connection is closed! happens:

java.lang.AssertionError: expectation "expectError(Class)" failed (expected error of type: MyException; actual type: java.lang.RuntimeException: Async resource cleanup failed after onComplete)

	at reactor.test.ErrorFormatter.assertionError(ErrorFormatter.java:105)
	at reactor.test.ErrorFormatter.failPrefix(ErrorFormatter.java:94)
	at reactor.test.ErrorFormatter.fail(ErrorFormatter.java:64)
	at reactor.test.ErrorFormatter.failOptional(ErrorFormatter.java:79)
	at reactor.test.DefaultStepVerifierBuilder.lambda$expectError$6(DefaultStepVerifierBuilder.java:357)
	at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2112)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1408)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1356)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1030)
	at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onError(FluxContextStart.java:117)
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:397)
	at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:520)
	...
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:310)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.RuntimeException: Async resource cleanup failed after onComplete
		at reactor.core.publisher.FluxUsingWhen$CommitInner.onError(FluxUsingWhen.java:519)
		... 129 more
	Caused by: java.lang.IllegalStateException: Connection is closed!
		at org.springframework.data.r2dbc.function.connectionfactory.SingletonConnectionFactory.create(SingletonConnectionFactory.java:54)
		at org.springframework.data.r2dbc.function.DefaultTransactionalDatabaseClient.lambda$cleanup$7(DefaultTransactionalDatabaseClient.java:142)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
		... 128 more


Process finished with exit code -1

@mp911de
Copy link
Member Author

mp911de commented Jan 10, 2019

Thanks a lot, I'll have a look.

@mp911de
Copy link
Member Author

mp911de commented Jan 10, 2019

The initial trace results in unexpected behavior of our resource cleanup. The cleanup happens twice and that's why the refCounter in the bound connection factory is decremented twice. This results in prematurely closing the connection. Analysis ongoing.

@mp911de
Copy link
Member Author

mp911de commented Jan 11, 2019

See also reactor/reactor-core#1486.

mp911de added a commit that referenced this issue Jan 11, 2019
We now make sure to close connections only once by tracking the cleanup state. Flux.usingWhen/Mono.usingWhen do not ensure atomic cleanup in situations where the subscription completes and then the subscription is terminated.

This behavior has lead to closing a connection multiple times.

Related ticket: reactor/reactor-core#1486
mp911de added a commit that referenced this issue Jan 11, 2019
Formatting.
@mp911de
Copy link
Member Author

mp911de commented Jan 11, 2019

That's fixed now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug A general bug
Projects
None yet
Development

No branches or pull requests

2 participants