Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sql): fix occasional failures with first()/last()/first_not_null()/last_not_null() direct string function #4324

Merged
merged 15 commits into from Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/io/questdb/cairo/RecoverVarIndex.java
Expand Up @@ -80,7 +80,8 @@ protected void doReindex(
ff,
path.$(),
maxOffset,
MemoryTag.MMAP_DEFAULT
MemoryTag.MMAP_DEFAULT,
false
)) {

path.trimTo(colNameLen).put(".i");
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/questdb/cairo/TableReader.java
Expand Up @@ -842,7 +842,7 @@ private MemoryMR openOrCreateMemory(
if (mem != null && mem != NullMemoryMR.INSTANCE) {
mem.of(ff, path, columnSize, columnSize, MemoryTag.MMAP_TABLE_READER);
} else {
mem = Vm.getMRInstance(ff, path, columnSize, MemoryTag.MMAP_TABLE_READER);
mem = Vm.getMRInstance(ff, path, columnSize, MemoryTag.MMAP_TABLE_READER, true);
columns.setQuick(primaryIndex, mem);
}
return mem;
Expand Down
19 changes: 4 additions & 15 deletions core/src/main/java/io/questdb/cairo/TableReaderRecord.java
Expand Up @@ -30,7 +30,6 @@
import io.questdb.std.Long256;
import io.questdb.std.Rows;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectCharSequence;
import io.questdb.std.str.Sinkable;
import io.questdb.std.str.Utf8Sequence;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -100,18 +99,6 @@ public char getChar(int col) {
return reader.getColumn(absoluteColumnIndex).getChar(offset);
}

@Override
public DirectCharSequence getDirectStr(int col) {
final long recordIndex = getAdjustedRecordIndex(col) * Long.BYTES;
final int absoluteColumnIndex = ifOffsetNegThen0ElseValue(
recordIndex,
TableReader.getPrimaryColumnIndex(columnBase, col)
);
return reader.getColumn(absoluteColumnIndex).getDirectStr(
reader.getColumn(absoluteColumnIndex + 1).getLong(recordIndex)
);
}

@Override
public double getDouble(int col) {
final long offset = getAdjustedRecordIndex(col) * Double.BYTES;
Expand Down Expand Up @@ -310,12 +297,14 @@ public long getUpdateRowId() {
return getRowId();
}

@Override @Nullable
@Override
@Nullable
public Utf8Sequence getVarcharA(int col) {
return getVarchar(col, 1);
}

@Override @Nullable
@Override
@Nullable
public Utf8Sequence getVarcharB(int col) {
return getVarchar(col, 2);
}
Expand Down
Expand Up @@ -28,7 +28,6 @@
import io.questdb.std.*;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.DirectCharSequence;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -102,19 +101,6 @@ public char getChar(int columnIndex) {
return reader.getColumn(absoluteColumnIndex).getChar(offset);
}

@Override
public DirectCharSequence getDirectStr(int columnIndex) {
final int col = deferenceColumn(columnIndex);
final long recordIndex = getAdjustedRecordIndex(col) * Long.BYTES;
final int absoluteColumnIndex = ifOffsetNegThen0ElseValue(
recordIndex,
TableReader.getPrimaryColumnIndex(columnBase, col)
);
long offset = reader.getColumn(absoluteColumnIndex + 1).getLong(recordIndex);
assert recordIndex != 0 || (offset == 0 || offset == Numbers.LONG_NaN);
return reader.getColumn(absoluteColumnIndex).getDirectStr(offset);
}

@Override
public double getDouble(int columnIndex) {
final int col = deferenceColumn(columnIndex);
Expand Down

This file was deleted.

23 changes: 0 additions & 23 deletions core/src/main/java/io/questdb/cairo/sql/Function.java
Expand Up @@ -33,7 +33,6 @@
import io.questdb.std.Long256;
import io.questdb.std.ObjList;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.DirectCharSequence;
import io.questdb.std.str.Utf16Sink;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.Utf8Sink;
Expand Down Expand Up @@ -91,18 +90,6 @@ default void close() {

long getDate(Record rec);

/**
* Returns UTF-16 encoded off-heap string.
* <p>
* Must be called only if {@link #supportsDirectStr()} method returned true.
* The method is guaranteed to return off-heap strings with stable pointers,
* i.e. once a string is returned, its pointer remains actual until the end
* of query execution.
*/
default DirectCharSequence getDirectStr(Record rec) {
throw new UnsupportedOperationException();
}

double getDouble(Record rec);

float getFloat(Record rec);
Expand Down Expand Up @@ -218,16 +205,6 @@ default boolean isUndefined() {
return getType() == ColumnType.UNDEFINED;
}

/**
* Returns true if {@link #getDirectStr(Record)} method can be safely called.
* The method is guaranteed to return off-heap strings with stable pointers,
* i.e. once a string is returned, its pointer remains actual until the end
* of query execution.
*/
default boolean supportsDirectStr() {
return false;
}

/**
* Returns true if the function supports parallel execution, e.g. parallel filter
* or GROUP BY. If the method returns false, single-threaded execution plan
Expand Down
Expand Up @@ -39,8 +39,8 @@
public class PageAddressCacheRecord implements Record, Closeable {

private final MemoryCR.ByteSequenceView bsview = new MemoryCR.ByteSequenceView();
private final DirectString csviewA = new DirectString();
private final DirectString csviewB = new DirectString();
private final StableDirectString csviewA = new StableDirectString();
private final StableDirectString csviewB = new StableDirectString();
private final Long256Impl long256A = new Long256Impl();
private final Long256Impl long256B = new Long256Impl();
private final ObjList<SymbolTable> symbolTableCache = new ObjList<>();
Expand Down Expand Up @@ -119,18 +119,6 @@ public char getChar(int columnIndex) {
return Unsafe.getUnsafe().getChar(address + (rowIndex << 1));
}

@Override
jerrinot marked this conversation as resolved.
Show resolved Hide resolved
public DirectCharSequence getDirectStr(int columnIndex) {
final long dataPageAddress = pageAddressCache.getPageAddress(frameIndex, columnIndex);
if (dataPageAddress == 0) {
return NullMemoryMR.INSTANCE.getDirectStr(0);
}
final long indexPageAddress = pageAddressCache.getIndexPageAddress(frameIndex, columnIndex);
final long offset = Unsafe.getUnsafe().getLong(indexPageAddress + (rowIndex << 3));
final long size = pageAddressCache.getPageSize(frameIndex, columnIndex);
return getStrA(dataPageAddress, offset, size, csviewA);
}

@Override
public double getDouble(int columnIndex) {
final long address = pageAddressCache.getPageAddress(frameIndex, columnIndex);
Expand Down
14 changes: 4 additions & 10 deletions core/src/main/java/io/questdb/cairo/sql/Record.java
Expand Up @@ -26,7 +26,10 @@

import io.questdb.std.BinarySequence;
import io.questdb.std.Long256;
import io.questdb.std.str.*;
import io.questdb.std.str.CharSink;
import io.questdb.std.str.Utf16Sink;
import io.questdb.std.str.Utf8Sequence;
import io.questdb.std.str.Utf8Sink;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -102,15 +105,6 @@ default long getDate(int col) {
return getLong(col);
}

/**
* Gets the off-heap (direct) value of a string column by index
*
* @param col numeric index of the column
* @return direct string, null if string is empty
*/
default DirectCharSequence getDirectStr(int col) {
throw new UnsupportedOperationException();
}

/**
* Gets the value of a double column by index
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/java/io/questdb/cairo/vm/AbstractMemoryCR.java
Expand Up @@ -34,8 +34,8 @@
public abstract class AbstractMemoryCR implements MemoryCR, Mutable {

private final MemoryCR.ByteSequenceView bsview = new MemoryCR.ByteSequenceView();
private final DirectString csviewA = new DirectString();
private final DirectString csviewB = new DirectString();
private final DirectString csviewA;
private final DirectString csviewB;
private final Long256Impl long256A = new Long256Impl();
private final Long256Impl long256B = new Long256Impl();
private final Utf8SplitString utf8SplitViewA = new Utf8SplitString();
Expand All @@ -48,6 +48,16 @@ public abstract class AbstractMemoryCR implements MemoryCR, Mutable {
protected long size = 0;
private long shiftAddressRight = 0;

public AbstractMemoryCR(boolean stableStrings) {
if (stableStrings) {
csviewA = new StableDirectString();
csviewB = new StableDirectString();
} else {
csviewA = new DirectString();
csviewB = new DirectString();
}
}

public long addressOf(long offset) {
offset -= shiftAddressRight;
assert offset <= size : "offset=" + offset + ", size=" + size;
Expand Down Expand Up @@ -112,12 +122,14 @@ public final CharSequence getStrB(long offset) {
return getStr(offset, csviewB);
}

@Override @NotNull
@Override
@NotNull
public Utf8Sequence getVarcharA(long offset, int size, boolean ascii) {
return getVarchar(offset, size, utf8viewA, ascii);
}

@Override @NotNull
@Override
@NotNull
public Utf8Sequence getVarcharB(long offset, int size, boolean ascii) {
return getVarchar(offset, size, utf8viewB, ascii);
}
Expand Down