<img src="images/logo.png" width="200">


# KogSys-KI-B - Assignment 1

### Search, Heuristic Search

_Submission Deadline: **25.05.2025**_

---



#### Submission Information

Upload your solution via the VC course. Please upload **one Zip** archive per group. This must contain:

- Your solution as a **Notebook** (a `.ipynb` file)
- A folder named **images** with all your images, if you used any (keep the image sizes relatively small)

Your Zip file should be named as follows:

```
assignment_<assignment number>_solution_<group number>.zip
```
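
If you prefer to build the archive programmatically, the following is a minimal sketch using only the standard library. The function name `build_submission` and its parameters are hypothetical and not part of the assignment; it assumes the notebook and the `images` folder already exist on disk.

```python
import zipfile
from pathlib import Path


def build_submission(notebook: Path, images_dir: Path, assignment: int, group: int, out_dir: Path) -> Path:
    """Pack the notebook and the images folder into a Zip archive named as required."""
    archive = out_dir / f"assignment_{assignment}_solution_{group}.zip"
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        # The notebook is stored at the archive root.
        zf.write(notebook, arcname=notebook.name)
        # Images are stored below an "images" folder, keeping their relative paths.
        if images_dir.is_dir():
            for image in sorted(images_dir.rglob("*")):
                if image.is_file():
                    relative = Path("images") / image.relative_to(images_dir)
                    zf.write(image, arcname=relative.as_posix())
    return archive
```

For example, `build_submission(Path("solution.ipynb"), Path("images"), 1, 7, Path("."))` would create `assignment_1_solution_7.zip` for a (made-up) group 7.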

In this assignment, you can achieve a total of **20** points. These points are converted into up to **2 bonus points** for the exam as follows:

| **Points in Assignment** | **Bonus Points for Exam** |
| :----------------------: | :-----------------------: |
|            20            |             2             |
|            15            |            1.5            |
|            10            |             1             |
|            5             |            0.5            |


<div class='alert alert-block alert-danger'>

##### **Important Notes**

1. **This assignment is graded. You can earn bonus points for the exam.**
2. **If it is evident that an assignment was copied from another source and no independent work was done, no bonus points will be awarded. Please formulate all answers in your own words!**
3. **If LLMs (such as ChatGPT or Copilot) were used to create your submission, please indicate this in accordance with common scientific practice. See also the [AI Policy in the VC Course](https://vc.uni-bamberg.de/mod/page/view.php?id=1980835).**

</div>

### Setup

To set up your assignment, install the required dependencies specified in `requirements.txt` by executing the following code cell.

In [None]:
# Installs the required packages with the currently selected Python version
%pip install -U -r requirements.txt

#### Library imports

The following library imports may be used throughout this assignment. Do not use any other third-party libraries.

- `typing.Any` and `typing.Callable`: Required for type hints in the method specifications.
- `pathlib.Path`: An object-oriented and platform-independent representation of file paths.
- `math`: Various mathematical functions such as _sqrt_ or _sin_.
- `dataclasses.dataclass`: Simple creation of data classes (immutable when declared with `frozen=True`).
- `random`: Used to set a _seed_ so that the results of random generators stay consistent.
- `pandas` and `numpy`: You may use these to import and adjust datasets.
- `networkx`: Used to store graph networks.
- `matplotlib.pyplot`: Used to show images of the graphs.

**_Nothing needs to be changed in the next code cell._**

In [None]:
# Python Core libraries
from typing import Any, Callable
import random
from pathlib import Path
import math
from dataclasses import dataclass

# Allowed Third-party libraries
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt

random.seed(2025)

## Task 1 | Search

_For a total of **8** points_

You are given a flow network representing the pipe system of a city. Each edge in the graph has a capacity indicating the maximum amount of water that can flow through that pipe per minute. There is one water `source` (the node with index 0) and one `target` (the node with the highest index).
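
To make the notion of capacity concrete, here is a small sketch that computes how much water a single path can carry. It assumes the usual interpretation that a path is limited by its narrowest pipe (the bottleneck); the helper name `path_capacity` and the toy network are hypothetical and not part of the assignment.

```python
def path_capacity(G, path: list[int]) -> float:
    """Return the bottleneck capacity of `path`: the smallest edge capacity along it."""
    if len(path) < 2:
        raise ValueError("a path needs at least two nodes")
    return min(G[u][v]["capacity"] for u, v in zip(path, path[1:]))


# Toy network as nested dicts (made-up values); an nx.DiGraph supports the same G[u][v] lookup.
toy = {
    0: {1: {"capacity": 57}, 2: {"capacity": 41}},
    1: {3: {"capacity": 30}},
    2: {3: {"capacity": 45}},
    3: {},
}

print(path_capacity(toy, [0, 1, 3]))  # 30, limited by the pipe 1 -> 3
print(path_capacity(toy, [0, 2, 3]))  # 41, limited by the pipe 0 -> 2
```

In this toy network the path over node 2 carries more water, even though its first pipe is narrower than the first pipe of the other path.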

---

<details>
<summary><strong>Hint:</strong> How to access the graph's nodes and edges in networkx</summary>

[The documentation is available here.](https://networkx.org/documentation/stable/tutorial.html)

**Getting all data**
```python
# Getting the nodes
> list(G.nodes)
[0,1,2,3,4,5,6,7,8,9,10]
# Getting the edges
> list(G.edges)
[(0, 1),
 (0, 2),
 (1, 9),
 ...]
# Getting edges with the attached data
> list(G.edges(data=True))
[(0, 1, {'capacity': 57}),
 (0, 2, {'capacity': 41}),
 (1, 9, {'capacity': 59}),
 ...]
```

**Accessing neighbours**
```python
# Neighbours of node 0
> list(G[0]) # or list(G.neighbors(0)) or list(G.successors(0))
[1, 2]
```

**Accessing edges**
```python
# The data at edge 0 -> 1
> G[0][1] # or G.edges[0,1]
{'capacity': 57}
```

**Sample imperative implementation of a depth-first search**
```python
def dfs(G: nx.DiGraph, source: int, target: int) -> list[int] | None:
    stack = [(source, [source])]
    while stack:
        (node, path) = stack.pop()
        for neighbor in G.neighbors(node):
            if neighbor in path:
                continue
            elif neighbor == target:
                return path + [neighbor]
            else:
                stack.append((neighbor, path + [neighbor]))
    return None
dfs(G, 0, 10)
# -> [0, 2, 4, 6, 10]
```

</details>

#### Flow Network Generation

In the following, we define a function that generates the random flow network that is used in this task.

**_Nothing needs to be changed in the next code cell._**

In [None]:
# Code for generating a flow network and plotting it

def generate_flow_network(num_nodes: int, num_edges: int, min_capacity: int, max_capacity: int, seed: int = 2025) -> nx.DiGraph:
    """
    Generate a random directed flow network with specified parameters.
    
    Returns:
        A directed graph (DiGraph) with random capacities on edges.
        The graph is read-only to prevent unintended modifications.
    """
    # Reset the random seed for reproducibility
    random.seed(seed)
    
    # Need to distribute the edges among the nodes, ensuring that each node has at least one incoming and one outgoing edge.
    distribute_amount = lambda x, n: [1 + ((remaining := x - n) // n) + (1 if i < remaining % n else 0) for i in range(n)] if n <= x else []
    degrees = distribute_amount(num_edges, num_nodes - 1)
    in_degree = [0] + random.sample(degrees, k=len(degrees))  # First node has no incoming edges
    out_degree = random.sample(degrees, k=len(degrees)) + [0]  # Last node has no outgoing edges

    G = nx.DiGraph()
    G.add_nodes_from(range(num_nodes))
    
    # Add additional edges to ensure the graph has the desired number of edges
    for u in range(num_nodes):
        for _ in range(out_degree[u]): 
            # Select a random node to connect to, ensuring it is not the same as u and does not already have an edge with u
            try:
                v = random.choice([n for n in G.nodes() if n != u and not G.has_edge(u, n) and not G.has_edge(n, u) and in_degree[n] > G.in_degree(n)]) 
            except IndexError as e: # due to the randomness of the selection, it is possible that no node is available to connect to
                v = random.choice([n for n in G.nodes() if n != u and not G.has_edge(u, n) and not G.has_edge(n, u) and in_degree[n] != 0])
            capacity = random.randint(min_capacity, max_capacity)
            G.add_edge(u, v, capacity=capacity)

    return nx.restricted_view(G, [], [])

def plot_flow_network(G: nx.DiGraph, path: list[int] | None = None, size: tuple[float,float] = (10, 6)) -> None:
    """Plot the flow network using matplotlib and networkx.

    Args:
        G (nx.DiGraph): The flow network graph to plot.
        path (list[int] | None, optional): A path through the network to highlight. Defaults to None.
    """
    pos = nx.kamada_kawai_layout(G)
    edge_colors = 'black'
    if path:
        # Highlight the path in the graph
        path_edges = list(zip(path, path[1:]))
        edge_colors = ['blue' if (u, v) in path_edges else 'black' for u, v in G.edges()]
    nx.draw_networkx(G, pos, node_color=['green']+ ['pink']*(len(G.nodes)-2) + ['orange'], arrowsize=10, with_labels=True, edge_color=edge_colors)

    fig = plt.gcf()
    fig.set_size_inches(*size)

    plt.axis("off")
    plt.show()

G = generate_flow_network(11, 20, 30, 60, 2)
plot_flow_network(G)


#### **(01.1.1)** Path with maximum flow

_For **4** points_

Find the path with the maximum flow through the network from the `source` to the `target`. Which algorithm that was presented in the lectures would you use for this task, and why?

Write out your approach before implementing it in Python.



>
> Please enter your answer here.
> 

In [None]:
# Implementation of task 01.1.1

def path_with_maximum_capacity(G: nx.DiGraph, source: int, target: int) -> list[int]:
    """
    Find the path with maximum capacity from source to target in the flow network.

    Args:
        G (nx.DiGraph): The flow network graph.
        source (int): The source node.
        target (int): The target node.

    Returns:
        list[int]: The path with maximum capacity from source to target.
    """
    # TODO: Implement this
    return []

path = path_with_maximum_capacity(G, 0, 10)
plot_flow_network(G, path)

#### **(01.1.2)** Large network

_For **2** points_

In the following code cells, we have generated larger networks. What happens when you run your code on these networks? Explain why!

In [None]:
# You do not need to modify the code in this cell.
# You may however do some experiments with the parameters to see how your code behaves.

G_large = generate_flow_network(21, 80, 25, 60)

path_with_maximum_capacity(G_large, 0, 20)

In [None]:
G_huge = generate_flow_network(51, 200, 25, 60)

path_with_maximum_capacity(G_huge, 0, 50)

>
> Please enter your answer here.
> 


#### **(01.1.3)** Cost per liter

_For **2** points_

Suppose each pipe also had an associated cost per liter of water flowing through it, and you wanted to minimize the total cost while maximizing the flow.
What do you need to change in your implementation so that a function `path_with_maximum_capacity_minimum_cost` supports this?

Write out your approach before implementing it in Python.

**Add costs to edges**

A function to add random costs to the graph.

**_Nothing needs to be changed in the next code cell._**

In [None]:
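def graph_with_edge_costs(G: nx.DiGraph, min_cost: int, max_cost: int, seed: int = 2025) -> nx.DiGraph:
    """
    Return a read-only copy of the graph with a random cost added to each edge.

    Args:
        G (nx.DiGraph): The flow network graph.
        min_cost (int): Minimum cost for edges.
        max_cost (int): Maximum cost for edges.
        seed (int): Seed for the random generator. Defaults to 2025.

    Returns:
        nx.DiGraph: A read-only copy of `G` whose edges carry an additional `cost` attribute.
    """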
    random.seed(seed)
    G = nx.DiGraph(G)  # Create a copy of the graph, since it is read-only
    for u, v in G.edges():
        cost = random.randint(min_cost, max_cost)
        G[u][v]['cost'] = cost
    return nx.restricted_view(G, [], [])

G_with_costs = graph_with_edge_costs(G, 20, 50)

>
> Please enter your answer here.
> 


In [None]:
# Implementation of task 01.1.3

def path_with_maximum_capacity_minimum_cost(G: nx.DiGraph, source: int, target: int) -> list[int]:
    """
    Find the path with maximum capacity and minimum cost from source to target in the flow network.

    Args:
        G (nx.DiGraph): The flow network graph.
        source (int): The source node.
        target (int): The target node.

    Returns:
        list[int]: The path with maximum capacity and minimum cost from source to target.
    """
    
    # TODO: Implement this
    
    return []

path = path_with_maximum_capacity_minimum_cost(G_with_costs, 0, 10)
plot_flow_network(G, path)

## Task 2 | Heuristic Search with A*

_For a total of **12** points_

Throughout this task, we will use the [GTFS.de](https://gtfs.de/de/feeds/) dataset for the German rail network. We aim to implement an A* algorithm that allows us to find the fastest connections.

<div class="alert-warning" style="padding: 1rem">
To keep things simple, this task ignores the concrete arrival and departure times of individual trips. We only consider the stations and the travel durations between them.

</div>

**_Data Model._** In the following cell, two classes are defined to represent the data.

- `Connection` represents a connection to another stop, identified by `target_stop_id`, with a travel duration of `travel_minutes`.
- `Stop` represents a stop with a unique `id`, a `name`, its `lat` and `lon` coordinates, and `connections`, the set of its _outgoing_ `Connection`s.

**_No changes must be made to the next code cell._**

In [None]:
@dataclass(frozen=True)
class Connection:
    """A connection, from one stop to the target stop that takes some time."""

    target_stop_id: str
    travel_minutes: float


@dataclass
class Stop:
    """A single stop with outgoing connections."""

    id: str
    name: str
    lat: float
    lon: float
    connections: frozenset[Connection]

#### **(01.2.1)** Importing the Dataset

_For **2** points_

In the following cell, the dataset _Rail Passenger Transport Germany_ is loaded. It is located in the `task_2_data` folder and was downloaded from [GTFS.de](https://gtfs.de/de/feeds/) under the [Creative Commons Attribution 4.0 (CC BY 4.0)](https://creativecommons.org/licenses/by/4.0/) license.

This dataset contains various files. All data is stored in CSV format ("Comma-Separated Values"). This format saves tabular data, where the values of a row are separated by commas. Each row corresponds to a record entry, and the first row typically contains the column headers.

To iterate over the rows of a CSV file in Python, the following code can be used:

```python
import csv

with open("stops.csv", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    
    # DictReader yields one dict per row, keyed by the column headers.
    # Use an explicit loop: map() is lazy and would not print anything here.
    for row in reader:
        print(row)
```


We only need two files from the dataset: `stops.csv` and `stop_times.csv`. Their content is briefly explained below.

The file `stops.csv` contains all available stops in the transit network. Each line describes a stop based on four essential attributes:
- `stop_id`: A unique identification number for the stop.
- `stop_name`: The name of the stop (e.g., "Central Station").
- `stop_lat`: The latitude of the stop (geographic coordinate).
- `stop_lon`: The longitude of the stop (geographic coordinate).

The file `stop_times.csv` contains the sequence in which stops are visited during a trip (`trip_id`), as well as the arrival and departure times. Each line describes one halt at a specific stop during a trip with the following information:
- `trip_id`: An ID that describes a specific trip.
- `arrival_time`: The arrival time at the stop (in `HH:MM:SS` format).
- `departure_time`: The departure time from the stop (also `HH:MM:SS`).
- `stop_id`: The ID of the stop.
- `stop_sequence`: The order of this stop within the trip.

_In fact, both files contain additional attributes, but these are not necessary for completing the task._
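
As a small illustration of working with the `HH:MM:SS` values, the sketch below converts a time string into minutes. The helper name `gtfs_time_to_minutes` is hypothetical. Note that the GTFS format allows hour values of 24 and above for trips that run past midnight, which is why the string is split manually instead of being parsed as a clock time; whether such values occur in this dataset is not checked here.

```python
def gtfs_time_to_minutes(value: str) -> float:
    """Convert a GTFS `HH:MM:SS` string to minutes since the start of the service day."""
    hours, minutes, seconds = (int(part) for part in value.strip().split(":"))
    return hours * 60 + minutes + seconds / 60


# Travel time between a departure and the following arrival (made-up values).
departure = gtfs_time_to_minutes("23:50:00")
arrival = gtfs_time_to_minutes("24:05:30")  # hours >= 24 mean "after midnight"
print(arrival - departure)  # 15.5
```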

Now complete the provided code structure to load all `Stops` with their `Connections` from the dataset.


---

<details>
<summary>Tip for parsing the edges of the graph</summary>

To parse the edges of the graph, look at consecutive entries within a trip in `stop_times.csv`. Whenever one stop follows another within the same `trip_id` (ordered by `stop_sequence`), create a directed connection from the first stop to the second. The travel time is the difference between the departure time at the first stop and the arrival time at the second stop.

</details>
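
The pairing of consecutive stops described in the tip can be sketched on a few in-memory rows. This is only an illustration under assumptions: the rows are made up, the helper `to_minutes` is hypothetical, and how to handle the same pair of stops appearing in several trips with different durations is left open.

```python
from collections import defaultdict

# Toy rows shaped like stop_times.csv entries (values are made up for illustration).
rows = [
    {"trip_id": "T1", "stop_sequence": "1", "stop_id": "B", "arrival_time": "08:10:00", "departure_time": "08:12:00"},
    {"trip_id": "T1", "stop_sequence": "0", "stop_id": "A", "arrival_time": "08:00:00", "departure_time": "08:00:00"},
    {"trip_id": "T1", "stop_sequence": "2", "stop_id": "C", "arrival_time": "08:30:00", "departure_time": "08:30:00"},
]


def to_minutes(value: str) -> float:
    """Convert `HH:MM:SS` to minutes (hypothetical helper)."""
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return hours * 60 + minutes + seconds / 60


# Group the rows by trip, then order each trip by its numeric stop_sequence.
trips: dict[str, list[dict[str, str]]] = defaultdict(list)
for row in rows:
    trips[row["trip_id"]].append(row)

edges: list[tuple[str, str, float]] = []
for trip_rows in trips.values():
    trip_rows.sort(key=lambda r: int(r["stop_sequence"]))
    # Pair each stop with its successor: departure here, arrival there.
    for current, following in zip(trip_rows, trip_rows[1:]):
        minutes = to_minutes(following["arrival_time"]) - to_minutes(current["departure_time"])
        edges.append((current["stop_id"], following["stop_id"], minutes))

print(edges)  # [('A', 'B', 10.0), ('B', 'C', 18.0)]
```

Sorting by `int(r["stop_sequence"])` matters because CSV values are read as strings, and string order would place `"10"` before `"2"`.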

In [None]:
# Implementation of task 01.2.1
def parse_stops(base_dir: Path) -> list[Stop]:
    """Parses all stops from the given dataset."""
    # TODO: Implement here

    return []


# Load the stops from the dataset
stops = parse_stops(Path("./task_2_data"))

print(f"Successfully loaded {len(stops)} stops!")

<div class="alert-danger" style="padding: 1rem">
If you are unable to complete the previous task, run the following code cell to load the prepared data so that you can work on the subsequent tasks.

</div>

In [None]:
import json
from pathlib import Path


def load_stops_from_json(file_path: Path) -> list[Stop]:
    """Loads data from the prepared failsafe data."""
    with open(file_path, encoding="utf-8") as f:
        data = json.load(f)

    return [
        Stop(
            **{
                **entry,
                # Convert to a frozenset to match the type declared on `Stop.connections`
                "connections": frozenset(
                    Connection(**connection)
                    for connection in entry.get("connections", [])
                ),
            }
        )
        for entry in data
    ]


stops = load_stops_from_json(Path("./task_2_data/failsafe_data.json"))
print(f"Successfully loaded {len(stops)} stops (from failsafe data)!")


#### **(01.2.2)** A*

_For **5** points_

The complete A* algorithm should now be implemented in the function `a_star`.

**_Declaration of the heuristics._** In the following cell, two heuristics `h_zero` and `h_euclid` are defined.

**_No changes are required in the next code cell._**

In [None]:
def h_zero(source: Stop, goal: Stop) -> float:
    """
    A heuristic that always returns zero.

    :param source: The current node.
    :param goal: The goal node.
    """
    return 0

def h_euclid(source: Stop, goal: Stop) -> float:
    """
    A heuristic that uses the Euclidean distance of the coordinate pairs.

    :param source: The current node.
    :param goal: The goal node.
    """
    dx = source.lat - goal.lat
    dy = source.lon - goal.lon

    return math.sqrt(dx * dx + dy * dy)

In the next cell, we have already defined the signature of the `a_star` function; your task is now to complete it. This function takes as its first parameter the heuristic to be used, which has already been provided. Additionally, it receives `start_id`, the `id` of the `Stop` from which the search starts, `goal_id`, the `id` of the `Stop` for which a path should be found, and `stops`, the list of all available stops.

The return value of the function is a tuple that contains, in the first position, the chosen path and, in the second position, the total cost. The chosen path is, in turn, a list of tuples, where the first item is the `Stop` from which the second item, the `Connection`, departs.
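
To illustrate the return shape described above, here is a hand-built value for a made-up two-hop route. The two classes are repeated only to keep the snippet self-contained, and all ids, names, and coordinates are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Connection:
    target_stop_id: str
    travel_minutes: float


@dataclass
class Stop:
    id: str
    name: str
    lat: float
    lon: float
    connections: frozenset[Connection]


# Hypothetical route A -> B -> C.
a_to_b = Connection(target_stop_id="B", travel_minutes=14.0)
b_to_c = Connection(target_stop_id="C", travel_minutes=6.0)
stop_a = Stop("A", "Stop A", 50.0, 6.0, frozenset({a_to_b}))
stop_b = Stop("B", "Stop B", 50.5, 6.1, frozenset({b_to_c}))

# Each entry pairs the stop being left with the connection taken from it.
path: list[tuple[Stop, Connection]] = [(stop_a, a_to_b), (stop_b, b_to_c)]
total_minutes = sum(connection.travel_minutes for _, connection in path)

result = (path, total_minutes)
print(total_minutes)  # 20.0
```

Note that the goal stop itself does not appear as a `Stop` in the list; it is only referenced by the `target_stop_id` of the last connection.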

You are, of course, allowed to define additional functions!
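
One practical detail when defining such helper code: if you keep the open nodes in a priority queue based on `heapq` (an assumption, the lecture may present the frontier differently), entries with equal priority are compared by their next tuple element. Objects such as `Stop` do not support `<`, so a running counter is a common tie-breaker. The payload dicts below are stand-ins for whatever you store per node.

```python
import heapq
from itertools import count

# Frontier entries are (priority, tie_breaker, payload). The counter guarantees that
# two entries with equal priority are never compared by their payload.
frontier: list[tuple[float, int, dict[str, str]]] = []
tie_breaker = count()

for priority, payload in [(12.0, {"id": "B"}), (7.5, {"id": "A"}), (12.0, {"id": "C"})]:
    heapq.heappush(frontier, (priority, next(tie_breaker), payload))

order = []
while frontier:
    priority, _, payload = heapq.heappop(frontier)
    order.append((priority, payload["id"]))

print(order)  # [(7.5, 'A'), (12.0, 'B'), (12.0, 'C')]
```

Without the counter, pushing the second entry with priority `12.0` would raise a `TypeError`, because Python would fall back to comparing the two dicts.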

In [None]:
# Implementation of task 01.2.2
def a_star(heuristic: Callable[[Stop, Stop], float], start_id: str, goal_id: str, stops: list[Stop]) -> tuple[list[tuple[Stop, Connection]], float]:
    """
    Finds the shortest path to the given `goal_id`, starting from `start_id`.

    :param heuristic: The heuristic to use when searching for a path.
    :param start_id: The id of the stop to start the search from.
    :param goal_id: The id of the stop to search a path to.
    :param stops: The list of available stops.
    """
    # TODO: Implement the A* algorithm

    return [], float('inf')

**_Test calls._** You can use the following calls of the algorithm to test whether your implementation works correctly for the examples.

**_No changes need to be made to the next code cell._**

In [None]:
import time

def print_path(path: list[tuple[Stop, Connection]], total_minutes: float):
    """
    Helper function to print the result of a search in a nice way.

    :param path: The actual path, together with the used connections.
    :param total_minutes: The total minutes this path needs.
    """
    if not path:
        print("No path found.")
        return

    print("Path found:")
    for stop, connection in path:
        next_stop_name = next(s.name for s in stops if s.id == connection.target_stop_id)

        print(f"  * {stop.name} -> {next_stop_name} ({connection.travel_minutes} min)")

    print(f"\nTotal duration: {total_minutes:.1f} minutes\n")

def measured[A,**B](func: Callable[B, A]) -> Callable[B, A]:
    """
    Wraps a function so that each call prints how long it took to run.

    :param func: The function to measure.
    :return: A wrapper that forwards all arguments to `func` and returns its result.
    """
    def wrapper(*args: B.args, **kwargs: B.kwargs) -> A:
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"Time taken: {end_time - start_time:.5f} seconds")

        return result

    return wrapper



# Test Calls

# Aachen Hbf -> Amsterdam Centraal
print("A* with h_zero:")
path, total_minutes = measured(a_star)(heuristic=h_zero, start_id="525449", goal_id="351939", stops=stops)

print_path(path, total_minutes)
# Path found:
#  * Aachen Hbf -> Herzogenrath (14.0 min)
#  * Herzogenrath -> Geilenkirchen (6.0 min)
#  * Geilenkirchen -> Erkelenz (9.0 min)
#  * Erkelenz -> Rheydt Hbf (7.0 min)
#  * Rheydt Hbf -> Mönchengladbach Hbf (4.0 min)
#  * Mönchengladbach Hbf -> Venlo(Gr) (11.0 min)
#  * Venlo(Gr) -> 's-Hertogenbosch (35.0 min)
#  * 's-Hertogenbosch -> Utrecht Centraal (27.0 min)
#  * Utrecht Centraal -> Amsterdam Centraal (25.0 min)
#
# Total duration: 138.0 minutes

print() # Newline

# Aachen Hbf -> Amsterdam Centraal
print("A* with h_euclid:")
path, total_minutes = measured(a_star)(heuristic=h_euclid, start_id="525449", goal_id="351939", stops=stops)

print_path(path, total_minutes)
# Path found:
#  * Aachen Hbf -> Herzogenrath (14.0 min)
#  * Herzogenrath -> Geilenkirchen (6.0 min)
#  * Geilenkirchen -> Erkelenz (9.0 min)
#  * Erkelenz -> Rheydt Hbf (7.0 min)
#  * Rheydt Hbf -> Mönchengladbach Hbf (4.0 min)
#  * Mönchengladbach Hbf -> Venlo(Gr) (11.0 min)
#  * Venlo(Gr) -> 's-Hertogenbosch (35.0 min)
#  * 's-Hertogenbosch -> Utrecht Centraal (27.0 min)
#  * Utrecht Centraal -> Amsterdam Centraal (25.0 min)
#
# Total duration: 138.0 minutes


#### **(01.2.3)** Evaluation of the Heuristics

_For **2** points_

Again, consider the given heuristics. What is the intuition behind both provided heuristics? How does the A* algorithm behave when using `h_zero`, and why could `h_euclid` provide an improvement?

> 
> Please enter your answer here.
> 

#### **(01.2.4)** Custom Heuristic

_For **3** points_

Neither of the provided heuristics is particularly well suited to working with geographic coordinates (`lat` and `lon`). Therefore, research a suitable heuristic on your own and implement it in the following code cell. Also, explain why your heuristic is better than both of the given ones.

In [None]:
# Implementation of task 01.2.4
def h_custom(source: Stop, goal: Stop) -> float:
    # TODO: Implement custom heuristic
    return 0

> 
> Please enter your answer here.
> 

**_Test call._** The code in the following cell can be used to test the algorithm with your heuristic.

**_No changes are necessary in the next code cell._**

In [None]:
path, total_minutes = measured(a_star)(heuristic=h_custom, start_id="525449", goal_id="351939", stops=stops)

print_path(path, total_minutes)