diff --git a/pyproject.toml b/pyproject.toml index f572a663..aec95d71 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,8 @@ dependencies = [ "funcy", "gto", "ruamel.yaml", - "scmrepo" + "scmrepo", + "psutil" ] [project.optional-dependencies] @@ -51,7 +52,9 @@ tests = [ "pytest-cov>=3.0.0,<4.0", "pytest-mock>=3.8.2,<4.0", "dvclive[image,plots,markdown]", - "ipython" + "ipython", + "pytest_voluptuous", + "dpath" ] dev = [ "dvclive[all,tests]", diff --git a/src/dvclive/live.py b/src/dvclive/live.py index f2b12fb7..f1a15f63 100644 --- a/src/dvclive/live.py +++ b/src/dvclive/live.py @@ -7,6 +7,7 @@ import os import shutil import tempfile + from pathlib import Path, PurePath from typing import Any, Dict, List, Optional, Set, Tuple, Union, TYPE_CHECKING, Literal @@ -40,6 +41,7 @@ from .report import BLANK_NOTEBOOK_REPORT, make_report from .serialize import dump_json, dump_yaml, load_yaml from .studio import get_dvc_studio_config, post_to_studio +from .monitor_system import CPUMonitor from .utils import ( StrPath, catch_and_warn, @@ -80,6 +82,7 @@ def __init__( cache_images: bool = False, exp_name: Optional[str] = None, exp_message: Optional[str] = None, + monitor_system: bool = False, ): """ Initializes a DVCLive logger. A `Live()` instance is required in order to log @@ -118,6 +121,8 @@ def __init__( provided string will be passed to `dvc exp save --message`. If DVCLive is used inside `dvc exp run`, the option will be ignored, use `dvc exp run --message` instead. + monitor_system (bool): if `True`, DVCLive will monitor CPU, ram, and disk + usage. Defaults to `False`. 
""" self.summary: Dict[str, Any] = {} @@ -163,6 +168,11 @@ def __init__( self._dvc_studio_config: Dict[str, Any] = {} self._init_studio() + self._cpu_monitor = None + if monitor_system: + self._cpu_monitor = CPUMonitor() + self._cpu_monitor(self) + def _init_resume(self): self._read_params() self.summary = self.read_latest() @@ -366,6 +376,17 @@ def step(self, value: int) -> None: self._step = value logger.debug(f"Step: {self.step}") + @property + def cpu_monitor(self) -> Optional[CPUMonitor]: + return self._cpu_monitor or None + + @cpu_monitor.setter + def cpu_monitor(self, cpu_monitor: CPUMonitor) -> None: + if self._cpu_monitor is not None: + self._cpu_monitor.end() + self._cpu_monitor = cpu_monitor + self._cpu_monitor(self) + def sync(self): self.make_summary() @@ -853,6 +874,11 @@ def end(self): # If next_step called before end, don't want to update step number if "step" in self.summary: self.step = self.summary["step"] + + # Kill threads that monitor the system metrics + if self._cpu_monitor is not None: + self._cpu_monitor.end() + self.sync() if self._inside_dvc_exp and self._dvc_repo: diff --git a/src/dvclive/monitor_system.py b/src/dvclive/monitor_system.py new file mode 100644 index 00000000..7e10b2ad --- /dev/null +++ b/src/dvclive/monitor_system.py @@ -0,0 +1,195 @@ +import abc +import logging +import os +from typing import Dict, Union, Optional, Tuple + +import psutil +from statistics import mean +from threading import Event, Thread +from funcy import merge_with + + +logger = logging.getLogger("dvclive") +GIGABYTES_DIVIDER = 1024.0**3 + +MINIMUM_CPU_USAGE_TO_BE_ACTIVE = 20 + +METRIC_CPU_COUNT = "system/cpu/count" +METRIC_CPU_USAGE_PERCENT = "system/cpu/usage (%)" +METRIC_CPU_PARALLELIZATION_PERCENT = "system/cpu/parallelization (%)" + +METRIC_RAM_USAGE_PERCENT = "system/ram/usage (%)" +METRIC_RAM_USAGE_GB = "system/ram/usage (GB)" +METRIC_RAM_TOTAL_GB = "system/ram/total (GB)" + +METRIC_DISK_USAGE_PERCENT = "system/disk/usage (%)" +METRIC_DISK_USAGE_GB 
= "system/disk/usage (GB)" +METRIC_DISK_TOTAL_GB = "system/disk/total (GB)" + + +class _SystemMonitor(abc.ABC): + """ + Monitor system resources and log them to DVC Live. + Use a separate thread to call a `_get_metrics` function at fix interval and + aggregate the results of this sampling using the average. + """ + + _plot_blacklist_prefix: Tuple = () + + def __init__( + self, + interval: float, + num_samples: int, + plot: bool = True, + ): + max_interval = 0.1 + if interval > max_interval: + interval = max_interval + logger.warning( + f"System monitoring `interval` should be less than {max_interval} " + f"seconds. Setting `interval` to {interval} seconds." + ) + + min_num_samples = 1 + max_num_samples = 30 + if not min_num_samples < num_samples < max_num_samples: + num_samples = max(min(num_samples, max_num_samples), min_num_samples) + logger.warning( + f"System monitoring `num_samples` should be between {min_num_samples} " + f"and {max_num_samples}. Setting `num_samples` to {num_samples}." 
+ ) + + self._interval = interval # seconds + self._nb_samples = num_samples + self._plot = plot + self._warn_user = True + + def __call__(self, live): + self._live = live + self._shutdown_event = Event() + Thread( + target=self._monitoring_loop, + ).start() + + def _monitoring_loop(self): + while not self._shutdown_event.is_set(): + self._metrics = {} + for _ in range(self._nb_samples): + last_metrics = {} + try: + last_metrics = self._get_metrics() + except psutil.Error: + if self._warn_user: + logger.exception("Failed to monitor CPU metrics") + self._warn_user = False + + self._metrics = merge_with(sum, self._metrics, last_metrics) + self._shutdown_event.wait(self._interval) + if self._shutdown_event.is_set(): + break + for name, values in self._metrics.items(): + blacklisted = any( + name.startswith(prefix) for prefix in self._plot_blacklist_prefix + ) + self._live.log_metric( + name, + values / self._nb_samples, + timestamp=True, + plot=None if blacklisted else self._plot, + ) + + @abc.abstractmethod + def _get_metrics(self) -> Dict[str, Union[float, int]]: + pass + + def end(self): + self._shutdown_event.set() + + +class CPUMonitor(_SystemMonitor): + _plot_blacklist_prefix: Tuple = ( + METRIC_CPU_COUNT, + METRIC_RAM_TOTAL_GB, + METRIC_DISK_TOTAL_GB, + ) + + def __init__( + self, + interval: float = 0.1, + num_samples: int = 20, + directories_to_monitor: Optional[Dict[str, str]] = None, + plot: bool = True, + ): + """Monitor CPU resources and log them to DVC Live. + + Args: + interval (float): interval in seconds between two measurements. + Defaults to 0.1. + num_samples (int): number of samples to average. Defaults to 20. + directories_to_monitor (Optional[Dict[str, str]]): monitor disk usage + statistics about the partition which contains the given paths. The + statistics include total and used space in gigabytes and percent.
+ This argument expects a dict where the key is the name that will be used + in the metric's name and the value is the path to the directory to + monitor. Defaults to {"main": "/"}. + plot (bool): should the system metrics be saved as plots. Defaults to True. + + Raises: + ValueError: if a key in `directories_to_monitor` is not a + valid directory name. + """ + super().__init__(interval=interval, num_samples=num_samples, plot=plot) + directories_to_monitor = ( + {"main": "/"} if directories_to_monitor is None else directories_to_monitor + ) + self._disks_to_monitor = {} + for disk_name, disk_path in directories_to_monitor.items(): + if disk_name != os.path.normpath(disk_name): + raise ValueError( # noqa: TRY003 + "Keys for `directories_to_monitor` should be a valid name" + f", but got '{disk_name}'." + ) + self._disks_to_monitor[disk_name] = disk_path + + self._warn_disk_doesnt_exist: Dict[str, bool] = {} + + def _get_metrics(self) -> Dict[str, Union[float, int]]: + ram_info = psutil.virtual_memory() + nb_cpus = psutil.cpu_count() + cpus_percent = psutil.cpu_percent(percpu=True) + result = { + METRIC_CPU_COUNT: nb_cpus, + METRIC_CPU_USAGE_PERCENT: mean(cpus_percent), + METRIC_CPU_PARALLELIZATION_PERCENT: len( + [ + percent + for percent in cpus_percent + if percent >= MINIMUM_CPU_USAGE_TO_BE_ACTIVE + ] + ) + * 100 + / nb_cpus, + METRIC_RAM_USAGE_PERCENT: ram_info.percent, + METRIC_RAM_USAGE_GB: ram_info.used / GIGABYTES_DIVIDER, + METRIC_RAM_TOTAL_GB: ram_info.total / GIGABYTES_DIVIDER, + } + for disk_name, disk_path in self._disks_to_monitor.items(): + try: + disk_info = psutil.disk_usage(disk_path) + except OSError: + if self._warn_disk_doesnt_exist.get(disk_name, True): + logger.warning( + f"Couldn't find directory '{disk_path}', ignoring it."
+ ) + self._warn_disk_doesnt_exist[disk_name] = False + continue + disk_metrics = { + f"{METRIC_DISK_USAGE_PERCENT}/{disk_name}": disk_info.percent, + f"{METRIC_DISK_USAGE_GB}/{disk_name}": disk_info.used + / GIGABYTES_DIVIDER, + f"{METRIC_DISK_TOTAL_GB}/{disk_name}": disk_info.total + / GIGABYTES_DIVIDER, + } + disk_metrics = {k.rstrip("/"): v for k, v in disk_metrics.items()} + result.update(disk_metrics) + return result diff --git a/tests/test_monitor_system.py b/tests/test_monitor_system.py new file mode 100644 index 00000000..49011639 --- /dev/null +++ b/tests/test_monitor_system.py @@ -0,0 +1,211 @@ +import time +from pathlib import Path +import pytest + +import dpath +from pytest_voluptuous import S + +from dvclive import Live +from dvclive.monitor_system import ( + CPUMonitor, + METRIC_CPU_COUNT, + METRIC_CPU_USAGE_PERCENT, + METRIC_CPU_PARALLELIZATION_PERCENT, + METRIC_RAM_USAGE_PERCENT, + METRIC_RAM_USAGE_GB, + METRIC_RAM_TOTAL_GB, + METRIC_DISK_USAGE_PERCENT, + METRIC_DISK_USAGE_GB, + METRIC_DISK_TOTAL_GB, + GIGABYTES_DIVIDER, +) +from dvclive.utils import parse_metrics + + +def mock_psutil_cpu(mocker): + mocker.patch( + "dvclive.monitor_system.psutil.cpu_percent", + return_value=[10, 10, 10, 40, 50, 60], + ) + mocker.patch("dvclive.monitor_system.psutil.cpu_count", return_value=6) + + +def mock_psutil_ram(mocker): + mocked_ram = mocker.MagicMock() + mocked_ram.percent = 50 + mocked_ram.used = 2 * GIGABYTES_DIVIDER + mocked_ram.total = 4 * GIGABYTES_DIVIDER + mocker.patch( + "dvclive.monitor_system.psutil.virtual_memory", return_value=mocked_ram + ) + + +def mock_psutil_disk(mocker): + mocked_disk = mocker.MagicMock() + mocked_disk.percent = 50 + mocked_disk.used = 16 * GIGABYTES_DIVIDER + mocked_disk.total = 32 * GIGABYTES_DIVIDER + mocker.patch("dvclive.monitor_system.psutil.disk_usage", return_value=mocked_disk) + + +def mock_psutil_disk_with_oserror(mocker): + mocked_disk = mocker.MagicMock() + mocked_disk.percent = 50 + mocked_disk.used = 16 * 
GIGABYTES_DIVIDER + mocked_disk.total = 32 * GIGABYTES_DIVIDER + mocker.patch( + "dvclive.monitor_system.psutil.disk_usage", + side_effect=[ + mocked_disk, + OSError, + mocked_disk, + OSError, + ], + ) + + +def test_monitor_system_is_false(tmp_dir, mocker): + mock_psutil_cpu(mocker) + mock_psutil_ram(mocker) + mock_psutil_disk(mocker) + cpu_monitor_mock = mocker.patch("dvclive.live.CPUMonitor") + with Live( + tmp_dir, + save_dvc_exp=False, + monitor_system=False, + ) as live: + assert live.cpu_monitor is None + + cpu_monitor_mock.assert_not_called() + + +def test_monitor_system_is_true(tmp_dir, mocker): + mock_psutil_cpu(mocker) + mock_psutil_ram(mocker) + mock_psutil_disk(mocker) + cpu_monitor_mock = mocker.patch("dvclive.live.CPUMonitor", spec=CPUMonitor) + + with Live( + tmp_dir, + save_dvc_exp=False, + monitor_system=True, + ) as live: + cpu_monitor = live.cpu_monitor + + assert isinstance(cpu_monitor, CPUMonitor) + cpu_monitor_mock.assert_called_once() + + end_spy = mocker.spy(cpu_monitor, "end") + end_spy.assert_not_called() + + # check the monitoring thread is stopped + end_spy.assert_called_once() + + +def test_ignore_non_existent_directories(tmp_dir, mocker): + mock_psutil_cpu(mocker) + mock_psutil_ram(mocker) + mock_psutil_disk_with_oserror(mocker) + with Live( + tmp_dir, + save_dvc_exp=False, + monitor_system=False, + ) as live: + non_existent_disk = "/non-existent" + monitor = CPUMonitor( + directories_to_monitor={"main": "/", "non-existent": non_existent_disk} + ) + monitor(live) + metrics = monitor._get_metrics() + monitor.end() + + assert not Path(non_existent_disk).exists() + + assert f"{METRIC_DISK_USAGE_PERCENT}/non-existent" not in metrics + assert f"{METRIC_DISK_USAGE_GB}/non-existent" not in metrics + assert f"{METRIC_DISK_TOTAL_GB}/non-existent" not in metrics + + +@pytest.mark.timeout(2) +def test_monitor_system_metrics(tmp_dir, mocker): + mock_psutil_cpu(mocker) + mock_psutil_ram(mocker) + mock_psutil_disk(mocker) + with Live( + tmp_dir, + 
save_dvc_exp=False, + monitor_system=False, + ) as live: + live.cpu_monitor = CPUMonitor(interval=0.05, num_samples=4) + # wait for the metrics to be logged. + # METRIC_DISK_TOTAL_GB is the last metric to be logged. + while len(dpath.search(live.summary, METRIC_DISK_TOTAL_GB)) == 0: + time.sleep(0.001) + live.next_step() + + _, latest = parse_metrics(live) + + schema = {} + for name, value in { + "step": 0, + METRIC_CPU_COUNT: 6, + METRIC_CPU_USAGE_PERCENT: 30.0, + METRIC_CPU_PARALLELIZATION_PERCENT: 50.0, + METRIC_RAM_USAGE_PERCENT: 50.0, + METRIC_RAM_USAGE_GB: 2.0, + METRIC_RAM_TOTAL_GB: 4.0, + f"{METRIC_DISK_USAGE_PERCENT}/main": 50.0, + f"{METRIC_DISK_USAGE_GB}/main": 16.0, + f"{METRIC_DISK_TOTAL_GB}/main": 32.0, + }.items(): + dpath.new(schema, name, value) + + assert latest == S(schema) + + +@pytest.mark.timeout(2) +def test_monitor_system_timeseries(tmp_dir, mocker): + mock_psutil_cpu(mocker) + mock_psutil_ram(mocker) + mock_psutil_disk(mocker) + with Live( + tmp_dir, + save_dvc_exp=False, + monitor_system=False, + ) as live: + live.cpu_monitor = CPUMonitor(interval=0.05, num_samples=4) + + # wait for the metrics to be logged. + # METRIC_DISK_TOTAL_GB is the last metric to be logged. 
+ while len(dpath.search(live.summary, METRIC_DISK_TOTAL_GB)) == 0: + time.sleep(0.001) + + live.next_step() + + timeseries, _ = parse_metrics(live) + + def timeserie_schema(name): + return [{name: str, "timestamp": str, "step": str(0)}] + + # timeseries contains all the system metrics + prefix = Path(tmp_dir) / "plots/metrics" + assert timeseries == S( + { + str(prefix / f"{METRIC_CPU_USAGE_PERCENT}.tsv"): timeserie_schema( + METRIC_CPU_USAGE_PERCENT.split("/")[-1] + ), + str(prefix / f"{METRIC_CPU_PARALLELIZATION_PERCENT}.tsv"): timeserie_schema( + METRIC_CPU_PARALLELIZATION_PERCENT.split("/")[-1] + ), + str(prefix / f"{METRIC_RAM_USAGE_PERCENT}.tsv"): timeserie_schema( + METRIC_RAM_USAGE_PERCENT.split("/")[-1] + ), + str(prefix / f"{METRIC_RAM_USAGE_GB}.tsv"): timeserie_schema( + METRIC_RAM_USAGE_GB.split("/")[-1] + ), + str(prefix / f"{METRIC_DISK_USAGE_PERCENT}/main.tsv"): timeserie_schema( + "main" + ), + str(prefix / f"{METRIC_DISK_USAGE_GB}/main.tsv"): timeserie_schema("main"), + } + )