In [1]:
from datetime import datetime
from dataclasses import dataclass



class Connection:
    def __init__(self, line, departure_time, arrival_time, start_latitude, start_longitude, end_latitude, end_longitude, end_stop):
        self.line: str = line
        self.departure_time: datetime = departure_time
        self.arrival_time: datetime = arrival_time
        self.start_latitude: float = start_latitude
        self.start_longitude: float = start_longitude
        self.end_latitude: float = end_latitude
        self.end_longitude: float = end_longitude
        self.end_stop: str = end_stop
        
    def __repr__(self):
        return f"Connection({self.line}, {self.departure_time.strftime('%H:%M:%S')} -> {self.arrival_time.strftime('%H:%M:%S')}, {self.end_stop})"




class BusStop:
    def __init__(self, name: str):
        self.name: str = name
        self.connections: dict[str, list[Connection]] = {}

    def add_connection(self, end_stop: str, connection: Connection):
        if end_stop not in self.connections:
            self.connections[end_stop] = [connection]
        else:
            self.connections[end_stop].append(connection)
    
    def __repr__(self):
        return f"BusStop({self.name}, {len(self.connections)} connections)"



In [2]:
import pandas as pd
from datetime import datetime

datetime_format = '%Y-%m-%d %H:%M:%S'


def convert_time(time: str) -> datetime:
    hour = int(time[:2])
    if hour >= 24:
        hour -= 24
        return datetime.strptime(f'2025-01-02 {hour:02}{time[2:]}', datetime_format)
    else:
        return datetime.strptime(f'2025-01-01 {time}', datetime_format)


def add_connection(graph: dict[str, BusStop], start_stop: str,
                   end_stop: str, connection: Connection):
    if start_stop not in graph:
        graph[start_stop] = BusStop(name=start_stop)
    if end_stop not in graph:
        graph[end_stop] = BusStop(name=end_stop)
    graph[start_stop].add_connection(end_stop, connection)


def load_csv_data(filename: str):
    df = pd.read_csv(filename)
    graph: dict[str, BusStop] = {}

    for index, row in df.iterrows():
        start_stop = row['start_stop']
        end_stop = row['end_stop']
        departure_time = convert_time(row['departure_time'])
        arrival_time = convert_time(row['arrival_time'])
        start_lat = row['start_stop_lat']
        start_lon = row['start_stop_lon']
        end_lat = row['end_stop_lat']
        end_lon = row['end_stop_lon']
        connection = Connection(
            line=row['line'],
            departure_time=departure_time,
            arrival_time=arrival_time,
            start_latitude=start_lat,
            start_longitude=start_lon,
            end_latitude=end_lat,
            end_longitude=end_lon,
            end_stop=end_stop
        )
        add_connection(graph, start_stop, end_stop, connection)
    
    return graph

In [62]:
graph = load_csv_data('mpk_indexed.csv')

  df = pd.read_csv(filename)


In [73]:
def find_earliest_connection(connections: list[Connection], current_time: datetime, previous_line=None, transfer_time=2) -> Connection:

    earliest = None
    earliest_departure = None
    
    for conn in connections:
        required_time = current_time
        if previous_line is not None and conn.line != previous_line:
            required_time = current_time + timedelta(minutes=transfer_time)
        
        if conn.departure_time >= required_time and (earliest is None or conn.departure_time < earliest_departure):
            earliest = conn
            earliest_departure = conn.departure_time
    
    return earliest

In [67]:
import heapq
from datetime import timedelta

In [68]:

