Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into chunked_response_me…
Browse files Browse the repository at this point in the history
…trics
  • Loading branch information
amunra committed Oct 23, 2023
2 parents 3a3d542 + ad5603c commit 6bfaf19
Show file tree
Hide file tree
Showing 22 changed files with 643 additions and 145 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/io/questdb/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import io.questdb.cairo.TableWriterMetrics;
import io.questdb.cairo.wal.WalMetrics;
import io.questdb.metrics.WorkerMetrics;
import io.questdb.cutlass.http.processors.JsonQueryMetrics;
import io.questdb.cutlass.line.LineMetrics;
import io.questdb.cutlass.pgwire.PGWireMetrics;
Expand All @@ -49,6 +50,7 @@ public class Metrics implements Scrapable {
private final VirtualLongGauge.StatProvider jvmTotalMemRef = runtime::totalMemory;
private final TableWriterMetrics tableWriter;
private final WalMetrics walMetrics;
private final WorkerMetrics workerMetrics;

public Metrics(boolean enabled, MetricsRegistry metricsRegistry) {
this.enabled = enabled;
Expand All @@ -61,6 +63,7 @@ public Metrics(boolean enabled, MetricsRegistry metricsRegistry) {
this.walMetrics = new WalMetrics(metricsRegistry);
createMemoryGauges(metricsRegistry);
this.metricsRegistry = metricsRegistry;
this.workerMetrics = new WorkerMetrics(metricsRegistry);
}

public static Metrics disabled() {
Expand Down Expand Up @@ -107,6 +110,14 @@ public WalMetrics walMetrics() {
return walMetrics;
}

public WorkerMetrics workerMetrics() {
return workerMetrics;
}

void addScrapable(Scrapable scrapable){
metricsRegistry.addScrapable(scrapable);
}

private void createMemoryGauges(MetricsRegistry metricsRegistry) {
for (int i = 0; i < MemoryTag.SIZE; i++) {
metricsRegistry.newLongGauge(i);
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/io/questdb/ServerMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public ServerMain(final Bootstrap bootstrap) {
final boolean walSupported = config.getCairoConfiguration().isWalSupported();
final boolean isReadOnly = config.getCairoConfiguration().isReadOnlyInstance();
final boolean walApplyEnabled = config.getCairoConfiguration().isWalApplyEnabled();
workerPoolManager = new WorkerPoolManager(config, metrics.health()) {
workerPoolManager = new WorkerPoolManager(config, metrics) {
@Override
protected void configureSharedPool(WorkerPool sharedPool) {
try {
Expand All @@ -116,8 +116,9 @@ protected void configureSharedPool(WorkerPool sharedPool) {
sharedPool.assign(walPurgeJob);
sharedPool.freeOnExit(walPurgeJob);

// wal apply job in the shared pool when there is no dedicated pool
if (walApplyEnabled && !config.getWalApplyPoolConfiguration().isEnabled()) {
setupWalApplyJob(sharedPool, engine, getSharedWorkerCount());
setupWalApplyJob(sharedPool, engine, sharedPool.getWorkerCount());
}
}

Expand Down Expand Up @@ -151,7 +152,7 @@ protected void configureSharedPool(WorkerPool sharedPool) {
if (walApplyEnabled && !isReadOnly && walSupported && config.getWalApplyPoolConfiguration().isEnabled()) {
WorkerPool walApplyWorkerPool = workerPoolManager.getInstance(
config.getWalApplyPoolConfiguration(),
metrics.health(),
metrics,
WorkerPoolManager.Requester.WAL_APPLY
);
setupWalApplyJob(walApplyWorkerPool, engine, workerPoolManager.getSharedWorkerCount());
Expand Down
24 changes: 19 additions & 5 deletions core/src/main/java/io/questdb/WorkerPoolManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,33 @@

import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.metrics.HealthMetrics;
import io.questdb.metrics.Scrapable;
import io.questdb.mp.Worker;
import io.questdb.mp.WorkerPool;
import io.questdb.mp.WorkerPoolConfiguration;
import io.questdb.std.CharSequenceObjHashMap;
import io.questdb.std.ObjList;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectUtf8CharSink;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.atomic.AtomicBoolean;

public abstract class WorkerPoolManager {
public abstract class WorkerPoolManager implements Scrapable {

private static final Log LOG = LogFactory.getLog(WorkerPoolManager.class);
protected final WorkerPool sharedPool;
private final AtomicBoolean closed = new AtomicBoolean();
private final CharSequenceObjHashMap<WorkerPool> dedicatedPools = new CharSequenceObjHashMap<>(4);
private final AtomicBoolean running = new AtomicBoolean();
private final WorkerPool sharedPool;

public WorkerPoolManager(ServerConfiguration config, HealthMetrics metrics) {
public WorkerPoolManager(ServerConfiguration config, Metrics metrics) {
sharedPool = new WorkerPool(config.getWorkerPoolConfiguration(), metrics);
configureSharedPool(sharedPool); // abstract method giving callers the chance to assign jobs
metrics.addScrapable(this);
}

public WorkerPool getInstance(@NotNull WorkerPoolConfiguration config, @NotNull HealthMetrics metrics, @NotNull Requester requester) {
public WorkerPool getInstance(@NotNull WorkerPoolConfiguration config, @NotNull Metrics metrics, @NotNull Requester requester) {
if (running.get() || closed.get()) {
throw new IllegalStateException("can only get instance before start");
}
Expand Down Expand Up @@ -122,6 +126,16 @@ public void start(Log sharedPoolLog) {
}
}

@Override
public void scrapeIntoPrometheus(DirectUtf8CharSink sink) {
long now = Worker.CLOCK_MICROS.getTicks();
sharedPool.updateWorkerMetrics(now);
ObjList<CharSequence> poolNames = dedicatedPools.keys();
for (int i = 0, limit = poolNames.size(); i < limit; i++) {
dedicatedPools.get(poolNames.getQuick(i)).updateWorkerMetrics(now);
}
}

/**
* @param sharedPool A reference to the SHARED pool
*/
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/io/questdb/cutlass/Services.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static HttpServer createHttpServer(
return createHttpServer(
configuration,
cairoEngine,
workerPoolManager.getInstance(configuration, metrics.health(), Requester.HTTP_SERVER),
workerPoolManager.getInstance(configuration, metrics, Requester.HTTP_SERVER),
workerPoolManager.getSharedWorkerCount(),
metrics
);
Expand Down Expand Up @@ -130,12 +130,12 @@ public static LineTcpReceiver createLineTcpReceiver(

final WorkerPool ioPool = workerPoolManager.getInstance(
config.getIOWorkerPoolConfiguration(),
metrics.health(),
metrics,
Requester.LINE_TCP_IO
);
final WorkerPool writerPool = workerPoolManager.getInstance(
config.getWriterWorkerPoolConfiguration(),
metrics.health(),
metrics,
Requester.LINE_TCP_WRITER
);
return new LineTcpReceiver(config, cairoEngine, ioPool, writerPool);
Expand Down Expand Up @@ -175,7 +175,7 @@ public static HttpServer createMinHttpServer(
// - SHARED otherwise
final WorkerPool workerPool = workerPoolManager.getInstance(
configuration,
metrics.health(),
metrics,
Requester.HTTP_MIN_SERVER
);
return createMinHttpServer(configuration, cairoEngine, workerPool, metrics);
Expand Down Expand Up @@ -238,7 +238,7 @@ public static PGWireServer createPGWireServer(
// - SHARED otherwise
final WorkerPool workerPool = workerPoolManager.getInstance(
configuration,
metrics.health(),
metrics,
Requester.PG_WIRE_SERVER
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2023 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package io.questdb.griffin.engine.functions.groupby;

import io.questdb.cairo.ArrayColumnTypes;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import io.questdb.griffin.engine.functions.GroupByFunction;
import io.questdb.griffin.engine.functions.StrFunction;
import io.questdb.griffin.engine.functions.UnaryFunction;
import io.questdb.std.str.StringSink;
import org.jetbrains.annotations.NotNull;

public class FirstStrGroupByFunction extends StrFunction implements GroupByFunction, UnaryFunction {
private final Function arg;
private final StringSink sink = new StringSink();
private int valueIndex;

public FirstStrGroupByFunction(@NotNull Function arg) {
this.arg = arg;
}

@Override
public void clear() {
sink.clear();
}

@Override
public CharSequence getStr(Record rec) {
boolean hasStr = rec.getBool(valueIndex);
return hasStr ? sink : null;
}

@Override
public CharSequence getStrB(Record rec) {
return getStr(rec);
}

@Override
public boolean isConstant() {
return false;
}

@Override
public boolean isScalar() {
return false;
}

@Override
public void pushValueTypes(ArrayColumnTypes columnTypes) {
this.valueIndex = columnTypes.getColumnCount();
columnTypes.add(ColumnType.INT);
}

@Override
public void computeFirst(MapValue mapValue, Record record) {
CharSequence str = arg.getStr(record);
sink.clear();
if (null != str) {
mapValue.putBool(this.valueIndex, true);
sink.put(str);
} else {
mapValue.putBool(this.valueIndex, false);
}
}

@Override
public void computeNext(MapValue mapValue, Record record) {
// empty
}

@Override
public Function getArg() {
return this.arg;
}

@Override
public String getName() {
return "first";
}

@Override
public void setNull(MapValue mapValue) {
mapValue.putBool(valueIndex, false);
sink.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2023 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package io.questdb.griffin.engine.functions.groupby;

import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.Function;
import io.questdb.griffin.FunctionFactory;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.std.IntList;
import io.questdb.std.ObjList;

public class FirstStrGroupByFunctionFactory implements FunctionFactory {
@Override
public String getSignature() {
return "first(S)";
}

@Override
public boolean isGroupBy() {
return true;
}

@Override
public Function newInstance(int position, ObjList<Function> args, IntList argPositions, CairoConfiguration configuration, SqlExecutionContext sqlExecutionContext) {
return new FirstStrGroupByFunction(args.getQuick(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,25 @@
*
******************************************************************************/

package io.questdb.mp;
package io.questdb.griffin.engine.functions.groupby;

@FunctionalInterface
public interface WorkerCleaner {
void run(Throwable ex);
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import org.jetbrains.annotations.NotNull;

public class LastStrGroupByFunction extends FirstStrGroupByFunction {
public LastStrGroupByFunction(@NotNull Function arg) {
super(arg);
}

@Override
public void computeNext(MapValue mapValue, Record record) {
computeFirst(mapValue, record);
}

@Override
public String getName() {
return "last";
}
}

0 comments on commit 6bfaf19

Please sign in to comment.