In [None]:
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
from shapely.geometry import Point
from shapely.geometry import Polygon

from sedona.register import SedonaRegistrator
from sedona.core.SpatialRDD import SpatialRDD
from sedona.core.SpatialRDD import PointRDD
from sedona.core.SpatialRDD import PolygonRDD
from sedona.core.SpatialRDD import LineStringRDD
from sedona.core.enums import FileDataSplitter
from sedona.utils.adapter import Adapter
from sedona.core.spatialOperator import KNNQuery
from sedona.core.spatialOperator import JoinQuery
from sedona.core.spatialOperator import JoinQueryRaw
from sedona.core.spatialOperator import RangeQuery
from sedona.core.spatialOperator import RangeQueryRaw
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.formatMapper import WkbReader
from sedona.core.formatMapper import WktReader
from sedona.core.formatMapper import GeoJsonReader
from sedona.sql.types import GeometryType
from sedona.core.enums import GridType
from sedona.core.SpatialRDD import RectangleRDD
from sedona.core.enums import IndexType
from sedona.core.geom.envelope import Envelope
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

In [None]:
# Build (or reuse) the Spark session that every later cell depends on.
spark = (
    SparkSession.builder
    .master('yarn')
    # YARN application-master sizing.
    .config('spark.yarn.am.memory','7g')
    .config('spark.yarn.am.cores','4')
    # Executor sizing: 11 executors, each with 4 cores and 7g of memory.
    .config('spark.executor.memory','7g')
    .config('spark.executor.cores','4')
    .config('spark.executor.instances','11')
    .appName("SedonaDemo")
    # Serialize with Kryo and register Sedona's geometry classes with it.
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    # Sedona adapter and GeoTools wrapper jars, resolved when the session starts.
    .config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2")
    .getOrCreate()
)

In [None]:
# Turn on Arrow-based columnar transfer for Spark <-> pandas conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
# Left disabled: optional override of the Kryo serializer buffer limit.
#spark.conf.set("spark.kryoserializer.buffer.max.mb", "4096")

In [None]:
import pyspark.pandas as ps
# Set the pandas-on-Spark default index type to "distributed".
# NOTE(review): `ps` is not referenced again in the visible cells; confirm this option is still needed.
ps.set_option("compute.default_index_type", "distributed")

In [None]:
# SparkContext handle; the Sedona shapefile readers in this notebook take it as their first argument.
sc = spark.sparkContext

In [None]:
# Register Sedona's geometry types and SQL functions with this Spark session.
SedonaRegistrator.registerAll(spark)

#### 房屋環域圖層

In [None]:
from concurrent.futures import ThreadPoolExecutor
from time import sleep, perf_counter

# House buffer layers keyed 'house_rdd_<distance>'; filled in once the loader futures have completed.
house_layer = {}

def house_buffer(city_name,file):
    """Load one house buffer shapefile and spatially partition it.

    Args:
        city_name: Sub-directory of ``data/shp`` that holds the city's layers.
        file: Name of the layer directory inside ``city_name``; callers in this
            notebook pass the buffer distance (250, 500, 750).

    Returns:
        The geometry RDD read from ``data/shp/<city_name>/<file>``, analyzed
        and partitioned with a KDB-tree grid.
    """
    print(f'Starting the buffer {file}_layer')
    shapefile_dir = f"data/shp/{city_name}/{file}"
    buffer_rdd = ShapefileReader.readToGeometryRDD(sc, shapefile_dir)
    # analyze() must run before spatialPartitioning(), so the order is kept.
    buffer_rdd.analyze()
    buffer_rdd.spatialPartitioning(GridType.KDBTREE)
    print(f'Finished the buffer {file}_layer')
    return buffer_rdd


# Load the three NTPC house buffer layers concurrently; leaving the `with`
# block waits for every submitted task to finish.
with ThreadPoolExecutor() as excutor:
    house_rdd_250_futures, house_rdd_500_futures, house_rdd_750_futures = (
        excutor.submit(house_buffer, 'NTPC', buffer_distance)
        for buffer_distance in (250, 500, 750)
    )

# Insertion order (250, 500, 750) matters: later cells derive the buffer
# distance from each layer's position in this dict.
house_layer.update({
    'house_rdd_250': house_rdd_250_futures.result(),
    'house_rdd_500': house_rdd_500_futures.result(),
    'house_rdd_750': house_rdd_750_futures.result(),
})

