In [1]:
# import libraries
import numpy as np
import pandas as pd
import tracemalloc
from tabulate import tabulate
import os
import time
import psutil

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset, random_split, TensorDataset

from model.fcae import createLevel6Net_10x10, createLevel7Net_20x20, createLevel8Net_30x30, createLevel8Net_40x40, createLevel8Net_50x50

%config InlineBackend.figure_formats = ['svg']
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Prefer the GPU when CUDA is available; otherwise everything runs on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [3]:
def a_star(maze):
    """Solve a 2-D maze with A* (Manhattan heuristic, 4-connected moves).

    Cell codes are compared against min-max scaled values: gates are cells
    equal to 1.0 and walls are cells equal to 1/3 (the expressions below
    suggest raw codes 3 and 1 scaled from the range 0..3).

    Args:
        maze: 2-D NumPy array holding the scaled maze.

    Returns:
        If a route between the first two gates exists: a copy of ``maze`` in
        which walls are set to 0 and every route cell whose value is 0 is set
        to 2. Otherwise (fewer than two gates, or no route) the input ``maze``
        object itself is returned unchanged.
    """
    gate_code = (3.0 - 0.0) / (3.0 - 0.0)
    wall_code = (1.0 - 0.0) / (3.0 - 0.0)

    n_rows, n_cols = maze.shape

    gate_cells = np.argwhere(maze == gate_code)
    if len(gate_cells) < 2:
        # Nothing to connect without two gates.
        return maze

    source = tuple(gate_cells[0])
    target = tuple(gate_cells[1])

    def distance_to_target(cell):
        # Manhattan distance: admissible for unit-cost 4-connected moves.
        return abs(cell[0] - target[0]) + abs(cell[1] - target[1])

    # Up, down, left, right.
    steps = ((-1, 0), (1, 0), (0, -1), (0, 1))

    frontier = [source]
    parent = {}
    cost_so_far = {source: 0}
    priority = {source: distance_to_target(source)}

    while frontier:
        # Pick the frontier cell with the smallest estimated total cost;
        # on ties the earliest-inserted cell wins.
        node = min(frontier, key=lambda cell: priority.get(cell, float("inf")))
        frontier.remove(node)

        if node == target:
            # Clear the walls, then walk the parent chain back towards the
            # source and paint every empty cell on the way.
            solution = np.where(maze == wall_code, 0, maze)
            while node in parent:
                row, col = node
                if solution[row, col] == 0:
                    solution[row, col] = 2
                node = parent[node]
            return solution

        for d_row, d_col in steps:
            candidate = (node[0] + d_row, node[1] + d_col)

            inside = 0 <= candidate[0] < n_rows and 0 <= candidate[1] < n_cols
            if not inside:
                continue
            if maze[candidate] == wall_code:
                continue

            new_cost = cost_so_far[node] + 1
            if new_cost < cost_so_far.get(candidate, float("inf")):
                parent[candidate] = node
                cost_so_far[candidate] = new_cost
                priority[candidate] = new_cost + distance_to_target(candidate)
                if candidate not in frontier:
                    frontier.append(candidate)

    # Frontier exhausted without reaching the second gate.
    return maze

In [4]:
def evaluate_found_path(answer: torch.Tensor, Y_test: torch.Tensor):
    """Check whether following the brightest cells in ``answer`` links both gates.

    The walk starts at the first gate found in ``Y_test`` and repeatedly moves
    to the neighbour returned by ``find_brightest_neighbour`` until the last
    gate is reached, no move is possible, or the step budget is used up.

    Args:
        answer: 2-D tensor with the predicted solution.
        Y_test: 2-D tensor with the ground-truth solution; gates are searched
            for as values within 0.8 of 3.

    Returns:
        True if the walk reaches the end gate, False otherwise (including when
        a gate cannot be located in ``Y_test``).
    """
    gate_start = find_gate(Y_test, 0.8, True, 3)
    gate_end = find_gate(Y_test, 0.8, False, 3)
    if gate_start is None or gate_end is None:
        return False

    position = (gate_start[0], gate_start[1])
    seen = {position}

    # A walk that never revisits a cell cannot be longer than the grid itself.
    step_budget = answer.shape[0] * answer.shape[1]

    for _ in range(step_budget):
        next_cell = find_brightest_neighbour(answer, position, seen, Y_test)
        if next_cell is None:
            return False
        position = (next_cell[0], next_cell[1])
        seen.add(position)
        if position == gate_end:
            return True

    return False

