In [1]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [None]:
from typing import final
from numpy import shape
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, to_timestamp, expr, split, when, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from sedona.spark import SedonaContext
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.core.SpatialRDD import PointRDD
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.core.enums import FileDataSplitter
from sedona.core.geom.envelope import Envelope
from sedona.sql.types import GeometryType
import yaml
import os
from shapely import wkt
from shapely.geometry import box
from pyspark.sql.functions import to_json, struct
from pyspark.sql.functions import explode


In [3]:
HDFS_PREFIX = "hdfs://26.3.217.119:9000"
READ_DIR = f"{HDFS_PREFIX}/climate_data/uhi_index_analytics/raw/"
SAVE_DIR = f"{HDFS_PREFIX}/climate_data/uhi_index_analytics/preprocessed/"


with open("config.yaml", "r") as file:
    config = yaml.safe_load(file)
COORDS = config["coords"]

In [4]:
def create_spark_session():
    # Create a Sedona Context using individual config calls
    builder = SedonaContext.builder()

    # Set application name
    builder = builder.config("spark.app.name", "GeoSpatialPreprocessing")

    # Add each configuration individually
    builder = builder.config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,org.datasyslab:geotools-wrapper:1.4.0-28.2",
    )
    builder = builder.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    builder = builder.config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions")
    builder = builder.config("spark.sql.catalog.sedona", "org.apache.sedona.sql.SpatialCatalog")
    builder = builder.config("spark.sql.catalog.sedona.options", "{}")
    builder = builder.config("spark.driver.memory", "8g")

    # Create and return the Sedona context
    sedona = builder.getOrCreate()

    return sedona

In [5]:
def filter_street_spark(spark, config, readfile, savefile):
    # Define coordinates from config
    COORDS = config["coords"]

    # Read GeoJSON file
    df = spark.read.format("json").load(readfile)

    print("Input schema:")
    df.printSchema()

    # Register temporary view
    df.createOrReplaceTempView("streets_raw")

    # Initial selection of needed columns and cleanup
    df = spark.sql(
        """
        SELECT 
            OBJECTID, SegmentID, Join_ID, StreetCode, Street, 
            TrafDir, StreetWidth_Min, StreetWidth_Max, 
            trim(RW_TYPE) as RW_TYPE, POSTED_SPEED,
            Number_Travel_Lanes, Number_Park_Lanes, Number_Total_Lanes,
            FeatureTyp, SegmentTyp, BikeLane, BIKE_TRAFDIR,
            XFrom, YFrom, XTo, YTo, ArcCenterX, ArcCenterY,
            NodeIDFrom, NodeIDTo, NodeLevelF, NodeLevelT,
            TRUCK_ROUTE_TYPE, Shape__Length, geometry
        FROM streets_raw
        WHERE 
            geometry IS NOT NULL
    """
    )

    # Apply filters using Spark SQL
    df = df.filter(
        ~col("FeatureTyp").isin("2", "5", "7", "9", "F")
        & ~col("SegmentTyp").isin("G", "F")
        & ~col("RW_TYPE").isin("4", "12", "14")
        & (col("Status") == "2")
    )

    # Spatial filter using Sedona
    df.createOrReplaceTempView("streets_filtered")

    bbox_query = f"""
    SELECT * FROM streets_filtered
    WHERE ST_Contains(
        ST_GeomFromWKT('POLYGON(({COORDS[0]} {COORDS[1]}, {COORDS[2]} {COORDS[1]}, 
                              {COORDS[2]} {COORDS[3]}, {COORDS[0]} {COORDS[3]}, 
                              {COORDS[0]} {COORDS[1]}))'),
        ST_GeomFromGeoJSON(to_json(geometry))
    )
    """

    df = spark.sql(bbox_query)

    # Configure replication and save
    spark.conf.set("spark.hadoop.dfs.replication", "1")

    # # Save to parquet for efficiency
    # df.write.format("parquet").mode("overwrite").save(f"{savefile}.parquet")

    # # Optionally save as GeoJSON (less efficient but matches original)
    # df.write.format("json").mode("overwrite").save(f"{savefile}")

    # print(f"Data is saved at {savefile}")

    return df

