Skip to content

Commit

Permalink
Fix row addressing in spatial index's filter
Browse files Browse the repository at this point in the history
PagesRTreeIndex#isJoinPositionEligible threw 'integer overflow' from
toIntExact(joinAddress) conversion performed before invoking JoinFilterFunction.
joinAddress is a SynteticAddress that encodes pageIndex and position within the
page. toIntExact(joinAddress) succeeds only if pageIndex == 0 and throws
'integer overflow' if pageIndex > 0.

This issue affects spatial joins with additional filters applied to columns from
both sides of the join.

Spatial joins without additional filters are not affected.

The fix is to pass joinPosition (an index within the addresses array) to
JoinFilterFunction instead of the synthetics address.

This commit renames joinAddress to joinPosition where applicable to avoid future
confusion.
  • Loading branch information
mbasmanova committed Mar 20, 2018
1 parent 5671e71 commit 8318235
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 40 deletions.
Expand Up @@ -210,6 +210,7 @@ public void testYield()

RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
.row(POLYGON_A, "A")
.pageBreak()
.row(POLYGON_B, "B");
PagesSpatialIndexFactory pagesSpatialIndexFactory = buildIndex(driverContext, (build, probe) -> build.contains(probe), Optional.of(filterFunction), buildPages);

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler.JoinFilterFunctionFactory;
import io.airlift.slice.Slice;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.index.strtree.STRtree;
Expand All @@ -35,31 +36,31 @@
import static com.facebook.presto.operator.SyntheticAddress.decodePosition;
import static com.facebook.presto.operator.SyntheticAddress.decodeSliceIndex;
import static com.google.common.base.Verify.verify;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class PagesRTreeIndex
implements PagesSpatialIndex
{
private static final long[] EMPTY_ADDRESSES = new long[0];
private static final int[] EMPTY_ADDRESSES = new int[0];

private final LongArrayList addresses;
private final List<Type> types;
private final List<Integer> outputChannels;
private final List<List<Block>> channels;
private final STRtree rtree;
private final BiPredicate<OGCGeometry, OGCGeometry> spatialRelationshipTest;
private final JoinFilterFunction filterFunction;

public static final class GeometryWithAddress
public static final class GeometryWithPosition
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(GeometryWithAddress.class).instanceSize();
private static final int INSTANCE_SIZE = ClassLayout.parseClass(GeometryWithPosition.class).instanceSize();
private final OGCGeometry ogcGeometry;
private final long address;
private final int position;

public GeometryWithAddress(OGCGeometry ogcGeometry, long address)
public GeometryWithPosition(OGCGeometry ogcGeometry, int position)
{
this.ogcGeometry = ogcGeometry;
this.address = address;
this.position = position;
}

public long getEstimatedMemorySizeInBytes()
Expand All @@ -78,6 +79,7 @@ public PagesRTreeIndex(
BiPredicate<OGCGeometry, OGCGeometry> spatialRelationshipTest,
Optional<JoinFilterFunctionFactory> filterFunctionFactory)
{
this.addresses = requireNonNull(addresses, "addresses is null");
this.types = types;
this.outputChannels = outputChannels;
this.channels = requireNonNull(channels, "channels is null");
Expand All @@ -98,11 +100,11 @@ private static Envelope getEnvelope(OGCGeometry ogcGeometry)
* Returns an array of addresses from {@link PagesIndex#valueAddresses} corresponding
* to rows with matching geometries.
* <p>
* The caller is responsible for calling {@link #isJoinAddressEligible(long, int, Page)}
* The caller is responsible for calling {@link #isJoinPositionEligible(int, int, Page)}
* for each of these addresses to apply additional join filters.
*/
@Override
public long[] findJoinAddresses(int probePosition, Page probe, int probeGeometryChannel)
public int[] findJoinPositions(int probePosition, Page probe, int probeGeometryChannel)
{
Block probeGeometryBlock = probe.getBlock(probeGeometryChannel);
if (probeGeometryBlock.isNull(probePosition)) {
Expand All @@ -116,28 +118,29 @@ public long[] findJoinAddresses(int probePosition, Page probe, int probeGeometry
return EMPTY_ADDRESSES;
}

LongArrayList matchingAddresses = new LongArrayList();
IntArrayList matchingPositions = new IntArrayList();

Envelope envelope = getEnvelope(probeGeometry);
rtree.query(envelope, item -> {
GeometryWithAddress geometryWithAddress = (GeometryWithAddress) item;
if (spatialRelationshipTest.test(geometryWithAddress.ogcGeometry, probeGeometry)) {
matchingAddresses.add(geometryWithAddress.address);
GeometryWithPosition geometryWithPosition = (GeometryWithPosition) item;
if (spatialRelationshipTest.test(geometryWithPosition.ogcGeometry, probeGeometry)) {
matchingPositions.add(geometryWithPosition.position);
}
});

return matchingAddresses.toLongArray(null);
return matchingPositions.toIntArray(null);
}

@Override
public boolean isJoinAddressEligible(long joinAddress, int probePosition, Page probe)
public boolean isJoinPositionEligible(int joinPosition, int probePosition, Page probe)
{
return filterFunction == null || filterFunction.filter(toIntExact(joinAddress), probePosition, probe);
return filterFunction == null || filterFunction.filter(joinPosition, probePosition, probe);
}

@Override
public void appendTo(long joinAddress, PageBuilder pageBuilder, int outputChannelOffset)
public void appendTo(int joinPosition, PageBuilder pageBuilder, int outputChannelOffset)
{
long joinAddress = addresses.getLong(joinPosition);
int blockIndex = decodeSliceIndex(joinAddress);
int blockPosition = decodePosition(joinAddress);

Expand Down
Expand Up @@ -18,30 +18,30 @@

public interface PagesSpatialIndex
{
long[] findJoinAddresses(int probePosition, Page probe, int probeGeometryChannel);
int[] findJoinPositions(int probePosition, Page probe, int probeGeometryChannel);

boolean isJoinAddressEligible(long joinPosition, int probePosition, Page probe);
boolean isJoinPositionEligible(int joinPosition, int probePosition, Page probe);

void appendTo(long joinAddress, PageBuilder pageBuilder, int outputChannelOffset);
void appendTo(int joinPosition, PageBuilder pageBuilder, int outputChannelOffset);

PagesSpatialIndex EMPTY_INDEX = new PagesSpatialIndex()
{
private final long[] emptyAddresses = new long[0];
private final int[] emptyAddresses = new int[0];

@Override
public long[] findJoinAddresses(int probePosition, Page probe, int probeGeometryChannel)
public int[] findJoinPositions(int probePosition, Page probe, int probeGeometryChannel)
{
return emptyAddresses;
}

@Override
public boolean isJoinAddressEligible(long joinPosition, int probePosition, Page probe)
public boolean isJoinPositionEligible(int joinPosition, int probePosition, Page probe)
{
throw new UnsupportedOperationException();
}

@Override
public void appendTo(long joinAddress, PageBuilder pageBuilder, int outputChannelOffset)
public void appendTo(int joinPosition, PageBuilder pageBuilder, int outputChannelOffset)
{
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -15,7 +15,7 @@

import com.esri.core.geometry.ogc.OGCGeometry;
import com.facebook.presto.Session;
import com.facebook.presto.operator.PagesRTreeIndex.GeometryWithAddress;
import com.facebook.presto.operator.PagesRTreeIndex.GeometryWithPosition;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
Expand Down Expand Up @@ -103,7 +103,7 @@ private static STRtree buildRTree(LongArrayList addresses, List<List<Block>> cha
continue;
}

rtree.insert(getEnvelope(ogcGeometry), new GeometryWithAddress(ogcGeometry, pageAddress));
rtree.insert(getEnvelope(ogcGeometry), new GeometryWithPosition(ogcGeometry, position));
}

rtree.build();
Expand All @@ -128,7 +128,7 @@ private long computeMemorySizeInBytes(AbstractNode root)

private long computeMemorySizeInBytes(ItemBoundable item)
{
return ENVELOPE_INSTANCE_SIZE + ((GeometryWithAddress) item.getItem()).getEstimatedMemorySizeInBytes();
return ENVELOPE_INSTANCE_SIZE + ((GeometryWithPosition) item.getItem()).getEstimatedMemorySizeInBytes();
}

// doesn't include memory used by channels and addresses which are shared with PagesIndex
Expand Down
Expand Up @@ -130,8 +130,8 @@ public OperatorFactory duplicate()
// filled up pageBuilder before processing all records in a probe page.
private int probePosition;
@Nullable
private long[] joinAddresses;
private int nextJoinAddressIndex;
private int[] joinPositions;
private int nextJoinPositionIndex;

private boolean finishing;
private boolean finished;
Expand Down Expand Up @@ -180,7 +180,7 @@ public void addInput(Page page)
probe = page;
probePosition = 0;

joinAddresses = null;
joinPositions = null;
}

@Override
Expand Down Expand Up @@ -219,23 +219,23 @@ private void processProbe()
PagesSpatialIndex pagesSpatialIndex = getDone(pagesSpatialIndexFuture);
DriverYieldSignal yieldSignal = operatorContext.getDriverContext().getYieldSignal();
while (probePosition < probe.getPositionCount()) {
if (joinAddresses == null) {
joinAddresses = pagesSpatialIndex.findJoinAddresses(probePosition, probe, probeGeometryChannel);
localUserMemoryContext.setBytes(sizeOf(joinAddresses));
nextJoinAddressIndex = 0;
if (joinPositions == null) {
joinPositions = pagesSpatialIndex.findJoinPositions(probePosition, probe, probeGeometryChannel);
localUserMemoryContext.setBytes(sizeOf(joinPositions));
nextJoinPositionIndex = 0;
if (yieldSignal.isSet()) {
return;
}
}

while (nextJoinAddressIndex < joinAddresses.length) {
while (nextJoinPositionIndex < joinPositions.length) {
if (pageBuilder.isFull()) {
return;
}

long joinAddress = joinAddresses[nextJoinAddressIndex];
int joinPosition = joinPositions[nextJoinPositionIndex];

if (pagesSpatialIndex.isJoinAddressEligible(joinAddress, probePosition, probe)) {
if (pagesSpatialIndex.isJoinPositionEligible(joinPosition, probePosition, probe)) {
pageBuilder.declarePosition();
int outputChannelOffset = 0;
for (int outputIndex : probeOutputChannels) {
Expand All @@ -244,17 +244,17 @@ private void processProbe()
type.appendTo(block, probePosition, pageBuilder.getBlockBuilder(outputChannelOffset));
outputChannelOffset++;
}
pagesSpatialIndex.appendTo(joinAddress, pageBuilder, outputChannelOffset);
pagesSpatialIndex.appendTo(joinPosition, pageBuilder, outputChannelOffset);
}

nextJoinAddressIndex++;
nextJoinPositionIndex++;

if (yieldSignal.isSet()) {
return;
}
}

joinAddresses = null;
joinPositions = null;
localUserMemoryContext.setBytes(0);
probePosition++;
}
Expand Down

0 comments on commit 8318235

Please sign in to comment.