def find_gate(answer: torch.Tensor, threshold: float, start: bool, gate_value: float = 1.0):
    """Locate a gate cell in a 2-D tensor.

    A cell counts as a gate candidate when its value lies in the closed
    interval ``[gate_value - threshold, gate_value + threshold]``.

    Args:
        answer: Tensor to search; the first two index dimensions are used.
        threshold: Half-width of the accepted value band around ``gate_value``.
        start: If truthy, return the first candidate (in ``nonzero`` order),
            otherwise the last one.
        gate_value: Nominal value of a gate cell.

    Returns:
        ``(row, col)`` as Python ints, or ``None`` when no cell is in the band.
    """
    lower_bound = gate_value - threshold
    upper_bound = gate_value + threshold

    in_band = (answer >= lower_bound) & (answer <= upper_bound)
    hits = in_band.nonzero(as_tuple=True)

    if hits[0].numel() == 0:
        return None  # no gate found

    pick = 0 if start else -1  # first or last candidate
    return hits[0][pick].item(), hits[1][pick].item()

def find_brightest_neighbour(
    answer: torch.Tensor,
    position: tuple[int, int],
    visited: set[tuple[int, int]],
    Y_test: torch.Tensor,
):
    """Return the unvisited 4-neighbour of ``position`` with the largest value.

    Neighbour indices are clamped to the grid, so at a border the "neighbour"
    in the outward direction is ``position`` itself. Visited cells are scored
    with the sentinel -1.

    Args:
        answer: 2-D tensor with the predicted solution.
        position: Current ``(row, col)``.
        visited: Cells that must not be entered again.
        Y_test: 2-D ground-truth tensor; a cell equal to 1.0 there is rejected.

    Returns:
        The ``(row, col)`` of the first neighbour (order: up, down, left,
        right) whose score equals the maximum and whose ``Y_test`` value is not
        1.0, or ``None`` when the maximum is the sentinel -1 or no such
        neighbour exists.
    """
    n_rows, n_cols = answer.shape
    row, col = position[0], position[1]

    # Order matters: ties are resolved as up, down, left, right.
    candidates = [
        (max(row - 1, 0), col),
        (min(row + 1, n_rows - 1), col),
        (row, max(col - 1, 0)),
        (row, min(col + 1, n_cols - 1)),
    ]

    scores = [
        answer[cell[0]][cell[1]] if cell not in visited else -1
        for cell in candidates
    ]

    best = max(scores)
    if best == -1:
        return None

    for cell, score in zip(candidates, scores):
        if best == score and Y_test[cell[0]][cell[1]] != 1.0:
            return cell

    return None

