diff --git a/src/_vmprof.c b/src/_vmprof.c index 07e9a074..36fe706a 100644 --- a/src/_vmprof.c +++ b/src/_vmprof.c @@ -269,10 +269,11 @@ static PyObject *enable_vmprof(PyObject* self, PyObject *args) int memory = 0; int lines = 0; int native = 0; + int real_time = 0; double interval; char *p_error; - if (!PyArg_ParseTuple(args, "id|iii", &fd, &interval, &memory, &lines, &native)) { + if (!PyArg_ParseTuple(args, "id|iiii", &fd, &interval, &memory, &lines, &native, &real_time)) { return NULL; } @@ -291,6 +292,13 @@ static PyObject *enable_vmprof(PyObject* self, PyObject *args) return NULL; } +#ifndef VMPROF_UNIX + if (real_time) { + PyErr_SetString(PyExc_ValueError, "real time profiling is only supported on Linux and MacOS"); + return NULL; + } +#endif + vmp_profile_lines(lines); if (!Original_code_dealloc) { @@ -298,13 +306,13 @@ static PyObject *enable_vmprof(PyObject* self, PyObject *args) PyCode_Type.tp_dealloc = &cpyprof_code_dealloc; } - p_error = vmprof_init(fd, interval, memory, lines, "cpython", native); + p_error = vmprof_init(fd, interval, memory, lines, "cpython", native, real_time); if (p_error) { PyErr_SetString(PyExc_ValueError, p_error); return NULL; } - if (vmprof_enable(memory, native) < 0) { + if (vmprof_enable(memory, native, real_time) < 0) { PyErr_SetFromErrno(PyExc_OSError); return NULL; } @@ -466,18 +474,72 @@ static PyObject * vmp_get_profile_path(PyObject *module, PyObject *noargs) { } #endif + +#ifdef VMPROF_UNIX +static PyObject * +insert_real_time_thread(PyObject *module, PyObject * noargs) { + ssize_t thread_count; + + if (!is_enabled) { + PyErr_SetString(PyExc_ValueError, "vmprof is not enabled"); + return NULL; + } + + if (signal_type != SIGALRM) { + PyErr_SetString(PyExc_ValueError, "vmprof is not in real time mode"); + return NULL; + } + + while (__sync_lock_test_and_set(&spinlock, 1)) { + } + + thread_count = insert_thread(pthread_self(), -1); + __sync_lock_release(&spinlock); + + return PyLong_FromSsize_t(thread_count); +} + 
+static PyObject * +remove_real_time_thread(PyObject *module, PyObject * noargs) { + ssize_t thread_count; + + if (!is_enabled) { + PyErr_SetString(PyExc_ValueError, "vmprof is not enabled"); + return NULL; + } + + if (signal_type != SIGALRM) { + PyErr_SetString(PyExc_ValueError, "vmprof is not in real time mode"); + return NULL; + } + + while (__sync_lock_test_and_set(&spinlock, 1)) { + } + + thread_count = remove_thread(pthread_self(), -1); + __sync_lock_release(&spinlock); + + return PyLong_FromSsize_t(thread_count); +} +#endif + + static PyMethodDef VMProfMethods[] = { {"enable", enable_vmprof, METH_VARARGS, "Enable profiling."}, {"disable", disable_vmprof, METH_VARARGS, "Disable profiling."}, {"write_all_code_objects", write_all_code_objects, METH_VARARGS, "Write eagerly all the IDs of code objects"}, {"sample_stack_now", sample_stack_now, METH_VARARGS, "Sample the stack now"}, + {"is_enabled", vmp_is_enabled, METH_NOARGS, "Indicates if vmprof is currently sampling."}, #ifdef VMP_SUPPORTS_NATIVE_PROFILING {"resolve_addr", resolve_addr, METH_VARARGS, "Return the name of the addr"}, #endif - {"is_enabled", vmp_is_enabled, METH_NOARGS, "Indicates if vmprof is currently sampling."}, #ifdef VMPROF_UNIX {"get_profile_path", vmp_get_profile_path, METH_NOARGS, "Profile path the profiler logs to."}, + {"insert_real_time_thread", insert_real_time_thread, METH_NOARGS, + "Insert a thread into the real time profiling list."}, + {"remove_real_time_thread", remove_real_time_thread, METH_NOARGS, + "Remove a thread from the real time profiling list."}, #endif {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/src/vmprof.h b/src/vmprof.h index a5747970..f72f5c0e 100644 --- a/src/vmprof.h +++ b/src/vmprof.h @@ -26,6 +26,7 @@ #define PROFILE_LINES '\x02' #define PROFILE_NATIVE '\x04' #define PROFILE_RPYTHON '\x08' +#define PROFILE_REAL_TIME '\x10' #define DYN_JIT_FLAG 0xbeefbeef diff --git a/src/vmprof_common.h b/src/vmprof_common.h index 24aa3c18..66da392e 100644 --- 
a/src/vmprof_common.h +++ b/src/vmprof_common.h @@ -12,17 +12,82 @@ #include "vmprof_mt.h" #endif +#ifdef VMPROF_LINUX +#include <sys/syscall.h> +#endif + #define MAX_FUNC_NAME 1024 static long prepare_interval_usec = 0; static long profile_interval_usec = 0; -static int opened_profile(const char *interp_name, int memory, int proflines, int native); +static int opened_profile(const char *interp_name, int memory, int proflines, int native, int real_time); #ifdef VMPROF_UNIX +static int signal_type = SIGPROF; +static int itimer_type = ITIMER_PROF; +static pthread_t *threads = NULL; +static size_t threads_size = 0; +static size_t thread_count = 0; +static size_t threads_size_step = 8; static struct profbuf_s *volatile current_codes; #endif +#ifdef VMPROF_UNIX + +static inline ssize_t search_thread(pthread_t tid, ssize_t i) { + if (i < 0) + i = 0; + while (i < thread_count) { + if (pthread_equal(threads[i], tid)) + return i; + i++; + } + return -1; +} + +ssize_t insert_thread(pthread_t tid, ssize_t i) { + assert(signal_type == SIGALRM); + i = search_thread(tid, i); + if (i >= 0) /* already registered; index 0 is a valid hit too */ + return -1; + if (thread_count == threads_size) { + pthread_t *grown = realloc(threads, sizeof(pthread_t) * (threads_size + threads_size_step)); /* elements are pthread_t, not pid_t */ + if (grown == NULL) return -1; /* old array stays valid; report failure instead of relying on assert */ + memset(grown + thread_count, 0, sizeof(pthread_t) * threads_size_step); + threads = grown; threads_size += threads_size_step; + } + threads[thread_count++] = tid; + return thread_count; +} + +ssize_t remove_thread(pthread_t tid, ssize_t i) { + assert(signal_type == SIGALRM); + if (thread_count == 0) + return -1; + if (threads == NULL) + return -1; + i = search_thread(tid, i); + if (i < 0) + return -1; + threads[i] = threads[--thread_count]; + threads[thread_count] = 0; + return thread_count; +} + +ssize_t remove_threads(void) { + assert(signal_type == SIGALRM); + if (threads != NULL) { + free(threads); + threads = NULL; + } + thread_count = 0; + threads_size = 0; + return 0; +} + +#endif + #define MAX_STACK_DEPTH \ ((SINGLE_BUF_SIZE - sizeof(struct prof_stacktrace_s)) / sizeof(void *)) @@ 
-64,7 +129,7 @@ typedef struct prof_stacktrace_s { RPY_EXTERN char *vmprof_init(int fd, double interval, int memory, - int proflines, const char *interp_name, int native) + int proflines, const char *interp_name, int native, int real_time) { if (!(interval >= 1e-6 && interval < 1.0)) { /* also if it is NaN */ return "bad value for 'interval'"; @@ -74,6 +139,13 @@ char *vmprof_init(int fd, double interval, int memory, if (prepare_concurrent_bufs() < 0) return "out of memory"; #if VMPROF_UNIX + if (real_time) { + signal_type = SIGALRM; + itimer_type = ITIMER_REAL; + } else { + signal_type = SIGPROF; + itimer_type = ITIMER_PROF; + } current_codes = NULL; assert(fd >= 0); #else @@ -85,14 +157,14 @@ char *vmprof_init(int fd, double interval, int memory, } #endif vmp_set_profile_fileno(fd); - if (opened_profile(interp_name, memory, proflines, native) < 0) { + if (opened_profile(interp_name, memory, proflines, native, real_time) < 0) { vmp_set_profile_fileno(0); return strerror(errno); } return NULL; } -static int opened_profile(const char *interp_name, int memory, int proflines, int native) +static int opened_profile(const char *interp_name, int memory, int proflines, int native, int real_time) { int success; int bits; @@ -119,7 +191,7 @@ static int opened_profile(const char *interp_name, int memory, int proflines, in header.interp_name[1] = '\x00'; header.interp_name[2] = VERSION_TIMESTAMP; header.interp_name[3] = memory*PROFILE_MEMORY + proflines*PROFILE_LINES + \ - native*PROFILE_NATIVE; + native*PROFILE_NATIVE + real_time*PROFILE_REAL_TIME; #ifdef RPYTHON_VMPROF header.interp_name[3] += PROFILE_RPYTHON; #endif diff --git a/src/vmprof_main.h b/src/vmprof_main.h index a85ae8d0..9f0e0c74 100644 --- a/src/vmprof_main.h +++ b/src/vmprof_main.h @@ -48,11 +48,14 @@ #include "rss_darwin.h" #endif +#if VMPROF_LINUX +#include <sys/syscall.h> /* SYS_gettid, used by is_main_thread() */ +#endif /************************************************************/ static void *(*mainloop_get_virtual_ip)(char *) = 0; -static int 
opened_profile(const char *interp_name, int memory, int proflines, int native); +static int opened_profile(const char *interp_name, int memory, int proflines, int native, int real_time); static void flush_codes(void); /************************************************************/ @@ -94,7 +97,7 @@ int get_stack_trace(PY_THREAD_STATE_T * current, void** result, int max_depth, i { PY_STACK_FRAME_T * frame; #ifdef RPYTHON_VMPROF - // do nothing here, + // do nothing here, frame = (PY_STACK_FRAME_T*)current; #else if (!current) { @@ -181,11 +184,48 @@ static PY_THREAD_STATE_T * _get_pystate_for_this_thread(void) { } #endif +#ifdef VMPROF_UNIX +static int broadcast_signal_for_threads(void) +{ + int done = 1; + size_t i = 0; + pthread_t self = pthread_self(); + pthread_t tid; + while (i < thread_count) { + tid = threads[i]; + if (pthread_equal(tid, self)) { + done = 0; + } else if (pthread_kill(tid, SIGALRM)) { + remove_thread(tid, i); continue; /* remove_thread moved the last entry into slot i: re-check it instead of skipping it */ + } + i++; + } + return done; +} +#endif + +#ifdef VMPROF_LINUX +static inline int is_main_thread(void) +{ + pid_t pid = getpid(); + pid_t tid = (pid_t) syscall(SYS_gettid); + return (pid == tid); +} +#endif + +#ifdef VMPROF_APPLE +static inline int is_main_thread(void) +{ + return pthread_main_np(); +} +#endif + static void sigprof_handler(int sig_nr, siginfo_t* info, void *ucontext) { int commit; PY_THREAD_STATE_T * tstate = NULL; void (*prevhandler)(int); + #ifndef RPYTHON_VMPROF // TERRIBLE HACK AHEAD // on OS X, the thread local storage is sometimes uninitialized @@ -199,6 +239,21 @@ static void sigprof_handler(int sig_nr, siginfo_t* info, void *ucontext) // get_current_thread_state returns a sane result while (__sync_lock_test_and_set(&spinlock, 1)) { } + +#ifdef VMPROF_UNIX + // SIGNAL ABUSE AHEAD + // On linux, the prof timer will deliver the signal to the thread which triggered the timer, + // because these timers are based on process and system time, and as such, are thread-aware. 
+ // For the real timer, the signal gets delivered to the main thread, seemingly always. + // Consequently if we want to sample multiple threads, we need to forward this signal. + if (signal_type == SIGALRM) { + if (is_main_thread() && broadcast_signal_for_threads()) { + __sync_lock_release(&spinlock); + return; + } + } +#endif + prevhandler = signal(SIGSEGV, &segfault_handler); int fault_code = setjmp(restore_point); if (fault_code == 0) { @@ -257,7 +312,7 @@ static int install_sigprof_handler(void) sa.sa_sigaction = sigprof_handler; sa.sa_flags = SA_RESTART | SA_SIGINFO; if (sigemptyset(&sa.sa_mask) == -1 || - sigaction(SIGPROF, &sa, NULL) == -1) + sigaction(signal_type, &sa, NULL) == -1) return -1; return 0; } @@ -269,7 +324,7 @@ static int remove_sigprof_handler(void) ign_sigint.sa_flags = 0; sigemptyset(&ign_sigint.sa_mask); - if (sigaction(SIGPROF, &ign_sigint, NULL) < 0) { + if (sigaction(signal_type, &ign_sigint, NULL) < 0) { fprintf(stderr, "Could not remove the signal handler (for profiling)\n"); return -1; } @@ -282,7 +337,7 @@ static int install_sigprof_timer(void) timer.it_interval.tv_sec = 0; timer.it_interval.tv_usec = (int)profile_interval_usec; timer.it_value = timer.it_interval; - if (setitimer(ITIMER_PROF, &timer, NULL) != 0) + if (setitimer(itimer_type, &timer, NULL) != 0) return -1; return 0; } @@ -291,7 +346,7 @@ static int remove_sigprof_timer(void) { static struct itimerval timer; timerclear(&(timer.it_interval)); timerclear(&(timer.it_value)); - if (setitimer(ITIMER_PROF, &timer, NULL) != 0) { + if (setitimer(itimer_type, &timer, NULL) != 0) { fprintf(stderr, "Could not disable the signal handler (for profiling)\n"); return -1; } @@ -361,7 +416,7 @@ static void disable_cpyprof(void) #endif RPY_EXTERN -int vmprof_enable(int memory, int native) +int vmprof_enable(int memory, int native, int real_time) { #ifdef VMP_SUPPORTS_NATIVE_PROFILING init_cpyprof(native); @@ -371,6 +426,10 @@ int vmprof_enable(int memory, int native) 
profile_interval_usec = prepare_interval_usec; if (memory && setup_rss() == -1) goto error; +#if VMPROF_UNIX + if (real_time && insert_thread(pthread_self(), -1) == -1) + goto error; +#endif if (install_pthread_atfork_hooks() == -1) goto error; if (install_sigprof_handler() == -1) @@ -416,6 +475,10 @@ int vmprof_disable(void) return -1; if (remove_sigprof_handler() == -1) return -1; +#ifdef VMPROF_UNIX + if ((signal_type == SIGALRM) && remove_threads() == -1) + return -1; +#endif flush_codes(); if (shutdown_concurrent_bufs(vmp_profile_fileno()) < 0) return -1; diff --git a/src/vmprof_main_win32.h b/src/vmprof_main_win32.h index ce4d7151..8e769ca7 100644 --- a/src/vmprof_main_win32.h +++ b/src/vmprof_main_win32.h @@ -160,7 +160,7 @@ long __stdcall vmprof_mainloop(void *arg) } RPY_EXTERN -int vmprof_enable(int memory, int native) +int vmprof_enable(int memory, int native, int real_time) { if (!thread_started) { if (!CreateThread(NULL, 0, vmprof_mainloop, NULL, 0, NULL)) { diff --git a/vmprof/__init__.py b/vmprof/__init__.py index 4ed6eb67..e3296406 100644 --- a/vmprof/__init__.py +++ b/vmprof/__init__.py @@ -47,7 +47,7 @@ def _is_native_enabled(native): return native if IS_PYPY: - def enable(fileno, period=DEFAULT_PERIOD, memory=False, lines=False, native=None, warn=True): + def enable(fileno, period=DEFAULT_PERIOD, memory=False, lines=False, native=None, real_time=False, warn=True): pypy_version_info = sys.pypy_version_info[:3] if not isinstance(period, float): raise ValueError("You need to pass a float as an argument") @@ -61,17 +61,17 @@ def enable(fileno, period=DEFAULT_PERIOD, memory=False, lines=False, native=None native = _is_native_enabled(native) gz_fileno = _gzip_start(fileno) if pypy_version_info >= (5, 8, 0): - _vmprof.enable(gz_fileno, period, memory, lines, native) + _vmprof.enable(gz_fileno, period, memory, lines, native, real_time) else: _vmprof.enable(gz_fileno, period) # , memory, lines, native) else: # CPYTHON - def enable(fileno, 
period=DEFAULT_PERIOD, memory=False, lines=False, native=None): + def enable(fileno, period=DEFAULT_PERIOD, memory=False, lines=False, native=None, real_time=False): if not isinstance(period, float): raise ValueError("You need to pass a float as an argument") gz_fileno = _gzip_start(fileno) native = _is_native_enabled(native) - _vmprof.enable(gz_fileno, period, memory, lines, native) + _vmprof.enable(gz_fileno, period, memory, lines, native, real_time) def sample_stack_now(skip=0): """ Helper utility mostly for tests, this is considered @@ -91,6 +91,21 @@ def resolve_addr(addr): return _vmprof.resolve_addr(addr) +def insert_real_time_thread(): + """ Inserts a thread into the list of threads to be sampled in real time mode. + When enabling real time mode, the caller thread is inserted automatically. + Returns the number of registered threads, or -1 if we can't insert thread. + """ + return _vmprof.insert_real_time_thread() + +def remove_real_time_thread(): + """ Removes a thread from the list of threads to be sampled in real time mode. + When disabling in real time mode, *all* threads are removed automatically. + Returns the number of registered threads, or -1 if we can't remove thread. + """ + return _vmprof.remove_real_time_thread() + + def is_enabled(): """ Indicates if vmprof has already been enabled for this process. Returns True or False. None is returned if the state is unknown. 
diff --git a/vmprof/profiler.py b/vmprof/profiler.py index 2ee2fa84..b281d80e 100644 --- a/vmprof/profiler.py +++ b/vmprof/profiler.py @@ -11,18 +11,21 @@ class VMProfError(Exception): class ProfilerContext(object): done = False - def __init__(self, name, memory, native, only_needed): + def __init__(self, name, period, memory, native, only_needed, real_time): if name is None: self.tmpfile = tempfile.NamedTemporaryFile("w+b", delete=False) else: self.tmpfile = open(name, "w+b") self.filename = self.tmpfile.name + self.period = period self.memory = memory self.native = native self.only_needed = only_needed + self.real_time = real_time def __enter__(self): - vmprof.enable(self.tmpfile.fileno(), 0.001, self.memory, native=self.native) + vmprof.enable(self.tmpfile.fileno(), self.period, self.memory, + native=self.native, real_time=self.real_time) def __exit__(self, type, value, traceback): vmprof.disable(only_needed=self.only_needed) @@ -54,8 +57,8 @@ class Profiler(object): def __init__(self): self._lib_cache = {} - def measure(self, name=None, memory=False, native=False, only_needed=False): - self.ctx = ProfilerContext(name, memory, native, only_needed) + def measure(self, name=None, period=0.001, memory=False, native=False, only_needed=False, real_time=False): + self.ctx = ProfilerContext(name, period, memory, native, only_needed, real_time) return self.ctx def get_stats(self): diff --git a/vmprof/test/test_run.py b/vmprof/test/test_run.py index 5a9fdc3f..cd12f695 100644 --- a/vmprof/test/test_run.py +++ b/vmprof/test/test_run.py @@ -3,6 +3,7 @@ import py import sys import tempfile +import time import gzip import time import pytz @@ -77,11 +78,28 @@ def function_bar(): return function_foo() +def functime_foo(t=0.05, insert=False): + if (insert): + vmprof.insert_real_time_thread() + return time.sleep(t) + + +def functime_bar(t=0.05, remove=False): + if (remove): + vmprof.remove_real_time_thread() + return time.sleep(t) + + foo_full_name = "py:function_foo:%d:%s" % 
(function_foo.__code__.co_firstlineno, function_foo.__code__.co_filename) bar_full_name = "py:function_bar:%d:%s" % (function_bar.__code__.co_firstlineno, function_bar.__code__.co_filename) +foo_time_name = "py:functime_foo:%d:%s" % (functime_foo.__code__.co_firstlineno, + functime_foo.__code__.co_filename) +bar_time_name = "py:functime_bar:%d:%s" % (functime_bar.__code__.co_firstlineno, + functime_bar.__code__.co_filename) + GZIP = False def test_basic(): @@ -238,6 +256,43 @@ def function_bar(): prof.get_stats() + +@py.test.mark.skipif("sys.platform == 'win32'") +def test_vmprof_real_time(): + prof = vmprof.Profiler() + with prof.measure(real_time=True): + functime_foo() + stats = prof.get_stats() + tprof = stats.top_profile() + d = dict(tprof) + assert d[foo_time_name] > 0 + + +@py.test.mark.skipif("'__pypy__' in sys.builtin_module_names") +@py.test.mark.skipif("sys.platform == 'win32'") +@py.test.mark.parametrize("insert_foo,remove_bar", [ + (False, False), + (False, True), + ( True, False), + ( True, True), +]) +def test_vmprof_real_time_threaded(insert_foo, remove_bar): + import threading + prof = vmprof.Profiler() + wait = 0.5 + thread = threading.Thread(target=functime_foo, args=[wait, insert_foo]) + with prof.measure(period=0.25, real_time=True): + thread.start() + functime_bar(wait, remove_bar) + thread.join() + stats = prof.get_stats() + tprof = stats.top_profile() + d = dict(tprof) + assert insert_foo == (foo_time_name in d) + assert remove_bar != (bar_time_name in d) + + + if GZIP: def test_gzip_problem(): tmpfile = tempfile.NamedTemporaryFile(delete=False)