def dijkstra(start_stop: str, end_stop: str, departure_time: str, graph: dict[str, BusStop], transfer_time: int = 2):

    if start_stop not in graph or end_stop not in graph:
        return None, []

    start_time = convert_time(departure_time)

    distances = {stop: float('inf') for stop in graph}
    previous = {stop: None for stop in graph}
    arrival_times = {stop: None for stop in graph}
    previous_lines = {stop: None for stop in graph}

    distances[start_stop] = 0
    arrival_times[start_stop] = start_time

    priority_queue = [(0, start_stop)]

    visited_nodes = 0
    visited_connections = 0

    while priority_queue:
        current_distance, current_stop = heapq.heappop(priority_queue)

        if current_stop == end_stop:
            break

        if current_distance > distances[current_stop]:
            continue

        visited_nodes += 1 

        current_time = arrival_times[current_stop]
        current_line = previous_lines[current_stop]

        for next_stop, connections in graph[current_stop].connections.items():
            visited_connections += 1  

            earliest_conn = find_earliest_connection(
                connections, 
                current_time, 
                previous_line=current_line, 
                transfer_time=transfer_time
            )

            if earliest_conn is None:
                continue

            travel_time = (earliest_conn.arrival_time - current_time).total_seconds() / 60

            if distances[current_stop] + travel_time < distances[next_stop]:
                distances[next_stop] = distances[current_stop] + travel_time
                previous[next_stop] = (current_stop, earliest_conn)
                arrival_times[next_stop] = earliest_conn.arrival_time
                previous_lines[next_stop] = earliest_conn.line

                heapq.heappush(priority_queue, (distances[next_stop], next_stop))

    print(f"Odwiedzone węzły: {visited_nodes}, odwiedzone krawędzie: {visited_connections}")

    route = []
    current_stop = end_stop
    while current_stop and previous[current_stop]:
        prev_stop, conn = previous[current_stop]
        route.append(conn)
        current_stop = prev_stop

    route.reverse()

    total_time = distances[end_stop] if distances[end_stop] != float('inf') else None
    return total_time, route


In [93]:
start = "Iwiny - rondo"
end = "Psie Pole (Rondo Lotników Polskich)"
departure = "11:25:00"

total_time, route = dijkstra(start, end, departure, graph)
print_path(total_time, route)


Odwiedzone węzły: 400, odwiedzone krawędzie: 1209
Całkowity czas podróży: 58.0 minut

Trasa:
1. Linia 110: odjazd 11:25:00, przyjazd 11:26:00 -> Vivaldiego
2. Linia 110: odjazd 11:26:00, przyjazd 11:28:00 -> Kajdasza
3. Linia 110: odjazd 11:28:00, przyjazd 11:30:00 -> Jagodzińska
4. Linia 110: odjazd 11:30:00, przyjazd 11:31:00 -> Malinowskiego
5. Linia 110: odjazd 11:31:00, przyjazd 11:32:00 -> Konduktorska
6. Linia 110: odjazd 11:32:00, przyjazd 11:33:00 -> Buforowa-Rondo
7. Linia 110: odjazd 11:33:00, przyjazd 11:34:00 -> BARDZKA (Cmentarz)
8. Linia 110: odjazd 11:34:00, przyjazd 11:35:00 -> Morwowa
9. Linia 110: odjazd 11:35:00, przyjazd 11:36:00 -> Krynicka
10. Linia 110: odjazd 11:36:00, przyjazd 11:38:00 -> Bardzka
11. Linia 110: odjazd 11:38:00, przyjazd 11:40:00 -> Kamienna
12. Linia 110: odjazd 11:40:00, przyjazd 11:41:00 -> Prudnicka
13. Linia 110: odjazd 11:41:00, przyjazd 11:42:00 -> Hubska (Dawida)
14. Linia 16 (przesiadka): odjazd 11:47:00, przyjazd 11:51:00 -> Kościuszk

In [None]:
import heapq
from datetime import datetime
from typing import Callable, List, Dict, Optional, Tuple