def measure_accuracy(model, test_loader: DataLoader, algorithm_fn=None):
    """
    Evaluates the accuracy of a model or an algorithm (e.g., A*) in solving mazes.

    Predictions are collected batch by batch in loader order, so prediction
    ``i`` is always scored against ground truth ``i``.

    Args:
        model: The neural network model to evaluate (can be None if only algorithm_fn is used).
        test_loader: DataLoader containing the test data (input mazes and their ground truth solutions).
        algorithm_fn: A function taking one maze as a NumPy array and returning the solved maze
            (used only when ``model`` is None).

    Returns:
        correct_percentage: Accuracy as a percentage of correctly solved mazes.

    Raises:
        ValueError: If neither ``model`` nor ``algorithm_fn`` is provided.
    """
    if model is None and algorithm_fn is None:
        raise ValueError("measure_accuracy needs either a model or an algorithm_fn")

    preds_ = torch.zeros(len(test_loader.dataset), dtype=torch.bool)  # correctness of each maze
    rows, cols = test_loader.dataset[0][0].shape[-2:]  # maze dimensions

    output = []
    output_truth = []

    # Collect predictions from the model or the algorithm
    with torch.no_grad():
        if model is not None:
            model.to(device)
            model.eval()
            for x_test, y_true in test_loader:
                y_hat = model(x_test.to(device))
                # Move each batch off the accelerator right away so the
                # predictions of the whole test set do not pile up there.
                output.append(y_hat.cpu())
                output_truth.append(y_true)
        else:
            for x_test, y_true in test_loader:
                # Start a fresh list for every batch; reusing one list across
                # batches would re-emit earlier mazes and shift all later
                # predictions against their ground truth.
                y_hat_batch = [
                    torch.tensor(algorithm_fn(maze.squeeze().numpy()), dtype=torch.float32)
                    for maze in x_test
                ]
                output.append(torch.stack(y_hat_batch))
                output_truth.append(y_true)

    # Concatenate predictions and ground truths
    output = torch.cat(output, dim=0).cpu()
    output_truth = torch.cat(output_truth, dim=0).cpu()

    print("Evalutating accuracy")

    # Score every maze by walking the predicted path
    for index, (y_hat, y_true) in enumerate(zip(output, output_truth)):
        if index % 200 == 0:
            print(f"testing maze {index}")
        preds_[index] = evaluate_found_path(
            y_hat.view(rows, cols), y_true.view(rows, cols)
        )

    total = preds_.shape[0]
    correct = preds_.sum().item()
    correct_percentage = (correct / total) * 100

    return correct_percentage

In [5]:
def measure_inference_time_nn(model, test_loader):
    """Time the model's forward pass on ``device`` with CUDA events.

    Two untimed passes over ``test_loader`` warm things up, then five timed
    passes follow. Each batch is timed on its own and the batch time is divided
    by the batch size.

    Args:
        model: Network to time; it is switched to eval mode and moved to ``device``.
        test_loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        ``(avg_time_per_sample, runtimes_per_sample)`` in microseconds, where
        the list holds one per-sample value for every timed batch.
    """
    warmup_passes = 2
    timed_passes = 5

    tic = torch.cuda.Event(enable_timing=True)
    toc = torch.cuda.Event(enable_timing=True)
    per_sample_us = []

    # Prepare the model
    model.eval()
    model.to(device)

    with torch.no_grad():
        for _ in range(warmup_passes):
            for inputs, _ in test_loader:
                inputs = inputs.to(device)
                model(inputs)

        for pass_idx in range(timed_passes):
            print(f"round {pass_idx}")
            for inputs, _ in test_loader:
                n_samples = inputs.size(0)
                inputs = inputs.to(device)  # host-to-device copy is not timed

                tic.record()
                model(inputs)
                toc.record()
                # Events are recorded asynchronously; wait before reading them.
                torch.cuda.synchronize()

                batch_ms = tic.elapsed_time(toc)
                per_sample_ms = batch_ms / n_samples
                per_sample_us.append(per_sample_ms * 1000)  # milliseconds to microseconds

    mean_us = sum(per_sample_us) / len(per_sample_us)
    return mean_us, per_sample_us

