# GPS Plotting Tools

In [None]:
from pynmeagps import NMEAMessage, haversine


def parse_nmea_latlon(msg: NMEAMessage):
    '''Extracts the position from a GGA message.

    Returns a ``(lat, lon)`` tuple when ``msg`` is a GGA message whose ``lat``
    attribute is not the empty string, otherwise ``None``.
    '''
    has_position = msg.msgID == 'GGA' and msg.lat != ''
    if not has_position:
        return None
    return (msg.lat, msg.lon)

def calc_distance_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    '''Returns the haversine distance between two coordinates in meters.

    The result of ``haversine`` is multiplied by 1000, i.e. ``haversine`` is
    expected to return kilometers.
    '''
    distance = haversine(lat1, lon1, lat2, lon2)
    return distance * 1000


## Plot live NMEA data

In [None]:
import socket
import time

from ipyleaflet import CircleMarker, LayerGroup, Map, Polyline, DivIcon, Marker
from pynmeagps import NMEAReader, NMEAMessage
from IPython.display import display

# Configuration
TCP_HOST = 'localhost'
TCP_PORT = 12345  # Change to your NMEA TCP port

# Map setup
INIT_CENTER = (51.0, 10.0)  # Default center (Germany)
m = Map(center=INIT_CENTER, zoom=14)
track_layer = LayerGroup()
m.add(track_layer)
m.layout.height = '600px'
display(m)

# All accepted positions in arrival order, used for the track polyline
track_points = []

# Layers that are replaced on every position update
pos_marker = None
speed_marker = None
poly = None

first_point = True

# State of the speed estimate (distance between consecutive fixes / elapsed time)
speed = 0
last_pos = None
last_time = 0

sock = None
try:
    sock = socket.create_connection((TCP_HOST, TCP_PORT))
    # The context manager closes the file wrapper; the socket itself is closed below
    with sock.makefile('r', encoding='ascii') as sock_file:
        # readline() returns '' only at end of stream, which ends the loop
        while len(line := sock_file.readline()) > 0:
            nmea_str = line.strip()
            if not nmea_str:
                # Blank line, nothing to parse
                continue
            # NOTE(review): NMEAReader.parse presumably raises on malformed sentences,
            # which ends the plot - confirm whether such sentences should be skipped
            nmea: NMEAMessage = NMEAReader.parse(nmea_str)
            if nmea.msgID == 'GGA' and nmea.quality == 1:
                pos = (nmea.lat, nmea.lon)
                track_points.append(pos)

                # Remove the markers of the previous position
                if pos_marker:
                    track_layer.remove(pos_marker)
                if speed_marker:
                    track_layer.remove(speed_marker)

                # Monotonic clock: intervals are not affected by system clock changes
                current_time = time.monotonic()
                if last_pos is not None:
                    dist = calc_distance_m(last_pos[0], last_pos[1], pos[0], pos[1])
                    print(dist)
                    elapsed = current_time - last_time
                    # Keep the previous speed if no time has passed (avoids division by zero)
                    if elapsed > 0:
                        speed = dist / elapsed * 3.6  # m/s -> km/h
                print(speed)
                last_time = current_time
                last_pos = pos

                # Add a circle marker and a speed label for the latest point
                text_icon = DivIcon(
                    html=f'<div style="font-size:12px; background:white; padding:2px 4px; border-radius:4px;">{speed:.1f} km/h</div>',
                    icon_size=(100, 30),
                    icon_anchor=(0, -15)  # adjust to position text above the circle
                )
                speed_marker = Marker(location=pos, icon=text_icon)
                pos_marker = CircleMarker(location=pos, radius=4, color='blue', fill_color='blue', fill_opacity=0.8, weight=2)
                track_layer.add(pos_marker)
                track_layer.add(speed_marker)

                # Remove previous polyline
                if poly:
                    track_layer.remove(poly)

                # Draw polyline for the whole track
                poly = Polyline(locations=track_points, color='red', fill=False, weight=2)
                track_layer.add(poly)

                # Center map only on first datapoint
                if first_point:
                    m.center = pos
                    first_point = False

