From 59836a35c9ee995be3d27be9130e7fa3bad8ba09 Mon Sep 17 00:00:00 2001 From: heedong <63043496+heedong-jung@users.noreply.github.com> Date: Tue, 7 Apr 2020 15:33:28 +0900 Subject: [PATCH] Clean TraceBack to reduce memory leaks for exception task (#6024) * Clean TraceBack to reduce memory leaks * add unit test * add unit test * reject unittest * Patch For Python 2.7 compatibility * update unittest * Register to the garbage collector by explicitly referring to f_locals. * need more check * update code coverage * update Missing unit test * 3.4 -> 3.5 Co-authored-by: heedong.jung --- celery/app/trace.py | 36 +++++++++++++++++- celery/backends/base.py | 13 +++++++ t/unit/backends/test_base.py | 24 +++++++++++- t/unit/tasks/test_trace.py | 72 +++++++++++++++++++++++++++++++++--- 4 files changed, 138 insertions(+), 7 deletions(-) diff --git a/celery/app/trace.py b/celery/app/trace.py index f4c802f7548..381526c63a1 100644 --- a/celery/app/trace.py +++ b/celery/app/trace.py @@ -82,7 +82,8 @@ """ log_policy_t = namedtuple( - 'log_policy_t', ('format', 'description', 'severity', 'traceback', 'mail'), + 'log_policy_t', + ('format', 'description', 'severity', 'traceback', 'mail'), ) log_policy_reject = log_policy_t(LOG_REJECTED, 'rejected', logging.WARN, 1, 1) @@ -256,6 +257,31 @@ def _log_error(self, task, req, einfo): extra={'data': context}) +def traceback_clear(exc=None): + # Cleared Tb, but einfo still has a reference to Traceback. + # exc cleans up the Traceback at the last moment that can be revealed. + tb = None + if exc is not None: + if hasattr(exc, '__traceback__'): + tb = exc.__traceback__ + else: + _, _, tb = sys.exc_info() + else: + _, _, tb = sys.exc_info() + + if sys.version_info >= (3, 5, 0): + while tb is not None: + try: + tb.tb_frame.clear() + tb.tb_frame.f_locals + except RuntimeError: + # Ignore the exception raised if the frame is still executing. + pass + tb = tb.tb_next + + elif (2, 7, 0) <= sys.version_info < (3, 0, 0): + sys.exc_clear() + def build_tracer(name, task, loader=None, hostname=None, store_errors=True, Info=TraceInfo, eager=False, propagate=False, app=None, monotonic=monotonic, trace_ok_t=trace_ok_t, @@ -385,16 +411,24 @@ def trace_task(uuid, args, kwargs, request=None): I, R = Info(REJECTED, exc), ExceptionInfo(internal=True) state, retval = I.state, I.retval I.handle_reject(task, task_request) + traceback_clear(exc) except Ignore as exc: I, R = Info(IGNORED, exc), ExceptionInfo(internal=True) state, retval = I.state, I.retval I.handle_ignore(task, task_request) + traceback_clear(exc) except Retry as exc: I, R, state, retval = on_error( task_request, exc, uuid, RETRY, call_errbacks=False) + traceback_clear(exc) except Exception as exc: I, R, state, retval = on_error(task_request, exc, uuid) +<<<<<<< HEAD except BaseException as exc: +======= + traceback_clear(exc) + except BaseException: +>>>>>>> c4dd6e840... Clean TraceBack to reduce memory leaks for exception task (#6024) raise else: try: diff --git a/celery/backends/base.py b/celery/backends/base.py index 2837d522bcb..ebdc8afbb02 100644 --- a/celery/backends/base.py +++ b/celery/backends/base.py @@ -229,6 +229,19 @@ def fail_from_current_stack(self, task_id, exc=None): self.mark_as_failure(task_id, exc, ei.traceback) return ei finally: + if sys.version_info >= (3, 5, 0): + while tb is not None: + try: + tb.tb_frame.clear() + tb.tb_frame.f_locals + except RuntimeError: + # Ignore the exception raised if the frame is still executing. + pass + tb = tb.tb_next + + elif (2, 7, 0) <= sys.version_info < (3, 0, 0): + sys.exc_clear() + del tb def prepare_exception(self, exc, serializer=None): diff --git a/t/unit/backends/test_base.py b/t/unit/backends/test_base.py index 4e02daeef69..4af8b6c6f3b 100644 --- a/t/unit/backends/test_base.py +++ b/t/unit/backends/test_base.py @@ -285,10 +285,22 @@ def test_reload_task_result(self): self.b.reload_task_result('task-exists') self.b._cache['task-exists'] = {'result': 'task'} + def test_fail_from_current_stack(self): + import inspect self.b.mark_as_failure = Mock() - try: + frame_list = [] + + if (2, 7, 0) <= sys.version_info < (3, 0, 0): + sys.exc_clear = Mock() + + def raise_dummy(): + frame_str_temp = str(inspect.currentframe().__repr__) + frame_list.append(frame_str_temp) + local_value = 1214 raise KeyError('foo') + try: + raise_dummy() except KeyError as exc: self.b.fail_from_current_stack('task_id') self.b.mark_as_failure.assert_called() @@ -297,6 +309,16 @@ def test_fail_from_current_stack(self): assert args[1] is exc assert args[2] + if sys.version_info >= (3, 5, 0): + tb_ = exc.__traceback__ + while tb_ is not None: + if str(tb_.tb_frame.__repr__) == frame_list[0]: + assert len(tb_.tb_frame.f_locals) == 0 + tb_ = tb_.tb_next + elif (2, 7, 0) <= sys.version_info < (3, 0, 0): + sys.exc_clear.assert_called() + + def test_prepare_value_serializes_group_result(self): self.b.serializer = 'json' g = self.app.GroupResult('group_id', [self.app.AsyncResult('foo')]) diff --git a/t/unit/tasks/test_trace.py b/t/unit/tasks/test_trace.py index 3174f203ae6..ce66dbd7f3a 100644 --- a/t/unit/tasks/test_trace.py +++ b/t/unit/tasks/test_trace.py @@ -12,7 +12,9 @@ log_policy_internal, log_policy_reject, log_policy_unexpected, reset_worker_optimizations, - setup_worker_optimizations, trace_task) + setup_worker_optimizations, trace_task, + traceback_clear) + from celery.exceptions import Ignore, Reject, Retry @@ -150,7 +152,60 @@ def add(x, y): with pytest.raises(MemoryError): self.trace(add, (2, 2), {}, eager=False) - def test_when_Ignore(self): + def test_traceback_clear(self): + import inspect, sys + sys.exc_clear = Mock() + frame_list =[] + + def raise_dummy(): + frame_str_temp = str(inspect.currentframe().__repr__) + frame_list.append(frame_str_temp) + local_value = 1214 + raise KeyError('foo') + try: + raise_dummy() + except KeyError as exc: + traceback_clear(exc) + + if sys.version_info >= (3, 5, 0): + tb_ = exc.__traceback__ + while tb_ is not None: + if str(tb_.tb_frame.__repr__) == frame_list[0]: + assert len(tb_.tb_frame.f_locals) == 0 + tb_ = tb_.tb_next + elif (2, 7, 0) <= sys.version_info < (3, 0, 0): + sys.exc_clear.assert_called() + + try: + raise_dummy() + except KeyError as exc: + traceback_clear() + + if sys.version_info >= (3, 5, 0): + tb_ = exc.__traceback__ + while tb_ is not None: + if str(tb_.tb_frame.__repr__) == frame_list[0]: + assert len(tb_.tb_frame.f_locals) == 0 + tb_ = tb_.tb_next + elif (2, 7, 0) <= sys.version_info < (3, 0, 0): + sys.exc_clear.assert_called() + + try: + raise_dummy() + except KeyError as exc: + traceback_clear(str(exc)) + + if sys.version_info >= (3, 5, 0): + tb_ = exc.__traceback__ + while tb_ is not None: + if str(tb_.tb_frame.__repr__) == frame_list[0]: + assert len(tb_.tb_frame.f_locals) == 0 + tb_ = tb_.tb_next + elif (2, 7, 0) <= sys.version_info < (3, 0, 0): + sys.exc_clear.assert_called() + + @patch('celery.app.trace.traceback_clear') + def test_when_Ignore(self, mock_traceback_clear): @self.app.task(shared=False) def ignored(): @@ -158,8 +213,10 @@ def ignored(): retval, info = self.trace(ignored, (), {}) assert info.state == states.IGNORED + mock_traceback_clear.assert_called() - def test_when_Reject(self): + @patch('celery.app.trace.traceback_clear') + def test_when_Reject(self, mock_traceback_clear): @self.app.task(shared=False) def rejecting(): @@ -167,6 +224,7 @@ def rejecting(): retval, info = self.trace(rejecting, (), {}) assert info.state == states.REJECTED + mock_traceback_clear.assert_called() def test_backend_cleanup_raises(self): self.add.backend.process_cleanup = Mock() @@ -248,17 +306,21 @@ def test_trace_SystemExit(self): with pytest.raises(SystemExit): self.trace(self.raises, (SystemExit(),), {}) - def test_trace_Retry(self): + @patch('celery.app.trace.traceback_clear') + def test_trace_Retry(self, mock_traceback_clear): exc = Retry('foo', 'bar') _, info = self.trace(self.raises, (exc,), {}) assert info.state == states.RETRY assert info.retval is exc + mock_traceback_clear.assert_called() - def test_trace_exception(self): + @patch('celery.app.trace.traceback_clear') + def test_trace_exception(self, mock_traceback_clear): exc = KeyError('foo') _, info = self.trace(self.raises, (exc,), {}) assert info.state == states.FAILURE assert info.retval is exc + mock_traceback_clear.assert_called() def test_trace_task_ret__no_content_type(self): _trace_task_ret(