Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Getting 'Connection closed prematurely' with WebClient [SPR-15972] #20523

Closed
spring-projects-issues opened this issue Sep 18, 2017 · 22 comments
Closed
Assignees
Labels
status: declined

Comments

@spring-projects-issues
Copy link
Collaborator

@spring-projects-issues spring-projects-issues commented Sep 18, 2017

Arnaud Cogoluègnes opened SPR-15972 and commented

I'm getting the following exception in the HTTP RabbitMQ Client test suite after an upgrade to Spring 5.0.0.RC4:

reactor.core.Exceptions$ReactiveException: java.io.IOException: Connection closed prematurely at com.rabbitmq.http.client.ReactiveClientSpec.PUT /api/exchanges/{vhost}/{name} when vhost exists(ReactiveClientSpec.groovy:348)
Caused by: reactor.core.Exceptions$ReactiveException: java.io.IOException: Connection closed prematurely
	at reactor.core.Exceptions.propagate(Exceptions.java:294)
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
	at reactor.core.publisher.Flux.blockFirst(Flux.java:1909)
	... 1 more
Caused by: java.io.IOException: Connection closed prematurely
	at reactor.ipc.netty.http.client.HttpClientOperations.onInboundComplete(HttpClientOperations.java:262)
	at reactor.ipc.netty.channel.ChannelOperations.onHandlerTerminate(ChannelOperations.java:417)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1354)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:917)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:763)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:748)

Instructions to reproduce:

$ tar xf rabbitmq-server-generic-unix-3.7.0-beta.20.tar.xz
$ cd rabbitmq_server-3.7.0-beta.20/sbin
$ ./rabbitmq-server -detached
$ ./rabbitmq-plugins enable rabbitmq_management
$ git clone https://github.com/rabbitmq/hop.git
  • Launch one of the tests that fails:
./gradlew cleanTest test --tests '*ReactiveClientSpec.PUT /api/exchanges/{vhost}/{name} when vhost exists'

The test suite was affected by a Reactor Netty issue and was working with the connection pool disabled. It also works with Spring 5.0.0.RC4, but only against RabbitMQ 3.6.x. Disabling the connection pool makes the test suite works against Spring 5.0.0.RC4 and RabbitMQ 3.7.0.M20.

I noticed something weird: it looks like Wireshark doesn't recognize WebClient PUT requests as HTTP requests (the HTTP details don't show up, see capture).


Affects: 5.0 RC4

Attachments:

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 18, 2017

Rossen Stoyanchev commented

I think the key here is the Reactor and Reactor Netty version. Can you try Bismuth snapshots? There is a recent fix post Bismuth 4 (from today) that's worth checking.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 19, 2017

Arnaud Cogoluègnes commented

Not more luck with the latest Bismuth snapshots.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 19, 2017

Rossen Stoyanchev commented

Right, so the Reactor Netty issue #138 was about emitting an error signal when the server closes the connection prematurely. Now you're getting the IOException (as expected). The question is why the server does that.

Can you see anything in the RabbitMQ logs that would indicate whether it is proactively closing/rejecting, or does it hit some exception and drop it? The server version is also of interest and narrowing down to a specific milestone where it stopped could help to figure out some change that causes this. Do you have full request details available? We could consider opening a ticket with RabbitMQ as well to ask why this happens.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 20, 2017

Arnaud Cogoluègnes commented

There's nothing in the RabbitMQ logs. I'll try to lower the log level. I'll try to also to narrow down the issue (maybe using only Reactor Netty) and to find when the problem shows up in RabbitMQ (between the 3.7.0 milestones and 3.6.x). There may have been upgrades of the HTTP part (Cowboy and Ranch).
Note the HOP blocking client works (it uses RestTemplate and Commons HttpClient).

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 20, 2017

Rossen Stoyanchev commented

Differences in the headers between the blocking and the WebClient might provide some clues too.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 20, 2017

Rossen Stoyanchev commented

it looks like Wireshark doesn't recognize WebClient PUT requests as HTTP requests

Also does the request look complete -- headers and entire body content, matching to the content-length (if present)?

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 21, 2017

Arnaud Cogoluègnes commented

Here are 2 runs of the same code, one failing, the second one successful (against RabbitMQ 3.7.0 M17):