In [6]:
def measure_inference_time_nn_cpu(model, test_loader):
    """Time the model's forward pass on the CPU with ``time.perf_counter_ns``.

    Five passes over ``test_loader`` are timed (no warm-up pass). Each batch is
    timed on its own and the batch time is divided by the batch size.

    Args:
        model: Network to time; it is switched to eval mode and moved to the CPU.
        test_loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        ``(avg_time_per_sample, runtimes_per_sample)`` in microseconds, where
        the list holds one per-sample value for every timed batch.
    """
    # Prepare the model
    model.eval()
    model.to("cpu")

    per_sample_us = []

    with torch.no_grad():
        for pass_idx in range(5):
            print(f"round {pass_idx}")
            for inputs, _ in test_loader:
                n_samples = inputs.size(0)

                tic = time.perf_counter_ns()
                model(inputs)
                batch_ns = time.perf_counter_ns() - tic

                per_sample_ns = batch_ns / n_samples
                per_sample_us.append(per_sample_ns / 1000)  # nanoseconds to microseconds

    mean_us = sum(per_sample_us) / len(per_sample_us)
    return mean_us, per_sample_us

In [7]:
def measure_inference_time_alg(algorithm_fn, test_loader):
    """
    Measures the average inference time per maze for the A* algorithm on the CPU.

    Every maze is squeezed individually before it is handed to
    ``algorithm_fn``. Squeezing the whole batch instead would also drop the
    batch axis of a single-maze batch, and the loop would then iterate over the
    rows of that maze.

    Args:
        algorithm_fn: The A* function that takes a single maze (NumPy array) as input.
        test_loader: Dataloader containing batches of mazes.

    Returns:
        avg_time_per_sample: Average inference time per maze in microseconds.
        runtimes_per_sample: List of measured times per maze in microseconds
            (five passes over the loader).
    """
    runtimes_per_sample = []

    with torch.no_grad():
        for round_idx in range(5):  # Number of measurement runs
            print(f"round {round_idx}")
            for x_batch, _ in test_loader:
                for maze_tensor in x_batch:
                    # Tensor-to-NumPy conversion stays outside the timed region.
                    maze = maze_tensor.squeeze().numpy()
                    start_time = time.perf_counter_ns()  # Start timing in nanoseconds
                    algorithm_fn(maze)
                    elapsed_time = time.perf_counter_ns() - start_time  # Time in nanoseconds
                    runtimes_per_sample.append(elapsed_time / 1_000)  # Convert to microseconds

    avg_time_per_sample = sum(runtimes_per_sample) / len(runtimes_per_sample)
    return avg_time_per_sample, runtimes_per_sample

In [8]:
def measure_memory_usage_nn(model, test_loader):
    """Measure the peak CUDA memory allocated while running the model once over the loader.

    The allocator's peak statistics are reset before the pass, read after it,
    and reset again so that later measurements start clean.

    Args:
        model: Network to run; it is switched to eval mode and moved to ``device``.
        test_loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        Peak allocated memory reported by ``torch.cuda.max_memory_allocated``
        for ``device``, in MB (bytes / 1024**2).
    """
    model.eval()
    model.to(device)

    # Discard whatever peak was recorded by earlier work on this device.
    torch.cuda.reset_peak_memory_stats(device)

    with torch.no_grad():
        for x_test, _ in test_loader:
            x_test = x_test.to(device)
            model(x_test)

    peak_memory = torch.cuda.max_memory_allocated(device) / (1024**2)  # In MB
    torch.cuda.reset_peak_memory_stats(device)
    return peak_memory

In [9]:
# def measure_memory_usage_nn_cpu(model, test_loader):
#     model.eval()

#     tracemalloc.start()
#     tracemalloc.reset_peak()  # Reset memory stats before function call
#     model.to("cpu")

    # with torch.no_grad():
    #     for x_test, _ in test_loader:
    #         # x_test = x_test.to(device)
    #         model(x_test)

#     # Get peak memory usage
#     _, peak_memory = tracemalloc.get_traced_memory()
#     tracemalloc.stop()

#     # Convert to MB and return the memory usage in MB
#     return peak_memory / (1024**2)

