Skip to content

Commit

Permalink
Add support for Coroutines Flow
Browse files Browse the repository at this point in the history
Flow is a Kotlin Coroutines related cold asynchronous
stream of the data, that emits from zero to N (where N
can be unbounded) values and completes normally or with
an exception.

It is conceptually the Coroutines equivalent of Flux with
an extension oriented API design, easy custom operator
capabilities and some suspending methods.

This commit leverages Flow <-> Flux interoperability
to support Flow on controller handler method parameters
or return values, and also adds Flow based extensions to
WebFlux.fn. It allows to reach a point when we can consider
Spring Framework officially supports Coroutines even if some
additional work remains to be done like adding
interoperability between Reactor and Coroutines contexts.

Flow is currently an experimental API that is expected to
become final before Spring Framework 5.2 GA.

Close spring-projectsgh-19975
  • Loading branch information
sdeleuze committed Apr 4, 2019
1 parent e1080f8 commit 15ec3e5
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 3 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -29,7 +29,7 @@ ext {
}

aspectjVersion = "1.9.2"
coroutinesVersion = "1.2.0-alpha"
coroutinesVersion = "1.2.0-alpha-2"
freemarkerVersion = "2.3.28"
groovyVersion = "2.5.6"
hsqldbVersion = "2.4.1"
Expand Down
Expand Up @@ -27,6 +27,9 @@
import io.reactivex.Flowable;
import kotlinx.coroutines.CompletableDeferredKt;
import kotlinx.coroutines.Deferred;
import kotlinx.coroutines.flow.FlowKt;
import kotlinx.coroutines.reactive.flow.FlowAsPublisherKt;
import kotlinx.coroutines.reactive.flow.PublisherAsFlowKt;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -97,6 +100,10 @@ public ReactiveAdapterRegistry() {
if (ClassUtils.isPresent("kotlinx.coroutines.Deferred", classLoader)) {
new CoroutinesRegistrar().registerAdapters(this);
}
// TODO Use a single CoroutinesRegistrar when Flow will be not experimental anymore
if (ClassUtils.isPresent("kotlinx.coroutines.flow.Flow", classLoader)) {
new CoroutinesFlowRegistrar().registerAdapters(this);
}
}


Expand Down Expand Up @@ -335,7 +342,17 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
source -> CoroutinesUtils.deferredToMono((Deferred<?>) source),
source -> CoroutinesUtils.monoToDeferred(Mono.from(source)));
}
}

private static class CoroutinesFlowRegistrar {

void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, FlowKt::emptyFlow),
source -> FlowAsPublisherKt.from((kotlinx.coroutines.flow.Flow<?>) source),
PublisherAsFlowKt::from
);
}
}

}
Expand Up @@ -17,14 +17,21 @@
package org.springframework.core

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Test
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.time.Duration
import kotlin.reflect.KClass

Expand All @@ -49,6 +56,36 @@ class KotlinReactiveAdapterRegistryTests {

}

@Test
@FlowPreview
fun flowToPublisher() {
val source = flow {
emit(1)
emit(2)
emit(3)
}
val target: Publisher<Int> = getAdapter(Flow::class).toPublisher(source)
assertTrue("Expected Flux Publisher: " + target.javaClass.name, target is Flux<*>)
StepVerifier.create(target)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.verifyComplete()
}

@Test
@FlowPreview
fun publisherToFlow() {
val source = Flux.just(1, 2, 3)
val target = getAdapter(Flow::class).fromPublisher(source)
if (target is Flow<*>) {
assertEquals(listOf(1, 2, 3), runBlocking { target.toList() })
}
else {
fail()
}
}

private fun getAdapter(reactiveType: KClass<*>): ReactiveAdapter {
return this.registry.getAdapter(reactiveType.java)!!
}
Expand Down
Expand Up @@ -16,8 +16,11 @@

