In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from sedona.spark import *
from shapely import wkt
import os
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd
import pandas as pd

Add WKT CSVs as GeoPandas GeoDataFrames

In [None]:
# Load the raw geofence tables. Each CSV is expected to carry its geometry as
# WKT text in a 'WKT' column (parsed in the next cell).
chinatown_df = pd.read_csv(
    filepath_or_buffer='Geofences/chinatown_wkt.csv',
)
north_end_df = pd.read_csv(
    filepath_or_buffer='Geofences/Northend_wkt.csv',
)

In [None]:
# Parse the WKT text of every row into a shapely geometry object, stored in a
# new 'geometry' column on each geofence table (frames are updated in place).
for geofence_df in (chinatown_df, north_end_df):
    geofence_df['geometry'] = geofence_df['WKT'].apply(wkt.loads)

In [None]:
# Wrap each geofence table as a GeoDataFrame. The geometry column is named
# explicitly rather than relying on geopandas' implicit pick-up of a column
# called 'geometry', so a bad column surfaces as an error here.
# NOTE(review): EPSG:3857 is Web Mercator (metres). The Kafka stream below
# carries latitude/longitude values; if these WKT polygons are also in degrees,
# the CRS should be 'EPSG:4326' -- confirm against the source of the CSVs.
chinatown_gdf = gpd.GeoDataFrame(chinatown_df, geometry='geometry', crs='EPSG:3857')
north_end_gdf = gpd.GeoDataFrame(north_end_df, geometry='geometry', crs='EPSG:3857')

Configure Sedona

In [None]:
# Create the notebook's single Spark session with Sedona's spatial SQL registered.
# spark.jars.packages must list every JVM dependency up front:
#   - Sedona + GeoTools wrapper for the ST_* functions;
#   - the Kafka connector needed by readStream.format("kafka") below. It is not
#     bundled with PySpark and must match the running Spark version, so it is
#     derived from the installed pyspark package.
# NOTE(review): sedona-spark-shaded-3.4_2.12 targets Spark 3.4.x / Scala 2.12 --
# confirm the installed PySpark is a matching build.
SPARK_PACKAGES = ','.join([
    'org.apache.sedona:sedona-spark-shaded-3.4_2.12:1.4.1',
    'org.datasyslab:geotools-wrapper:1.4.0-28.2',
    f'org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}',
])
config = (
    SedonaContext.builder()
    # Set the app name here: this builder is the one that actually starts the
    # SparkContext, so later getOrCreate() calls cannot rename the application.
    .appName("Spark_Streaming_Session_Creator_v1.py")
    .config('spark.jars.packages', SPARK_PACKAGES)
    .getOrCreate()
)
sedona = SedonaContext.create(config)

Define the Spark session for streaming and build the app

In [None]:
# Obtain the session handle used by the streaming cells. A session was already
# started in the Sedona cell, so getOrCreate() returns that existing session
# instead of building a new one; the appName set here therefore does not rename
# the already-running application.
spark = (
    SparkSession.builder
    .appName("Spark_Streaming_Session_Creator_v1.py")
    .getOrCreate()
)
# Only surface WARN and above from the JVM so notebook output stays readable.
spark.sparkContext.setLogLevel("WARN")

Load Kafka data into the Spark streaming session

In [None]:
# Subscribe to the 'mbta-realtime' topic on the local broker. The Kafka source
# yields binary key/value columns plus metadata such as timestamp.
kafka_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "mbta-realtime",
}
stream_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)
stream_df.printSchema()

Cast the JSON value as a string

In [None]:
# Kafka delivers the message payload as binary; decode it to a string in place
# so the JSON path functions in the following cells can read it.
stream_df = stream_df.withColumn(
    "value",
    stream_df.value.cast("STRING"),
)

Parse out latitude and longitude information

In [None]:
# Column expressions that pull the vehicle position out of the JSON payload.
# get_json_object returns the matched value as a string (null for invalid JSON).
# NOTE(review): the '$.vehicle.position.*' paths assume the MBTA message layout --
# confirm against a sample record from the topic.
latitude_expr = get_json_object("value", "$.vehicle.position.latitude").alias("latitude")
longitude_expr = get_json_object("value", "$.vehicle.position.longitude").alias("longitude")

Create new columns with the latitude and longitude information

In [None]:
# Append latitude/longitude after all existing Kafka columns, then expose the
# result to Spark SQL as the 'parsed_json' view queried below.
parsed_df = stream_df.select("*", latitude_expr, longitude_expr)
parsed_df.printSchema()
parsed_df.createOrReplaceTempView("parsed_json")

Convert the coordinates to point geometries and create a spatial DataFrame

In [None]:
# Build one point geometry per message from the parsed coordinates.
# get_json_object produced strings, so cast explicitly to DOUBLE for ST_Point
# rather than relying on implicit string-to-double coercion.
spatial_df = sedona.sql(
    "SELECT timestamp, "
    "st_point(CAST(longitude AS DOUBLE), CAST(latitude AS DOUBLE)) AS geometry "
    "FROM parsed_json"
)
# A streaming DataFrame cannot be materialised with show(): Spark raises an
# AnalysisException ("Queries with streaming sources must be executed with
# writeStream.start()"). Inspect the schema here; rows are shown by the console
# sink started in the next cell.
spatial_df.printSchema()
# TODO: the Chinatown / North End geofence GeoDataFrames are not yet used to
# filter or tag these points.

Start the streaming query to process data in real time

In [None]:
# Start the streaming query: each micro-batch of new (timestamp, geometry) rows
# is printed by the console sink on the driver's stdout.
# Parentheses replace the original backslash continuations; the last one
# dangled at the end of the cell, which is a SyntaxError.
query = (
    spatial_df.writeStream
    .outputMode("append")
    .format("console")
    # Keep the full point WKT visible instead of clipping each cell at 20 chars.
    .option("truncate", "false")
    .start()
)

In [None]:
# Block this cell until the streaming query stops; if the query failed, its
# exception is re-raised here. None waits indefinitely (interactive use); set a
# number of seconds to bound the run so Restart & Run All can reach the cleanup cell.
AWAIT_TIMEOUT_S = None
query.awaitTermination(AWAIT_TIMEOUT_S)

In [None]:
# Clean shutdown. If the wait above was interrupted or timed out, the streaming
# query is still running, so stop it explicitly before stopping the session.
if query.isActive:
    query.stop()
spark.stop()