In [None]:
#!/usr/bin/env python
import pathlib, random, pandas as pd, geopandas as gpd
import matplotlib.pyplot as plt
import contextily as cx

# ── 1. PICK A FINISHED PARQUET DAY ───────────────────────────
PARQUET_DIR = pathlib.Path("~/ais_dk/parquet/2025").expanduser()
day_file = random.choice(list(PARQUET_DIR.rglob("2025-02-*.parquet")))
print(f"Plotting {day_file}")

# ── 2. LOAD + LIGHT DOWNSAMPLE (120 k points ≈ fast & detailed) ────────────
df = pd.read_parquet(day_file).sample(120_000, random_state=0)

# ── 3. BUILD A GEO DATAFRAME & REPROJECT TO EPSG:3857 ──────────────────────
gdf = gpd.GeoDataFrame(
    df,
    geometry=gpd.points_from_xy(df.longitude, df.latitude),
    crs="EPSG:4326",   # WGS-84
).to_crs(epsg=3857)    # Web-Mercator for contextily tiles

# ── 4. PLOTTING ────────────────────────────────────────────────────────────
fig, ax = plt.subplots(figsize=(9, 11))

# scatter: color by SOG, small dots, slight transparency
gdf.plot(
    ax=ax,
    column="sog",
    markersize=1.2,
    alpha=0.6,
    cmap="turbo",          # vibrant perceptual colormap
    legend=True,
    legend_kwds={"label": "Speed over ground (knots)", "shrink": 0.6},
)

# add OpenStreetMap tiles (zoom-adaptive)
cx.add_basemap(ax, crs=gdf.crs, source=cx.providers.OpenStreetMap.Mapnik)

ax.set_axis_off()
ax.set_title(f"AIS – {day_file.stem}", fontsize=14, weight="bold")
fig.tight_layout()

# make sure the figures/ folder exists
out_path = pathlib.Path("~/ais_dk/figures").expanduser()
out_path.mkdir(parents=True, exist_ok=True)

png = out_path / f"ais_{day_file.stem}.png"
fig.savefig(png, dpi=300)
print("✅  Saved", png)


In [None]:
#!/usr/bin/env python
"""
Crisp OSM basemap + ONLY the 10 busiest MMSI tracks
--------------------------------------------------
• Reads one Parquet “day” file
• Keeps the 10 MMSIs with the most rows
• Plots each track as coloured dots (Tab10 palette)
• Uses zoom-9 OSM tiles → sharp coastline detail
• Crops to Denmark (lon 7-16 °E, lat 53-58 °N)
"""

import pathlib
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import contextily as cx
import pyproj

# ── 1.  Parquet file you want to visualise ────────────────────────────────
DAY_FILE = pathlib.Path(
    "~/ais_dk/parquet/2025/2025-02-01/2025-02-01.parquet"
).expanduser()

# ── 2.  pull the 10 MMSIs with the most messages ───────────────────────────
df = pd.read_parquet(DAY_FILE)
top10 = df.mmsi.value_counts().head(10).index
df = df[df.mmsi.isin(top10)]                           # keep only those rows

# ── 3.  GeoDataFrame: WGS-84  →  Web-Mercator (EPSG:3857) ──────────────────
gdf_wgs = gpd.GeoDataFrame(
    df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326"
)
gdf = gdf_wgs.to_crs(epsg=3857)

# ── 4.  bounding box for Denmark (EPSG:3857, in metres) ────────────────────
proj = pyproj.Transformer.from_crs(4326, 3857, always_xy=True).transform
x_min, y_min = proj(7.0, 53.0)     # SW corner
x_max, y_max = proj(16.0, 58.0)    # NE corner

# ── 5.  plot ───────────────────────────────────────────────────────────────
fig, ax = plt.subplots(figsize=(10, 12), dpi=150)      # dpi↑ for print clarity

