# Deciphering vessel movements in the Salish Sea

![](assets/tally-ho.png)

__Join me on a voyage of discovery as we sail through some basic spatial data analysis together.__

We'll take a look at vessel movements around the Salish Sea in the Pacific North West of the USA and Canada. This is an expanse of water separated from the Pacific Ocean by Vancouver Island and the Olympic Peninsula.

It's home to some of the largest ports on the west coast of North America, including the Port of Vancouver, the Port of Seattle, and the Port of Tacoma (which together form the Northwest Seaport Alliance). It sees immense container ship traffic, bulk carriers, cruise ships, and one of the largest ferry networks in the world (BC Ferries and Washington State Ferries).

It is also a world-renowned destination for recreational boating. The protected waters, stunning scenery, and the thousands of islands, inlets, and anchorages (like the San Juan Islands and the Gulf Islands) make it a paradise for sailors, yachters, and kayakers.

In [0]:
%load_ext autoreload
%autoreload 2

In [0]:
%pip install geopandas numpy==2.1.3 folium>=0.12 matplotlib mapclassify

In [0]:
import os
import shutil

import geopandas as gpd

from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import ResourceAlreadyExists
from databricks.sdk.service.catalog import VolumeType

import pyspark.databricks.sql.functions as DBF
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from utils.reader import ShapefileDataSource

## Load reference dataset (ports in the area)

In [0]:
spark.dataSource.register(ShapefileDataSource)

In [0]:
w = WorkspaceClient()

user_name = w.current_user.me().user_name

CATALOG = "dbacademy"
SCHEMA = user_name.split("@")[0]
VOLUME = "reference"

INPUT_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"
LAYER_NAME = "salish-ports"

In [0]:
try:
    w.volumes.create(
        name=VOLUME,
        catalog_name=CATALOG,
        schema_name=SCHEMA,
        volume_type=VolumeType.MANAGED,
    )
except ResourceAlreadyExists:
    print("Skipping volume creation. Already exists")

In [0]:
shutil.copy(f"{os.getcwd()}/data/salish-ports.zip", INPUT_PATH)

In [0]:
ports_raw = (
  spark.read.format("shapefile")
  .option("layer_name", LAYER_NAME)
  .load(INPUT_PATH)
  )
  
display(ports_raw)

Expand out feature properties using `from_json` and `schema_of_json` expressions from `pyspark.sql.functions`.

In [0]:
props_schema = F.schema_of_json(
  """{"OID":"1726.0","WPIN":"18460.0","REGION":"Canada West Coast -- 18080","PORT":"Comox Harbor","UN_LOCODE":"CA COX","COUNTRY":"Canada","WORLD_WATE":"Alaska-Canada coastal waters; North Pacific Ocean","SIZE":"Small","TYPE":"Coastal (Natural)","LAT":49.666667,"LON":-124.916667}"""
  )

In [0]:
ports = (
  ports_raw
  .withColumn("properties", F.from_json(F.col("properties"), schema=props_schema))
  .select("geometry", "crs", "properties.*")
)

display(ports)

Quickly visualise these using GeoPandas `explore()` method.

In [0]:
gpd.GeoSeries.from_wkb(ports_raw.toPandas()["geometry"].values, crs=26910).explore()

Construct Databricks GEOMETRY typed column from well-known-binary geometries and transform coordinates from British National Grid projected CRS to WGS84 geographic CRS (EPSG:27700 to EPSG:4326)

In [0]:
property_cols = [
  "OID",
  "WPIN",
  "PORT",
  "UN_LOCODE",
  "COUNTRY",
  "REGION",
  "WORLD_WATE",
  "SIZE",
  "TYPE",
  "LAT",
  "LON",
]

UTM10N = 26910
WGS84 = 4326

ports_reprojected = (
  ports
  .withColumn("geometry_UTM10N", DBF.st_geomfromwkb("geometry", UTM10N))
  .withColumn("geometry_wgs84", DBF.st_transform("geometry_UTM10N", WGS84))
  .where(DBF.st_isvalid("geometry_wgs84"))
  .select(*property_cols, "geometry_UTM10N", "geometry_wgs84")
)

display(ports_reprojected)

Filter these to remove the 'very small' ports. We'll ignore these for the purposes of this analysis.

In [0]:
ports_filtered = ports_reprojected.filter(~(F.col("SIZE")=="Very Small"))
ports_filtered.count()

In [0]:
ports_tref = f"{CATALOG}.{SCHEMA}.ports"
spark.sql(f"DROP TABLE IF EXISTS {ports_tref}")

