In [None]:
import os

from sedona.spark import SedonaContext

# Maven coordinates resolved by Spark at startup: Sedona for Spark 3.5 /
# Scala 2.12, the GeoTools wrapper used with it, and hadoop-aws for s3a://.
SPARK_PACKAGES = [
    "org.apache.sedona:sedona-spark-3.5_2.12:1.6.1",
    "org.datasyslab:geotools-wrapper:1.7.0-28.5",
    "org.apache.hadoop:hadoop-aws:3.3.2",
]

# S3-compatible endpoint for Source Cooperative data.
SOURCE_COOP_ENDPOINT = "https://data.source.coop"

# Never hard-code keys in the notebook: read them from the environment.
source_coop_access_key = os.environ.get("SOURCE_COOP_S3_ACCESS_KEY")
source_coop_secret_key = os.environ.get("SOURCE_COOP_S3_SECRET_KEY")

builder = (
    SedonaContext.builder()
    .config("spark.jars.packages", ",".join(SPARK_PACKAGES))
    .config("spark.jars.repositories", "https://artifacts.unidata.ucar.edu/repository/unidata-all")
    # S3A filesystem settings (each key is set exactly once).
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", SOURCE_COOP_ENDPOINT)
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
    # Use path-style URLs (endpoint/bucket/key) rather than bucket sub-domains.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Resources for a local run; adjust to the machine's available memory.
    .config("spark.executor.memory", "12G")
    .config("spark.driver.memory", "12G")
    # Small shuffle partition count for a single-machine run.
    .config("spark.sql.shuffle.partitions", "2")
)

if source_coop_access_key and source_coop_secret_key:
    builder = (
        builder
        .config("spark.hadoop.fs.s3a.access.key", source_coop_access_key)
        .config("spark.hadoop.fs.s3a.secret.key", source_coop_secret_key)
    )
else:
    # No keys in the environment: read anonymously (assumes the Source
    # Cooperative repository is publicly readable — confirm if reads fail).
    builder = builder.config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    )

config = builder.getOrCreate()

sedona = SedonaContext.create(config)
sedona.sparkContext.setLogLevel("ERROR")

# Presumably lets https:// paths be read through Hadoop's HttpsFileSystem;
# no https:// path is read in the cells shown here.
sedona.conf.set("fs.https.impl", "org.apache.hadoop.fs.http.HttpsFileSystem")

# Simple Area Measurement 

Let's make sure everything is up and running correctly by computing the spheroidal area of a small test polygon.

In [None]:
# Sanity-check query: spheroidal area of a small WKT triangle. Per the Sedona
# docs, ST_AreaSpheroid expects WGS84 lon/lat input and returns square meters.
sql = "\n".join([
    "",
    "SELECT ST_AreaSpheroid(",
    "    ST_GeomFromWKT('Polygon ((34 35, 28 30, 25 34, 34 35))')",
    ") as result",
    "",
])

In [None]:
# Run the sanity-check query and print its result without truncating values.
(
    sedona
    .sql(sql)
    .show(truncate=False)
)

# Load data for spatial join

1. Load Citi Bike trips from [Source Cooperative](https://source.coop/repositories/zluo43/citibike/description)
2. Use the NYC Neighborhood Boundaries dataset from [Tim Kiely](https://github.com/HodgesWardElliott/custom-nyc-neighborhoods)

In [None]:
# Lazily define the Citi Bike trip table: every Parquet file three directory
# levels below the dataset root on Source Cooperative (read via s3a://).
bikes = (
    sedona.read
    .format('parquet')
    .load('s3a://zluo43/citibike/new_schema_combined_with_geom.parquet/*/*/*.parquet')
)

In [None]:
# Neighborhood polygons from a local GeoParquet file (path relative to the
# notebook's working directory).
neighborhoods = (
    sedona.read
    .format('geoparquet')
    .load('../data/custom-pedia-cities-nyc-Mar2018.parquet')
)

In [None]:
# count() is a Spark action: it executes the lazy S3 read defined above, so
# this cell can take a while on the full trip dataset.
bikes.count()

# Repartition to improve performance

In [None]:
# Spread the trip rows over 8 partitions so the spatial join can run as up to
# 8 parallel tasks.
bikes = bikes.repartition(8)
# `neighborhoods` is deliberately not repartitioned: it is broadcast whole to
# every executor for the join, so shuffling it first only adds an extra stage.

# Broadcast the smaller `neighborhoods` table to all nodes

In [None]:
from pyspark.sql.functions import broadcast
# broadcast() only attaches a broadcast hint to the DataFrame; nothing is
# shipped until a join using it executes. Per the Sedona docs, such a hint
# lets Sedona plan a broadcast spatial join: the small polygon table is sent
# to every executor instead of shuffling the large trip table.
broadcasted_neighborhoods = broadcast(neighborhoods)

In [None]:
# Expose both inputs to SQL under the names the join query refers to; the
# `neighborhoods` view is backed by the broadcast-hinted DataFrame.
broadcasted_neighborhoods.createOrReplaceTempView('neighborhoods')
bikes.createOrReplaceTempView('bikes')

In [None]:
# Count rides whose start point falls inside each neighborhood polygon.
# - BROADCAST(n) states the broadcast join in the SQL itself rather than
#   relying only on the DataFrame-level hint carried through the temp view.
# - start_geom is stored as WKB, so it is decoded with ST_GeomFromWKB.
# - cache() keeps the small aggregated result after its first computation, so
#   the display cell and the later toPandas() export do not each rerun the
#   full S3 scan and spatial join.
data = sedona.sql('''select /*+ BROADCAST(n) */ count(b.ride_id) as rides, n.neighborhood, n.geometry
from neighborhoods n
join bikes b on st_contains(n.geometry, st_geomfromwkb(b.start_geom))
where n.geometry is not null
and b.start_geom is not null
group by n.neighborhood, n.geometry''').cache()

In [None]:
%%time
data.show()

# Export the results to GeoPandas and save them as GeoJSON

In [None]:
import geopandas as gpd

In [None]:
# Collect the aggregated rows to the driver as a pandas DataFrame, then make
# its `geometry` column the active geometry of a GeoDataFrame. (Presumably
# Sedona converts its geometry column to Shapely objects during toPandas.)
df = data.toPandas()
gdf = gpd.GeoDataFrame(data=df, geometry="geometry")

In [None]:
# Write the per-neighborhood ride counts; the driver is stated explicitly
# rather than inferred from the file extension.
gdf.to_file('bikes.geojson', driver='GeoJSON')