def measure_memory_usage_nn_cpu(model, test_loader):
    """Estimate the peak resident-memory growth while running the model on the CPU.

    The process RSS is sampled before the pass and again after every batch; the
    result is the largest growth seen over the starting value. Taking only
    ``end - start`` would miss memory that was released before the loop
    finished and could even be negative.

    Note: RSS is sampled between batches, so a spike that occurs and is fully
    released inside a single forward pass is still not captured.

    Args:
        model: Network to run; it is switched to eval mode and moved to the CPU.
        test_loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        Largest observed RSS increase in MB (bytes / 1024**2); 0.0 if RSS never
        exceeded its starting value.
    """
    model.eval()
    model.to("cpu")

    process = psutil.Process(os.getpid())
    start_memory = process.memory_info().rss  # In Bytes
    highest_memory = start_memory

    with torch.no_grad():
        for x_test, _ in test_loader:
            model(x_test)
            highest_memory = max(highest_memory, process.memory_info().rss)

    peak_memory = (highest_memory - start_memory) / (1024**2)  # Convert to MB

    return peak_memory

In [10]:
def measure_memory_usage_alg(algorithm_fn, test_loader):
    """Measure the peak traced Python memory while running the algorithm over the loader.

    Every maze is squeezed individually before it is handed to
    ``algorithm_fn``; squeezing the whole batch would also drop the batch axis
    of a single-maze batch. Tracing is always stopped again, even when
    ``algorithm_fn`` raises.

    Args:
        algorithm_fn: Function that takes a single maze (NumPy array) as input.
        test_loader: Iterable of ``(mazes, targets)`` batches.

    Returns:
        Peak size of memory blocks traced by ``tracemalloc`` during the run, in
        MB (bytes / 1024**2).
    """
    tracemalloc.start()
    try:
        tracemalloc.reset_peak()  # Reset memory stats before function call

        i = 0
        for x_batch, _ in test_loader:
            for maze_tensor in x_batch:
                if i % 200 == 0:
                    print(f"memory usage for {i}")

                i += 1
                algorithm_fn(maze_tensor.squeeze().numpy())

        # Get peak memory usage
        _, peak_memory = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()

    # Convert to MB and return the memory usage in MB
    return peak_memory / (1024**2)

In [11]:
def compare_nn_alg(test_loader, model, algorithm_fn):
    """Benchmark the network against the algorithm and tabulate the results.

    Runs, in this order: network path accuracy, network inference time on GPU
    and CPU, algorithm inference time, then network memory usage on GPU and
    CPU and algorithm memory usage. A progress label is printed before each.

    The algorithm's path accuracy is not measured here; the table reports a
    fixed value of 100 for it.

    Args:
        test_loader: DataLoader with the test mazes; the maze size is read from
            the last two dimensions of the first input sample.
        model: Network to benchmark.
        algorithm_fn: Function solving a single maze (e.g. A*).

    Returns:
        A two-row DataFrame (network, algorithm) with the collected metrics.
        The frame is also printed.
    """
    # Path accuracy
    print("nn_accuracy")
    nn_accuracy = measure_accuracy(model, test_loader)
    print("alg_accuracy")

    # Inference time per sample
    print("nn_avg_inference_time_gpu")
    nn_avg_inference_time_gpu, _ = measure_inference_time_nn(model, test_loader)
    print("nn_avg_inference_time_cpu")
    nn_avg_inference_time_cpu, _ = measure_inference_time_nn_cpu(model, test_loader)
    print("alg_avg_inference_time")
    alg_avg_inference_time, _ = measure_inference_time_alg(algorithm_fn, test_loader)

    # Memory usage
    print("nn_memory_usage_gpu")
    nn_memory_usage_gpu = measure_memory_usage_nn(model, test_loader)
    print("nn_memory_usage_cpu")
    nn_memory_usage_cpu = measure_memory_usage_nn_cpu(model, test_loader)
    print("alg_memory_usage")
    alg_memory_usage = measure_memory_usage_alg(algorithm_fn, test_loader)

    maze_shape = test_loader.dataset[0][0].shape[-2:]
    maze_label = f"{maze_shape[0]}x{maze_shape[1]}"

    # One row per contender; "---" marks metrics that do not apply.
    rows = [
        {
            "Model": f"FCAE {model.__class__.__name__}",
            "Maze size (px)": maze_label,
            "Path Accuracy (%)": nn_accuracy,
            "Inference Time GPU (µs)": nn_avg_inference_time_gpu,
            "Inference Time CPU (µs)": nn_avg_inference_time_cpu,
            "Memory Usage GPU (MB)": nn_memory_usage_gpu,
            "Memory Usage CPU (MB)": nn_memory_usage_cpu,
        },
        {
            "Model": "A*",
            "Maze size (px)": maze_label,
            "Path Accuracy (%)": 100,
            "Inference Time GPU (µs)": "---",
            "Inference Time CPU (µs)": alg_avg_inference_time,
            "Memory Usage GPU (MB)": "---",
            "Memory Usage CPU (MB)": alg_memory_usage,
        },
    ]

    results_df = pd.DataFrame(rows)
    print(results_df)
    return results_df

