Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b5a8171
add cpu monitoring
AlexandreKempf Feb 6, 2024
e3b654c
add unit tests and more cpu metrics
AlexandreKempf Feb 6, 2024
e6cff32
change default value for callback
AlexandreKempf Feb 6, 2024
8663011
uses a percentage value for cpu parallelism
AlexandreKempf Feb 6, 2024
1346750
add ram total
AlexandreKempf Feb 6, 2024
5f90bea
remove total ram measure from plots
AlexandreKempf Feb 6, 2024
2ac50f2
update pyproject.toml
AlexandreKempf Feb 7, 2024
5c0a288
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 7, 2024
30315bb
add tmpdir to metrics tests
AlexandreKempf Feb 7, 2024
0f55943
Merge branch 'main' into monitor-cpu-ressources
AlexandreKempf Feb 7, 2024
40686c7
default to no monitoring callbacks
AlexandreKempf Feb 7, 2024
4b98144
fix tmp_dir for test on windows and macos
AlexandreKempf Feb 9, 2024
36fd94d
fix tmp_dir for test on windows and macos
AlexandreKempf Feb 9, 2024
59db522
fix update data to studio live experiment
AlexandreKempf Feb 9, 2024
2393fb0
fix studio update data problem
AlexandreKempf Feb 9, 2024
3b327c5
debug studio updates
AlexandreKempf Feb 9, 2024
b0ac980
improve code readability:
AlexandreKempf Feb 12, 2024
5162ac2
remove hack lightning
AlexandreKempf Feb 12, 2024
9353645
fix lightning problem with steps in studio
AlexandreKempf Feb 12, 2024
871bebc
simplify the metric names
AlexandreKempf Feb 13, 2024
072252d
Merge branch 'main' into monitor-cpu-ressources
AlexandreKempf Feb 14, 2024
e169abc
don't increment `num_point_sent_to_studio` if studio didn't received …
AlexandreKempf Feb 14, 2024
aa5b511
add directory metrics to the list of metrics tracked + refacto
AlexandreKempf Feb 15, 2024
9d0e70d
clean code and split features into several PRs
AlexandreKempf Feb 15, 2024
ba72e01
cleaner user interface
AlexandreKempf Feb 15, 2024
6e29ff6
add docstrings
AlexandreKempf Feb 15, 2024
719087b
mypy conflicts
AlexandreKempf Feb 15, 2024
8654aee
change error types and `monitor_cpu` to `cpu_monitor`
AlexandreKempf Feb 16, 2024
f0d6234
add unit tests about _num_points_sent_to_studio behavior
AlexandreKempf Feb 16, 2024
bc48074
Merge branch 'main' into monitor-cpu-ressources
Feb 16, 2024
d2c1c84
use constant values for metrics names
AlexandreKempf Feb 16, 2024
7ab8915
improve test and user inputs
AlexandreKempf Feb 19, 2024
a8d1bfe
improve tests and error catching
AlexandreKempf Feb 20, 2024
4d42f0e
add docstring and fix typo
AlexandreKempf Feb 20, 2024
c4485d7
bugfix
AlexandreKempf Feb 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ dependencies = [
"funcy",
"gto",
"ruamel.yaml",
"scmrepo"
"scmrepo",
"psutil"
]

