
Experiencing slow spatial joins #83

Closed · micvbang opened this issue Feb 10, 2017 · 16 comments

@micvbang commented Feb 10, 2017

Hello,

Reading your paper, it seems that you get great performance (on the order of minutes) for a spatial join with 3 million records on each side, on your experimental setup.

I tried to achieve the same with the following code, over a dataset of just 1 million rows, on a large dual-CPU machine (2× E5-2650 v2 @ 2.60 GHz, 16 cores) with 128 GB of RAM. Here, it takes hours.

My data is distributed around the Earth and uses the EPSG:3857 projection. In the Spark UI, the query seems to go really fast until the last 10–20 tasks (it shows something like 5601+10/5611), and this last part takes the vast majority of the time.

Am I doing something wrong?

Also, may I see an example of one of the queries you yourself ran on a large dataset? I have not been able to find one in the example code provided.

from __future__ import print_function

import os  # needed for the SPARK_HOME path handling below
import timeit

from pyspark import SparkContext
from pyspark.sql import SQLContext


if __name__ == "__main__":
    sc = SparkContext(appName="test_test", master="spark://127.0.0.1:7077")
    sqlContext = SQLContext(sc)

    sqlContext.setConf('spark.sql.joins.distanceJoin', 'DJSpark')

    datafile = lambda f: "file://" + os.path.join(os.environ['SPARK_HOME'], "../../data/", f)

    path = datafile("gdelt_1m.json")
    rdd = sc.textFile(path)

    # Create a DataFrame from the file(s) pointed to by path
    airports = sqlContext.read.json(rdd)

    # Register this DataFrame as a table.
    airports.registerTempTable("input_table")


    sqlContext.sql("CREATE INDEX pidx ON input_table(euc_lon, euc_lat) USE rtree")
    sqlContext.sql("CREATE INDEX pidy ON input_table(id) USE hashmap")

    # SQL statements can be run by using the sql methods provided by sqlContext
    results = sqlContext.sql("""
          SELECT ARRAY(l.id, r.id) ids
          FROM input_table l DISTANCE JOIN input_table r ON POINT(r.euc_lon, r.euc_lat)
          IN CIRCLERANGE(POINT(l.euc_lon, l.euc_lat), 7500)
          WHERE l.id < r.id
    """)

    t1 = timeit.default_timer()
    results = results.collect()
    t2 = timeit.default_timer()
    elapsed = t2 - t1

    print("ELAPSED TIME: %d" % elapsed)
    for result in results[:20]:
        print(result)
    print(len(results))

    sc.stop()

Thanks

@dongx-psu (Member) commented Feb 10, 2017 via email

@micvbang (Author)

Thanks for the fast response!

Alright, I will try without building the index.

What do you mean that something is wrong with the final predicate? It is supposed to be an inequality condition on integers :-)

I am using the 1.6 branch since I started out using that before the current master branch became available.

@dongx-psu (Member) commented Feb 10, 2017 via email

@dongx-psu (Member) commented Feb 10, 2017 via email

@micvbang (Author) commented Feb 10, 2017

Here is what I get from calling .toDebugString() on my results RDD.

(1184) MapPartitionsRDD[34] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  |    MapPartitionsRDD[33] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  |    MapPartitionsRDD[32] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  |    ZippedPartitionsRDD2[31] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  |    MapPartitionsRDD[27] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  |    ShuffledRDD[26] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  +-(210) MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:-2 []
      |   MapPartitionsRDD[23] at javaToPython at NativeMethodAccessorImpl.java:-2 []
      |   ShuffledRDD[18] at javaToPython at NativeMethodAccessorImpl.java:-2 []
      +-(6) MapPartitionsRDD[15] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[11] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[8] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
         |  file:///tmp/spark/data/gdelt_1m.json HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []
  |    MapPartitionsRDD[30] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  |    ShuffledRDD[29] at javaToPython at NativeMethodAccessorImpl.java:-2 []
  +-(210) MapPartitionsRDD[28] at javaToPython at NativeMethodAccessorImpl.java:-2 []
      |   MapPartitionsRDD[24] at javaToPython at NativeMethodAccessorImpl.java:-2 []
      |   ShuffledRDD[22] at javaToPython at NativeMethodAccessorImpl.java:-2 []
      +-(6) MapPartitionsRDD[19] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[14] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[13] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[10] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:-2 []
         |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
         |  file:///tmp/spark/data/gdelt_1m.json HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []

@dongx-psu (Member) commented Feb 10, 2017 via email

@micvbang (Author)

Ah right, I'm sorry!

== Parsed Logical Plan ==                                                       
'Project [unresolvedalias('ARRAY('l.id,'r.id))]
+- 'Filter ('l.id < 'r.id)
   +- 'Join DistanceJoin, Some( **(pointwrapperexpression('r.euc_lon,'r.euc_lat)) IN CIRCLERANGE (pointwrapperexpression('l.euc_lon,'l.euc_lat)) within  (500)**  )
      :- 'UnresolvedRelation `input_table`, Some(l)
      +- 'UnresolvedRelation `input_table`, Some(r)

== Analyzed Logical Plan ==
_c0: array<string>
Project [array(id#3,id#12) AS _c0#18]
+- Filter (id#3 < id#12)
   +- Join DistanceJoin, Some( **(pointwrapperexpression(euc_lon#11,euc_lat#10)) IN CIRCLERANGE (pointwrapperexpression(euc_lon#2,euc_lat#1)) within  (500)**  )
      :- Subquery l
      :  +- Subquery input_table
      :     +- Relation[date#0,euc_lat#1,euc_lon#2,id#3,merc_lat#4,merc_lon#5,num_articles#6,num_mentions#7,num_sources#8] JSONRelation
      +- Subquery r
         +- Subquery input_table
            +- Relation[date#9,euc_lat#10,euc_lon#11,id#12,merc_lat#13,merc_lon#14,num_articles#15,num_mentions#16,num_sources#17] JSONRelation

== Optimized Logical Plan ==
Project [array(id#3,id#12) AS _c0#18]
+- Filter (id#3 < id#12)
   +- Join DistanceJoin, Some( **(pointwrapperexpression(euc_lon#11,euc_lat#10)) IN CIRCLERANGE (pointwrapperexpression(euc_lon#2,euc_lat#1)) within  (500)**  )
      :- Relation[date#0,euc_lat#1,euc_lon#2,id#3,merc_lat#4,merc_lon#5,num_articles#6,num_mentions#7,num_sources#8] JSONRelation
      +- Relation[date#9,euc_lat#10,euc_lon#11,id#12,merc_lat#13,merc_lon#14,num_articles#15,num_mentions#16,num_sources#17] JSONRelation

== Physical Plan ==
Project [array(id#3,id#12) AS _c0#18]
+- Filter (id#3 < id#12)
   +- DJSpark pointwrapperexpression(euc_lon#2,euc_lat#1), pointwrapperexpression(euc_lon#11,euc_lat#10), 500
      :- ConvertToSafe
      :  +- Scan JSONRelation[date#0,euc_lat#1,euc_lon#2,id#3,merc_lat#4,merc_lon#5,num_articles#6,num_mentions#7,num_sources#8] InputPaths: 
      +- ConvertToSafe
         +- Scan JSONRelation[date#9,euc_lat#10,euc_lon#11,id#12,merc_lat#13,merc_lon#14,num_articles#15,num_mentions#16,num_sources#17] InputPaths:

@dongx-psu (Member) commented Feb 10, 2017 via email

@micvbang (Author)

Yep, I'm still experiencing the same problem.

I tried porting the code to Scala and the same thing happens. Hmm.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object GdeltTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("gdelt_test").setMaster("spark://127.0.0.1:7077")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    println("hey there")
    val df1 = sqlContext.read.json("/tmp/spark/data/gdelt_1m.json")
    df1.registerTempTable("input_table")


    println("----------------------------")

    val sqlQuery = " SELECT ARRAY(l.id, r.id) ids FROM input_table l DISTANCE JOIN input_table r ON POINT(r.euc_lon, r.euc_lat) IN CIRCLERANGE(POINT(l.euc_lon, l.euc_lat), 1000) WHERE l.id < r.id"
    val df = sqlContext.sql(sqlQuery)
    println(df.queryExecution)

    val r = df.collect()
    println(r.length)
    sc.stop()
  }
}

If I go down to 256k rows from my original dataset of 1 million, Spark starts throwing OutOfMemory exceptions after approximately one hour.

I started the Scala jobs on the machine described in the first post, with the following configuration:

spark.eventLog.enabled           true                 
spark.eventLog.dir               /tmp/spark/eventlog  
spark.driver.memory              30g                  
spark.executor.memory            80g                  
spark.driver.maxResultSize       20g                  
spark.python.worker.memory       1g                   
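
As a sanity check (a minimal sketch using the standard SparkContext.getConf API, shown here with the PySpark script from the first post; the Scala equivalent is sc.getConf.getAll), one can print which of these settings the running context actually picked up:

# Print the driver- and executor-related settings the running SparkContext sees,
# to confirm the spark-defaults entries above were applied.
for key, value in sorted(sc.getConf().getAll()):
    if key.startswith("spark.driver") or key.startswith("spark.executor"):
        print(key, value)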

@micvbang (Author) commented Feb 10, 2017

Huh. I just tried the OSM dataset, using 700k points located in the UK. With this dataset, I can do the join from Python (I have not tested Scala yet) in 6 minutes.

I believe that the dataset is making the difference. I'm not yet sure what is causing the slow queries, though.

@micvbang (Author)

Yep, I'm almost positive the dataset is making the difference. I just successfully performed a join on a different dataset of 2.5 million points in 16 minutes.

@micvbang (Author)

After investigating my dataset, I found that it contains points that are repeated tens of thousands of times. This means that the join generates billions of rows for any query distance, which is what causes the slow queries.
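
A quick check along these lines (a minimal sketch, reusing the input_table registration and the euc_lon/euc_lat columns from the query in the first post) surfaces such duplicated coordinates:

# Count how many rows share the exact same coordinate pair; heavily repeated
# coordinates are what blow up the size of the distance self-join.
dup_counts = sqlContext.sql("""
    SELECT euc_lon, euc_lat, COUNT(*) AS cnt
    FROM input_table
    GROUP BY euc_lon, euc_lat
    ORDER BY cnt DESC
""")
dup_counts.show(20)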

Thank you so much for your help in investigating my problem!

@dongx-psu (Member)

Yeah, I think that is exactly the problem. GDELT's spatial tags are not exact real coordinates; I think they just look the place name up in Google Maps and use a general centroid point for the area, which is why you get so many duplicates.

Thus, the reason it is slow is simply that your output size is too big. With that much output, there is no way it can be fast.
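
To put rough, purely illustrative numbers on this: if a single centroid coordinate appears 20,000 times (within the "tens of thousands" reported above), a distance self-join with any positive radius already emits about 20,000 × 20,000 = 4 × 10⁸ candidate pairs for that one location alone, roughly 2 × 10⁸ of which survive the l.id < r.id filter. A handful of such hotspots is enough to push the total output into the billions.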

@micvbang (Author)

That is exactly the same conclusion that I arrived at!

@micvbang (Author) commented Feb 17, 2017

When you said that I don't have to build indexes for joins, why is that?

Is this because indexes are built on the fly for joins?
Or does it have to do with the open issue that the left side of a join is always repartitioned, regardless of whether there are any indexes on it?

@dongx-psu (Member)

It is a legacy issue: we needed to include partitioning and indexing time in the paper's results. Thus, the join repartitions and builds the local indexes every time.