ports_filtered.write.saveAsTable(ports_tref, mode="overwrite")

## Connecting the dots

Great, let's bring this together with our AIS data

In [0]:
%sql
select * from stuart.pubsecgeo.ais_data_sample

In [0]:
_sqldf.count()

There's a lot of data here, let's subset it to just our area of interest.

We're going to create a buffer of 1 NM (Nautical Mile, not nanometre!) around each of our ports, then compute an envelope around this region to perform a very rough filtering of the AIS events.

In [0]:
buffered_ports = (
    spark.table(ports_tref)
    # 1. Buffer in the projected CRS for accurate distance
    .withColumn(
        "buffered_geom_projected", DBF.st_buffer("geometry_UTM10N", F.lit(1 * 1852))
    )
    # 2. Reproject the accurate buffer to CRS84 to match AIS data
    .withColumn(
        "buffered_geom_wgs84",
        DBF.st_transform("buffered_geom_projected", WGS84),
    )
)

buffered_ports.display()

In [0]:
salish_sea_envelope = buffered_ports.groupBy().agg(
    DBF.st_envelope_agg("buffered_geom_wgs84").alias("filter_envelope")
)

In [0]:
ais_events_tref = f"{CATALOG}.{SCHEMA}.ais_data_sample"

filtered_ais_events = (
  spark.table(ais_events_tref).alias("e")
  .join(salish_sea_envelope.alias("p"), on=(
    (F.col("e.longitude")>=DBF.st_xmin("p.filter_envelope")) &
    (F.col("e.longitude")<=DBF.st_xmax("p.filter_envelope")) &
    (F.col("e.latitude")>=DBF.st_ymin("p.filter_envelope")) &
    (F.col("e.latitude")<=DBF.st_ymax("p.filter_envelope"))
        )
  )
  )
filtered_ais_events.count()

## Identify vessels in port

Now we can go ahead and find all of the AIS events corresponding to vessels in the buffer zone around a port.

In [0]:
ais_in_ports = (
    filtered_ais_events.alias("e")
    .join(
        buffered_ports.hint("broadcast").alias("p"), 
        on=DBF.st_intersects(F.col("e.point_geom"), F.col("p.buffered_geom_wgs84"))
    )
    .select(
        "mmsi", 
        "vessel_name",
        "timestamp",
        "OID",
        "PORT"
    )
)

ais_in_ports.display()

By 'sessionizing' this trail of events, we can trace out the journeys between ports.

In [0]:
vessel_window = Window.partitionBy("mmsi").orderBy("timestamp")

In [0]:
ais_with_gaps = (
    ais_in_ports
    .withColumn("prev_port", F.lag("OID").over(vessel_window))
    .withColumn("prev_time", F.lag("timestamp").over(vessel_window))
    .withColumn(
        "is_new_session",
        (
            (F.col("prev_port") != F.col("OID")) | # Port changed
            (F.col("prev_port").isNull()) # First-ever ping
        ).cast("integer")
    )
)

In [0]:
session_window = Window.partitionBy("mmsi").orderBy("timestamp")

ais_with_sessions = (
    ais_with_gaps
    .withColumn("session_id", F.sum("is_new_session").over(session_window))
)

We'll apply a criteria that port stays require loitering in the buffer for at least 15 minutes

In [0]:
port_stays = (
    ais_with_sessions
    .groupBy("mmsi", "vessel_name", "OID", "PORT", "session_id")
    .agg(
        F.min("timestamp").alias("arrival_time"),
        F.max("timestamp").alias("departure_time")
    )
    .withColumn("duration_seconds", 
                F.col("departure_time").cast("long") - F.col("arrival_time").cast("long"))
    .filter(F.col("duration_seconds") > (15 * 60)) 
)

port_stays.display()

## Compute O/D journey counts

In [0]:
journey_window = Window.partitionBy("mmsi").orderBy("arrival_time")

# 2. Get the 'origin' (current port) and 'destination' (next port)
journeys = (
    port_stays
    .withColumn("destination_port", F.lead("PORT").over(journey_window))
    .withColumn("origin_port", F.col("PORT"))
    .filter(F.col("destination_port").isNotNull()) # Remove last-known journeys
    .filter(F.col("origin_port") != F.col("destination_port")) 
)

# 3. Compute the O/D matrix!
od_matrix = (
    journeys
    .groupBy("origin_port", "destination_port")
    .count()
    .orderBy(F.col("count").desc())
)

od_matrix.display()

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.