-
Notifications
You must be signed in to change notification settings - Fork 150
/
JettyReactiveHttpResponse.java
112 lines (97 loc) · 3.58 KB
/
JettyReactiveHttpResponse.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package reactivefeign.jetty.client;
import com.fasterxml.jackson.core.async_.JsonFactory;
import com.fasterxml.jackson.databind.ObjectReader;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.reactive.client.ContentChunk;
import org.reactivestreams.Publisher;
import reactivefeign.client.ReactiveHttpResponse;
import reactivejson.ReactorObjectReader;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Optional.ofNullable;
import static org.eclipse.jetty.http.HttpHeader.CONTENT_TYPE;
/**
 * {@link ReactiveHttpResponse} implementation that adapts a Jetty client {@link Response}
 * together with the publisher of its {@link ContentChunk}s.
 *
 * <p>The body is exposed either as raw {@link ByteBuffer}s, as a decoded {@link String},
 * or as objects read through {@link ReactorObjectReader}, depending on the publisher type
 * and element class supplied at construction time.
 */
class JettyReactiveHttpResponse implements ReactiveHttpResponse{

    /**
     * Exact {@code Content-Type} fragment that used to be searched for the charset.
     * Retained for backward compatibility; parsing is now done per parameter, see {@link #getCharset()}.
     */
    public static final String CHARSET_DELIMITER = ";charset=";

    /** Name of the {@code Content-Type} parameter carrying the charset, including the {@code =}. */
    private static final String CHARSET_PARAMETER = "charset=";

    private final Response clientResponse;
    private final Publisher<ContentChunk> contentChunks;
    private final Class<?> returnPublisherType;
    private final Class<?> returnActualClass;
    private final ObjectReader objectReader;
    private final JsonFactory jsonFactory;

    /**
     * @param clientResponse      Jetty response providing the status and headers
     * @param contentChunks       publisher of the response body chunks
     * @param returnPublisherType publisher type expected by the caller; {@link #body()} recognises
     *                            {@code Mono.class} and {@code Flux.class}
     * @param returnActualClass   element type expected by the caller
     * @param jsonFactory         factory handed to {@link ReactorObjectReader}
     * @param objectReader        reader handed to {@link ReactorObjectReader} for deserialization
     */
    JettyReactiveHttpResponse(Response clientResponse, Publisher<ContentChunk> contentChunks,
                              Class<?> returnPublisherType, Class<?> returnActualClass,
                              JsonFactory jsonFactory, ObjectReader objectReader) {
        this.clientResponse = clientResponse;
        this.contentChunks = contentChunks;
        this.returnPublisherType = returnPublisherType;
        this.returnActualClass = returnActualClass;
        this.objectReader = objectReader;
        this.jsonFactory = jsonFactory;
    }

    /** @return the status code reported by the underlying Jetty response */
    @Override
    public int status() {
        return clientResponse.getStatus();
    }

    /**
     * Collects the response header fields into a map keyed by field name.
     *
     * <p>Fields that share the same name (for example several {@code Set-Cookie} fields)
     * are merged into a single list instead of failing on the duplicate key.
     *
     * @return header values grouped by header name, in the order the names are first seen
     */
    @Override
    public Map<String, List<String>> headers() {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (HttpField field : clientResponse.getHeaders()) {
            List<String> values = result.computeIfAbsent(field.getName(), name -> new ArrayList<>());
            String[] fieldValues = field.getValues();
            // A field without a value contributes an (empty) entry rather than a NullPointerException.
            if (fieldValues != null) {
                values.addAll(asList(fieldValues));
            }
        }
        return result;
    }