def astar(
    start_stop: str, 
    end_stop: str, 
    departure_time: str, 
    graph: dict[str, BusStop], 
    heuristic_function: Callable[[str, str, Dict[str, BusStop]], float],
    transfer_time: int = 2
):

    if start_stop not in graph:
        return None, []
    if end_stop not in graph:
        return None, []

    start_time = convert_time(departure_time)

    distances = {stop: float('inf') for stop in graph}
    previous = {stop: None for stop in graph}
    arrival_times = {stop: None for stop in graph}
    previous_lines = {stop: None for stop in graph}

    distances[start_stop] = 0
    arrival_times[start_stop] = start_time

    priority_queue = [(0 + heuristic_function(start_stop, end_stop, graph), start_stop)]

    visited_nodes = 0
    visited_connections = 0

    while priority_queue:
        current_f_score, current_stop = heapq.heappop(priority_queue)


        if current_stop == end_stop:
            break

        current_h_score = heuristic_function(current_stop, end_stop, graph)
        current_g_score = current_f_score - current_h_score
        
        if current_g_score > distances[current_stop]:
            continue

        visited_nodes += 1

        current_time = arrival_times[current_stop]
        current_line = previous_lines[current_stop]

        for next_stop, connections in graph[current_stop].connections.items():
            visited_connections += 1


            earliest_conn = find_earliest_connection(
                connections, 
                current_time, 
                previous_line=current_line, 
                transfer_time=transfer_time
            )

            if earliest_conn is None:
                continue

            travel_time = (earliest_conn.arrival_time - current_time).total_seconds() / 60

            if distances[current_stop] + travel_time < distances[next_stop]:
                distances[next_stop] = distances[current_stop] + travel_time
                previous[next_stop] = (current_stop, earliest_conn)
                arrival_times[next_stop] = earliest_conn.arrival_time
                previous_lines[next_stop] = earliest_conn.line

                next_h_score = heuristic_function(next_stop, end_stop, graph)
                heapq.heappush(priority_queue, (distances[next_stop] + next_h_score, next_stop))

    print(f"Odwiedzone węzły: {visited_nodes}, odwiedzone krawędzie: {visited_connections}")

    if distances[end_stop] == float('inf') or previous[end_stop] is None:
        print("Nie znaleziono trasy.")
        return None, []

    route = []
    current_stop = end_stop
    while current_stop and previous[current_stop]:
        prev_stop, conn = previous[current_stop]
        route.append(conn)
        current_stop = prev_stop

    route.reverse()

    total_time = distances[end_stop] if distances[end_stop] != float('inf') else None
    return total_time, route

def convert_time(time_str: str) -> datetime:
    today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
    time_parts = list(map(int, time_str.split(':')))
    
    if len(time_parts) == 2:
        time_parts.append(0)
        
    return today.replace(hour=time_parts[0], minute=time_parts[1], second=time_parts[2])

def zero_heuristic(current_stop: str, target_stop: str, graph: Dict[str, BusStop]) -> float:
    return 0



In [88]:
import math

import math

def euclidean_distance_heuristic(current_stop: str, target_stop: str, graph: Dict[str, BusStop]) -> float:
    try:
        # Znajdź pierwsze połączenie dla current_stop
        first_current_conn = None
        for next_stop, connections in graph[current_stop].connections.items():
            if connections:  # jeśli lista nie jest pusta
                first_current_conn = connections[0]
                break
        
        # Znajdź pierwsze połączenie dla target_stop
        first_target_conn = None
        for next_stop, connections in graph[target_stop].connections.items():
            if connections:  # jeśli lista nie jest pusta
                first_target_conn = connections[0]
                break
        
        # Sprawdź czy znaleziono połączenia
        if first_current_conn is None or first_target_conn is None:
            return 0
        
        # Oblicz odległość euklidesową
        dist = math.sqrt(
            (first_current_conn.start_latitude - first_target_conn.start_latitude)**2 + 
            (first_current_conn.start_longitude - first_target_conn.start_longitude)**2
        )
        
        # Konwersja do minut podróży
        return dist * 111 / 0.5 * 0.8  # Mnożnik 0.8 dla dopuszczalności
    
    except (KeyError, IndexError, AttributeError):
        return 0  # W przypadku błędu zwróć 0