In [None]:
def filter_building(spark, config, readfile, savefile):
    COORDS = config["coords"]

    # Use multiline option for JSON arrays
    raw = spark.read.format("json").option("multiline", "true").load(readfile)

    print("Input schema:")
    raw.printSchema()

    raw.createOrReplaceTempView("buildings_raw")

    raw = spark.sql(
        """
        SELECT 
            bin, cnstrct_yr, heightroof, the_geom as geometry, base_bbl, mpluto_bbl,
            TO_TIMESTAMP(lstmoddate) as lstmoddate,
            feat_code, lststatype
        FROM buildings_raw
        WHERE 
            the_geom IS NOT NULL AND 
            bin IS NOT NULL AND 
            cnstrct_yr IS NOT NULL AND 
            heightroof IS NOT NULL
        """
    )

    # Convert data types
    raw = (
        raw.withColumn("bin", col("bin").cast(IntegerType()))
        .withColumn("cnstrct_yr", col("cnstrct_yr").cast(IntegerType()))
        .withColumn("heightroof", col("heightroof").cast(FloatType()))
        .withColumn("feat_code", col("feat_code").cast(IntegerType()))
        .withColumn("base_bbl", col("base_bbl").cast(StringType()))
        .withColumn("mpluto_bbl", col("mpluto_bbl").cast(StringType()))
    )

    filtered = raw.filter(
        (col("cnstrct_yr") <= 2021)
        & (col("bin") / 1000000).cast(IntegerType()).isin(1, 2)
        & (col("heightroof") >= 12)
        & (col("feat_code").isin(1006, 2100))
        & (col("lstmoddate") < "2021-07-24")
        & (col("lststatype") == "Constructed")
    )

    filtered.createOrReplaceTempView("buildings_filtered")

    # Convert the_geom GeoJSON to WKT format for spatial operations
    bbox_query = f"""
    SELECT * FROM buildings_filtered
    WHERE ST_Contains(
        ST_GeomFromWKT('POLYGON(({COORDS[0]} {COORDS[1]}, {COORDS[2]} {COORDS[1]}, 
                              {COORDS[2]} {COORDS[3]}, {COORDS[0]} {COORDS[3]}, 
                              {COORDS[0]} {COORDS[1]}))'),
        ST_GeomFromGeoJSON(geometry)
    )
    """

    bbox = spark.sql(bbox_query)

    bbox.createOrReplaceTempView("buildings_bbox")

    area_query = """
        SELECT *, 
               ST_Area(ST_Transform(ST_GeomFromGeoJSON(geometry), 'EPSG:4326', 'EPSG:2263')) as shape_area
        FROM buildings_bbox
        """

    shape_area = spark.sql(area_query)

    shape_area.createOrReplaceTempView("buildings_shape_area")
    
    print(shape_area.count())

    filter_shape = shape_area.filter(col("shape_area") >= 400)

    filter_shape.createOrReplaceTempView("buildings_filter_shape")

    final = spark.sql(
        """
        SELECT bin, cnstrct_yr, heightroof, geometry, base_bbl, mpluto_bbl,
               lstmoddate, feat_code, lststatype, shape_area
        FROM buildings_filter_shape
        """
    )

    spark.conf.set("spark.hadoop.dfs.replication", "1")
    final.write.format("parquet").mode("overwrite").save(f"{savefile}")

In [1]:
spark = create_spark_session()

NameError: name 'create_spark_session' is not defined

In [199]:
df = spark.read.format("json").option("multiline", "true").load(f"{READ_DIR}nyco.geojson")

In [200]:
print("Input schema:")
df.printSchema()