    /**
     * Exposes the response body according to the configured return types.
     *
     * <ul>
     *   <li>{@code ByteBuffer} element type: the raw chunk buffers;</li>
     *   <li>{@code Mono} of a type assignable from {@code String}: the whole body decoded once
     *       with the response charset;</li>
     *   <li>other {@code Mono}: a single object read from the content;</li>
     *   <li>other {@code Flux}: a stream of elements read from the content.</li>
     * </ul>
     *
     * @return publisher of the body in the requested representation
     * @throws IllegalArgumentException if the publisher type is neither {@code Mono} nor {@code Flux}
     */
    @Override
    public Publisher<?> body() {
        Flux<ByteBuffer> content = directContent();

        if (returnActualClass == ByteBuffer.class) {
            return content;
        }

        if (returnActualClass.isAssignableFrom(String.class) && returnPublisherType == Mono.class) {
            Charset charset = getCharset();
            // Decode the joined bytes rather than each chunk separately: a multi-byte character
            // may be split across two chunks, and a Mono consumer expects exactly one string.
            // An empty body still produces an empty publisher.
            return joinChunks()
                    .filter(bytes -> bytes.length > 0)
                    .map(bytes -> new String(bytes, charset));
        }

        if (returnPublisherType == Mono.class) {
            return new ReactorObjectReader(jsonFactory).read(content, objectReader);
        }
        if (returnPublisherType == Flux.class) {
            return new ReactorObjectReader(jsonFactory).readElements(content, objectReader);
        }
        throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType);
    }

    /**
     * Resolves the charset declared in the {@code Content-Type} header.
     *
     * <p>The parameter name is matched case-insensitively, surrounding whitespace and
     * double quotes are ignored, and parameters following the charset do not leak into
     * the charset name.
     *
     * @return the declared charset, or UTF-8 when the header or the parameter is absent
     * @throws IllegalArgumentException if the declared charset name is illegal or unsupported
     *                                  (as thrown by {@link Charset#forName(String)})
     */
    private Charset getCharset() {
        String contentType = clientResponse.getHeaders().get(CONTENT_TYPE.asString());
        if (contentType == null) {
            return UTF_8;
        }

        String[] parts = contentType.split(";");
        // parts[0] is the media type itself; parameters start at index 1.
        for (int i = 1; i < parts.length; i++) {
            String parameter = parts[i].trim();
            if (parameter.regionMatches(true, 0, CHARSET_PARAMETER, 0, CHARSET_PARAMETER.length())) {
                String charsetName = unquote(parameter.substring(CHARSET_PARAMETER.length()).trim());
                if (!charsetName.isEmpty()) {
                    return Charset.forName(charsetName);
                }
            }
        }
        return UTF_8;
    }

    /** Strips one pair of surrounding double quotes, if present. */
    private static String unquote(String value) {
        if (value.length() >= 2 && value.charAt(0) == '"' && value.charAt(value.length() - 1) == '"') {
            return value.substring(1, value.length() - 1).trim();
        }
        return value;
    }

    /**
     * Maps every content chunk to an independent view ({@code slice()}) of its buffer.
     *
     * <p>NOTE(review): the chunk itself is not otherwise touched here; if ContentChunk carries a
     * completion callback that must be signalled once the data is consumed, confirm that this
     * is handled by the caller.
     */
    private Flux<ByteBuffer> directContent() {
        return Flux.from(contentChunks).map(contentChunk -> contentChunk.buffer.slice());
    }

    /** @return the complete response body as a single byte array */
    @Override
    public Mono<byte[]> bodyData() {
        return joinChunks();
    }

    /**
     * Concatenates all content chunks into one byte array.
     *
     * <p>The accumulator is created per subscription ({@code reduceWith}) so that subscribing
     * to the returned Mono more than once does not append to bytes gathered earlier.
     */
    private Mono<byte[]> joinChunks() {
        return directContent()
                .reduceWith(ByteArrayOutputStream::new, (accumulator, byteBuffer) -> {
                    // The buffer is a private slice, so advancing its position is harmless.
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    accumulator.write(bytes, 0, bytes.length);
                    return accumulator;
                })
                .map(ByteArrayOutputStream::toByteArray);
    }
}