Permalink
Browse files

ENH: Tests!

  • Loading branch information...
rkern committed Aug 31, 2014
1 parent 14808ac commit 8f44d6cf640ce2754af07a4c16e30b9d7e1582de
Showing with 208 additions and 27 deletions.
  1. +19 −14 _line_profiler.pyx
  2. +10 −11 line_profiler.py
  3. +6 −2 python25.pxd
  4. +74 −0 tests/test_kernprof.py
  5. +99 −0 tests/test_line_profiler.py
View
@@ -58,10 +58,8 @@ cdef class LineTiming:
"""
cdef public object code
cdef public int lineno
# Note: leave at least total_time private. This should help compile under
# Python 2.4.
cdef PY_LONG_LONG total_time
cdef long nhits
cdef public PY_LONG_LONG total_time
cdef public long nhits
def __init__(self, object code, int lineno):
self.code = code
@@ -106,9 +104,9 @@ class LineStats(object):
cdef class LineProfiler:
""" Time the execution of lines of Python code.
"""
cdef public object functions
cdef public object code_map
cdef public object last_time
cdef public list functions
cdef public dict code_map
cdef public dict last_time
cdef public double timer_unit
cdef public long enable_count
@@ -186,21 +184,26 @@ cdef class LastTime:
self.time = time
cdef int python_trace_callback(object self, PyFrameObject *py_frame, int what,
cdef int python_trace_callback(object self_, PyFrameObject *py_frame, int what,
PyObject *arg):
""" The PyEval_SetTrace() callback.
"""
cdef object code, line_entries, key
cdef LineProfiler self
cdef object code, key
cdef dict line_entries, last_time
cdef LineTiming entry
cdef LastTime old
cdef PY_LONG_LONG time
self = <LineProfiler>self_
last_time = self.last_time
if what == PyTrace_LINE or what == PyTrace_RETURN:
code = <object>py_frame.f_code
if code in self.code_map:
time = hpTimer()
if code in self.last_time:
old = self.last_time[code]
if code in last_time:
old = last_time[code]
line_entries = self.code_map[code]
key = old.f_lineno
if key not in line_entries:
@@ -212,11 +215,13 @@ cdef int python_trace_callback(object self, PyFrameObject *py_frame, int what,
if what == PyTrace_LINE:
# Get the time again. This way, we don't record much time wasted
# in this function.
self.last_time[code] = LastTime(py_frame.f_lineno, hpTimer())
last_time[code] = LastTime(py_frame.f_lineno, hpTimer())
else:
# We are returning from a function, not executing a line. Delete
# the last_time record.
del self.last_time[code]
# the last_time record. It may have already been deleted if we
# are profiling a generator that is being pumped past its end.
if code in last_time:
del last_time[code]
return 0
View
@@ -10,6 +10,7 @@
from cStringIO import StringIO
except ImportError:
from io import StringIO
import functools
import inspect
import linecache
import optparse
@@ -60,19 +61,16 @@ def __call__(self, func):
"""
self.add_function(func)
if is_generator(func):
f = self.wrap_generator(func)
wrapper = self.wrap_generator(func)
else:
f = self.wrap_function(func)
f.__module__ = func.__module__
f.__name__ = func.__name__
f.__doc__ = func.__doc__
f.__dict__.update(getattr(func, '__dict__', {}))
return f
wrapper = self.wrap_function(func)
return wrapper
def wrap_generator(self, func):
""" Wrap a generator to profile it.
"""
def f(*args, **kwds):
@functools.wraps(func)
def wrapper(*args, **kwds):
g = func(*args, **kwds)
# The first iterate will not be a .send()
self.enable_by_count()
@@ -89,19 +87,20 @@ def f(*args, **kwds):
finally:
self.disable_by_count()
input = (yield item)
return f
return wrapper
def wrap_function(self, func):
""" Wrap a function to profile it.
"""
def f(*args, **kwds):
@functools.wraps(func)
def wrapper(*args, **kwds):
self.enable_by_count()
try:
result = func(*args, **kwds)
finally:
self.disable_by_count()
return result
return f
return wrapper
def dump_stats(self, filename):
""" Dump a representation of the data to a file as a pickled LineStats
View
@@ -399,8 +399,12 @@ cdef extern from "Python.h":
ctypedef struct PyThreadState:
PyFrameObject * frame
int recursion_depth
void * curexc_type, * curexc_value, * curexc_traceback
void * exc_type, * exc_value, * exc_traceback
void * curexc_type
void * curexc_value
void * curexc_traceback
void * exc_type
void * exc_value
void * exc_traceback
void PyEval_AcquireLock ()
void PyEval_ReleaseLock ()
View
@@ -0,0 +1,74 @@
import unittest
from kernprof import ContextualProfile
def f(x):
""" A function. """
y = x + 10
return y
def g(x):
""" A generator. """
y = yield x + 10
yield y + 20
class TestKernprof(unittest.TestCase):
def test_enable_disable(self):
profile = ContextualProfile()
self.assertEqual(profile.enable_count, 0)
profile.enable_by_count()
self.assertEqual(profile.enable_count, 1)
profile.enable_by_count()
self.assertEqual(profile.enable_count, 2)
profile.disable_by_count()
self.assertEqual(profile.enable_count, 1)
profile.disable_by_count()
self.assertEqual(profile.enable_count, 0)
profile.disable_by_count()
self.assertEqual(profile.enable_count, 0)
with profile:
self.assertEqual(profile.enable_count, 1)
with profile:
self.assertEqual(profile.enable_count, 2)
self.assertEqual(profile.enable_count, 1)
self.assertEqual(profile.enable_count, 0)
with self.assertRaises(RuntimeError):
self.assertEqual(profile.enable_count, 0)
with profile:
self.assertEqual(profile.enable_count, 1)
raise RuntimeError()
self.assertEqual(profile.enable_count, 0)
def test_function_decorator(self):
profile = ContextualProfile()
f_wrapped = profile(f)
self.assertEqual(f_wrapped.__name__, f.__name__)
self.assertEqual(f_wrapped.__doc__, f.__doc__)
self.assertEqual(profile.enable_count, 0)
value = f_wrapped(10)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(value, f(10))
def test_gen_decorator(self):
profile = ContextualProfile()
g_wrapped = profile(g)
self.assertEqual(g_wrapped.__name__, g.__name__)
self.assertEqual(g_wrapped.__doc__, g.__doc__)
self.assertEqual(profile.enable_count, 0)
i = g_wrapped(10)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(next(i), 20)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(i.send(30), 50)
self.assertEqual(profile.enable_count, 0)
with self.assertRaises(StopIteration):
next(i)
self.assertEqual(profile.enable_count, 0)
@@ -0,0 +1,99 @@
import unittest
from line_profiler import LineProfiler
def f(x):
y = x + 10
return y
def g(x):
y = yield x + 10
yield y + 20
class TestLineProfiler(unittest.TestCase):
def test_init(self):
lp = LineProfiler()
self.assertEqual(lp.functions, [])
self.assertEqual(lp.code_map, {})
lp = LineProfiler(f)
self.assertEqual(lp.functions, [f])
self.assertEqual(lp.code_map, {f.__code__: {}})
lp = LineProfiler(f, g)
self.assertEqual(lp.functions, [f, g])
self.assertEqual(lp.code_map, {
f.__code__: {},
g.__code__: {},
})
def test_enable_disable(self):
lp = LineProfiler()
self.assertEqual(lp.enable_count, 0)
lp.enable_by_count()
self.assertEqual(lp.enable_count, 1)
lp.enable_by_count()
self.assertEqual(lp.enable_count, 2)
lp.disable_by_count()
self.assertEqual(lp.enable_count, 1)
lp.disable_by_count()
self.assertEqual(lp.enable_count, 0)
self.assertEqual(lp.last_time, {})
lp.disable_by_count()
self.assertEqual(lp.enable_count, 0)
with lp:
self.assertEqual(lp.enable_count, 1)
with lp:
self.assertEqual(lp.enable_count, 2)
self.assertEqual(lp.enable_count, 1)
self.assertEqual(lp.enable_count, 0)
self.assertEqual(lp.last_time, {})
with self.assertRaises(RuntimeError):
self.assertEqual(lp.enable_count, 0)
with lp:
self.assertEqual(lp.enable_count, 1)
raise RuntimeError()
self.assertEqual(lp.enable_count, 0)
self.assertEqual(lp.last_time, {})
def test_function_decorator(self):
profile = LineProfiler()
f_wrapped = profile(f)
self.assertEqual(f_wrapped.__name__, 'f')
self.assertEqual(profile.enable_count, 0)
value = f_wrapped(10)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(value, f(10))
timings = profile.code_map[f.__code__]
self.assertEqual(len(timings), 2)
for timing in timings.values():
self.assertEqual(timing.nhits, 1)
def test_gen_decorator(self):
profile = LineProfiler()
g_wrapped = profile(g)
self.assertEqual(g_wrapped.__name__, 'g')
timings = profile.code_map[g.__code__]
self.assertEqual(profile.enable_count, 0)
i = g_wrapped(10)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(next(i), 20)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(len(timings), 1)
self.assertEqual(i.send(30), 50)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(len(timings), 2)
with self.assertRaises(StopIteration):
next(i)
self.assertEqual(profile.enable_count, 0)
self.assertEqual(len(timings), 2)
for timing in timings.values():
self.assertEqual(timing.nhits, 1)

0 comments on commit 8f44d6c

Please sign in to comment.