def get_stop_coordinates(stop_name: str, graph: Dict[str, BusStop]) -> tuple[float, float] or None:
    """
    Pomocnicza funkcja zwracająca uśrednione współrzędne dla przystanku.
    """
    latitudes = []
    longitudes = []
    
    # Szukamy przystanku jako punktu startowego połączeń
    if stop_name in graph:
        for next_stop, connections in graph[stop_name].connections.items():
            for conn in connections:
                latitudes.append(conn.start_latitude)
                longitudes.append(conn.start_longitude)
    
    # Szukamy przystanku jako punktu końcowego połączeń
    for stop, bus_stop in graph.items():
        for next_stop, connections in bus_stop.connections.items():
            if next_stop == stop_name:
                for conn in connections:
                    latitudes.append(conn.end_latitude)
                    longitudes.append(conn.end_longitude)
    
    if not latitudes or not longitudes:
        return None
    
    # Zwróć średnie współrzędne
    return (sum(latitudes) / len(latitudes), sum(longitudes) / len(longitudes))


def haversine_heuristic(current_stop: str, target_stop: str, graph: Dict[str, BusStop]) -> float:
    """
    Heurystyka wykorzystująca wzór haversine do obliczenia odległości po powierzchni kuli ziemskiej.
    Zwraca szacowany czas podróży w minutach.
    """
    # Pobierz współrzędne obu przystanków
    current_coords = get_stop_coordinates(current_stop, graph)
    target_coords = get_stop_coordinates(target_stop, graph)
    
    if not current_coords or not target_coords:
        return 0
    
    # Przypisz współrzędne
    lat1, lon1 = current_coords
    lat2, lon2 = target_coords
    
    # Konwersja na radiany
    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)
    
    # Wzór haversine
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad
    a = math.sin(dlat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    
    # Promień Ziemi w km
    R = 6371.0
    distance = R * c
    
    # Zakładamy średnią prędkość pojazdu 30 km/h (0.5 km/min)
    travel_time = distance / 0.5
    
    # Mnożnik 0.9 zapewnia dopuszczalność heurystyki
    return travel_time * 0.9


def transfer_count_heuristic(current_stop: str, target_stop: str, graph: Dict[str, BusStop]) -> float:
    """
    Heurystyka szacująca minimalną liczbę przesiadek potrzebnych do dotarcia do celu.
    Daje wyższy priorytet trasom z bezpośrednimi połączeniami.
    """
    # Sprawdź, czy istnieje bezpośrednie połączenie
    if current_stop in graph and target_stop in graph[current_stop].connections:
        return 0  # Brak przesiadek
    
    # Jeśli nie ma bezpośredniego połączenia, szacujemy 1 przesiadkę
    # Zakładamy, że każda przesiadka zajmuje średnio 5 minut
    return 5


def combined_heuristic(current_stop: str, target_stop: str, graph: Dict[str, BusStop]) -> float:
    """
    Heurystyka łącząca odległość geograficzną z oszacowaniem liczby przesiadek.
    """
    # Oblicz odległość geograficzną
    distance_estimate = haversine_heuristic(current_stop, target_stop, graph)
    
    # Sprawdź liczbę przesiadek
    transfer_estimate = transfer_count_heuristic(current_stop, target_stop, graph)
    
    # Łączymy oba szacunki
    return distance_estimate + transfer_estimate


def average_speed_heuristic(current_stop: str, target_stop: str, graph: Dict[str, BusStop]) -> float:
    """
    Heurystyka uwzględniająca rzeczywistą średnią prędkość połączeń w sieci.
    """
    # Oblicz średnią prędkość wszystkich połączeń w sieci
    total_distance = 0
    total_time = 0
    connection_count = 0
    
    for stop, bus_stop in graph.items():
        for next_stop, connections in bus_stop.connections.items():
            for conn in connections:
                # Oblicz odległość geograficzną
                dist = math.sqrt((conn.end_latitude - conn.start_latitude)**2 + 
                                (conn.end_longitude - conn.start_longitude)**2) * 111  # km
                
                # Oblicz czas podróży w minutach
                time = (conn.arrival_time - conn.departure_time).total_seconds() / 60
                
                if time > 0:
                    total_distance += dist
                    total_time += time
                    connection_count += 1
    
    # Jeśli nie ma danych, użyj standardowej prędkości 30 km/h
    if connection_count == 0 or total_time == 0:
        avg_speed = 0.5  # km/min (30 km/h)
    else:
        avg_speed = total_distance / total_time  # km/min
    
    # Oblicz odległość między przystankami
    current_coords = get_stop_coordinates(current_stop, graph)
    target_coords = get_stop_coordinates(target_stop, graph)
    
    if not current_coords or not target_coords:
        return 0
    
    # Oblicz odległość po powierzchni Ziemi używając wzoru haversine
    lat1, lon1 = current_coords
    lat2, lon2 = target_coords
    
    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)
    
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad
    a = math.sin(dlat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon/2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    
    R = 6371.0  # Promień Ziemi w km
    distance = R * c
    
    # Szacowany czas podróży na podstawie średniej prędkości
    if avg_speed > 0:
        estimated_time = distance / avg_speed
    else:
        estimated_time = distance / 0.5  # Użyj domyślnej prędkości
    
    # Mnożnik 0.85 zapewnia dopuszczalność heurystyki
    return estimated_time * 0.85

In [92]:
start = "Iwiny - rondo"
end = "Psie Pole (Rondo Lotników Polskich)"
departure = "11:25:00"

total_time, route = astar(start, end, departure, graph, heuristic_function=euclidean_distance_heuristic)
print_path(total_time, route)


Rozpoczynam wyszukiwanie trasy z Iwiny - rondo do Psie Pole (Rondo Lotników Polskich) o godzinie 11:25:00
Odwiedzone węzły: 143, odwiedzone krawędzie: 483
Całkowity czas podróży: 61.0 minut

Trasa:
1. Linia 110: odjazd 11:25:00, przyjazd 11:26:00 -> Vivaldiego
2. Linia 110: odjazd 11:26:00, przyjazd 11:28:00 -> Kajdasza
3. Linia 110: odjazd 11:28:00, przyjazd 11:30:00 -> Jagodzińska
4. Linia 110: odjazd 11:30:00, przyjazd 11:31:00 -> Malinowskiego
5. Linia 110: odjazd 11:31:00, przyjazd 11:32:00 -> Konduktorska
6. Linia 110: odjazd 11:32:00, przyjazd 11:33:00 -> Buforowa-Rondo
7. Linia 110: odjazd 11:33:00, przyjazd 11:34:00 -> BARDZKA (Cmentarz)
8. Linia 110: odjazd 11:34:00, przyjazd 11:35:00 -> Morwowa
9. Linia 100 (przesiadka): odjazd 11:39:00, przyjazd 11:40:00 -> Złotostocka
10. Linia 100: odjazd 11:40:00, przyjazd 11:41:00 -> TARNOGAJ
11. Linia 100: odjazd 11:41:00, przyjazd 11:42:00 -> Gazowa
12. Linia 100: odjazd 11:42:00, przyjazd 11:44:00 -> Karwińska
13. Linia 5 (przesiadka

In [91]:
start = "Iwiny - rondo"
end = "Psie Pole (Rondo Lotników Polskich)"
departure = "11:25:00"

total_time, route = astar(start, end, departure, graph, heuristic_function=average_speed_heuristic)
print_path(total_time, route)


Rozpoczynam wyszukiwanie trasy z Iwiny - rondo do Psie Pole (Rondo Lotników Polskich) o godzinie 11:25:00


KeyboardInterrupt: 