Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast spatial join #9474

Merged
merged 2 commits into from
Mar 8, 2018
Merged

Conversation

mbasmanova
Copy link
Contributor

@mbasmanova mbasmanova commented Dec 5, 2017

Implements broadcast spatial join using an R-Tree as described in #9890.

Supports spatial joins defined using ST_Contains and ST_Intersects functions.

The implementation consists of an optimizer rule and two custom operators. The new rule changes a cross join node with a spatial filter on top into a spatial join node which is executed using new operators. SpatialIndexBuilderOperator builds an R-Tree from build side geometries (relation on the right side of the join). SpatialLookupJoinOperator processes probe geometries one record at a time looking up matching geometries in the R-Tree.

For example, the plan for the following query

with 
    points as (select * from (values ('a', 1, 2)) as t(name, lat, lng)),
    polygons as (select * from (values ('p1', 'POLYGON ...')) as t(name, wkt))
select count(1)
from points, polygons
where ST_Contains(ST_GeometryFromText(wkt), ST_Point(lat, lng))

changes from

- FilterProject[filterPredicate = "st_contains"("st_geometryfromtext"(CAST("field_23" AS varchar)), "st_point"(CAST("field_0" AS double), CAST("field_1" AS double)))] => []
    - CrossJoin => [field_0:integer, field_1:integer, field_23:varchar(11)]

to

 - SpatialJoin["st_contains"("st_geometryfromtext", "st_point")] => []                     
     - Project[] => [field_0:integer, field_1:integer, st_point:Geometry]                  
             st_point := "st_point"(CAST("field_0" AS double), CAST("field_1" AS double))  
     - Project[] => [field_23:varchar(11), st_geometryfromtext:Geometry]               
             st_geometryfromtext := "st_geometryfromtext"(CAST("field_23" AS varchar)) 

screen shot 2018-02-28 at 9 30 36 am

Tests for the new functionality include:

  • unit test for the new rule: TestTransformSpatialPredicateToJoin
  • test for the query planner with the new rule: TestSpatialJoinPlanning
  • unit test for the new SpatialJoinOperator: TestSpatialJoinOperator
  • end-to-end test for the spatial join queries: TestSpatialJoins

Known limitations and future work:

  1. Only INNER JOIN is supported.

  2. The new optimizer rule runs after AddExchanges. Moving the rule to some place before AddExchanges requires changing PredicatePushDown rule to become aware of spatial joins and not unroll spatial join into cross join and a filter on top.

  3. The new optimizer rule and the new operators are defined in presto-main, although, logically, they belong to the presto-geospatial plugin. In the future, when plugin interface evolves to support custom rules and operators, the new rule and operators can be moved.

  4. The logic of computing memory size of a geometry object requires access to package private classes from the ESRI library. To gain access, presto-geospatial-toolkit includes a class in com.esri.core.geometry package. This split-package situation will block migration to Java 9 modules and needs to be addressed at some point. Add Geometry::estimateMemorySize() API Esri/geometry-api-java#156

  5. The R-Tree implementation in the JTS library is not GC-friendly as it creates a tree of Java objects. This implementation needs to be replaced.

  6. TODO: Use dictionary style blocks for probe columns (see LookupJoinPageBuilder)

Addressing items 4 and 5 will require changes to external libraries (ESRI and JTS).

@mbasmanova mbasmanova force-pushed the spatial-broadcast-join branch 3 times, most recently from 16812f3 to f0bbfb8 Compare December 5, 2017 02:26
@sopel39 sopel39 requested review from sopel39 and removed request for sopel39 December 5, 2017 09:24
@sopel39 sopel39 requested a review from dain December 13, 2017 19:01
@kokosing
Copy link
Contributor

@mbasmanova Can you please give a short explanation what spatial join is? Just a link coud good enough? Is it SQL standard compliant or is it enhancment to SQL?

@mbasmanova
Copy link
Contributor Author