Input schema:
root
 |-- features: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- geometry: struct (nullable = true)
 |    |    |    |-- coordinates: array (nullable = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- properties: struct (nullable = true)
 |    |    |    |-- OBJECTID: long (nullable = true)
 |    |    |    |-- OVERLAY: string (nullable = true)
 |    |    |    |-- Shape__Area: double (nullable = true)
 |    |    |    |-- Shape__Length: double (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)



In [201]:
if "features" in df.columns:
    zoning_df = df.select(explode("features").alias("feature"))
    zoning_df = zoning_df.select(
        col("feature.id").alias("id"),
        col("feature.geometry").alias("geometry"),
        col("feature.properties.*")  # Flatten properties
    )
else:
    # If already at feature level
    zoning_df = df

In [202]:
select_df = zoning_df.withColumn("Shape__Area", col("Shape__Area").cast(FloatType()))
select_df = zoning_df.withColumn("Shape__Length", col("Shape__Length").cast(FloatType()))

In [204]:
select_df = select_df.filter(col("OBJECTID") != 944)

In [205]:
select_df.count()

9637

In [206]:
select_df.createOrReplaceTempView("zoning_filtered")

In [207]:
bbox_query = f"""
    SELECT * FROM zoning_filtered
    WHERE ST_Contains(
        ST_GeomFromWKT('POLYGON(({COORDS[0]} {COORDS[1]}, {COORDS[2]} {COORDS[1]}, 
                              {COORDS[2]} {COORDS[3]}, {COORDS[0]} {COORDS[3]}, 
                              {COORDS[0]} {COORDS[1]}))'),
        ST_GeomFromGeoJSON(to_json(geometry))
    )
    """

In [208]:
spatial_df = spark.sql(bbox_query)

In [209]:
spatial_df.createOrReplaceTempView("temp_view")

In [186]:
filtered_df = spark.sql(
    """
    SELECT * FROM temp_view 
    WHERE OBJECTID != '944'
"""
)

In [210]:
filtered_df.show(604, truncate=False)

Py4JJavaError: An error occurred while calling o1398.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 99.0 failed 1 times, most recent failure: Lost task 0.0 in stage 99.0 (TID 92) (host.docker.internal executor driver): java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:31)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:20)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:16)
	at org.apache.sedona.common.utils.FormatUtils.readGeoJSON(FormatUtils.java:188)
	at org.apache.sedona.common.utils.FormatUtils.readGeometry(FormatUtils.java:286)
	at org.apache.sedona.common.Constructors.geomFromText(Constructors.java:112)
	at org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromGeoJSON.eval(Constructors.scala:160)
	at org.apache.spark.sql.sedona_sql.expressions.ST_Predicate.eval(Predicates.scala:52)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1(FilterEvaluatorFactory.scala:42)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1$adapted(FilterEvaluatorFactory.scala:41)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
	at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1915)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:414)
	at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1355)
	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers.handleNonArray(PrimitiveArrayDeserializers.java:219)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:890)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:864)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:439)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1409)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:220)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:170)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:136)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeWithType(BeanDeserializerBase.java:1296)
	at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:74)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4801)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2974)
	at org.wololo.geojson.GeoJSONFactory.readGeometry(GeoJSONFactory.java:69)
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:28)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.GeneratedMethodAccessor164.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:31)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:20)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:16)
	at org.apache.sedona.common.utils.FormatUtils.readGeoJSON(FormatUtils.java:188)
	at org.apache.sedona.common.utils.FormatUtils.readGeometry(FormatUtils.java:286)
	at org.apache.sedona.common.Constructors.geomFromText(Constructors.java:112)
	at org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromGeoJSON.eval(Constructors.scala:160)
	at org.apache.spark.sql.sedona_sql.expressions.ST_Predicate.eval(Predicates.scala:52)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1(FilterEvaluatorFactory.scala:42)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1$adapted(FilterEvaluatorFactory.scala:41)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
	at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1915)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:414)
	at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1355)
	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers.handleNonArray(PrimitiveArrayDeserializers.java:219)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:890)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:864)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:439)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1409)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:220)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:170)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:136)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeWithType(BeanDeserializerBase.java:1296)
	at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:74)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4801)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2974)
	at org.wololo.geojson.GeoJSONFactory.readGeometry(GeoJSONFactory.java:69)
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:28)
	... 32 more


In [53]:
spatial_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- Shape__Area: double (nullable = true)
 |-- Shape__Length: float (nullable = true)
 |-- ZONEDIST: string (nullable = true)



In [170]:
spatial_df.count()

