In [None]:
import json
import tempfile
import time
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import cast

import matplotlib.pyplot as plt
import matplotlib.ticker as tkr
import numpy as np
import psutil
import rich
import seaborn as sns
from adjustText import adjust_text
from cpuinfo import get_cpu_info
from pooch import retrieve
from psutil._common import bytes2human
from rich.console import Console

import quackosm as qosm

In [None]:
# Apply seaborn's "darkgrid" theme globally so every figure drawn below shares it.
sns.set_theme(style="darkgrid")

In [None]:
# Child ``Process`` handles remembered between samples, keyed by parent PID and then
# by child PID. ``cpu_percent(interval=None)`` measures against the previous call made
# on the *same* object, and ``children()`` builds new objects on every call, so the
# handles have to be reused for a child to ever report something other than 0.0.
_TRACKED_CHILDREN: dict[int, dict[int, psutil.Process]] = {}


def _sample_cpu_percent(p: psutil.Process) -> float:
    """Sum ``cpu_percent`` of ``p`` and of all its descendants (100.0 == one busy core)."""
    known = _TRACKED_CHILDREN.get(p.pid, {})
    alive: dict[int, psutil.Process] = {}

    total = p.cpu_percent(interval=None)
    for child in p.children(recursive=True):
        tracked = known.get(child.pid)
        if tracked is None or tracked != child:
            # New child (or a recycled PID): its first reading only primes the counter.
            tracked = child
        try:
            total += tracked.cpu_percent(interval=None)
        except psutil.Error:
            # The child exited between being listed and being sampled; skip it.
            continue
        alive[child.pid] = tracked

    # Forget children that are gone so the cache cannot grow without bound.
    _TRACKED_CHILDREN[p.pid] = alive
    return total


def get_cpu_cores(p: psutil.Process) -> tuple[float, float]:
    """Get CPU usage expressed as a number of fully busy cores.

    The usage of ``p`` and of all its descendants is summed and divided by 100,
    so ``1.0`` means one core fully used. The very first reading of any process
    is 0.0 because psutil needs a previous call to compare against.

    Args:
        p: Process to measure.

    Returns:
        Tuple of (busy cores, ``time.time()`` rounded to 2 decimals).
    """
    while True:
        try:
            return (_sample_cpu_percent(p) / 100, round(time.time(), 2))
        except psutil.Error:
            # Best-effort sampling: retry on transient psutil failures only, so that
            # KeyboardInterrupt and programming errors are no longer swallowed.
            continue


def get_cpu_percentage(p: psutil.Process) -> tuple[float, float]:
    """Get CPU usage as a percentage of the whole machine.

    The usage of ``p`` and of all its descendants is summed and divided by the
    number of logical CPUs, so ``100.0`` means every CPU is fully used.

    Args:
        p: Process to measure.

    Returns:
        Tuple of (machine-wide percentage, ``time.time()`` rounded to 2 decimals).
    """
    while True:
        try:
            # cpu_count() may return None when it cannot be determined; fall back to 1.
            logical_cpus = psutil.cpu_count() or 1
            return (_sample_cpu_percent(p) / logical_cpus, round(time.time(), 2))
        except psutil.Error:
            continue


def get_memory_percentage(p: psutil.Process) -> tuple[float, float]:
    """Get RAM usage as a percentage of total memory.

    Adds ``memory_percent()`` of ``p`` to that of every descendant process.

    Args:
        p: Process to measure.

    Returns:
        Tuple of (summed memory percentage, ``time.time()`` rounded to 2 decimals).
    """
    while True:
        try:
            total = p.memory_percent() + sum(
                child.memory_percent() for child in p.children(recursive=True)
            )
        except psutil.Error:
            # A child vanished mid-sample; retry. Only psutil failures are retried so
            # KeyboardInterrupt and programming errors are no longer swallowed.
            continue
        return (total, round(time.time(), 2))


