Provide API in WebTestClient to test SSE [SPR-17248] #21781

Closed
spring-projects-issues opened this issue Sep 5, 2018 · 9 comments
Labels
in: test status: declined type: enhancement

Comments

@spring-projects-issues spring-projects-issues commented Sep 5, 2018

Bernd Kolb opened SPR-17248 and commented

Summary
As a user, I want to test a continuous, infinite stream of events. I have a controller that provides a Flux<ServerSentEvent>.

Actual Behavior
WebTestClient does not provide an API to test such a scenario. The documentation of WebTestClient.returnResult states that it should be usable in such a scenario; however, in order to call this method, I have to call exchange first. Internally, the implementation of exchange blocks until a timeout is reached.

Expected Behavior
In addition to exchange, there should be a method similar to ResponseSpec.returnResult which can be used with a StepVerifier.


Affects: 5.0.8

Referenced from: pull request #1949

@spring-projects-issues spring-projects-issues commented Sep 6, 2018

Rossen Stoyanchev commented

The exchange() method blocks only to get the ClientResponse which contains the status and headers, but the body is yet to be read via one of the body methods on the response. Here is one streaming test. The response content is not yet read when the status and headers are asserted, not even when FluxExchangeResult is returned. It's only in the StepVerifier that the actual content is consumed.

It's true that a retrieve method next to exchange in WebTestClient could be used as a kind of shortcut, if you don't want to assert the status and go straight to the response content, but for that it should cover all options like WebClient does with the ResponseSpec it returns.

@spring-projects-issues spring-projects-issues commented Oct 12, 2018

Rossen Stoyanchev commented

Resolving for now.

@spring-projects-issues spring-projects-issues added status: declined in: test type: enhancement labels Jan 11, 2019

@MderM MderM commented Aug 2, 2019

Rossen Stoyanchev commented

Here is one streaming test.

I see two problems here. The linked test class doesn't test the streaming method 'Flux getPersonStream()'.

And even then, as this issue states, the exchange method blocks forever in some circumstances, one of them being a controller method that returns a processor which has not emitted any events yet. Everything works well if the stream immediately returns some data.
So far I have not found any help and am working around this with 'warmup' events in my tests.

I do not think the issue is resolved after all.

@rstoyanchev rstoyanchev commented Aug 2, 2019

The linked test class doesn't test the streaming method Flux getPersonStream().

I just verified with a debug point that it does call Flux<Person> getPersonStream(), so I'm not sure what you mean.

even then, as this issue states, the exchange method blocks forever in some circumstance

As I mentioned before it only blocks for the status and headers, and not for the body, so I don't follow this either. Please provide a sample to demonstrate the issue.

@MderM MderM commented Aug 8, 2019

Example for blocking WebTestClient:

RestController:

package com.example;

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;

@RestController
public class ExampleController {

    private final FluxSink<ServerSentEvent<String>> dataSink;
    private final ReplayProcessor<ServerSentEvent<String>> processor;

    public ExampleController() {
        this.processor = ReplayProcessor.create(100, false);
        this.dataSink = this.processor.sink();
    }

    @PutMapping("/updateExample")
    public void update(String content) {
        this.dataSink.next(
                ServerSentEvent.<String>builder()
                        .id("id")
                        .event("message")
                        .data(content)
                        .build());
    }

    @GetMapping("/subscribeExample")
    public Flux<ServerSentEvent<String>> subscribe(String subscriptionID) {
        return this.processor;
    }

}

TestCode:

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.test.StepVerifier;

import java.time.Duration;

@WebFluxTest(controllers = ExampleController.class)
public class ExampleTests {

    @Autowired
    private WebTestClient webClient;

    private static final ParameterizedTypeReference<ServerSentEvent<String>> typeRef = new ParameterizedTypeReference<>() {};

    @Test
    public void subscribeTest() {
        // uncomment next line to make the test pass
        // webClient.put().uri("/updateExample").body(BodyInserters.fromObject("")).exchange().expectStatus().isOk();
        FluxExchangeResult<ServerSentEvent<String>> result = webClient.get().uri("/subscribeExample")
                .accept(MediaType.valueOf(MediaType.TEXT_EVENT_STREAM_VALUE))
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
                .returnResult(typeRef);
        StepVerifier.create(result.getResponseBody())
                .expectSubscription()
                .expectNoEvent(Duration.ofSeconds(5))
                .thenCancel()
                .verify();
    }

}

This test fails with the blocking timeout exception. If you uncomment the webClient.put() call, everything works. If that behaviour is intended, do you have any advice on where to find detailed documentation on how to use WebTestClient with SSE?

@MderM MderM commented Sep 12, 2019

Is there any chance of this being worked on? An SSE subscription that returns a (processor) Flux which doesn't have data yet seems like a valid use case to me (at least it's a pattern we use quite often at work).
The problem is that there won't be any response if the Flux has no data, so exchange() runs into timeouts.

@rstoyanchev rstoyanchev commented Sep 13, 2019

@MderM, so the server is not producing any data to start and the test client eventually times out? You can change the responseTimeout on WebTestClient.Builder. By default it is set to 5 seconds.
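
For reference, a minimal sketch of how the timeout mentioned above could be raised on an existing client. It assumes an already configured WebTestClient (for example the one injected by @WebFluxTest); the class name, the method name and the 30-second value are illustrative only and not taken from this thread.

```java
import java.time.Duration;

import org.springframework.test.web.reactive.server.WebTestClient;

public final class TimeoutConfig {

    private TimeoutConfig() {
    }

    // Returns a copy of the given client that waits up to 30 seconds for a response.
    // The 30-second value is an arbitrary example, not a recommendation.
    public static WebTestClient withLongerTimeout(WebTestClient client) {
        return client.mutate()
                .responseTimeout(Duration.ofSeconds(30))
                .build();
    }
}
```

Note that a longer timeout only helps if the server eventually responds within that window.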

@MderM MderM commented Sep 13, 2019

@rstoyanchev I tuned the timeout before, but that didn't change anything. It is still not possible to check that no data has arrived so far (at least not without some not-so-nice tricks). I did find workarounds for all my test cases, but they're just that: workarounds. I separated the business logic out of the controller into a service so that I can handle and test the Flux behaviour directly.

But it would be really nice to get the Flux through the WebTestClient without the exchange, to do something like

StepVerifier.create(responseFlux).
...
...
.testThatNoDataFlowedTillNow ()
.then(() -> inject data)
.expectNext ()
.andSoOn

But yeah, I think we can manage without it one way or another. You just have to be careful with SSE in combination with ConnectableFlux/Processor.
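
The flow sketched above can be exercised against the Flux itself, without WebTestClient, which is essentially the "test the service directly" workaround. The following is a minimal sketch only. It assumes Reactor 3.4 or later (the Sinks API stands in for the ReplayProcessor used earlier in this thread) and reactor-test on the classpath; the class name, method name and the 200 ms quiet period are hypothetical.

```java
import java.time.Duration;

import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

public class DirectFluxCheck {

    // Verifies that nothing is emitted until data is injected, and that the
    // injected item then arrives. Returns how long the verification took.
    public static Duration verifySilenceThenData() {
        // Replay sink with a history of 100 items, standing in for the controller's processor.
        Sinks.Many<String> sink = Sinks.many().replay().limit(100);

        return StepVerifier.create(sink.asFlux())
                .expectSubscription()
                // No data has flowed so far.
                .expectNoEvent(Duration.ofMillis(200))
                // Inject data, as the PUT endpoint would.
                .then(() -> sink.tryEmitNext("first"))
                .expectNext("first")
                // The stream is infinite, so cancel instead of waiting for completion.
                .thenCancel()
                // Fail rather than hang if the expectations are never met.
                .verify(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        verifySilenceThenData();
        System.out.println("verified");
    }
}
```

This checks the publisher's behaviour only. It does not cover the HTTP layer, so it complements rather than replaces a WebTestClient test.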

@Nazjara Nazjara commented Jul 4, 2022

Please tell me what I'm missing here.
I'm also using SSE, and the exchange call blocks the test, so I never reach the StepVerifier.
I'm using Spring MVC, so my REST controller returns an SseEmitter object that keeps an infinite stream open. Messages are streamed constantly, one every second.
The idea is to verify the first few messages and then cancel the subscription. Here's the test:

@Test
public void testSubscribeOnAlertsWithServerError()
{
    var exchangeResult = webTestClient.get()
            .uri("/someurl")
            .exchange()
            .returnResult(String.class);

    // I never reach that in debug
    StepVerifier.create(exchangeResult.getResponseBody())
            .expectNextCount(5)
            .thenCancel()
            .verify();
}

BTW, it works fine in similar tests where the stream completes at some point.
Perhaps this is not expected to work with Spring MVC. In that case, could you suggest a way to test such a scenario?


4 participants