In [1]:
// Imports we'll need
import org.apache.spark.sql.SQLTypes
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.locationtech.geomesa.spark.jts._

// Register all udfs
SQLTypes.init(spark.sqlContext) 

println(sc.version)

Waiting for a Spark session to start...

2.3.0


# Load & filter data

In [2]:
// Load overall AIS dataframe 
val df = spark.read
  .format("geomesa")
  .option("fs.path","wasbs://data@geomesafsdsstorage.blob.core.windows.net/ais")
  .option("geomesa.feature", "AIS_2017_07_Zone15")
  .load()

df = [__fid__: string, MMSI: int ... 16 more fields]


[__fid__: string, MMSI: int ... 16 more fields]

In [3]:
// Show the schema
df.printSchema()

root
 |-- __fid__: string (nullable = false)
 |-- MMSI: integer (nullable = true)
 |-- BaseDateTime: timestamp (nullable = true)
 |-- LAT: float (nullable = true)
 |-- LON: float (nullable = true)
 |-- SOG: float (nullable = true)
 |-- COG: float (nullable = true)
 |-- Heading: float (nullable = true)
 |-- VesselName: string (nullable = true)
 |-- IMO: string (nullable = true)
 |-- CallSign: string (nullable = true)
 |-- VesselType: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Length: float (nullable = true)
 |-- Width: float (nullable = true)
 |-- Draft: float (nullable = true)
 |-- Cargo: integer (nullable = true)
 |-- geom: point (nullable = true)



In [4]:
// Total number of records
df.count()

76915006

In [5]:
%Truncation off

// Filter for ships within 50km of Galveston on 7th July 2017 and show the query plan
val lon = -95.013398
val lat = 29.2335042
val dist = 50000
val ships = df
    .where(st_within($"geom", st_bufferPoint(st_makePoint(lon, lat), dist)))  
    .where($"BaseDateTime" > from_utc_timestamp(lit("2017-07-07 00:00:00"), "Z"))  
    .where($"BaseDateTime" < from_utc_timestamp(lit("2017-07-07 23:59:59"), "Z"))
ships.explain(true)