In [12]:
# Load the maze dataset and build the train/test split.
path_to_data = "./data/"

data_size = (10, 10)
SPLIT_SEED = 42  # fixed seed so the train/test split is the same on every run

X_file = np.load(f"{path_to_data}100000x{data_size[0]}x{data_size[1]}_unsolved.npy")
Y_file = np.load(f"{path_to_data}100000x{data_size[0]}x{data_size[1]}_solved.npy")

# Min-max scale the unsolved mazes to [0, 1]; a_star compares cells against
# these scaled codes. The solved mazes stay unscaled: evaluate_found_path
# looks for gates near 3 and walls equal to 1.0 in them.
X_file = (X_file - X_file.min()) / (X_file.max() - X_file.min())

x_tensor = torch.tensor(X_file, dtype=torch.float32)
y_tensor = torch.tensor(Y_file, dtype=torch.float32)

dataset = TensorDataset(x_tensor, y_tensor)

total_size = len(dataset)
train_size = int(0.8 * total_size)
test_size = total_size - train_size

# Without a seeded generator the split (and with it every reported metric)
# changes on each kernel restart.
# NOTE(review): the checkpoints loaded below were trained elsewhere — confirm
# this split matches the one used for training so that the test mazes were not
# seen during training.
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Create DataLoaders
batchsize = 32
train_loader = DataLoader(train_dataset, batch_size=batchsize, shuffle=True, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=batchsize)

In [13]:
# Trained checkpoints: maze side length -> (model factory, path to the weights).
# The network matching `data_size` is picked automatically, so switching maze
# sizes no longer requires commenting blocks in and out.
MODEL_CHECKPOINTS = {
    10: (createLevel6Net_10x10, "./archive/Level6Net_10x10_99.9900/net.pt"),
    20: (createLevel7Net_20x20, "./archive/Level7Net_20x20_99.1000/net.pt"),
    30: (createLevel8Net_30x30, "./archive/Level8Net_30x30_89.8200/net.pt"),
    40: (createLevel8Net_40x40, "./archive/Level8Net_40x40_83.9350/net.pt"),
    50: (createLevel8Net_50x50, "./archive/Level8Net_50x50_75.3600/net.pt"),
}

create_model, checkpoint_path = MODEL_CHECKPOINTS[data_size[0]]
model, _, _ = create_model()
# map_location="cpu" lets a checkpoint saved from a GPU be loaded on a machine
# without one; the measuring functions move the model to the right device.
model.load_state_dict(
    torch.load(
        checkpoint_path,
        map_location="cpu",
        weights_only=True,
    )
)

<All keys matched successfully>

In [14]:
# Do not bind the result to `time`: that would shadow the `time` module, and
# the CPU / A* timing functions call `time.perf_counter_ns()` on it.
avg_inference_time_gpu_us, _ = measure_inference_time_nn(model, test_loader)
avg_inference_time_gpu_us

round 0
round 1
round 2
round 3
round 4


47.84432768583298

In [15]:
# df = compare_nn_alg(test_loader, model, a_star)