In [None]:
import pandas as pd
import geopandas as gpd
import movingpandas as mpd
from hvplot import pandas
from datetime import timedelta, datetime
import folium
import warnings
import sys
warnings.filterwarnings('ignore')

# Log the library versions this notebook was run with.
print(f"Geopandas has version {gpd.__version__}")
print(f"Movingpandas has version {mpd.__version__}")

In [None]:
'''
This notebook takes raw AIS data as input, cleans the data and saves it to a parquet file.
Cleaning steps:
* Drop duplicates (AIS messages can be recorded multiple times by different stations, e.g. satellite, coastal station, etc.).
  Only the first registered message of a vessel at a given location is retained
* The data is split into trajectories, where each trajectory receives a unique ID. 
  A trajectory is split into sub-trajectories when the observation gap between consecutive AIS messages exceeds 10 min.
  Only sub-trajectories longer than 100 m are kept
* Drop trajectories with 'hops' in the AIS messages (sometimes the GPS location jumps inexplicably between two consecutive timesteps)
'''

In [None]:
# add paths for modules
# The `visualization` directory one level up (relative to the working directory)
# is put on the module search path so that `visualize` can be imported below.
# It is only appended when missing, so re-running this cell does not pile up
# duplicate entries in sys.path.
module_dir = '../visualization'
if module_dir not in sys.path:
    sys.path.append(module_dir)
print(sys.path)

# import modules
import visualize

In [None]:
# read data from file
# The raw AIS export is parsed as a semicolon-separated CSV with '.' as decimal mark.
filename = '../../data/raw/AIS_04-09_2022/ais_202204.csv'
df = pd.read_csv(filename, delimiter=';', decimal='.')
n_messages = len(df)  # number of raw messages, used later for the cleaning report

# convert to geopandas df: one point geometry per message, built from the lon/lat columns
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326")
del df  # release the raw frame's name instead of rebinding it to an empty list

# drop duplicate AIS data (reported by multiple stations)
# Rows count as duplicates when they share mmsi, lat and lon; the first occurrence is kept.
# NOTE(review): "first" means first in file order — confirm the export is sorted by time.
gdf = gdf.drop_duplicates(subset=['mmsi', 'lat', 'lon'], keep='first')

In [None]:
# convert to trajectories
# Rows of gdf are grouped by their 'mmsi' value (used both as trajectory ID and
# object ID); 'date_time_utc' supplies the timestamps.
size = 200000  # number of AIS messages to process; use len(gdf) to process all of them
trajectories = mpd.TrajectoryCollection(
    gdf.iloc[:size],
    traj_id_col='mmsi',
    obj_id_col='mmsi',
    t='date_time_utc',
)

In [None]:
# add a trajectory splitter
# Trajectories are cut wherever two consecutive observations are more than
# `max_gap` apart; min_length=100 is handed to the splitter as the minimum
# length of a resulting sub-trajectory.
max_gap = timedelta(minutes=10)
gap_splitter = mpd.ObservationGapSplitter(trajectories)
split_trajectories = gap_splitter.split(gap=max_gap, min_length=100)
print(f'Trajectory splitter split {len(trajectories)} trajectories into {len(split_trajectories)} sub-trajectories')

In [None]:
# drop trajectories with 'hops' due to corrupted AIS data
# We measure the speed of a vessel between consecutive points. If the speed exceeds a certain threshold we discard the trajectory
# overwrite=True keeps this cell re-runnable: the filtered collection assigned at the end of the cell
# still carries the speed column from a previous run, and add_speed() refuses to add it a second time otherwise.
split_trajectories.add_speed(overwrite=True)  # calculate speed
# 500 km/h converted to m/s — assumes add_speed() reports m/s for this CRS; TODO confirm
speed_thresh = 500 / 3.6
split_gdf = split_trajectories.to_point_gdf()
# NOTE(review): IDs are read from the 'mmsi' column. If that column still holds the vessel ID after
# splitting, every sub-trajectory of an affected vessel is dropped and the count printed below is a
# count of vessels rather than sub-trajectories — confirm this is intended.
bad_track_ids = split_gdf.loc[split_gdf['speed'] > speed_thresh, 'mmsi'].unique()  # IDs that violate the threshold
valid_track_ids = list(set(split_gdf['mmsi'].unique()) - set(bad_track_ids))  # IDs that satisfy the threshold
split_trajectories = split_trajectories.filter('mmsi', valid_track_ids)  # retain valid trajectories
print(f'{len(bad_track_ids)} trajectories were found that exceed the speed limit and dropped from the list of trajectories')

In [None]:
# report about cleaning
# NOTE(review): n_messages counts every row of the raw file, whereas only the first `size` rows of
# gdf are turned into trajectories — for size < len(gdf) the percentage also reflects that cut-off.
n_retained = len(split_trajectories.to_point_gdf())
retained_pct = n_retained / n_messages * 100
print(f'Cleaning reduced {n_messages} AIS messages to {n_retained} points ({retained_pct:.2f}%)')

In [None]:
# save to file
# The cleaned collection is written twice: once as trajectory geometries and once as individual points.
traj_path = '../../data/processed/202204_trajectories_stavanger_cleaned_200k.parquet'
point_path = '../../data/processed/202204_points_stavanger_cleaned_200k.parquet'
split_trajectories.to_traj_gdf().to_parquet(traj_path)
split_trajectories.to_point_gdf().to_parquet(point_path)