Skip to content

Commit

Permalink
ESQL: Begin optimizing Block#lookup (elastic#108482)
Browse files Browse the repository at this point in the history
This creates the infrastructure to allow optimizing the `lookup` method
when applied to `Vector`s and then implements that optimization for
constant vectors. Constant vectors now take one of six paths:
1. An empty positions `Block` yields an empty result set.
2. If `positions` is a `Block`, perform the un-optimized lookup.
3. If the `min` of the `positions` *Vector* is less that 0 then throw an
   exception.
4. If the `min` of the positions Vector is greater than the number of
   positions in the lookup block then return a single
   `ConstantNullBlock` because you are looking up outside the range.
5. If the `max` of the positions Vector is less than the number of
   positions in the lookup block then return a `Constant$Type$Block`
   with the same value as the lookup block. This is a lookup that's
   entirely within range.
6. Otherwise return the unoptimized lookup.

This is *fairly* simple but demonstrates how we can plug in more complex
optimizations later.
  • Loading branch information
nik9000 committed May 10, 2024
1 parent 2d14095 commit 04d3b99
Show file tree
Hide file tree
Showing 37 changed files with 431 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,30 @@ public String toString() {

};
}

/**
* Returns an empty iterator over the supplied value.
*/
static <T extends Releasable> ReleasableIterator<T> empty() {
return new ReleasableIterator<>() {
@Override
public boolean hasNext() {
return false;
}

@Override
public T next() {
assert false : "hasNext is always false so next should never be called";
return null;
}

@Override
public void close() {}

@Override
public String toString() {
return "ReleasableIterator[<empty>]";
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -91,6 +93,11 @@ public BooleanVector filter(int... positions) {
}
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new BooleanLookup(asBlock(), positions, targetBlockSize);
}

public static long ramBytesEstimated(boolean[] values) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand Down Expand Up @@ -87,6 +89,11 @@ public BooleanVector filter(int... positions) {
return new BooleanBigArrayVector(filtered, positions.length, blockFactory);
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new BooleanLookup(asBlock(), positions, targetBlockSize);
}

@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link BitArray} is adjusted outside
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand All @@ -27,6 +29,9 @@ public sealed interface BooleanVector extends Vector permits ConstantBooleanVect
@Override
BooleanVector filter(int... positions);

@Override
ReleasableIterator<? extends BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);

/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a BooleanVector, and both vectors are {@link #equals(BooleanVector, BooleanVector) equal}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ public BooleanBlock filter(int... positions) {
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BooleanLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand Down Expand Up @@ -91,6 +93,11 @@ public BytesRefVector filter(int... positions) {
}
}

@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new BytesRefLookup(asBlock(), positions, targetBlockSize);
}

public static long ramBytesEstimated(BytesRefArray values) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand All @@ -34,6 +36,9 @@ public sealed interface BytesRefVector extends Vector permits ConstantBytesRefVe
@Override
BytesRefVector filter(int... positions);

@Override
ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);

/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a BytesRefVector, and both vectors are {@link #equals(BytesRefVector, BytesRefVector) equal}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ public BytesRefBlock filter(int... positions) {
}