Py4JJavaError: An error occurred while calling o1336.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 78.0 failed 1 times, most recent failure: Lost task 0.0 in stage 78.0 (TID 74) (host.docker.internal executor driver): java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:31)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:20)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:16)
	at org.apache.sedona.common.utils.FormatUtils.readGeoJSON(FormatUtils.java:188)
	at org.apache.sedona.common.utils.FormatUtils.readGeometry(FormatUtils.java:286)
	at org.apache.sedona.common.Constructors.geomFromText(Constructors.java:112)
	at org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromGeoJSON.eval(Constructors.scala:160)
	at org.apache.spark.sql.sedona_sql.expressions.ST_Predicate.eval(Predicates.scala:52)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1(FilterEvaluatorFactory.scala:42)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1$adapted(FilterEvaluatorFactory.scala:41)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
	at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1915)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:414)
	at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1355)
	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers.handleNonArray(PrimitiveArrayDeserializers.java:219)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:890)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:864)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:439)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1409)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:220)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:170)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:136)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeWithType(BeanDeserializerBase.java:1296)
	at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:74)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4801)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2974)
	at org.wololo.geojson.GeoJSONFactory.readGeometry(GeoJSONFactory.java:69)
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:28)
	... 30 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:31)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:20)
	at org.wololo.jts2geojson.GeoJSONReader.read(GeoJSONReader.java:16)
	at org.apache.sedona.common.utils.FormatUtils.readGeoJSON(FormatUtils.java:188)
	at org.apache.sedona.common.utils.FormatUtils.readGeometry(FormatUtils.java:286)
	at org.apache.sedona.common.Constructors.geomFromText(Constructors.java:112)
	at org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromGeoJSON.eval(Constructors.scala:160)
	at org.apache.spark.sql.sedona_sql.expressions.ST_Predicate.eval(Predicates.scala:52)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1(FilterEvaluatorFactory.scala:42)
	at org.apache.spark.sql.execution.FilterEvaluatorFactory$FilterPartitionEvaluator.$anonfun$eval$1$adapted(FilterEvaluatorFactory.scala:41)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `double[]`: no String-argument constructor/factory method to deserialize from String value ('[-73.9103449368792,40.8230547655496]')
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: org.wololo.geojson.MultiPolygon["coordinates"]->java.lang.Object[][0]->java.lang.Object[][0]->java.lang.Object[][0])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
	at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1915)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:414)
	at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1355)
	at com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers.handleNonArray(PrimitiveArrayDeserializers.java:219)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:890)
	at com.fasterxml.jackson.databind.deser.std.PrimitiveArrayDeserializers$DoubleDeser.deserialize(PrimitiveArrayDeserializers.java:864)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:216)
	at com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:26)
	at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:545)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:570)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:439)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1409)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:220)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:170)
	at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:136)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeWithType(BeanDeserializerBase.java:1296)
	at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:74)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4801)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2974)
	at org.wololo.geojson.GeoJSONFactory.readGeometry(GeoJSONFactory.java:69)
	at org.wololo.geojson.GeoJSONFactory.create(GeoJSONFactory.java:28)
	... 30 more


In [39]:
area_query = """
    SELECT *, 
           ST_Area(ST_Transform(ST_GeomFromGeoJSON(to_json(geometry)), 'EPSG:4326', 'EPSG:2263')) as area_sqm
    FROM zoning_bbox
    """

In [40]:
area_df = spark.sql(area_query)

In [41]:
area_df.show(5)

