Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-33331: Monitor memory usage at select steps of BPS submission #112

Merged
merged 2 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-33331.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Context manager time_this can now include memory usage in its report.
4 changes: 4 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ ignore_errors = True
ignore_missing_imports = True
ignore_errors = True

[mypy-astropy.*]
ignore_missing_imports = True
ignore_errors = True

[mypy-eups.*]
ignore_missing_imports = True

Expand Down
57 changes: 56 additions & 1 deletion python/lsst/utils/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,17 @@
Tuple,
)

from astropy import units as u

from .usage import get_current_mem_usage, get_peak_mem_usage

if TYPE_CHECKING:
from .logging import LsstLoggers


_LOG = logging.getLogger(__name__)


def _add_to_metadata(metadata: MutableMapping, name: str, value: Any) -> None:
"""Add a value to dict-like object, creating list as needed.

Expand Down Expand Up @@ -363,6 +370,10 @@ def time_this(
level: int = logging.DEBUG,
prefix: Optional[str] = "timer",
args: Iterable[Any] = (),
mem_usage: bool = False,
mem_child: bool = False,
mem_unit: u.Quantity = u.byte,
mem_fmt: str = ".0f",
) -> Iterator[None]:
"""Time the enclosed block and issue a log message.

Expand All @@ -384,6 +395,16 @@ def time_this(
args : iterable of any
Additional parameters passed to the log command that should be
written to ``msg``.
mem_usage : `bool`, optional
Flag indicating whether to include the memory usage in the report.
Defaults to False.
mem_child : `bool`, optional
Flag indicating whether to include memory usage of the child processes.
mem_unit : `astropy.units.Unit`, optional
Unit to use when reporting the memory usage. Defaults to bytes.
mem_fmt : `str`, optional
Format specifier to use when displaying values related to memory usage.
Defaults to '.0f'.
"""
if log is None:
log = logging.getLogger()
Expand All @@ -393,6 +414,10 @@ def time_this(

success = False
start = time.time()
if mem_usage:
current_usages_start = get_current_mem_usage()
peak_usages_start = get_peak_mem_usage()

try:
yield
success = True
Expand All @@ -405,11 +430,41 @@ def time_this(
if msg is None:
msg = ""

# Convert user provided parameters (if any) to mutable sequence to make
# mypy stop complaining when additional parameters will be added below.
params = list(args) if args else []

if not success:
# Something went wrong so change the log level to indicate
# this.
level = logging.ERROR

# Specify stacklevel to ensure the message is reported from the
# caller (1 is this file, 2 is contextlib, 3 is user)
log.log(level, msg + "%sTook %.4f seconds", *args, ": " if msg else "", end - start, stacklevel=3)
params += (": " if msg else "", end - start)
mxk62 marked this conversation as resolved.
Show resolved Hide resolved
msg += "%sTook %.4f seconds"
if mem_usage and log.isEnabledFor(level):
current_usages_end = get_current_mem_usage()
peak_usages_end = get_peak_mem_usage()

current_deltas = [end - start for end, start in zip(current_usages_end, current_usages_start)]
peak_deltas = [end - start for end, start in zip(peak_usages_end, peak_usages_start)]

current_usage = current_usages_end[0]
current_delta = current_deltas[0]
peak_delta = peak_deltas[0]
if mem_child:
current_usage += current_usages_end[1]
current_delta += current_deltas[1]
peak_delta += peak_deltas[1]

if not mem_unit.is_equivalent(u.byte):
_LOG.warning("Invalid memory unit '%s', using '%s' instead", mem_unit, u.byte)
mem_unit = u.byte

msg += (
f"; current memory usage: {current_usage.to(mem_unit):{mem_fmt}}"
f", delta: {current_delta.to(mem_unit):{mem_fmt}}"
f", peak delta: {peak_delta.to(mem_unit):{mem_fmt}}"
)
log.log(level, msg, *params, stacklevel=3)
70 changes: 70 additions & 0 deletions python/lsst/utils/usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# This file is part of utils.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.

"""Utilities for measuring resource consumption.
"""

import platform
import resource
from typing import Tuple

import psutil
from astropy import units as u

__all__ = ["get_current_mem_usage", "get_peak_mem_usage"]


def get_current_mem_usage() -> Tuple[u.Quantity, u.Quantity]:
    """Report current memory usage.

    Returns
    -------
    usage_main : `astropy.units.Quantity`
        Current memory usage of the calling process expressed in bytes.
    usage_child : `astropy.units.Quantity`
        Combined current memory usage of the direct child processes (zero if
        there are none) expressed in bytes.

    Notes
    -----
    Function reports current memory usage using resident set size as a proxy.
    As such the values it reports are capped at available physical RAM and may
    not reflect the actual memory allocated to the process and its children.

    Only direct children are taken into account (``psutil.Process.children``
    is called with its default, non-recursive setting). A child which
    terminates between being listed and being queried is silently skipped
    instead of making the whole measurement fail.
    """
    proc = psutil.Process()
    usage_main = proc.memory_info().rss * u.byte

    rss_child = 0
    for child in proc.children():
        try:
            rss_child += child.memory_info().rss
        except psutil.NoSuchProcess:
            # The child exited after it was listed; it no longer uses any
            # memory, so it contributes nothing to the total.
            continue
    usage_child = rss_child * u.byte

    return usage_main, usage_child


def get_peak_mem_usage() -> Tuple[u.Quantity, u.Quantity]:
    """Report peak memory usage.

    Returns
    -------
    peak_main : `astropy.units.Quantity`
        Peak memory usage (maximum resident set size) of the calling process.
    peak_child : `astropy.units.Quantity`
        Peak memory usage (resident set size) of the largest child process.

    Notes
    -----
    Function reports peak memory usage using the maximum resident set size as
    a proxy. As such the value it reports is capped at available physical RAM
    and may not reflect the actual maximal value.
    """
    # getrusage(2) reports the maximum resident set size in platform
    # dependent units: kilobytes on Linux, bytes on OSX. Any platform other
    # than Linux is treated as reporting bytes.
    # NOTE(review): other Unix flavours may not report bytes -- confirm
    # before relying on the values there.
    if platform.system() == "Linux":
        rss_unit = u.kibibyte
    else:
        rss_unit = u.byte

    rusage_self = resource.getrusage(resource.RUSAGE_SELF)
    rusage_children = resource.getrusage(resource.RUSAGE_CHILDREN)

    return rusage_self.ru_maxrss * rss_unit, rusage_children.ru_maxrss * rss_unit
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ numpy >= 1.17
psutil >= 5.7
deprecated >= 1.2
pyyaml >5.1
astropy >= 5.0
55 changes: 55 additions & 0 deletions tests/test_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import unittest
from dataclasses import dataclass

from astropy import units as u
from lsst.utils.timer import logInfo, logPairs, time_this, timeMethod

log = logging.getLogger("test_timer")
Expand Down Expand Up @@ -186,8 +187,62 @@ def testTimer(self):
self.assertEqual(cm.records[0].name, "root")
self.assertEqual(cm.records[0].levelname, "DEBUG")
self.assertIn("Took", cm.output[0])
self.assertNotIn(": Took", cm.output[0])
self.assertNotIn("; ", cm.output[0])
self.assertEqual(cm.records[0].filename, THIS_FILE)

# Report memory usage.
with self.assertLogs(level="DEBUG") as cm:
with time_this(level=logging.DEBUG, prefix=None, mem_usage=True):
pass
self.assertEqual(cm.records[0].name, "root")
self.assertEqual(cm.records[0].levelname, "DEBUG")
self.assertIn("Took", cm.output[0])
mxk62 marked this conversation as resolved.
Show resolved Hide resolved
self.assertIn("memory", cm.output[0])
self.assertIn("delta", cm.output[0])
self.assertIn("peak delta", cm.output[0])
self.assertIn("byte", cm.output[0])

# Report memory usage including child processes.
with self.assertLogs(level="DEBUG") as cm:
with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_child=True):
pass
self.assertEqual(cm.records[0].name, "root")
self.assertEqual(cm.records[0].levelname, "DEBUG")
self.assertIn("Took", cm.output[0])
self.assertIn("memory", cm.output[0])
self.assertIn("delta", cm.output[0])
self.assertIn("peak delta", cm.output[0])
self.assertIn("byte", cm.output[0])

# Report memory usage, use non-default, but a valid memory unit.
with self.assertLogs(level="DEBUG") as cm:
with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_unit=u.kilobyte):
pass
self.assertEqual(cm.records[0].name, "root")
self.assertEqual(cm.records[0].levelname, "DEBUG")
self.assertIn("Took", cm.output[0])
self.assertIn("memory", cm.output[0])
self.assertIn("delta", cm.output[0])
self.assertIn("peak delta", cm.output[0])
self.assertIn("kbyte", cm.output[0])

# Report memory usage, use an invalid memory unit.
with self.assertLogs(level="DEBUG") as cm:
with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_unit=u.gram):
pass
self.assertEqual(cm.records[0].name, "lsst.utils.timer")
self.assertEqual(cm.records[0].levelname, "WARNING")
self.assertIn("Invalid", cm.output[0])
self.assertIn("byte", cm.output[0])
self.assertEqual(cm.records[1].name, "root")
self.assertEqual(cm.records[1].levelname, "DEBUG")
self.assertIn("Took", cm.output[1])
self.assertIn("memory", cm.output[1])
self.assertIn("delta", cm.output[1])
self.assertIn("peak delta", cm.output[1])
self.assertIn("byte", cm.output[1])

# Change logging level
with self.assertLogs(level="INFO") as cm:
with time_this(level=logging.INFO, prefix=None):
Expand Down
51 changes: 51 additions & 0 deletions tests/test_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# This file is part of utils.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.

import unittest

from astropy import units as u
from lsst.utils.usage import get_current_mem_usage, get_peak_mem_usage


class UsageTestCase(unittest.TestCase):
    """Tests for the memory usage reporting helpers."""

    def testGetCurrentMemUsage(self):
        """Current usage is positive, in bytes, and grows after allocating."""
        main_before, children_before = get_current_mem_usage()
        self.assertGreater(main_before, 0 * u.byte)
        self.assertGreaterEqual(children_before.value, 0)

        self.assertTrue(main_before.unit.is_equivalent(u.byte))
        self.assertTrue(children_before.unit.is_equivalent(u.byte))

        # Hold on to a sizeable list so the resident set size has to grow
        # before the second measurement is taken.
        ballast = [None] * 1_000_000  # noqa: F841

        main_after, children_after = get_current_mem_usage()
        self.assertGreater(main_after, main_before)
        self.assertGreaterEqual(children_after, children_before)

    def testGetPeakMemUsage(self):
        """Peak usage is positive, in bytes, and grows after allocating."""
        main_before, child_before = get_peak_mem_usage()
        self.assertGreater(main_before, 0 * u.byte)
        self.assertGreaterEqual(child_before, 0 * u.byte)

        self.assertTrue(main_before.unit.is_equivalent(u.byte))
        self.assertTrue(child_before.unit.is_equivalent(u.byte))

        # Hold on to a sizeable list so the peak resident set size has to
        # grow before the second measurement is taken.
        ballast = [None] * 2_000_000  # noqa: F841

        main_after, child_after = get_peak_mem_usage()
        self.assertGreater(main_after, main_before)
        self.assertGreaterEqual(child_after, child_before)


if __name__ == "__main__":
unittest.main()