[DEBUG] (main) Using Console logging
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New http client pool for /127.0.0.1:15672
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@25fb8912(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-http-nio-4) Created [id: 0xdd37c7c0], now 1 active connections
[DEBUG] (reactor-http-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpDecoder = io.netty.handler.codec.http.HttpResponseDecoder), (reactor.left.httpEncoder = io.netty.handler.codec.http.HttpRequestEncoder), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-http-nio-4) Acquired active channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) [HttpClient] [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F/hop.test, method=PUT, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@6d05c13c}
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
PUT /api/exchanges/%2F/hop.test HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
transfer-encoding: chunked
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object FluxMapFuseable
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Pending write size = 85
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Response Write Completed]
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object EmptyLastHttpContent
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Received response (auto-read:false) : [server=Cowboy, date=Thu, 21 Sep 2017 13:21:37 GMT, content-length=0, content-type=application/json, vary=accept, accept-encoding, origin]
[DEBUG] (reactor-http-nio-4) onNext(ReactorClientHttpResponse{request=[PUT /api/exchanges/%2F/hop.test],status=204})
[DEBUG] (reactor-http-nio-4) onComplete()
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Received last HTTP packet
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@3100fc45
[DEBUG] (reactor-http-nio-4) Releasing channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) Released [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672], now 0 active connections
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@6392827e(incomplete) SimpleChannelPool{activeConnections=0}
[DEBUG] (reactor-http-nio-4) Acquired [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672], now 1 active connections
[DEBUG] (reactor-http-nio-4) Acquired active channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) [HttpClient] [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F, method=GET, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@1d657532}
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET /api/exchanges/%2F HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
content-length: 0
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[ERROR] (reactor-http-nio-4) onError(java.io.IOException: Connection closed prematurely)
[ERROR] (reactor-http-nio-4)  - java.io.IOException: Connection closed prematurely
java.io.IOException: Connection closed prematurely
	at reactor.ipc.netty.http.client.HttpClientOperations.onInboundComplete(HttpClientOperations.java:262)
	at reactor.ipc.netty.channel.ChannelOperations.onHandlerTerminate(ChannelOperations.java:417)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1354)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:917)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:763)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:748)
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@37d97617
[DEBUG] (reactor-http-nio-4) Releasing channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) Released [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672], now 0 active connections
Exception in thread "main" reactor.core.Exceptions$ReactiveException: java.io.IOException: Connection closed prematurely
	at reactor.core.Exceptions.propagate(Exceptions.java:294)
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
	at reactor.core.publisher.Flux.blockFirst(Flux.java:1909)
	at io.pivotal.App.main(App.java:19)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:88)
		... 2 more