@kokosing Grzegorz, spatial join combines relations with geometries using spatial relationships contains, intersects or distance. Point-in-polygon join is an example of a spatial join. To express spatial relationship, one can use ST_Contains, ST_Intersects or ST_Distance functions defined in ISO/IEC 13249 SQL/MM Part 3 Spatial: The Standard to Manage Spatial Data in Relational Database Systems - https://www.iso.org/standard/60343.html

SELECT ...
FROM polygons a, points b
WHERE 
    ST_Contains(
        ST_GeometryFromText(a.wkt), 
        ST_Point(b.longitude, b.latitude)
    )

Presto supports a subset of spatial functions from the standard mentioned above: https://prestodb.io/docs/current/functions/geospatial.html

@kokosing
Copy link
Contributor

Thank you. This was a functional description. From the point of view of technical implementation. It seems that the simplest would be to run cross join and evaluate the join condition. Do you follow this or do you have a smarter idea? Do you somehow partition the geospatial space, so then you could run hash join?

@mbasmanova mbasmanova force-pushed the spatial-broadcast-join branch 2 times, most recently from 2a1833b to 8c794e5 Compare December 21, 2017 21:33
@mbasmanova
Copy link
Contributor Author

@kokosing Grzegorz, you are correct. The simplest way is to run cross join and evaluate the join condition. Presto already supports this and no changes are necessary to use this for spatial joins. However, for large datasets this is quite inefficient and takes a very long time. http://www.cs.umd.edu/~hjs/pubs/jacoxtrjoin07.pdf provides a nice overview of the spatial partitioning techniques which make spatial joins a lot more efficient. In the long term, I'd like to implement a spatial join using KDB-tree partitioning of the sources and R-Tree index on individual join nodes similar to how this is done in https://github.com/DataSystemsLab/GeoSpark . In the short term, however, I'm looking to implement (1) broadcast spatial join using R-Tree and (2) tile-based generic spatial join.

(1) is useful for when one side of the join is quite small and can be replicated to all the join nodes; in this case, creating R-Tree from the smaller side and streaming the larger side through it is fast and efficient. This is what this PR is about.

