Skip to content

Commit

Permalink
fix(core): fix snapshot restore to correctly restore tables when snap…
Browse files Browse the repository at this point in the history
…shot is taken under heavy DDL load (#4212)
  • Loading branch information
ideoma committed Feb 14, 2024
1 parent 0bda97e commit d5d6448
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 94 deletions.
138 changes: 57 additions & 81 deletions core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java
Expand Up @@ -65,12 +65,12 @@ public class DatabaseSnapshotAgentImpl implements DatabaseSnapshotAgent, QuietCl
private final WalWriterMetadata metadata; // protected with #lock
private final StringSink nameSink = new StringSink(); // protected with #lock
private final Path path = new Path(); // protected with #lock
private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock
private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob
private final SymbolMapUtil symbolMapUtil = new SymbolMapUtil();
private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock
private ColumnVersionReader columnVersionReader = null;
private TableReaderMetadata tableMetadata = null;
private TxWriter txWriter = null;
private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob

DatabaseSnapshotAgentImpl(CairoEngine engine) {
this.engine = engine;
Expand Down Expand Up @@ -111,29 +111,21 @@ public void setWalPurgeJobRunLock(@Nullable SimpleWaitingLock walPurgeJobRunLock
this.walPurgeJobRunLock = walPurgeJobRunLock;
}

void completeSnapshot() throws SqlException {
if (!lock.tryLock()) {
throw SqlException.position(0).put("Another snapshot command in progress");
}
try {
// Delete snapshot/db directory.
path.of(configuration.getSnapshotRoot()).concat(configuration.getDbDirectory()).$();
ff.rmdir(path); // it's fine to ignore errors here

// Resume the WalPurgeJob
if (walPurgeJobRunLock != null) {
try {
walPurgeJobRunLock.unlock();
} catch (IllegalStateException ignore) {
// not an error here
// completeSnapshot can be called several times in a row.
}
}

// Reset snapshot in-flight flag.
inProgress.set(false);
} finally {
lock.unlock();
/**
 * Appends {@code fileName} to both paths and copies the source file over the destination file.
 * <p>
 * On success the supplied counter is incremented and an info line is logged; on failure
 * (negative result from {@code ff.copy}) an error line including {@code ff.errno()} is logged
 * and the counter is left untouched. No exception is thrown from the copy failure branch.
 * <p>
 * Neither path is trimmed back here, so both still end with {@code fileName} on return.
 *
 * @param srcPath  path of the directory to copy from; mutated by appending {@code fileName}
 * @param dstPath  path of the directory to copy to; mutated by appending {@code fileName}
 * @param ff       files facade used to perform the copy and read errno
 * @param counter  incremented once when the copy succeeds
 * @param fileName name of the file to copy, also used in the log messages
 */
private static void copyOrError(Path srcPath, Path dstPath, FilesFacade ff, AtomicInteger counter, String fileName) {
    srcPath.concat(fileName).$();
    dstPath.concat(fileName).$();

    final boolean copied = ff.copy(srcPath, dstPath) >= 0;
    if (copied) {
        counter.incrementAndGet();
        LOG.info()
                .$("recovered ").$(fileName).$(" file [src=").$(srcPath)
                .$(", dst=").$(dstPath)
                .I$();
        return;
    }

    // Copy failed: report it and carry on, the caller decides what happens next.
    LOG.error()
            .$("could not copy ").$(fileName).$(" file [src=").$(srcPath)
            .$(", dst=").$(dstPath)
            .$(", errno=").$(ff.errno())
            .I$();
}

Expand Down Expand Up @@ -204,6 +196,32 @@ private void rebuildTableFiles(Path tablePath, AtomicInteger recoveredSymbolFile
}
}

/**
 * Completes a snapshot: removes the db directory under the snapshot root, releases the
 * WalPurgeJob run lock if one is set, and clears the in-progress flag.
 * <p>
 * The whole operation runs under {@code lock}, which is acquired without waiting.
 *
 * @throws SqlException if {@code lock} cannot be acquired immediately
 */
void completeSnapshot() throws SqlException {
    if (!lock.tryLock()) {
        throw SqlException.position(0).put("Another snapshot command in progress");
    }
    try {
        // Drop the <snapshot root>/<db directory> folder; the result of rmdir is deliberately not checked.
        path.of(configuration.getSnapshotRoot()).concat(configuration.getDbDirectory()).$();
        ff.rmdir(path);

        // Let the WalPurgeJob run again.
        releaseWalPurgeJobRunLock();

        // The snapshot is no longer in flight.
        inProgress.set(false);
    } finally {
        lock.unlock();
    }
}

/**
 * Unlocks the WalPurgeJob run lock when one has been set.
 * An {@link IllegalStateException} from the unlock call is tolerated because
 * completeSnapshot can be called several times in a row.
 */
private void releaseWalPurgeJobRunLock() {
    if (walPurgeJobRunLock == null) {
        return;
    }
    try {
        walPurgeJobRunLock.unlock();
    } catch (IllegalStateException ignored) {
        // not an error here, see the method comment
    }
}

void prepareSnapshot(SqlExecutionContext executionContext) throws SqlException {
// Windows doesn't support sync() system call.
if (Os.isWindows()) {
Expand Down Expand Up @@ -390,7 +408,16 @@ void recoverSnapshot() {
memFile.smallFile(ff, srcPath, MemoryTag.MMAP_DEFAULT);

final CharSequence currentInstanceId = configuration.getSnapshotInstanceId();
final CharSequence snapshotInstanceId = memFile.getStr(0);
CharSequence snapshotInstanceId = memFile.getStr(0);
if (Chars.empty(snapshotInstanceId)) {
// Check _snapshot.txt file too reading it as a text file.
srcPath.trimTo(snapshotRootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT).$();
String snapshotIdTxt = TableUtils.readText(ff, srcPath);
if (snapshotIdTxt != null) {
snapshotInstanceId = snapshotIdTxt.trim();
}
}

if (Chars.empty(currentInstanceId) || Chars.empty(snapshotInstanceId) || Chars.equals(currentInstanceId, snapshotInstanceId)) {
LOG.info()
.$("skipping snapshot recovery [currentId=").$(currentInstanceId)
Expand Down Expand Up @@ -447,60 +474,9 @@ void recoverSnapshot() {
int srcPathLen = srcPath.size();
int dstPathLen = dstPath.size();

srcPath.concat(TableUtils.META_FILE_NAME).$();
dstPath.concat(TableUtils.META_FILE_NAME).$();
if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.error()
.$("could not copy _meta file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.$(", errno=").$(ff.errno())
.I$();
} else {
recoveredMetaFiles.incrementAndGet();
LOG.info()
.$("recovered _meta file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.I$();
}
}

srcPath.trimTo(srcPathLen).concat(TableUtils.TXN_FILE_NAME).$();
dstPath.trimTo(dstPathLen).concat(TableUtils.TXN_FILE_NAME).$();
if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.error()
.$("could not copy _txn file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.$(", errno=").$(ff.errno())
.I$();
} else {
recoveredTxnFiles.incrementAndGet();
LOG.info()
.$("recovered _txn file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.I$();
}
}

srcPath.trimTo(srcPathLen).concat(TableUtils.COLUMN_VERSION_FILE_NAME).$();
dstPath.trimTo(dstPathLen).concat(TableUtils.COLUMN_VERSION_FILE_NAME).$();
if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.error()
.$("could not copy _cv file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.$(", errno=").$(ff.errno())
.I$();
} else {
recoveredCVFiles.incrementAndGet();
LOG.info()
.$("recovered _cv file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.I$();
}
}

copyOrError(srcPath, dstPath, ff, recoveredMetaFiles, TableUtils.META_FILE_NAME);
copyOrError(srcPath.trimTo(srcPathLen), dstPath.trimTo(dstPathLen), ff, recoveredTxnFiles, TableUtils.TXN_FILE_NAME);
copyOrError(srcPath.trimTo(srcPathLen), dstPath.trimTo(dstPathLen), ff, recoveredCVFiles, TableUtils.COLUMN_VERSION_FILE_NAME);
rebuildTableFiles(dstPath.trimTo(dstPathLen), symbolFilesCount);

// Go inside SEQ_DIR
Expand All @@ -512,7 +488,7 @@ void recoverSnapshot() {
dstPathLen = dstPath.size();
dstPath.concat(TableUtils.META_FILE_NAME).$();

if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.exists(srcPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.critical()
.$("could not copy ").$(TableUtils.META_FILE_NAME).$(" file [src=").$(srcPath)
Expand Down
30 changes: 26 additions & 4 deletions core/src/main/java/io/questdb/cairo/TableUtils.java
Expand Up @@ -45,10 +45,7 @@
import io.questdb.mp.MPSequence;
import io.questdb.std.*;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.LPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.*;
import io.questdb.tasks.O3PartitionPurgeTask;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -105,6 +102,7 @@ public final class TableUtils {
public static final int MIN_INDEX_VALUE_BLOCK_SIZE = Numbers.ceilPow2(4);
public static final int NULL_LEN = -1;
public static final String SNAPSHOT_META_FILE_NAME = "_snapshot";
public static final String SNAPSHOT_META_FILE_NAME_TXT = "_snapshot.txt";
public static final String SYMBOL_KEY_REMAP_FILE_SUFFIX = ".r";
public static final char SYSTEM_TABLE_NAME_SUFFIX = '~';
public static final int TABLE_DOES_NOT_EXIST = 1;
Expand Down Expand Up @@ -1283,6 +1281,30 @@ public static String readTableName(Path path, int rootLen, MemoryCMR mem, FilesF
}
}

/**
 * Reads the entire content of a file into a {@link String}, decoding the bytes with
 * {@code Utf8s.stringFromUtf8Bytes}.
 * <p>
 * The file is opened read-only, its content is read into a temporary native buffer that is
 * always freed, and the file descriptor is always closed before returning.
 *
 * @param ff    files facade used for all file operations
 * @param path1 path of the file to read
 * @return the file content, or {@code null} when the file cannot be opened, its reported
 * length is not positive, or the read does not return exactly that length
 */
public static String readText(FilesFacade ff, Path path1) {
    final int fd = ff.openRO(path1);
    if (fd < 0) {
        return null;
    }

    long size = 0;
    long address = 0;
    try {
        size = ff.length(fd);
        if (size <= 0) {
            return null;
        }
        address = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
        final boolean readFully = ff.read(fd, address, size, 0) == size;
        return readFully ? Utf8s.stringFromUtf8Bytes(address, address + size) : null;
    } finally {
        // address stays 0 unless the allocation above succeeded
        if (address != 0) {
            Unsafe.free(address, size, MemoryTag.NATIVE_DEFAULT);
        }
        ff.close(fd);
    }
}

public static void removeColumnFromMetadata(
CharSequence columnName,
LowerCaseCharSequenceIntHashMap columnNameIndexMap,
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/java/io/questdb/test/griffin/SnapshotTest.java
Expand Up @@ -37,6 +37,7 @@
import io.questdb.mp.SOCountDownLatch;
import io.questdb.mp.SimpleWaitingLock;
import io.questdb.std.*;
import io.questdb.std.str.DirectUtf8Sink;
import io.questdb.std.str.Path;
import io.questdb.std.str.Utf8s;
import io.questdb.test.AbstractCairoTest;
Expand Down Expand Up @@ -210,6 +211,58 @@ public void testRecoverSnapshotRestoresDroppedColumns() throws Exception {
});
}

/**
 * Verifies that snapshot recovery picks up the snapshot instance id from the
 * {@code TableUtils.SNAPSHOT_META_FILE_NAME_TXT} text file.
 * <p>
 * The snapshot is prepared with an empty instance id, extra rows are inserted afterwards,
 * then the text file is written with {@code snapshotId} and the instance id is switched to
 * {@code restartedId} before recovery. The rows inserted after the snapshot must be gone.
 */
@Test
public void testRecoverSnapshotSupportsSnapshotTxtFile() throws Exception {
    final int partitionCount = 10;
    final String snapshotId = "id1";
    final String restartedId = "id2";
    assertMemoryLeak(() -> {
        setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, "");
        final String tableName = "t";
        ddl(
                "create table " + tableName + " as " +
                        "(select x, timestamp_sequence(0, 100000000000) ts from long_sequence(" + partitionCount + ")) timestamp(ts) partition by day"
        );

        ddl("snapshot prepare");

        insert(
                "insert into " + tableName +
                        " select x+20 x, timestamp_sequence(100000000000, 100000000000) ts from long_sequence(3)"
        );

        // Release all readers and writers, but keep the snapshot dir around.
        engine.clear();

        // Create the _snapshot.txt file holding the id the snapshot was taken under.
        FilesFacade ff = configuration.getFilesFacade();
        path.trimTo(rootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT);
        int fd = ff.openRW(path.$(), configuration.getWriterFileOpenOpts());
        Assert.assertTrue(fd > 0);

        try {
            // Size the sink from the id itself rather than a magic number.
            try (DirectUtf8Sink utf8 = new DirectUtf8Sink(snapshotId.length())) {
                utf8.put(snapshotId);
                ff.write(fd, utf8.ptr(), utf8.size(), 0);
                ff.truncate(fd, utf8.size());
            }
        } finally {
            ff.close(fd);
        }
        // The file must contain exactly the snapshot id that was written (ASCII, one byte per char).
        // JUnit's argument order is (expected, actual).
        Assert.assertEquals(snapshotId.length(), ff.length(path));

        setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, restartedId);
        engine.recoverSnapshot();

        // Data inserted after PREPARE SNAPSHOT should be discarded.
        assertSql(
                "count\n" +
                        partitionCount + "\n",
                "select count() from " + tableName
        );
    });
}

@Test
public void testRunWalPurgeJobLockTimeout() throws Exception {
configureCircuitBreakerTimeoutOnFirstCheck(); // trigger timeout on first check
Expand Down
Expand Up @@ -40,15 +40,7 @@

public class SnapshotFuzzTest extends AbstractFuzzTest {
/**
 * Fuzz test: obtains a random source, applies the {@code fullFuzz} setup to it, sets the
 * fuzz properties from further random draws and then runs {@code runFuzzWithSnapshot}.
 */
@Test
public void testFullFuzz() throws Exception {
// NOTE(review): presumably generateRandom logs its seeds through LOG so a failure can be replayed - confirm.
Rnd rnd = generateRandom(LOG);
fullFuzz(rnd);
// The first three arguments are drawn from rnd, evaluated left to right; the last two are fixed (10 MB and 3).
setFuzzProperties(rnd.nextLong(50), getRndO3PartitionSplit(rnd), getRndO3PartitionSplitMaxCount(rnd), 10 * Numbers.SIZE_1MB, 3);
runFuzzWithSnapshot(rnd);
}

@Test
public void testFullFuzzEjectedTransactions() throws Exception {
public void testSnapshotEjectedWalApply() throws Exception {
Rnd rnd = generateRandom(LOG);
fuzzer.setFuzzProbabilities(
0,
Expand Down Expand Up @@ -79,6 +71,14 @@ public void testFullFuzzEjectedTransactions() throws Exception {
runFuzzWithSnapshot(rnd);
}

/**
 * Snapshot fuzz test: obtains a random source, applies the {@code fullFuzz} setup to it,
 * sets the fuzz properties from further random draws and then runs {@code runFuzzWithSnapshot}.
 */
@Test
public void testSnapshotFullFuzz() throws Exception {
// NOTE(review): presumably generateRandom logs its seeds through LOG so a failure can be replayed - confirm.
Rnd rnd = generateRandom(LOG);
fullFuzz(rnd);
// The first three arguments are drawn from rnd, evaluated left to right; the last two are fixed (10 MB and 3).
setFuzzProperties(rnd.nextLong(50), getRndO3PartitionSplit(rnd), getRndO3PartitionSplitMaxCount(rnd), 10 * Numbers.SIZE_1MB, 3);
runFuzzWithSnapshot(rnd);
}

private void createSnapshot() throws SqlException {
setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, "id_1");
LOG.info().$("starting snapshot").$();
Expand Down

0 comments on commit d5d6448

Please sign in to comment.