Skip to content

Commit

Permalink
GH-792 Fix Supplier streaming in s-c-function-web
Browse files Browse the repository at this point in the history
Resolves #792
  • Loading branch information
olegz committed Jan 24, 2022
1 parent 8ceb6ed commit fef8efc
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 50 deletions.
21 changes: 6 additions & 15 deletions spring-cloud-function-samples/function-sample/pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.spring.sample</groupId>
Expand All @@ -14,29 +14,20 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<<<<<<< HEAD
<version>2.6.1</version>
=======
<version>3.0.0-SNAPSHOT</version>
>>>>>>> 4.x
<relativePath/>
<relativePath />
</parent>

<properties>
<<<<<<< HEAD
<java.version>1.8</java.version>
<spring-cloud-function.version>3.2.2-SNAPSHOT</spring-cloud-function.version>
=======
<spring-cloud-function.version>4.0.0-SNAPSHOT</spring-cloud-function.version>
>>>>>>> 4.x
<wrapper.version>1.0.27.RELEASE</wrapper.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-function-webflux</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example;

import org.springframework.web.reactive.function.client.WebClient;

/**
* Sample client to test infinite stream from function.
*
* @author Oleg Zhurakousky
*
*/
public class Client {

public static void main(String[] args) throws Exception {
WebClient client = WebClient.create();
WebClient.ResponseSpec responseSpec = client.post()
.uri("http://localhost:8080/infinite")
.header("accept", "text/event-stream")
.retrieve();

responseSpec.bodyToFlux(String.class).subscribe(v -> {
System.out.println(v);
});

System.in.read();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package com.example;

import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand Down Expand Up @@ -55,9 +57,11 @@ public Supplier<String> hello() {
}

@Bean
public Supplier<Flux<String>> words() {
return () -> Flux.fromArray(new String[] {"foo", "bar"});
public Supplier<Flux<String>> infinite() {
return () -> Flux
.interval(Duration.ofSeconds(1))
.log()
.map(counter -> String.format("Counter: %s", counter));
}

}
// @checkstyle:on
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ public void testHello() {
assertThat(output).isEqualTo("hello");
}

@Test
public void testWords() {
Flux<String> output = this.functions.words().get();
List<String> results = output.collectList().block();
assertThat(results.size()).isEqualTo(2);
assertThat(results.get(0)).isEqualTo("foo");
assertThat(results.get(1)).isEqualTo("bar");
}

@Test
public void testGreeter() {
assertThat(new Greeter().apply("World")).isEqualTo("Hello World");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.function.web.flux;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -74,27 +75,25 @@ public Mono<ResponseEntity<?>> post(ServerWebExchange request,
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false);
}

@SuppressWarnings("unchecked")
@PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> postStream(ServerWebExchange request, @RequestBody(required = false) Flux<String> body) {
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false);
public Publisher<?> postStream(ServerWebExchange request, @RequestBody(required = false) Flux<String> body) {
return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, true);
}

@SuppressWarnings("unchecked")
@GetMapping(path = "/**")
@GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> get(ServerWebExchange request) {
public Publisher<?> getStream(ServerWebExchange request) {
FunctionWrapper wrapper = wrapper(request);
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), false);
return FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), true);
}

@SuppressWarnings("unchecked")
@GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@GetMapping(path = "/**")
@ResponseBody
public Mono<ResponseEntity<?>> getStream(ServerWebExchange request) {
public Mono<ResponseEntity<?>> get(ServerWebExchange request) {
FunctionWrapper wrapper = wrapper(request);
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), true);
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), false);
}

private FunctionWrapper wrapper(ServerWebExchange request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,18 @@ public Mono<ResponseEntity<Publisher<?>>> postStream(WebRequest request,
.headers(response.getHeaders()).body((Publisher<?>) response.getBody()));
}

@SuppressWarnings("unchecked")
@GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<Publisher<?>>> getStream(WebRequest request) {
public Publisher<?> getStream(WebRequest request) {
FunctionWrapper wrapper = wrapper(request);
return ((Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper
.processRequest(wrapper, wrapper.getArgument(), true)).map(response -> ResponseEntity.ok()
.headers(response.getHeaders()).body((Publisher<?>) response.getBody()));
return FunctionWebRequestProcessingHelper
.processRequest(wrapper, wrapper.getArgument(), true);
}

@PostMapping(path = "/**")
@ResponseBody
public Object post(WebRequest request, @RequestBody(required = false) String body) {
String argument = StringUtils.hasText(body) ? body : "";
return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), argument, false);
return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false);
}

@GetMapping(path = "/**")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static Object invokeFunction(FunctionInvocationWrapper function, Object i
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public static Object processRequest(FunctionWrapper wrapper, Object argument, boolean eventStream) {
public static Publisher<?> processRequest(FunctionWrapper wrapper, Object argument, boolean eventStream) {
FunctionInvocationWrapper function = wrapper.getFunction();

if (function == null) {
Expand All @@ -99,7 +99,7 @@ public static Object processRequest(FunctionWrapper wrapper, Object argument, bo
function.setSkipOutputConversion(true);
}

Object input = argument == null ? Flux.empty() : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage);
Object input = argument == null ? "" : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage);

Object result = function.apply(input);
if (function.isConsumer()) {
Expand All @@ -115,7 +115,7 @@ public static Object processRequest(FunctionWrapper wrapper, Object argument, bo
if (result instanceof Publisher) {
pResult = (Publisher) result;
if (eventStream) {
return Flux.from(pResult).then(Mono.fromSupplier(() -> responseOkBuilder.body(result)));
return Flux.from(pResult);
}

if (pResult instanceof Flux) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ public void uppercaseJsonArray() throws Exception {
@Test
@DirtiesContext
public void uppercaseSSE() throws Exception {
String s = this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody();
assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
.isEqualTo(sse("(FOO)", "(BAR)"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ public void testIssue274() throws Exception {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
}

@Test
public void testIssue274WithData() throws Exception {
SpringApplication.run(Issue274Configuration.class);
TestRestTemplate testRestTemplate = new TestRestTemplate();
String port = System.getProperty("server.port");
Thread.sleep(200);
ResponseEntity<String> response = testRestTemplate
.postForEntity(new URI("http://localhost:" + port + "/echo"), "hello", String.class);
assertThat(response.getBody()).isEqualTo("HELLO");
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
}


@SpringBootApplication
protected static class Issue274Configuration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,29 @@ public void uppercaseJsonArray() throws Exception {

@Test
public void uppercaseSSE() throws Exception {
assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase"))
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
String s = this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody();
assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
.isEqualTo(sse("(FOO)", "(BAR)"));
}

// @Test
// public void uppercaseSSE() throws Exception {
// assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase"))
// .accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
// .body("[\"foo\",\"bar\"]"), String.class).getBody())
// .isEqualTo(sse("(FOO)", "(BAR)"));
//
//// String body = this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON)
//// .body("[\"foo\",\"bar\"]"), String.class).getBody();
//
//// System.out.println(body);
//
//// assertThat(body)
//// .isEqualTo(sse("(FOO)", "(BAR)"));
// }

@Test
public void sum() throws Exception {

Expand Down Expand Up @@ -334,7 +351,8 @@ public void count() throws Exception {
}

private String sse(String... values) {
return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
//return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
return "[\"" + StringUtils.arrayToDelimitedString(values, "\",\"") + "\"]";
}

@EnableAutoConfiguration
Expand Down

0 comments on commit fef8efc

Please sign in to comment.