converting from Stream to Flux fails [DATAJPA-1510] #1822

Closed
spring-projects-issues opened this issue Mar 1, 2019 · 3 comments
Labels: in: core, status: declined, type: bug

spring-projects-issues commented Mar 1, 2019

John Cao opened DATAJPA-1510 and commented

  1. This issue seems to happen only with Kotlin, not Java, but I'm not sure.
  2. Define a Stream return type in a JpaRepository:
    @Query("select city from City city")
    fun findAllCities(): Stream<City>
  3. Walk through this stream and convert it to a Flux in CityServiceImpl:
    @Transactional
    override fun findAll(): Flux<City> {
        // idea 1 FAIL
        return Flux.generate {
            cityRepository.findAllCities().use { stream2 ->
                stream2.forEach { city -> it.next(city) }
            }
        }

        // idea 2 FAIL
        // return cityRepository.findAllCities().toFlux()

        // idea 3 FAIL
        // return Flux.fromStream(cityRepository.findAllCities())
    }
  4. Then 'Flux.generate' breaks and throws an exception:

2019-02-28 20:34:21.663 ERROR 57324 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [dc8fec58] 500 Server Error for HTTP GET "/city/all"

org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.
at org.springframework.data.jpa.repository.query.JpaQueryExecution$StreamExecution.doExecute(JpaQueryExecution.java:358) ~[spring-data-jpa-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.data.jpa.repository.query.JpaQueryExecution.execute(JpaQueryExecution.java:91) ~[spring-data-jpa-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.doExecute(AbstractJpaQuery.java:136) ~[spring-data-jpa-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.execute(AbstractJpaQuery.java:125) ~[spring-data-jpa-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.doInvoke(RepositoryFactorySupport.java:605) ~[spring-data-commons-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.lambda$invoke$3(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.invoke(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:59) ~[spring-data-commons-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:135) ~[spring-data-jpa-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.data.repository.core.support.SurroundingTransactionDetectorMethodInterceptor.invoke(SurroundingTransactionDetectorMethodInterceptor.java:61) ~[spring-data-commons-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.data.repository.core.support.MethodInvocationValidator.invoke(MethodInvocationValidator.java:99) ~[spring-data-commons-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at com.sun.proxy.$Proxy99.findAllCities(Unknown Source) ~[na:na]
at com.example.demo.CityServiceImpl$findAll$1.accept(CityServiceImpl.kt:13) ~[classes/:na]
at com.example.demo.CityServiceImpl$findAll$1.accept(CityServiceImpl.kt:8) ~[classes/:na]
at reactor.core.publisher.FluxGenerate.lambda$new$1(FluxGenerate.java:56) ~[reactor-core-3.2.6.RELEASE.jar:3.2.6.RELEASE]


Affects: 2.1.5 (Lovelace SR5)

Reference URL: https://github.com/jhcao23/spring-boot-jpa-stream

spring-projects-issues commented Mar 1, 2019

Oliver Drotbohm commented

You cannot wrap a Stream into a lazy Flux and the exception message hints at why that is: That would keep the Stream open until an undetermined point in time and it's unclear who would be responsible for freeing the resources. With JPA, by default, the lifecycle of an EntityManager is bound to the transaction. I.e. the EntityManager gets closed once the method is left. That renders the Stream unusable.
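
The lifecycle problem described above can be reproduced without JPA. The following is a minimal sketch in plain Kotlin: `FakeConnection` and `findAllLazily` are hypothetical stand-ins for the EntityManager and a transactional method, not Spring Data code. The resource is released when the method returns, while the lazy Stream is only consumed afterwards.

```kotlin
import java.util.stream.Stream

// Hypothetical stand-in for a transaction-bound resource such as an EntityManager.
class FakeConnection {
    var open = true
        private set

    fun close() {
        open = false
    }

    // Lazy stream: rows are only "read" when a terminal operation runs.
    fun rows(): Stream<String> =
        Stream.of("Berlin", "Toronto", "Shanghai").peek {
            check(open) { "connection already closed" }
        }
}

// Mimics a transactional method: the resource is released when the method
// returns, but the returned Stream has not been consumed yet.
fun findAllLazily(): Stream<String> {
    val connection = FakeConnection()
    try {
        return connection.rows()
    } finally {
        connection.close()
    }
}

fun main() {
    // Consumption happens after findAllLazily() has returned, so it fails.
    val result = runCatching { findAllLazily().forEach { println(it) } }
    println(result.exceptionOrNull()?.message) // prints "connection already closed"
}
```

A lazy Flux built inside the method behaves the same way: its subscriber pulls data only after the method, and with it the transaction, has ended.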

Generally speaking, it doesn't make much sense to try to use JPA in a reactive application as it's a fundamentally blocking API. If it weren't we would have added out of the box support for Flux and Mono to Spring Data JPA in the first place. The fact that you have to do this weird wrapping manually is supposed to be an indicator that what you try to achieve is not a good idea in the first place.

All of that is the reason we invest so heavily into R2DBC and the corresponding Spring Data module. If you really know that you're okay with the blocking although this completely breaks the benefits you get from reactive programming, just let your repository return a List and then use Flux.fromIterable(…)
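
A minimal sketch of the List-based fallback, assuming reactor-core is on the classpath. The `City`, `CityRepository` and `CityService` types here are illustrative placeholders, not the reporter's actual classes. Because the List is fully loaded before the method returns, nothing lazy outlives the transaction.

```kotlin
import reactor.core.publisher.Flux

data class City(val name: String)

// Hypothetical repository shape: returns a fully materialized List instead of a Stream.
interface CityRepository {
    fun findAllCities(): List<City>
}

class CityService(private val cityRepository: CityRepository) {
    // The blocking query runs eagerly inside this method; the Flux only
    // replays the already loaded elements.
    fun findAll(): Flux<City> =
        Flux.fromIterable(cityRepository.findAllCities())
}
```

This keeps the whole result set in memory and still blocks the calling thread, which is the trade-off the comment above warns about.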

spring-projects-issues commented Mar 1, 2019

John Cao commented

Thank you for your quick response Oliver! 

We're using Oracle on a huge data table, although I wish R2DBC could help. It can't return a List or Iterator since the dataset is too large. But you're right! If there were a way to convert Stream to Flux, you would have already done it. I shouldn't have started from Stream.

So I changed my plan and used Page<T> instead of Stream to load the huge dataset page by page, then built the Flux with Flux.generate from this consumption process. My controller can then keep sending the frontend 'Hey this data has been transformed...'.
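
The thread does not show the actual implementation, so the following is only a sketch of how page-by-page loading could feed `Flux.generate`, assuming reactor-core is available. `PageLoader` and `citiesPageByPage` are hypothetical names; in a Spring Data repository the loader might be backed by something like `findAll(PageRequest.of(page, size)).content`, which is also an assumption.

```kotlin
import reactor.core.publisher.Flux

data class City(val name: String)

// Hypothetical page loader: returns the rows of the given zero-based page,
// or an empty list once the data is exhausted.
typealias PageLoader = (page: Int) -> List<City>

fun citiesPageByPage(loadPage: PageLoader): Flux<City> =
    Flux.generate<List<City>, Int>(
        { 0 }, // initial state: index of the first page
        { page, sink ->
            val rows = loadPage(page) // one short blocking query per page
            if (rows.isEmpty()) sink.complete() else sink.next(rows)
            page + 1 // next state: index of the following page
        }
    ).flatMapIterable { it } // flatten each page into individual cities
```

Each generator round emits exactly one page, because `Flux.generate` allows at most one `next` call per invocation. Every page query can run in its own short transaction, so no open Stream or EntityManager has to outlive the method that created it. The calls still block, though.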

Thank you for your hint! Awesome

spring-projects-issues commented May 22, 2019

Jens Schauder commented

Batch closing resolved issue without a fix version and a resolution indicating that there is nothing to release (Won't fix, Invalid ...)

@spring-projects-issues spring-projects-issues added type: bug status: declined in: core labels Dec 30, 2020