def get_memory_bytes_size(p: psutil.Process) -> tuple[float, float]:
    """Get the resident set size (RSS) of a process in bytes.

    Only ``p`` itself is measured; child processes are not included.

    Args:
        p: Process to measure.

    Returns:
        Tuple of (RSS in bytes, ``time.time()`` rounded to 2 decimals).
    """
    while True:
        try:
            # memory_info() exposes the same ``rss`` field as memory_full_info() without
            # computing the extra, slower metrics that are never read here.
            resident = p.memory_info().rss
        except psutil.Error:
            # Retry transient psutil failures only; anything else should surface.
            continue
        return (resident, round(time.time(), 2))


def get_directory_bytes_size(directory: Path) -> tuple[float, float]:
    """Get directory size in bytes.

    Sums ``st_size`` over every entry that ``rglob("*")`` yields below
    ``directory`` (sub-directory entries are part of that listing too).

    Args:
        directory: Directory to measure. The value is wrapped in ``Path``, so a
            plain string path is accepted as well.

    Returns:
        Tuple of (total size in bytes, ``time.time()`` rounded to 2 decimals).
    """
    root = Path(directory)
    while True:
        try:
            total = sum(entry.stat().st_size for entry in root.rglob("*"))
        except OSError:
            # An entry disappeared (or became unreadable) between being listed and
            # being stat()-ed, which is expected while another thread writes and
            # deletes temporary files. Rescan instead of failing. Only OSError is
            # retried so KeyboardInterrupt can still stop the loop.
            continue
        return (total, round(time.time(), 2))

In [None]:
# Console most recently handed out by the patched ``rich.get_console``.
# Stays ``None`` until the patched factory is called for the first time.
GLOBAL_CONSOLE = None


def _get_rich_console_new(stderr: bool = False) -> Console:
    """Build a recording ``Console``, remember it in ``GLOBAL_CONSOLE`` and return it.

    Every call creates a fresh console and replaces the previously stored one.
    """
    global GLOBAL_CONSOLE  # noqa: PLW0603
    recording_console = Console(record=True, stderr=stderr)
    GLOBAL_CONSOLE = recording_console
    return recording_console


# Swap rich's console factory for the recording one so that output written through
# ``rich.get_console()`` can be exported as text afterwards.
rich.get_console = _get_rich_console_new

In [None]:
def _execute_example(pbf_path, working_directory) -> tuple[Path, dict]:
    """Convert a PBF file to GeoParquet and return the result path with the step timings.

    The conversion runs with ``debug_times=True``; the timings are then read back from
    the text recorded by ``GLOBAL_CONSOLE``, where they follow the ``"Steps times: "``
    marker as a JSON document.

    Args:
        pbf_path: Path of the ``.osm.pbf`` file to convert.
        working_directory: Directory passed to QuackOSM as its working directory.

    Returns:
        Tuple of (path returned by ``convert_pbf_to_gpq``, decoded timings dictionary).

    Raises:
        RuntimeError: If no console has been captured yet (``GLOBAL_CONSOLE`` is ``None``).
        ValueError: If the marker is missing or is not followed by valid JSON.
    """
    path = qosm.convert_pbf_to_gpq(
        pbf_path=pbf_path,
        working_directory=working_directory,
        ignore_cache=True,
        verbosity_mode="silent",
        debug_times=True,
    )
    if GLOBAL_CONSOLE is None:
        raise RuntimeError(
            "No rich console was captured; patch rich.get_console before running the example."
        )
    output_text = cast(Console, GLOBAL_CONSOLE).export_text()

    search_text = "Steps times: "
    # Locate the marker instead of assuming the output starts with it, so any text
    # printed before the timings does not break the parsing.
    marker_position = output_text.find(search_text)
    if marker_position < 0:
        raise ValueError(f"{search_text!r} marker not found in the captured console output.")

    payload = output_text[marker_position + len(search_text) :].lstrip()
    # raw_decode stops at the end of the first JSON value, tolerating trailing output.
    times, _ = json.JSONDecoder().raw_decode(payload)
    return path, times


def _sizeof_fmt(x, pos):
    if x < 0:
        return ""
    return bytes2human(x)