except KeyboardInterrupt:
    print('Stopped by user.')
finally:
    if sock:
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            # Best effort: the peer may already have closed the connection
            pass
        sock.close()
        print('Socket closed.')

## Plot live PositionMessage stream

In [None]:
import time

from ipyleaflet import CircleMarker, DivIcon, LayerGroup, Map, Marker, Polyline
from IPython.display import display
from visionapi.common_pb2 import MessageType
from visionapi.sae_pb2 import PositionMessage
from visionlib.pipeline.consumer import RedisConsumer

# Configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_STREAM = 'positionsource:self'
START_AT_STREAM_HEAD = False

# Map setup
INIT_CENTER = (51.0, 10.0)  # Default center (Germany)
m = Map(center=INIT_CENTER, zoom=14)
track_layer = LayerGroup()
m.add(track_layer)
m.layout.height = '600px'
display(m)

# All received positions in arrival order, used for the track polyline
track_points = []

# Layers that are replaced on every position update
pos_marker = None
speed_marker = None
poly = None

first_point = True

# State of the speed estimate. It has to live outside the loop: resetting it per
# message would make every message look like the first one.
speed = 0
last_pos = None
last_time = 0

with RedisConsumer(REDIS_HOST, REDIS_PORT, [REDIS_STREAM], start_at_head=START_AT_STREAM_HEAD) as consume:
    for _, proto_data in consume():
        if proto_data is None:
            print('No messages received')
            break
        pos_msg = PositionMessage()
        pos_msg.ParseFromString(proto_data)
        assert pos_msg.type == MessageType.POSITION, 'Stream does not contain POSITION messages'

        pos = (pos_msg.geo_coordinate.latitude, pos_msg.geo_coordinate.longitude)
        track_points.append(pos)

        # Remove the markers of the previous position
        if pos_marker:
            track_layer.remove(pos_marker)
        if speed_marker:
            track_layer.remove(speed_marker)

        # Monotonic clock: intervals are not affected by system clock changes
        current_time = time.monotonic()
        if last_pos is not None:
            dist = calc_distance_m(last_pos[0], last_pos[1], pos[0], pos[1])
            print(dist)
            elapsed = current_time - last_time
            # Keep the previous speed if no time has passed (avoids division by zero)
            if elapsed > 0:
                speed = dist / elapsed * 3.6  # m/s -> km/h
        last_time = current_time
        last_pos = pos

        # Add a new circle marker for the latest point (speed label currently disabled)
        # text_icon = DivIcon(
        #     html=f'<div style="font-size:12px; background:white; padding:2px 4px; border-radius:4px;">{speed:.1f} km/h</div>',
        #     icon_size=(100, 30),
        #     icon_anchor=(0, -15)  # adjust to position text above the circle
        # )
        # speed_marker = Marker(location=pos, icon=text_icon)
        pos_marker = CircleMarker(location=pos, radius=4, color='blue', fill_color='blue', fill_opacity=0.8, weight=2)
        track_layer.add(pos_marker)
        # track_layer.add(speed_marker)

        # Remove previous polyline
        if poly:
            track_layer.remove(poly)

        # Draw polyline for the whole track
        poly = Polyline(locations=track_points, color='red', fill=False, weight=2)
        track_layer.add(poly)

        # Center map only on first datapoint
        if first_point:
            m.center = pos
            first_point = False

## Plot GPS log files

In [None]:
# Plot GPS track from a log file (NMEA sentences, one per line)
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, NamedTuple
import re
import os
import time
import itertools

from ipyleaflet import CircleMarker, LayerGroup, Map, Polyline, DivIcon, Marker
from IPython.display import display
from pynmeagps import NMEAMessage, NMEAReader
from math import radians, sin, cos, sqrt, atan2

class GPSPoint(NamedTuple):
    '''A single GPS position together with its timestamp.'''
    # Point in time belonging to the position
    timestamp: datetime
    # Latitude of the position
    lat: float
    # Longitude of the position
    lon: float

    def to_coord_tuple(self) -> Tuple[float, float]:
        '''Returns the position as a ``(lat, lon)`` tuple, dropping the timestamp.'''
        latitude, longitude = self.lat, self.lon
        return latitude, longitude