In [None]:
# Sanity check: list the loaded house buffer layers.
print(house_layer)

#### 設施圖層

In [None]:
def import_facilities_layer(facility):
    """Load one facility shapefile and spatially partition it.

    Args:
        facility: Name of the layer directory under ``data/shp``.

    Returns:
        The geometry RDD read from ``data/shp/<facility>``, analyzed and
        partitioned with a KDB-tree grid.
    """
    shapefile_dir = f"data/shp/{facility}"
    facility_rdd = ShapefileReader.readToGeometryRDD(sc, shapefile_dir)
    # analyze() must run before spatialPartitioning(), so the order is kept.
    facility_rdd.analyze()
    facility_rdd.spatialPartitioning(GridType.KDBTREE)
    return facility_rdd

# (result key, shapefile directory) for every facility layer.
# Order matters: the first four entries are the layers that the intersection
# cell summarises by area; all the others are summarised by count.
# The key spellings (e.g. 'universuty') are kept as-is because they end up as
# output column names.
facility_sources = (
    ('cemetery_rdd_futures', 'cemetery'),
    ('park_rdd_futures', 'park'),
    ('parking_rdd_futures', 'parking'),
    ('riverTW_rdd_futures', 'river_TW'),
    ('LRT_rdd_futures', 'LRT'),
    ('MRT_rdd_futures', 'MRT'),
    ('TRA_rdd_futures', 'TRA'),
    ('busstop_rdd_futures', 'busstop'),
    ('clinic_rdd_futures', 'clinic'),
    ('conveniencestore_rdd_futures', 'conveniencestore'),
    ('dentist_rdd_futures', 'dentist'),
    ('fastfood_rdd_futures', 'fastfood'),
    ('firestation_rdd_futures', 'firestation'),
    ('fuel_rdd_futures', 'fuel'),
    ('hospital_rdd_futures', 'hospital'),
    ('library_rdd_futures', 'library'),
    ('market_rdd_futures', 'market'),
    ('pharmacy_rdd_futures', 'pharmacy'),
    ('placeofworkship_rdd_futures', 'placeofworkship'),
    ('police_rdd_futures', 'police'),
    ('school_rdd_futures', 'school'),
    ('universuty_rdd_futures', 'university'),
)

# Submit every load to the thread pool; leaving the `with` block waits for
# all of them to finish.
layers = {}
with ThreadPoolExecutor() as excutor:
    for future_key, facility_dir in facility_sources:
        layers[future_key] = excutor.submit(import_facilities_layer, facility_dir)

# Unwrap the futures, keeping the same keys and the same order.
layers_rdd = {future_key: future.result() for future_key, future in layers.items()}

In [None]:
# Sanity check: list the loaded facility layers.
print(layers_rdd)

#### 環域分析

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType

def insect_area_layer(layer_rdd, house_rdd):
    """Spatially join a facility layer with a house layer and sum areas per house id.

    Both RDDs are re-partitioned in place (no copies are made): the facility
    layer gets a KDB-tree grid and the house layer adopts the same partitioner.

    Args:
        layer_rdd: Facility SpatialRDD (the first argument of the spatial join).
        house_rdd: House buffer SpatialRDD (the query-window argument).

    Returns:
        Spark DataFrame with columns ``IDX_int`` (user data of the first
        element of each joined pair, cast to integer) and ``sum(Area)`` (sum of
        ``geom.area`` of the second element of each pair).
        NOTE(review): this is the full area of the matched geometry, not the
        area of its overlap with the buffer; confirm that is intended.
    """
    layer_rdd.spatialPartitioning(GridType.KDBTREE)
    house_rdd.spatialPartitioning(layer_rdd.getPartitioner())
    # NOTE(review): an R-tree index is built here, but the join below passes
    # useIndex=False; confirm whether the index is meant to be used.
    layer_rdd.buildIndex(IndexType.RTREE, True)

    joined_pairs = JoinQuery.SpatialJoinQueryFlat(layer_rdd, house_rdd, False, True)
    id_area_rows = joined_pairs.map(lambda pair: [pair[0].getUserData(), pair[1].geom.area])

    row_schema = StructType([
        StructField("IDX", StringType(), False),
        StructField("Area", DoubleType(), False),
    ])
    typed_rows = spark.createDataFrame(id_area_rows, row_schema, verifySchema=False)

    # Replace the string id with an integer id, then total the area per id.
    # drop() is a no-op if the aggregate did not emit a sum of the grouping key.
    return (
        typed_rows
        .withColumn("IDX_int", typed_rows["IDX"].cast(IntegerType()))
        .drop("IDX")
        .groupBy("IDX_int")
        .sum()
        .drop("sum(IDX_int)")
    )