(2) one can convert a spatial join into an equi-join using Bing Tiles (see Bing Tiles section in https://prestodb.io/docs/current/functions/geospatial.html). For example, point-in-polygon join can be converted like so:

spatial join

SELECT ...
FROM polygons a, points b
WHERE 
    ST_Contains(
        ST_GeometryFromText(a.wkt), 
        ST_Point(b.longitude, b.latitude)
    )

an equivalent equi-join:

SELECT ...
FROM
    (
        SELECT *, ST_GeometryFromText(wkt) as geometry
        FROM polygons
        CROSS JOIN UNNEST 
            (geometry_to_bing_tiles(ST_GeometryFromText(wkt), 17)) as t(tile)
     ) a,
    points b,
WHERE a.tile = bing_tile_at(b.latitude, b.longitude, 17)
    AND ST_Contains(a.geometry, ST_Point(b.longitude, b.latitude))    

@findepi
Copy link
Contributor

findepi commented Dec 21, 2017

(2) one can convert a spatial join into an equi-join using Bing Tiles ..

do you plan to have a Rule that does that for the users?

@mbasmanova mbasmanova force-pushed the spatial-broadcast-join branch 2 times, most recently from d910121 to 7837f24 Compare December 22, 2017 03:51
@mbasmanova
Copy link
Contributor Author

@findepi Piotr, I'm still thinking about it, but tentatively I'm planning to write a rule to covert spatial join to tile-based equi-join.

@mbasmanova mbasmanova force-pushed the spatial-broadcast-join branch 5 times, most recently from fb5140c to 9f0a0d5 Compare December 22, 2017 12:55
@kokosing
Copy link
Contributor

@mbasmanova Maria, thank you for explanation. It is really interesting topic.

@mbasmanova mbasmanova force-pushed the spatial-broadcast-join branch 2 times, most recently from a891114 to aa40350 Compare December 22, 2017 18:15
@kokosing
Copy link
Contributor

@mbasmanova I am sorry if I ask dumb questions. I have not read papers spatial related papers, so I might not see some basic issues. Anyway, I wanted to ask have you considered any alternative approaches to support spatial queries efficiently other than modifying join execution code?

Have you had a chance to read: #9585 (comment)?

To me more natural approach with introducing anIndexNode is quite easy to implement and giving more flexibility to handle other similar cases with using indexes like bioinformatic or text searching. It makes possible to make index creation to be extracted to UDF, so user might want to provide their own indexes and the use in other UDF functions (like scalars). I believe that having spatial index next to the build table is going to be possibly the fastest, but might not be the most flexible. What are your thoughts about it?

Here, for reference, I copy-paste the relevant part of the mentioned above comment:


The biggest concern I have is that join execution already is very complicated adding there more logic do not help. Probably you already researched alternative solutions and what you propose is the best one, but let me try with an idea which came to my mind.

Auxiliary questions:

  • what is the R-Tree build cost? O(nlogn)?
  • what is the R-Tree space cost? O(n)?

Do you think that using R-Tree for queries like SELECT ST_Contains(r.a, r.y) FROM (...) r would be beneficial? Considering O(nlong) build index cost, no. What about:

SELECT ST_Contains(r.a1, r.y), ST_Contains(r.a2, r.y), ..., ST_Contains(r.an, r.y) FROM (...) r

A bit closer, but still no. What if number of distinct values of r.y would be low, or if r.y would come from small relation. Then maybe at some point it would be worth... maybe it is potential task for cost-based optimizer.

What if we would introduce Index node. This node would mean that we create an index that would be used later by some expression in order to make them more performant. In this case it will be used for spatial use case, but in general it could be used for other things too like text relatated expression or bioinformatic or ML. The function it is using a kind of aggregation function which takes all the values of given columns and produces single value, but without doing any grouping. Then plan for SELECT ST_Contains(r.a, r.y) as c FROM (...) r would look like:

- Project (ST_contains(r.a, r.y, rtree_index) as c)
  - Index(create_rtree_index(r.y) as rtree_index)
     -  some node which produces r relation

rtree_index column (symbol) would contain same rtree index for all the positions in the block (dictionary block with single value).

Having this:

  • you reuse rtree_index in several places
  • you can make optimizer decide if it is beneficial push down rtree_index creation to different node etc
  • if I have not missed anything it will work for any kind of join out the box
  • notice that could be even helpful to users as they could try to use create_rtree_index manually in queries (storing it if possible, or reusing in single query)
  • it could be used anywhere not only in joins
  • IMO adding new plan node is much easier than modifying join execution
SELECT ... FROM (...) l JOIN (...) r ON ST_Contains(l.x, r.y)

would produce

- join with conditinon ST_Contains(l.x, r.y, r.rtree_index)
  - probe side - l relation
    ...
  - build side - r relation
    - Index (create_rtree_index(y) as rtree_index)
      - ...

@mbasmanova mbasmanova closed this Mar 7, 2018
@mbasmanova mbasmanova reopened this Mar 7, 2018
return NOT_BLOCKED;
}