def run_example(example_name: str, pbf_download_url: str) -> None:
    """Run example and monitor usage.

    Downloads the PBF file, converts it with QuackOSM on a worker thread while this
    thread keeps sampling memory, CPU and working-directory size, and finally saves a
    four-panel figure (CPU, memory, disk, processing steps timeline) to
    ``../docs/assets/images/<example_name lower-cased>_disk_spillage.png``, resolved
    relative to the current working directory.

    Args:
        example_name: Label used in the figure title and, lower-cased, in the file name.
        pbf_download_url: URL of the ``.osm.pbf`` file to download and convert.
    """
    memory_values = []
    cpu_values = []
    disk_values = []

    downloaded_file = retrieve(url=pbf_download_url, known_hash=None)

    Path("files").mkdir(parents=True, exist_ok=True)
    p = psutil.Process()

    def _record_sample(directory: str) -> None:
        """Append one (value, timestamp) reading to each of the three series."""
        memory_values.append(get_memory_bytes_size(p))
        cpu_values.append(get_cpu_cores(p))
        disk_values.append(get_directory_bytes_size(directory))

    with tempfile.TemporaryDirectory(dir="files") as tmp_dir_name:
        # Baseline memory, subtracted later so the memory panel shows growth only.
        start_memory, _ = get_memory_bytes_size(p)
        _record_sample(tmp_dir_name)

        time.sleep(0.1)

        _record_sample(tmp_dir_name)

        # A single conversion task is submitted, so one worker thread is enough.
        with ThreadPool(processes=1) as pool:
            r = pool.apply_async(_execute_example, args=(downloaded_file, tmp_dir_name))
            # Keep sampling from this thread until the conversion has finished.
            while not r.ready():
                _record_sample(tmp_dir_name)
            result_gpq_file, steps_times = r.get()
            print(result_gpq_file)
            gpq_file_size = Path(result_gpq_file).stat().st_size

    time.sleep(0.1)

    # Final reading, taken after the temporary working directory has been removed.
    _record_sample(tmp_dir_name)

    # The earliest reported step marks t = 0 on the shared time axis.
    operation_start_time = min(steps_times.values())

    memory_values_adjusted = [
        (val - start_memory, ts - operation_start_time)
        for val, ts in memory_values
        if ts >= (operation_start_time - 0.1)
    ]
    cpu_values_adjusted = [
        (val, ts - operation_start_time) for val, ts in cpu_values if ts >= (operation_start_time - 0.1)
    ]
    disk_values_adjusted = [
        (val, ts - operation_start_time) for val, ts in disk_values if ts >= (operation_start_time - 0.1)
    ]

    fig = plt.figure(figsize=(20, 10))
    gs = fig.add_gridspec(nrows=4, ncols=1, height_ratios=[1, 1, 1, 1.5])
    ax_0 = fig.add_subplot(gs[0])
    ax_1 = fig.add_subplot(gs[1], sharex=ax_0)
    ax_2 = fig.add_subplot(gs[2], sharex=ax_0)
    ax_3 = fig.add_subplot(gs[3], sharex=ax_0)

    # Only the bottom panel keeps its x tick labels; the steps panel has no y scale.
    plt.setp(ax_0.get_xticklabels(), visible=False)
    plt.setp(ax_1.get_xticklabels(), visible=False)
    plt.setp(ax_2.get_xticklabels(), visible=False)
    plt.setp(ax_3.get_yticklabels(), visible=False)

    mem_val, mem_times = zip(*memory_values_adjusted)
    cpu_val, cpu_times = zip(*cpu_values_adjusted)
    dsk_val, dsk_times = zip(*disk_values_adjusted)

    sns.lineplot(x=cpu_times, y=cpu_val, ax=ax_0)
    sns.lineplot(x=mem_times, y=mem_val, ax=ax_1)
    sns.lineplot(x=dsk_times, y=dsk_val, ax=ax_2)

    # Reference lines: size of the input PBF file and of the resulting file.
    pbf_file_size = Path(downloaded_file).stat().st_size
    ax_2.hlines(y=pbf_file_size, xmin=dsk_times[0], xmax=dsk_times[-1], color="r", linestyles="--")

    ax_2.annotate(
        f"PBF file size ({bytes2human(pbf_file_size)})",
        xy=(dsk_times[-1] / 2, pbf_file_size),
        xytext=(0, -8),
        textcoords="offset points",
        ha="center",
        va="top",
    )

    ax_2.hlines(
        y=gpq_file_size, xmin=dsk_times[0], xmax=dsk_times[-1], color="orange", linestyles="--"
    )

    ax_2.annotate(
        f"Result file size ({bytes2human(gpq_file_size)})",
        xy=(dsk_times[-1] / 2, gpq_file_size),
        xytext=(0, 4),
        textcoords="offset points",
        ha="center",
        va="bottom",
    )

    # Alternate label heights above and below the timeline, cycling when there are
    # more steps than predefined levels.
    steps = [0.15, -0.15, 0.35, -0.35, 0.55, -0.55, 0.75, -0.75, 0.95, -0.95]
    levels = np.tile(steps, int(np.ceil(len(steps_times) / len(steps))))[: len(steps_times)]

    texts = []
    for (step_name, timestamp), level in zip(steps_times.items(), levels):
        time_s = timestamp - operation_start_time
        ax_3.vlines(
            x=time_s,
            ymin=0,
            ymax=level,
            alpha=0.4,
        )
        ax_3.plot(time_s, level, "-o", markerfacecolor="w", markersize=1)
        # Draw on the steps axes explicitly instead of relying on pyplot's current axes.
        texts.append(ax_3.text(time_s, level, step_name))

    timeline_markers = ax_3.plot(
        [ts - operation_start_time for ts in steps_times.values()],
        [0 for _ in steps_times.values()],
        "-o",
        markerfacecolor="w",
    )

    adjust_text(
        texts,
        arrowprops=dict(arrowstyle="->", color="k", lw=1),
        objects=timeline_markers,
        avoid_self=False,
        pull_threshold=1000,
        expand=(1, 1),
    )

    ax_3.set_ylim(-1, 1)

    ax_1.yaxis.set_major_formatter(tkr.FuncFormatter(_sizeof_fmt))
    ax_2.yaxis.set_major_formatter(tkr.FuncFormatter(_sizeof_fmt))

    ax_0.set_ylabel("CPU usage (threads)")
    ax_1.set_ylabel("Memory usage")
    ax_2.set_ylabel("Disk usage")
    ax_3.set_ylabel("Steps")

    ax_3.set_xlabel("Time (s)")

    fig.suptitle(f"Resources usage - {example_name}.osm.pbf file")

    fig.tight_layout()

    assets_path = Path(".").resolve().parent / "docs" / "assets" / "images"
    # Create the target directory up front so a long run is not lost on a missing folder.
    assets_path.mkdir(parents=True, exist_ok=True)

    fig.savefig(assets_path / f"{example_name.lower()}_disk_spillage.png", bbox_inches="tight")

In [None]:
# Describe the benchmarking machine so the saved plots can be put into context.
cpu_info = get_cpu_info()
cpu_name, cpu_cores, cpu_freq = (
    cpu_info[field] for field in ("brand_raw", "count", "hz_advertised_friendly")
)

# Total physical memory, formatted as a human-readable size.
total_ram = bytes2human(psutil.virtual_memory().total)

print(f"{cpu_name} ({cpu_cores} threads, {cpu_freq} clock speed)\n{total_ram} total memory")

In [None]:
# Geofabrik extracts to benchmark: label -> download URL of the ``.osm.pbf`` file.
# The label is passed to ``run_example`` as the example name.
examples = {
    "Monaco": "https://download.geofabrik.de/europe/monaco-latest.osm.pbf",
    "Estonia": "https://download.geofabrik.de/europe/estonia-latest.osm.pbf",
    "Poland": "https://download.geofabrik.de/europe/poland-latest.osm.pbf",
}

In [None]:
# Benchmark every configured extract in turn, one run_example call per entry.
for file_name, url in examples.items():
    run_example(file_name, url)
    # break  # uncomment to stop after the first example