package org.springframework.web.reactive.function.client

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.flow.asFlow
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.ResponseEntity
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -45,6 +48,19 @@ inline fun <reified T : Any> ClientResponse.bodyToMono(): Mono<T> =
inline fun <reified T : Any> ClientResponse.bodyToFlux(): Flux<T> =
bodyToFlux(object : ParameterizedTypeReference<T>() {})

/**
* Coroutines [kotlinx.coroutines.flow.Flow] based variant of [ClientResponse.bodyToFlux].
*
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
* and [org.reactivestreams.Subscription.request] size.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@FlowPreview
inline fun <reified T : Any> ClientResponse.bodyToFlow(batchSize: Int = 1): Flow<T> =
bodyToFlux<T>().asFlow(batchSize)

/**
* Extension for [ClientResponse.toEntity] providing a `toEntity<Foo>()` variant
* leveraging Kotlin reified type parameters. This extension is not subject to type
Expand Down
Expand Up @@ -17,8 +17,12 @@
package org.springframework.web.reactive.function.client

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.flow.asFlow
import kotlinx.coroutines.reactive.flow.asPublisher
import kotlinx.coroutines.reactor.mono
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
Expand All @@ -28,7 +32,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

/**
* Extension for [WebClient.RequestBodySpec.body] providing a `body<Foo>()` variant
* Extension for [WebClient.RequestBodySpec.body] providing a `body(Publisher<T>)` variant
* leveraging Kotlin reified type parameters. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
Expand All @@ -39,6 +43,18 @@ import reactor.core.publisher.Mono
inline fun <reified T : Any, S : Publisher<T>> RequestBodySpec.body(publisher: S): RequestHeadersSpec<*> =
body(publisher, object : ParameterizedTypeReference<T>() {})

/**
* Coroutines [Flow] based extension for [WebClient.RequestBodySpec.body] providing a
* body(Flow<T>)` variant leveraging Kotlin reified type parameters. This extension is
* not subject to type erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@FlowPreview
inline fun <reified T : Any, S : Flow<T>> RequestBodySpec.body(flow: S): RequestHeadersSpec<*> =
body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {})

/**
* Extension for [WebClient.ResponseSpec.bodyToMono] providing a `bodyToMono<Foo>()` variant
* leveraging Kotlin reified type parameters. This extension is not subject to type
Expand All @@ -62,6 +78,20 @@ inline fun <reified T : Any> WebClient.ResponseSpec.bodyToMono(): Mono<T> =
inline fun <reified T : Any> WebClient.ResponseSpec.bodyToFlux(): Flux<T> =
bodyToFlux(object : ParameterizedTypeReference<T>() {})

/**
* Coroutines [kotlinx.coroutines.flow.Flow] based variant of [WebClient.ResponseSpec.bodyToFlux].
*
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
* and [org.reactivestreams.Subscription.request] size.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@FlowPreview
inline fun <reified T : Any> WebClient.ResponseSpec.bodyToFlow(batchSize: Int = 1): Flow<T> =
bodyToFlux<T>().asFlow(batchSize)


/**
* Coroutines variant of [WebClient.RequestHeadersSpec.exchange].
*
Expand Down
Expand Up @@ -16,8 +16,11 @@

package org.springframework.web.reactive.function.server

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.flow.asFlow
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.codec.multipart.Part
import org.springframework.util.MultiValueMap
Expand Down Expand Up @@ -48,6 +51,19 @@ inline fun <reified T : Any> ServerRequest.bodyToMono(): Mono<T> =
inline fun <reified T : Any> ServerRequest.bodyToFlux(): Flux<T> =
bodyToFlux(object : ParameterizedTypeReference<T>() {})

/**
* Coroutines [kotlinx.coroutines.flow.Flow] based variant of [ServerRequest.bodyToFlux].
*
* Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
* and [org.reactivestreams.Subscription.request] size.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@FlowPreview
inline fun <reified T : Any> ServerRequest.bodyToFlow(batchSize: Int = 1): Flow<T> =
bodyToFlux<T>().asFlow(batchSize)

/**
* Non-nullable Coroutines variant of [ServerRequest.bodyToMono].
*
Expand Down
Expand Up @@ -16,7 +16,10 @@

package org.springframework.web.reactive.function.server

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.flow.asPublisher
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.MediaType
Expand Down Expand Up @@ -74,6 +77,19 @@ fun ServerResponse.BodyBuilder.html() = contentType(MediaType.TEXT_HTML)
suspend fun ServerResponse.HeadersBuilder<out ServerResponse.HeadersBuilder<*>>.buildAndAwait(): ServerResponse =
build().awaitSingle()


/**
* Coroutines [Flow] based extension for [ServerResponse.BodyBuilder.body] providing a
* `body(Flow<T>)` variant. This extension is not subject to type erasure and retains
* actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.0
*/
@FlowPreview
suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyAndAwait(flow: Flow<T>): ServerResponse =
body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {}).awaitSingle()