def insect_point_layer(layer_rdd, house_rdd):
    """Spatially join a facility layer with a house layer and count matches per house id.

    Both RDDs are re-partitioned in place (no copies are made): the facility
    layer gets a KDB-tree grid and the house layer adopts the same partitioner.

    Args:
        layer_rdd: Facility SpatialRDD (the first argument of the spatial join).
        house_rdd: House buffer SpatialRDD (the query-window argument).

    Returns:
        Spark DataFrame with columns ``IDX_int`` (user data of the first
        element of each joined pair, cast to integer) and ``count`` (number of
        joined pairs carrying that id).
    """
    layer_rdd.spatialPartitioning(GridType.KDBTREE)
    house_rdd.spatialPartitioning(layer_rdd.getPartitioner())
    # NOTE(review): an R-tree index is built here, but the join below passes
    # useIndex=False; confirm whether the index is meant to be used.
    layer_rdd.buildIndex(IndexType.RTREE, True)

    joined_pairs = JoinQuery.SpatialJoinQueryFlat(layer_rdd, house_rdd, False, True)
    id_rows = joined_pairs.map(lambda pair: [pair[0].getUserData()])

    row_schema = StructType([
        StructField("IDX", StringType(), False),
    ])
    typed_rows = spark.createDataFrame(id_rows, row_schema, verifySchema=False)

    # Replace the string id with an integer id, then count rows per id.
    return (
        typed_rows
        .withColumn("IDX_int", typed_rows["IDX"].cast(IntegerType()))
        .drop("IDX")
        .groupBy("IDX_int")
        .count()
    )

In [None]:
# Summarise every facility layer against every house buffer layer.
# Result keys are '<layer key><buffer distance>', e.g. 'cemetery_rdd_futures250'.
insection_layer = {}
col = 1
for house_idx, house_layer_obj in enumerate(house_layer.values()):
    # Buffer distance is derived from position: the house layers are 250 apart, in order.
    buffer_distance = 250 * (house_idx + 1)
    for layer_idx, (layer_key, layer_obj) in enumerate(layers_rdd.items()):
        # The first four facility layers are summarised by area, the rest by count.
        summarise = insect_area_layer if layer_idx < 4 else insect_point_layer
        insection_layer[str(layer_key) + str(buffer_distance)] = summarise(layer_obj, house_layer_obj)
print(insection_layer)

In [None]:
# Merge every per-layer summary into one wide frame keyed by IDX_int, with one
# column per '<layer key><buffer distance>' entry.
#
# Each summary frame has the join key plus exactly one value column (an area
# sum or a count). The value column is taken from the frame's own schema rather
# than from its position in the dict, so adding, removing or reordering layers
# cannot leave a column un-renamed and colliding with the next frame's.
join_key = "IDX_int"
df = None
for k, v in insection_layer.items():
    value_columns = [c for c in v.columns if c != join_key]
    if len(value_columns) != 1:
        raise ValueError(
            f"{k}: expected exactly one value column besides {join_key}, found {value_columns}"
        )
    renamed = v.withColumnRenamed(value_columns[0], str(k))
    # Outer join keeps ids that are missing from some layers (their cells become null).
    df = renamed if df is None else df.join(renamed, [join_key], 'outer')

if df is None:
    raise ValueError("insection_layer is empty; run the intersection cell first")

In [None]:
# Print the joined DataFrame object.
print(df)

In [None]:
# Convert the whole joined result (all columns) to a pandas DataFrame.
result_pdf = df.select("*").toPandas()

