Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(http): large prometheus metrics responses split across multiple chunks #3843

Merged
merged 36 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
fda1a5c
It ain't workin' yet, but it sure ain't more broken.
amunra Oct 12, 2023
ef08240
Got it working, leaving logging in for now while adding more test sce…
amunra Oct 13, 2023
7c69d96
moar testz
amunra Oct 13, 2023
51c3184
pooling RequestState objects
amunra Oct 13, 2023
3b02931
Parallel metrics requests
amunra Oct 13, 2023
0497804
Removed prometheus metrics processor logging.
amunra Oct 13, 2023
ce524e0
fixed a concurrency bug
amunra Oct 13, 2023
90eb037
formatting
amunra Oct 13, 2023
6bfc5ea
removal of unreachable code
amunra Oct 13, 2023
7fd596f
cleaned up HttpChunkedResponse 'writeBytes' method
amunra Oct 13, 2023
f641ce9
fixed potential overflow
amunra Oct 13, 2023
256b7c9
common request state pool for all prometheus processors
amunra Oct 16, 2023
6450385
Merge remote-tracking branch 'origin/master' into chunked_response_me…
amunra Oct 16, 2023
fc5c5cf
Removed unused import
amunra Oct 16, 2023
b91d9e2
Merge branch 'master' into chunked_response_metrics
amunra Oct 16, 2023
1792690
Merge remote-tracking branch 'origin/master' into chunked_response_me…
amunra Oct 20, 2023
0dc0687
Updated scrapable interface for metrics from CharSink to a new Utf8Di…
amunra Oct 20, 2023
bde01c1
comment tweak
amunra Oct 20, 2023
c0b762a
Fixed compile issues, improved naming.
amunra Oct 20, 2023
95fd287
feedback to use Files.PAGE_SIZE
amunra Oct 20, 2023
30a18cd
Merge remote-tracking branch 'origin/master' into chunked_response_me…
amunra Oct 20, 2023
6d76706
removed todo
amunra Oct 20, 2023
a5d6a54
extra tests
bluestreak01 Oct 20, 2023
3a3d542
test and logic improvements, but now getting 'HttpException: bad prot…
amunra Oct 20, 2023
6bfaf19
Merge remote-tracking branch 'origin/master' into chunked_response_me…
amunra Oct 23, 2023
672bf83
Fixed corner case where 'sendResponse' was being called repeatedly wi…
amunra Oct 25, 2023
1d46d9c
Fixed writeBytes assertion to disallow len==0 and prevent similar bug…
amunra Oct 25, 2023
1936ff5
Merge remote-tracking branch 'origin/master' into chunked_response_me…
amunra Oct 25, 2023
b5ddd63
Added assertions on the prometheus RequestState pool size.
amunra Oct 25, 2023
d7ea28c
fixed merge issue
amunra Oct 25, 2023
e23f817
code review improvements
amunra Oct 25, 2023
442e328
Merge branch 'master' into chunked_response_metrics
amunra Oct 26, 2023
194bae0
Merge branch 'master' into chunked_response_metrics
amunra Oct 26, 2023
14326f2
chore: small refactoring
bluestreak01 Oct 26, 2023
c97a5b0
Merge branch 'master' into chunked_response_metrics
bluestreak01 Oct 26, 2023
d33a981
chore: fix leak
bluestreak01 Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/src/main/java/org/questdb/GCMetricsBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package org.questdb;

