```bash
# Launch a local PySpark session whose driver front end is JupyterLab.
# Requires SPARK_HOME to point at a Spark installation.
export PATH="${SPARK_HOME}/bin:${PATH}"
export SPARK_LOCAL_IP=localhost
# Use Jupyter as the PySpark driver so `pyspark` starts JupyterLab.
export PYSPARK_DRIVER_PYTHON=jupyter
# NOTE(review): this binds to every interface with an empty token, i.e. no
# authentication -- confirm it is only run inside a trusted/isolated container.
export PYSPARK_DRIVER_PYTHON_OPTS='lab --ip=0.0.0.0 --allow-root --no-browser --NotebookApp.token=""'
# Comma-separated Maven coordinates handed to --packages below.
export PACKAGES="com.esri:filegdb:0.12.5"
export PACKAGES="${PACKAGES},com.esri:spark-functions:0.10"
# 'local[*]' is quoted so the shell cannot glob-expand it against files in the
# current directory; "${PACKAGES}" is quoted to avoid word splitting.
# NOTE(review): --num-executors / --executor-memory presumably have no effect
# with a local master -- confirm whether they can be dropped.
pyspark \
  --master 'local[*]' \
  --num-executors 1 \
  --driver-memory 30G \
  --executor-memory 30G \
  --conf spark.ui.enabled=false \
  --packages "${PACKAGES}" \
  --exclude-packages org.scala-lang:scala-reflect
```

In [1]:
import os

In [2]:
# Run a trivial query and collect it; the result is discarded (bound to `_`).
# NOTE(review): presumably this just forces Spark to start before the JVM
# calls in the following cells -- confirm.
_ = sql("select 'Initialize Spark'").collect()

In [3]:
# Call com.esri.spark.Functions.registerFunctions() on the JVM side via
# the session's `_jvm` handle.
# NOTE(review): presumably this registers the SQL functions (e.g. haversine)
# used by the query later in this notebook -- confirm.
spark._jvm.com.esri.spark.Functions.registerFunctions()

In [4]:
# Short alias for the JVM-side com.esri.gdb.FileGDB object, used below to
# list the tables in a file geodatabase.
gdb = spark._jvm.com.esri.gdb.FileGDB

In [5]:
# Path to the file geodatabase, relative to the notebook's working directory.
gdb_path = os.path.join("data", "Miami.gdb")
# Print every entry returned by FileGDB.listTables, one per line.
for tab in gdb.listTables(gdb_path):
    print(tab)

NameIndex(MiamiExtent,9)
NameIndex(Voyage,10)
NameIndex(Broadcast,11)
NameIndex(Vessel,12)
NameIndex(BaseStations,13)
NameIndex(AttributeUnits,14)
NameIndex(Extent,15)


In [6]:
# Load the 'Broadcast' table of the geodatabase at gdb_path into a DataFrame
# through the com.esri.gdb data source.
df = (
    spark.read
    .format('com.esri.gdb')
    .options(path=gdb_path, name='Broadcast')
    .load()
)

In [7]:
# Print the DataFrame's column names, types and nullability as a tree.
df.printSchema()

root
 |-- OBJECTID: integer (nullable = false)
 |-- Shape: struct (nullable = true)
 |    |-- x: double (nullable = true)
 |    |-- y: double (nullable = true)
 |-- SOG: integer (nullable = true)
 |-- COG: integer (nullable = true)
 |-- Heading: integer (nullable = true)
 |-- ROT: integer (nullable = true)
 |-- BaseDateTime: timestamp (nullable = true)
 |-- Status: integer (nullable = true)
 |-- VoyageID: integer (nullable = true)
 |-- MMSI: integer (nullable = true)
 |-- ReceiverType: string (nullable = true)
 |-- ReceiverID: string (nullable = true)



In [8]:
# Count the rows in the loaded DataFrame (this triggers a full read).
df.count()

1365578

In [9]:
# Expose the DataFrame to Spark SQL under the name "bc".
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; re-running the cell simply replaces the view.
df.createOrReplaceTempView("bc")

In [12]:
# For each MMSI, pair every broadcast (dt1, x1, y1) with the next broadcast
# in BaseDateTime order (dt2, x2, y2) using lead() over a per-MMSI window,
# then keep only pairs where dt1 < dt2 and haversine(x1,y1,x2,y2) > 500.0.
# Rows with no following broadcast get NULL lead values and are filtered out
# by the dt1 < dt2 comparison.
# NOTE(review): haversine is presumably one of the functions registered from
# com.esri.spark earlier, and 500.0 is presumably meters -- confirm.
# Displays at most 32 rows, with long cell values truncated.
sql("""
select mmsi,dt1,x1,y1,dt2,x2,y2
from (
select MMSI mmsi,
BaseDateTime as dt1,
Shape.x as x1,
Shape.y as y1,
lead(BaseDateTime) over (partition by MMSI order by BaseDateTime) as dt2,
lead(Shape.x) over (partition by MMSI order by BaseDateTime) as x2,
lead(Shape.y) over (partition by MMSI order by BaseDateTime) as y2
from bc
)
where dt1 < dt2 and haversine(x1,y1,x2,y2) > 500.0
""").show(32, truncate=True)

+---------+--------------------+----------+------------------+--------------------+----------+------------------+
|     mmsi|                 dt1|        x1|                y1|                 dt2|        x2|                y2|
+---------+--------------------+----------+------------------+--------------------+----------+------------------+
|209570000| 2009-01-03 23:30:00|-79.972952|25.910472999999996|2009-01-03 23:31:...| -79.97176|         25.906058|
|209570000|2009-01-03 23:44:...| -79.95738|25.854034999999996| 2009-01-03 23:45:00|-79.956087|25.849666999999997|
|209570000| 2009-01-04 00:15:00|-79.923492|25.737345000000005|2009-01-04 00:16:...|-79.922198|25.732974999999996|
|367136710| 2009-01-02 22:09:00|-79.844383|25.592617000000004|2009-01-02 22:10:...|-79.843567|25.599082999999993|
|367136710|2009-01-02 22:10:...|-79.843567|25.599082999999993|2009-01-02 22:11:...|-79.842817|25.605099999999993|
|367136710|2009-01-02 22:11:...|-79.842817|25.605099999999993| 2009-01-02 22:12:00| -79.