Skip to content

Commit

Permalink
Clear tracebacks to reduce memory leaks from tasks that raise exceptions (celery#6024)
Browse files Browse the repository at this point in the history
* Clean TraceBack to reduce memory leaks

* add unit test

* add unit test

* reject unittest

* Patch For Python 2.7 compatibility

* update unittest

* Register to the garbage collector by explicitly referring to f_locals.

* need more check

* update code coverage

* add missing unit tests

* 3.4 -> 3.5

Co-authored-by: heedong.jung <heedong.jung@samsung.com>
  • Loading branch information
2 people authored and thebarbershop committed Jul 1, 2020
1 parent f03ede7 commit 59836a3
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 7 deletions.
36 changes: 35 additions & 1 deletion celery/app/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@
"""

log_policy_t = namedtuple(
'log_policy_t', ('format', 'description', 'severity', 'traceback', 'mail'),
'log_policy_t',
('format', 'description', 'severity', 'traceback', 'mail'),
)

log_policy_reject = log_policy_t(LOG_REJECTED, 'rejected', logging.WARN, 1, 1)
Expand Down Expand Up @@ -256,6 +257,31 @@ def _log_error(self, task, req, einfo):
extra={'data': context})


def traceback_clear(exc=None):
    """Release the local variables held by the frames of a traceback.

    The traceback is taken from ``exc.__traceback__`` when ``exc`` is not
    ``None`` and has that attribute; otherwise it is taken from the
    exception currently being handled (``sys.exc_info()``).

    On Python 3.5+ every frame reachable through ``tb_next`` is cleared.
    On Python 2.7 ``sys.exc_clear()`` is called instead.  On any other
    interpreter version nothing is done.

    Arguments:
        exc: Optional exception whose traceback should be cleaned.  Objects
            without a ``__traceback__`` attribute are accepted and trigger
            the ``sys.exc_info()`` fallback.

    Returns:
        None
    """
    # Cleared Tb, but einfo still has a reference to Traceback.
    # exc cleans up the Traceback at the last moment that can be revealed.
    if exc is not None and hasattr(exc, '__traceback__'):
        tb = exc.__traceback__
    else:
        # No usable exception object: fall back to the exception that is
        # currently being handled (tb is None when there is none).
        _, _, tb = sys.exc_info()

    if sys.version_info >= (3, 5, 0):
        while tb is not None:
            try:
                tb.tb_frame.clear()
                # Deliberate bare attribute access: per the originating
                # change description, referring to f_locals explicitly
                # registers the frame locals with the garbage collector.
                tb.tb_frame.f_locals
            except RuntimeError:
                # Ignore the exception raised if the frame is still
                # executing.
                pass
            tb = tb.tb_next

    elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
        sys.exc_clear()

def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
Info=TraceInfo, eager=False, propagate=False, app=None,
monotonic=monotonic, trace_ok_t=trace_ok_t,
Expand Down Expand Up @@ -385,16 +411,24 @@ def trace_task(uuid, args, kwargs, request=None):
I, R = Info(REJECTED, exc), ExceptionInfo(internal=True)
state, retval = I.state, I.retval
I.handle_reject(task, task_request)
traceback_clear(exc)
except Ignore as exc:
I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
state, retval = I.state, I.retval
I.handle_ignore(task, task_request)
traceback_clear(exc)
except Retry as exc:
I, R, state, retval = on_error(
task_request, exc, uuid, RETRY, call_errbacks=False)
traceback_clear(exc)
except Exception as exc:
I, R, state, retval = on_error(task_request, exc, uuid)
<<<<<<< HEAD
except BaseException as exc:
=======
traceback_clear(exc)
except BaseException:
>>>>>>> c4dd6e840... Clean TraceBack to reduce memory leaks for exception task (#6024)
raise
else:
try:
Expand Down
13 changes: 13 additions & 0 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,19 @@ def fail_from_current_stack(self, task_id, exc=None):
self.mark_as_failure(task_id, exc, ei.traceback)
return ei
finally:
if sys.version_info >= (3, 5, 0):
while tb is not None:
try:
tb.tb_frame.clear()
tb.tb_frame.f_locals
except RuntimeError:
# Ignore the exception raised if the frame is still executing.
pass
tb = tb.tb_next

elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
sys.exc_clear()

del tb

def prepare_exception(self, exc, serializer=None):
Expand Down
24 changes: 23 additions & 1 deletion t/unit/backends/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,22 @@ def test_reload_task_result(self):
self.b.reload_task_result('task-exists')
self.b._cache['task-exists'] = {'result': 'task'}


def test_fail_from_current_stack(self):
import inspect
self.b.mark_as_failure = Mock()
try:
frame_list = []

if (2, 7, 0) <= sys.version_info < (3, 0, 0):
sys.exc_clear = Mock()

def raise_dummy():
frame_str_temp = str(inspect.currentframe().__repr__)
frame_list.append(frame_str_temp)
local_value = 1214
raise KeyError('foo')
try:
raise_dummy()
except KeyError as exc:
self.b.fail_from_current_stack('task_id')
self.b.mark_as_failure.assert_called()
Expand All @@ -297,6 +309,16 @@ def test_fail_from_current_stack(self):
assert args[1] is exc
assert args[2]

if sys.version_info >= (3, 5, 0):
tb_ = exc.__traceback__
while tb_ is not None:
if str(tb_.tb_frame.__repr__) == frame_list[0]:
assert len(tb_.tb_frame.f_locals) == 0
tb_ = tb_.tb_next
elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
sys.exc_clear.assert_called()


def test_prepare_value_serializes_group_result(self):
self.b.serializer = 'json'
g = self.app.GroupResult('group_id', [self.app.AsyncResult('foo')])
Expand Down
72 changes: 67 additions & 5 deletions t/unit/tasks/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
log_policy_internal, log_policy_reject,
log_policy_unexpected,
reset_worker_optimizations,
setup_worker_optimizations, trace_task)
setup_worker_optimizations, trace_task,
traceback_clear)

from celery.exceptions import Ignore, Reject, Retry


Expand Down Expand Up @@ -150,23 +152,79 @@ def add(x, y):
with pytest.raises(MemoryError):
self.trace(add, (2, 2), {}, eager=False)

def test_when_Ignore(self):
def test_traceback_clear(self):
    """traceback_clear() empties the locals of the raising frame.

    Three call styles are exercised while a KeyError is being handled:
    passing the exception, passing nothing, and passing an object that
    has no ``__traceback__`` (a string).  On Python 3.5+ the frame of
    ``raise_dummy`` must be present in the traceback and have no locals
    left; on Python 2.7 ``sys.exc_clear`` must have been called.
    """
    import sys

    is_py2 = (2, 7, 0) <= sys.version_info < (3, 0, 0)

    def raise_dummy():
        local_value = 1214  # noqa -- a local for traceback_clear to drop
        raise KeyError('foo')

    def assert_cleared(exc):
        if sys.version_info >= (3, 5, 0):
            matched = 0
            tb_ = exc.__traceback__
            while tb_ is not None:
                # Identify the frame by its code object: this is stable
                # across calls, unlike a frame address captured once.
                if tb_.tb_frame.f_code is raise_dummy.__code__:
                    matched += 1
                    assert len(tb_.tb_frame.f_locals) == 0
                tb_ = tb_.tb_next
            # Guard against the check silently matching nothing.
            assert matched, 'raise_dummy frame not found in traceback'
        elif is_py2:
            sys.exc_clear.assert_called()

    scenarios = (
        lambda exc: traceback_clear(exc),
        lambda exc: traceback_clear(),
        lambda exc: traceback_clear(str(exc)),
    )

    # Only stub sys.exc_clear where traceback_clear actually calls it, and
    # always put ``sys`` back the way it was found.
    missing = object()
    saved_exc_clear = getattr(sys, 'exc_clear', missing)
    if is_py2:
        sys.exc_clear = Mock()
    try:
        for clear in scenarios:
            if is_py2:
                sys.exc_clear.reset_mock()
            try:
                raise_dummy()
            except KeyError as exc:
                clear(exc)
                assert_cleared(exc)
    finally:
        if is_py2:
            if saved_exc_clear is missing:
                del sys.exc_clear
            else:
                sys.exc_clear = saved_exc_clear

@patch('celery.app.trace.traceback_clear')
def test_when_Ignore(self, mock_traceback_clear):
    """Tracing a task that raises Ignore reports the IGNORED state and
    calls traceback_clear."""

    @self.app.task(shared=False)
    def ignored():
        raise Ignore()

    _, info = self.trace(ignored, (), {})

    assert info.state == states.IGNORED
    mock_traceback_clear.assert_called()

def test_when_Reject(self):
@patch('celery.app.trace.traceback_clear')
def test_when_Reject(self, mock_traceback_clear):
    """Tracing a task that raises Reject reports the REJECTED state and
    calls traceback_clear."""

    @self.app.task(shared=False)
    def rejecting():
        raise Reject()

    _, info = self.trace(rejecting, (), {})

    assert info.state == states.REJECTED
    mock_traceback_clear.assert_called()

def test_backend_cleanup_raises(self):
self.add.backend.process_cleanup = Mock()
Expand Down Expand Up @@ -248,17 +306,21 @@ def test_trace_SystemExit(self):
with pytest.raises(SystemExit):
self.trace(self.raises, (SystemExit(),), {})

def test_trace_Retry(self):
@patch('celery.app.trace.traceback_clear')
def test_trace_Retry(self, mock_traceback_clear):
    """Tracing a task that raises Retry reports the RETRY state, returns
    the same exception object as retval, and calls traceback_clear."""
    retry_exc = Retry('foo', 'bar')

    _, info = self.trace(self.raises, (retry_exc,), {})

    assert info.state == states.RETRY
    assert info.retval is retry_exc
    mock_traceback_clear.assert_called()

def test_trace_exception(self):
@patch('celery.app.trace.traceback_clear')
def test_trace_exception(self, mock_traceback_clear):
    """Tracing a task that raises KeyError reports the FAILURE state,
    returns the same exception object as retval, and calls
    traceback_clear."""
    key_error = KeyError('foo')

    _, info = self.trace(self.raises, (key_error,), {})

    assert info.state == states.FAILURE
    assert info.retval is key_error
    mock_traceback_clear.assert_called()

def test_trace_task_ret__no_content_type(self):
_trace_task_ret(
Expand Down

0 comments on commit 59836a3

Please sign in to comment.