Output will NOT be truncated
== Parsed Logical Plan ==
'Filter ('BaseDateTime < from_utc_timestamp(2017-07-07 23:59:59, Z))
+- AnalysisBarrier
      +- Filter (BaseDateTime#2 > from_utc_timestamp(cast(2017-07-07 00:00:00 as timestamp), Z))
         +- Filter UDF(geom#17, if (isnull(50000.0)) null else UDF(UDF(-95.013398, 29.2335042), 50000.0))
            +- Relation[__fid__#0,MMSI#1,BaseDateTime#2,LAT#3,LON#4,SOG#5,COG#6,Heading#7,VesselName#8,IMO#9,CallSign#10,VesselType#11,Status#12,Length#13,Width#14,Draft#15,Cargo#16,geom#17] GeoMesaRelation(org.apache.spark.sql.SQLContext@1ced814c,SimpleFeatureTypeImpl AIS_2017_07_Zone15 identified extends Feature(MMSI:MMSI,BaseDateTime:BaseDateTime,LAT:LAT,LON:LON,SOG:SOG,COG:COG,Heading:Heading,VesselName:VesselName,IMO:IMO,CallSign:CallSign,VesselType:VesselType,Status:Status,Length:Length,Width:Width,Draft:Draft,Cargo:Cargo,geom:geom),StructType(StructField(__fid__,StringType,false), StructField(MMSI,IntegerType,true), StructField(BaseDateTim

lon = -95.013398
lat = 29.2335042
dist = 50000
ships = [__fid__: string, MMSI: int ... 16 more fields]


[__fid__: string, MMSI: int ... 16 more fields]

In [6]:
// Cache for performance 
ships.cache()
ships.count()

269899

In [7]:
%%dataframe --limit 1000
ships

__fid__,MMSI,BaseDateTime,LAT,LON,SOG,COG,Heading,VesselName,IMO,CallSign,VesselType,Status,Length,Width,Draft,Cargo,geom
6b5817a98fe2ec5eb31d12aef7050b55,259104000,2017-07-07 00:00:35.0,28.93493,-95.3313,0.0,-195.4,136.0,CLIPPER FREEPORT,IMO9789154,LAZL7,1024.0,moored,,,,,POINT (-95.331298828125 28.9349308013916)
10a2503e5d80818a1fc369321f1feda9,477077800,2017-07-07 00:00:13.0,29.68052,-95.00389,0.0,152.3,82.0,COSCO GENOA,IMO9484326,VRKT3,1004.0,moored,249.51,32.25,12.6,73.0,POINT (-95.00389099121094 29.680519104003906)
4631da3ea0847a3dcb3efb1492c97711,366920340,2017-07-07 00:00:07.0,28.93658,-95.3337,0.1,-64.7,126.0,LINCOLN SEA,IMO9219006,WDH2997,1025.0,under way using engine,37.88,12.94,6.3,22.0,POINT (-95.33370208740234 28.936580657958984)
eb0bf5c72ef3c5dabc50d048f6f635f0,366995840,2017-07-07 00:01:16.0,29.71303,-95.05647,0.0,-193.0,359.0,BIG AL,,WDC2635,1025.0,under way using engine,17.19,6.7,,,POINT (-95.05647277832031 29.713029861450195)
3324b046c95686c3e6637420dad4c88d,367442390,2017-07-07 00:00:01.0,29.31331,-94.77897,0.0,-150.7,262.0,YELLOW ROSE,,WDF3650,50.0,under way using engine,21.12,5.6,,50.0,POINT (-94.77896881103516 29.313310623168945)
de2bcb87433a8d27ebb8d7b6e55f79ae,636013274,2017-07-07 00:00:11.0,29.11275,-94.53156,0.0,-113.6,161.0,NS LAGUNA,IMO9339325,A8LU9,1024.0,at anchor,248.96,43.83,14.9,80.0,POINT (-94.53156280517578 29.112749099731445)
9cc8f469481dc340a757cf0d413748fd,366971520,2017-07-07 00:00:07.0,28.89996,-95.37718,5.9,-174.7,233.0,JERRY W TICHENOR,IMO8992144,WDB9187,1025.0,under way using engine,20.12,9.1,,31.0,POINT (-95.37718200683594 28.899959564208984)
ffe3c2f92e94a69f2dc6892ac6d8be17,338173235,2017-07-07 00:00:57.0,29.55231,-95.05789,0.0,6.0,511.0,NEVER ENOUGH,,,1019.0,,11.06,,,,POINT (-95.05789184570312 29.552310943603516)
69259c2dafbce59619ea8ff54fd10471,367649360,2017-07-07 00:01:17.0,29.40211,-94.75014,0.0,-203.2,511.0,SAMMY CENAC,,WDH7565,1025.0,under way using engine,21.0,,,,POINT (-94.75013732910156 29.402109146118164)
f1726243fa805dfbb000159d657030a4,366760610,2017-07-07 00:01:05.0,29.36892,-94.88986,0.9,-163.5,511.0,ATLAS,IMO8964745,WCY8471,1025.0,under way using engine,26.51,10.08,,52.0,POINT (-94.88986206054688 29.368919372558594)


# Find Vessel of Interest

![vessel](https://photos.marinetraffic.com/ais/showphoto.aspx?photoid=3473160 "Yellow Rose")


In [8]:
// Filter for specific vessel of interest
val name = "YELLOW ROSE"
val interesting = ships.where($"VesselName" === name)
interesting.count()

name = YELLOW ROSE
interesting = [__fid__: string, MMSI: int ... 16 more fields]


1295

In [9]:
// Plot where vessel of interest has been

import org.locationtech.geomesa.jupyter._

val voi = L.DataFrameLayerPoint(interesting, "__fid__", L.StyleOptions("#000000", "#FF0000", 0.50))
val osm = L.WMSLayer("osm_auto:all", geoserverURL = "https://maps.heigit.org/osm-wms/service/")
val aoi = L.Circle(lon, lat, dist, L.StyleOptions("#000000", "#FFFF00", 0.15))

kernel.display.html(L.render(Seq(osm, aoi, voi), (lat, lon), 8))


voi = DataFrameLayerPoint([__fid__: string, MMSI: int ... 16 more fields],__fid__,StyleOptions(#000000,#FF0000,0.5),5.0)
osm = WMSLayer(osm_auto:all,,INCLUDE,#FF0000,https://maps.heigit.org/osm-wms/service/,Map(),0.6,true)
aoi = Circle(-95.013398,29.2335042,50000.0,StyleOptions(#000000,#FFFF00,0.15))


Circle(-95.013398,29.2335042,50000.0,StyleOptions(#000000,#FFFF00,0.15))

# Look for suspicious vessels

In [10]:
// Quantise locations of voi using geohashes
val precision = 35
val interestingGhs = interesting
    .select(st_geoHash($"geom", precision).as("gh"), $"VesselName")
    

// Similarly, quantise locations of all ships
val shipsGhs = ships
    .select(st_geoHash($"geom", precision).as("gh"), $"VesselName")

// Count occurrences of ships in proximity to our voi
// NOTE: have to use .as(...) since they have the same schema so column names clash
// TODO: should also add a time criterion
val suspects = shipsGhs.repartition(20).as("ships")
    .join(broadcast(interestingGhs.as("interesting")))
    .where($"ships.gh" === $"interesting.gh")
    .where($"ships.VesselName" =!= $"interesting.VesselName")  // don't want to include self
    .groupBy($"ships.VesselName")
    .count()
    .orderBy(desc("count"))

// For more precision, use something like this:
//    .where(st_within($"ships.geom", st_bufferPoint($"interesting.geom", near)))
// But need to look carefully at partitioning to avoid O(n^2) in the general case

precision = 35
interestingGhs = [gh: string, VesselName: string]
shipsGhs = [gh: string, VesselName: string]
suspects = [VesselName: string, count: bigint]


[VesselName: string, count: bigint]

In [11]:
%%dataframe --limit 100
suspects

VesselName,count
LONE STAR,552710
PILOT BOAT HOUSTON,285890
,6863
SALLY KIM IV,2269
PRINCESS ANN,1908
PIC,1578
AMELIA,1142
EXCALIBUR,1118
MISS ANGELEY,880
LADY CATHY,858
