From 154249a174032781b6dd37c9b28abaf91218bb96 Mon Sep 17 00:00:00 2001 From: ideoma <2159629+ideoma@users.noreply.github.com> Date: Tue, 13 Feb 2024 19:34:53 +0000 Subject: [PATCH 1/5] chore(core): support text file _snapshot.txt file for snapshot identification --- .../cairo/DatabaseSnapshotAgentImpl.java | 63 ++++++++++--------- .../java/io/questdb/cairo/TableUtils.java | 30 +++++++-- 2 files changed, 60 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java index f6b53a748fa5..0ec3f6976bd9 100644 --- a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java +++ b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java @@ -65,12 +65,12 @@ public class DatabaseSnapshotAgentImpl implements DatabaseSnapshotAgent, QuietCl private final WalWriterMetadata metadata; // protected with #lock private final StringSink nameSink = new StringSink(); // protected with #lock private final Path path = new Path(); // protected with #lock - private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock - private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob private final SymbolMapUtil symbolMapUtil = new SymbolMapUtil(); + private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock private ColumnVersionReader columnVersionReader = null; private TableReaderMetadata tableMetadata = null; private TxWriter txWriter = null; + private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob DatabaseSnapshotAgentImpl(CairoEngine engine) { this.engine = engine; @@ -111,32 +111,6 @@ public void setWalPurgeJobRunLock(@Nullable SimpleWaitingLock walPurgeJobRunLock this.walPurgeJobRunLock = walPurgeJobRunLock; } - void completeSnapshot() throws SqlException { - if (!lock.tryLock()) { - throw SqlException.position(0).put("Another snapshot command in progress"); - } - try { - // Delete snapshot/db directory. - path.of(configuration.getSnapshotRoot()).concat(configuration.getDbDirectory()).$(); - ff.rmdir(path); // it's fine to ignore errors here - - // Resume the WalPurgeJob - if (walPurgeJobRunLock != null) { - try { - walPurgeJobRunLock.unlock(); - } catch (IllegalStateException ignore) { - // not an error here - // completeSnapshot can be called several time in a row. - } - } - - // Reset snapshot in-flight flag. - inProgress.set(false); - } finally { - lock.unlock(); - } - } - private void rebuildSymbolFiles(Path tablePath, AtomicInteger recoveredSymbolFiles, int pathTableLen) { int denseSymbolIndex = 0; tablePath.trimTo(pathTableLen); @@ -356,6 +330,32 @@ void prepareSnapshot(SqlExecutionContext executionContext) throws SqlException { } } + void completeSnapshot() throws SqlException { + if (!lock.tryLock()) { + throw SqlException.position(0).put("Another snapshot command in progress"); + } + try { + // Delete snapshot/db directory. + path.of(configuration.getSnapshotRoot()).concat(configuration.getDbDirectory()).$(); + ff.rmdir(path); // it's fine to ignore errors here + + // Resume the WalPurgeJob + if (walPurgeJobRunLock != null) { + try { + walPurgeJobRunLock.unlock(); + } catch (IllegalStateException ignore) { + // not an error here + // completeSnapshot can be called several time in a row. + } + } + + // Reset snapshot in-flight flag. + inProgress.set(false); + } finally { + lock.unlock(); + } + } + void recoverSnapshot() { if (!configuration.isSnapshotRecoveryEnabled()) { return; @@ -390,7 +390,12 @@ void recoverSnapshot() { memFile.smallFile(ff, srcPath, MemoryTag.MMAP_DEFAULT); final CharSequence currentInstanceId = configuration.getSnapshotInstanceId(); - final CharSequence snapshotInstanceId = memFile.getStr(0); + CharSequence snapshotInstanceId = memFile.getStr(0); + if (Chars.empty(snapshotInstanceId)) { + srcPath.trimTo(snapshotRootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT).$(); + snapshotInstanceId = TableUtils.readText(ff, path); + } + if (Chars.empty(currentInstanceId) || Chars.empty(snapshotInstanceId) || Chars.equals(currentInstanceId, snapshotInstanceId)) { LOG.info() .$("skipping snapshot recovery [currentId=").$(currentInstanceId) diff --git a/core/src/main/java/io/questdb/cairo/TableUtils.java b/core/src/main/java/io/questdb/cairo/TableUtils.java index 2eb6a7b92737..5846d0e02258 100644 --- a/core/src/main/java/io/questdb/cairo/TableUtils.java +++ b/core/src/main/java/io/questdb/cairo/TableUtils.java @@ -45,10 +45,7 @@ import io.questdb.mp.MPSequence; import io.questdb.std.*; import io.questdb.std.datetime.millitime.MillisecondClock; -import io.questdb.std.str.CharSink; -import io.questdb.std.str.LPSZ; -import io.questdb.std.str.Path; -import io.questdb.std.str.Utf8Sequence; +import io.questdb.std.str.*; import io.questdb.tasks.O3PartitionPurgeTask; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -105,6 +102,7 @@ public final class TableUtils { public static final int MIN_INDEX_VALUE_BLOCK_SIZE = Numbers.ceilPow2(4); public static final int NULL_LEN = -1; public static final String SNAPSHOT_META_FILE_NAME = "_snapshot"; + public static final String SNAPSHOT_META_FILE_NAME_TXT = "_snapshot.txt"; public static final String SYMBOL_KEY_REMAP_FILE_SUFFIX = ".r"; public static final char SYSTEM_TABLE_NAME_SUFFIX = '~'; public static final int TABLE_DOES_NOT_EXIST = 1; @@ -1283,6 +1281,30 @@ public static String readTableName(Path path, int rootLen, MemoryCMR mem, FilesF } } + public static String readText(FilesFacade ff, Path path1) { + int fd = ff.openRO(path1); + long bytes = 0; + long length = 0; + if (fd > -1) { + try { + length = ff.length(fd); + if (length > 0) { + bytes = Unsafe.malloc(length, MemoryTag.NATIVE_DEFAULT); + if (ff.read(fd, bytes, length, 0) == length) { + return Utf8s.stringFromUtf8Bytes(bytes, bytes + length); + } + + } + } finally { + if (bytes != 0) { + Unsafe.free(bytes, length, MemoryTag.NATIVE_DEFAULT); + } + ff.close(fd); + } + } + return null; + } + public static void removeColumnFromMetadata( CharSequence columnName, LowerCaseCharSequenceIntHashMap columnNameIndexMap, From 4a2dcbbe01133832a25dce42babbe7e751e49837 Mon Sep 17 00:00:00 2001 From: ideoma <2159629+ideoma@users.noreply.github.com> Date: Wed, 14 Feb 2024 09:54:14 +0000 Subject: [PATCH 2/5] test --- .../cairo/DatabaseSnapshotAgentImpl.java | 2 +- .../io/questdb/test/griffin/SnapshotTest.java | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java index 0ec3f6976bd9..c5d16cffdff0 100644 --- a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java +++ b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java @@ -393,7 +393,7 @@ void recoverSnapshot() { CharSequence snapshotInstanceId = memFile.getStr(0); if (Chars.empty(snapshotInstanceId)) { srcPath.trimTo(snapshotRootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT).$(); - snapshotInstanceId = TableUtils.readText(ff, path); + snapshotInstanceId = TableUtils.readText(ff, srcPath); } if (Chars.empty(currentInstanceId) || Chars.empty(snapshotInstanceId) || Chars.equals(currentInstanceId, snapshotInstanceId)) { diff --git a/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java b/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java index 2e1d22d5ef0d..f44f7f8ccbce 100644 --- a/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java +++ b/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java @@ -37,6 +37,7 @@ import io.questdb.mp.SOCountDownLatch; import io.questdb.mp.SimpleWaitingLock; import io.questdb.std.*; +import io.questdb.std.str.DirectUtf8Sink; import io.questdb.std.str.Path; import io.questdb.std.str.Utf8s; import io.questdb.test.AbstractCairoTest; @@ -210,6 +211,58 @@ public void testRecoverSnapshotRestoresDroppedColumns() throws Exception { }); } + @Test + public void testRecoverSnapshotSupportsSnapshotTxtFile() throws Exception { + final int partitionCount = 10; + final String snapshotId = "id1"; + final String restartedId = "id2"; + assertMemoryLeak(() -> { + setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, ""); + final String tableName = "t"; + ddl( + "create table " + tableName + " as " + + "(select x, timestamp_sequence(0, 100000000000) ts from long_sequence(" + partitionCount + ")) timestamp(ts) partition by day" + ); + + ddl("snapshot prepare"); + + insert( + "insert into " + tableName + + " select x+20 x, timestamp_sequence(100000000000, 100000000000) ts from long_sequence(3)" + ); + + // Release all readers and writers, but keep the snapshot dir around. + engine.clear(); + + // create snapshot.txt file + var ff = configuration.getFilesFacade(); + path.trimTo(rootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT); + int fd = ff.openRW(path.$(), configuration.getWriterFileOpenOpts()); + Assert.assertTrue(fd > 0); + + try { + try (DirectUtf8Sink utf8 = new DirectUtf8Sink(3)) { + utf8.put(snapshotId); + ff.write(fd, utf8.ptr(), utf8.size(), 0); + ff.truncate(fd, utf8.size()); + } + } finally { + ff.close(fd); + } + Assert.assertEquals(ff.length(path), restartedId.length()); + + setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, restartedId); + engine.recoverSnapshot(); + + // Data inserted after PREPARE SNAPSHOT should be discarded. + assertSql( + "count\n" + + partitionCount + "\n", + "select count() from " + tableName + ); + }); + } + @Test public void testRunWalPurgeJobLockTimeout() throws Exception { configureCircuitBreakerTimeoutOnFirstCheck(); // trigger timeout on first check From 0d17fb48d1ec9f566e3a5433817d31af1fe5987a Mon Sep 17 00:00:00 2001 From: ideoma <2159629+ideoma@users.noreply.github.com> Date: Wed, 14 Feb 2024 15:52:02 +0000 Subject: [PATCH 3/5] fix(core): fix snapshot restore to correctly restore tables when snapshot is taken under heavy ddl load --- .../cairo/DatabaseSnapshotAgentImpl.java | 127 +++++++----------- .../test/griffin/wal/SnapshotFuzzTest.java | 18 +-- 2 files changed, 56 insertions(+), 89 deletions(-) diff --git a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java index f6b53a748fa5..d662a9a60271 100644 --- a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java +++ b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java @@ -65,12 +65,12 @@ public class DatabaseSnapshotAgentImpl implements DatabaseSnapshotAgent, QuietCl private final WalWriterMetadata metadata; // protected with #lock private final StringSink nameSink = new StringSink(); // protected with #lock private final Path path = new Path(); // protected with #lock - private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock - private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob private final SymbolMapUtil symbolMapUtil = new SymbolMapUtil(); + private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock private ColumnVersionReader columnVersionReader = null; private TableReaderMetadata tableMetadata = null; private TxWriter txWriter = null; + private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob DatabaseSnapshotAgentImpl(CairoEngine engine) { this.engine = engine; @@ -111,29 +111,21 @@ public void setWalPurgeJobRunLock(@Nullable SimpleWaitingLock walPurgeJobRunLock this.walPurgeJobRunLock = walPurgeJobRunLock; } - void completeSnapshot() throws SqlException { - if (!lock.tryLock()) { - throw SqlException.position(0).put("Another snapshot command in progress"); - } - try { - // Delete snapshot/db directory. - path.of(configuration.getSnapshotRoot()).concat(configuration.getDbDirectory()).$(); - ff.rmdir(path); // it's fine to ignore errors here - - // Resume the WalPurgeJob - if (walPurgeJobRunLock != null) { - try { - walPurgeJobRunLock.unlock(); - } catch (IllegalStateException ignore) { - // not an error here - // completeSnapshot can be called several time in a row. - } - } - - // Reset snapshot in-flight flag. - inProgress.set(false); - } finally { - lock.unlock(); + private static void copyOrError(Path srcPath, Path dstPath, FilesFacade ff, AtomicInteger counter, String fileName) { + srcPath.concat(fileName).$(); + dstPath.concat(fileName).$(); + if (ff.copy(srcPath, dstPath) < 0) { + LOG.error() + .$("could not copy ").$(fileName).$(" file [src=").$(srcPath) + .$(", dst=").$(dstPath) + .$(", errno=").$(ff.errno()) + .I$(); + } else { + counter.incrementAndGet(); + LOG.info() + .$("recovered ").$(fileName).$(" file [src=").$(srcPath) + .$(", dst=").$(dstPath) + .I$(); } } @@ -204,6 +196,32 @@ private void rebuildTableFiles(Path tablePath, AtomicInteger recoveredSymbolFile } } + void completeSnapshot() throws SqlException { + if (!lock.tryLock()) { + throw SqlException.position(0).put("Another snapshot command in progress"); + } + try { + // Delete snapshot/db directory. + path.of(configuration.getSnapshotRoot()).concat(configuration.getDbDirectory()).$(); + ff.rmdir(path); // it's fine to ignore errors here + + // Resume the WalPurgeJob + if (walPurgeJobRunLock != null) { + try { + walPurgeJobRunLock.unlock(); + } catch (IllegalStateException ignore) { + // not an error here + // completeSnapshot can be called several time in a row. + } + } + + // Reset snapshot in-flight flag. + inProgress.set(false); + } finally { + lock.unlock(); + } + } + void prepareSnapshot(SqlExecutionContext executionContext) throws SqlException { // Windows doesn't support sync() system call. if (Os.isWindows()) { @@ -447,60 +465,9 @@ void recoverSnapshot() { int srcPathLen = srcPath.size(); int dstPathLen = dstPath.size(); - srcPath.concat(TableUtils.META_FILE_NAME).$(); - dstPath.concat(TableUtils.META_FILE_NAME).$(); - if (ff.exists(srcPath) && ff.exists(dstPath)) { - if (ff.copy(srcPath, dstPath) < 0) { - LOG.error() - .$("could not copy _meta file [src=").$(srcPath) - .$(", dst=").$(dstPath) - .$(", errno=").$(ff.errno()) - .I$(); - } else { - recoveredMetaFiles.incrementAndGet(); - LOG.info() - .$("recovered _meta file [src=").$(srcPath) - .$(", dst=").$(dstPath) - .I$(); - } - } - - srcPath.trimTo(srcPathLen).concat(TableUtils.TXN_FILE_NAME).$(); - dstPath.trimTo(dstPathLen).concat(TableUtils.TXN_FILE_NAME).$(); - if (ff.exists(srcPath) && ff.exists(dstPath)) { - if (ff.copy(srcPath, dstPath) < 0) { - LOG.error() - .$("could not copy _txn file [src=").$(srcPath) - .$(", dst=").$(dstPath) - .$(", errno=").$(ff.errno()) - .I$(); - } else { - recoveredTxnFiles.incrementAndGet(); - LOG.info() - .$("recovered _txn file [src=").$(srcPath) - .$(", dst=").$(dstPath) - .I$(); - } - } - - srcPath.trimTo(srcPathLen).concat(TableUtils.COLUMN_VERSION_FILE_NAME).$(); - dstPath.trimTo(dstPathLen).concat(TableUtils.COLUMN_VERSION_FILE_NAME).$(); - if (ff.exists(srcPath) && ff.exists(dstPath)) { - if (ff.copy(srcPath, dstPath) < 0) { - LOG.error() - .$("could not copy _cv file [src=").$(srcPath) - .$(", dst=").$(dstPath) - .$(", errno=").$(ff.errno()) - .I$(); - } else { - recoveredCVFiles.incrementAndGet(); - LOG.info() - .$("recovered _cv file [src=").$(srcPath) - .$(", dst=").$(dstPath) - .I$(); - } - } - + copyOrError(srcPath, dstPath, ff, recoveredMetaFiles, TableUtils.META_FILE_NAME); + copyOrError(srcPath.trimTo(srcPathLen), dstPath.trimTo(dstPathLen), ff, recoveredTxnFiles, TableUtils.TXN_FILE_NAME); + copyOrError(srcPath.trimTo(srcPathLen), dstPath.trimTo(dstPathLen), ff, recoveredCVFiles, TableUtils.COLUMN_VERSION_FILE_NAME); rebuildTableFiles(dstPath.trimTo(dstPathLen), symbolFilesCount); // Go inside SEQ_DIR @@ -512,7 +479,7 @@ void recoverSnapshot() { dstPathLen = dstPath.size(); dstPath.concat(TableUtils.META_FILE_NAME).$(); - if (ff.exists(srcPath) && ff.exists(dstPath)) { + if (ff.exists(srcPath)) { if (ff.copy(srcPath, dstPath) < 0) { LOG.critical() .$("could not copy ").$(TableUtils.META_FILE_NAME).$(" file [src=").$(srcPath) diff --git a/core/src/test/java/io/questdb/test/griffin/wal/SnapshotFuzzTest.java b/core/src/test/java/io/questdb/test/griffin/wal/SnapshotFuzzTest.java index 9cd12bc229d9..b6a32cd3078c 100644 --- a/core/src/test/java/io/questdb/test/griffin/wal/SnapshotFuzzTest.java +++ b/core/src/test/java/io/questdb/test/griffin/wal/SnapshotFuzzTest.java @@ -40,15 +40,7 @@ public class SnapshotFuzzTest extends AbstractFuzzTest { @Test - public void testFullFuzz() throws Exception { - Rnd rnd = generateRandom(LOG); - fullFuzz(rnd); - setFuzzProperties(rnd.nextLong(50), getRndO3PartitionSplit(rnd), getRndO3PartitionSplitMaxCount(rnd), 10 * Numbers.SIZE_1MB, 3); - runFuzzWithSnapshot(rnd); - } - - @Test - public void testFullFuzzEjectedTransactions() throws Exception { + public void testSnapshotEjectedWalApply() throws Exception { Rnd rnd = generateRandom(LOG); fuzzer.setFuzzProbabilities( 0, @@ -79,6 +71,14 @@ public void testFullFuzzEjectedTransactions() throws Exception { runFuzzWithSnapshot(rnd); } + @Test + public void testSnapshotFullFuzz() throws Exception { + Rnd rnd = generateRandom(LOG); + fullFuzz(rnd); + setFuzzProperties(rnd.nextLong(50), getRndO3PartitionSplit(rnd), getRndO3PartitionSplitMaxCount(rnd), 10 * Numbers.SIZE_1MB, 3); + runFuzzWithSnapshot(rnd); + } + private void createSnapshot() throws SqlException { setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, "id_1"); LOG.info().$("starting snapshot").$(); From 4a59ad0d8b1524581cd9d8db4c00e30f92e900ae Mon Sep 17 00:00:00 2001 From: ideoma <2159629+ideoma@users.noreply.github.com> Date: Wed, 14 Feb 2024 15:56:25 +0000 Subject: [PATCH 4/5] fix java 8 compilation --- core/src/test/java/io/questdb/test/griffin/SnapshotTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java b/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java index f44f7f8ccbce..f75aac9cdcff 100644 --- a/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java +++ b/core/src/test/java/io/questdb/test/griffin/SnapshotTest.java @@ -235,7 +235,7 @@ public void testRecoverSnapshotSupportsSnapshotTxtFile() throws Exception { engine.clear(); // create snapshot.txt file - var ff = configuration.getFilesFacade(); + FilesFacade ff = configuration.getFilesFacade(); path.trimTo(rootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT); int fd = ff.openRW(path.$(), configuration.getWriterFileOpenOpts()); Assert.assertTrue(fd > 0); From fa5626d8c1f789e4d4c9128355c237baf69096e9 Mon Sep 17 00:00:00 2001 From: ideoma <2159629+ideoma@users.noreply.github.com> Date: Wed, 14 Feb 2024 17:28:17 +0000 Subject: [PATCH 5/5] PR feedback --- .../java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java index e8048a3c403e..55d9cf2195bc 100644 --- a/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java +++ b/core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java @@ -410,8 +410,12 @@ void recoverSnapshot() { final CharSequence currentInstanceId = configuration.getSnapshotInstanceId(); CharSequence snapshotInstanceId = memFile.getStr(0); if (Chars.empty(snapshotInstanceId)) { + // Check _snapshot.txt file too reading it as a text file. srcPath.trimTo(snapshotRootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT).$(); - snapshotInstanceId = TableUtils.readText(ff, srcPath); + String snapshotIdTxt = TableUtils.readText(ff, srcPath); + if (snapshotIdTxt != null) { + snapshotInstanceId = snapshotIdTxt.trim(); + } } if (Chars.empty(currentInstanceId) || Chars.empty(snapshotInstanceId) || Chars.equals(currentInstanceId, snapshotInstanceId)) {