[project.optional-dependencies]
Expand All @@ -51,7 +52,9 @@ tests = [
"pytest-cov>=3.0.0,<4.0",
"pytest-mock>=3.8.2,<4.0",
"dvclive[image,plots,markdown]",
"ipython"
"ipython",
"pytest_voluptuous",
"dpath"
]
dev = [
"dvclive[all,tests]",
Expand Down
26 changes: 26 additions & 0 deletions src/dvclive/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import shutil
import tempfile

from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Set, Tuple, Union, TYPE_CHECKING, Literal

Expand Down Expand Up @@ -40,6 +41,7 @@
from .report import BLANK_NOTEBOOK_REPORT, make_report
from .serialize import dump_json, dump_yaml, load_yaml
from .studio import get_dvc_studio_config, post_to_studio
from .monitor_system import CPUMonitor
from .utils import (
StrPath,
catch_and_warn,
Expand Down Expand Up @@ -80,6 +82,7 @@ def __init__(
cache_images: bool = False,
exp_name: Optional[str] = None,
exp_message: Optional[str] = None,
monitor_system: bool = False,
):
"""
Initializes a DVCLive logger. A `Live()` instance is required in order to log
Expand Down Expand Up @@ -118,6 +121,8 @@ def __init__(
provided string will be passed to `dvc exp save --message`.
If DVCLive is used inside `dvc exp run`, the option will be ignored, use
`dvc exp run --message` instead.
monitor_system (bool): if `True`, DVCLive will monitor CPU, ram, and disk
usage. Defaults to `False`.
"""
self.summary: Dict[str, Any] = {}

Expand Down Expand Up @@ -163,6 +168,11 @@ def __init__(
self._dvc_studio_config: Dict[str, Any] = {}
self._init_studio()

self._cpu_monitor = None
if monitor_system:
self._cpu_monitor = CPUMonitor()
self._cpu_monitor(self)

def _init_resume(self):
self._read_params()
self.summary = self.read_latest()
Expand Down Expand Up @@ -366,6 +376,17 @@ def step(self, value: int) -> None:
self._step = value
logger.debug(f"Step: {self.step}")

@property
def cpu_monitor(self) -> Optional[CPUMonitor]:
return self._cpu_monitor or None

@cpu_monitor.setter
def cpu_monitor(self, cpu_monitor: CPUMonitor) -> None:
if self._cpu_monitor is not None:
self._cpu_monitor.end()
self._cpu_monitor = cpu_monitor
self._cpu_monitor(self)

def sync(self):
self.make_summary()

Expand Down Expand Up @@ -853,6 +874,11 @@ def end(self):
# If next_step called before end, don't want to update step number
if "step" in self.summary:
self.step = self.summary["step"]

# Kill threads that monitor the system metrics
if self._cpu_monitor is not None:
self._cpu_monitor.end()

self.sync()

if self._inside_dvc_exp and self._dvc_repo:
Expand Down
195 changes: 195 additions & 0 deletions src/dvclive/monitor_system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import abc
import logging
import os
from typing import Dict, Union, Optional, Tuple

import psutil
from statistics import mean
from threading import Event, Thread
from funcy import merge_with


logger = logging.getLogger("dvclive")
GIGABYTES_DIVIDER = 1024.0**3

MINIMUM_CPU_USAGE_TO_BE_ACTIVE = 20

METRIC_CPU_COUNT = "system/cpu/count"
METRIC_CPU_USAGE_PERCENT = "system/cpu/usage (%)"
METRIC_CPU_PARALLELIZATION_PERCENT = "system/cpu/parallelization (%)"

METRIC_RAM_USAGE_PERCENT = "system/ram/usage (%)"
METRIC_RAM_USAGE_GB = "system/ram/usage (GB)"
METRIC_RAM_TOTAL_GB = "system/ram/total (GB)"

METRIC_DISK_USAGE_PERCENT = "system/disk/usage (%)"
METRIC_DISK_USAGE_GB = "system/disk/usage (GB)"
METRIC_DISK_TOTAL_GB = "system/disk/total (GB)"


class _SystemMonitor(abc.ABC):
"""
Monitor system resources and log them to DVC Live.
Use a separate thread to call a `_get_metrics` function at fix interval and
aggregate the results of this sampling using the average.
"""

_plot_blacklist_prefix: Tuple = ()

def __init__(
self,
interval: float,
num_samples: int,
plot: bool = True,
):
max_interval = 0.1
if interval > max_interval:
interval = max_interval
logger.warning(
f"System monitoring `interval` should be less than {max_interval} "
f"seconds. Setting `interval` to {interval} seconds."
)

min_num_samples = 1
max_num_samples = 30
if not min_num_samples < num_samples < max_num_samples:
num_samples = max(min(num_samples, max_num_samples), min_num_samples)
logger.warning(
f"System monitoring `num_samples` should be between {min_num_samples} "
f"and {max_num_samples}. Setting `num_samples` to {num_samples}."
)

self._interval = interval # seconds
self._nb_samples = num_samples
self._plot = plot
self._warn_user = True

def __call__(self, live):
self._live = live
self._shutdown_event = Event()
Thread(
target=self._monitoring_loop,
).start()

def _monitoring_loop(self):
while not self._shutdown_event.is_set():
self._metrics = {}
for _ in range(self._nb_samples):
last_metrics = {}
try:
last_metrics = self._get_metrics()
except psutil.Error:
if self._warn_user:
logger.exception("Failed to monitor CPU metrics")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it dump the whole stack trace? should we just include message if we are not in the DEBUG mode (I think we have a flag for this in DVCLive).

self._warn_user = False

self._metrics = merge_with(sum, self._metrics, last_metrics)
self._shutdown_event.wait(self._interval)
if self._shutdown_event.is_set():
break
for name, values in self._metrics.items():
blacklisted = any(
name.startswith(prefix) for prefix in self._plot_blacklist_prefix
)
self._live.log_metric(
name,
values / self._nb_samples,
timestamp=True,
plot=None if blacklisted else self._plot,
)

@abc.abstractmethod
def _get_metrics(self) -> Dict[str, Union[float, int]]:
pass

def end(self):
self._shutdown_event.set()


class CPUMonitor(_SystemMonitor):
_plot_blacklist_prefix: Tuple = (
METRIC_CPU_COUNT,
METRIC_RAM_TOTAL_GB,
METRIC_DISK_TOTAL_GB,
)

def __init__(
self,
interval: float = 0.1,
num_samples: int = 20,
directories_to_monitor: Optional[Dict[str, str]] = None,
plot: bool = True,
):
"""Monitor CPU resources and log them to DVC Live.

Args:
interval (float): interval in seconds between two measurements.
Defaults to 0.5.
num_samples (int): number of samples to average. Defaults to 10.
directories_to_monitor (Optional[Dict[str, str]]): monitor disk usage
statistics about the partition which contains the given paths. The
statistics include total and used space in gygabytes and percent.
This argument expect a dict where the key is the name that will be used
in the metric's name and the value is the path to the directory to
monitor. Defaults to {"main": "/"}.
plot (bool): should the system metrics be saved as plots. Defaults to True.

Raises:
ValueError: if the arguments passed to the function don't have a
supported type.
"""
super().__init__(interval=interval, num_samples=num_samples, plot=plot)
directories_to_monitor = (
{"main": "/"} if directories_to_monitor is None else directories_to_monitor
)
self._disks_to_monitor = {}
for disk_name, disk_path in directories_to_monitor.items():
if disk_name != os.path.normpath(disk_name):
raise ValueError( # noqa: TRY003
"Keys for `directories_to_monitor` should be a valid name"
f", but got '{disk_name}'."
)
self._disks_to_monitor[disk_name] = disk_path

self._warn_disk_doesnt_exist: Dict[str, bool] = {}

def _get_metrics(self) -> Dict[str, Union[float, int]]:
ram_info = psutil.virtual_memory()
nb_cpus = psutil.cpu_count()
cpus_percent = psutil.cpu_percent(percpu=True)
result = {
METRIC_CPU_COUNT: nb_cpus,
METRIC_CPU_USAGE_PERCENT: mean(cpus_percent),
METRIC_CPU_PARALLELIZATION_PERCENT: len(
[
percent
for percent in cpus_percent
if percent >= MINIMUM_CPU_USAGE_TO_BE_ACTIVE
]
)
* 100
/ nb_cpus,
METRIC_RAM_USAGE_PERCENT: ram_info.percent,
METRIC_RAM_USAGE_GB: ram_info.used / GIGABYTES_DIVIDER,
METRIC_RAM_TOTAL_GB: ram_info.total / GIGABYTES_DIVIDER,
}
for disk_name, disk_path in self._disks_to_monitor.items():
try:
disk_info = psutil.disk_usage(disk_path)
except OSError:
if self._warn_disk_doesnt_exist.get(disk_name, True):
logger.warning(
f"Couldn't find directory '{disk_path}', ignoring it."
)
self._warn_disk_doesnt_exist[disk_name] = False
continue
disk_metrics = {
f"{METRIC_DISK_USAGE_PERCENT}/{disk_name}": disk_info.percent,
f"{METRIC_DISK_USAGE_GB}/{disk_name}": disk_info.used
/ GIGABYTES_DIVIDER,
f"{METRIC_DISK_TOTAL_GB}/{disk_name}": disk_info.total
/ GIGABYTES_DIVIDER,
}
disk_metrics = {k.rstrip("/"): v for k, v in disk_metrics.items()}
result.update(disk_metrics)
return result
Loading