In [1]:
app_name = "getShortestDistance"

In [2]:
import findspark
findspark.init()

from pyspark import SparkConf
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
import pandas as pd
import seaborn as sns
import seaborn.objects as so
import math
from pyspark.sql.functions import udf
from datetime import datetime, timedelta
import folium
from folium.plugins import PolyLineTextPath

spark = SparkSession.builder\
      .master("local[2]")\
      .appName(app_name)\
      .config("spark.driver.memory", "4g")\
      .config("spark.network.timeout", "600s")\
      .config("spark.rpc.askTimeout", "600s")\
      .config("spark.task.maxFailures", "8")\
      .config("spark.speculation", "true")\
      .getOrCreate() 

In [49]:
def clean_name(col):
    """
    Cleans a column name (in string format) by:
      1. changing them to lowercase
      2. removing unwanted symbols
      3. replacing spaces with the "_" symbol
    """
    return col.lower().replace("# ", "").replace(" ", "_")

def clean_names(df):
    """
    Cleans the column names of a PySpark dataframe by:
      1. changing them to lowercase
      2. removing unwanted symbols
      3. replacing spaces with the "_" symbol
    """
    return df.toDF(*[clean_name(c) for c in df.columns])

def haversine(lon1, lat1, lon2, lat2):
    """
    Takes two sets of coordinates and returns the distance between them in kilometers, using the Harvesine formula.
    """
    # Convert decimal degrees to radians (as trigonometry functions in Python use radians, not degrees)
    lon1, lat1, lon2, lat2 = map(math.radians, [lon1, lat1, lon2, lat2])

    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    earth_radius = 6371  # Radius of the Earth in kilometers
    return c * earth_radius

haversine_udf = udf(haversine, DoubleType())

def get_data_for_row(smallest_distances, row_nr):
    """
    Takes the selected row from the smallest distances dataframe, and uses the values to read a corresponding csv file.
    The data is then filtered for the 20 minute interval around the encounter for the two vessels, and transformed for
    noise examination.
    """
    row = smallest_distances.loc[row_nr]
    selected_timestamp1, selected_timestamp2 = row["timestamp1"], row["timestamp2"]
    interval_start, interval_end = selected_timestamp1 - timedelta(minutes = 10), selected_timestamp1 + timedelta(minutes = 10)
    selected_date = str(selected_timestamp1)[0:10]

    csv_path = f"d:/tmp/zip_extracted/aisdk-{selected_date}.csv"
    vessel_ids = [row["mmsi1"], row["mmsi2"]]

    windowSpec = Window.partitionBy("mmsi").orderBy("timestamp")

    df = spark.read.option("header", "true").csv(csv_path)

    df = clean_names(df)\
        .filter(f.col("mmsi").isin(vessel_ids))\
        .dropDuplicates(subset = ["mmsi", "latitude", "longitude", "timestamp"])\
        .withColumn("latitude", f.col("latitude").cast("float"))\
        .withColumn("longitude", f.col("longitude").cast("float"))\
        .filter(haversine_udf("longitude", "latitude", f.lit(14.245000), f.lit(55.225000)) <= 50)\
        .withColumn("timestamp", f.to_timestamp(f.col("timestamp"), format="dd/MM/yyyy HH:mm:ss"))\
        .filter(f.col("timestamp").between(interval_start, interval_end))\
        .withColumn("prev_timestamp", f.lag("timestamp").over(windowSpec))\
        .withColumn("prev_latitude", f.lag("latitude").over(windowSpec))\
        .withColumn("prev_longitude", f.lag("longitude").over(windowSpec))\
        .filter(f.col("prev_latitude").isNotNull() & f.col("prev_longitude").isNotNull())\
        .withColumn("distance", haversine_udf("longitude", "latitude", "prev_longitude", "prev_latitude"))\
        .withColumn("hours_since_last_timestamp", (f.unix_timestamp("timestamp") - f.unix_timestamp("prev_timestamp").cast("long"))/3600)\
        .withColumn("speed_km_h", f.col("distance")/f.col("hours_since_last_timestamp"))
    
    return df.toPandas(), vessel_ids, selected_timestamp1, selected_timestamp2

def add_trajectory(mymap, latitudes_vessel, longitudes_vessel, name_vessel, timestamps_vessel, color):
    """
    Adds a vessel trajectory to a folium map, with the first and last timepoints for the vessel being marked.
    """
    trajectory_points_vessel = list(zip(latitudes_vessel, longitudes_vessel))
    folium.PolyLine(
        trajectory_points_vessel,
        color=color,   
        weight=2,
        opacity=0.8
    ).add_to(mymap)

    for i in [0, len(timestamps_vessel)-1]:
        folium.Marker(
            location=[latitudes_vessel[i], longitudes_vessel[i]],
            popup=f'{name_vessel} - {timestamps_vessel[i]}',
            icon=folium.Icon(color=color)
        ).add_to(mymap)

In [4]:
smallest_distances = pd.read_csv("combined_results.csv").sort_values(by = "distance").reset_index(drop = True)

smallest_distances["timestamp1"] = pd.to_datetime(smallest_distances["timestamp1"], format="%d/%m/%Y %H:%M:%S")
smallest_distances["timestamp2"] = pd.to_datetime(smallest_distances["timestamp2"], format="%d/%m/%Y %H:%M:%S")
smallest_distances["time_difference_minutes"] = round((smallest_distances["timestamp1"]-smallest_distances["timestamp2"]).dt.total_seconds()/60, 2)

# Exclude differences where the timestamps differ by more than 1 minute
smallest_distances = smallest_distances[smallest_distances["time_difference_minutes"] <= 1].reset_index(drop=True)

In [5]:
smallest_distances

