Skip to content

Commit

Permalink
Added tests for errors in the source stream
Browse files Browse the repository at this point in the history
This commit adds decoder/message-reader tests for errors in
the source data buffer publisher. Because the tests extend
AbstractDataBufferAllocatingTestCase, they also check whether
the buffers that precede the error in the stream are properly
released.

Issue: SPR-17025
  • Loading branch information
poutsma committed Sep 5, 2018
1 parent 196c0ad commit 259b2ca
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 18 deletions.
Expand Up @@ -29,9 +29,7 @@
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.MimeTypeUtils;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

/**
* @author Arjen Poutsma
Expand Down Expand Up @@ -67,6 +65,21 @@ public void decode() {
.verify();
}

@Test
public void decodeError() {
DataBuffer fooBuffer = stringBuffer("foo");
Flux<DataBuffer> source =
Flux.just(fooBuffer).mergeWith(Flux.error(new RuntimeException()));
Flux<byte[]> output = this.decoder.decode(source,
ResolvableType.forClassWithGenerics(Publisher.class, byte[].class),
null, Collections.emptyMap());

StepVerifier.create(output)
.consumeNextWith(bytes -> assertArrayEquals("foo".getBytes(), bytes))
.expectError()
.verify();
}

@Test
public void decodeToMono() {
DataBuffer fooBuffer = stringBuffer("foo");
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,8 +30,7 @@
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.util.MimeTypeUtils;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

/**
* @author Sebastien Deleuze
Expand Down Expand Up @@ -65,6 +64,21 @@ public void decode() {
.verify();
}

@Test
public void decodeError() {
DataBuffer fooBuffer = stringBuffer("foo");
Flux<DataBuffer> source =
Flux.just(fooBuffer).mergeWith(Flux.error(new RuntimeException()));
Flux<ByteBuffer> output = this.decoder.decode(source,
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
null, Collections.emptyMap());

StepVerifier.create(output)
.expectNext(ByteBuffer.wrap("foo".getBytes()))
.expectError()
.verify();
}

@Test
public void decodeToMono() {
DataBuffer fooBuffer = stringBuffer("foo");
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.springframework.util.StreamUtils;

import static org.junit.Assert.*;
import static org.springframework.core.ResolvableType.*;
import static org.springframework.core.ResolvableType.forClass;

/**
* @author Arjen Poutsma
Expand Down Expand Up @@ -73,4 +73,19 @@ public void decode() {
.verify();
}

@Test
public void decodeError() {
DataBuffer fooBuffer = stringBuffer("foo");
Flux<DataBuffer> source =
Flux.just(fooBuffer).mergeWith(Flux.error(new RuntimeException()));


Flux<Resource> result = this.decoder
.decode(source, forClass(Resource.class), null, Collections.emptyMap());

StepVerifier.create(result)
.expectError()
.verify();
}

}
Expand Up @@ -173,6 +173,22 @@ public void decodeEmptyDataBuffer() {

}

@Test
public void decodeError() {
DataBuffer fooBuffer = stringBuffer("foo\n");
Flux<DataBuffer> source =
Flux.just(fooBuffer).mergeWith(Flux.error(new RuntimeException()));

Flux<String> output = this.decoder.decode(source,
ResolvableType.forClass(String.class), null, Collections.emptyMap());

StepVerifier.create(output)
.expectNext("foo")
.expectError()
.verify();

}

@Test
public void decodeToMono() {
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,14 @@
import java.util.Map;

import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
Expand All @@ -34,7 +40,7 @@
/**
* @author Sebastien Deleuze
*/
public class FormHttpMessageReaderTests {
public class FormHttpMessageReaderTests extends AbstractDataBufferAllocatingTestCase {

private final FormHttpMessageReader reader = new FormHttpMessageReader();

Expand Down Expand Up @@ -96,8 +102,25 @@ public void readFormAsFlux() {
assertNull("Invalid result", result.getFirst("name 3"));
}

@Test
public void readFormError() {
DataBuffer fooBuffer = stringBuffer("name=value");
Flux<DataBuffer> body =
Flux.just(fooBuffer).mergeWith(Flux.error(new RuntimeException()));
MockServerHttpRequest request = request(body);

Flux<MultiValueMap<String, String>> result = this.reader.read(null, request, null);
StepVerifier.create(result)
.expectError()
.verify();
}


private MockServerHttpRequest request(String body) {
return request(Mono.just(stringBuffer(body)));
}

private MockServerHttpRequest request(Publisher<? extends DataBuffer> body) {
return MockServerHttpRequest
.method(HttpMethod.GET, "/")
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
Expand Down
Expand Up @@ -21,10 +21,12 @@

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
Expand Down Expand Up @@ -57,9 +59,10 @@ public void canRead() {

@Test
public void readServerSentEvents() {
MockServerHttpRequest request = MockServerHttpRequest.post("/").body(
"id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n" +
"id:c43\nevent:bar\nretry:456\ndata:baz\n\n");
MockServerHttpRequest request = MockServerHttpRequest.post("/")
.body(Mono.just(stringBuffer(
"id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n" +
"id:c43\nevent:bar\nretry:456\ndata:baz\n\n")));

Flux<ServerSentEvent> events = this.messageReader
.read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class),
Expand Down Expand Up @@ -117,8 +120,9 @@ public void readServerSentEventsWithMultipleChunks() {

@Test
public void readString() {
String body = "data:foo\ndata:bar\n\ndata:baz\n\n";
MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

MockServerHttpRequest request = MockServerHttpRequest.post("/")
.body(Mono.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n")));

Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
request, Collections.emptyMap()).cast(String.class);
Expand All @@ -132,9 +136,10 @@ public void readString() {

@Test
public void readPojo() {
MockServerHttpRequest request = MockServerHttpRequest.post("/").body(
"data:{\"foo\": \"foofoo\", \"bar\": \"barbar\"}\n\n" +
"data:{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}\n\n");
MockServerHttpRequest request = MockServerHttpRequest.post("/")
.body(Mono.just(stringBuffer(
"data:{\"foo\": \"foofoo\", \"bar\": \"barbar\"}\n\n" +
"data:{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}\n\n")));

Flux<Pojo> data = messageReader.read(ResolvableType.forClass(Pojo.class), request,
Collections.emptyMap()).cast(Pojo.class);
Expand All @@ -155,7 +160,8 @@ public void readPojo() {
@Test // SPR-15331
public void decodeFullContentAsString() {
String body = "data:foo\ndata:bar\n\ndata:baz\n\n";
MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
MockServerHttpRequest request = MockServerHttpRequest.post("/")
.body(Mono.just(stringBuffer(body)));

String actual = messageReader
.readMono(ResolvableType.forClass(String.class), request, Collections.emptyMap())
Expand All @@ -165,4 +171,25 @@ public void decodeFullContentAsString() {
assertEquals(body, actual);
}

@Test
public void readError() {

Flux<DataBuffer> body =
Flux.just(stringBuffer("data:foo\ndata:bar\n\ndata:baz\n\n"))
.mergeWith(Flux.error(new RuntimeException()));

MockServerHttpRequest request = MockServerHttpRequest.post("/")
.body(body);

Flux<String> data = messageReader.read(ResolvableType.forClass(String.class),
request, Collections.emptyMap()).cast(String.class);

StepVerifier.create(data)
.expectNextMatches(elem -> elem.equals("foo\nbar"))
.expectNextMatches(elem -> elem.equals("baz"))
.expectError()
.verify();
}


}

0 comments on commit 259b2ca

Please sign in to comment.