import io.questdb.metrics.GCMetrics;
import io.questdb.std.str.DirectCharSink;
import io.questdb.std.str.DirectByteCharSink;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.profile.GCProfiler;
import org.openjdk.jmh.runner.Runner;
Expand All @@ -41,7 +41,7 @@
public class GCMetricsBenchmark {

GCMetrics metrics = new GCMetrics();
DirectCharSink sink = new DirectCharSink(1024 * 1024);
DirectByteCharSink sink = new DirectByteCharSink(1024 * 1024);

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
Expand Down
21 changes: 19 additions & 2 deletions benchmarks/src/main/java/org/questdb/MetricsScrapeBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import io.questdb.metrics.MetricsRegistry;
import io.questdb.metrics.MetricsRegistryImpl;
import io.questdb.std.Sinkable;
import io.questdb.std.bytes.NativeByteSink;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
Expand All @@ -46,7 +48,7 @@ public class MetricsScrapeBenchmark {
private static final MetricsRegistry metricsRegistry = new MetricsRegistryImpl();
private static final Counter counter = metricsRegistry.newCounter("counter");
private static final LongGauge gauge = metricsRegistry.newLongGauge("gauge");
private static final CharSink sink = new NullCharSink();
private static final DirectUtf8CharSink sink = new NullUtf8CharSink();

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
Expand Down Expand Up @@ -81,7 +83,22 @@ public void testScrape() {
metricsRegistry.scrapeIntoPrometheus(sink);
}

private static class NullCharSink implements CharSink {
private static class NullUtf8CharSink implements DirectUtf8CharSink {

@Override
public NativeByteSink borrowDirectByteSink() {
return new NativeByteSink() {
@Override
public long ptr() {
return 0;
}

@Override
public void close() {

}
};
}

@Override
public int encodeSurrogate(char c, CharSequence in, int pos, int hi) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/questdb/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.questdb.std.MemoryTag;
import io.questdb.std.Os;
import io.questdb.std.Unsafe;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;

public class Metrics implements Scrapable {
private final boolean enabled;
Expand Down Expand Up @@ -95,7 +95,7 @@ public PGWireMetrics pgWire() {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
metricsRegistry.scrapeIntoPrometheus(sink);
if (enabled) {
gcMetrics.scrapeIntoPrometheus(sink);
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/questdb/WorkerPoolManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.questdb.std.CharSequenceObjHashMap;
import io.questdb.std.ObjList;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -126,7 +127,7 @@ public void start(Log sharedPoolLog) {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
long now = Worker.CLOCK_MICROS.getTicks();
sharedPool.updateWorkerMetrics(now);
ObjList<CharSequence> poolNames = dedicatedPools.keys();
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/io/questdb/cutlass/Services.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public HttpRequestProcessor newInstance() {
}
}, true);
if (metrics.isEnabled()) {
final PrometheusMetricsProcessor.RequestStatePool pool = new PrometheusMetricsProcessor.RequestStatePool(
configuration.getWorkerCount());
server.registerClosable(pool);
server.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
Expand All @@ -216,7 +219,7 @@ public String getUrl() {

@Override
public HttpRequestProcessor newInstance() {
return new PrometheusMetricsProcessor(metrics, configuration);
return new PrometheusMetricsProcessor(metrics, configuration, pool);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ public interface HttpChunkedResponseSocket extends CharSink {
void shutdownWrite();

void status(int status, CharSequence contentType);

int writeBytes(long srcAddr, int len);
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ public void status(int status, CharSequence contentType) {
headerImpl.put("Content-Encoding: gzip").put(Misc.EOL);
}
}

@Override
public int writeBytes(long srcAddr, int len) {
assert len > 0;
len = (int) Math.min(len, buffer.getWriteNAvailable());
Vect.memcpy(buffer.getWriteAddress(len), srcAddr, len);
buffer.onWrite(len);
return len;
}
}

public class HttpRawSocketImpl implements HttpRawSocket {
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/io/questdb/cutlass/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import io.questdb.MessageBus;
import io.questdb.Metrics;
import io.questdb.cairo.CairoEngine;
import io.questdb.cutlass.http.processors.StaticContentProcessor;
import io.questdb.cutlass.http.processors.TableStatusCheckProcessor;
import io.questdb.cutlass.http.processors.TextImportProcessor;
import io.questdb.cutlass.http.processors.TextQueryProcessor;
import io.questdb.cutlass.http.processors.*;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.FanOut;
Expand All @@ -52,6 +49,7 @@ public class HttpServer implements Closeable {
private final HttpContextFactory httpContextFactory;
private final WaitProcessor rescheduleContext;
private final ObjList<HttpRequestProcessorSelectorImpl> selectors;
private final ObjList<Closeable> closeables = new ObjList<>();
private final int workerCount;

public HttpServer(
Expand Down Expand Up @@ -218,9 +216,14 @@ public void close() {
Misc.free(dispatcher);
Misc.free(rescheduleContext);
Misc.freeObjListAndClear(selectors);
Misc.freeObjListAndClear(closeables);
Misc.free(httpContextFactory);
}

public void registerClosable(Closeable closeable) {
closeables.add(closeable);
}

@FunctionalInterface
public interface HttpRequestProcessorBuilder {
HttpRequestProcessor newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,178 @@

package io.questdb.cutlass.http.processors;

import io.questdb.cutlass.http.HttpChunkedResponseSocket;
import io.questdb.cutlass.http.HttpConnectionContext;
import io.questdb.cutlass.http.HttpMinServerConfiguration;
import io.questdb.cutlass.http.HttpRequestProcessor;
import io.questdb.cutlass.http.*;
import io.questdb.metrics.Scrapable;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.std.Files;
import io.questdb.std.Mutable;
import io.questdb.std.ObjList;
import io.questdb.std.QuietCloseable;
import io.questdb.std.str.DirectByteCharSink;
import org.jetbrains.annotations.TestOnly;

public class PrometheusMetricsProcessor implements HttpRequestProcessor {
private static final CharSequence CONTENT_TYPE_TEXT = "text/plain; version=0.0.4; charset=utf-8";
private static final LocalValue<RequestState> LV = new LocalValue<>();
private final Scrapable metrics;
private final RequestStatePool pool;
private final boolean requiresAuthentication;

public PrometheusMetricsProcessor(Scrapable metrics, HttpMinServerConfiguration configuration) {
public PrometheusMetricsProcessor(Scrapable metrics, HttpMinServerConfiguration configuration, RequestStatePool pool) {
this.metrics = metrics;
this.requiresAuthentication = configuration.isHealthCheckAuthenticationRequired();
this.pool = pool;
}

@Override
public void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
HttpChunkedResponseSocket r = context.getChunkedResponseSocket();
final RequestState state = setupState(context);

// We double-buffer the metrics response.
// This is because we send back chunked responses and each chunk may not be large enough.
metrics.scrapeIntoPrometheus(state.sink);

final HttpChunkedResponseSocket r = context.getChunkedResponseSocket();
r.status(200, CONTENT_TYPE_TEXT);
r.sendHeader();
sendResponse(r, state);
}

metrics.scrapeIntoPrometheus(r);

r.done();
@Override
public boolean requiresAuthentication() {
return requiresAuthentication;
}

/**
* Continues after `PeerIsSlowToReadException` was thrown
* by `onRequestComplete` or earlier call to `resumeSend`.
*/
@Override
public void resumeSend(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
// Send the remainder of the current, partially sent, chunk.
context.resumeResponseSend();

// Send any remaining chunks, if any.
final RequestState state = LV.get(context);
assert state != null;
final HttpChunkedResponseSocket r = context.getChunkedResponseSocket();
sendResponse(r, state);
}

@Override
public boolean requiresAuthentication() {
return requiresAuthentication;
private void sendNextChunk(HttpChunkedResponseSocket r, RequestState state) throws PeerIsSlowToReadException, PeerDisconnectedException {
final int pending = state.countPending();
final int wrote = r.writeBytes(state.sink.ptr() + state.written, pending);
state.written += wrote;
r.sendChunk(wrote == pending); // Will raise `PeerIsSlowToReadException` if the tcp send buffer is full.
}

private void sendResponse(HttpChunkedResponseSocket r, RequestState state) throws PeerIsSlowToReadException, PeerDisconnectedException {
while (state.countPending() > 0) {
sendNextChunk(r, state);
}
}

private RequestState setupState(HttpConnectionContext context) {
RequestState state = LV.get(context);
if (state == null) {
state = pool.pop();
LV.set(context, state);
} else {
state.clear();
}
return state;
}

/**
* State for processing a single request across multiple response chunks.
* Each object is used for the lifetime of one request, then returned the pool.
*/
private static class RequestState implements QuietCloseable, Mutable {
/**
* Metrics serialization destination, sent into one or more chunks later.
*/
public final DirectByteCharSink sink = new DirectByteCharSink(Files.PAGE_SIZE);
private final RequestStatePool pool;
/**
* Total number of bytes written to one or more chunks (`HttpChunkedResponseSocket` objects).
*/
public int written = 0;

private RequestState(RequestStatePool pool) {
this.pool = pool;
}

@Override
public void clear() {
sink.clear();
written = 0;
}

/**
* Return to pool at the end of a request.
*/
@Override
public void close() {
clear();
pool.push(this);
}

/**
* Release off-heap buffer.
*/
public void free() {
sink.close();
}

/**
* Calculate the number of bytes that still need to be written to chunks.
*/
public int countPending() {
return sink.size() - written;
}
}

public static class RequestStatePool implements QuietCloseable {
private final ObjList<RequestState> objects = new ObjList<>();
private final int maxPoolSize;

public RequestStatePool(int maxPoolSize) {
assert maxPoolSize > 0;
this.maxPoolSize = maxPoolSize;
}

@TestOnly
public synchronized int size() {
return objects.size();
}

@Override
public void close() {
for (int i = 0, n = objects.size(); i < n; i++) {
objects.getQuick(i).free();
}
objects.clear();
}

public synchronized void push(RequestState requestState) {
if (objects.size() < maxPoolSize) {
objects.add(requestState);
} else {
requestState.free();
}
}

public synchronized RequestState pop() {
final RequestState state;
if (objects.size() > 0) {
final int last = objects.size() - 1;
amunra marked this conversation as resolved.
Show resolved Hide resolved
state = objects.getQuick(last);
objects.remove(last);
} else {
state = new RequestState(this);
}
return state;
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/io/questdb/metrics/CounterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

package io.questdb.metrics;

import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;

import java.util.concurrent.atomic.LongAdder;

Expand All @@ -49,7 +49,7 @@ public long getValue() {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
PrometheusFormatUtils.appendCounterType(name, sink);
PrometheusFormatUtils.appendCounterNamePrefix(name, sink);
PrometheusFormatUtils.appendSampleLineSuffix(sink, counter.longValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

package io.questdb.metrics;

import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;

import java.util.concurrent.atomic.LongAdder;

Expand All @@ -50,7 +50,7 @@ public void inc(short label0) {
}

@Override
public void scrapeIntoPrometheus(CharSink sink) {
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
PrometheusFormatUtils.appendCounterType(name, sink);
for (int i = 0, n = counters.length; i < n; i++) {
PrometheusFormatUtils.appendCounterNamePrefix(name, sink);
Expand Down