Skip to content

IllegalStateException When Fetching Body More Than Once In ReadBodyPredicateFactory #438

@ryanjbaxter

Description

@ryanjbaxter
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
	at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:276) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.subscribe(FluxReceive.java:124) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.ipc.netty.ByteBufFlux.subscribe(ByteBufFlux.java:242) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.ipc.netty.ByteBufFlux.subscribe(ByteBufFlux.java:242) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMapSignal.subscribe(FluxMapSignal.java:69) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxSourceMonoFuseable.subscribe(FluxSourceMonoFuseable.java:38) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Flux.subscribe(Flux.java:6877) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1052) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxAutoConnectFuseable.subscribe(FluxAutoConnectFuseable.java:62) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreElements.subscribe(MonoIgnoreElements.java:37) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:579) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:326) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:267) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:136) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Flux.subscribe(Flux.java:6877) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFilterWhen$FluxFilterWhenSubscriber.drain(FluxFilterWhen.java:269) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFilterWhen$FluxFilterWhenSubscriber.innerResult(FluxFilterWhen.java:352) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFilterWhen$FilterWhenInner.onNext(FluxFilterWhen.java:432) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFilterWhen$FilterWhenInner.onNext(FluxFilterWhen.java:396) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:735) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxZip$ZipInner.onNext(FluxZip.java:894) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1068) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:295) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1640) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onSubscribe(MonoIgnoreThen.java:284) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:190) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:239) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:80) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxReplay$UnboundedReplayBuffer.replayNormal(FluxReplay.java:550) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxReplay$UnboundedReplayBuffer.replay(FluxReplay.java:653) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onComplete(FluxReplay.java:1188) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:138) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1085) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1083) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:773) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:543) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:523) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:409) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:245) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) ~[reactor-core-3.1.8.RELEASE.jar:3.1.8.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:377) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:342) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.ipc.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:336) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.ipc.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:408) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:136) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at reactor.ipc.netty.http.server.HttpServerHandler.channelRead(HttpServerHandler.java:164) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) ~[netty-codec-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) ~[netty-codec-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461) ~[netty-transport-4.1.25.Final.jar:4.1.25.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.25.Final.jar:4.1.25.Final]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]

My RouteLocator looks like this

	@Bean
	public RouteLocator routeLocator(RouteLocatorBuilder builder) {
		return builder.routes()
				.route(p -> p.path("/slack/events").and().method(HttpMethod.POST).and().readBody(SlackEvent.class, eventPredicate("message.channels")).uri("http://localhost:9090"))
				.route(p -> p.path("/slack/events").and().method(HttpMethod.POST).and().readBody(SlackEvent.class, eventPredicate("message")).uri("http://localhost:9080"))

				.build();
	}

	private Predicate<SlackEvent> eventPredicate(String type) {
		return r -> r.getEvent().getType().equals(type);
	}

This happens if the incoming request to the gateway does not match the first route and the gateway and a subsequent route also uses the read body predicate. The next route that uses the read body predicate will try to read the body of the request a second time resulting in the above exception.

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions