Skip to content

Commit

Permalink
fix(sql): segfault when running SAMPLE BY query with index-based filt…
Browse files Browse the repository at this point in the history
…er and last/first functions
  • Loading branch information
puzpuzpuz committed Feb 26, 2024
1 parent 322d412 commit 6f9684c
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 16 deletions.
5 changes: 3 additions & 2 deletions core/src/main/c/share/bitmap_index_utils.cpp
Expand Up @@ -194,7 +194,7 @@ inline int32_t findFirstLastInFrame0(
// outputRowIds(timestamp) : 0, 2, 3
// Output in timestampOut contains indexes of the periods found
// Last value of output buffers reserved for positions to resume the search
// outputRowIds[n + 0] : 7 ( index position )
// outputRowIds[n + 0] : 7 ( index position )
// outputRowIds[n + 1] : 108 ( timestamp column position )
// outputRowIds[n + 2] : 4 ( last processed period end index )
int32_t periodIndex = 0;
Expand All @@ -220,9 +220,10 @@ inline int32_t findFirstLastInFrame0(
if (outIndex > 0
&& outputRowIds[outIndex - 1].timestamp_index == periodIndex + sampleIndexOffset - 1 // prev out row is for period - 1
&& indexLo > 0) {

int64_t prevLastRowId = fnTsRowIdByIndex(indexLo - 1);
outputRowIds[outIndex - 1].last_row_id = prevLastRowId - frameBaseOffset;
firstRowUpdated |= outIndex == 1; // need to know if firt row lat_row_id is updated
firstRowUpdated |= outIndex == 1; // need to know if first row last_row_id is updated
}

if (indexLo == indexHi || sampleStart > maxTs || periodIndex > samplePeriodCount - 2) {
Expand Down
Expand Up @@ -177,6 +177,21 @@ public boolean usesIndex() {
return base.usesIndex();
}

private static long findSafeIndexFrameSize(IndexFrame indexFrame, long dataFrameHi) {
long frameAddress = indexFrame.getAddress();
if (frameAddress == 0) {
return indexFrame.getSize();
}
long safeFrameSize = indexFrame.getSize();
for (long p = frameAddress + (indexFrame.getSize() - 1) * Long.BYTES; p >= frameAddress; p -= Long.BYTES) {
if (Unsafe.getUnsafe().getLong(p) < dataFrameHi) {
break;
}
safeFrameSize--;
}
return safeFrameSize;
}

private void buildFirstLastIndex(
int[] firstLastIndex,
int[] queryToFrameColumnMapping,
Expand Down Expand Up @@ -462,7 +477,7 @@ private int getNextState(int state) {
int outPosition = crossRowState == NONE ? 0 : 1;
long offsetTimestampColumnAddress = currentFrame.getPageAddress(timestampIndex) - dataFrameLo * Long.BYTES;
long iFrameAddress = indexFrame.getAddress();
long iFrameSize = indexFrame.getSize();
long iFrameSize = findSafeIndexFrameSize(indexFrame, dataFrameHi);
long lastIndexRowId = iFrameAddress > 0
? Unsafe.getUnsafe().getLong(iFrameAddress + (iFrameSize - 1) * Long.BYTES)
: Long.MAX_VALUE;
Expand All @@ -483,7 +498,8 @@ private int getNextState(int state) {
samplePeriodCount,
samplePeriodIndexOffset,
rowIdOutAddress.getAddress(),
pageSize);
pageSize
);

boolean firstRowLastRowIdUpdated = rowsFound < 0;
rowsFound = Math.abs(rowsFound);
Expand Down Expand Up @@ -608,6 +624,7 @@ private void saveRowIdValueToCrossRow(long rowId, int columnIndex) {
int frameColIndex = queryToFrameColumnMapping[columnIndex];
long pageAddress = currentFrame.getPageAddress(frameColIndex);
if (pageAddress > 0) {
assert currentFrame.getPageSize(frameColIndex) > rowId : currentFrame.getPageSize(frameColIndex) + ", rowId: " + rowId;
saveFixedColToBufferWithLongAlignment(columnIndex, crossFrameRow, columnType, pageAddress, rowId);
} else {
crossFrameRow.set(columnIndex, LongNullUtils.getLongNull(columnType));
Expand Down
43 changes: 32 additions & 11 deletions core/src/main/java/io/questdb/std/BitmapIndexUtilsNative.java
Expand Up @@ -25,6 +25,7 @@
package io.questdb.std;

public class BitmapIndexUtilsNative {

public static int findFirstLastInFrame(
int outIndex,
long rowIdLo,
Expand All @@ -38,7 +39,8 @@ public static int findFirstLastInFrame(
int samplePeriodsCount,
long samplePeriodIndexOffset,
long rowIdOutAddress,
int outSize) {
int outSize
) {
if (symbolIndexAddress > 0) {
return findFirstLastInFrame0(
outIndex,
Expand All @@ -56,6 +58,7 @@ public static int findFirstLastInFrame(
outSize
);
} else {
// null value in filter case
return findFirstLastInFrameNoFilter0(
outIndex,
rowIdLo,
Expand All @@ -71,10 +74,18 @@ public static int findFirstLastInFrame(
}
}

public static void latestScanBackward(long keysMemory, long keysMemorySize, long valuesMemory,
long valuesMemorySize, long argsMemory, long unIndexedNullCount,
long maxValue, long minValue,
int partitionIndex, int blockValueCountMod) {
public static void latestScanBackward(
long keysMemory,
long keysMemorySize,
long valuesMemory,
long valuesMemorySize,
long argsMemory,
long unIndexedNullCount,
long maxValue,
long minValue,
int partitionIndex,
int blockValueCountMod
) {
assert keysMemory > 0;
assert keysMemorySize > 0;
assert valuesMemory > 0;
Expand All @@ -100,7 +111,8 @@ private static native int findFirstLastInFrame0(
int samplePeriodCount,
long samplePeriodIndexOffset,
long rowIdOutAddress,
int outSize);
int outSize
);

private static native int findFirstLastInFrameNoFilter0(
int outIndex,
Expand All @@ -112,10 +124,19 @@ private static native int findFirstLastInFrameNoFilter0(
int samplePeriodCount,
long samplePeriodIndexOffset,
long rowIdOutAddress,
int outSize);
int outSize
);

private static native void latestScanBackward0(long keysMemory, long keysMemorySize, long valuesMemory,
long valuesMemorySize, long argsMemory, long unIndexedNullCount,
long maxValue, long minValue,
int partitionIndex, int blockValueCountMod);
private static native void latestScanBackward0(
long keysMemory,
long keysMemorySize,
long valuesMemory,
long valuesMemorySize,
long argsMemory,
long unIndexedNullCount,
long maxValue,
long minValue,
int partitionIndex,
int blockValueCountMod
);
}
Binary file not shown.
Empty file modified core/src/main/resources/io/questdb/bin/linux/libquestdb.so 100755 → 100644
Empty file.
Expand Up @@ -38,9 +38,13 @@
import io.questdb.griffin.model.QueryColumn;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.SOCountDownLatch;
import io.questdb.mp.WorkerPool;
import io.questdb.std.Chars;
import io.questdb.std.FilesFacade;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.test.AbstractCairoTest;
import io.questdb.test.cairo.DefaultTestCairoConfiguration;
import io.questdb.test.cutlass.text.SqlExecutionContextStub;
Expand All @@ -51,6 +55,9 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SampleByTest extends AbstractCairoTest {
private final static Log LOG = LogFactory.getLog(SampleByTest.class);
Expand Down Expand Up @@ -3073,6 +3080,28 @@ public void testSampleByFirstLastFactoryIsNotChosenIfKeyedByNonSymbol() throws E
});
}

@Test
public void testSampleByFirstLastIndexFilterByNullConcurrent() throws Exception {
testSampleByFirstLastIndexedConcurrent(
"SELECT first(kms) as k_first, last(kms) AS k_last, first(d1) as d1_first, last(d1) as d1_last, first(d2) as d2_first, last(d2) as d2_last\n" +
"FROM x\n" +
"WHERE k BETWEEN '1970-01-01T00:15:33.063Z' AND '1970-01-01T01:15:33.063Z'\n" +
" AND s = null\n" +
"SAMPLE BY 10s;"
);
}

@Test
public void testSampleByFirstLastIndexFilterConcurrent() throws Exception {
testSampleByFirstLastIndexedConcurrent(
"SELECT first(kms) as k_first, last(kms) AS k_last, first(d1) as d1_first, last(d1) as d1_last, first(d2) as d2_first, last(d2) as d2_last\n" +
"FROM x\n" +
"WHERE k BETWEEN '1970-01-01T00:15:33.063Z' AND '1970-01-01T01:15:33.063Z'\n" +
" AND s = 'a'\n" +
"SAMPLE BY 10s;"
);
}

@Test
public void testSampleByFirstLastRecordCursorFactoryInvalidColumns() {
try {
Expand Down Expand Up @@ -3175,6 +3204,28 @@ public void testSampleByFirstLastWithNonTsOrFilteredSymbolColumn() throws Except
);
}

@Test
public void testSampleByLastIndexFilterByNullConcurrent() throws Exception {
testSampleByFirstLastIndexedConcurrent(
"SELECT last(kms) as k, last(d1) as d1, last(d2) as d2\n" +
"FROM x\n" +
"WHERE k BETWEEN '1970-01-01T00:15:33.063Z' AND '1970-01-01T01:15:33.063Z'\n" +
" AND s = null\n" +
"SAMPLE BY 10s;"
);
}

@Test
public void testSampleByLastOnlyIndexFilterConcurrent() throws Exception {
testSampleByFirstLastIndexedConcurrent(
"SELECT last(kms) as k, last(d1) as d1, last(d2) as d2\n" +
"FROM x\n" +
"WHERE k BETWEEN '1970-01-01T00:15:33.063Z' AND '1970-01-01T01:15:33.063Z'\n" +
" AND s = 'a'\n" +
"SAMPLE BY 10s;"
);
}

@Test
public void testSampleByMicrosFillNoneNotKeyedEmpty() throws Exception {
assertQuery(
Expand Down Expand Up @@ -10275,7 +10326,7 @@ public void testTimestampIsNotRequiredAfterSubqueryWithExplicitTsNotInSelectList
"), timed as (\n" +
" select * from ordered timestamp(ts)\n" +
"), sampled as (\n" +
" SELECT sum(value) as value\n" + //no ts in select list
" SELECT sum(value) as value\n" + //no ts in select list
" FROM timed\n" +
" SAMPLE BY 1d FILL(0) ALIGN TO CALENDAR \n" +
")\n" +
Expand Down Expand Up @@ -10508,6 +10559,76 @@ private boolean isNone(String fill) {
return "".equals(fill) || "none".equals(fill);
}

private void testSampleByFirstLastIndexedConcurrent(String query) throws Exception {
final int threadCount = 4;
final int workerCount = 2;

WorkerPool pool = new WorkerPool((() -> workerCount));
TestUtils.execute(pool, (engine, compiler, sqlExecutionContext) -> {
engine.ddl(
"create table x (d1 double, d2 double, s symbol index, kms long, k timestamp) timestamp(k) partition by day;",
sqlExecutionContext
);

final RecordCursorFactory[] factories = new RecordCursorFactory[threadCount];
for (int i = 0; i < threadCount; i++) {
factories[i] = engine.select(query, sqlExecutionContext);
}

final AtomicInteger errors = new AtomicInteger();
final CyclicBarrier barrier = new CyclicBarrier(threadCount);
final SOCountDownLatch haltLatch = new SOCountDownLatch(threadCount);
final AtomicBoolean writerDone = new AtomicBoolean();
for (int i = 0; i < threadCount; i++) {
final int finalI = i;
new Thread(() -> {
TestUtils.await(barrier);

final RecordCursorFactory factory = factories[finalI];
while (!writerDone.get()) {
try (RecordCursor cursor = factory.getCursor(sqlExecutionContext)) {
while (cursor.hasNext()) {
// no-op
}
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
haltLatch.countDown();
}).start();
}

final int rows = 10000;
final int batchSize = 10;
long ts = 0;
try (TableWriter writer = TestUtils.getWriter(engine, "x")) {
for (int i = 0; i < rows; i++) {
TableWriter.Row row = writer.newRow(ts);
row.putDouble(0, 42);
row.putDouble(1, 42);
row.putSym(2, (char) ('a' + i % 3));
ts += Timestamps.SECOND_MICROS;
row.putLong(3, ts / Timestamps.MILLI_MICROS);
row.append();
if ((i % batchSize) == 0) {
writer.commit();
}
}
writer.commit();
}

writerDone.set(true);
haltLatch.await();

Misc.free(factories);
Assert.assertEquals(0, errors.get());
},
configuration,
LOG
);
}

private void testSampleByPeriodFails(String query, int errorPosition, String errorContains) throws Exception {
assertMemoryLeak(() -> {
ddl(
Expand Down

0 comments on commit 6f9684c

Please sign in to comment.