Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class FailedRequestException extends RuntimeException {
*/
public FailedRequestException(HttpResponseException e, URI uri) {
super(
String.format("HTTP request failed uri: %s %s", uri, e.getMessage())
String.format("HTTP request of %s failed with %d: %s", uri, e.getStatusCode(), e.getMessage())
);
this.uri = uri;
this.statusCode = e.getStatusCode();
Expand All @@ -28,7 +28,7 @@ public FailedRequestException(HttpResponseException e, URI uri) {
*/
public FailedRequestException(HttpResponseStatus status, URI uri, String msg) {
super(
String.format("HTTP request failed uri: %s %s", uri, msg)
String.format("HTTP request of %s failed with %s: %s", uri, status, msg)
);
this.uri = uri;
this.statusCode = status.code();
Expand Down
121 changes: 62 additions & 59 deletions src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
package me.itzg.helpers.http;

import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.http.HttpStatusClass;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.errors.GenericException;
import me.itzg.helpers.json.ObjectMappers;
import org.apache.hc.client5.http.HttpResponseException;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpHead;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

@Slf4j
public class FetchBuilderBase<SELF extends FetchBuilderBase<SELF>> {
Expand Down Expand Up @@ -58,30 +61,24 @@ public OutputToDirectoryFetchBuilder toDirectory(Path directory) {
return new OutputToDirectoryFetchBuilder(this.state, directory);
}

/**
* NOTE: this will set expected content types to application/json
*/
public <T> ObjectFetchBuilder<T> toObject(Class<T> type) {
return new ObjectFetchBuilder<>(this.state, type);
acceptContentTypes(Collections.singletonList("application/json"));
return new ObjectFetchBuilder<>(this.state, type, false, ObjectMappers.defaultMapper());
}

/**
* NOTE: this will set expected content types to application/json
*/
public <T> ObjectListFetchBuilder<T> toObjectList(Class<T> type) {
return new ObjectListFetchBuilder<>(this.state, type);
acceptContentTypes(Collections.singletonList("application/json"));
return new ObjectListFetchBuilder<>(this.state, type, ObjectMappers.defaultMapper());
}

public <T> ObjectFetchBuilder<T> toObject(Class<T> type, ObjectMapper objectMapper) {
return new ObjectFetchBuilder<>(this.state, type, objectMapper);
}

protected HttpGet get() throws IOException {
final HttpGet request = new HttpGet(state.uri);
configureRequest(request);
return request;
}

protected HttpHead head(boolean withConfigure) throws IOException {
final HttpHead request = new HttpHead(state.uri);
if (withConfigure) {
configureRequest(request);
}
return request;
return new ObjectFetchBuilder<>(this.state, type, false, objectMapper);
}

protected URI uri() {
Expand Down Expand Up @@ -117,14 +114,8 @@ protected interface PreparedFetchUser<R> {
R use(SharedFetch sharedFetch) throws IOException;
}

protected interface ClientUser<R> {
R use(HttpClient client) throws IOException;
}

/**
* Intended to be called by subclass specific <code>execute</code> methods.
* @param user provided either a multi-request {@link SharedFetch} or an instance scoped to this call.
* Either way, {@link SharedFetch#getClient()} can be used to execute requests.
*/
protected <R> R usePreparedFetch(PreparedFetchUser<R> user) throws IOException {
if (state.sharedFetch != null) {
Expand All @@ -147,43 +138,55 @@ protected <R> R usePreparedFetch(PreparedFetchUser<R> user) throws IOException {
}
}

protected static BiConsumer<? super HttpClientRequest, ? super Connection> debugLogRequest(
Logger log, String operation
) {
return (req, connection) ->
log.debug("{}: uri={} headers={}",
operation.toUpperCase(), req.resourceUrl(), req.requestHeaders()
);
}

protected <R> Mono<R> failedRequestMono(HttpClientResponse resp, String description) {
return Mono.error(new FailedRequestException(resp.status(), uri(), description));
}

protected static boolean notSuccess(HttpClientResponse resp) {
return HttpStatusClass.valueOf(resp.status().code()) != HttpStatusClass.SUCCESS;
}

/**
* Intended to be called by subclass specific <code>execute</code> methods.
* This is a convenience version of {@link #usePreparedFetch(PreparedFetchUser)}
* that provides just the {@link HttpClient} to execute requests.
* @return false if response content type is not one of the expected content types,
* but true if no expected content types
*/
protected <R> R useClient(ClientUser<R> user) throws IOException {
return usePreparedFetch(sharedFetch ->
user.use(sharedFetch.getClient())
);
}
protected boolean notExpectedContentType(HttpClientResponse resp) {
final List<String> contentTypes = getAcceptContentTypes();
if (contentTypes != null && !contentTypes.isEmpty()) {
final List<String> respTypes = resp.responseHeaders()
.getAll(CONTENT_TYPE);

protected void configureRequest(BasicHttpRequest request) throws IOException {
if (state.acceptContentTypes != null) {
for (final String type : state.acceptContentTypes) {
request.addHeader(HttpHeaders.ACCEPT, type);
}
return respTypes.stream().noneMatch(contentTypes::contains);
}
return false;
}

for (final Entry<String, String> entry : state.requestHeaders.entrySet()) {
request.addHeader(entry.getKey(), entry.getValue());
}
// and apply shared headers that weren't overridden by per-request
if (state.sharedFetch != null) {
for (final Entry<String, String> entry : state.sharedFetch.getHeaders().entrySet()) {
if (!state.requestHeaders.containsKey(entry.getKey())) {
request.addHeader(entry.getKey(), entry.getValue());
}
}
}
protected <R> Mono<R> failedContentTypeMono(HttpClientResponse resp) {
return Mono.error(new GenericException(
String.format("Unexpected content type in response. Expected '%s' but got '%s'",
getAcceptContentTypes(), resp.responseHeaders()
.getAll(CONTENT_TYPE)
)));
}

protected static BiConsumer<? super HttpClientRequest, ? super Connection> debugLogRequest(
Logger log, String operation
) {
return (req, connection) ->
log.debug("{}: uri={} headers={}",
operation.toUpperCase(), req.resourceUrl(), req.requestHeaders()
protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) {
final List<String> contentTypes = getAcceptContentTypes();
if (contentTypes != null && !contentTypes.isEmpty()) {
headers.set(
ACCEPT.toString(),
contentTypes
);
}

state.requestHeaders.forEach(headers::set);
}
}
92 changes: 53 additions & 39 deletions src/main/java/me/itzg/helpers/http/ObjectFetchBuilder.java
Original file line number Diff line number Diff line change
@@ -1,74 +1,88 @@
package me.itzg.helpers.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.http.HttpHeaderNames;
import com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.errors.GenericException;
import me.itzg.helpers.json.ObjectMappers;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufMono;
import reactor.netty.http.client.HttpClientResponse;

@Slf4j
public class ObjectFetchBuilder<T> extends FetchBuilderBase<ObjectFetchBuilder<T>> {

private final Class<T> type;
private final ObjectMapper objectMapper;
private final boolean listOf;
private final ObjectReader reader;

ObjectFetchBuilder(State state, Class<T> type, ObjectMapper objectMapper) {
protected ObjectFetchBuilder(State state, Class<T> type, boolean listOf, ObjectMapper objectMapper) {
super(state);
this.type = type;
this.objectMapper = objectMapper;
this.listOf = listOf;
if (listOf) {
reader = objectMapper.readerForListOf(type);
}
else {
reader = objectMapper.readerFor(type);
}
}

ObjectFetchBuilder(State state, Class<T> type) {
this(state, type, ObjectMappers.defaultMapper());
public T execute() throws IOException {
return assemble().block();
}

@Override
protected void configureRequest(BasicHttpRequest request) throws IOException {
super.configureRequest(request);
request.addHeader(HttpHeaders.ACCEPT, "application/json");
public Mono<T> assemble() throws IOException {
return assembleCommon();
}

public T execute() throws IOException {
return assemble().block();
protected Mono<List<T>> assembleToList() throws IOException {
return assembleCommon();
}

public Mono<T> assemble() throws IOException {
private <R> Mono<R> assembleCommon() throws IOException {
return usePreparedFetch(sharedFetch ->
sharedFetch.getReactiveClient()
.headers(headers ->
headers.set(HttpHeaderNames.ACCEPT, "application/json")
)
.headers(this::applyHeaders)
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "json fetch"))
.get()
.uri(uri())
.responseContent()
.aggregate()
.asInputStream()
.publishOn(Schedulers.boundedElastic())
.flatMap(inputStream -> {
.responseSingle(this::handleResponse)
);
}

private <R> Mono<R> handleResponse(HttpClientResponse resp, ByteBufMono bodyMono) {
if (notSuccess(resp)) {
return failedRequestMono(resp, "Fetching object content");
}
if (notExpectedContentType(resp)) {
return failedContentTypeMono(resp);
}

return bodyMono.asInputStream()
.publishOn(Schedulers.boundedElastic())
.flatMap(inputStream -> {
try {
try {
try {
return Mono.just(objectMapper.readValue(inputStream, type));
} catch (IOException e) {
return Mono.error(new GenericException("Failed to parse response body into " + type, e));
}
return Mono.just(reader.readValue(inputStream));
} catch (IOException e) {
return Mono.error(new GenericException(
"Failed to parse response body into " +
(listOf ? "list of " + type : type),
e
));
}
finally {
try {
//noinspection BlockingMethodInNonBlockingContext
inputStream.close();
} catch (IOException e) {
log.warn("Unable to close body input stream", e);
}
} finally {
try {
//noinspection BlockingMethodInNonBlockingContext
inputStream.close();
} catch (IOException e) {
log.warn("Unable to close body input stream", e);
}
})
);
}
});
}

}
27 changes: 8 additions & 19 deletions src/main/java/me/itzg/helpers/http/ObjectListFetchBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,24 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import me.itzg.helpers.json.ObjectMappers;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import reactor.core.publisher.Mono;

public class ObjectListFetchBuilder<T> extends FetchBuilderBase<ObjectListFetchBuilder<T>> {

private final Class<T> type;
private final ObjectMapper objectMapper;
private final ObjectFetchBuilder<T> delegate;

ObjectListFetchBuilder(State state, Class<T> type, ObjectMapper objectMapper) {
super(state);
this.type = type;
this.objectMapper = objectMapper;
}

ObjectListFetchBuilder(State state, Class<T> type) {
this(state, type, ObjectMappers.defaultMapper());
}

@Override
protected void configureRequest(BasicHttpRequest request) throws IOException {
super.configureRequest(request);
request.addHeader(HttpHeaders.ACCEPT, "application/json");
delegate = new ObjectFetchBuilder<>(state, type, true, objectMapper);
}

public List<T> execute() throws IOException {
return useClient(client ->
client.execute(get(), new ObjectListMapperHandler<>(type, objectMapper))
);
return delegate.assembleToList()
.block();
}

public Mono<List<T>> assemble() throws IOException {
return delegate.assembleToList();
}
}
24 changes: 0 additions & 24 deletions src/main/java/me/itzg/helpers/http/ObjectListMapperHandler.java

This file was deleted.

Loading