Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): fix database stuck on startup after some table drops #4152

Merged
merged 5 commits into from Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/java/io/questdb/cairo/ColumnPurgeJob.java
Expand Up @@ -231,7 +231,7 @@ private void processTableRecords(CairoEngine engine) {
count++;
long ts = rec.getTimestamp(0);
if (ts != lastTs || taskRun == null) {
if (taskRun != null) {
if (taskRun != null && !taskRun.isEmpty()) {
jerrinot marked this conversation as resolved.
Show resolved Hide resolved
columnPurgeOperator.purgeExclusive(taskRun);
} else {
taskRun = taskPool.pop();
Expand Down Expand Up @@ -270,7 +270,9 @@ private void processTableRecords(CairoEngine engine) {
taskRun.appendColumnInfo(columnVersion, partitionTs, partitionNameTxn, rec.getUpdateRowId());
}
if (taskRun != null) {
columnPurgeOperator.purgeExclusive(taskRun);
if (!taskRun.isEmpty()) {
columnPurgeOperator.purgeExclusive(taskRun);
}
taskPool.push(taskRun);
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/questdb/cairo/ColumnPurgeOperator.java
Expand Up @@ -91,6 +91,7 @@ public void close() {
}

public boolean purge(ColumnPurgeTask task) {
assert task.getTableName() != null;
try {
boolean done = purge0(task, ScoreboardUseMode.INTERNAL);
setCompletionTimestamp(completedRowIds, microClock.getTicks());
Expand All @@ -103,6 +104,7 @@ public boolean purge(ColumnPurgeTask task) {
}

public boolean purge(ColumnPurgeTask task, TableReader tableReader) {
assert task.getTableName() != null;
try {
txReader = tableReader.getTxFile();
txnScoreboard = tableReader.getTxnScoreboard();
Expand All @@ -115,6 +117,7 @@ public boolean purge(ColumnPurgeTask task, TableReader tableReader) {
}

public void purgeExclusive(ColumnPurgeTask task) {
assert task.getTableName() != null;
try {
purge0(task, ScoreboardUseMode.EXCLUSIVE);
} catch (Throwable ex) {
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/questdb/tasks/ColumnPurgeTask.java
Expand Up @@ -27,6 +27,7 @@
import io.questdb.cairo.TableToken;
import io.questdb.std.LongList;
import io.questdb.std.Mutable;
import org.jetbrains.annotations.NotNull;

public class ColumnPurgeTask implements Mutable {
public static final int BLOCK_SIZE = 4;
Expand Down Expand Up @@ -100,7 +101,12 @@ public LongList getUpdatedColumnInfo() {
return updatedColumnInfo;
}

public boolean isEmpty() {
return tableName == null;
}

public void of(
@NotNull
TableToken tableName,
CharSequence columnName,
int tableId,
Expand All @@ -120,6 +126,7 @@ public void of(
}

public void of(
@NotNull
TableToken tableName,
CharSequence columnName,
int tableId,
Expand Down
65 changes: 65 additions & 0 deletions core/src/test/java/io/questdb/test/griffin/ColumnPurgeJobTest.java
Expand Up @@ -52,6 +52,71 @@ public void setUpUpdates() {
columnPurgeRetryDelay = 1;
}

@Test
public void testHandlesDroppedTablesAfterRestart() throws Exception {
assertMemoryLeak(() -> {
currentMicros = 0;
try (ColumnPurgeJob purgeJob = createPurgeJob()) {
createTable("up_part_o3");
createTable("up_part_o3_2");


drainWalQueue();
try (TableReader rdr = getReader("up_part_o3")) {
try (TableReader rdr2 = getReader("up_part_o3_2")) {
update("UPDATE up_part_o3 SET x = 100, str='abcd', sym2='EE' WHERE ts >= '1970-01-03'");
update("UPDATE up_part_o3_2 SET x = 100, str='abcd', sym2='EE' WHERE ts >= '1970-01-03'");
drainWalQueue();

drop("drop table up_part_o3");

runPurgeJob(purgeJob);
rdr.openPartition(0);
rdr2.openPartition(0);
runPurgeJob(purgeJob);
}
}
}


String purgeLogTableName;
try (ColumnPurgeJob purgeJob = createPurgeJob()) {
Assert.assertEquals(0, purgeJob.getOutstandingPurgeTasks());
purgeLogTableName = purgeJob.getLogTableName();
}

assertSql(
"ts\tx\tstr\tsym1\tsym2\n" +
"1970-01-01T02:00:00.000000Z\t1\ta\tA\t2\n" +
"1970-01-02T02:00:00.000000Z\t2\tb\tC\t4\n" +
"1970-01-03T02:00:00.000000Z\t100\tabcd\tA\tEE\n" +
"1970-01-04T02:00:00.000000Z\t100\tabcd\tA\tEE\n" +
"1970-01-05T02:00:00.000000Z\t100\tabcd\tD\tEE\n",
"up_part_o3_2"
);

// cleaned everything, table is truncated
assertSql("ts\ttable_name\tcolumn_name\ttable_id\ttruncate_version\tcolumnType\ttable_partition_by\tupdated_txn\tcolumn_version\tpartition_timestamp\tpartition_name_txn\tcompleted\n", purgeLogTableName);

// Check logging is ok. This test reproduces logging failure because of exception in the middle of logging.
// The result can be that this loop never finishes.
for(int i = 0; i < 1025; i++) {
LOG.infoW().$("test").$();
}
});
}

private static void createTable(String upPartO3) throws SqlException {
ddl("create table " + upPartO3 + " as" +
" (select timestamp_sequence('1970-01-01T02', 24 * 60 * 60 * 1000000L) ts," +
" x," +
" rnd_str('a', 'b', 'c', 'd') str," +
" rnd_symbol('A', 'B', 'C', 'D') sym1," +
" rnd_symbol('1', '2', '3', '4') sym2" +
" from long_sequence(5)), index(sym2)" +
" timestamp(ts) PARTITION BY DAY WAL");
}

@Test
public void testManyUpdatesInserts() throws Exception {
assertMemoryLeak(() -> {
Expand Down