/**
* Coroutines variant of [ServerResponse.BodyBuilder.syncBody].
*
Expand All @@ -83,6 +99,18 @@ suspend fun ServerResponse.HeadersBuilder<out ServerResponse.HeadersBuilder<*>>.
suspend fun ServerResponse.BodyBuilder.bodyAndAwait(body: Any): ServerResponse =
syncBody(body).awaitSingle()

/**
* Coroutines [Flow] based extension for [ServerResponse.BodyBuilder.body] providing a
* `bodyToServerSentEvents(Flow<T>)` variant. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.0
*/
@FlowPreview
suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyToServerSentEventsAndAwait(flow: Flow<T>): ServerResponse =
contentType(MediaType.TEXT_EVENT_STREAM).body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {}).awaitSingle()


/**
* Coroutines variant of [ServerResponse.BodyBuilder.syncBody] without the sync prefix since it is ok to use it within
Expand Down
Expand Up @@ -19,6 +19,7 @@ package org.springframework.web.reactive.function.client
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
Expand Down Expand Up @@ -49,6 +50,13 @@ class ClientResponseExtensionsTests {
verify { response.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
@FlowPreview
fun `bodyToFlow with reified type parameters`() {
response.bodyToFlow<List<Foo>>()
verify { response.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
fun `toEntity with reified type parameters`() {
response.toEntity<List<Foo>>()
Expand Down
Expand Up @@ -19,6 +19,8 @@ package org.springframework.web.reactive.function.client
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Test
Expand All @@ -45,6 +47,14 @@ class WebClientExtensionsTests {
verify { requestBodySpec.body(body, object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
@FlowPreview
fun `RequestBodySpec#body with Flow and reified type parameters`() {
val body = mockk<Flow<List<Foo>>>()
requestBodySpec.body(body)
verify { requestBodySpec.body(ofType<Publisher<List<Foo>>>(), object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
fun `ResponseSpec#bodyToMono with reified type parameters`() {
responseSpec.bodyToMono<List<Foo>>()
Expand All @@ -57,6 +67,13 @@ class WebClientExtensionsTests {
verify { responseSpec.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
@FlowPreview
fun `bodyToFlow with reified type parameters`() {
responseSpec.bodyToFlow<List<Foo>>()
verify { responseSpec.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
fun awaitExchange() {
val response = mockk<ClientResponse>()
Expand Down
Expand Up @@ -19,6 +19,7 @@ package org.springframework.web.reactive.function.client
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
Expand Down Expand Up @@ -52,6 +53,13 @@ class ServerRequestExtensionsTests {
verify { request.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
@FlowPreview
fun `bodyToFlow with reified type parameters`() {
request.bodyToFlow<List<Foo>>()
verify { request.bodyToFlux(object : ParameterizedTypeReference<List<Foo>>() {}) }
}

@Test
fun awaitBody() {
every { request.bodyToMono<String>() } returns Mono.just("foo")
Expand Down

0 comments on commit 15ec3e5

Please sign in to comment.