In [1]:
import time
import findspark
findspark.init('D:/Software/Study/Spark/spark-3.1.3-bin-hadoop3.2')
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from shapely.geometry import Point

from sedona.register import SedonaRegistrator
from sedona.core.SpatialRDD import SpatialRDD
from sedona.core.SpatialRDD import PointRDD
from sedona.core.SpatialRDD import CircleRDD
from sedona.core.enums import FileDataSplitter
from sedona.core.spatialOperator import JoinQuery
from sedona.core.spatialOperator import JoinQueryRaw
from sedona.core.spatialOperator import RangeQuery
from sedona.core.spatialOperator import RangeQueryRaw
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.sql.types import GeometryType
from sedona.core.enums import GridType
from sedona.core.enums import IndexType
from sedona.core.geom.envelope import Envelope
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

In [2]:
# Use KryoSerializer.getName and SedonaKryoRegistrator.getName class properties to reduce memory impact.
spark = SparkSession.\
    builder.\
    master("local[*]").\
    appName("Sedona App").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName) .\
    config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.1.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2") .\
    getOrCreate()

In [3]:
# Register function is essential for Apache Sedona Core and Apache Sedona SQL. 
# this command will enable you to call the provided APIs
SedonaRegistrator.registerAll(spark)

True

Read the first PointRDD

In [4]:
t0 = time.time()

sc = spark.sparkContext
input_location = "D:/Programming/Python/GEOG770/Project/data/input_vertices_1_pt.csv"
offset = 0  # The point starts from Column 0
splitter = FileDataSplitter.CSV # FileDataSplitter enumeration, this parameter will pass the csv format to SpatialRDD
carry_other_attributes = True  # Carry othrt attributes, Column 2 (hotel, gas, bar...)
level = StorageLevel.MEMORY_ONLY # Storage level from pyspark, this parameter will tell Spark to cache the "rawSpatialRDD"

# create point_rdd
point_rdd = PointRDD(sc, input_location, offset, splitter, carry_other_attributes, StorageLevel.MEMORY_ONLY)
point_rdd.analyze()

print("number of point:", point_rdd.approximateTotalCount)

t1 = time.time()
print("time consumed to create a point rdd:", t1-t0)

number of point: 7
time consumed to create a point rdd: 0.4697861671447754


Read the second PointRDD

In [5]:
input_location_2 = "D:/Programming/Python/GEOG770/Project/data/input_vertices_2_pt.csv"

point_rdd_2 = PointRDD(sc, input_location_2, offset, splitter, carry_other_attributes, StorageLevel.MEMORY_ONLY)
point_rdd_2.analyze()

print("number of point:", point_rdd_2.approximateTotalCount)

number of point: 15


In [39]:
from pyspark.sql.types import DoubleType, StringType

schema = StructType(
    [
        #StructField("point", GeometryType(), True),
        StructField("x", DoubleType(), True),
        StructField("y", DoubleType(), True),
        StructField("ele", DoubleType(), True),
        # the third boolean variable means whether the field can be null (None) or not
    ]
)

In [40]:
pt1 = point_rdd.rawSpatialRDD.map(lambda x: [x.geom.x, x.geom.y, float(x.userData)])
df_pt1 = spark.createDataFrame(pt1, schema, verifySchema=False)

In [41]:
df_pt1.show(5,True)

+-----+-----+----+
|    x|    y| ele|
+-----+-----+----+
| 40.0| 20.0| 4.0|
| 22.0|400.0| 2.0|
| 66.0|200.0| 8.0|
|260.0| 80.0| 1.0|
|200.0|200.0|10.0|
+-----+-----+----+
only showing top 5 rows



In [42]:
pt2 = point_rdd_2.rawSpatialRDD.map(lambda x: [x.geom.x, x.geom.y, float(x.userData)])
df_pt2 = spark.createDataFrame(pt2, schema, verifySchema=False)

In [43]:
df_pt2.show(5,True)

+-----+-----+----+
|    x|    y| ele|
+-----+-----+----+
| 40.0| 20.0|12.0|
| 50.0|400.0|23.0|
| 66.0|200.0|20.0|
|260.0|100.0|18.0|
|200.0|200.0|10.0|
+-----+-----+----+
only showing top 5 rows