+--------+-----------+-------------+--------------------+-------------------+
|OBJECTID|Shape__Area|Shape__Length|            geometry|           area_sqm|
+--------+-----------+-------------+--------------------+-------------------+
|     298|  142598.66|    1665.3289|{[[[-73.892051298...|1.381991657409128E7|
|     303|   780.5681|    322.51852|{[[[-73.903506374...|  75756.17098496873|
|     304|   75266.35|    1259.1455|{[[[-73.891167366...|  7294919.686124862|
|     305|  38180.176|    1243.3271|{[[[-73.890457615...| 3699470.9591754447|
|     306|  15974.725|    598.66406|{[[[-73.899581025...| 1550268.6037419448|
+--------+-----------+-------------+--------------------+-------------------+
only showing top 5 rows



In [10]:
if "features" in df.columns:
    streets_df = df.select(explode("features").alias("feature"))
    streets_df = streets_df.select(
        col("feature.id").alias("id"),
        col("feature.geometry").alias("geometry"),
        col("feature.properties.*")  # Flatten properties
    )
else:
    # If already at feature level
    streets_df = df
    


In [None]:
# Register temporary view
streets_df.createOrReplaceTempView("streets_raw")

In [12]:
# Initial filtering based on the street criteria
filtered_df = streets_df.filter(
    (~col("FeatureTyp").isin("2", "5", "7", "9", "F")) &
    (~col("SegmentTyp").isin("G", "F")) &
    (~col("RW_TYPE").isin("4", "12", "14")) &
    (col("Status") == "2")
)

In [13]:
cols_to_keep = [
    "OBJECTID",
    "SegmentID",
    "Join_ID",
    "StreetCode",
    "Street",
    "TrafDir",
    "StreetWidth_Min",
    "StreetWidth_Max",
    "RW_TYPE",
    "POSTED_SPEED",
    "Number_Travel_Lanes",
    "Number_Park_Lanes",
    "Number_Total_Lanes",
    "FeatureTyp",
    "SegmentTyp",
    "BikeLane",
    "BIKE_TRAFDIR",
    "XFrom",
    "YFrom",
    "XTo",
    "YTo",
    "ArcCenterX",
    "ArcCenterY",
    "NodeIDFrom",
    "NodeIDTo",
    "NodeLevelF",
    "NodeLevelT",
    "TRUCK_ROUTE_TYPE",
    "Shape__Length",
    "geometry",
]

In [14]:
select_df = filtered_df.select([col(c) for c in cols_to_keep if c in filtered_df.columns])

In [15]:
# Convert string fields to proper types
select_df = select_df.withColumn("StreetWidth_Min", col("StreetWidth_Min").cast(IntegerType()))
select_df = select_df.withColumn("StreetWidth_Max", col("StreetWidth_Max").cast(IntegerType()))
select_df = select_df.withColumn("Shape__Length", col("Shape__Length").cast(FloatType()))

In [16]:
 select_df.createOrReplaceTempView("streets_filtered")

In [8]:
spark.stop()

In [19]:
bbox_query = f"""
SELECT * FROM streets_filtered
WHERE ST_Contains(
    ST_GeomFromWKT('POLYGON(({COORDS[0]} {COORDS[1]}, {COORDS[2]} {COORDS[1]}, 
                            {COORDS[2]} {COORDS[3]}, {COORDS[0]} {COORDS[3]}, 
                            {COORDS[0]} {COORDS[1]}))'),
    ST_GeomFromGeoJSON(to_json(geometry))
)
"""

In [20]:
spatial_df = spark.sql(bbox_query)

In [23]:
spatial_df.show(5)

+--------+---------+---------------+----------+---------------+-------+---------------+---------------+-------+------------+-------------------+-----------------+------------------+----------+----------+--------+------------+-------+------+-------+------+----------+----------+----------+--------+----------+----------+----------------+-------------+--------------------+
|OBJECTID|SegmentID|        Join_ID|StreetCode|         Street|TrafDir|StreetWidth_Min|StreetWidth_Max|RW_TYPE|POSTED_SPEED|Number_Travel_Lanes|Number_Park_Lanes|Number_Total_Lanes|FeatureTyp|SegmentTyp|BikeLane|BIKE_TRAFDIR|  XFrom| YFrom|    XTo|   YTo|ArcCenterX|ArcCenterY|NodeIDFrom|NodeIDTo|NodeLevelF|NodeLevelT|TRUCK_ROUTE_TYPE|Shape__Length|            geometry|
+--------+---------+---------------+----------+---------------+-------+---------------+---------------+-------+------------+-------------------+-----------------+------------------+----------+----------+--------+------------+-------+------+-------+------+-

In [26]:
spatial_df.createOrReplaceTempView("streets_bbox")

In [35]:
length_query = """
    SELECT *, 
           ST_Length(ST_Transform(ST_GeomFromGeoJSON(to_json(geometry)), 'EPSG:2263', 'EPSG:4326')) as length_meters
    FROM streets_bbox
    """

In [28]:
length_df = spark.sql(length_query)

In [29]:
length_df.show(5)

+--------+---------+---------------+----------+---------------+-------+---------------+---------------+-------+------------+-------------------+-----------------+------------------+----------+----------+--------+------------+-------+------+-------+------+----------+----------+----------+--------+----------+----------+----------------+-------------+--------------------+------------------+
|OBJECTID|SegmentID|        Join_ID|StreetCode|         Street|TrafDir|StreetWidth_Min|StreetWidth_Max|RW_TYPE|POSTED_SPEED|Number_Travel_Lanes|Number_Park_Lanes|Number_Total_Lanes|FeatureTyp|SegmentTyp|BikeLane|BIKE_TRAFDIR|  XFrom| YFrom|    XTo|   YTo|ArcCenterX|ArcCenterY|NodeIDFrom|NodeIDTo|NodeLevelF|NodeLevelT|TRUCK_ROUTE_TYPE|Shape__Length|            geometry|     length_meters|
+--------+---------+---------------+----------+---------------+-------+---------------+---------------+-------+------------+-------------------+-----------------+------------------+----------+----------+--------+------

In [30]:
length_df.count()

39838

In [9]:
filter_building(spark, config, f"{READ_DIR}/building/building.json", "building.parquet")

Input schema:
root
 |-- base_bbl: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- cnstrct_yr: string (nullable = true)
 |-- doitt_id: string (nullable = true)
 |-- feat_code: string (nullable = true)
 |-- geomsource: string (nullable = true)
 |-- globalid: string (nullable = true)
 |-- groundelev: string (nullable = true)
 |-- heightroof: string (nullable = true)
 |-- lstmoddate: string (nullable = true)
 |-- lststatype: string (nullable = true)
 |-- mpluto_bbl: string (nullable = true)
 |-- name: string (nullable = true)
 |-- shape_area: string (nullable = true)
 |-- shape_len: string (nullable = true)
 |-- the_geom: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)

6073

In [34]:
spark.read.parquet("building.parquet").show(5)

+---+----------+----------+--------+--------+----------+----------+---------+----------+----------+
|bin|cnstrct_yr|heightroof|geometry|base_bbl|mpluto_bbl|lstmoddate|feat_code|lststatype|shape_area|
+---+----------+----------+--------+--------+----------+----------+---------+----------+----------+
+---+----------+----------+--------+--------+----------+----------+---------+----------+----------+



In [None]:
def extract_feature_building(spark, readfile, savefile):
    """Extract building features using PySpark"""
    # Read parquet file
    df = spark.read.parquet(f"{READ_DIR}{readfile}")

    # Column types already set in parquet, but ensure consistency
    df = df.select(
        col("bin").cast(IntegerType()),
        col("cnstrct_yr").cast(IntegerType()),
        col("heightroof").cast(FloatType()),
        col("shape_area").cast(FloatType()),
        col("geometry"),
        col("base_bbl").cast(StringType()),
        col("mpluto_bbl").cast(StringType()),
    )

    # Save as GeoJSON
    df.write.format("parquet").mode("overwrite").save(f"{SAVE_DIR}{savefile}")
    print(f"Data is saved at {SAVE_DIR}{savefile}.")

    return df

In [3]:
import cdsapi
import yaml

# with open("config.yaml", "r") as file:
#     config = yaml.safe_load(file)

# Initialize the CDS API client
dataset = "satellite-sea-level-global"
request = {
    "variable": ["monthly_mean"],
    "year": [
        "1993",
        "1994",
        "1995",
        "1996",
        "1997",
        "1998",
        "1999",
        "2000",
        "2001",
        "2002",
        "2003",
        "2004",
        "2005",
        "2006",
        "2007",
        "2008",
        "2009",
        "2010",
        "2011",
        "2012",
        "2013",
        "2014",
        "2015",
        "2016",
        "2017",
        "2018",
        "2019",
        "2020",
        "2021",
        "2022",
        "2023",
    ],
    "month": ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"],
    "version": "vdt2024",
}

client = cdsapi.Client()
client.retrieve(dataset, request).download()

Exception: Missing/incomplete configuration file: C:\Users\Leo/.cdsapirc

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, expr, lit, degrees, atan2, split, substring
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, ArrayType, StructType, StructField
from sedona.spark import SedonaContext
from sedona.sql.types import GeometryType
import yaml
import math
import os
from pyspark.sql.functions import trim
from pyproj import Transformer

In [3]:
with open("config.yaml", "r") as file:
    config = yaml.safe_load(file)

In [4]:
def create_spark_session(
    core: int = 6,
    driver_menory: str = "8g",
):
    # Create a Sedona Context using individual config calls
    builder = SedonaContext.builder()

    # Set application name
    builder = builder.config("spark.app.name", "GeoSpatialPreprocessing")

    # Add each configuration individually
    builder = builder.config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,org.datasyslab:geotools-wrapper:1.4.0-28.2",
    )
    builder = builder.master(f"local[{core}]")
    builder = builder.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    builder = builder.config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions")
    builder = builder.config("spark.sql.catalog.sedona", "org.apache.sedona.sql.SpatialCatalog")
    builder = builder.config("spark.sql.catalog.sedona.options", "{}")
    builder = builder.config("spark.driver.memory", f"{driver_menory}")

    # Create and return the Sedona context
    sedona = builder.getOrCreate()

    return sedona


def convert_to_float_spark(df, column):
    return df.withColumn(column, expr(f"CAST(NULLIF(TRIM({column}), '') AS FLOAT)"))


def feet_to_degree_spark(df, lon_col, lat_col):
    transformer = Transformer.from_crs("EPSG:2263", "EPSG:4326", always_xy=True)

    def transform_coords(lon, lat):
        if lon is not None and lat is not None:
            return transformer.transform(lon, lat)
        return (None, None)

    transform_udf = udf(
        transform_coords, StructType([StructField("lon", FloatType()), StructField("lat", FloatType())])
    )
    return df.withColumn("transformed", transform_udf(col(lon_col), col(lat_col)))


def street_direction_spark(df):
    df = df.withColumn("dx", col("XTo") - col("XFrom"))
    df = df.withColumn("dy", col("YTo") - col("YFrom"))
    df = df.withColumn("angle", expr("degrees(atan2(dy, dx))"))
    df = df.withColumn("angle", when(col("angle") < 0, col("angle") + 360).otherwise(col("angle")))
    df = df.withColumn(
        "direction",
        when(
            ((22.5 <= col("angle")) & (col("angle") < 67.5)) | ((157.5 <= col("angle")) & (col("angle") < 202.5)),
            "NE-SW",
        )
        .when(
            ((67.5 <= col("angle")) & (col("angle") < 112.5)) | ((247.5 <= col("angle")) & (col("angle") < 292.5)),
            "N-S",
        )
        .when(
            ((112.5 <= col("angle")) & (col("angle") < 157.5)) | ((292.5 <= col("angle")) & (col("angle") < 337.5)),
            "NW-SE",
        )
        .otherwise("E-W"),
    )
    return df


def extract_feature_street_spark(spark, readfile, savefile):
    # Read GeoJSON file
    df = spark.read.parquet(readfile)
    print("Input schema:")
    df.printSchema()
    
    # Convert string to float
    columns_to_convert = [
        "RW_TYPE",
        "Number_Travel_Lanes",
        "Number_Park_Lanes",
        "Number_Total_Lanes",
        "POSTED_SPEED",
        "BikeLane",
        "TRUCK_ROUTE_TYPE",
    ]
    for col_name in columns_to_convert:
        df = convert_to_float_spark(df, col_name)
    df = df.withColumn("RW_TYPE", col("RW_TYPE").cast("int"))

    # Handle BIKE_TRAFDIR
    df = df.withColumn(
        "BIKE_TRAFDIR", when(expr("TRIM(BIKE_TRAFDIR) = ''"), None).otherwise(expr("TRIM(BIKE_TRAFDIR)"))
    )

    # Calculate average street width
    df = df.withColumn("street_width_avg", (col("StreetWidth_Min") + col("StreetWidth_Max")) / 2)

    # Transform coordinates
    df = feet_to_degree_spark(df, "XFrom", "YFrom")
    df = df.withColumn("XFrom", col("transformed.lon")).withColumn("YFrom", col("transformed.lat")).drop("transformed")
    df = feet_to_degree_spark(df, "XTo", "YTo")
    df = df.withColumn("XTo", col("transformed.lon")).withColumn("YTo", col("transformed.lat")).drop("transformed")

    # Calculate street direction
    df = street_direction_spark(df)

    # Extract features
    df = df.select(
        [
            "OBJECTID",
            "Join_ID",
            "StreetCode",
            "Street",
            "TrafDir",
            "StreetWidth_Min",
            "StreetWidth_Max",
            "street_width_avg",
            "RW_TYPE",
            "POSTED_SPEED",
            "Number_Travel_Lanes",
            "Number_Park_Lanes",
            "Number_Total_Lanes",
            "FeatureTyp",
            "SegmentTyp",
            "BikeLane",
            "BIKE_TRAFDIR",
            "TRUCK_ROUTE_TYPE",
            "Shape__Length",
            "direction",
            "geometry",
        ]
    )
    
    df.show(truncate=False)

    # # Save to Parquet
    # df.write.format("parquet").mode("overwrite").save(savefile)
    # print(f"Data is saved at {savefile}.")

In [5]:
print(os.environ['VIRTUAL_ENV'])

d:\HW_Project\BigData\code\uhi_index_analytics\.venv


In [6]:
os.environ["PYSPARK_PYTHON"] = os.path.join(os.environ["VIRTUAL_ENV"], "Scripts", "python.exe")
spark = create_spark_session()


In [None]:

HDFS_PREFIX = "hdfs://26.3.217.119:9000"
READ_DIR = f"{HDFS_PREFIX}/climate_data/uhi_index_analytics/preprocessed/"
SAVE_DIR = f"{HDFS_PREFIX}/climate_data/uhi_index_analytics/features_extraction/"

extract_feature_street_spark(spark, f"{READ_DIR}street.parquet", "street_features.parquet")

In [7]:
spark.read.parquet(f"hdfs://26.3.217.119:9000/climate_data/uhi_index_analytics/features_extraction/street_features.parquet").show(truncate=False)

+--------+---------------+----------+-----------------+-------+---------------+---------------+----------------+-------+------------+-------------------+-----------------+------------------+----------+----------+--------+------------+----------------+-------------+---------+--------------------------------------------------------------------------------------------+
|OBJECTID|Join_ID        |StreetCode|Street           |TrafDir|StreetWidth_Min|StreetWidth_Max|street_width_avg|RW_TYPE|POSTED_SPEED|Number_Travel_Lanes|Number_Park_Lanes|Number_Total_Lanes|FeatureTyp|SegmentTyp|BikeLane|BIKE_TRAFDIR|TRUCK_ROUTE_TYPE|Shape__Length|direction|geometry                                                                                    |
+--------+---------------+----------+-----------------+-------+---------------+---------------+----------------+-------+------------+-------------------+-----------------+------------------+----------+----------+--------+------------+----------------+-----------

In [10]:
spark.read.parquet(f"hdfs://26.3.217.119:9000/climate_data/trend_analytics/preprocessed/clean_co2.parquet").show(
    truncate=False
)

+----+-----+------------+-------+--------------+-----+-----+-----+
|year|month|decimal_date|average|deseasonalized|ndays|sdev |unc  |
+----+-----+------------+-------+--------------+-----+-----+-----+
|1958|3    |1958.2027   |315.71 |314.44        |-1   |-9.99|-0.99|
|1958|4    |1958.2877   |317.45 |315.16        |-1   |-9.99|-0.99|
|1958|5    |1958.3699   |317.51 |314.69        |-1   |-9.99|-0.99|
|1958|6    |1958.4548   |317.27 |315.15        |-1   |-9.99|-0.99|
|1958|7    |1958.537    |315.87 |315.2         |-1   |-9.99|-0.99|
|1958|8    |1958.6219   |314.93 |316.21        |-1   |-9.99|-0.99|
|1958|9    |1958.7068   |313.21 |316.11        |-1   |-9.99|-0.99|
|1958|10   |1958.789    |312.42 |315.41        |-1   |-9.99|-0.99|
|1958|11   |1958.874    |313.33 |315.21        |-1   |-9.99|-0.99|
|1958|12   |1958.9562   |314.67 |315.43        |-1   |-9.99|-0.99|
|1959|1    |1959.0411   |315.58 |315.52        |-1   |-9.99|-0.99|
|1959|2    |1959.126    |316.49 |315.84        |-1   |-9.99|-0

In [1]:
from google.cloud import storage


In [2]:
def list_buckets():
    """Lists all buckets."""

    storage_client = storage.Client()
    buckets = storage_client.list_buckets()

    for bucket in buckets:
        print(bucket.name)

list_buckets()

DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

In [1]:
import numpy as np
import pandas as pd

[0 0 3 4 5]