palette = plt.cm.get_cmap("tab10", len(top10))
for i, mmsi in enumerate(top10):
    tr = gdf[gdf.mmsi == mmsi]
    ax.scatter(
        tr.geometry.x,
        tr.geometry.y,
        s=3,                      # small points → coherent track
        color=palette(i),
        alpha=0.8,
        label=str(mmsi),
        linewidths=0,
    )

# High-resolution OSM tiles (zoom 9 feels right for DK)
cx.add_basemap(
    ax,
    crs=gdf.crs,
    source=cx.providers.OpenStreetMap.Mapnik,
    zoom=9,
    attribution_size=6,
)

# crop & annotate
ax.set_xlim(x_min, x_max)
ax.set_ylim(y_min, y_max)
ax.set_title(f"AIS — top-10 MMSI tracks — {DAY_FILE.stem}", fontsize=14, weight="bold")
ax.legend(title="MMSI", fontsize=8, frameon=True, loc="upper left")
ax.set_axis_off()

fig.tight_layout()

# ── 6.  save ───────────────────────────────────────────────────────────────
out_dir = pathlib.Path("~/ais_dk/figures").expanduser()
out_dir.mkdir(parents=True, exist_ok=True)
out_png = out_dir / f"ais_top10_only_{DAY_FILE.stem}.png"
fig.savefig(out_png, dpi=300)
print("✅  saved:", out_png)


In [None]:
#!/usr/bin/env python
"""
visualize_clean_vs_raw.py

Load one Parquet “day” file of DMA AIS, apply the cleaning filters,
and produce a side-by-side before/after scatter of lat/lon over Denmark.
"""

import pathlib
import polars as pl
import matplotlib.pyplot as plt

# ── Config ──────────────────────────────────────────────────────────────
DAY = "2025-02-01"  # change to the date you want
RAW_FILE = pathlib.Path(f"~/ais_dk/parquet/2025/{DAY}/{DAY}.parquet").expanduser()
FIG_DIR  = pathlib.Path("~/ais_dk/figures").expanduser()
FIG_DIR.mkdir(parents=True, exist_ok=True)
OUT_PNG  = FIG_DIR / f"before_after_cleaning_{DAY}.png"

# ── 1. Load the raw data with Polars ─────────────────────────────────────
df = pl.read_parquet(RAW_FILE).to_pandas()

# ── 2. Apply cleaning filters ────────────────────────────────────────────
df_clean = df[
    (df.latitude.between(-90, 90)) &
    (df.longitude.between(-180, 180)) &
    ((df.latitude.abs() + df.longitude.abs()) > 0.1) &
    (df.sog.between(0, 65)) &
    (df.cog.between(0, 360)) &
    (df.heading.between(0, 360))
]

# ── 3. Prepare the plot ─────────────────────────────────────────────────
# Denmark bounding box in lon/lat
MIN_LON, MAX_LON = 7.0, 16.0
MIN_LAT, MAX_LAT = 53.0, 58.0

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6), dpi=150)

# Raw data
ax1.scatter(df.longitude, df.latitude, s=1, color="gray", alpha=0.5)
ax1.set_title(f"Raw AIS Data ({DAY})")
ax1.set_xlim(MIN_LON, MAX_LON)
ax1.set_ylim(MIN_LAT, MAX_LAT)
ax1.set_xlabel("Longitude")
ax1.set_ylabel("Latitude")

# Cleaned data
ax2.scatter(df_clean.longitude, df_clean.latitude, s=1, color="tab:blue", alpha=0.5)
ax2.set_title(f"Cleaned AIS Data ({DAY})")
ax2.set_xlim(MIN_LON, MAX_LON)
ax2.set_ylim(MIN_LAT, MAX_LAT)
ax2.set_xlabel("Longitude")
ax2.set_ylabel("Latitude")

fig.tight_layout()

# ── 4. Save the figure ─────────────────────────────────────────────────
fig.savefig(OUT_PNG)
print("Saved before/after cleaning visualization to:", OUT_PNG)


In [None]:
# reconnect or reuse your existing DuckDB connection
import duckdb
con = duckdb.connect("ais.duckdb")

