Parameterized Statements throws exception while using Kotlin Flow #256

Closed
xnart opened this issue May 8, 2022 · 3 comments
Labels: status: waiting-for-triage (an issue we've not yet triaged)


xnart commented May 8, 2022

Bug Report

Versions

  • Driver: r2dbc-mssql-0.8.8.RELEASE
  • Database: Microsoft SQL Server 2019 (RTM-CU15) (KB5008996) - 15.0.4198.2 (X64)
  • Java: temurin-11.0.15
  • Kotlin: 1.6.21
  • Spring Boot: 2.6.7
  • OS: Linux 5.17.5-arch1-1

Current Behavior

Collecting from the following Flow randomly closes the connection and cancels the flow. It happens when I fetch a large number of rows (34k in my case). Occasionally, all rows are collected without an error.

fun findAllByIdBetween(fromId: Long, toId: Long): Flow<LegacyCustomer>
Stack trace
org.springframework.dao.DataAccessResourceFailureException: executeMany; SQL [SELECT Customers.Id, Customers.MobilePhone, Customers.FirstName, Customers.LastName, Customers.DMSId, Customers.BirthDate, Customers.Email, Customers.Gender, Customers.OccupationId, Customers.SegmentId, Customers.LFCustomerId, Customers.CreateDate, Customers.UpdateDate FROM Customers WHERE Customers.Id BETWEEN @P0_Id AND @P1_Id]; null; nested exception is io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:226) ~[spring-r2dbc-5.3.19.jar:5.3.19]
	at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:147) ~[spring-r2dbc-5.3.19.jar:5.3.19]
	at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:6946) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:6999) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:475) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:198) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:62) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:364) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
	at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
	at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)]
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException: null
	at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpointSQL "SELECT Customers.Id, Customers.MobilePhone, Customers.FirstName, Customers.LastName, Customers.DMSId, Customers.BirthDate, Customers.Email, Customers.Gender, Customers.OccupationId, Customers.SegmentId, Customers.LFCustomerId, Customers.CreateDate, Customers.UpdateDate FROM Customers WHERE Customers.Id BETWEEN @P0_Id AND @P1_Id" [DatabaseClient]
Original Stack Trace:
		at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
		at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
		at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
		at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:317) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
		at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:269) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:475) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
		at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:110) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:682) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:833) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:257) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
		at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
		at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
		at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
		at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
		at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
		at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)]
		at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na]
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
		at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid header type: 0x0
	at io.r2dbc.mssql.client.StreamDecoder$ListSink.error(StreamDecoder.java:350) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:135) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:88) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:255) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:269) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:475) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:110) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:682) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:833) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:257) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17]
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17]
	at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1]
	at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17]
	at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na]
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)]
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalArgumentException: Invalid header type: 0x0
	at io.r2dbc.mssql.message.header.Type.valueOf(Type.java:68) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.mssql.message.header.Header.decode(Header.java:215) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.mssql.client.StreamDecoder$DecoderState.readChunk(StreamDecoder.java:289) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:112) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	... 53 common frames omitted

Table schema

Input Code
data class LegacyCustomer(
    @Id @Column("Id") val id: Long? = null,
    @Column("MobilePhone") val mobilePhone: String?,
    @Column("FirstName") var firstName: String? = null,
    @Column("LastName") var lastName: String? = null,
    @Column("DMSId") var dmsId: String? = null,
    @Column("BirthDate") var birthDate: LocalDate? = null,
    @Column("Email") var email: String? = null,
    @Column("Gender") var gender: Int? = null,
    @Column("OccupationId") var occupationId: Long? = null,
    @Column("SegmentId") var segmentId: Long? = null,
    @Column("LFCustomerId") var customerId: String? = null,
    @CreatedDate @Column("CreateDate") var createDate: Instant? = null,
    @LastModifiedDate @Column("UpdateDate") var updateDate: Instant? = null
)

Steps to reproduce

Input Code
interface LegacyCustomerRepository : CoroutineCrudRepository<LegacyCustomer, Long> {

    fun findAllByIdBetween(fromId: Long, toId: Long): Flow<LegacyCustomer>
}

@Service
class LegacyCustomerService(private val legacyCustomerRepository: LegacyCustomerRepository){

    suspend fun createCustomerMigration(customerMigrationRequest: CustomerMigrationRequest) {
        with(customerMigrationRequest) {
            logger.info("Legacy customer migration started from id: $startFromId to id: $endToId.")

            legacyCustomerRepository.findAllByIdBetween(startFromId, endToId)
                .onCompletion {
                    if (it == null) logger.info("Legacy customer migration finished. from id: $startFromId to id: $endToId.", it)
                }
                .catch {
                    logger.error("Legacy customer migration finished with error. from id: $startFromId to id: $endToId.", it)
                }
                .onEach {
                    try {
                        logger.info("Migrating customer with id: ${it.id}")
                        // ... another suspend method call
                    } catch (exception: Throwable) {
                        logger.error("Migrating failed for customer with id: ${it.id}", exception)
                    }
                }.launchIn(CoroutineScope(Dispatchers.IO))
        }
    }
}

Expected behavior/code

The Flow should be collected without errors, as is the case with a Reactor Flux. I do not get the error if the repository method returns a Flux and I subscribe to it:

fun findAllByIdBetween(fromId: Long, toId: Long): Flux<LegacyCustomer>

legacyCustomerRepo.findAllByIdBetween(startFromId, endToId)
    .doAfterTerminate {
        logger.info("Legacy customer migration finished. from id: $startFromId to id: $endToId.")
    }
    .doOnError {
        logger.error("Legacy customer migration finished with error. from id: $startFromId to id: $endToId.", it)
    }
    .subscribe {
        try {
            logger.info("Migrating customer with id: ${it.id}")
            runBlocking {
                // ... another suspend method call
            }
        } catch (exception: Throwable) {
            logger.error("Migrating failed for customer with id: ${it.id}", exception)
        }
    }

Possible Solution

When I used cursored exchange with a fetch size of 128 instead of direct exchange, it worked as expected. However, the second call then failed with the following prepared statement cache error:
Could not find prepared statement with handle 1073741825
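
The fetch-size variant described above can also be tried without Spring. The following is a minimal sketch, assuming the R2DBC SPI, Reactor, and kotlinx-coroutines-reactive are on the classpath. `findIdsBetween` and its parameters are hypothetical names, and the query is reduced to the `Id` column. The assumption, to be checked against the driver documentation, is that r2dbc-mssql uses cursored execution when a positive fetch size is set via `Statement.fetchSize`.

```kotlin
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical helper for a Spring-free test; not part of r2dbc-mssql or Spring Data.
fun findIdsBetween(
    connectionFactory: ConnectionFactory,
    fromId: Long,
    toId: Long,
    fetchSize: Int? = null // null keeps the driver default; 128 mirrors the value from the report
): Flow<Long> {
    val rows: Flux<Long> = Flux.usingWhen(
        // Acquire one connection per collection of the flow
        Mono.from(connectionFactory.create()),
        { connection: Connection ->
            val statement = connection
                .createStatement(
                    "SELECT Customers.Id FROM Customers " +
                        "WHERE Customers.Id BETWEEN @P0_Id AND @P1_Id"
                )
                .bind("P0_Id", fromId)
                .bind("P1_Id", toId)
                .let { if (fetchSize != null) it.fetchSize(fetchSize) else it }

            // Consume each Result inside flatMap so rows are read as they arrive
            Flux.from(statement.execute())
                .flatMap { result ->
                    result.map { row, _ -> row.get("Id", Long::class.javaObjectType)!! }
                }
        },
        // Close the connection on completion, error, or cancellation
        { connection: Connection -> connection.close() }
    )
    return rows.asFlow()
}
```

Collecting `findIdsBetween(factory, startFromId, endToId)` twice in a row, once with `fetchSize = null` and once with `fetchSize = 128`, would show whether the decoding error and the prepared statement handle error occur outside Spring Data. Whether this sketch actually triggers either error is not confirmed.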

Additional context

Member

mp911de commented Jul 14, 2022

This looks like a protocol offset error. There are a lot of moving parts involved. Can you provide a minimal reproducer using R2DBC MSSQL code only (without the use of Spring)? Otherwise, it will be next to impossible to diagnose the issue.
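
To illustrate what a protocol offset error can look like, here is a simplified sketch of decoding a TDS packet header. It is not the r2dbc-mssql implementation: `decodeHeader`, `packet`, and `KNOWN_TYPES` are hypothetical names, and only the 8-byte header layout and the packet type codes are taken from the MS-TDS specification. If a decoder starts reading a header at a position that is actually inside a payload, the type byte can be any value, including 0x0, which would surface as the `Invalid header type: 0x0` seen in the trace.

```kotlin
// Illustrative only: a simplified TDS packet header decoder.
// Header layout per MS-TDS: type(1) status(1) length(2, big-endian, includes header)
// spid(2) packetId(1) window(1).
const val HEADER_LENGTH = 8

// Subset of the packet types defined by MS-TDS
val KNOWN_TYPES = mapOf(
    0x01 to "SQL_BATCH",
    0x03 to "RPC",
    0x04 to "TABULAR_RESULT",
    0x06 to "ATTENTION",
    0x07 to "BULK_LOAD",
    0x0E to "TRANSACTION_MANAGER",
    0x10 to "TDS7_LOGIN",
    0x11 to "SSPI",
    0x12 to "PRE_LOGIN"
)

data class PacketHeader(val type: String, val status: Int, val length: Int)

// Decodes the header that is assumed to start at `offset`.
fun decodeHeader(buffer: ByteArray, offset: Int): PacketHeader {
    require(offset >= 0 && buffer.size - offset >= HEADER_LENGTH) {
        "Not enough bytes for a header at offset $offset"
    }
    val typeByte = buffer[offset].toInt() and 0xFF
    val type = KNOWN_TYPES[typeByte]
        ?: throw IllegalArgumentException("Invalid header type: 0x" + typeByte.toString(16))
    val status = buffer[offset + 1].toInt() and 0xFF
    val length = ((buffer[offset + 2].toInt() and 0xFF) shl 8) or
        (buffer[offset + 3].toInt() and 0xFF)
    return PacketHeader(type, status, length)
}

// Builds one packet for the demonstration: 8-byte header followed by the payload.
fun packet(type: Int, payload: ByteArray): ByteArray {
    val length = HEADER_LENGTH + payload.size
    val header = byteArrayOf(
        type.toByte(), 0x01,                            // type, status (end of message)
        (length shr 8).toByte(), (length and 0xFF).toByte(),
        0, 0, 1, 0                                      // spid, packet id, window
    )
    return header + payload
}
```

For example, with `val stream = packet(0x04, byteArrayOf(1, 2, 0, 4)) + packet(0x04, byteArrayOf(9))`, the calls `decodeHeader(stream, 0)` and `decodeHeader(stream, 12)` succeed, while `decodeHeader(stream, 10)` lands inside the first payload and throws the same kind of exception.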

mp911de added the status: waiting-for-triage label on Jul 14, 2022
Member

mp911de commented May 25, 2023

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.

mp911de closed this as not planned on May 25, 2023

SimenRokaas commented May 31, 2023

Hi @mp911de, this error (Could not find prepared statement with handle 1073741825) now appears again in version 1.0.1 when using prepared statements (the bind method). The first SQL statement after startup runs fine, but subsequent statements fail.

The error is not present in version 1.0.0, and the only code change I can see that has anything to do with prepared statements seems to be the removal of the no-param constructor in ConnectionOptions, which was part of the commit that resolved issue 267.

We use bind a lot, so if it is possible to resolve this in the 1.x branch, it would be much appreciated, as we need to use Spring Boot 3, which is wired to the 1.x branch.
