Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/java/io/questdb/PropServerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private final String acceptingWrites;
private final ObjObjHashMap<ConfigPropertyKey, ConfigPropertyValue> allPairs = new ObjObjHashMap<>();
private final boolean allowTableRegistrySharedWrite;
private final boolean asyncMunmapEnabled;
private final DateFormat backupDirTimestampFormat;
private final int backupMkdirMode;
private final String backupRoot;
Expand Down Expand Up @@ -1526,6 +1527,10 @@ public PropServerConfiguration(

this.writerMixedIOEnabled = getBoolean(properties, env, PropertyKey.DEBUG_CAIRO_ALLOW_MIXED_IO, ff.allowMixedIO(this.dbRoot));
this.fileDescriptorCacheEnabled = getBoolean(properties, env, PropertyKey.CAIRO_FILE_DESCRIPTOR_CACHE_ENABLED, true);
this.asyncMunmapEnabled = getBoolean(properties, env, PropertyKey.CAIRO_FILE_ASYNC_MUNMAP_ENABLED, false);
if (asyncMunmapEnabled && Os.isWindows()) {
throw new ServerConfigurationException("Async munmap is not supported on Windows");
}

this.inputFormatConfiguration = new InputFormatConfiguration(
DateFormatFactory.INSTANCE,
Expand Down Expand Up @@ -2977,6 +2982,11 @@ public boolean getAllowTableRegistrySharedWrite() {
return allowTableRegistrySharedWrite;
}

/**
 * Returns the value of the {@code asyncMunmapEnabled} field of this configuration.
 *
 * @return the stored async munmap flag
 */
@Override
public boolean getAsyncMunmapEnabled() {
return asyncMunmapEnabled;
}

@Override
public @NotNull String getAttachPartitionSuffix() {
return cairoAttachPartitionSuffix;
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/io/questdb/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ public enum PropertyKey implements ConfigPropertyKey {
CAIRO_SQL_COLUMN_ALIAS_EXPRESSION_ENABLED("cairo.sql.column.alias.expression.enabled"),
CAIRO_SQL_COLUMN_ALIAS_GENERATED_MAX_SIZE("cairo.sql.column.alias.generated.max.size"),
CAIRO_FILE_DESCRIPTOR_CACHE_ENABLED("cairo.file.descriptor.cache.enabled"),
CAIRO_FILE_ASYNC_MUNMAP_ENABLED("cairo.file.async.munmap.enabled"),
CAIRO_RESOURCE_POOL_TRACING_ENABLED("cairo.resource.pool.tracing.enabled");

private static final Map<String, PropertyKey> nameMapping;
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/java/io/questdb/ServerMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import io.questdb.mp.WorkerPool;
import io.questdb.mp.WorkerPoolUtils;
import io.questdb.std.Chars;
import io.questdb.std.Files;
import io.questdb.std.Misc;
import io.questdb.std.MmapCache;
import io.questdb.std.datetime.Clock;
import io.questdb.std.filewatch.FileWatcher;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -186,6 +188,13 @@ public void close() {
}
}

/**
 * Returns the number of active connections reported by the HTTP server's
 * active connection tracker for the given processor.
 *
 * @param processorName name of the processor whose connections are counted
 * @return the tracked connection count, or 0 when there is no HTTP server instance
 */
public long getActiveConnectionCount(String processorName) {
// no HTTP server -> nothing to count
if (httpServer == null) {
return 0;
}
return httpServer.getActiveConnectionTracker().getActiveConnections(processorName);
}

public ServerConfiguration getConfiguration() {
return bootstrap.getConfiguration();
}
Expand All @@ -204,13 +213,6 @@ public int getHttpServerPort() {
throw CairoException.nonCritical().put("http server is not running");
}

public long getActiveConnectionCount(String processorName) {
if (httpServer == null) {
return 0;
}
return httpServer.getActiveConnectionTracker().getActiveConnections(processorName);
}

public int getPgWireServerPort() {
if (pgServer != null) {
return pgServer.getPort();
Expand Down Expand Up @@ -294,6 +296,7 @@ protected void configureWorkerPools(final WorkerPool sharedPoolQuery, final Work
if (engineMaintenanceJob != null) {
sharedPoolWrite.assign(engineMaintenanceJob);
}
WorkerPoolUtils.setupAsyncMunmapJob(sharedPoolQuery, engine);
WorkerPoolUtils.setupQueryJobs(sharedPoolQuery, engine);

if (!config.getCairoConfiguration().isReadOnlyInstance()) {
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/io/questdb/cairo/CairoConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ default String getArchivedCrashFilePrefix() {
return "crash+";
}

/**
 * Tells whether asynchronous munmap is enabled for this configuration.
 *
 * @return true when async munmap is enabled, false otherwise
 */
boolean getAsyncMunmapEnabled();

@NotNull
String getAttachPartitionSuffix();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public boolean getAllowTableRegistrySharedWrite() {
return getDelegate().getAllowTableRegistrySharedWrite();
}

/**
 * Forwards to the delegate configuration.
 *
 * @return the delegate's async munmap flag
 */
@Override
public boolean getAsyncMunmapEnabled() {
return getDelegate().getAsyncMunmapEnabled();
}

@Override
public @NotNull String getAttachPartitionSuffix() {
return getDelegate().getAttachPartitionSuffix();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public boolean getAllowTableRegistrySharedWrite() {
return false;
}

/**
 * Async munmap is off in this configuration.
 *
 * @return always false
 */
@Override
public boolean getAsyncMunmapEnabled() {
return false;
}

@Override
public @NotNull String getAttachPartitionSuffix() {
return TableUtils.ATTACHABLE_DIR_MARKER;
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/io/questdb/mp/WorkerPoolUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,26 @@
import io.questdb.griffin.engine.groupby.GroupByMergeShardJob;
import io.questdb.griffin.engine.groupby.vect.GroupByVectorAggregateJob;
import io.questdb.griffin.engine.table.LatestByAllIndexedJob;
import io.questdb.std.AsyncMunmapJob;
import io.questdb.std.Files;
import io.questdb.std.Os;
import io.questdb.std.Rnd;
import io.questdb.std.datetime.Clock;

public class WorkerPoolUtils {

/**
 * Switches the global {@code Files.ASYNC_MUNMAP_ENABLED} flag according to the engine
 * configuration and, when async munmap is enabled, assigns a new {@link AsyncMunmapJob}
 * to the given pool.
 *
 * @param pool   worker pool that receives the job when async munmap is enabled
 * @param engine engine whose configuration decides whether async munmap is used
 */
public static void setupAsyncMunmapJob(WorkerPool pool, CairoEngine engine) {
    final boolean asyncEnabled = engine.getConfiguration().getAsyncMunmapEnabled();
    if (!asyncEnabled) {
        // disabled: make sure the global switch is off and register nothing
        Files.ASYNC_MUNMAP_ENABLED = false;
        return;
    }

    // the flag is only flipped once the platform assertion has passed
    assert Os.isPosix();
    Files.ASYNC_MUNMAP_ENABLED = true;
    pool.assign(new AsyncMunmapJob());
}

public static void setupQueryJobs(
WorkerPool sharedPoolQuery,
CairoEngine cairoEngine
Expand Down
42 changes: 42 additions & 0 deletions core/src/main/java/io/questdb/std/AsyncMunmapJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2024 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package io.questdb.std;

import io.questdb.mp.Job;
import org.jetbrains.annotations.NotNull;

/**
 * Job that forwards every run to {@link MmapCache#asyncMunmap()} on the cache
 * obtained from {@link Files#getMmapCache()}.
 */
public final class AsyncMunmapJob implements Job {

    // cache whose pending unmap requests this job processes
    private final MmapCache cache = Files.getMmapCache();

    public AsyncMunmapJob() {
    }

    /**
     * Processes the cache's pending unmap requests.
     *
     * @return whatever {@code MmapCache.asyncMunmap()} reports for this pass
     */
    @Override
    public boolean run(int workerId, @NotNull RunStatus runStatus) {
        final boolean didWork = cache.asyncMunmap();
        return didWork;
    }
}
9 changes: 7 additions & 2 deletions core/src/main/java/io/questdb/std/Files.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public final class Files {
public static final Charset UTF_8;
public static final int WINDOWS_ERROR_FILE_EXISTS = 0x50;
private static final int VIRTIO_FS_MAGIC = 0x6a656a63;
private final static FdCache fdCache = new FdCache();
private static final MmapCache mmapCache = new MmapCache();
private static final FdCache fdCache = new FdCache();
private static final MmapCache mmapCache = MmapCache.INSTANCE;
public static boolean ASYNC_MUNMAP_ENABLED = false;
public static boolean FS_CACHE_ENABLED = true;
// To be set in tests to check every call for using OPEN file descriptor
public static boolean VIRTIO_FS_DETECTED = false;
Expand Down Expand Up @@ -229,6 +230,10 @@ public static long getLastModified(LPSZ lpsz) {
*/
public native static long getMapCountLimit();

/**
 * Returns the {@link MmapCache} instance held in the static {@code mmapCache} field.
 *
 * @return the mmap cache used by this class
 */
public static MmapCache getMmapCache() {
return mmapCache;
}

public static long getMmapReuseCount() {
return mmapCache.getReuseCount();
}
Expand Down
96 changes: 88 additions & 8 deletions core/src/main/java/io/questdb/std/MmapCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,59 @@
package io.questdb.std;

import io.questdb.cairo.CairoException;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.MCSequence;
import io.questdb.mp.MPSequence;
import io.questdb.mp.RingQueue;

/**
* Thread-safe cache for memory-mapped file regions with reference counting.
* Reuses existing mappings for the same file when possible to reduce system calls.
*/
public class MmapCache {
public final class MmapCache {
public static final MmapCache INSTANCE = new MmapCache();

private static final Log LOG = LogFactory.getLog(MmapCache.class);
private static final int MAX_RECORD_POOL_CAPACITY = 16 * 1024;
private static final int MUNMAP_QUEUE_CAPACITY = 8 * 1024;
private final LongObjHashMap<MmapCacheRecord> mmapAddrCache = new LongObjHashMap<>();
private final LongObjHashMap<MmapCacheRecord> mmapFileCache = new LongObjHashMap<>();
private final MCSequence munmapConsumerSequence;
private final MPSequence munmapProducesSequence;
private final RingQueue<MunmapTask> munmapTaskRingQueue;
private final ObjStack<MmapCacheRecord> recordPool = new ObjStack<>();
private long mmapReuseCount = 0;

/**
 * Creates the cache together with the ring queue and the producer/consumer
 * sequences used to pass unmap requests to {@code asyncMunmap()}.
 */
private MmapCache() {
// ring of MunmapTask slots, created through the MunmapTask::new factory
munmapTaskRingQueue = new RingQueue<>(MunmapTask::new, MUNMAP_QUEUE_CAPACITY);
// both sequences are built from the queue's cycle value
munmapProducesSequence = new MPSequence(munmapTaskRingQueue.getCycle());
munmapConsumerSequence = new MCSequence(munmapTaskRingQueue.getCycle());
// chain producer -> consumer -> producer; presumably this makes each side
// gate on the other's progress - confirm against the sequence API
munmapProducesSequence.then(munmapConsumerSequence).then(munmapProducesSequence);
}

/**
 * Drains the queue of pending unmap requests, handing every queued task to the unmap routine.
 * This method is not thread safe! It is meant to be invoked from a synchronized job - one per Server.
 *
 * @return true if at least one queued request was processed during this call, false otherwise.
 */
public boolean asyncMunmap() {
    boolean processedAny = false;
    for (; ; ) {
        final long slot = munmapConsumerSequence.next();
        if (slot == -1) {
            // nothing left to consume
            return processedAny;
        }
        if (slot > -1) {
            processedAny = true;
            munmapTaskConsumer(munmapTaskRingQueue.get(slot));
            munmapConsumerSequence.done(slot);
        } else if (slot == -2) {
            // contention on the sequence: back off briefly and try again
            Os.pause();
        }
    }
}

/**
* Maps file region into memory, reusing existing mapping if available.
*
Expand Down Expand Up @@ -302,15 +343,17 @@ private static long mremap0(int fd, long address, long previousSize, long newSiz
return address;
}

private static void unmap0(long address, long len, int memoryTag) {
int result = Files.munmap0(address, len);
private static void munmapTaskConsumer(MunmapTask task) {
int result = Files.munmap0(task.address, task.size);
if (result != -1) {
Unsafe.recordMemAlloc(-len, memoryTag);
Unsafe.recordMemAlloc(-task.size, task.memoryTag);
} else {
throw CairoException.critical(Os.errno())
.put("munmap failed [address=").put(address)
.put(", len=").put(len)
.put(", memoryTag=").put(memoryTag).put(']');
int errno = Os.errno();
LOG.critical().$("munmap failed [address=").$(task.address)
.$(", size=").$(task.size)
.$(", tag=").$(MemoryTag.nameOf(task.memoryTag))
.$(", errno=").$(errno)
.I$();
}
}

Expand All @@ -323,6 +366,37 @@ private MmapCacheRecord createMmapCacheRecord(int fd, long fileCacheKey, long le
return new MmapCacheRecord(fd, fileCacheKey, len, address, 1, memoryTag);
}

/**
 * Releases a mapped region, either by queueing the request for asynchronous processing
 * or by calling munmap directly.
 * <p>
 * With {@code Files.ASYNC_MUNMAP_ENABLED} set, the request is published to the munmap ring
 * queue and the method returns immediately. When the queue has no free slot, or async mode
 * is off, the region is unmapped on the calling thread and the memory accounting is updated.
 *
 * @param address   start address of the mapping
 * @param len       length of the mapping
 * @param memoryTag memory tag the mapping is accounted under
 * @throws CairoException when the synchronous munmap call fails
 */
private void unmap0(long address, long len, int memoryTag) {
    if (Files.ASYNC_MUNMAP_ENABLED) {
        // -2 from the sequence: we lost a CAS race, so retry after a short pause
        // -1 from the sequence: the queue is full, so it is cheaper to do the munmap ourselves
        long slot = munmapProducesSequence.next();
        while (slot == -2) {
            Os.pause();
            slot = munmapProducesSequence.next();
        }

        if (slot > -1) {
            final MunmapTask pending = munmapTaskRingQueue.get(slot);
            pending.address = address;
            pending.size = len;
            pending.memoryTag = memoryTag;
            munmapProducesSequence.done(slot);
            return;
        }
        LOG.info().$("async munmap queue is full").$();
    }

    // synchronous path: async mode is off or the queue had no room
    if (Files.munmap0(address, len) == -1) {
        throw CairoException.critical(Os.errno())
                .put("munmap failed [address=").put(address)
                .put(", len=").put(len)
                .put(", memoryTag=").put(memoryTag).put(']');
    }
    Unsafe.recordMemAlloc(-len, memoryTag);
}

/**
* Cache record holding memory mapping details and reference count.
*/
Expand Down Expand Up @@ -352,4 +426,10 @@ public void of(int fd, long fileCacheKey, long len, long address, int count, int
this.memoryTag = memoryTag;
}
}

/**
 * Mutable slot of the munmap ring queue; holds the arguments of one queued unmap request.
 */
private static class MunmapTask {
// start address of the mapping to release
private long address;
// memory tag supplied by the caller that queued the request
private int memoryTag;
// length of the mapping to release
private long size;
}
}
37 changes: 37 additions & 0 deletions core/src/test/java/io/questdb/test/ServerMainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,42 @@ public void setUp() {
dbPath.parent().$();
}

/**
 * Starts a server with async munmap enabled through the environment.
 * On Windows the bootstrap is expected to fail with the "not supported" message;
 * elsewhere the server must start and close cleanly.
 */
@Test
public void testAsyncMunmap() throws Exception {
assertMemoryLeak(() -> {
try {
// copy of the real environment with the async munmap property switched on
Map<String, String> env = new HashMap<>(System.getenv());
env.put(PropertyKey.CAIRO_FILE_ASYNC_MUNMAP_ENABLED.getEnvVarName(), "true");

Bootstrap bootstrap;
try {
bootstrap = new Bootstrap(
new DefaultBootstrapConfiguration() {
@Override
public Map<String, String> getEnv() {
return env;
}
},
getServerMainArgs()
);
} catch (Bootstrap.BootstrapException ex) {
// a bootstrap failure is only acceptable on Windows
if (!Os.isWindows()) {
throw ex;
}
TestUtils.assertContains(ex.getMessage(), "Async munmap is not supported on Windows");
return;
}

// reaching this point means bootstrap succeeded, which must not happen on Windows
Assert.assertFalse(Os.isWindows());
try (final ServerMain serverMain = new ServerMain(bootstrap)) {
serverMain.start();
}
} finally {
// reset the global flag so the setting does not carry over past this test
Files.ASYNC_MUNMAP_ENABLED = false;
}
});
}

@Test
public void testPgWirePort() throws Exception {
assertMemoryLeak(() -> {
Expand Down Expand Up @@ -418,6 +454,7 @@ public void testShowParameters() throws Exception {
"cairo.wal.inactive.writer.ttl\tQDB_CAIRO_WAL_INACTIVE_WRITER_TTL\t120000\tdefault\tfalse\tfalse\n" +
"cairo.wal.max.lag.txn.count\tQDB_CAIRO_WAL_MAX_LAG_TXN_COUNT\t-1\tdefault\tfalse\tfalse\n" +
"cairo.wal.max.segment.file.descriptors.cache\tQDB_CAIRO_WAL_MAX_SEGMENT_FILE_DESCRIPTORS_CACHE\t30\tdefault\tfalse\tfalse\n" +
"cairo.file.async.munmap.enabled\tQDB_CAIRO_FILE_ASYNC_MUNMAP_ENABLED\tfalse\tdefault\tfalse\tfalse\n" +
"cairo.wal.max.lag.size\tQDB_CAIRO_WAL_MAX_LAG_SIZE\t78643200\tdefault\tfalse\tfalse\n" +
"cairo.wal.purge.interval\tQDB_CAIRO_WAL_PURGE_INTERVAL\t30000\tdefault\tfalse\tfalse\n" +
"cairo.wal.recreate.distressed.sequencer.attempts\tQDB_CAIRO_WAL_RECREATE_DISTRESSED_SEQUENCER_ATTEMPTS\t3\tdefault\tfalse\tfalse\n" +
Expand Down
Loading