# recreate ais_raw with the correct names and added year/month/day
con.execute("""
CREATE OR REPLACE TABLE ais_raw AS
SELECT
  mmsi,
  "timestamp",
  latitude  AS lat,
  longitude AS lon,
  sog,
  cog,
  heading,
  ship_type,
  EXTRACT(year  FROM "timestamp") AS year,
  EXTRACT(month FROM "timestamp") AS month,
  EXTRACT(day   FROM "timestamp") AS day
FROM read_parquet('ais_dk/parquet/2025/*/*.parquet');
""")

# verify it worked
print(con.execute("DESCRIBE ais_raw").fetchall())
print(con.execute("SELECT COUNT(*) FROM ais_raw").fetchone())


In [None]:
import pathlib

out_dir = pathlib.Path("partitioned_ais")
out_dir.mkdir(exist_ok=True)

con.execute(f"""
COPY ais_raw 
TO '{out_dir}/' 
(FORMAT PARQUET, PARTITION_BY (year, month, day));
""")

In [None]:
import duckdb, pathlib

# 1) Connect
con = duckdb.connect("data/ais.duckdb")  # optional: keep the catalog inside `data/`

# 2) Load & derive partitions from your moved raw files
raw_pattern = "data/ais_dk/parquet/2025/*/*.parquet"
con.execute(f"""
CREATE OR REPLACE TABLE ais_raw AS
SELECT
  mmsi,
  "timestamp",
  latitude AS lat,
  longitude AS lon,
  sog,
  cog,
  heading,
  ship_type,
  EXTRACT(year  FROM "timestamp") AS year,
  EXTRACT(month FROM "timestamp") AS month,
  EXTRACT(day   FROM "timestamp") AS day
FROM read_parquet('{raw_pattern}');
""")

# 3) Write out Hive-style, year/month/day partitions under data/partitioned_ais
out_dir = pathlib.Path("data/partitioned_ais")
out_dir.mkdir(parents=True, exist_ok=True)

con.execute(f"""
COPY ais_raw
TO '{out_dir}/'
(FORMAT PARQUET, PARTITION_BY (year, month, day));
""")

# 4) Verify
print("Partitions written to:", out_dir)
for p in sorted(out_dir.rglob("*.parquet"))[:5]:
    print(" ", p)


# This is the cleaner v1

In [None]:
import duckdb, pathlib

# 1) Connect to (or create) your DuckDB catalog
con = duckdb.connect("data/ais.duckdb")

# 2) Build a cleaned table
con.execute(r"""
CREATE OR REPLACE TABLE ais_clean AS
WITH raw AS (
  SELECT * 
  FROM read_parquet('data/partitioned_ais/*/*/*/*.parquet')
),
dedup AS (
  -- remove exact duplicates of mmsi/timestamp/lat/lon
  SELECT DISTINCT
    mmsi, timestamp, lat, lon, sog, cog, heading, ship_type,
    year, month, day
  FROM raw
),
valid_geo AS (
  -- drop nulls & out‐of‐bounds coords
  SELECT *
  FROM dedup
  WHERE lat  BETWEEN -90  AND  90
    AND lon  BETWEEN -180 AND 180
    AND lat IS NOT NULL
    AND lon IS NOT NULL
),
with_lag AS (
  -- grab the previous point per vessel
  SELECT
    *,
    LAG(lat)       OVER (PARTITION BY mmsi ORDER BY timestamp) AS lat_prev,
    LAG(lon)       OVER (PARTITION BY mmsi ORDER BY timestamp) AS lon_prev,
    LAG(timestamp) OVER (PARTITION BY mmsi ORDER BY timestamp) AS ts_prev
  FROM valid_geo
),
with_speed AS (
  -- compute Haversine‐distance / time to get knots
  SELECT
    *,
    CASE
      WHEN ts_prev IS NULL THEN NULL
      ELSE
        (
          2 * 3440.065
          * ASIN(
              SQRT(
                POWER(SIN((RADIANS(lat)      - RADIANS(lat_prev)) / 2), 2)
                + COS(RADIANS(lat_prev)) * COS(RADIANS(lat))
                  * POWER(SIN((RADIANS(lon) - RADIANS(lon_prev)) / 2), 2)
              )
            )
        )
        / (
            (EXTRACT(EPOCH FROM timestamp)
             - EXTRACT(EPOCH FROM ts_prev))
            / 3600.0
          )
    END AS speed_kt
  FROM with_lag
)
-- finally, keep only rows where speed ≤ 80 knots (or first‐point NULL speed)
SELECT
  mmsi, timestamp, lat, lon, sog, cog, heading, ship_type,
  year, month, day
FROM with_speed
WHERE speed_kt IS NULL OR speed_kt <= 80;
""")

