Skip to content

Commit

Permalink
fix(http): large prometheus metrics responses split across multiple c…
Browse files Browse the repository at this point in the history
…hunks (#3843)
  • Loading branch information
amunra committed Oct 26, 2023
1 parent efbc947 commit bd92d9b
Show file tree
Hide file tree
Showing 32 changed files with 507 additions and 95 deletions.
4 changes: 2 additions & 2 deletions benchmarks/src/main/java/org/questdb/GCMetricsBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package org.questdb;

import io.questdb.metrics.GCMetrics;
import io.questdb.std.str.DirectCharSink;
import io.questdb.std.str.DirectByteCharSink;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
Expand All @@ -41,7 +41,7 @@
public class GCMetricsBenchmark {

GCMetrics metrics = new GCMetrics();
DirectCharSink sink = new DirectCharSink(1024 * 1024);
DirectByteCharSink sink = new DirectByteCharSink(1024 * 1024);

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
Expand Down
21 changes: 19 additions & 2 deletions benchmarks/src/main/java/org/questdb/MetricsScrapeBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import io.questdb.metrics.MetricsRegistry;
import io.questdb.metrics.MetricsRegistryImpl;
import io.questdb.std.Sinkable;
import io.questdb.std.bytes.NativeByteSink;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
Expand All @@ -46,7 +48,7 @@ public class MetricsScrapeBenchmark {
private static final MetricsRegistry metricsRegistry = new MetricsRegistryImpl();
private static final Counter counter = metricsRegistry.newCounter("counter");
private static final LongGauge gauge = metricsRegistry.newLongGauge("gauge");
private static final CharSink sink = new NullCharSink();
private static final DirectUtf8CharSink sink = new NullUtf8CharSink();

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
Expand Down Expand Up @@ -81,7 +83,22 @@ public void testScrape() {
metricsRegistry.scrapeIntoPrometheus(sink);
}

private static class NullCharSink implements CharSink {
private static class NullUtf8CharSink implements DirectUtf8CharSink {

@Override
public NativeByteSink borrowDirectByteSink() {
return new NativeByteSink() {
@Override
public long ptr() {
return 0;
}

@Override
public void close() {

}
};
}

@Override
public int encodeSurrogate(char c, CharSequence in, int pos, int hi) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/questdb/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.questdb.std.MemoryTag;
import io.questdb.std.Os;
import io.questdb.std.Unsafe;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;

public class Metrics implements Scrapable {
private final boolean enabled;
Expand Down Expand Up @@ -95,7 +95,7 @@ public PGWireMetrics pgWire() {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
metricsRegistry.scrapeIntoPrometheus(sink);
if (enabled) {
gcMetrics.scrapeIntoPrometheus(sink);
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/questdb/WorkerPoolManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.questdb.std.CharSequenceObjHashMap;
import io.questdb.std.ObjList;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -126,7 +127,7 @@ public void start(Log sharedPoolLog) {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
long now = Worker.CLOCK_MICROS.getTicks();
sharedPool.updateWorkerMetrics(now);
ObjList<CharSequence> poolNames = dedicatedPools.keys();
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/questdb/cutlass/Services.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ public HttpRequestProcessor newInstance() {
}
}, true);
if (metrics.isEnabled()) {
final PrometheusMetricsProcessor.RequestStatePool pool = new PrometheusMetricsProcessor.RequestStatePool(
configuration.getWorkerCount()
);
server.registerClosable(pool);
server.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
Expand All @@ -216,7 +220,7 @@ public String getUrl() {

@Override
public HttpRequestProcessor newInstance() {
return new PrometheusMetricsProcessor(metrics, configuration);
return new PrometheusMetricsProcessor(metrics, configuration, pool);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ public interface HttpChunkedResponseSocket extends CharSink {
void shutdownWrite();

void status(int status, CharSequence contentType);

int writeBytes(long srcAddr, int len);
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ public void status(int status, CharSequence contentType) {
headerImpl.put("Content-Encoding: gzip").put(Misc.EOL);
}
}

@Override
public int writeBytes(long srcAddr, int len) {
assert len > 0;
len = (int) Math.min(len, buffer.getWriteNAvailable());
Vect.memcpy(buffer.getWriteAddress(len), srcAddr, len);
buffer.onWrite(len);
return len;
}
}

public class HttpRawSocketImpl implements HttpRawSocket {
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/io/questdb/cutlass/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import io.questdb.MessageBus;
import io.questdb.Metrics;
import io.questdb.cairo.CairoEngine;
import io.questdb.cutlass.http.processors.StaticContentProcessor;
import io.questdb.cutlass.http.processors.TableStatusCheckProcessor;
import io.questdb.cutlass.http.processors.TextImportProcessor;
import io.questdb.cutlass.http.processors.TextQueryProcessor;
import io.questdb.cutlass.http.processors.*;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.FanOut;
Expand All @@ -52,6 +49,7 @@ public class HttpServer implements Closeable {
private final HttpContextFactory httpContextFactory;
private final WaitProcessor rescheduleContext;
private final ObjList<HttpRequestProcessorSelectorImpl> selectors;
private final ObjList<Closeable> closeables = new ObjList<>();
private final int workerCount;

public HttpServer(
Expand Down Expand Up @@ -218,9 +216,14 @@ public void close() {
Misc.free(dispatcher);
Misc.free(rescheduleContext);
Misc.freeObjListAndClear(selectors);
Misc.freeObjListAndClear(closeables);
Misc.free(httpContextFactory);
}

public void registerClosable(Closeable closeable) {
closeables.add(closeable);
}

@FunctionalInterface
public interface HttpRequestProcessorBuilder {
HttpRequestProcessor newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,174 @@

package io.questdb.cutlass.http.processors;

import io.questdb.cutlass.http.HttpChunkedResponseSocket;
import io.questdb.cutlass.http.HttpConnectionContext;
import io.questdb.cutlass.http.HttpMinServerConfiguration;
import io.questdb.cutlass.http.HttpRequestProcessor;
import io.questdb.cutlass.http.*;
import io.questdb.metrics.Scrapable;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.std.*;
import io.questdb.std.str.DirectByteCharSink;
import org.jetbrains.annotations.TestOnly;

public class PrometheusMetricsProcessor implements HttpRequestProcessor {
private static final CharSequence CONTENT_TYPE_TEXT = "text/plain; version=0.0.4; charset=utf-8";
private static final LocalValue<RequestState> LV = new LocalValue<>();
private final Scrapable metrics;
private final RequestStatePool pool;
private final boolean requiresAuthentication;

public PrometheusMetricsProcessor(Scrapable metrics, HttpMinServerConfiguration configuration) {
public PrometheusMetricsProcessor(Scrapable metrics, HttpMinServerConfiguration configuration, RequestStatePool pool) {
this.metrics = metrics;
this.requiresAuthentication = configuration.isHealthCheckAuthenticationRequired();
this.pool = pool;
}

@Override
public void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
HttpChunkedResponseSocket r = context.getChunkedResponseSocket();
final RequestState state = setupState(context);

// We double-buffer the metrics response.
// This is because we send back chunked responses and each chunk may not be large enough.
metrics.scrapeIntoPrometheus(state.sink);

final HttpChunkedResponseSocket r = context.getChunkedResponseSocket();
r.status(200, CONTENT_TYPE_TEXT);
r.sendHeader();
sendResponse(r, state);
}

metrics.scrapeIntoPrometheus(r);

r.done();
@Override
public boolean requiresAuthentication() {
return requiresAuthentication;
}

/**
* Continues after `PeerIsSlowToReadException` was thrown
* by `onRequestComplete` or earlier call to `resumeSend`.
*/
@Override
public void resumeSend(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
// Send the remainder of the current, partially sent, chunk.
context.resumeResponseSend();

// Send any remaining chunks, if any.
final RequestState state = LV.get(context);
assert state != null;
final HttpChunkedResponseSocket r = context.getChunkedResponseSocket();
sendResponse(r, state);
}

@Override
public boolean requiresAuthentication() {
return requiresAuthentication;
private void sendNextChunk(HttpChunkedResponseSocket r, RequestState state) throws PeerIsSlowToReadException, PeerDisconnectedException {
final int pending = state.countPending();
final int wrote = r.writeBytes(state.sink.ptr() + state.written, pending);
state.written += wrote;
r.sendChunk(wrote == pending); // Will raise `PeerIsSlowToReadException` if the tcp send buffer is full.
}

private void sendResponse(HttpChunkedResponseSocket r, RequestState state) throws PeerIsSlowToReadException, PeerDisconnectedException {
while (state.countPending() > 0) {
sendNextChunk(r, state);
}
}

private RequestState setupState(HttpConnectionContext context) {
RequestState state = LV.get(context);
if (state == null) {
state = pool.pop();
LV.set(context, state);
} else {
state.clear();
}
return state;
}

/**
* State for processing a single request across multiple response chunks.
* Each object is used for the lifetime of one request, then returned the pool.
*/
private static class RequestState implements QuietCloseable, Mutable {
/**
* Metrics serialization destination, sent into one or more chunks later.
*/
public final DirectByteCharSink sink = new DirectByteCharSink(Files.PAGE_SIZE);
private final RequestStatePool pool;
/**
* Total number of bytes written to one or more chunks (`HttpChunkedResponseSocket` objects).
*/
public int written = 0;

private RequestState(RequestStatePool pool) {
this.pool = pool;
}

@Override
public void clear() {
sink.clear();
written = 0;
}

/**
* Return to pool at the end of a request.
*/
@Override
public void close() {
clear();
pool.push(this);
}

/**
* Release off-heap buffer.
*/
public void free() {
sink.close();
}

/**
* Calculate the number of bytes that still need to be written to chunks.
*/
public int countPending() {
return sink.size() - written;
}
}

public static class RequestStatePool implements QuietCloseable {
private final ObjList<RequestState> objects = new ObjList<>();
private final int maxPoolSize;

public RequestStatePool(int maxPoolSize) {
assert maxPoolSize > 0;
this.maxPoolSize = maxPoolSize;
}

@TestOnly
public synchronized int size() {
return objects.size();
}

@Override
public void close() {
for (int i = 0, n = objects.size(); i < n; i++) {
objects.getQuick(i).free();
}
objects.clear(); }

public synchronized void push(RequestState requestState) {
if (objects.size() < maxPoolSize) {
objects.add(requestState);
} else {
requestState.free();
}
}

public synchronized RequestState pop() {
final RequestState state;
if (objects.size() > 0) {
final int last = objects.size() - 1;
state = objects.getQuick(last);
objects.remove(last);
} else {
state = new RequestState(this);
}
return state;
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/io/questdb/metrics/CounterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

package io.questdb.metrics;

import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;

import java.util.concurrent.atomic.LongAdder;

Expand All @@ -49,7 +49,7 @@ public long getValue() {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
PrometheusFormatUtils.appendCounterType(name, sink);
PrometheusFormatUtils.appendCounterNamePrefix(name, sink);
PrometheusFormatUtils.appendSampleLineSuffix(sink, counter.longValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package io.questdb.metrics;

import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;

import java.util.concurrent.atomic.LongAdder;

Expand All @@ -50,7 +50,7 @@ public void inc(short label0) {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
PrometheusFormatUtils.appendCounterType(name, sink);
for (int i = 0, n = counters.length; i < n; i++) {
PrometheusFormatUtils.appendCounterNamePrefix(name, sink);
Expand Down

0 comments on commit bd92d9b

Please sign in to comment.