In [None]:
# df = cemetery_insect_250.withColumnRenamed('sum(area)', '1').withColumnRenamed('sum(idx_int)', '2') \
# .join(cemetery_insect_500.withColumnRenamed('sum(area)', '3').withColumnRenamed('sum(idx_int)', '4'),["IDX_int"],'outer') \
# .join(cemetery_insect_750.withColumnRenamed('sum(area)', '5').withColumnRenamed('sum(idx_int)', '6'),["IDX_int"],'outer') \
# .join(park_insect_250.withColumnRenamed('sum(area)', '7').withColumnRenamed('sum(idx_int)', '8'),["IDX_int"],'outer') \
# .join(park_insect_500.withColumnRenamed('sum(area)', '9').withColumnRenamed('sum(idx_int)', '10'),["IDX_int"],'outer') \
# .join(park_insect_750.withColumnRenamed('sum(area)', '11').withColumnRenamed('sum(idx_int)', '12'),["IDX_int"],'outer') \
# .join(parking_insect_250.withColumnRenamed('sum(area)', '13').withColumnRenamed('sum(idx_int)', '14'),["IDX_int"],'outer') \
# .join(parking_insect_500.withColumnRenamed('sum(area)', '15').withColumnRenamed('sum(idx_int)', '16'),["IDX_int"],'outer') \
# .join(parking_insect_750.withColumnRenamed('sum(area)', '17').withColumnRenamed('sum(idx_int)', '18'),["IDX_int"],'outer') \
# .join(riverTW_insect_250.withColumnRenamed('sum(area)', '19').withColumnRenamed('sum(idx_int)', '20'),["IDX_int"],'outer') \
# .join(riverTW_insect_500.withColumnRenamed('sum(area)', '21').withColumnRenamed('sum(idx_int)', '22'),["IDX_int"],'outer') \
# .join(riverTW_insect_750.withColumnRenamed('sum(area)', '23').withColumnRenamed('sum(idx_int)', '24'),["IDX_int"],'outer') \
# .join(LRT_insect_250.withColumnRenamed('count', '25'),["IDX_int"],'outer') \
# .join(LRT_insect_500.withColumnRenamed('count', '26'),["IDX_int"],'outer') \
# .join(LRT_insect_750.withColumnRenamed('count', '27'),["IDX_int"],'outer') \
# .join(MRT_insect_250.withColumnRenamed('count', '28'),["IDX_int"],'outer') \
# .join(MRT_insect_500.withColumnRenamed('count', '29'),["IDX_int"],'outer') \
# .join(MRT_insect_750.withColumnRenamed('count', '30'),["IDX_int"],'outer') \
# .join(TRA_insect_250.withColumnRenamed('count', '31'),["IDX_int"],'outer') \
# .join(TRA_insect_500.withColumnRenamed('count', '32'),["IDX_int"],'outer') \
# .join(TRA_insect_750.withColumnRenamed('count', '33'),["IDX_int"],'outer') \
# .join(busstop_insect_250.withColumnRenamed('count', '34'),["IDX_int"],'outer') \
# .join(busstop_insect_500.withColumnRenamed('count', '35'),["IDX_int"],'outer') \
# .join(busstop_insect_750.withColumnRenamed('count', '36'),["IDX_int"],'outer') \
# .join(clinic_insect_250.withColumnRenamed('count', '37'),["IDX_int"],'outer') \
# .join(clinic_insect_500.withColumnRenamed('count', '38'),["IDX_int"],'outer') \
# .join(clinic_insect_750.withColumnRenamed('count', '39'),["IDX_int"],'outer') \
# .join(conveniencestore_insect_250.withColumnRenamed('count', '40'),["IDX_int"],'outer') \
# .join(conveniencestore_insect_500.withColumnRenamed('count', '41'),["IDX_int"],'outer') \
# .join(conveniencestore_insect_750.withColumnRenamed('count', '42'),["IDX_int"],'outer') \
# .join(dentist_insect_250.withColumnRenamed('count', '43'),["IDX_int"],'outer') \
# .join(dentist_insect_500.withColumnRenamed('count', '44'),["IDX_int"],'outer') \
# .join(dentist_insect_750.withColumnRenamed('count', '45'),["IDX_int"],'outer') \
# .join(fastfood_insect_250.withColumnRenamed('count', '46'),["IDX_int"],'outer') \
# .join(fastfood_insect_500.withColumnRenamed('count', '47'),["IDX_int"],'outer') \
# .join(fastfood_insect_750.withColumnRenamed('count', '48'),["IDX_int"],'outer') \
# .join(firestation_insect_250.withColumnRenamed('count', '49'),["IDX_int"],'outer') \
# .join(firestation_insect_500.withColumnRenamed('count', '50'),["IDX_int"],'outer') \
# .join(firestation_insect_750.withColumnRenamed('count', '51'),["IDX_int"],'outer') \
# .join(fuel_insect_250.withColumnRenamed('count', '52'),["IDX_int"],'outer') \
# .join(fuel_insect_500.withColumnRenamed('count', '53'),["IDX_int"],'outer') \
# .join(fuel_insect_750.withColumnRenamed('count', '54'),["IDX_int"],'outer') \
# .join(hospital_insect_250.withColumnRenamed('count', '55'),["IDX_int"],'outer') \
# .join(hospital_insect_500.withColumnRenamed('count', '56'),["IDX_int"],'outer') \
# .join(hospital_insect_750.withColumnRenamed('count', '57'),["IDX_int"],'outer') \
# .join(school_insect_250.withColumnRenamed('count', '58'),["IDX_int"],'outer') \
# .join(school_insect_500.withColumnRenamed('count', '59'),["IDX_int"],'outer') \
# .join(school_insect_750.withColumnRenamed('count', '60'),["IDX_int"],'outer') \
# .join(university_insect_250.withColumnRenamed('count', '61'),["IDX_int"],'outer') \
# .join(university_insect_500.withColumnRenamed('count', '62'),["IDX_int"],'outer') \
# .join(university_insect_750.withColumnRenamed('count', '63'),["IDX_int"],'outer') \
# .join(library_insect_250.withColumnRenamed('count', '64'),["IDX_int"],'outer') \
# .join(library_insect_500.withColumnRenamed('count', '65'),["IDX_int"],'outer') \
# .join(library_insect_750.withColumnRenamed('count', '66'),["IDX_int"],'outer') \
# .join(market_insect_250.withColumnRenamed('count', '67'),["IDX_int"],'outer') \
# .join(market_insect_500.withColumnRenamed('count', '68'),["IDX_int"],'outer') \
# .join(market_insect_750.withColumnRenamed('count', '69'),["IDX_int"],'outer') \
# .join(pharmacy_insect_250.withColumnRenamed('count', '70'),["IDX_int"],'outer') \
# .join(pharmacy_insect_500.withColumnRenamed('count', '71'),["IDX_int"],'outer') \
# .join(pharmacy_insect_750.withColumnRenamed('count', '72'),["IDX_int"],'outer') \
# .join(placeofworkship_insect_250.withColumnRenamed('count', '73'),["IDX_int"],'outer') \
# .join(placeofworkship_insect_500.withColumnRenamed('count', '74'),["IDX_int"],'outer') \
# .join(placeofworkship_insect_750.withColumnRenamed('count', '75'),["IDX_int"],'outer') \
# .join(police_insect_250.withColumnRenamed('count', '76'),["IDX_int"],'outer') \
# .join(police_insect_500.withColumnRenamed('count', '77'),["IDX_int"],'outer') \
# .join(police_insect_750.withColumnRenamed('count', '78'),["IDX_int"],'outer') \
# .collect()
# .write.csv('data/sedona_df.csv')