Caused by: java.io.IOException: Connection closed prematurely
	at reactor.ipc.netty.http.client.HttpClientOperations.onInboundComplete(HttpClientOperations.java:262)
	at reactor.ipc.netty.channel.ChannelOperations.onHandlerTerminate(ChannelOperations.java:417)
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1354)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:917)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:763)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1
/usr/local/java8/bin/java -javaagent:/usr/local/idea-IC-172.3317.76/lib/idea_rt.jar=43563:/usr/local/idea-IC-172.3317.76/bin -Dfile.encoding=UTF-8 -classpath /usr/local/java8/jre/lib/charsets.jar:/usr/local/java8/jre/lib/deploy.jar:/usr/local/java8/jre/lib/ext/cldrdata.jar:/usr/local/java8/jre/lib/ext/dnsns.jar:/usr/local/java8/jre/lib/ext/jaccess.jar:/usr/local/java8/jre/lib/ext/jfxrt.jar:/usr/local/java8/jre/lib/ext/localedata.jar:/usr/local/java8/jre/lib/ext/nashorn.jar:/usr/local/java8/jre/lib/ext/sunec.jar:/usr/local/java8/jre/lib/ext/sunjce_provider.jar:/usr/local/java8/jre/lib/ext/sunpkcs11.jar:/usr/local/java8/jre/lib/ext/zipfs.jar:/usr/local/java8/jre/lib/javaws.jar:/usr/local/java8/jre/lib/jce.jar:/usr/local/java8/jre/lib/jfr.jar:/usr/local/java8/jre/lib/jfxswt.jar:/usr/local/java8/jre/lib/jsse.jar:/usr/local/java8/jre/lib/management-agent.jar:/usr/local/java8/jre/lib/plugin.jar:/usr/local/java8/jre/lib/resources.jar:/usr/local/java8/jre/lib/rt.jar:/home/acogoluegnes/temp/rabbitmq/hopwebflux/target/classes:/home/acogoluegnes/.m2/repository/org/springframework/spring-webflux/5.0.0.RC4/spring-webflux-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/io/projectreactor/reactor-core/3.1.0.RC1/reactor-core-3.1.0.RC1.jar:/home/acogoluegnes/.m2/repository/org/reactivestreams/reactive-streams/1.0.1/reactive-streams-1.0.1.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-beans/5.0.0.RC4/spring-beans-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-core/5.0.0.RC4/spring-core-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-jcl/5.0.0.RC4/spring-jcl-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-web/5.0.0.RC4/spring-web-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/io/projectreactor/ipc/reactor-netty/0.7.0.M2/reactor-netty-0.7.0.M2.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-codec-http/4.1.15.Final/netty-codec-http-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-codec/4.1.15.Final/netty-codec-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-handler/4.1.15.Final/netty-handler-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-buffer/4.1.15.Final/netty-buffer-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-transport/4.1.15.Final/netty-transport-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-resolver/4.1.15.Final/netty-resolver-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-handler-proxy/4.1.15.Final/netty-handler-proxy-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-codec-socks/4.1.15.Final/netty-codec-socks-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-transport-native-epoll/4.1.15.Final/netty-transport-native-epoll-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-common/4.1.15.Final/netty-common-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-transport-native-unix-common/4.1.15.Final/netty-transport-native-unix-common-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.9.0.pr4/jackson-databind-2.9.0.pr4.jar:/home/acogoluegnes/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.9.0.pr4/jackson-annotations-2.9.0.pr4.jar:/home/acogoluegnes/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.9.0.pr4/jackson-core-2.9.0.pr4.jar io.pivotal.App
[DEBUG] (main) Using Console logging
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New http client pool for /127.0.0.1:15672
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@25fb8912(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-http-nio-4) Created [id: 0xf526ab83], now 1 active connections
[DEBUG] (reactor-http-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpDecoder = io.netty.handler.codec.http.HttpResponseDecoder), (reactor.left.httpEncoder = io.netty.handler.codec.http.HttpRequestEncoder), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-http-nio-4) Acquired active channel: [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) [HttpClient] [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F/hop.test, method=PUT, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@6ff0dd74}
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
PUT /api/exchanges/%2F/hop.test HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
transfer-encoding: chunked
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Writing object FluxMapFuseable
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Pending write size = 85
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Response Write Completed]
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Writing object EmptyLastHttpContent
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Received response (auto-read:false) : [server=Cowboy, date=Thu, 21 Sep 2017 13:22:36 GMT, content-length=0, content-type=application/json, vary=accept, accept-encoding, origin]
[DEBUG] (reactor-http-nio-4) onNext(ReactorClientHttpResponse{request=[PUT /api/exchanges/%2F/hop.test],status=204})
[DEBUG] (reactor-http-nio-4) onComplete()
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Received last HTTP packet
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@e15b7e8(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-http-nio-2) Created [id: 0x7edd660a], now 2 active connections
[DEBUG] (reactor-http-nio-2) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpDecoder = io.netty.handler.codec.http.HttpResponseDecoder), (reactor.left.httpEncoder = io.netty.handler.codec.http.HttpRequestEncoder), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-http-nio-2) Acquired active channel: [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-2) [HttpClient] [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F, method=GET, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@127c7454}
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] Writing object DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET /api/exchanges/%2F HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
content-length: 0
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@6eff0ee7
[DEBUG] (reactor-http-nio-4) Releasing channel: [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) Released [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672], now 1 active connections
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] Received response (auto-read:false) : [server=Cowboy, date=Thu, 21 Sep 2017 13:22:37 GMT, content-length=1433, content-type=application/json, vary=accept, accept-encoding, origin, Cache-Control=no-cache]
[DEBUG] (reactor-http-nio-2) onNext(ReactorClientHttpResponse{request=[GET /api/exchanges/%2F],status=200})
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[DEBUG] (reactor-http-nio-2) onComplete()
[DEBUG] (reactor-http-nio-2) cancel()
[TRACE] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[TRACE] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@4c0c5584
[DEBUG] (reactor-http-nio-2) Releasing channel: [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-2) Released [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672], now 0 active connections

Process finished with exit code 0

Still investigating.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 21, 2017

Rossen Stoyanchev commented

You can set reactor.ipc.netty to TRACE level. That will show content being written as well.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 21, 2017

Rossen Stoyanchev commented

I am now up and running with the test and able to reproduce the failure.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 21, 2017

Arnaud Cogoluègnes commented

There have a been an upgrade of RabbitMQ HTTP server (cowboy) between RabbitMQ 3.6.12 and RabbitMQ 3.7.0.M2. I'll investigate more tomorrow.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 21, 2017

Rossen Stoyanchev commented

Okay so I see the PUT with the 201 response and Content-Length:0. As a result Reactor Netty does get LastHttpContent and releases the connection back to the pool. So it gets re-used for the subsequent GET. However in Wireshark I don't see the connection getting closed, and that's what confuses Wireshare -- it mixes the PUT and the GET into one.

Looking at the code you're using the WebClient and for the PUT you have exchange().block(), i.e. never consuming the body, nor closing (recently added, see #19316). If I add a close() it works but without an explicit close it doesn't.