In [6]:
from pyspark.sql.types import DoubleType, StringType

schema_geo = StructType(
    [
        StructField("point", GeometryType(), True),
        #StructField("x", DoubleType(), True),
        #StructField("y", DoubleType(), True),
        StructField("ele", DoubleType(), True),
        # the third boolean variable means whether the field can be null (None) or not
    ]
)

Create a DataFrame from a PointRDD

In [8]:
pt1_geo = point_rdd.rawSpatialRDD.map(lambda x: [x.geom, float(x.userData)])
df_pt1_geo = spark.createDataFrame(pt1_geo, schema_geo, verifySchema=False)

In [9]:
df_pt1_geo_col = df_pt1_geo.collect()

In [10]:
type(df_pt1_geo_col[0][0])

shapely.geometry.point.Point

pt1.distance(pt2) to calculate the distance

In [11]:
df_pt1_geo_col[0][0].distance(df_pt1_geo_col[1][0])

380.4260769190251

In [13]:
df_pt1_geo.printSchema()

root
 |-- point: geometry (nullable = true)
 |-- ele: double (nullable = true)



In [15]:
from pyspark.sql.functions import udf, log, lit, first

add a column to a dataframe

In [16]:
df_pt1_geo = df_pt1_geo.withColumn("disRaw2Max", lit(0.1))

In [17]:
df_pt1_geo_col = df_pt1_geo.collect()

In [20]:
df_pt1_geo.show(5)

+---------------+----+----------+
|          point| ele|disRaw2Max|
+---------------+----+----------+
|  POINT (40 20)| 4.0|       0.1|
| POINT (22 400)| 2.0|       0.1|
| POINT (66 200)| 8.0|       0.1|
| POINT (260 80)| 1.0|       0.1|
|POINT (200 200)|10.0|       0.1|
+---------------+----+----------+
only showing top 5 rows



using a UDF when adding a column

In [22]:
@udf
def get_dis(pt1,pt2):
    #dis_temp = x.distance(y)
    dis_temp = pt1.x + pt2.x
    return dis_temp

In [23]:
df_pt1_geo = df_pt1_geo.withColumn("disRaw2Max", lit(get_dis(df_pt1_geo.point, df_pt1_geo.point)))

In [24]:
df_pt1_geo_col = df_pt1_geo.collect()

In [25]:
df_pt1_geo.show()

+---------------+----+----------+
|          point| ele|disRaw2Max|
+---------------+----+----------+
|  POINT (40 20)| 4.0|      80.0|
| POINT (22 400)| 2.0|      44.0|
| POINT (66 200)| 8.0|     132.0|
| POINT (260 80)| 1.0|     520.0|
|POINT (200 200)|10.0|     400.0|
|POINT (300 300)| 5.0|     600.0|
|POINT (400 400)|20.0|     800.0|
+---------------+----+----------+



Convert a Point geometry to double columns

In [26]:
@udf
def get_x(pt1):
    return pt1.x

In [27]:
@udf
def get_y(pt1):
    return pt1.y

In [28]:
df_pt1_geo = df_pt1_geo.withColumn("raw_x",lit(get_x(df_pt1_geo.point)))

In [29]:
df_pt1_geo = df_pt1_geo.withColumn("raw_y",lit(get_y(df_pt1_geo.point)))

In [30]:
df_pt1_geo.printSchema()

root
 |-- point: geometry (nullable = true)
 |-- ele: double (nullable = true)
 |-- disRaw2Max: string (nullable = true)
 |-- raw_x: string (nullable = true)
 |-- raw_y: string (nullable = true)



reorder the order of columns

In [31]:
df_pt1_geo = df_pt1_geo.select("point","raw_x","raw_y","ele","disRaw2Max")

In [32]:
df_pt1_geo.printSchema()

root
 |-- point: geometry (nullable = true)
 |-- raw_x: string (nullable = true)
 |-- raw_y: string (nullable = true)
 |-- ele: double (nullable = true)
 |-- disRaw2Max: string (nullable = true)



drop a column