In [None]:
# spark_df.shape

In [None]:
# spark_df

In [None]:
# def export_pandas(layer):
#     # partitioning the data
#     # print(f'Starting the export dataframe_{layer}')
#     layer = layer.collent()
#     for_group_result = layer.toPandas()
#     # print(f'Finished the export dataframe_{layer}')
#     return for_group_result

In [None]:
# with ThreadPoolExecutor(max_workers=11) as excutor:
#     cemetery_250 = excutor.submit(export_pandas,cemetery_insect_250)
#     cemetery_500 = excutor.submit(export_pandas,cemetery_insect_500)
#     cemetery_750 = excutor.submit(export_pandas,cemetery_insect_750)
    
#     park_250 = excutor.submit(export_pandas,park_insect_250)
#     park_500 = excutor.submit(export_pandas,park_insect_500)
#     park_750 = excutor.submit(export_pandas,park_insect_750)
    
#     parking_250 = excutor.submit(export_pandas,parking_insect_250)
#     parking_500 = excutor.submit(export_pandas,parking_insect_500)
#     parking_750 = excutor.submit(export_pandas,parking_insect_750)
    
#     riverTW_250 = excutor.submit(export_pandas,riverTW_insect_250)
#     riverTW_500 = excutor.submit(export_pandas,riverTW_insect_500)
#     riverTW_750 = excutor.submit(export_pandas,riverTW_insect_750)
    
#     LRT_250 = excutor.submit(export_pandas,LRT_insect_250)
#     LRT_500 = excutor.submit(export_pandas,LRT_insect_500)
#     LRT_750 = excutor.submit(export_pandas,LRT_insect_750)
    