client.declareExchange(v, s, new ExchangeInfo("fanout", false, false))
        .doOnNext({response -> response.close()})
        .block()

Given Content-Length:0 I don't think we should have to close explicitly so I suspect this may end up being a Reactor Netty fix but I'm not sure yet.

As for the Cowboy server, I see in the release notes:

Migration to Cowboy REST

RabbitMQ management plugin as well as its extensions (e.g. those of
Federation and Shovel, rabbitmq-top) now uses Cowboy REST
instead of Webmachine. Cowboy is a state-of-the-art open source Erlang HTTP 1.1 server and REST micro framework
that is also used in the plugins that provide WebSocket support.

The change is largely invisible to management UI and HTTP API
clients but there is one change that can affect test suites: POST and PUT responses now use 201 Created instead of 204 No Content.

The 201-204 on PUT looks relevant but at this time I don't think that's the issue because as far as I can see Netty does the same thing after 204 and 201 with content-length:0.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Arnaud Cogoluègnes commented

Sequences that fail always re-use the same connection from the pool. Sequences that succeed always use a new connection Created [id: 0x7edd660a], now 2 active connections. I don't know if that matters.

I'm using Reactor Netty HttpClient but don't manage to reproduce.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Rossen Stoyanchev commented

Yes, the same connection is re-used after not being closed, did you see my last comment above?

What is the code you're using with Reactor Netty HttpClient?

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Arnaud Cogoluègnes commented

Something like this (far from perfect I guess, I've just discovered the API):

HttpClient client = HttpClient.create();

client.put("http://localhost:15672/api/exchanges/%2F/hoptest", request -> {
    request
        .addHeader("Authorization", "Basic Z3Vlc3Q6Z3Vlc3Q=")
        .addHeader("Content-Type", "application/json")
        .sendString(Mono.just("{\"type\":\"fanout\",\"durable\":false,\"internal\":false,\"arguments\":{},\"auto_delete\":false}")).then().block();

    return request;
}).block().responseHeaders();

client.get("http://localhost:15672/api/exchanges/%2F", request -> {
    request
        .addHeader("Authorization", "Basic Z3Vlc3Q6Z3Vlc3Q=")
        .addHeader("Content-Type", "application/json")
        .addHeader("content-length", "0");

    return request;
}).block().status();

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Rossen Stoyanchev commented

And you confirmed there is Content-Length:0 in the response + the same connection is re-used?

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Arnaud Cogoluègnes commented

In the 201 response there is content-length: 0, but another connection is used for the GET request.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Rossen Stoyanchev commented

Right i'm certain you have the same issue, i.e. you'd have to call HttpClientResponse.dispose().

I'm working on a fix on the WebClient side to automatically close for empty responses. I will also create an issue in Reactor Netty since it shouldn't return connections to the pool that have not been disposed.

Effectively we try to save you that effort where you've either consumed the body (via bodyToMono for example) or when the body is empty. Overall though you're still responsible to consume the body or call close() after exchange(). Alternatively you can also use retrieve() which simplifies the way to consume the body. And if you don't do any of that, the fix on the Reactor Netty should ensure the connection pool doesn't have unclosed connections.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Arnaud Cogoluègnes commented

Makes sense. I admit the code isn't perfect or representative of the usage of a reactive API (I ported most of the code from the blocking client test as-is). Nevertheless, this kind of usage could happen in the wild, so I guess it's worth the effort.

Thanks for the follow-up and happy to test the fixes.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 22, 2017

Rossen Stoyanchev commented

After further investigation, it seems that the problem is on the side of Cowboy, which is also consistent with the fact that it worked in 3.6.x where a different HTTP server was used.

The WebClient uses a connection pool, with persistent connection (HTTP 1.1 default), so it does not need to be closed. For some reason that causes Cowboy to hang on the GET after the PUT (with chunked encoding). We've tested the same interaction -- with the connection re-used, and confirmed it works fine with Tomcat and Reactor Netty server.

If you can't figure out why Cowboy behaves this way and fix it, then you might need to use the workaround I specified above with doOnNext() and response.close(). However keep in mind the downside is that closing the connection regularly defeats the benefit of using a connection pool.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 23, 2017

Rossen Stoyanchev commented

Resolving for now since we have confirmation from testing with other HTTP servers but feel free to comment.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 26, 2017

Arnaud Cogoluègnes commented

That is indeed a Cowboy problem. Cowboy 2 should fix it but we already have a fix in RabbitMQ. Thanks for your help on this Rossen.

@spring-projects-issues
Copy link
Collaborator Author

@spring-projects-issues spring-projects-issues commented Sep 26, 2017

Rossen Stoyanchev commented

Good to know, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: declined
Projects
None yet
Development

No branches or pull requests

2 participants