## Virtual Gate Crossing

This notebook demonstrations a parallel, distributed, share-nothing spatial join between a relatively large dataset and a small dataset.

In this case, virtual gates are defined at various locations in a port, and the outcome is an account of the number of crossings of these gates by ships using their AIS target positions.

Note that the join is to a "small" spatial dataset that we can:

- [Broadcast](https://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables) to all the spark workers.
- Brutly traverse it on each worker, as it is cheaper and faster to do so that spatially index it.

To get started, make sure to install [shapely](https://shapely.readthedocs.io/en/latest/) in the current active conda environment using the command line:

```
conda install shapely
```

### Install the required modules.

In [None]:
import os
import arcgis
from spark_esri import spark_start, spark_stop

### Stop existing spark instance.

In [None]:
spark_stop()

### Start a spark instance with user defined configurtations.

In [None]:
config = {
    "spark.driver.memory":"16G"
}
spark = spark_start(config=config)

### Defined a global spatial reference to be used throught the notebook.

All coordinates will be in web mercator meters.

In [None]:
sp_ref = arcpy.SpatialReference(3857)

### Load into a broadcast variable the virtual gates.

A gate is a feature has:

- A LineString shape type.
- One attribute named `GateID` of type `Long`.

Note that the shape is read in [WKB](https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry) format to enable its serialization during the broadcast internal mechanism.

In [None]:
fields = ["GateID","SHAPE@WKB"]
with arcpy.da.SearchCursor("Gates", fields, spatial_reference=sp_ref) as rows:
    bv = spark.sparkContext.broadcast(list(rows))

### Read the AIS target locations.

Note that the target timestamp is converted to an epoch value (seconds since 1970).

In [None]:
fields = ["SHAPE@X","SHAPE@Y","MMSI","BaseDateTime"]
with arcpy.da.SearchCursor("Broadcast", fields, spatial_reference=sp_ref) as rows:
    spark\
        .createDataFrame(rows, "x double,y double,mmsi string,t timestamp")\
        .selectExpr("mmsi","x","y","unix_timestamp(t) t")\
        .createOrReplaceTempView("v0")

### A micropath is created from the current record and the leading record based on the `mmsi` field and the epoch `t` field.

In [None]:
spark\
    .sql("""
select mmsi,
x x1,
y y1,
t t1,
lead(x,1,0.0) over (partition by mmsi order by t) x2,
lead(y,1,0.0) over (partition by mmsi order by t) y2,
lead(t,1,0) over (partition by mmsi order by t) t2
from v0
""")\
    .createOrReplaceTempView("v1")

### Each micropath is enriched with the vertical and horizontal displacement and the time difference.

In [None]:
spark.sql("select *,(x2-x1) dx,(y2-y1) dy,(t2-t1) dt from v1 where t1 < t2").createOrReplaceTempView("v2")

### Enrich with travel distance.

In [None]:
spark.sql("select mmsi,x1,y1,x2,y2,sqrt(dx*dx+dy*dy) dd,dt from v2").createOrReplaceTempView("v3")

### Enrich with travel velocity.

In [None]:
spark.sql("select *,dd/dt mps from v3").createOrReplaceTempView("v4")

### Remove noisy micropaths.

In [None]:
df1 = spark.sql("""
select x1,y1,x2,y2
from v4
where dd between 1 and 1500
and mps < 25
and dt < 130
""")

### Defined a function to be execute on each micropath partition.

The function accepts a set of micropaths as an argument and will:

- Read the broadcasted gates once at start of the function and will convert the gates to shapely geometries.
- Iterate through every micropath and will check if it intersecs with of a gate.
- If an intersection is detected, then the intersection point is emitted and so is the direction of travel.
- The direction of travel is defined as `LR` (left to right) or `RL` (right to left).  The following illustrates what is defined as left and right. This is relative to the how gate is "drawn" on the map.

![](media/Gates0.png)

In [None]:
def func(partition):
    from shapely.wkb import loads
    from shapely.geometry import LineString
    
    def name_geom(_g):
        """Function to convert WKB to a shapely geometry and line vector.
        """
        geom = loads(bytes(_g[1]))
        coords = geom.coords
        head = coords[0]
        last = coords[-1]
        vx = last[0] - head[0]
        vy = last[1] - head[1]
        return _g[0],geom,vx,vy
        
    # Read the gates in WKB format from the broadcast variable.
    gates = [name_geom(v) for v in bv.value]
        
    # Perform cartesian product of paths and gates.
    for micropath in partition:
        x1 = micropath["x1"]
        y1 = micropath["y1"]
        x2 = micropath["x2"]
        y2 = micropath["y2"]
        px = x2 - x1
        py = y2 - y1
        path = LineString([(x1,y1),(x2,y2)])
        for gate_id, gate_geom, gx, gy in gates:
            point = gate_geom.intersection(path)
            if not point.is_empty:
                # Calculate the cross product between the micropath vector and the gate vector.
                cross = px * gy - py * gx
                lr_rl = "RL" if cross < 0.0 else "LR" 
                yield point.x,point.y,gate_id,lr_rl

### Perform the distributed spatial join.

As of this writing and in Spark 2.X in python, a dataframe has to be converted to an RDD to apply a function on each distributed partition and then converted back to a dataframe.

In [None]:
df2 = df1\
    .rdd\
    .mapPartitions(func)\
    .toDF(["gate_x","gate_y","gate_id","travel_dir"])\
    .cache()

df2.createOrReplaceTempView("v5")

### Collect on the driver all the intersection points at each gate.

In [None]:
rows = df2.collect()

### Create a point feature class of the collected points to be placed in the TOC.

In [None]:
ws = "memory"
nm = "GatePoints"

fc = os.path.join(ws,nm)

arcpy.management.Delete(fc)

arcpy.management.CreateFeatureclass(ws, nm, "POINT", spatial_reference=sp_ref)
arcpy.management.AddField(fc, "GATE_ID", "LONG")
arcpy.management.AddField(fc, "TRAVEL_DIR", "TEXT")

with arcpy.da.InsertCursor(fc, ["SHAPE@X","SHAPE@Y","GATE_ID","TRAVEL_DIR"]) as cursor:
    for row in rows:
        cursor.insertRow(row)

### Calculate the statistics for each gate and its travel direction.

In [None]:
rows = spark.sql("""
select gate_id,travel_dir,count(1) cnt
from v5
group by gate_id,travel_dir
order by gate_id,travel_dir
""")\
    .collect()

### Create a table of the statistics and place it in the TOC.

In [None]:
ws = "memory"
nm = "GateStats"

fc = os.path.join(ws,nm)

arcpy.management.Delete(fc)

arcpy.management.CreateTable(ws, nm)
arcpy.management.AddField(fc, "GATE_ID", "LONG")
arcpy.management.AddField(fc, "TRAVEL_DIR", "TEXT")
arcpy.management.AddField(fc, "TRAVEL_CNT", "LONG")

with arcpy.da.InsertCursor(fc, ["GATE_ID","TRAVEL_DIR","TRAVEL_CNT"]) as cursor:
    for row in rows:
        cursor.insertRow(row)

### Stop the spark instance.

In [None]:
spark_stop()