List<SettableFuture<PagesSpatialIndex>> settableFutures;
synchronized (this) {
this.pagesSpatialIndex = pagesSpatialIndex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: also verify( this.pagesSpatialIndex == null)

private final List<Type> probeTypes;
private final List<Type> outputTypes;
private final List<Integer> probeOutputChannels;
private final int probeGeometryChannel;
private final PagesSpatialIndexFactory pagesSpatialIndexFactory;

private ListenableFuture<Supplier<PagesSpatialIndex>> pagesSpatialIndexFuture;
private PagesSpatialIndex pagesSpatialIndex;
private ListenableFuture<PagesSpatialIndex> pagesSpatialIndexFuture;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know you're nulling this out when finishing. Also null this out in close(), in case this operator is closed without finishing.

DriverYieldSignal yieldSignal = operatorContext.getDriverContext().getYieldSignal();
while (probePosition < probe.getPositionCount()) {
if (joinAddresses == null) {
joinAddresses = pagesSpatialIndex.findJoinAddresses(probePosition, probe, probeGeometryChannel);
localUserMemoryContext.setBytes(SizeOf.sizeOfLongArray(joinAddresses.length));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just SizeOf.sizeOf(joinAddresses) would do. + static import

@mbasmanova mbasmanova force-pushed the spatial-broadcast-join branch 3 times, most recently from b4150b9 to b727cb0 Compare March 7, 2018 18:08
@mbasmanova
Copy link
Contributor Author

@findepi @nezihyigitbasi @dain @highker Nezih, thanks for helping me to figure out how to properly account for memory used by List<List<Block>> channels in PagesSpatialIndexSupplier and SimplePagesHashStrategy. I updated the PR accordingly.

Copy link
Contributor

@nezihyigitbasi nezihyigitbasi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments for size calculations.

// <header-size-of-ImmutableList> +
// <instance-size-of-ObjectArrayList> * <length-of-outer-list> +
// <sum-of-all-Blocks>
return INSTANCE_SIZE + IMMUTABLE_LIST_HEADER_SIZE + OBJECT_ARRAY_LIST_INSTANCE_SIZE * channels.size() +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can format this as below to categorize different types of overhead.

return INSTANCE_SIZE +
        IMMUTABLE_LIST_HEADER_SIZE +
        OBJECT_ARRAY_LIST_INSTANCE_SIZE * channels.size() +
        channels.stream()
            .flatMap(List::stream)
            .mapToLong(Block::getRetainedSizeInBytes)
            .sum();

@@ -77,6 +81,13 @@ public PagesSpatialIndexSupplier(
this.memorySizeInBytes = INSTANCE_SIZE +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can extract a method for this complex instance size calculation, which will simplify the constructor.

Copy link
Contributor

@nezihyigitbasi nezihyigitbasi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor comments.

// The line below is the same as newCachedThreadPool(daemonThreadsNamed(...)) except RejectionExecutionHandler.
// RejectionExecutionHandler is set to DiscardPolicy (instead of the default AbortPolicy) here.
// Otherwise, a large number of RejectedExecutionException will flood logging, resulting in Travis failure.
executor = new ThreadPoolExecutor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executor and scheduledExecutor are shared across the test methods, so I think we should mark this test as single threaded (@Test(singleThreaded = true)). That will also help capping the memory usage and number of active threads, since we create a thread pool with potentially unlimited size in setup.

.row(POINT_W, "w");
OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), probePages.getTypes(), Ints.asList(1), 0, pagesSpatialIndexFactory);

// expected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary comment

.row(POINT_W, "w");
OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), probePages.getTypes(), Ints.asList(1), 0, pagesSpatialIndexFactory);

// expected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary comment

RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR));
OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), probePages.getTypes(), Ints.asList(1), 0, pagesSpatialIndexFactory);

// expected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary comment