# 3) Write cleaned, year/month/day–partitioned Parquet out
out_dir = pathlib.Path("data/cleaned_partitioned_ais")
out_dir.mkdir(parents=True, exist_ok=True)

con.execute(f"""
COPY ais_clean
TO '{out_dir}/'
(FORMAT PARQUET, PARTITION_BY (year, month, day));
""")

# 4) Quick sanity‐check: list a few output files
print("Written partitions:")
for p in sorted(out_dir.rglob("year=*/month=*/day=*/part-*.parquet"))[:6]:
    print(" ", p)


# This is the cleaner v2

In [None]:
import polars as pl
import pathlib
from math import radians, sin, cos, sqrt, asin

# 1) Define haversine distance in nautical miles
R_NM = 3440.065

def haversine_nm(lat1, lon1, lat2, lon2):
    """
    Compute great-circle distance between (lat1, lon1) and (lat2, lon2) in nautical miles.
    All inputs in decimal degrees.
    """
    φ1, φ2 = radians(lat1), radians(lat2)
    Δφ = radians(lat2 - lat1)
    Δλ = radians(lon2 - lon1)
    a = sin(Δφ/2)**2 + cos(φ1)*cos(φ2)*sin(Δλ/2)**2
    return 2 * R_NM * asin(sqrt(a))

# 2) Stateful filter: always compare to last accepted point
def filter_track(df: pl.DataFrame, speed_thresh=80.0) -> pl.DataFrame:
    """
    Given a Polars DataFrame for a single vessel (sorted by timestamp),
    returns a new DataFrame with outliers (speed > threshold) removed,
    comparing each fix against the last kept fix, preserving the original schema.
    """
    cols = df.columns
    kept = {col: [] for col in cols}
    last_lat = last_lon = None
    last_ts = None

    for row in df.rows():
        ts = row[cols.index("timestamp")]
        lat = row[cols.index("lat")]
        lon = row[cols.index("lon")]

        # decide if we accept this point
        if last_ts is None:
            accept = True
        else:
            dt_h = (ts - last_ts).total_seconds() / 3600.0
            if dt_h > 0:
                speed = haversine_nm(last_lat, last_lon, lat, lon) / dt_h
                accept = (speed <= speed_thresh)
            else:
                accept = False

        if accept:
            # record every column value
            for i, col in enumerate(cols):
                kept[col].append(row[i])
            last_ts, last_lat, last_lon = ts, lat, lon

    # build new DataFrame enforcing original schema
    if kept[cols[0]]:
        return pl.DataFrame(kept, schema=df.schema)
    else:
        # empty but same schema
        return pl.DataFrame({col: [] for col in cols}, schema=df.schema)

# 3) Iterate over daily partitions and apply cleaning
input_root = pathlib.Path("data/partitioned_ais")
output_root = pathlib.Path("data/cleaned_partitioned_ais")

