Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): fix snapshot restore to correctly restore tables when snapshot is taken under heavy DDL load #4212

Merged
merged 7 commits into from Feb 14, 2024
Merged
138 changes: 57 additions & 81 deletions core/src/main/java/io/questdb/cairo/DatabaseSnapshotAgentImpl.java
Expand Up @@ -65,12 +65,12 @@ public class DatabaseSnapshotAgentImpl implements DatabaseSnapshotAgent, QuietCl
private final WalWriterMetadata metadata; // protected with #lock
private final StringSink nameSink = new StringSink(); // protected with #lock
private final Path path = new Path(); // protected with #lock
private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock
private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob
private final SymbolMapUtil symbolMapUtil = new SymbolMapUtil();
private final GrowOnlyTableNameRegistryStore tableNameRegistryStore; // protected with #lock
private ColumnVersionReader columnVersionReader = null;
private TableReaderMetadata tableMetadata = null;
private TxWriter txWriter = null;
private SimpleWaitingLock walPurgeJobRunLock = null; // used as a suspend/resume handler for the WalPurgeJob

DatabaseSnapshotAgentImpl(CairoEngine engine) {
this.engine = engine;
Expand Down Expand Up @@ -111,29 +111,21 @@ public void setWalPurgeJobRunLock(@Nullable SimpleWaitingLock walPurgeJobRunLock
this.walPurgeJobRunLock = walPurgeJobRunLock;
}

void completeSnapshot() throws SqlException {
if (!lock.tryLock()) {
throw SqlException.position(0).put("Another snapshot command in progress");
}
try {
// Delete snapshot/db directory.
path.of(configuration.getSnapshotRoot()).concat(configuration.getDbDirectory()).$();
ff.rmdir(path); // it's fine to ignore errors here

// Resume the WalPurgeJob
if (walPurgeJobRunLock != null) {
try {
walPurgeJobRunLock.unlock();
} catch (IllegalStateException ignore) {
// not an error here
// completeSnapshot can be called several times in a row.
}
}

// Reset snapshot in-flight flag.
inProgress.set(false);
} finally {
lock.unlock();
/**
 * Copies one named file from the source directory to the destination directory
 * and logs the outcome.
 * <p>
 * {@code fileName} is appended in place to both {@code srcPath} and {@code dstPath};
 * neither path is trimmed back here, so the caller has to restore them before reuse.
 * A failed copy is only logged (with the errno reported by {@code ff}); nothing is thrown.
 *
 * @param srcPath  directory path of the copy source, extended in place with {@code fileName}
 * @param dstPath  directory path of the copy target, extended in place with {@code fileName}
 * @param ff       files facade used to perform the copy and to read errno
 * @param counter  incremented once when the copy succeeds
 * @param fileName name of the file to copy, also used in the log messages
 */
private static void copyOrError(Path srcPath, Path dstPath, FilesFacade ff, AtomicInteger counter, String fileName) {
    srcPath.concat(fileName).$();
    dstPath.concat(fileName).$();

    // A negative result from ff.copy() is treated as failure.
    final boolean copied = ff.copy(srcPath, dstPath) >= 0;
    if (copied) {
        counter.incrementAndGet();
        LOG.info()
                .$("recovered ").$(fileName).$(" file [src=").$(srcPath)
                .$(", dst=").$(dstPath)
                .I$();
        return;
    }

    LOG.error()
            .$("could not copy ").$(fileName).$(" file [src=").$(srcPath)
            .$(", dst=").$(dstPath)
            .$(", errno=").$(ff.errno())
            .I$();
}

Expand Down Expand Up @@ -204,6 +196,32 @@ private void rebuildTableFiles(Path tablePath, AtomicInteger recoveredSymbolFile
}
}

/**
 * Finishes a snapshot: removes the snapshot copy of the db directory, releases the
 * WalPurgeJob run lock (when one is set) and clears the in-progress flag.
 *
 * @throws SqlException when {@code lock} cannot be acquired immediately, i.e. another
 *                      snapshot command currently holds it
 */
void completeSnapshot() throws SqlException {
    final boolean acquired = lock.tryLock();
    if (!acquired) {
        throw SqlException.position(0).put("Another snapshot command in progress");
    }
    try {
        // Remove <snapshot root>/<db directory>; the rmdir result is deliberately not checked.
        path.of(configuration.getSnapshotRoot())
                .concat(configuration.getDbDirectory())
                .$();
        ff.rmdir(path);

        // Let the WalPurgeJob run again.
        if (walPurgeJobRunLock != null) {
            try {
                walPurgeJobRunLock.unlock();
            } catch (IllegalStateException ignored) {
                // Not an error: completeSnapshot can be called several times in a row.
            }
        }

        // The snapshot is no longer in flight.
        inProgress.set(false);
    } finally {
        lock.unlock();
    }
}

void prepareSnapshot(SqlExecutionContext executionContext) throws SqlException {
// Windows doesn't support sync() system call.
if (Os.isWindows()) {
Expand Down Expand Up @@ -390,7 +408,16 @@ void recoverSnapshot() {
memFile.smallFile(ff, srcPath, MemoryTag.MMAP_DEFAULT);

final CharSequence currentInstanceId = configuration.getSnapshotInstanceId();
final CharSequence snapshotInstanceId = memFile.getStr(0);
CharSequence snapshotInstanceId = memFile.getStr(0);
if (Chars.empty(snapshotInstanceId)) {
// Fall back to the _snapshot.txt file, reading it as plain text.
srcPath.trimTo(snapshotRootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT).$();
String snapshotIdTxt = TableUtils.readText(ff, srcPath);
if (snapshotIdTxt != null) {
snapshotInstanceId = snapshotIdTxt.trim();
}
}

if (Chars.empty(currentInstanceId) || Chars.empty(snapshotInstanceId) || Chars.equals(currentInstanceId, snapshotInstanceId)) {
LOG.info()
.$("skipping snapshot recovery [currentId=").$(currentInstanceId)
Expand Down Expand Up @@ -447,60 +474,9 @@ void recoverSnapshot() {
int srcPathLen = srcPath.size();
int dstPathLen = dstPath.size();

srcPath.concat(TableUtils.META_FILE_NAME).$();
dstPath.concat(TableUtils.META_FILE_NAME).$();
if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.error()
.$("could not copy _meta file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.$(", errno=").$(ff.errno())
.I$();
} else {
recoveredMetaFiles.incrementAndGet();
LOG.info()
.$("recovered _meta file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.I$();
}
}

srcPath.trimTo(srcPathLen).concat(TableUtils.TXN_FILE_NAME).$();
dstPath.trimTo(dstPathLen).concat(TableUtils.TXN_FILE_NAME).$();
if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.error()
.$("could not copy _txn file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.$(", errno=").$(ff.errno())
.I$();
} else {
recoveredTxnFiles.incrementAndGet();
LOG.info()
.$("recovered _txn file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.I$();
}
}

srcPath.trimTo(srcPathLen).concat(TableUtils.COLUMN_VERSION_FILE_NAME).$();
dstPath.trimTo(dstPathLen).concat(TableUtils.COLUMN_VERSION_FILE_NAME).$();
if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.error()
.$("could not copy _cv file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.$(", errno=").$(ff.errno())
.I$();
} else {
recoveredCVFiles.incrementAndGet();
LOG.info()
.$("recovered _cv file [src=").$(srcPath)
.$(", dst=").$(dstPath)
.I$();
}
}

copyOrError(srcPath, dstPath, ff, recoveredMetaFiles, TableUtils.META_FILE_NAME);
copyOrError(srcPath.trimTo(srcPathLen), dstPath.trimTo(dstPathLen), ff, recoveredTxnFiles, TableUtils.TXN_FILE_NAME);
copyOrError(srcPath.trimTo(srcPathLen), dstPath.trimTo(dstPathLen), ff, recoveredCVFiles, TableUtils.COLUMN_VERSION_FILE_NAME);
rebuildTableFiles(dstPath.trimTo(dstPathLen), symbolFilesCount);

// Go inside SEQ_DIR
Expand All @@ -512,7 +488,7 @@ void recoverSnapshot() {
dstPathLen = dstPath.size();
dstPath.concat(TableUtils.META_FILE_NAME).$();

if (ff.exists(srcPath) && ff.exists(dstPath)) {
if (ff.exists(srcPath)) {
if (ff.copy(srcPath, dstPath) < 0) {
LOG.critical()
.$("could not copy ").$(TableUtils.META_FILE_NAME).$(" file [src=").$(srcPath)
Expand Down
30 changes: 26 additions & 4 deletions core/src/main/java/io/questdb/cairo/TableUtils.java
Expand Up @@ -45,10 +45,7 @@
import io.questdb.mp.MPSequence;
import io.questdb.std.*;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.LPSZ;
import io.questdb.std.str.Path;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.*;
import io.questdb.tasks.O3PartitionPurgeTask;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -105,6 +102,7 @@ public final class TableUtils {
public static final int MIN_INDEX_VALUE_BLOCK_SIZE = Numbers.ceilPow2(4);
public static final int NULL_LEN = -1;
public static final String SNAPSHOT_META_FILE_NAME = "_snapshot";
public static final String SNAPSHOT_META_FILE_NAME_TXT = "_snapshot.txt";
public static final String SYMBOL_KEY_REMAP_FILE_SUFFIX = ".r";
public static final char SYSTEM_TABLE_NAME_SUFFIX = '~';
public static final int TABLE_DOES_NOT_EXIST = 1;
Expand Down Expand Up @@ -1283,6 +1281,30 @@ public static String readTableName(Path path, int rootLen, MemoryCMR mem, FilesF
}
}

/**
 * Reads the whole file at {@code path1} and converts its bytes to a {@link String}
 * via {@code Utf8s.stringFromUtf8Bytes}.
 * <p>
 * The native buffer allocated for the read is always freed and the file descriptor
 * is always closed before returning.
 *
 * @param ff    files facade used to open, size, read and close the file
 * @param path1 path of the file to read
 * @return the file content, or {@code null} when the file cannot be opened, its
 * reported length is not positive, or fewer bytes than that length were read
 */
public static String readText(FilesFacade ff, Path path1) {
    final int fd = ff.openRO(path1);
    if (fd < 0) {
        return null;
    }

    long ptr = 0;
    long size = 0;
    try {
        size = ff.length(fd);
        if (size <= 0) {
            return null;
        }
        ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
        if (ff.read(fd, ptr, size, 0) != size) {
            // Short or failed read: treat the content as unavailable.
            return null;
        }
        return Utf8s.stringFromUtf8Bytes(ptr, ptr + size);
    } finally {
        // ptr stays 0 when nothing was allocated, so there is nothing to free then.
        if (ptr != 0) {
            Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
        }
        ff.close(fd);
    }
}

public static void removeColumnFromMetadata(
CharSequence columnName,
LowerCaseCharSequenceIntHashMap columnNameIndexMap,
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/java/io/questdb/test/griffin/SnapshotTest.java
Expand Up @@ -37,6 +37,7 @@
import io.questdb.mp.SOCountDownLatch;
import io.questdb.mp.SimpleWaitingLock;
import io.questdb.std.*;
import io.questdb.std.str.DirectUtf8Sink;
import io.questdb.std.str.Path;
import io.questdb.std.str.Utf8s;
import io.questdb.test.AbstractCairoTest;
Expand Down Expand Up @@ -210,6 +211,58 @@ public void testRecoverSnapshotRestoresDroppedColumns() throws Exception {
});
}

/**
 * Checks that snapshot recovery accepts a snapshot instance id supplied through the
 * plain-text {@code _snapshot.txt} file.
 * <p>
 * Scenario: a table is created, {@code snapshot prepare} is run while the configured
 * instance id is empty, three more rows are inserted, and a {@code _snapshot.txt}
 * file containing {@code snapshotId} is written. The instance id is then switched to
 * {@code restartedId} and recovery is triggered. The test passes when the row count
 * equals the count at snapshot time, i.e. the later insert was discarded.
 */
@Test
public void testRecoverSnapshotSupportsSnapshotTxtFile() throws Exception {
    final int partitionCount = 10;
    final String snapshotId = "id1";
    final String restartedId = "id2";
    assertMemoryLeak(() -> {
        // Snapshot is prepared with an empty instance id; presumably this leaves the
        // _snapshot metadata file without an id, so recovery must consult _snapshot.txt.
        setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, "");
        final String tableName = "t";
        ddl(
                "create table " + tableName + " as " +
                        "(select x, timestamp_sequence(0, 100000000000) ts from long_sequence(" + partitionCount + ")) timestamp(ts) partition by day"
        );

        ddl("snapshot prepare");

        insert(
                "insert into " + tableName +
                        " select x+20 x, timestamp_sequence(100000000000, 100000000000) ts from long_sequence(3)"
        );

        // Release all readers and writers, but keep the snapshot dir around.
        engine.clear();

        // Create the _snapshot.txt file holding the id of the instance that took the snapshot.
        FilesFacade ff = configuration.getFilesFacade();
        path.trimTo(rootLen).concat(TableUtils.SNAPSHOT_META_FILE_NAME_TXT);
        int fd = ff.openRW(path.$(), configuration.getWriterFileOpenOpts());
        Assert.assertTrue(fd > 0);

        try {
            try (DirectUtf8Sink utf8 = new DirectUtf8Sink(3)) {
                utf8.put(snapshotId);
                ff.write(fd, utf8.ptr(), utf8.size(), 0);
                ff.truncate(fd, utf8.size());
            }
        } finally {
            ff.close(fd);
        }
        // The file must hold exactly the snapshot id that was written. The id is ASCII,
        // so its char count equals its UTF-8 byte count.
        Assert.assertEquals(snapshotId.length(), ff.length(path));

        // Restart under a different instance id to trigger recovery.
        setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, restartedId);
        engine.recoverSnapshot();

        // Data inserted after PREPARE SNAPSHOT should be discarded.
        assertSql(
                "count\n" +
                        partitionCount + "\n",
                "select count() from " + tableName
        );
    });
}

@Test
public void testRunWalPurgeJobLockTimeout() throws Exception {
configureCircuitBreakerTimeoutOnFirstCheck(); // trigger timeout on first check
Expand Down
Expand Up @@ -40,15 +40,7 @@

public class SnapshotFuzzTest extends AbstractFuzzTest {
@Test
// Fuzz test combining the full fuzz configuration with a snapshot run.
public void testFullFuzz() throws Exception {
// Random source for this run; the logger is passed to generateRandom (presumably so the seeds get logged — confirm).
Rnd rnd = generateRandom(LOG);
// Configure the fuzzer through fullFuzz(rnd).
fullFuzz(rnd);
// First three arguments are drawn from rnd; the last two are fixed (10 * Numbers.SIZE_1MB and 3).
// NOTE(review): parameter meanings are defined by setFuzzProperties, which is not visible here.
setFuzzProperties(rnd.nextLong(50), getRndO3PartitionSplit(rnd), getRndO3PartitionSplitMaxCount(rnd), 10 * Numbers.SIZE_1MB, 3);
// Run the fuzz scenario with the snapshot step.
runFuzzWithSnapshot(rnd);
}

@Test
public void testFullFuzzEjectedTransactions() throws Exception {
public void testSnapshotEjectedWalApply() throws Exception {
Rnd rnd = generateRandom(LOG);
fuzzer.setFuzzProbabilities(
0,
Expand Down Expand Up @@ -79,6 +71,14 @@ public void testFullFuzzEjectedTransactions() throws Exception {
runFuzzWithSnapshot(rnd);
}

// Fuzz test combining the full fuzz configuration with a snapshot run.
@Test
public void testSnapshotFullFuzz() throws Exception {
// Random source for this run; the logger is passed to generateRandom (presumably so the seeds get logged — confirm).
Rnd rnd = generateRandom(LOG);
// Configure the fuzzer through fullFuzz(rnd).
fullFuzz(rnd);
// First three arguments are drawn from rnd; the last two are fixed (10 * Numbers.SIZE_1MB and 3).
// NOTE(review): parameter meanings are defined by setFuzzProperties, which is not visible here.
setFuzzProperties(rnd.nextLong(50), getRndO3PartitionSplit(rnd), getRndO3PartitionSplitMaxCount(rnd), 10 * Numbers.SIZE_1MB, 3);
// Run the fuzz scenario with the snapshot step.
runFuzzWithSnapshot(rnd);
}

private void createSnapshot() throws SqlException {
setProperty(PropertyKey.CAIRO_SNAPSHOT_INSTANCE_ID, "id_1");
LOG.info().$("starting snapshot").$();
Expand Down