Unnamed: 0,mmsi1,mmsi2,distance,timestamp1,timestamp2,latitude1,longitude1,latitude2,longitude2,time_difference_minutes
0,111219512,265388000,0.000496,2021-12-13 08:45:30,2021-12-13 08:45:29,55.240045,14.207297,55.240043,14.20729,0.02
1,219021240,232018267,0.004076,2021-12-13 02:27:58,2021-12-13 02:27:29,55.223092,14.243683,55.223067,14.24373,0.48
2,219001468,219019015,0.006948,2021-12-13 06:18:26,2021-12-13 06:18:21,55.237343,14.23632,55.237288,14.236372,0.08
3,265523410,265675230,0.009306,2021-12-13 10:17:04,2021-12-13 10:17:03,55.312875,14.205518,55.312865,14.205372,0.02


In [52]:
df_pd, vessel_ids, selected_timestamp1, selected_timestamp2 = get_data_for_row(smallest_distances, row_nr=0)

Py4JJavaError: An error occurred while calling o430.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 52) (host.docker.internal executor driver): java.io.FileNotFoundException: C:\Users\Ugne\AppData\Local\Temp\blockmgr-b3bd8366-37e0-4161-95a7-d100dd75c3da\21\shuffle_6_52_0.index.8c0bbd89-3eca-4c66-b753-24b8e7a9f041 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFile(IndexShuffleBlockResolver.scala:455)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFileAndCommit(IndexShuffleBlockResolver.scala:390)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:118)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:141)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\Ugne\AppData\Local\Temp\blockmgr-b3bd8366-37e0-4161-95a7-d100dd75c3da\21\shuffle_6_52_0.index.8c0bbd89-3eca-4c66-b753-24b8e7a9f041 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFile(IndexShuffleBlockResolver.scala:455)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeMetadataFileAndCommit(IndexShuffleBlockResolver.scala:390)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:118)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:141)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


For the vessel with the MMSI of 111219512, we cannot sure that the entry is not a noise, as there are two different entries at the same timestamp with different coordinates:

In [None]:
df_pd[df_pd["timestamp"].isin([selected_timestamp1, selected_timestamp2])][["timestamp", "mmsi", "name", "latitude", "longitude", "navigational_status", "distance", "hours_since_last_timestamp"]]

Unnamed: 0,timestamp,mmsi,name,latitude,longitude,navigational_status,distance,hours_since_last_timestamp
52,2021-12-13 02:27:29,219021240,KARIN HOEJ,55.223068,14.24373,Under way using engine,0.045365,0.003056
142,2021-12-13 02:27:29,232018267,MV SCOT CARRIER,55.223099,14.244907,Under way using engine,0.022198,0.001111
150,2021-12-13 02:27:58,232018267,MV SCOT CARRIER,55.223091,14.243683,Under way using engine,0.011463,0.001667


Moving on to the following vessel pair, we don't see the same indication of noise being present:

In [37]:
df_pd, vessel_ids, selected_timestamp1, selected_timestamp2 = get_data_for_row(smallest_distances, row_nr=1)

df_pd[df_pd["timestamp"].isin([selected_timestamp1, selected_timestamp2])][["timestamp", "mmsi", "name", "latitude", "longitude", "navigational_status", "distance", "hours_since_last_timestamp"]]

Unnamed: 0,timestamp,mmsi,name,latitude,longitude,navigational_status,distance,hours_since_last_timestamp
52,2021-12-13 02:27:29,219021240,KARIN HOEJ,55.223068,14.24373,Under way using engine,0.045365,0.003056
142,2021-12-13 02:27:29,232018267,MV SCOT CARRIER,55.223099,14.244907,Under way using engine,0.022198,0.001111
150,2021-12-13 02:27:58,232018267,MV SCOT CARRIER,55.223091,14.243683,Under way using engine,0.011463,0.001667


## Plotting the vessel trajectories

We see a collision of two vessels - 'KARIN HOEJ' and 'MV SCOT CARRIER'. The trajectory of KARIN HOEJ ends abruptly after the closest point, whereas MV SCOT CARRIER goes on to make a turn. A quick Google search confirms that such a collision [really did happen](https://www.gov.uk/government/news/scot-carrier-and-karin-hoej-report-published).

In [32]:
filtered_df = df_pd[df_pd["mmsi"] == str(vessel_ids[0])].reset_index(drop=True)
latitudes_vessel1 = list(filtered_df["latitude"])
longitudes_vessel1 = list(filtered_df["longitude"])
timestamps_vessel1 = list(filtered_df["timestamp"])
name_vessel1 = filtered_df["name"].loc[0]

In [33]:
filtered_df = df_pd[df_pd["mmsi"] == str(vessel_ids[1])].reset_index(drop=True)
latitudes_vessel2 = list(filtered_df["latitude"])
longitudes_vessel2 = list(filtered_df["longitude"])
timestamps_vessel2 = list(filtered_df["timestamp"])
name_vessel2 = filtered_df["name"].loc[0]

In [51]:
center_coords = (55.225000, 14.245000)
mymap = folium.Map(location=center_coords, zoom_start=13)
radius_km = 25*1000

folium.Marker(
    location=center_coords,
    popup='Center Point',
    icon=folium.Icon(color='green')
).add_to(mymap)

folium.Circle(
    location=center_coords,
    radius=radius_km,
    color='green',
    fill=True,
    fill_opacity=0.2
).add_to(mymap)

add_trajectory(mymap, latitudes_vessel1, longitudes_vessel1, name_vessel1, timestamps_vessel1, "red")
add_trajectory(mymap, latitudes_vessel2, longitudes_vessel2, name_vessel2, timestamps_vessel2, "blue")

mymap.save("trajectories.html")
mymap
