Issue #14154: Reimplement the bigmem test memory watchdog as a subprocess.
Charles-François Natali committed Mar 24, 2012
1 parent 226ed7e commit 55bce63
Showing 3 changed files with 38 additions and 240 deletions.
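
In short: the previous implementation pinned the bigmem memory watchdog on a private faulthandler C thread (_file_watchdog) that piped /proc/<pid>/statm contents to a consumer thread in support.py. This commit replaces that machinery with a small stand-alone script, Lib/test/memory_watchdog.py, run as a child process. A minimal sketch of the new arrangement (illustrative only; the real code uses support.findfile() and lives in _MemoryWatchdog below):

    import os
    import subprocess
    import sys

    # The parent opens its own /proc/<pid>/statm and hands the open file to the
    # child as stdin, so the child never needs the parent's PID (no PID-reuse
    # race); if the parent dies, reads from the stale /proc entry simply fail.
    statm = open('/proc/{}/statm'.format(os.getpid()), 'r')
    watchdog = subprocess.Popen([sys.executable, 'memory_watchdog.py'],  # path is illustrative
                                stdin=statm, stderr=subprocess.DEVNULL)
    statm.close()
    # ... run the big-memory test ...
    watchdog.terminate()
    watchdog.wait()
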
28 changes: 28 additions & 0 deletions Lib/test/memory_watchdog.py
@@ -0,0 +1,28 @@
"""Memory watchdog: periodically read the memory usage of the main test process
and print it out, until terminated."""
# stdin should refer to the process' /proc/<PID>/statm: we don't pass the
# process' PID to avoid a race condition in case of - unlikely - PID recycling.
# If the process crashes, reading from the /proc entry will fail with ESRCH.


import os
import sys
import time


try:
page_size = os.sysconf('SC_PAGESIZE')
except (ValueError, AttributeError):
try:
page_size = os.sysconf('SC_PAGE_SIZE')
except (ValueError, AttributeError):
page_size = 4096

while True:
sys.stdin.seek(0)
statm = sys.stdin.read()
data = int(statm.split()[5])
sys.stdout.write(" ... process data size: {data:.1f}G\n"
.format(data=data * page_size / (1024 ** 3)))
sys.stdout.flush()
time.sleep(1)
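
For reference, /proc/<pid>/statm holds seven space-separated counters measured in pages (size, resident, shared, text, lib, data, dt); the script reads index 5, the data + stack segment, and reports it in GiB. A quick worked example of the printed figure, with an assumed page size and a hypothetical field value:

    page_size = 4096      # typical x86 page size; the script queries os.sysconf()
    data_pages = 524288   # hypothetical "data" field read from statm
    print(" ... process data size: {:.1f}G"
          .format(data_pages * page_size / (1024 ** 3)))   # -> 2.0G
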
66 changes: 10 additions & 56 deletions Lib/test/support.py
@@ -35,21 +35,11 @@
except ImportError:
multiprocessing = None

try:
import faulthandler
except ImportError:
faulthandler = None

try:
import zlib
except ImportError:
zlib = None

try:
import fcntl
except ImportError:
fcntl = None

__all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module",
"verbose", "use_resources", "max_memuse", "record_original_stdout",
@@ -1151,62 +1141,26 @@ class _MemoryWatchdog:
def __init__(self):
self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
self.started = False
self.thread = None
try:
self.page_size = os.sysconf('SC_PAGESIZE')
except (ValueError, AttributeError):
try:
self.page_size = os.sysconf('SC_PAGE_SIZE')
except (ValueError, AttributeError):
self.page_size = 4096

def consumer(self, fd):
HEADER = "l"
header_size = struct.calcsize(HEADER)
try:
while True:
header = os.read(fd, header_size)
if len(header) < header_size:
# Pipe closed on other end
break
data_len, = struct.unpack(HEADER, header)
data = os.read(fd, data_len)
statm = data.decode('ascii')
data = int(statm.split()[5])
print(" ... process data size: {data:.1f}G"
.format(data=data * self.page_size / (1024 ** 3)))
finally:
os.close(fd)

def start(self):
if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
return
try:
rfd = os.open(self.procfile, os.O_RDONLY)
f = open(self.procfile, 'r')
except OSError as e:
warnings.warn('/proc not available for stats: {}'.format(e),
RuntimeWarning)
sys.stderr.flush()
return
pipe_fd, wfd = os.pipe()
# set the write end of the pipe non-blocking to avoid blocking the
# watchdog thread when the consumer doesn't drain the pipe fast enough
if fcntl:
flags = fcntl.fcntl(wfd, fcntl.F_GETFL)
fcntl.fcntl(wfd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
# _file_watchdog() doesn't take the GIL in its child thread, and
# therefore collects statistics timely
faulthandler._file_watchdog(rfd, wfd, 1.0)

watchdog_script = findfile("memory_watchdog.py")
self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
stdin=f, stderr=subprocess.DEVNULL)
f.close()
self.started = True
self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
self.thread.daemon = True
self.thread.start()

def stop(self):
if not self.started:
return
faulthandler._cancel_file_watchdog()
self.thread.join()
if self.started:
self.mem_watchdog.terminate()
self.mem_watchdog.wait()


def bigmemtest(size, memuse, dry_run=True):
@@ -1234,7 +1188,7 @@ def wrapper(self):
"not enough memory: %.1fG minimum needed"
% (size * memuse / (1024 ** 3)))

if real_max_memuse and verbose and faulthandler and threading:
if real_max_memuse and verbose:
print()
print(" ... expected peak memory use: {peak:.1f}G"
.format(peak=size * memuse / (1024 ** 3)))
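
Roughly, the wrapper only starts the watchdog for real (non-dry-run) big-memory runs and always stops it afterwards. A sketch under assumed names (f is the decorated test, maxsize the size actually passed to it; the exact wrapper body is elided above):

    def wrapper(self):
        # Illustrative sketch, not the exact wrapper body.
        watchdog = None
        if real_max_memuse and verbose:
            watchdog = _MemoryWatchdog()
            watchdog.start()
        try:
            return f(self, maxsize)      # run the decorated test method
        finally:
            if watchdog is not None:
                watchdog.stop()
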
184 changes: 0 additions & 184 deletions Modules/faulthandler.c
@@ -13,7 +13,6 @@

#ifdef WITH_THREAD
# define FAULTHANDLER_LATER
# define FAULTHANDLER_WATCHDOG
#endif

#ifndef MS_WINDOWS
@@ -66,20 +65,6 @@ static struct {
} thread;
#endif

#ifdef FAULTHANDLER_WATCHDOG
static struct {
int rfd;
int wfd;
PY_TIMEOUT_T period_us; /* period in microseconds */
/* The main thread always holds this lock. It is only released when
faulthandler_watchdog() is interrupted before this thread exits, or at
Python exit. */
PyThread_type_lock cancel_event;
/* released by child thread when joined */
PyThread_type_lock running;
} watchdog;
#endif

#ifdef FAULTHANDLER_USER
typedef struct {
int enabled;
@@ -604,139 +589,6 @@ faulthandler_cancel_dump_tracebacks_later_py(PyObject *self)
}
#endif /* FAULTHANDLER_LATER */

#ifdef FAULTHANDLER_WATCHDOG

static void
file_watchdog(void *unused)
{
PyLockStatus st;
PY_TIMEOUT_T timeout;

#define MAXDATA 1024
char buf1[MAXDATA], buf2[MAXDATA];
char *data = buf1, *old_data = buf2;
Py_ssize_t data_len, old_data_len = -1;

#if defined(HAVE_PTHREAD_SIGMASK) && !defined(HAVE_BROKEN_PTHREAD_SIGMASK)
sigset_t set;

/* we don't want to receive any signal */
sigfillset(&set);
pthread_sigmask(SIG_SETMASK, &set, NULL);
#endif

/* On first pass, feed file contents immediately */
timeout = 0;
do {
st = PyThread_acquire_lock_timed(watchdog.cancel_event,
timeout, 0);
timeout = watchdog.period_us;
if (st == PY_LOCK_ACQUIRED) {
PyThread_release_lock(watchdog.cancel_event);
break;
}
/* Timeout => read and write data */
assert(st == PY_LOCK_FAILURE);

if (lseek(watchdog.rfd, 0, SEEK_SET) < 0) {
break;
}
data_len = read(watchdog.rfd, data, MAXDATA);
if (data_len < 0) {
break;
}
if (data_len != old_data_len || memcmp(data, old_data, data_len)) {
char *tdata;
Py_ssize_t tlen;
/* Contents changed, feed them to wfd */
long x = (long) data_len;
/* We can't do anything if the consumer is too slow, just bail out */
if (write(watchdog.wfd, (void *) &x, sizeof(x)) < sizeof(x))
break;
if (write(watchdog.wfd, data, data_len) < data_len)
break;
tdata = data;
data = old_data;
old_data = tdata;
tlen = data_len;
data_len = old_data_len;
old_data_len = tlen;
}
} while (1);

close(watchdog.rfd);
close(watchdog.wfd);

/* The only way out */
PyThread_release_lock(watchdog.running);
#undef MAXDATA
}

static void
cancel_file_watchdog(void)
{
/* Notify cancellation */
PyThread_release_lock(watchdog.cancel_event);

/* Wait for thread to join */
PyThread_acquire_lock(watchdog.running, 1);
PyThread_release_lock(watchdog.running);

/* The main thread should always hold the cancel_event lock */
PyThread_acquire_lock(watchdog.cancel_event, 1);
}

static PyObject*
faulthandler_file_watchdog(PyObject *self,
PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = {"rfd", "wfd", "period", NULL};
double period;
PY_TIMEOUT_T period_us;
int rfd, wfd;

if (!PyArg_ParseTupleAndKeywords(args, kwargs,
"iid:_file_watchdog", kwlist,
&rfd, &wfd, &period))
return NULL;
if ((period * 1e6) >= (double) PY_TIMEOUT_MAX) {
PyErr_SetString(PyExc_OverflowError, "period value is too large");
return NULL;
}
period_us = (PY_TIMEOUT_T)(period * 1e6);
if (period_us <= 0) {
PyErr_SetString(PyExc_ValueError, "period must be greater than 0");
return NULL;
}

/* Cancel previous thread, if running */
cancel_file_watchdog();

watchdog.rfd = rfd;
watchdog.wfd = wfd;
watchdog.period_us = period_us;

/* Arm these locks to serve as events when released */
PyThread_acquire_lock(watchdog.running, 1);

if (PyThread_start_new_thread(file_watchdog, NULL) == -1) {
PyThread_release_lock(watchdog.running);
PyErr_SetString(PyExc_RuntimeError,
"unable to start file watchdog thread");
return NULL;
}

Py_RETURN_NONE;
}

static PyObject*
faulthandler_cancel_file_watchdog(PyObject *self)
{
cancel_file_watchdog();
Py_RETURN_NONE;
}
#endif /* FAULTHANDLER_WATCHDOG */

#ifdef FAULTHANDLER_USER
static int
faulthandler_register(int signum, int chain, _Py_sighandler_t *p_previous)
@@ -1126,18 +978,6 @@ static PyMethodDef module_methods[] = {
"to dump_tracebacks_later().")},
#endif

#ifdef FAULTHANDLER_WATCHDOG
{"_file_watchdog",
(PyCFunction)faulthandler_file_watchdog, METH_VARARGS|METH_KEYWORDS,
PyDoc_STR("_file_watchdog(rfd, wfd, period):\n"
"feed the contents of 'rfd' to 'wfd', if changed,\n"
"every 'period seconds'.")},
{"_cancel_file_watchdog",
(PyCFunction)faulthandler_cancel_file_watchdog, METH_NOARGS,
PyDoc_STR("_cancel_file_watchdog():\ncancel the previous call "
"to _file_watchdog().")},
#endif

#ifdef FAULTHANDLER_USER
{"register",
(PyCFunction)faulthandler_register_py, METH_VARARGS|METH_KEYWORDS,
@@ -1263,16 +1103,6 @@ int _PyFaulthandler_Init(void)
}
PyThread_acquire_lock(thread.cancel_event, 1);
#endif
#ifdef FAULTHANDLER_WATCHDOG
watchdog.cancel_event = PyThread_allocate_lock();
watchdog.running = PyThread_allocate_lock();
if (!watchdog.cancel_event || !watchdog.running) {
PyErr_SetString(PyExc_RuntimeError,
"could not allocate locks for faulthandler");
return -1;
}
PyThread_acquire_lock(watchdog.cancel_event, 1);
#endif

return faulthandler_env_options();
}
Expand All @@ -1297,20 +1127,6 @@ void _PyFaulthandler_Fini(void)
}
#endif

#ifdef FAULTHANDLER_WATCHDOG
/* file watchdog */
if (watchdog.cancel_event) {
cancel_file_watchdog();
PyThread_release_lock(watchdog.cancel_event);
PyThread_free_lock(watchdog.cancel_event);
watchdog.cancel_event = NULL;
}
if (watchdog.running) {
PyThread_free_lock(watchdog.running);
watchdog.running = NULL;
}
#endif

#ifdef FAULTHANDLER_USER
/* user */
if (user_signals != NULL) {
