Skip to content

Commit

Permalink
chore(sql): fix occasional failures with first()/last()/first_not_nul…
Browse files Browse the repository at this point in the history
…l()/last_not_null() direct string function (#4324)
  • Loading branch information
jerrinot committed Mar 22, 2024
1 parent 83c3d3f commit 2fd5292
Show file tree
Hide file tree
Showing 115 changed files with 1,556 additions and 1,729 deletions.
3 changes: 2 additions & 1 deletion core/src/main/java/io/questdb/cairo/RecoverVarIndex.java
Expand Up @@ -80,7 +80,8 @@ protected void doReindex(
ff,
path.$(),
maxOffset,
MemoryTag.MMAP_DEFAULT
MemoryTag.MMAP_DEFAULT,
false
)) {

path.trimTo(colNameLen).put(".i");
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/questdb/cairo/TableReader.java
Expand Up @@ -842,7 +842,7 @@ private MemoryMR openOrCreateMemory(
if (mem != null && mem != NullMemoryMR.INSTANCE) {
mem.of(ff, path, columnSize, columnSize, MemoryTag.MMAP_TABLE_READER);
} else {
mem = Vm.getMRInstance(ff, path, columnSize, MemoryTag.MMAP_TABLE_READER);
mem = Vm.getMRInstance(ff, path, columnSize, MemoryTag.MMAP_TABLE_READER, true);
columns.setQuick(primaryIndex, mem);
}
return mem;
Expand Down
19 changes: 4 additions & 15 deletions core/src/main/java/io/questdb/cairo/TableReaderRecord.java
Expand Up @@ -30,7 +30,6 @@
import io.questdb.std.Long256;
import io.questdb.std.Rows;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectCharSequence;
import io.questdb.std.str.Sinkable;
import io.questdb.std.str.Utf8Sequence;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -100,18 +99,6 @@ public char getChar(int col) {
return reader.getColumn(absoluteColumnIndex).getChar(offset);
}

@Override
public DirectCharSequence getDirectStr(int col) {
final long recordIndex = getAdjustedRecordIndex(col) * Long.BYTES;
final int absoluteColumnIndex = ifOffsetNegThen0ElseValue(
recordIndex,
TableReader.getPrimaryColumnIndex(columnBase, col)
);
return reader.getColumn(absoluteColumnIndex).getDirectStr(
reader.getColumn(absoluteColumnIndex + 1).getLong(recordIndex)
);
}

@Override
public double getDouble(int col) {
final long offset = getAdjustedRecordIndex(col) * Double.BYTES;
Expand Down Expand Up @@ -310,12 +297,14 @@ public long getUpdateRowId() {
return getRowId();
}

@Override @Nullable
@Override
@Nullable
public Utf8Sequence getVarcharA(int col) {
return getVarchar(col, 1);
}

@Override @Nullable
@Override
@Nullable
public Utf8Sequence getVarcharB(int col) {
return getVarchar(col, 2);
}
Expand Down
Expand Up @@ -28,7 +28,6 @@
import io.questdb.std.*;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.DirectCharSequence;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -102,19 +101,6 @@ public char getChar(int columnIndex) {
return reader.getColumn(absoluteColumnIndex).getChar(offset);
}

@Override
public DirectCharSequence getDirectStr(int columnIndex) {
final int col = deferenceColumn(columnIndex);
final long recordIndex = getAdjustedRecordIndex(col) * Long.BYTES;
final int absoluteColumnIndex = ifOffsetNegThen0ElseValue(
recordIndex,
TableReader.getPrimaryColumnIndex(columnBase, col)
);
long offset = reader.getColumn(absoluteColumnIndex + 1).getLong(recordIndex);
assert recordIndex != 0 || (offset == 0 || offset == Numbers.LONG_NaN);
return reader.getColumn(absoluteColumnIndex).getDirectStr(offset);
}

@Override
public double getDouble(int columnIndex) {
final int col = deferenceColumn(columnIndex);
Expand Down

This file was deleted.

23 changes: 0 additions & 23 deletions core/src/main/java/io/questdb/cairo/sql/Function.java
Expand Up @@ -33,7 +33,6 @@
import io.questdb.std.Long256;
import io.questdb.std.ObjList;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectCharSequence;
import io.questdb.std.str.Utf16Sink;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.Utf8Sink;
Expand Down Expand Up @@ -91,18 +90,6 @@ default void close() {

long getDate(Record rec);

/**
* Returns UTF-16 encoded off-heap string.
* <p>
* Must be called only if {@link #supportsDirectStr()} method returned true.
* The method is guaranteed to return off-heap strings with stable pointers,
* i.e. once a string is returned, its pointer remains actual until the end
* of query execution.
*/
default DirectCharSequence getDirectStr(Record rec) {
throw new UnsupportedOperationException();
}

double getDouble(Record rec);

float getFloat(Record rec);
Expand Down Expand Up @@ -218,16 +205,6 @@ default boolean isUndefined() {
return getType() == ColumnType.UNDEFINED;
}

/**
* Returns true if {@link #getDirectStr(Record)} method can be safely called.
* The method is guaranteed to return off-heap strings with stable pointers,
* i.e. once a string is returned, its pointer remains actual until the end
* of query execution.
*/
default boolean supportsDirectStr() {
return false;
}

/**
* Returns true if the function supports parallel execution, e.g. parallel filter
* or GROUP BY. If the method returns false, single-threaded execution plan
Expand Down
Expand Up @@ -39,8 +39,8 @@
public class PageAddressCacheRecord implements Record, Closeable {

private final MemoryCR.ByteSequenceView bsview = new MemoryCR.ByteSequenceView();
private final DirectString csviewA = new DirectString();
private final DirectString csviewB = new DirectString();
private final StableDirectString csviewA = new StableDirectString();
private final StableDirectString csviewB = new StableDirectString();
private final Long256Impl long256A = new Long256Impl();
private final Long256Impl long256B = new Long256Impl();
private final ObjList<SymbolTable> symbolTableCache = new ObjList<>();
Expand Down Expand Up @@ -119,18 +119,6 @@ public char getChar(int columnIndex) {
return Unsafe.getUnsafe().getChar(address + (rowIndex << 1));
}

@Override
public DirectCharSequence getDirectStr(int columnIndex) {
final long dataPageAddress = pageAddressCache.getPageAddress(frameIndex, columnIndex);
if (dataPageAddress == 0) {
return NullMemoryMR.INSTANCE.getDirectStr(0);
}
final long indexPageAddress = pageAddressCache.getIndexPageAddress(frameIndex, columnIndex);
final long offset = Unsafe.getUnsafe().getLong(indexPageAddress + (rowIndex << 3));
final long size = pageAddressCache.getPageSize(frameIndex, columnIndex);
return getStrA(dataPageAddress, offset, size, csviewA);
}

@Override
public double getDouble(int columnIndex) {
final long address = pageAddressCache.getPageAddress(frameIndex, columnIndex);
Expand Down
14 changes: 4 additions & 10 deletions core/src/main/java/io/questdb/cairo/sql/Record.java
Expand Up @@ -26,7 +26,10 @@

import io.questdb.std.BinarySequence;
import io.questdb.std.Long256;
import io.questdb.std.str.*;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.Utf16Sink;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.Utf8Sink;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -102,15 +105,6 @@ default long getDate(int col) {
return getLong(col);
}

/**
* Gets the off-heap (direct) value of a string column by index
*
* @param col numeric index of the column
* @return direct string, null if string is empty
*/
default DirectCharSequence getDirectStr(int col) {
throw new UnsupportedOperationException();
}

/**
* Gets the value of a double column by index
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/java/io/questdb/cairo/vm/AbstractMemoryCR.java
Expand Up @@ -34,8 +34,8 @@
public abstract class AbstractMemoryCR implements MemoryCR, Mutable {

private final MemoryCR.ByteSequenceView bsview = new MemoryCR.ByteSequenceView();
private final DirectString csviewA = new DirectString();
private final DirectString csviewB = new DirectString();
private final DirectString csviewA;
private final DirectString csviewB;
private final Long256Impl long256A = new Long256Impl();
private final Long256Impl long256B = new Long256Impl();
private final Utf8SplitString utf8SplitViewA = new Utf8SplitString();
Expand All @@ -48,6 +48,16 @@ public abstract class AbstractMemoryCR implements MemoryCR, Mutable {
protected long size = 0;
private long shiftAddressRight = 0;

public AbstractMemoryCR(boolean stableStrings) {
if (stableStrings) {
csviewA = new StableDirectString();
csviewB = new StableDirectString();
} else {
csviewA = new DirectString();
csviewB = new DirectString();
}
}

public long addressOf(long offset) {
offset -= shiftAddressRight;
assert offset <= size : "offset=" + offset + ", size=" + size;
Expand Down Expand Up @@ -112,12 +122,14 @@ public final CharSequence getStrB(long offset) {
return getStr(offset, csviewB);
}

@Override @NotNull
@Override
@NotNull
public Utf8Sequence getVarcharA(long offset, int size, boolean ascii) {
return getVarchar(offset, size, utf8viewA, ascii);
}

@Override @NotNull
@Override
@NotNull
public Utf8Sequence getVarcharB(long offset, int size, boolean ascii) {
return getVarchar(offset, size, utf8viewB, ascii);
}
Expand Down

0 comments on commit 2fd5292

Please sign in to comment.