class Track(NamedTuple):
    '''The GPS points that were read from one log file.'''
    # Path of the log file the points were parsed from
    source_file: Path
    # Points of the track, in the order they were appended while parsing
    gps_points: List[GPSPoint]

# Fast NMEA GGA parser (only supports GGA)
def parse_gga(nmea_str):
    '''Parses position and fix quality out of a raw NMEA GGA sentence.

    GGA sentences of any talker are accepted (``$GPGGA``, ``$GNGGA``, ...).
    The checksum is not verified.

    Args:
        nmea_str: A single NMEA sentence, starting with ``$``.

    Returns:
        A dict with the keys ``msgID`` (always ``'GGA'``), ``quality`` (int, 0 if
        the field is not numeric), ``lat`` and ``lon`` (signed degrees, or ``None``
        if the field is empty or contains no decimal point).
        ``None`` if the sentence is not a GGA sentence, has fewer than 7 fields
        or contains a field that cannot be converted to a number.
    '''
    # Cheap header check before splitting: '$' + two-character talker ID + 'GGA'
    if nmea_str[:1] != '$' or nmea_str[3:6] != 'GGA':
        return None
    parts = nmea_str.split(',')
    if len(parts) < 7:
        return None
    lat_raw, lat_dir, lon_raw, lon_dir = parts[2:6]

    def dm_to_deg(dm, direction):
        # Coordinates are encoded as (d)ddmm.mmmm: the last two digits before the
        # decimal point belong to the minutes, everything before them is degrees
        if not dm or '.' not in dm:
            return None
        whole, fraction = dm.split('.', 1)
        degrees = int(whole[:-2])
        minutes = float(whole[-2:] + '.' + fraction)
        value = degrees + minutes / 60
        # Southern and western hemispheres are represented by negative values
        return -value if direction in ('S', 'W') else value

    try:
        quality = int(parts[6]) if parts[6].isdigit() else 0
        lat = dm_to_deg(lat_raw, lat_dir)
        lon = dm_to_deg(lon_raw, lon_dir)
    except ValueError:
        # Malformed numeric field: treat the whole sentence as unparseable
        return None
    return {
        'msgID': 'GGA',
        'quality': quality,
        'lat': lat,
        'lon': lon
    }

def parse_file_into_track(log_file: Path) -> Track:
    '''Reads a GPS log file and collects its usable GGA fixes into a Track.

    Every line is expected to have the form ``<ISO timestamp>;<NMEA sentence>``.
    Lines that do not have this form, sentences that ``parse_gga`` rejects,
    fixes with a quality other than 1 and fixes without latitude or longitude
    are skipped.

    Args:
        log_file: Path of the log file to read.

    Returns:
        A Track referencing ``log_file`` and containing the parsed points in
        file order (possibly none).
    '''
    track_points = []
    # errors='replace' substitutes undecodable bytes instead of aborting the whole file
    with open(log_file, 'r', encoding='ascii', errors='replace') as f:
        for line in f:
            try:
                iso_str, nmea_str = line.strip().split(';')
                timestamp = datetime.fromisoformat(iso_str)
                nmea = parse_gga(nmea_str)
            except ValueError:
                # Wrong number of ';' separated fields, bad timestamp or malformed sentence
                continue
            if nmea is None or nmea['msgID'] != 'GGA' or nmea['quality'] != 1:
                continue
            if nmea['lat'] is None or nmea['lon'] is None:
                # A point without coordinates cannot be drawn or averaged
                continue
            track_points.append(GPSPoint(timestamp, nmea['lat'], nmea['lon']))
    return Track(source_file=log_file, gps_points=track_points)