for year_dir in sorted(input_root.glob("year=*")):
    for month_dir in sorted(year_dir.glob("month=*")):
        for day_dir in sorted(month_dir.glob("day=*")):
            files = list(day_dir.glob("*.parquet"))
            if not files:
                continue
            df = pl.read_parquet([str(f) for f in files])

            # a) dedupe
            df = df.unique(subset=["mmsi", "timestamp", "lat", "lon"])
            # b) drop nulls & out-of-bounds coords
            df = df.filter(
                pl.col("lat").is_not_null() & pl.col("lon").is_not_null() &
                pl.col("lat").is_between(-90, 90) & pl.col("lon").is_between(-180, 180)
            )
            # c) sort
            df = df.sort(["mmsi", "timestamp"])

            # d) group per vessel manually and filter each group
            cleaned_parts = []
            for m in df.get_column("mmsi").unique().to_list():
                grp = df.filter(pl.col("mmsi") == m).sort("timestamp")
                cleaned_grp = filter_track(grp, speed_thresh=80.0)
                if cleaned_grp.height > 0:
                    cleaned_parts.append(cleaned_grp)

            # e) concat or empty
            if cleaned_parts:
                cleaned = pl.concat(cleaned_parts, how="vertical")
            else:
                cleaned = pl.DataFrame({col: [] for col in df.columns}, schema=df.schema)

            # f) write back to cleaned partition structure
            out_dir = output_root / year_dir.name / month_dir.name / day_dir.name
            out_dir.mkdir(parents=True, exist_ok=True)
            out_path = out_dir / "part-0.parquet"
            cleaned.write_parquet(str(out_path))
            print(f"Cleaned {year_dir.name}/{month_dir.name}/{day_dir.name} -> {out_path}, rows: {cleaned.height}")


# This is the cleaner v3

In [None]:
import polars as pl
import pathlib
from math import radians, sin, cos, sqrt, asin

# --- PARAMETERS ---
SPEED_THRESH = 80.0    # max speed in knots
GAP_HOURS   = 6        # split track when gap > this (hours)

# Earth radius in nautical miles
R_NM = 3440.065