In [33]:
df_pt1_geo = df_pt1_geo.drop("point")

In [34]:
df_pt1_geo.show()

+-----+-----+----+----------+
|raw_x|raw_y| ele|disRaw2Max|
+-----+-----+----+----------+
| 40.0| 20.0| 4.0|      80.0|
| 22.0|400.0| 2.0|      44.0|
| 66.0|200.0| 8.0|     132.0|
|260.0| 80.0| 1.0|     520.0|
|200.0|200.0|10.0|     400.0|
|300.0|300.0| 5.0|     600.0|
|400.0|400.0|20.0|     800.0|
+-----+-----+----+----------+



In [35]:
df_pt1_geo = df_pt1_geo.withColumn("radius", lit(2.0))

In [36]:
df_pt1_geo.show()

+-----+-----+----+----------+------+
|raw_x|raw_y| ele|disRaw2Max|radius|
+-----+-----+----+----------+------+
| 40.0| 20.0| 4.0|      80.0|   2.0|
| 22.0|400.0| 2.0|      44.0|   2.0|
| 66.0|200.0| 8.0|     132.0|   2.0|
|260.0| 80.0| 1.0|     520.0|   2.0|
|200.0|200.0|10.0|     400.0|   2.0|
|300.0|300.0| 5.0|     600.0|   2.0|
|400.0|400.0|20.0|     800.0|   2.0|
+-----+-----+----+----------+------+



In [107]:
path='D:/Programming/Python/GEOG770/Project/data/folder_3'

output each partition to a same folder path

In [37]:
df_pt1_geo.write.partitionBy("disRaw2Max","radius").mode("append").csv("D:/Programming/Python/GEOG770/Project/data/folder_6")

using repartition to make the same group in one file, otherwise it may be in multiple files

In [38]:
df_pt1_geo.repartition("disRaw2Max","radius").write.partitionBy("disRaw2Max","radius").mode("append").csv("D:/Programming/Python/GEOG770/Project/data/folder_6")

coalesce(n) will assign n computer cores to output the data, if n is 1, there is no parallelism

In [36]:
df_pt1_geo.coalesce(3).write.partitionBy("disRaw2Max","radius").mode("append").csv("D:/Programming/Python/GEOG770/Project/data/folder_7")

In [39]:
df_pt1_geo.printSchema()

root
 |-- raw_x: string (nullable = true)
 |-- raw_y: string (nullable = true)
 |-- ele: double (nullable = true)
 |-- disRaw2Max: string (nullable = true)
 |-- radius: double (nullable = false)



In [83]:
test = df_pt1_geo.select('raw_x','raw_y','ele','disRaw2Max','radius')

In [84]:
test.printSchema()

root
 |-- raw_x: string (nullable = true)
 |-- raw_y: string (nullable = true)
 |-- ele: double (nullable = true)
 |-- disRaw2Max: string (nullable = true)
 |-- radius: double (nullable = false)



In [85]:
@udf
def MergeAsString(pt_x,pt_y):
    str_temp = str(pt_x)+','+str(pt_y)
    #str_temp = str(pt_x)+str(pt_y)
    return str_temp

In [86]:
test = test.withColumn("raw_xy", lit(MergeAsString(test.raw_x, test.raw_y)))

In [87]:
test.printSchema()

root
 |-- raw_x: string (nullable = true)
 |-- raw_y: string (nullable = true)
 |-- ele: double (nullable = true)
 |-- disRaw2Max: string (nullable = true)
 |-- radius: double (nullable = false)
 |-- raw_xy: string (nullable = true)



In [88]:
test = test.drop("raw_x")
test = test.drop("raw_y")

In [89]:
test.printSchema()

root
 |-- ele: double (nullable = true)
 |-- disRaw2Max: string (nullable = true)
 |-- radius: double (nullable = false)
 |-- raw_xy: string (nullable = true)



In [90]:
test.repartition("raw_xy").write.partitionBy("raw_xy").mode("append").csv("D:/Programming/Python/GEOG770/Project/data/folder_7")

In [91]:
test.repartition("ele").write.partitionBy("ele").mode("append").csv("D:/Programming/Python/GEOG770/Project/data/folder_8")