public long getAddress()
{
return address;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be unused. Below you directly access the address field.

if (rtree.isEmpty()) {
return EMPTY_INDEX;
}
else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else not needed.

if (root.getLevel() == 0) {
return ABSTRACT_NODE_INSTANCE_SIZE + ENVELOPE_INSTANCE_SIZE + root.getChildBoundables().stream().mapToLong(child -> computeMemorySizeInBytes((ItemBoundable) child)).sum();
}
else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else not needed.

private boolean finishing;
private boolean finished;

public SpatialIndexBuilderOperator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class can be private


/**
* Called by {@link SpatialIndexBuilderOperator} to provide a
* {@link Supplier} of spatial indexes for {@link SpatialJoinOperator}S to use.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{@link SpatialJoinOperator}S -> {@link SpatialJoinOperator}s

int expectedPositions,
PagesIndex.Factory pagesIndexFactory)
{
this.operatorContext = operatorContext;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can add requireNonNull checks here.

@mbasmanova
Copy link
Contributor Author

@nezihyigitbasi Nezih, thank you for review. I made the changes you suggested and updated the PR.

@@ -33,6 +34,9 @@
implements PagesHashStrategy
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(SimplePagesHashStrategy.class).instanceSize();
private static final int OBJECT_ARRAY_LIST_INSTANCE_SIZE = ClassLayout.parseClass(ObjectArrayList.class).instanceSize();
private static final int IMMUTABLE_LIST_HEADER_SIZE = ClassLayout.parseClass(ImmutableList.class).headerSize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableList is abstract, so is this better than parseClass(Object.class).headerSize()? (except for documentation purposes)

(no change requested)

// Each element of the List is an ObjectArrayList<Block>.
// Hence, the memory used by channels is estimated as
// <header-size-of-ImmutableList> +
// <instance-size-of-ObjectArrayList> * <length-of-outer-list> +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still has some % error. Do we need to include ObjectArrayList#a size too?
(imagine: channels is single element, channels[0] is very long list of empty blocks. we would calculate O(1) size, ignoring channels[0].a size)

}

if (finishing && indexNotNeeded.isDone()) {
index.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add close() method and have index.clear() there too.
Like in #10039

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can for example move

index.clear();
localUserMemoryContext.setBytes(index.getEstimatedSize().toBytes());

to close() and just call it here.

@mbasmanova mbasmanova force-pushed the spatial-broadcast-join branch 2 times, most recently from b97ab2b to 045fb77 Compare March 8, 2018 02:23
return new PagesSpatialIndexSupplier(session, valueAddresses, types, outputChannels, channels, sizeOfChannels(), geometryChannel, spatialRelationshipTest, filterFunctionFactory);
}

private long sizeOfChannels()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we put this logic inside PagesSpatialIndexSupplier given it will take channels anyway.

private long sizeOfChannels()
{
return IMMUTABLE_LIST_HEADER_SIZE +
stream(channels)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, stream is much slower than for. @nezihyigitbasi can provide more context.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@highker this is different than @nezihyigitbasi 's bd06b0b, because AFAIU this is called once in operator's lifetime

@mbasmanova
Copy link
Contributor Author

mbasmanova commented Mar 8, 2018

@highker @nezihyigitbasi @findepi

This still has some % error. Do we need to include ObjectArrayList#a size too?

To fix that I made the following changes:

  • modified PagesSpatialIndexSupplier#getEstimatesSize to not include the size of channels and addresses shared with PagesIndex
  • modified SpatialIndexBuilderOperator to add up sizes of PagesSpatialIndexSupplier and PagesIndex (the existing PagesIndex#calculateEstimatedSize computation seems reasonable)

Hope this works.

@findepi
Copy link
Contributor

findepi commented Mar 8, 2018

modified SpatialIndexBuilderOperator to add up sizes of PagesSpatialIndexSupplier and PagesIndex (the existing PagesIndex#calculateEstimatedSize computation seems reasonable)

But then this is pretty different than in hash join, isn't it?

modified PagesSpatialIndexSupplier#getEstimatesSize to not include the size of channels and addresses shared with PagesIndex

Why not simplify improve PagesSpatialIndexSupplier#getEstimatesSize (assuming an improvement is indeed needed) ?

@mbasmanova
Copy link
Contributor Author

@findepi

Why not simplify improve PagesSpatialIndexSupplier#getEstimatesSize (assuming an improvement is indeed needed) ?

PagesSpatialIndexSupplier receives channels as <List<List<Block>>. To compute the size of channels correctly, it needs to make assumptions about the implementations of outer and inner lists. The hardcoding of ImmutableList for outer list and ObjectArrayList for inner list in the getEstimatedSize is brittle. Changing actual implementations may not trigger automatic update of the computation logic. Hence, I feel keeping computation logic in PagesIndex is more robust.

Furthermore, SpatialIndexBuilderOperator already must know that PagesIndex and PagesSpatialIndexSupplier share data. Without this knowledge it would be incorrect to account for memory of PagesSpatialIndexSupplier and not for PagesIndex.

Hence, changing PagesSpatialIndexSupplier#getEstimatedSize to exclude data shared with PagesIndex and modifying SpatialIndexBuilderOperator to compute the sum of PagesIndex size and PagesSpatialIndexSupplier size appears to be the clearest option.

But then this is pretty different than in hash join, isn't it?

I decided that fixing memory accounting for the hash join is out of scope for the PR. The straightforward way I tried earlier suffers from the issue of having to assume the implementations of outer and inner lists.

Benchmark                           (pointCount)  Mode  Cnt      Score     Error  Units
BenchmarkSpatialJoin.benchmarkJoin            10  avgt   30     38.842 ±   2.867  ms/op
BenchmarkSpatialJoin.benchmarkJoin           100  avgt   30    212.509 ±   9.965  ms/op
BenchmarkSpatialJoin.benchmarkJoin          1000  avgt   30   1937.329 ±  79.988  ms/op
BenchmarkSpatialJoin.benchmarkJoin         10000  avgt   30  18822.191 ± 460.088  ms/op

BenchmarkSpatialJoin.benchmarkUserOptimizedJoin            10  avgt   30   15.621 ± 1.221  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin           100  avgt   30   16.939 ± 1.209  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin          1000  avgt   30   29.448 ± 1.990  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin         10000  avgt   30  102.185 ± 4.111  ms/op
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.util.Collections.emptyList;

public class TestSpatialJoinPlanning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


public BaseRuleTest(Plugin... plugins)
{
this.plugins = plugins;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableList.copy()

An optimizer rule to rewrite a cross join with a spatial filter on top into a spatial join and custom operators to execute spatial joins efficiently (broadcast joins only).

For example, the plan for the following query

SELECT ...
FROM points, polygons
WHERE ST_Contains(ST_GeometryFromText(wkt), ST_Point(longitude, latitude))

is rewritten from

- FilterProject[filterPredicate = "st_contains"("st_geometryfromtext"("wkt"), "st_point"("longitude", "latitude"))] => []
    - CrossJoin => [latitude:double, longitude:double, wkt:varchar]

into

- SpatialJoin["st_contains"("st_geometryfromtext", "st_point")] => []
     - ScanProject[table = ...
             st_point := "st_point"("longitude", "latitude")
     - LocalExchange[SINGLE] () => st_geometryfromtext:Geometry
         - RemoteExchange[REPLICATE] => st_geometryfromtext:Geometry
             - Project[] => [st_geometryfromtext:Geometry]
                     st_geometryfromtext := "st_geometryfromtext"("wkt")
                 - ScanFilterProject[table = ...

Benchmark results:

Benchmark                                        (pointCount)  Mode  Cnt   Score   Error  Units
BenchmarkSpatialJoin.benchmarkJoin                         10  avgt   30  15.163 ± 1.610  ms/op
BenchmarkSpatialJoin.benchmarkJoin                        100  avgt   30  13.837 ± 0.919  ms/op
BenchmarkSpatialJoin.benchmarkJoin                       1000  avgt   30  16.205 ± 1.360  ms/op
BenchmarkSpatialJoin.benchmarkJoin                      10000  avgt   30  22.915 ± 1.731  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin            10  avgt   30  14.426 ± 1.048  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin           100  avgt   30  14.507 ± 0.518  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin          1000  avgt   30  16.265 ± 1.447  ms/op
BenchmarkSpatialJoin.benchmarkUserOptimizedJoin         10000  avgt   30  22.076 ± 1.547  ms/op
@mbasmanova mbasmanova merged commit fddf5d8 into prestodb:master Mar 8, 2018
@kokosing
Copy link
Contributor

kokosing commented Mar 8, 2018

Congrats!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants