In [1]:
"Timetable parsing"

from tramnetwork import *

lookup_day = "20250128"

network = TramNetwork()
network.load_day(lookup_day)

loaded routes.txt
loaded transfers.txt
loaded stops.txt
loaded trips.txt


KeyboardInterrupt: 

In [2]:
import pygame
pygame.init()

pygame 2.0.1 (SDL 2.0.14, Python 3.9.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


(7, 0)

In [3]:
# read strava gpx

strava_gpx_path = "assets/strava-28012025.gpx"

import xml.etree.ElementTree as ET
from xml.etree.ElementTree import Element
from dateutil import parser as dateparser

tree = ET.parse(strava_gpx_path)
root = tree.getroot()

class GpsPoint:

    def __init__(self, coords: tuple[float, float], time: datetime, elevation: float):
        self.coords = coords
        self.time = time
        self.elevation = elevation

    @classmethod
    def from_trkpt_element(cls, trkpt: Element) -> "GpsPoint":
        coords = trkpt.get("lat"), trkpt.get("lon")
        coords = tuple(float(c) for c in coords)
        time = datetime.fromisoformat(trkpt[1].text[:-1]) + timedelta(hours=1)
        elevation = float(trkpt[0].text)
        return cls(coords, time, elevation)
    
    def __repr__(self):
        return f"{self.__class__.__name__}({self.coords!r}, {self.time!r}, {self.elevation!r})"

point_elements = root.find("{http://www.topografix.com/GPX/1/1}trk").find("{http://www.topografix.com/GPX/1/1}trkseg")
gps_points = [GpsPoint.from_trkpt_element(element) for element in point_elements]

# because the gps data itself is too many points to draw all individually (>33811), we
# only consider every 10th point
gps_points = [p for i, p in enumerate(gps_points) if i % 10 == 0 or i == len(gps_points) - 1]

gps_point_times = [point.time for point in gps_points]
gps_point_coords = [point.coords for point in gps_points]

print(f"Loaded {len(gps_points)} gps points from {strava_gpx_path!r}")

Loaded 3383 gps points from 'assets/strava-28012025.gpx'


In [4]:
# show animation of noel riding a specific path

path = TramPath.load("ant.2025-01-28--9-06-18.txt", network)

import pygame.display
import pygame.time
import pygame.draw
import pygame.image
import pygame.transform
import pygame.font
import time, bisect
from pygame.math import Vector2

draw_player_opt = True
draw_player_path_opt = False
draw_plan_opt = True
draw_plan_path_opt = True

def get_curr_time(path: TramPath, t: float) -> datetime:
    path_start_time = min(path.arrival_times[0], gps_point_times[0])
    path_end_time = max(path.arrival_times[-1], gps_point_times[-1])
    return path_start_time + (path_end_time - path_start_time) * t

screen_size = (2000 // 3, 2216 // 3)
top_left_coords = (47.459425, 8.437827)
bottom_right_coords = (47.310020, 8.637158)
background_image = pygame.image.load("assets/karte3.png")
player_image = pygame.image.load("assets/tsp-pin-yellow.png")
plan_image = pygame.image.load("assets/plan-dot.png")
player_size_factor = 0.15
plan_size_factor = 0.04
animation_seconds = (get_curr_time(path, 1) - get_curr_time(path, 0)).total_seconds() / (10 * 60) # 60

start_time = time.time()
player_size_xy = player_image.get_size()[1] / player_image.get_size()[0]
player_size = (round(screen_size[0] * player_size_factor), round(screen_size[0] * player_size_factor * player_size_xy))
background_image = pygame.transform.smoothscale(background_image, screen_size)
player_image = pygame.transform.smoothscale(player_image, player_size)
plan_size_xy = plan_image.get_size()[1] / plan_image.get_size()[0]
plan_size = (round(screen_size[0] * plan_size_factor), round(screen_size[0] * plan_size_factor * plan_size_xy))
plan_image = pygame.transform.smoothscale(plan_image, plan_size)
percent_reverse_animation = 0.05

pygame.font.init()
cascadia_code = pygame.font.Font("C:\\Windows\\Fonts\\CascadiaCode.ttf", 50)

def interpolate_coords(c1: tuple[float], c2: tuple[float], t: float) -> tuple[float]:
    delta = (c2[0] - c1[0], c2[1] - c1[1])
    return (c1[0] + delta[0] * t, c1[1] + delta[1] * t)

def get_curr_plan_coords(path: TramPath, t: float, curr_time: datetime):
    "get coord from path at t ([0, 1]) from path"
    path_coords = []
    
    for i, (stop, ari, dep, curr_tram) in enumerate(path.zip()):
        if curr_time >= ari:
            path_coords.append(stop.coords)

        if dep is None or (curr_time >= ari and curr_time <= dep):
            return stop.coords, path_coords
        
        if i + 1 < len(path):
            next_stop = path.stops[i + 1]
            next_ari = path.arrival_times[i + 1]
            next_dep = path.departure_times[i + 1]
            next_tram = path.transportation_names[i + 1]

            if curr_time >= dep and curr_time <= next_ari:
                tram_t = (curr_time - dep) / (next_ari - dep)
                return interpolate_coords(stop.coords, next_stop.coords, tram_t), path_coords
        
    return top_left_coords, path_coords

def get_curr_player_coords(t: float, curr_time: datetime):
    index = bisect.bisect(gps_point_times, curr_time)
    if index <= 1 or index == len(gps_points):
        return gps_points[min(index, len(gps_points) - 1)].coords, gps_point_coords[:index]
    gps_p1 = gps_points[index - 1]
    gps_p2 = gps_points[index]
    sub_t = (curr_time - gps_p1.time) / (gps_p2.time - gps_p1.time)
    return interpolate_coords(gps_p1.coords, gps_p2.coords, sub_t), gps_point_coords[:index]

def px_from_coords(coords: tuple[float]) -> Vector2:
    lat_delta = top_left_coords[0] - bottom_right_coords[0]
    lon_delta = bottom_right_coords[1] - top_left_coords[1]
    relative_x = (coords[1] - top_left_coords[1]) / lon_delta
    relative_y = 1 - (coords[0] - bottom_right_coords[0]) / lat_delta
    return Vector2(float(relative_x * screen_size[0]), float(relative_y * screen_size[1]))

player_pos = Vector2(0, 0)
plan_pos = Vector2(0, 0)
curr_ingame_time = None
player_path_pos = []
plan_path_pos = []

def update():
    global curr_ingame_time
    t = ((time.time() - start_time) / animation_seconds) % 1

    curr_ingame_time = get_curr_time(path, t)

    curr_plan_coord, plan_path_coords = get_curr_plan_coords(path, t, curr_ingame_time)
    plan_pos.update(px_from_coords(curr_plan_coord))
    plan_path_pos.clear()
    for coord in plan_path_coords:
        plan_path_pos.append(px_from_coords(coord))
    plan_path_pos.append(plan_pos)
    
    curr_player_coord, player_path_coords = get_curr_player_coords(t, curr_ingame_time)
    player_pos.update(px_from_coords(curr_player_coord))
    player_path_pos.clear()
    for coord in player_path_coords:
        player_path_pos.append(px_from_coords(coord))
    player_path_pos.append(player_pos)

def draw_network(screen: pygame.Surface):
    # draw background image (map)
    screen.blit(background_image, (0, 0))

    # draw stops
    # overlay = pygame.Surface(screen_size, pygame.SRCALPHA)
    # for stop in network.stops:
    #     screen_pos = px_from_coords(stop.coords)
    #     pygame.draw.circle(overlay, (0, 0, 255, 100), screen_pos, 5)
    # screen.blit(overlay, (0, 0))

def draw_path(screen: pygame.Surface, path, color):
    for i in range(len(path) - 1):
        pos1, pos2 = path[i:i + 2]
        pygame.draw.line(screen, color, pos1, pos2, 5)

def draw_plan(screen: pygame.Surface):
    offset_pos = (plan_pos.x - plan_size[0] / 2, plan_pos.y - plan_size[1] / 2)
    screen.blit(plan_image, offset_pos)

def draw_player(screen: pygame.Surface):
    offset_pos = (player_pos.x - player_size[0] / 2, player_pos.y - player_size[1])
    screen.blit(player_image, offset_pos)

def draw_time(screen: pygame.Surface):
    time_str = curr_ingame_time.strftime("%H:%M")
    text_surface = cascadia_code.render(time_str, True, (0, 0, 0))
    screen.blit(text_surface, (screen_size[0] / 50, screen_size[0] / 50))

def draw(screen: pygame.Surface):
    draw_network(screen)
    draw_time(screen)

    if draw_plan_path_opt:
        draw_path(screen, plan_path_pos, (128, 128, 255))

    if draw_player_path_opt:
        draw_path(screen, player_path_pos, (255, 255, 0))

    if draw_plan_opt:
        draw_plan(screen)

    if draw_player_opt:
        draw_player(screen)

screen = pygame.display.set_mode(screen_size, pygame.SRCALPHA, 32)
clock = pygame.time.Clock()

pygame.display.init()
pygame.display.set_caption("ZH Tram Animation")
pygame.display.update()

running = True
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

    update()
    screen.fill((255, 255, 255))
    draw(screen)
    pygame.display.update()
    clock.tick(60)

pygame.display.quit()

In [None]:
# init dijkstra logic

import bisect

MAX_TRANSITION_SECONDS = 30 * 60
MIN_CHANGE_BUFFER_SECONDS = 30

class NoConnectionsLeftException(Exception):
    pass

def calc_dijkstra_path_to(start_time: datetime, start: TramStop, destination_criterium,
                          start_tram: TramName=None, weight_func=None) -> TramPath:
    visited_stops = set()
    start_connection = TramPath([start], [start_time], [None], [None])
    visit_stack = [(0, start_time, start_connection, start)]

    while len(visit_stack) > 0:
        _, curr_time, prev_stops, stop = visit_stack.pop(0)
        if stop in visited_stops:
            continue

        visited_stops.add(stop)
        if destination_criterium(stop):
            return prev_stops

        connections = stop.get_departures_after(curr_time)
        for connection in connections:
            wait_seconds = (connection.arrival_time - curr_time).total_seconds()
            
            if len(prev_stops) >= 2:
                last_tram_name = prev_stops.transportation_names[-2]
            else:
                last_tram_name = start_tram

            if last_tram_name is not None and connection.tram_name != last_tram_name:
                # it's a change of tram!
                if wait_seconds < MIN_CHANGE_BUFFER_SECONDS:
                    continue

            if wait_seconds >= MAX_TRANSITION_SECONDS:
                break

            new_stop = connection.stops[1]
            new_time = connection.arrival_times[1]
            if new_stop in visited_stops:
                continue

            prev_stops_copy = prev_stops.slice(0)
            prev_stops_copy.stop_departure_times[-1] = connection.departure_time
            prev_stops_copy.transportation_names[-1] = connection.tram_name

            prev_stops_copy.add_stop(new_stop, new_time, None, None)
            
            weight = new_time.timestamp()
            if weight_func is not None:
                weight += weight_func(new_stop)

            bisect.insort(visit_stack, (weight, new_time, prev_stops_copy, new_stop))
            
    raise NoConnectionsLeftException("Couldn't find connection in time")

def calc_dijkstra_path_between(start_time: datetime, start: TramStop, destination: TramStop,
                               start_tram: TramName=None, weight_func=None) -> TramPath:
    destination_criterium = lambda stop: stop == destination
    return calc_dijkstra_path_to(start_time, start, destination_criterium, start_tram=start_tram, weight_func=weight_func)

stop1 = network.search_stops("Albis")[0]
stop2 = network.search_stops("Flug")[0]
start_time = gt_parse_time("12:00:00", lookup_day)

connection = calc_dijkstra_path_between(start_time, stop1, stop2)
connection.print_summary()

Start: Zürich, Albisgütli
Destination: Zürich Flughafen, Bahnhof
Total Time: 0:52:18
Start Time: 2025-01-28 12:03:00
Total Stops: 35


In [6]:
"time dependant ant colony optimization"

# ant colony utility

import json
from functools import cache

precomputed_distance_matrix: dict[TramStop, dict[TramStop, float]] = {}
with open("graph-exports/graph2.txt", "r") as file:
    distances = json.load(file)["distance_matrix"]
    precomputed_distance_matrix = {
        stop1: {
            stop2: distances[i][j]
            for j, stop2 in enumerate(network.stops)
        }
        for i, stop1 in enumerate(network.stops)
    }

pheromone_map: dict[TramStop, dict[TramStop, float]] = None

def reset_pheromone_map():
    global pheromone_map
    pheromone_map = {
        stop1: {stop2: 1. for stop2 in network.stops}
        for stop1 in network.stops
    }

best_ant_path: TramPath = None

@cache
def get_path(t: datetime, a: TramStop, b: TramStop, c: TramName) -> TramPath:
    return calc_dijkstra_path_between(t, a, b, start_tram=c)

def calc_ant_colony_round(start_time: datetime, start_stop: TramStop, end_time: datetime, num_ants=50,
                          reset_pheromones=False, pheromone_strength=0.01, pheromone_decay_factor=0.9,
                          min_pheromone=0.1, distance_power=2):
    global best_ant_path
    if pheromone_map is None or reset_pheromones:
        reset_pheromone_map()

    def simulate_ant(ant_index: int) -> TramPath:
        curr_path = TramPath()
        curr_path.add_stop(start_stop, start_time)
        curr_tram = None
        decisions = []
        
        remaining_stops = set(network.stops) - set(curr_path.stops)
        while len(remaining_stops) > 0:
            curr_stop = curr_path.stops[-1]
            curr_time = curr_path.arrival_times[-1]

            available_stops = list(remaining_stops)
            available_weights = [
                pheromone_map[curr_stop][stop] * 100000
                / (precomputed_distance_matrix[curr_stop][stop] ** distance_power)
                for stop in available_stops
            ]

            new_stop = random_choice_distribution(available_stops, available_weights)

            try:
                if curr_time > end_time:
                    raise NoConnectionsLeftException("End time reached")
                
                new_path = get_path(
                    curr_path.arrival_times[-1],
                    curr_stop,
                    new_stop,
                    curr_tram
                )
            except NoConnectionsLeftException:
                return curr_path, decisions, False
            
            decisions.append((curr_stop, new_stop, curr_time))
            curr_path.add_path(new_path)
            curr_tram = curr_path.transportation_names[-2]

            for stop in new_path.stops:
                if stop in remaining_stops:
                    remaining_stops.remove(stop)

        return curr_path, decisions, True

    # simulate all the ants
    ant_paths: list[TramPath] = []
    ant_decision_list: list[list[tuple[TramStop, TramStop, datetime]]] = []
    ant_result_indicators: list[bool] = []

    for ant_index in range(num_ants):
        ant_result = simulate_ant(ant_index)
        if ant_result is not None:
            ant_path, ant_decisions, ant_success = ant_result
            ant_paths.append(ant_path)
            ant_decision_list.append(ant_decisions)
            ant_result_indicators.append(ant_success)

    # drop pheromones based on ant paths
    for ant_path, ant_decisions, success in zip(ant_paths, ant_decision_list, ant_result_indicators):
        if not ant_success:
            continue
        path_seconds = ant_path.time_delta().total_seconds()
        pheromone_drop = pheromone_strength * len(network.stops) / path_seconds
        for sstop, end_stop, _ in ant_decisions:
            pheromone_map[sstop][end_stop] += pheromone_drop

        if best_ant_path is None or path_seconds < best_ant_path.time_delta().total_seconds():
            best_ant_path = ant_path
            best_ant_path.save("ant2-night")

    # decay pheromones a bit 
    for stop1 in network.stops:
        for stop2 in network.stops:
            val = pheromone_map[stop1][stop2]
            pheromone_map[stop1][stop2] = max(min_pheromone, val * pheromone_decay_factor)

    return ant_paths

MIN_CHANGE_BUFFER_SECONDS = 30

In [7]:
# simulate ant colony optimization

import pygame.display
import pygame.time
import pygame.draw
import pygame.image
import pygame.transform
import pygame.font
import time, bisect, json, random
from pygame.math import Vector2

# setup screen size and map coordinates
screen_size = (2000 // 3, 2216 // 3)
top_left_coords = (47.459425, 8.437827)
bottom_right_coords = (47.310020, 8.637158)

# load background map image
background_image = pygame.image.load("assets/karte3.png")
background_image = pygame.transform.smoothscale(background_image, screen_size)

# init font
pygame.font.init()
cascadia_code = pygame.font.Font("C:\\Windows\\Fonts\\CascadiaCode.ttf", 50)

# setup ant colony optimization
precomputed_distance_matrix: dict[TramStop, dict[TramStop, float]] = {}
with open("graph-exports/graph2.txt", "r") as file:
    distances = json.load(file)["distance_matrix"]
    precomputed_distance_matrix = {
        stop1: {
            stop2: distances[i][j]
            for j, stop2 in enumerate(network.stops)
        }
        for i, stop1 in enumerate(network.stops)
    }

pheromone_map: dict[TramStop, dict[TramStop, float]] = None

def get_path_coords_at_time(path: TramPath, curr_time: datetime):
    "get coord from path at t ([0, 1]) from path"
    path_coords = []
    
    for i, (stop, ari, dep, _) in enumerate(path.zip()):
        if curr_time >= ari:
            path_coords.append(stop.coords)

        if dep is None or (curr_time >= ari and curr_time <= dep):
            return stop.coords, path_coords
        
        if i + 1 < len(path):
            next_stop = path.stops[i + 1]
            next_ari = path.arrival_times[i + 1]

            if curr_time >= dep and curr_time <= next_ari:
                tram_t = (curr_time - dep) / (next_ari - dep)
                return interpolate_coords(stop.coords, next_stop.coords, tram_t), path_coords
        
    return top_left_coords, path_coords

def reset_pheromone_map():
    global pheromone_map
    pheromone_map = {
        stop1: {stop2: 1. for stop2 in network.stops}
        for stop1 in network.stops
    }

# reset intial conditions
reset_pheromone_map()
best_ant_path: TramPath = None

# init aco variables
curr_colony_paths = []
num_ants = 10
pheromone_strength=50
pheromone_decay_factor=0.9
min_pheromone=0.1
distance_power=2
ant_speed = 10

FPS = 60

animation_minutes_per_second = 30

ant_global_t = 0

start_colony_time = gt_parse_time("09:00:00", lookup_day)
end_colony_time = gt_parse_time("23:00:00", lookup_day)
start_colony_stop = network.search_stops("Bahnhofplatz/HB")[0]
curr_colony_time = None
generation_index = 0

def px_from_coords(coords: tuple[float]) -> Vector2:
    lat_delta = top_left_coords[0] - bottom_right_coords[0]
    lon_delta = bottom_right_coords[1] - top_left_coords[1]
    relative_x = (coords[1] - top_left_coords[1]) / lon_delta
    relative_y = 1 - (coords[0] - bottom_right_coords[0]) / lat_delta
    return Vector2(float(relative_x * screen_size[0]), float(relative_y * screen_size[1]))

def update():
    global curr_colony_time

    if curr_colony_time is None:
        curr_colony_time = start_colony_time
        calc_ant_paths()
    elif curr_colony_time > end_colony_time:
        curr_colony_time = None
        update()
    else:
        time_inc = timedelta(minutes=animation_minutes_per_second / FPS)
        curr_colony_time += time_inc

def draw_colony_time(screen: pygame.Surface):
    time_str = curr_colony_time.strftime("%H:%M")
    text_surface = cascadia_code.render(time_str, True, (0, 0, 0))
    screen.blit(text_surface, (screen_size[0] / 50, screen_size[0] / 50))

pheromone_overlay = pygame.Surface(screen_size, pygame.SRCALPHA)

def draw_path(screen: pygame.Surface, path: list[tuple[float, float]], color: tuple[float, float, float, float]):
    overlay = pygame.Surface(screen_size, pygame.SRCALPHA)
    for i in range(len(path) - 1):
        pos1, pos2 = [px_from_coords(c) for c in path[i:i + 2]]
        pygame.draw.line(overlay, color, pos1, pos2, 5)
    screen.blit(overlay, (0, 0))

def recompute_pheromone_overlay(pheromone_color=[255, 255, 0, 0]):
    global pheromone_overlay
    pheromone_overlay = pygame.Surface(screen_size, pygame.SRCALPHA)

    max_pheromone_value = max(
        max(pheromone_map[stop1][stop2] for stop2 in network.stops)
        for stop1 in network.stops
    )
    
    def color_from_pheromone(value: float) -> tuple[float, float, float, float]:
        x = value / max_pheromone_value
        color = list(pheromone_color)
        color[3] = int(x * 200)
        return color

    for stop1 in network.stops:
        for stop2 in network.stops:
            value = pheromone_map[stop1][stop2]
            color = color_from_pheromone(value)
            pos1, pos2 = [px_from_coords(c) for c in (stop1.coords, stop2.coords)]
            pygame.draw.line(pheromone_overlay, color, pos1, pos2, 5)
    
    if best_ant_path is not None:
        draw_path(pheromone_overlay, [s.coords for s in best_ant_path.stops], (0, 0, 255, 100))

def draw(screen: pygame.Surface):
    screen.blit(background_image, (0, 0))
    screen.blit(pheromone_overlay, (0, 0))
    draw_colony_time(screen)

    if curr_colony_paths is None:
        return
    
    for ant_path in curr_colony_paths:
        curr_coords, _ = get_path_coords_at_time(ant_path, curr_colony_time)
        screen_pos = px_from_coords(curr_coords)
        pygame.draw.circle(screen, (0, 0, 0), screen_pos, 5)
    
def calc_ant_paths():
    global curr_colony_paths
    global generation_index

    print(f"calculating {num_ants} ant movements...", end="\r")
    generation_index += 1
    curr_colony_paths = calc_ant_colony_round(
        start_colony_time,
        start_colony_stop,
        end_colony_time,
        num_ants=num_ants,
        reset_pheromones=False,
        pheromone_strength=pheromone_strength,
        pheromone_decay_factor=pheromone_decay_factor,
        min_pheromone=min_pheromone,
        distance_power=distance_power
    )

    print(f"redrawing the pheromone map...          ", end="\r")
    recompute_pheromone_overlay()
    
    best_time = best_ant_path.time_delta() if best_ant_path is not None else None
    print(f"[G{generation_index:03}] Best Path Length: {best_time}")

def start():
    update()

    screen = pygame.display.set_mode(screen_size, pygame.SRCALPHA, 32)
    clock = pygame.time.Clock()

    pygame.display.init()
    pygame.display.set_caption("ZH Tram Ant Colony Optimization")
    pygame.display.update()

    running = True
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

        update()
        screen.fill((255, 255, 255))
        draw(screen)
        pygame.display.update()
        clock.tick(FPS)

    pygame.display.quit()

reset_pheromone_map()
start()

[G001] Best Path Length: 12:37:42       
calculating 10 ant movements...

: 