Skip to content

Commit

Permalink
ISPN-12918 Rest stats requests block the Netty IO thread
Browse files Browse the repository at this point in the history
Process metrics endpoint requests on a blocking executor thread.
  • Loading branch information
danberindei committed Apr 8, 2021
1 parent eecaa17 commit 5591086
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 36 deletions.
Expand Up @@ -129,7 +129,7 @@ protected void startInternal() {
resourceManager.registerResource(restContext, new SearchAdminResource(invocationHelper));
resourceManager.registerResource(restContext, new TasksResource(invocationHelper));
resourceManager.registerResource(restContext, new ProtobufResource(invocationHelper));
resourceManager.registerResource(rootContext, new MetricsResource(auth.metricsAuth()));
resourceManager.registerResource(rootContext, new MetricsResource(auth.metricsAuth(), invocationHelper.getExecutor()));
Path staticResources = configuration.staticResources();
if (staticResources != null) {
Path console = configuration.staticResources().resolve("console");
Expand Down
@@ -1,7 +1,6 @@
package org.infinispan.rest.resources;

import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.infinispan.commons.dataconversion.MediaType.APPLICATION_JSON_TYPE;
import static org.infinispan.commons.dataconversion.MediaType.APPLICATION_OPENMETRICS_TYPE;
import static org.infinispan.commons.dataconversion.MediaType.TEXT_PLAIN_TYPE;
Expand All @@ -11,7 +10,9 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.infinispan.rest.NettyRestResponse;
Expand Down Expand Up @@ -41,10 +42,13 @@ public final class MetricsResource implements ResourceHandler {

private final MetricsRequestHandler requestHandler = new MetricsRequestHandler();
private final boolean auth;
private final Executor blockingExecutor;

public MetricsResource(boolean auth) {
registerBaseMetrics();
public MetricsResource(boolean auth, Executor blockingExecutor) {
this.auth = auth;
this.blockingExecutor = blockingExecutor;

registerBaseMetrics();
}

// this is kept separate just in case Quarkus needs to replace it with nil
Expand All @@ -67,41 +71,49 @@ public Invocations getInvocations() {
}

private CompletionStage<RestResponse> metrics(RestRequest restRequest) {
try {
List<String> accept = restRequest.headers(HttpHeaderNames.ACCEPT.toString());

// provide defaults for missing ACCEPT header (based on http method)
if (restRequest.method() == GET) {
if (accept.isEmpty()) {
// default to OpenMetrics (Prometheus) if nothing specified
accept = Collections.singletonList(TEXT_PLAIN_TYPE);
} else {
// to handle OpenMetrics we need to swap it to "text/plain" so smallrye can recognize it
accept = accept.stream()
.map(h -> h.startsWith(APPLICATION_OPENMETRICS_TYPE) ? TEXT_PLAIN_TYPE : h)
.collect(Collectors.toList());
}
} else if (restRequest.method() == OPTIONS) {
if (accept.isEmpty()) {
accept = Collections.singletonList(APPLICATION_JSON_TYPE);
}
}
List<String> accept = responseMediaTypes(restRequest);

CompletableFuture<RestResponse> cf = CompletableFuture.supplyAsync(() -> {
RestResponseBuilder<NettyRestResponse.Builder> builder = new NettyRestResponse.Builder();

requestHandler.handleRequest(restRequest.path(), restRequest.method().name(),
accept.stream(), (status, message, headers) -> {
builder.status(status).entity(message);
for (String header : headers.keySet()) {
builder.header(header, headers.get(header));
}
});

return completedFuture(builder.build());
} catch (Exception e) {
RestResponseBuilder<NettyRestResponse.Builder> builder = new NettyRestResponse.Builder()
.status(INTERNAL_SERVER_ERROR).entity(e.getMessage());
return completedFuture(builder.build());
try {
requestHandler.handleRequest(restRequest.path(), restRequest.method().name(),
accept.stream(), (status, message, headers) -> {
builder.status(status).entity(message);
for (String header : headers.keySet()) {
builder.header(header, headers.get(header));
}
});

return builder.build();
} catch (Exception e) {
RestResponseBuilder<NettyRestResponse.Builder> errorBuilder = new NettyRestResponse.Builder()
.status(INTERNAL_SERVER_ERROR).entity(e.getMessage());
return errorBuilder.build();
}
}, blockingExecutor);
return cf;
}

private List<String> responseMediaTypes(RestRequest restRequest) {
List<String> accept = restRequest.headers(HttpHeaderNames.ACCEPT.toString());

// provide defaults for missing ACCEPT header (based on http method)
if (restRequest.method() == GET) {
if (accept.isEmpty()) {
// default to OpenMetrics (Prometheus) if nothing specified
accept = Collections.singletonList(TEXT_PLAIN_TYPE);
} else {
// to handle OpenMetrics we need to swap it to "text/plain" so smallrye can recognize it
accept = accept.stream()
.map(h -> h.startsWith(APPLICATION_OPENMETRICS_TYPE) ? TEXT_PLAIN_TYPE : h)
.collect(Collectors.toList());
}
} else if (restRequest.method() == OPTIONS) {
if (accept.isEmpty()) {
accept = Collections.singletonList(APPLICATION_JSON_TYPE);
}
}
return accept;
}
}

0 comments on commit 5591086

Please sign in to comment.