def haversine_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Great-circle distance between two points (degrees) in nautical miles.
    """
    φ1, φ2 = radians(lat1), radians(lat2)
    Δφ = radians(lat2 - lat1)
    Δλ = radians(lon2 - lon1)
    a = sin(Δφ/2)**2 + cos(φ1)*cos(φ2)*sin(Δλ/2)**2
    return 2 * R_NM * asin(sqrt(a))

# Stateful speed filter
def filter_track(df: pl.DataFrame, speed_thresh: float) -> pl.DataFrame:
    """
    Given a Polars DataFrame for one vessel (sorted by 'timestamp'),
    removes points where speed > threshold compared to last kept point.
    """
    cols = df.columns
    kept = {c: [] for c in cols}
    last_lat = last_lon = None
    last_ts = None

    for row in df.rows():
        ts = row[cols.index('timestamp')]
        lat = row[cols.index('lat')]
        lon = row[cols.index('lon')]

        if last_ts is None:
            accept = True
        else:
            dt_h = (ts - last_ts).total_seconds() / 3600.0
            accept = False
            if dt_h > 0:
                speed = haversine_nm(last_lat, last_lon, lat, lon) / dt_h
                accept = (speed <= speed_thresh)

        if accept:
            for i, c in enumerate(cols):
                kept[c].append(row[i])
            last_ts, last_lat, last_lon = ts, lat, lon

    if kept[cols[0]]:
        return pl.DataFrame(kept, schema=df.schema)
    else:
        return pl.DataFrame({c: [] for c in cols}, schema=df.schema)

# --- MAIN CLEANING LOOP WITH TRACK SPLITTING ACROSS DAYS ---
state = {}  # mmsi -> (last_ts: datetime, last_track_id: int)

input_root = pathlib.Path('data/partitioned_ais')
output_root = pathlib.Path('data/cleaned_partitioned_ais')

for year_dir in sorted(input_root.glob('year=*')):
    for month_dir in sorted(year_dir.glob('month=*')):
        for day_dir in sorted(month_dir.glob('day=*')):
            files = list(day_dir.glob('*.parquet'))
            if not files:
                continue
            df = pl.read_parquet([str(f) for f in files])

            # a) dedupe & bounds
            df = df.unique(subset=['mmsi','timestamp','lat','lon'])
            df = df.filter(
                pl.col('lat').is_not_null() & pl.col('lon').is_not_null() &
                pl.col('lat').is_between(-90, 90) & pl.col('lon').is_between(-180, 180)
            )
            df = df.sort(['mmsi','timestamp'])

            cleaned_parts = []
            for m in df['mmsi'].unique().to_list():
                grp = df.filter(pl.col('mmsi') == m).sort('timestamp')
                if grp.is_empty():
                    continue

                # retrieve previous state
                last_ts, offset = state.get(m, (None, 0))

                # 1) speed filter
                clean = filter_track(grp, SPEED_THRESH)
                if clean.is_empty():
                    continue

                # 2) split into tracks with offset, carrying over across days
                gap_flag = clean['timestamp'].diff() > pl.duration(hours=GAP_HOURS)
                clean = clean.with_columns([
                    gap_flag.cast(pl.Int32).alias('gap_int')
                ])
                clean = clean.with_columns([
                    (pl.col('gap_int').cum_sum() + offset).alias('track_id')
                ]).drop('gap_int')

                                # 3) update state for next day
                new_last_ts = clean['timestamp'].max()
                # handle potential None if track_id is missing
                raw_max = clean['track_id'].max()
                new_offset = int(raw_max) if raw_max is not None else offset
                state[m] = (new_last_ts, new_offset)

                cleaned_parts.append(clean)

            # concat all vessels for this day
            if cleaned_parts:
                cleaned = pl.concat(cleaned_parts, how='vertical')
            else:
                # empty DataFrame with track_id
                cols = df.columns + ['track_id']
                cleaned = pl.DataFrame({c: [] for c in cols})

            # write out
            out_dir = output_root / year_dir.name / month_dir.name / day_dir.name
            out_dir.mkdir(parents=True, exist_ok=True)
            cleaned.write_parquet(str(out_dir / 'part-0.parquet'))
            print(f"Processed {year_dir.name}/{month_dir.name}/{day_dir.name}: {cleaned.height} rows")


In [None]:
import polars as pl

# Option A: eager read of all files (OK if your dataset fits in memory)
# df = pl.read_parquet("data/cleaned_partitioned_ais/*/*/*/*.parquet")

# Option B: lazy scan + collect (lets you push down filters, etc., before materializing)
lf = pl.scan_parquet("data/cleaned_partitioned_ais/*/*/*/*.parquet")
df = lf.collect()

In [None]:
df['mmsi'].value_counts(sort=True)

# Random Track with ~20k posits

In [None]:
import polars as pl
import folium
from folium.plugins import MarkerCluster

# 1) filter and convert to Pandas
pdf = (
    df
    .filter(pl.col("mmsi") == 219031645)
    .select([pl.col("lat"), pl.col("lon")])
    .to_pandas()
)

# 2) center map on mean location
center = [pdf['lat'].mean(), pdf['lon'].mean()]
m = folium.Map(location=center, zoom_start=7)

# 3) draw the track as a line
coords = pdf[['lat','lon']].values.tolist()
folium.PolyLine(coords, color='crimson', weight=2.5, opacity=0.8).add_to(m)

# 4) add clustered points if you like
mc = MarkerCluster().add_to(m)
for _, row in pdf.iterrows():
    folium.CircleMarker(
        location=[row['lat'], row['lon']],
        radius=3,
        color='blue', fill=True
    ).add_to(mc)

# 5) display in Jupyter (or save to HTML)
m  # in a notebook cell this will render the map
# or: m.save("track_219031645.html")

# Track with 1st most posits

In [None]:
import polars as pl
import folium
from folium.plugins import MarkerCluster

# 1) filter and convert to Pandas
pdf = (
    df
    .filter(pl.col("mmsi") == 2579999)
    .select([pl.col("lat"), pl.col("lon")])
    .to_pandas()
)

# 2) center map on mean location
center = [pdf['lat'].mean(), pdf['lon'].mean()]
m = folium.Map(location=center, zoom_start=7)

# 3) draw the track as a line
coords = pdf[['lat','lon']].values.tolist()
folium.PolyLine(coords, color='crimson', weight=2.5, opacity=0.8).add_to(m)

# 4) add clustered points if you like
mc = MarkerCluster().add_to(m)
for _, row in pdf.iterrows():
    folium.CircleMarker(
        location=[row['lat'], row['lon']],
        radius=3,
        color='blue', fill=True
    ).add_to(mc)

# 5) display in Jupyter (or save to HTML)
m  # in a notebook cell this will render the map
# or: m.save("track_219031645.html")

# Track with the 2nd most posits

In [None]:
import polars as pl
import folium
from folium.plugins import MarkerCluster

# 1) filter and convert to Pandas
pdf = (
    df
    .filter(pl.col("mmsi") == 259027440)
    .select([pl.col("lat"), pl.col("lon")])
    .to_pandas()
)

# 2) center map on mean location
center = [pdf['lat'].mean(), pdf['lon'].mean()]
m = folium.Map(location=center, zoom_start=7)

# 3) draw the track as a line
coords = pdf[['lat','lon']].values.tolist()
folium.PolyLine(coords, color='crimson', weight=2.5, opacity=0.8).add_to(m)

# 4) add clustered points if you like
mc = MarkerCluster().add_to(m)
for _, row in pdf.iterrows():
    folium.CircleMarker(
        location=[row['lat'], row['lon']],
        radius=3,
        color='blue', fill=True
    ).add_to(mc)

# 5) display in Jupyter (or save to HTML)
m  # in a notebook cell this will render the map
# or: m.save("track_219031645.html")

In [None]:
single_track = df.filter(pl.col("mmsi") == 219028133)

In [None]:
single_track

In [None]:
import polars as pl

def add_velocity_knots(df: pl.DataFrame) -> pl.DataFrame:
    """
    Takes a Polars DataFrame with columns ['mmsi','timestamp','lat','lon',…]
    and returns a new DataFrame with an added 'velocity_knots' column,
    computed as the great‐circle speed (in knots) between each point
    and its predecessor for the same MMSI.
    """
    R_NM = 3440.065  # Earth radius in nautical miles

    return (
        df
        # 1) Ensure sorted by vessel and time
        .sort(["mmsi", "timestamp"])
        # 2) Grab the previous row’s lat/lon/timestamp per MMSI
        .with_columns([
            pl.col("lat").shift(1).over("mmsi").alias("lat_prev"),
            pl.col("lon").shift(1).over("mmsi").alias("lon_prev"),
            pl.col("timestamp").shift(1).over("mmsi").alias("ts_prev"),
        ])
        # 3) Build the haversine “a” term
        .with_columns([
            (
                ((pl.col("lat").radians() - pl.col("lat_prev").radians()) / 2).sin().pow(2)
                + pl.col("lat_prev").radians().cos()
                  * pl.col("lat").radians().cos()
                  * ((pl.col("lon").radians() - pl.col("lon_prev").radians()) / 2)
                    .sin()
                    .pow(2)
            ).alias("a")
        ])
        # 4) Compute distance in nautical miles and delta time in hours
        .with_columns([
            # dist_nm = 2 * R_NM * arcsin(sqrt(a))
            (2 * R_NM * pl.col("a").sqrt().arcsin()).alias("dist_nm"),
            # dt_hrs = (timestamp - ts_prev) cast to Int64 ns, divided to hours
            ((pl.col("timestamp").cast(pl.Int64) - pl.col("ts_prev").cast(pl.Int64))
             / 3.6e12).alias("dt_hrs"),
        ])
        # 5) Compute velocity in knots, null for first point or zero dt
        .with_columns([
            pl.when(pl.col("dt_hrs") > 0)
              .then(pl.col("dist_nm") / pl.col("dt_hrs"))
              .alias("velocity_knots")
        ])
        # 6) Drop intermediates
        .drop(["lat_prev", "lon_prev", "ts_prev", "a", "dist_nm", "dt_hrs"])
    )

In [None]:
single_track_with_vel = add_velocity_knots(single_track)

single_track_with_vel.filter(pl.col("velocity_knots") >= 80)