#     MRT_250 = excutor.submit(export_pandas,MRT_insect_250)
#     MRT_500 = excutor.submit(export_pandas,MRT_insect_500)
#     MRT_750 = excutor.submit(export_pandas,MRT_insect_750)
    
#     TRA_250 = excutor.submit(export_pandas,TRA_insect_250)
#     TRA_500 = excutor.submit(export_pandas,TRA_insect_500)
#     TRA_750 = excutor.submit(export_pandas,TRA_insect_750)
    
#     busstop_250 = excutor.submit(export_pandas,busstop_insect_250)
#     busstop_500 = excutor.submit(export_pandas,busstop_insect_500)
#     busstop_750 = excutor.submit(export_pandas,busstop_insect_750)
    
#     clinic_250 = excutor.submit(export_pandas,clinic_insect_250)
#     clinic_500 = excutor.submit(export_pandas,clinic_insect_500)
#     clinic_750 = excutor.submit(export_pandas,clinic_insect_750)
    
#     conveniencestore_250 = excutor.submit(export_pandas,conveniencestore_insect_250)
#     conveniencestore_500 = excutor.submit(export_pandas,conveniencestore_insect_500)
#     conveniencestore_750 = excutor.submit(export_pandas,conveniencestore_insect_750)
    
#     dentist_250 = excutor.submit(export_pandas,dentist_insect_250)
#     dentist_500 = excutor.submit(export_pandas,dentist_insect_500)
#     dentist_750 = excutor.submit(export_pandas,dentist_insect_750)
    
#     fastfood_250 = excutor.submit(export_pandas,fastfood_insect_250)
#     fastfood_500 = excutor.submit(export_pandas,fastfood_insect_500)
#     fastfood_750 = excutor.submit(export_pandas,fastfood_insect_750)
    
#     firestation_250 = excutor.submit(export_pandas,firestation_insect_250)
#     firestation_500 = excutor.submit(export_pandas,firestation_insect_500)
#     firestation_750 = excutor.submit(export_pandas,firestation_insect_750)
    
#     fuel_250 = excutor.submit(export_pandas,fuel_insect_250)
#     fuel_500 = excutor.submit(export_pandas,fuel_insect_500)
#     fuel_750 = excutor.submit(export_pandas,fuel_insect_750)
    
#     hospital_250 = excutor.submit(export_pandas,hospital_insect_250)
#     hospital_500 = excutor.submit(export_pandas,hospital_insect_500)
#     hospital_750 = excutor.submit(export_pandas,hospital_insect_750)
    
#     library_250 = excutor.submit(export_pandas,library_insect_250)
#     library_500 = excutor.submit(export_pandas,library_insect_500)
#     library_750 = excutor.submit(export_pandas,library_insect_750)
    
#     market_250 = excutor.submit(export_pandas,market_insect_250)
#     market_500 = excutor.submit(export_pandas,market_insect_500)
#     market_750 = excutor.submit(export_pandas,market_insect_750)
    
#     pharmacy_250 = excutor.submit(export_pandas,pharmacy_insect_250)
#     pharmacy_500 = excutor.submit(export_pandas,pharmacy_insect_500)
#     pharmacy_750 = excutor.submit(export_pandas,pharmacy_insect_750)
    
#     placeofworkship_250 = excutor.submit(export_pandas,placeofworkship_insect_250)
#     placeofworkship_500 = excutor.submit(export_pandas,placeofworkship_insect_500)
#     placeofworkship_750 = excutor.submit(export_pandas,placeofworkship_insect_750)
    
#     police_250 = excutor.submit(export_pandas,police_insect_250)
#     police_500 = excutor.submit(export_pandas,police_insect_500)
#     police_750 = excutor.submit(export_pandas,police_insect_750)
    
#     school_250 = excutor.submit(export_pandas,school_insect_250)
#     school_500 = excutor.submit(export_pandas,school_insect_500)
#     school_750 = excutor.submit(export_pandas,school_insect_750)
    
#     university_250 = excutor.submit(export_pandas,university_insect_250)
#     university_500 = excutor.submit(export_pandas,university_insect_500)
#     university_750 = excutor.submit(export_pandas,university_insect_750)

In [None]:
# cemetery_250_df = cemetery_250.result()
# cemetery_500_df = cemetery_500.result()
# cemetery_750_df = cemetery_750.result()

# park_250_df = park_250.result()
# park_500_df = park_500.result()
# park_750_df = park_750.result()