@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BytesRefLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant boolean value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public BooleanVector filter(int... positions) {
return blockFactory().newConstantBooleanVector(value, positions.length);
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
if (positions.getPositionCount() == 0) {
return ReleasableIterator.empty();
}
IntVector positionsVector = positions.asVector();
if (positionsVector == null) {
return new BooleanLookup(asBlock(), positions, targetBlockSize);
}
int min = positionsVector.min();
if (min < 0) {
throw new IllegalArgumentException("invalid position [" + min + "]");
}
if (min > getPositionCount()) {
return ReleasableIterator.single((BooleanBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
}
if (positionsVector.max() < getPositionCount()) {
return ReleasableIterator.single(positions.blockFactory().newConstantBooleanBlockWith(value, positions.getPositionCount()));
}
return new BooleanLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.BOOLEAN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant BytesRef value.
Expand Down Expand Up @@ -45,6 +47,28 @@ public BytesRefVector filter(int... positions) {
return blockFactory().newConstantBytesRefVector(value, positions.length);
}

@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
if (positions.getPositionCount() == 0) {
return ReleasableIterator.empty();
}
IntVector positionsVector = positions.asVector();
if (positionsVector == null) {
return new BytesRefLookup(asBlock(), positions, targetBlockSize);
}
int min = positionsVector.min();
if (min < 0) {
throw new IllegalArgumentException("invalid position [" + min + "]");
}
if (min > getPositionCount()) {
return ReleasableIterator.single((BytesRefBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
}
if (positionsVector.max() < getPositionCount()) {
return ReleasableIterator.single(positions.blockFactory().newConstantBytesRefBlockWith(value, positions.getPositionCount()));
}
return new BytesRefLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.BYTES_REF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant double value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public DoubleVector filter(int... positions) {
return blockFactory().newConstantDoubleVector(value, positions.length);
}

@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
if (positions.getPositionCount() == 0) {
return ReleasableIterator.empty();
}
IntVector positionsVector = positions.asVector();
if (positionsVector == null) {
return new DoubleLookup(asBlock(), positions, targetBlockSize);
}
int min = positionsVector.min();
if (min < 0) {
throw new IllegalArgumentException("invalid position [" + min + "]");
}
if (min > getPositionCount()) {
return ReleasableIterator.single((DoubleBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
}
if (positionsVector.max() < getPositionCount()) {
return ReleasableIterator.single(positions.blockFactory().newConstantDoubleBlockWith(value, positions.getPositionCount()));
}
return new DoubleLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.DOUBLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant int value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public IntVector filter(int... positions) {
return blockFactory().newConstantIntVector(value, positions.length);
}

@Override
public ReleasableIterator<IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
if (positions.getPositionCount() == 0) {
return ReleasableIterator.empty();
}
IntVector positionsVector = positions.asVector();
if (positionsVector == null) {
return new IntLookup(asBlock(), positions, targetBlockSize);
}
int min = positionsVector.min();
if (min < 0) {
throw new IllegalArgumentException("invalid position [" + min + "]");
}
if (min > getPositionCount()) {
return ReleasableIterator.single((IntBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
}
if (positionsVector.max() < getPositionCount()) {
return ReleasableIterator.single(positions.blockFactory().newConstantIntBlockWith(value, positions.getPositionCount()));
}
return new IntLookup(asBlock(), positions, targetBlockSize);
}

/**
* The minimum value in the block.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant long value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public LongVector filter(int... positions) {
return blockFactory().newConstantLongVector(value, positions.length);
}

@Override
public ReleasableIterator<LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
if (positions.getPositionCount() == 0) {
return ReleasableIterator.empty();
}
IntVector positionsVector = positions.asVector();
if (positionsVector == null) {
return new LongLookup(asBlock(), positions, targetBlockSize);
}
int min = positionsVector.min();
if (min < 0) {
throw new IllegalArgumentException("invalid position [" + min + "]");
}
if (min > getPositionCount()) {
return ReleasableIterator.single((LongBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
}
if (positionsVector.max() < getPositionCount()) {
return ReleasableIterator.single(positions.blockFactory().newConstantLongBlockWith(value, positions.getPositionCount()));
}
return new LongLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.LONG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -90,6 +92,11 @@ public DoubleVector filter(int... positions) {
}
}

@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new DoubleLookup(asBlock(), positions, targetBlockSize);
}

public static long ramBytesEstimated(double[] values) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand Down Expand Up @@ -86,6 +88,11 @@ public DoubleVector filter(int... positions) {
return new DoubleBigArrayVector(filtered, positions.length, blockFactory);
}

@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new DoubleLookup(asBlock(), positions, targetBlockSize);
}

@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link DoubleArray} is adjusted outside
Expand Down

0 comments on commit 04d3b99

Please sign in to comment.