def draw_track(track: Track, track_id: int, draw_endpoints: bool, draw_filenames: bool) -> LayerGroup:
    '''Builds a map layer for one track.

    The layer always contains a blue polyline through all points of the track.
    For a track without points no markers are added, regardless of the flags.

    Args:
        track: The track to draw.
        track_id: Number shown (two digits, zero padded) in the endpoint labels.
        draw_endpoints: If true, mark the first point in green and the last point
            in red, each with a label showing ``track_id`` and the time of the point.
        draw_filenames: If true, add a label with the name of the source file at
            the first point.

    Returns:
        A LayerGroup containing the polyline and the requested markers.
    '''
    track_layer = LayerGroup()
    points = track.gps_points
    poly = Polyline(locations=[p.to_coord_tuple() for p in points], color='blue', fill=False, weight=2)
    track_layer.add(poly)
    if not points:
        # Nothing to anchor endpoint or filename markers to
        return track_layer

    def add_endpoint(point, color, background):
        # Adds a small circle plus a '<id> - <HH:MM:SS>' label at the given point
        location = point.to_coord_tuple()
        time_str = point.timestamp.strftime('%H:%M:%S')
        label_icon = DivIcon(html=f'<div style="font-size:12px; background:{background}; padding:1px 1px; ">{track_id:02d} - {time_str}</div>', icon_size=(75, 18), icon_anchor=(0, -10))
        track_layer.add(CircleMarker(location=location, radius=3, color=color, fill_color=color, fill_opacity=0.8, weight=2))
        track_layer.add(Marker(location=location, icon=label_icon))

    if draw_endpoints:
        add_endpoint(points[0], 'green', '#D8F0D8')
        add_endpoint(points[-1], 'red', '#ffd1d1')
    if draw_filenames:
        pos = points[0].to_coord_tuple()
        filename_icon = DivIcon(html=f'<div style="font-size:12px; background:#fff5a0; padding:1px 1px; ">{track.source_file.name}</div>', icon_size=(150, 36), icon_anchor=(0, -10))
        filename_marker = Marker(location=pos, icon=filename_icon)
        track_layer.add(filename_marker)
    return track_layer


DIRECTORY = '/home/florian/Desktop/was-pod01/'
LOG_FILE_GLOB = '2025-08-19*gps*.log'
# DIRECTORY = '/home/florian/smb4k/INFRA-NAS/starwit/20-Computer-Vision/02-Raw-Data/WAS/2025-cw33_filtered/'
# LOG_FILE_GLOB = '*gps*.log'
LOG_FILES = sorted(Path(DIRECTORY).glob(LOG_FILE_GLOB))
DRAW_ENDPOINTS = True
SAVE_HTML = False
DRAW_FILENAMES = False

start_time = time.time()
checkpoint_time = start_time

# One track per log file, in sorted file order
tracks: List[Track] = [parse_file_into_track(file) for file in LOG_FILES]

print(f'{len(LOG_FILES)} files parsed in {time.time() - checkpoint_time:.2f}s')
checkpoint_time = time.time()

# Center the map on the mean latitude and longitude of all track points in all tracks
all_points = list(itertools.chain.from_iterable(t.gps_points for t in tracks))
if all_points:
    mean_lat = sum(p.lat for p in all_points) / len(all_points)
    mean_lon = sum(p.lon for p in all_points) / len(all_points)
    INIT_CENTER = (mean_lat, mean_lon)
else:
    # No matching files or no usable fixes: avoid dividing by zero and show an empty map
    print('No GPS points found, using default map center')
    INIT_CENTER = (51.0, 10.0)  # Default center (Germany)
m = Map(center=INIT_CENTER, zoom=13)
m.layout.height = '800px'

# The index of a track in the list is used as its id in the endpoint labels
for idx, track in enumerate(tracks):
    if len(track.gps_points) > 0:
        m.add(draw_track(track, idx, DRAW_ENDPOINTS, DRAW_FILENAMES))

print(f'Tracks drawn in {time.time() - checkpoint_time:.2f}s')
total_points = len(all_points)
print(f'total points {total_points}')

if SAVE_HTML:
    checkpoint_time = time.time()

    # The output file is named after the part of the glob before the first wildcard
    m.save(LOG_FILE_GLOB.split('*')[0] + '.html')

    print(f'html saved in {time.time() - checkpoint_time:.2f}s')

display(m)