# parking_250_df = parking_250.result()
# parking_500_df = parking_500.result()
# parking_750_df = parking_750.result()

# riverTW_250_df = riverTW_250.result()
# riverTW_500_df = riverTW_500.result()
# riverTW_750_df = riverTW_750.result()

# LRT_250_df = LRT_250.result()
# LRT_500_df = LRT_500.result()
# LRT_750_df = LRT_750.result()

# MRT_250_df = MRT_250.result()
# MRT_500_df = MRT_500.result()
# MRT_750_df = MRT_750.result()

# TRA_250_df = TRA_250.result()
# TRA_500v = TRA_500.result()
# TRA_750_df = TRA_750.result()

# busstop_250_df = busstop_250.result()
# busstop_500_df = busstop_500.result()
# busstop_750_df = busstop_750.result()

# clinic_250_df = clinic_250.result()
# clinic_500_df = clinic_500.result()
# clinic_750_df = clinic_750.result()

# conveniencestore_250_df = conveniencestore_250.result()
# conveniencestore_500_df = conveniencestore_500.result()
# conveniencestore_750_df = conveniencestore_750.result()

# dentist_250_df = dentist_250.result()
# dentist_500_df = dentist_500.result()
# dentist_750_df = dentist_750.result()

# fastfood_250_df = fastfood_250.result()
# fastfood_500_df = fastfood_500.result()
# fastfood_750_df = fastfood_750.result()

# firestation_250_df = firestation_250.result()
# firestation_500_df = firestation_500.result()
# firestation_750_df = firestation_750.result()

# fuel_250_df = fuel_250.result()
# fuel_500_df = fuel_500.result()
# fuel_750_df = fuel_750.result()

# hospital_250_df = hospital_250.result()
# hospital_500_df = hospital_500.result()
# hospital_750_df = hospital_750.result()

# library_250_df = library_250.result()
# library_500_df = library_500.result()
# library_750_df = library_750.result()

# market_250_df = market_250.result()
# market_500_df = market_500.result()
# market_750_df = market_750.result()

# pharmacy_250_df = pharmacy_250.result()
# pharmacy_500_df = pharmacy_500.result()
# pharmacy_750_df = pharmacy_750.result()

# placeofworkship_250_df = placeofworkship_250.result()
# placeofworkship_500_df = placeofworkship_500.result()
# placeofworkship_750_df = placeofworkship_750.result()

# police_250_df = police_250.result()
# police_500_df = police_500.result()
# police_750_df = police_750.result()

# school_250_df = school_250.result()
# school_500_df = school_500.result()
# school_750_df = school_750.result()

# university_250_df = university_250.result()
# university_500_df = university_500.result()
# university_750_df = university_750.result()

#### 測試語法

In [None]:
# # partitioning the data
# hospital_rdd.spatialPartitioning(GridType.KDBTREE)
# house_rdd_250.spatialPartitioning(hospital_rdd.getPartitioner())
# # building an index
# hospital_rdd.buildIndex(IndexType.RTREE, True)

In [None]:
# result = JoinQuery.SpatialJoinQueryFlat(hospital_rdd, house_rdd_250, False, True)

In [None]:
#result2 = result.map(lambda x: [x[0].getUserData(), x[1].geom])
# result2 = result.map(lambda x: [x[0].getUserData()])

In [None]:
# result2

In [None]:
# university_insect_250.join(university_insect_500,["IDX_int"],'outer') \
#     .join(university_insect_750,["IDX_int"],'outer') \
#     .show()

In [None]:
# schema = StructType([
#     StructField("IDX", StringType(), False),
#     StructField("Count", DoubleType(), False)
# ])
# schema = StructType([
#     StructField("IDX", StringType(), False)
# ])

In [None]:
# result_spark = spark.createDataFrame(result2, schema, verifySchema=False)
# result_spark = result_spark.withColumn("IDX_int", result_spark["IDX"].cast(IntegerType()))
# result_spark = result_spark.drop("IDX")
# result_spark

In [None]:
# result_group_area = result_spark.groupBy("IDX_int").count()
# result_group_area

In [None]:
# for_group_result = result_group_area.toPandas()
#for_group_result.sort_values(by=['IDX_int'])

In [None]:
# for_group_result

In [None]:
#result_group_area.collect()

In [None]:
#for_group_result = result_group_area.toPandas()

In [None]:
#for_group_result.sort_values(by=['IDX_int']).head()