From e98d835a79bef567f75bf82abe092cd76d7e5530 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Thu, 2 May 2024 16:30:18 -0700 Subject: [PATCH 01/12] Add new instrumentation for celery --- newrelic/config.py | 15 ++++++++ newrelic/hooks/application_celery.py | 51 ++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/newrelic/config.py b/newrelic/config.py index 8e10218f9..bb991255f 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -4367,6 +4367,21 @@ def _process_module_builtin_defaults(): "newrelic.hooks.application_celery", "instrument_celery_app_base", ) + _process_module_definition( + "celery.app.trace", + "newrelic.hooks.application_celery", + "instrument_celery_execute_trace", + ) + _process_module_definition( + "celery.execute.trace", + "newrelic.hooks.application_celery", + "instrument_celery_execute_trace", + ) + _process_module_definition( + "celery.task.trace", + "newrelic.hooks.application_celery", + "instrument_celery_execute_trace", + ) _process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool") _process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi") diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py index 25a86a4a6..7f8c4e1ff 100644 --- a/newrelic/hooks/application_celery.py +++ b/newrelic/hooks/application_celery.py @@ -35,9 +35,14 @@ MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"} -def task_name(*args, **kwargs): +def task_name(name, instance, *args, **kwargs): + if name: + return name + # Grab the current task, which can be located in either place - if args: + if instance: + task = instance + elif args: task = args[0] elif "task" in kwargs: task = kwargs["task"] @@ -59,14 +64,15 @@ def task_name(*args, **kwargs): return task_name -def CeleryTaskWrapper(wrapped): +def CeleryTaskWrapper(wrapped, name=None, source=None): def wrapper(wrapped, instance, args, kwargs): transaction = current_transaction(active_only=False) - if instance is not None: - _name = task_name(instance, *args, **kwargs) - else: - _name = task_name(*args, **kwargs) + # Grab task name using careful naming logic + _name = task_name(name, instance, *args, **kwargs) + + # Set code level metrics source function + _source = source or instance # A Celery Task can be called either outside of a transaction, or # within the context of an existing transaction. There are 3 @@ -93,17 +99,18 @@ def wrapper(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) elif transaction: - with FunctionTrace(_name, source=instance): + with FunctionTrace(_name, source=_source): return wrapped(*args, **kwargs) else: - with BackgroundTask(application_instance(), _name, "Celery", source=instance) as transaction: + with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction: # Attempt to grab distributed tracing headers try: # Headers on earlier versions of Celery may end up as attributes # on the request context instead of as custom headers. Handler this # by defaulting to using vars() if headers is not available - request = instance.request + task = instance or wrapped + request = task.request headers = getattr(request, "headers", None) or vars(request) settings = transaction.settings @@ -153,6 +160,7 @@ def run(self, *args, **kwargs): wrapped_task = TaskWrapper(wrapped, wrapper) # Reset __module__ to be less transparent so celery detects our monkey-patching wrapped_task.__module__ = CeleryTaskWrapper.__module__ + wrapped_task._nr_wrapped = True return wrapped_task @@ -205,6 +213,29 @@ def instrument_celery_app_base(module): wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task) +def instrument_celery_execute_trace(module): + # Triggered for 'celery.execute_trace'. + + if hasattr(module, "build_tracer"): + # Need to add a wrapper for background task entry point. + + _build_tracer = module.build_tracer + + def build_tracer(name, task, *args, **kwargs): + try: + task = task or module.tasks[name] + + task_cls = type(task) + if not hasattr(task_cls.__call__, "_nr_wrapped"): + task_cls.__call__ = CeleryTaskWrapper(task_cls.__call__, name, source=task.__wrapped__) + except Exception: + pass + + return _build_tracer(name, task, *args, **kwargs) + + module.build_tracer = build_tracer + + def instrument_celery_worker(module): # Triggered for 'celery.worker' and 'celery.concurrency.processes'. From 3ee0c3ad69c7256344205cf0e82cec30690d318e Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Thu, 2 May 2024 16:44:21 -0700 Subject: [PATCH 02/12] Fix wrapper detection --- newrelic/hooks/application_celery.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py index 7f8c4e1ff..1f4875d01 100644 --- a/newrelic/hooks/application_celery.py +++ b/newrelic/hooks/application_celery.py @@ -28,7 +28,7 @@ from newrelic.api.message_trace import MessageTrace from newrelic.api.pre_function import wrap_pre_function from newrelic.api.transaction import current_transaction -from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper +from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper, _NRBoundFunctionWrapper from newrelic.core.agent import shutdown_agent UNKNOWN_TASK_NAME = "" @@ -160,7 +160,6 @@ def run(self, *args, **kwargs): wrapped_task = TaskWrapper(wrapped, wrapper) # Reset __module__ to be less transparent so celery detects our monkey-patching wrapped_task.__module__ = CeleryTaskWrapper.__module__ - wrapped_task._nr_wrapped = True return wrapped_task @@ -214,10 +213,7 @@ def instrument_celery_app_base(module): def instrument_celery_execute_trace(module): - # Triggered for 'celery.execute_trace'. - if hasattr(module, "build_tracer"): - # Need to add a wrapper for background task entry point. _build_tracer = module.build_tracer @@ -226,7 +222,7 @@ def build_tracer(name, task, *args, **kwargs): task = task or module.tasks[name] task_cls = type(task) - if not hasattr(task_cls.__call__, "_nr_wrapped"): + if not isinstance(task_cls.__call__, _NRBoundFunctionWrapper): task_cls.__call__ = CeleryTaskWrapper(task_cls.__call__, name, source=task.__wrapped__) except Exception: pass From e33303add8950c13767ab308771d43cb2bd3992b Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 12:34:14 -0700 Subject: [PATCH 03/12] Move instrumentation testing --- tests/application_celery/test_task_methods.py | 13 ------ tests/application_celery/test_wrappers.py | 45 +++++++++++++++++++ 2 files changed, 45 insertions(+), 13 deletions(-) create mode 100644 tests/application_celery/test_wrappers.py diff --git a/tests/application_celery/test_task_methods.py b/tests/application_celery/test_task_methods.py index f1d78f32f..258b488e0 100644 --- a/tests/application_celery/test_task_methods.py +++ b/tests/application_celery/test_task_methods.py @@ -25,19 +25,6 @@ FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)] -def test_task_wrapping_detection(): - """ - Ensure celery detects our monkeypatching properly and will run our instrumentation - on __call__ and runs that instead of micro-optimizing it away to a run() call. - - If this is not working, most other tests in this file will fail as the different ways - of running celery tasks will not all run our instrumentation. - """ - from celery.app.trace import task_has_custom - - assert task_has_custom(add, "__call__") - - @validate_transaction_metrics( name="_target_application.add", group="Celery", diff --git a/tests/application_celery/test_wrappers.py b/tests/application_celery/test_wrappers.py new file mode 100644 index 000000000..1694551a6 --- /dev/null +++ b/tests/application_celery/test_wrappers.py @@ -0,0 +1,45 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _target_application import add + + +def test_task_wrapping_detection(): + """ + Ensure celery detects our monkeypatching properly and will run our instrumentation + on __call__ and runs that instead of micro-optimizing it away to a run() call. + + If this is not working, most other tests in this file will fail as the different ways + of running celery tasks will not all run our instrumentation. + """ + from celery.app.trace import task_has_custom + + assert task_has_custom(add, "__call__") + + +def test_worker_optimizations_preserve_instrumentation(celery_worker_available): + + from celery.app.trace import setup_worker_optimizations, reset_worker_optimizations + from celery.app.task import BaseTask + from newrelic.common.object_wrapper import _NRBoundFunctionWrapper + + is_instrumented = lambda: isinstance(BaseTask.__call__, _NRBoundFunctionWrapper) + + assert is_instrumented(), "Instrumentation not initially applied." + + setup_worker_optimizations(celery_worker_available.app) + assert is_instrumented(), "setup_worker_optimizations removed instrumentation." + + reset_worker_optimizations() + assert is_instrumented(), "reset_worker_optimizations removed instrumentation." From f098385bfc4b4bb6fc3e27f02d1311c46439604e Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 12:35:01 -0700 Subject: [PATCH 04/12] Add metaprogramming instrumentation --- newrelic/config.py | 5 +++++ newrelic/hooks/application_celery.py | 31 ++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/newrelic/config.py b/newrelic/config.py index bb991255f..140077b52 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -4350,6 +4350,11 @@ def _process_module_builtin_defaults(): "newrelic.hooks.application_celery", "instrument_celery_app_task", ) + _process_module_definition( + "celery.app.trace", + "newrelic.hooks.application_celery", + "instrument_celery_app_trace", + ) _process_module_definition("celery.worker", "newrelic.hooks.application_celery", "instrument_celery_worker") _process_module_definition( "celery.concurrency.processes", diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py index 1f4875d01..e6a819e6b 100644 --- a/newrelic/hooks/application_celery.py +++ b/newrelic/hooks/application_celery.py @@ -207,6 +207,28 @@ def wrap_Celery_send_task(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) +def wrap_worker_optimizations(wrapped, instance, args, kwargs): + # Attempt to uninstrument BaseTask before stack protection is installed or uninstalled + try: + from celery.app.task import BaseTask + + if isinstance(BaseTask.__call__, _NRBoundFunctionWrapper): + BaseTask.__call__ = BaseTask.__call__.__wrapped__ + except Exception: + BaseTask = None + + # task = CeleryTaskWrapper(task, name, source=task.__wrapped__) + + # Allow metaprogramming to run + result = wrapped(*args, **kwargs) + + # Rewrap finalized BaseTask + if BaseTask: # Ensure imports succeeded + BaseTask.__call__ = CeleryTaskWrapper(BaseTask.__call__) + + return result + + def instrument_celery_app_base(module): if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"): wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task) @@ -266,3 +288,12 @@ def force_agent_shutdown(*args, **kwargs): if hasattr(module, "Worker"): wrap_pre_function(module, "Worker._do_exit", force_agent_shutdown) + + +def instrument_celery_app_trace(module): + # Uses same wrapper for setup and reset worker optimizations to prevent patching and unpatching from removing wrappers + if hasattr(module, "setup_worker_optimizations"): + wrap_function_wrapper(module, "setup_worker_optimizations", wrap_worker_optimizations) + + if hasattr(module, "reset_worker_optimizations"): + wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations) From d6bbcfde83f7d52b9f3265b5f78e37bda953b6b1 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 12:35:37 -0700 Subject: [PATCH 05/12] Remove build_tracer instrumentation again --- newrelic/config.py | 15 --------------- newrelic/hooks/application_celery.py | 20 -------------------- 2 files changed, 35 deletions(-) diff --git a/newrelic/config.py b/newrelic/config.py index 140077b52..bed6b2d3b 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -4372,21 +4372,6 @@ def _process_module_builtin_defaults(): "newrelic.hooks.application_celery", "instrument_celery_app_base", ) - _process_module_definition( - "celery.app.trace", - "newrelic.hooks.application_celery", - "instrument_celery_execute_trace", - ) - _process_module_definition( - "celery.execute.trace", - "newrelic.hooks.application_celery", - "instrument_celery_execute_trace", - ) - _process_module_definition( - "celery.task.trace", - "newrelic.hooks.application_celery", - "instrument_celery_execute_trace", - ) _process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool") _process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi") diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py index e6a819e6b..4a54c56cb 100644 --- a/newrelic/hooks/application_celery.py +++ b/newrelic/hooks/application_celery.py @@ -234,26 +234,6 @@ def instrument_celery_app_base(module): wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task) -def instrument_celery_execute_trace(module): - if hasattr(module, "build_tracer"): - - _build_tracer = module.build_tracer - - def build_tracer(name, task, *args, **kwargs): - try: - task = task or module.tasks[name] - - task_cls = type(task) - if not isinstance(task_cls.__call__, _NRBoundFunctionWrapper): - task_cls.__call__ = CeleryTaskWrapper(task_cls.__call__, name, source=task.__wrapped__) - except Exception: - pass - - return _build_tracer(name, task, *args, **kwargs) - - module.build_tracer = build_tracer - - def instrument_celery_worker(module): # Triggered for 'celery.worker' and 'celery.concurrency.processes'. From a5f7967ca98646fafbb94abde7d4bfb7559b3848 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 13:02:02 -0700 Subject: [PATCH 06/12] Fix code level metrics for celery --- newrelic/hooks/application_celery.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py index 4a54c56cb..5f3fdfab9 100644 --- a/newrelic/hooks/application_celery.py +++ b/newrelic/hooks/application_celery.py @@ -35,10 +35,7 @@ MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"} -def task_name(name, instance, *args, **kwargs): - if name: - return name - +def task_name(instance, *args, **kwargs): # Grab the current task, which can be located in either place if instance: task = instance @@ -64,15 +61,12 @@ def task_name(name, instance, *args, **kwargs): return task_name -def CeleryTaskWrapper(wrapped, name=None, source=None): +def CeleryTaskWrapper(wrapped): def wrapper(wrapped, instance, args, kwargs): transaction = current_transaction(active_only=False) # Grab task name using careful naming logic - _name = task_name(name, instance, *args, **kwargs) - - # Set code level metrics source function - _source = source or instance + _name = task_name(instance, *args, **kwargs) # A Celery Task can be called either outside of a transaction, or # within the context of an existing transaction. There are 3 @@ -99,18 +93,17 @@ def wrapper(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) elif transaction: - with FunctionTrace(_name, source=_source): + with FunctionTrace(_name, source=instance): return wrapped(*args, **kwargs) else: - with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction: + with BackgroundTask(application_instance(), _name, "Celery", source=instance) as transaction: # Attempt to grab distributed tracing headers try: # Headers on earlier versions of Celery may end up as attributes # on the request context instead of as custom headers. Handler this # by defaulting to using vars() if headers is not available - task = instance or wrapped - request = task.request + request = instance.request headers = getattr(request, "headers", None) or vars(request) settings = transaction.settings From d67d074e42cc3e55202809d21729efec27c8b4f6 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 13:17:43 -0700 Subject: [PATCH 07/12] Cleanup --- newrelic/hooks/application_celery.py | 2 -- tests/application_celery/test_wrappers.py | 23 ++++++++++++----------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py index 5f3fdfab9..2df6ab118 100644 --- a/newrelic/hooks/application_celery.py +++ b/newrelic/hooks/application_celery.py @@ -210,8 +210,6 @@ def wrap_worker_optimizations(wrapped, instance, args, kwargs): except Exception: BaseTask = None - # task = CeleryTaskWrapper(task, name, source=task.__wrapped__) - # Allow metaprogramming to run result = wrapped(*args, **kwargs) diff --git a/tests/application_celery/test_wrappers.py b/tests/application_celery/test_wrappers.py index 1694551a6..1bca1b436 100644 --- a/tests/application_celery/test_wrappers.py +++ b/tests/application_celery/test_wrappers.py @@ -14,6 +14,13 @@ from _target_application import add +import celery + +from newrelic.common.object_wrapper import _NRBoundFunctionWrapper + + +FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)] + def test_task_wrapping_detection(): """ @@ -23,23 +30,17 @@ def test_task_wrapping_detection(): If this is not working, most other tests in this file will fail as the different ways of running celery tasks will not all run our instrumentation. """ - from celery.app.trace import task_has_custom - - assert task_has_custom(add, "__call__") + assert celery.app.trace.task_has_custom(add, "__call__") def test_worker_optimizations_preserve_instrumentation(celery_worker_available): + is_instrumented = lambda: isinstance(celery.app.task.BaseTask.__call__, _NRBoundFunctionWrapper) - from celery.app.trace import setup_worker_optimizations, reset_worker_optimizations - from celery.app.task import BaseTask - from newrelic.common.object_wrapper import _NRBoundFunctionWrapper - - is_instrumented = lambda: isinstance(BaseTask.__call__, _NRBoundFunctionWrapper) - + celery.app.trace.reset_worker_optimizations() assert is_instrumented(), "Instrumentation not initially applied." - setup_worker_optimizations(celery_worker_available.app) + celery.app.trace.setup_worker_optimizations(celery_worker_available.app) assert is_instrumented(), "setup_worker_optimizations removed instrumentation." - reset_worker_optimizations() + celery.app.trace.reset_worker_optimizations() assert is_instrumented(), "reset_worker_optimizations removed instrumentation." From fb5cbbd1d3a5d432e7de615c0cc784075e02ebfe Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 13:18:08 -0700 Subject: [PATCH 08/12] Add code level metrics tests to all task methods --- tests/application_celery/test_task_methods.py | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/tests/application_celery/test_task_methods.py b/tests/application_celery/test_task_methods.py index 258b488e0..d95e00294 100644 --- a/tests/application_celery/test_task_methods.py +++ b/tests/application_celery/test_task_methods.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest from _target_application import add, tsum -from celery import chain, chord, group +from testing_support.validators.validate_code_level_metrics import ( + validate_code_level_metrics, +) from testing_support.validators.validate_transaction_count import ( validate_transaction_count, ) @@ -22,9 +25,21 @@ validate_transaction_metrics, ) +import celery + + FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)] +@pytest.fixture(scope="function", autouse=True, params=[False, True]) +def with_worker_optimizations(request, celery_worker_available): + if request.param: + celery.app.trace.setup_worker_optimizations(celery_worker_available.app) + + yield request.param + celery.app.trace.reset_worker_optimizations() + + @validate_transaction_metrics( name="_target_application.add", group="Celery", @@ -32,6 +47,7 @@ rollup_metrics=FORGONE_TASK_METRICS, background_task=True, ) +@validate_code_level_metrics("_target_application", "add") @validate_transaction_count(1) def test_celery_task_call(): """ @@ -48,6 +64,7 @@ def test_celery_task_call(): rollup_metrics=FORGONE_TASK_METRICS, background_task=True, ) +@validate_code_level_metrics("_target_application", "add") @validate_transaction_count(1) def test_celery_task_apply(): """ @@ -65,6 +82,7 @@ def test_celery_task_apply(): rollup_metrics=FORGONE_TASK_METRICS, background_task=True, ) +@validate_code_level_metrics("_target_application", "add") @validate_transaction_count(1) def test_celery_task_delay(): """ @@ -82,6 +100,7 @@ def test_celery_task_delay(): rollup_metrics=FORGONE_TASK_METRICS, background_task=True, ) +@validate_code_level_metrics("_target_application", "add") @validate_transaction_count(1) def test_celery_task_apply_async(): """ @@ -99,6 +118,7 @@ def test_celery_task_apply_async(): rollup_metrics=FORGONE_TASK_METRICS, background_task=True, ) +@validate_code_level_metrics("_target_application", "add") @validate_transaction_count(1) def test_celery_app_send_task(celery_session_app): """ @@ -116,6 +136,7 @@ def test_celery_app_send_task(celery_session_app): rollup_metrics=FORGONE_TASK_METRICS, background_task=True, ) +@validate_code_level_metrics("_target_application", "add") @validate_transaction_count(1) def test_celery_task_signature(): """ @@ -141,6 +162,8 @@ def test_celery_task_signature(): background_task=True, index=-2, ) +@validate_code_level_metrics("_target_application", "add") +@validate_code_level_metrics("_target_application", "add", index=-2) @validate_transaction_count(2) def test_celery_task_link(): """ @@ -166,12 +189,14 @@ def test_celery_task_link(): background_task=True, index=-2, ) +@validate_code_level_metrics("_target_application", "add") +@validate_code_level_metrics("_target_application", "add", index=-2) @validate_transaction_count(2) def test_celery_chain(): """ Executes multiple tasks on worker process and returns an AsyncResult. """ - result = chain(add.s(3, 4), add.s(5))() + result = celery.chain(add.s(3, 4), add.s(5))() result = result.get() assert result == 12 @@ -192,12 +217,14 @@ def test_celery_chain(): background_task=True, index=-2, ) +@validate_code_level_metrics("_target_application", "add") +@validate_code_level_metrics("_target_application", "add", index=-2) @validate_transaction_count(2) def test_celery_group(): """ Executes multiple tasks on worker process and returns an AsyncResult. """ - result = group(add.s(3, 4), add.s(1, 2))() + result = celery.group(add.s(3, 4), add.s(1, 2))() result = result.get() assert result == [7, 3] @@ -225,12 +252,15 @@ def test_celery_group(): background_task=True, index=-3, ) +@validate_code_level_metrics("_target_application", "tsum") +@validate_code_level_metrics("_target_application", "add", index=-2) +@validate_code_level_metrics("_target_application", "add", index=-3) @validate_transaction_count(3) def test_celery_chord(): """ Executes 2 add tasks, followed by a tsum task on the worker process and returns an AsyncResult. """ - result = chord([add.s(3, 4), add.s(1, 2)])(tsum.s()) + result = celery.chord([add.s(3, 4), add.s(1, 2)])(tsum.s()) result = result.get() assert result == 10 @@ -242,6 +272,7 @@ def test_celery_chord(): rollup_metrics=[("Function/_target_application.tsum", 2)], background_task=True, ) +@validate_code_level_metrics("_target_application", "tsum") @validate_transaction_count(1) def test_celery_task_map(): """ @@ -259,6 +290,7 @@ def test_celery_task_map(): rollup_metrics=[("Function/_target_application.add", 2)], background_task=True, ) +@validate_code_level_metrics("_target_application", "tsum") @validate_transaction_count(1) def test_celery_task_starmap(): """ @@ -284,6 +316,8 @@ def test_celery_task_starmap(): background_task=True, index=-2, ) +@validate_code_level_metrics("_target_application", "add") +@validate_code_level_metrics("_target_application", "add", index=-2) @validate_transaction_count(2) def test_celery_task_chunks(): """ From 891494d3b7a7ae817fa5a4e2f9f20e2c9f1e825c Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 14:39:21 -0700 Subject: [PATCH 09/12] Wire in CLM source to task name lookup --- newrelic/hooks/application_celery.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py index 2df6ab118..f3261289d 100644 --- a/newrelic/hooks/application_celery.py +++ b/newrelic/hooks/application_celery.py @@ -35,7 +35,7 @@ MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"} -def task_name(instance, *args, **kwargs): +def task_info(instance, *args, **kwargs): # Grab the current task, which can be located in either place if instance: task = instance @@ -48,6 +48,7 @@ def task_name(instance, *args, **kwargs): # Task can be either a task instance or a signature, which subclasses dict, or an actual dict in some cases. task_name = getattr(task, "name", None) or task.get("task", UNKNOWN_TASK_NAME) + task_source = task # Under mapping tasks, the root task name isn't descriptive enough so we append the # subtask name to differentiate between different mapping tasks @@ -55,18 +56,19 @@ def task_name(instance, *args, **kwargs): try: subtask = kwargs["task"]["task"] task_name = "/".join((task_name, subtask)) + task_source = task.app._tasks[subtask] except Exception: pass - return task_name + return task_name, task_source def CeleryTaskWrapper(wrapped): def wrapper(wrapped, instance, args, kwargs): transaction = current_transaction(active_only=False) - # Grab task name using careful naming logic - _name = task_name(instance, *args, **kwargs) + # Grab task name and source + _name, _source = task_info(instance, *args, **kwargs) # A Celery Task can be called either outside of a transaction, or # within the context of an existing transaction. There are 3 @@ -93,11 +95,11 @@ def wrapper(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) elif transaction: - with FunctionTrace(_name, source=instance): + with FunctionTrace(_name, source=_source): return wrapped(*args, **kwargs) else: - with BackgroundTask(application_instance(), _name, "Celery", source=instance) as transaction: + with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction: # Attempt to grab distributed tracing headers try: # Headers on earlier versions of Celery may end up as attributes From 691498c33f91b9c8f779da191518ab1b1622a0a5 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 15:35:50 -0700 Subject: [PATCH 10/12] Fix bug in validate_transaction_count --- .../validators/validate_transaction_count.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/testing_support/validators/validate_transaction_count.py b/tests/testing_support/validators/validate_transaction_count.py index 3ceea7725..a5c72e261 100644 --- a/tests/testing_support/validators/validate_transaction_count.py +++ b/tests/testing_support/validators/validate_transaction_count.py @@ -17,18 +17,22 @@ def validate_transaction_count(count): - _transactions = [] + transactions = [] @transient_function_wrapper('newrelic.core.stats_engine', 'StatsEngine.record_transaction') def _increment_count(wrapped, instance, args, kwargs): - _transactions.append(getattr(args[0], "name", True)) + transactions.append(getattr(args[0], "name", True)) return wrapped(*args, **kwargs) @function_wrapper def _validate_transaction_count(wrapped, instance, args, kwargs): _new_wrapped = _increment_count(wrapped) result = _new_wrapped(*args, **kwargs) + + _transactions = list(transactions) + transactions.clear() + assert count == len(_transactions), (count, len(_transactions), _transactions) return result From ca5dbe8dd53af985e0e3bcd434a36c77d9064ea9 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Fri, 3 May 2024 15:37:26 -0700 Subject: [PATCH 11/12] Module scoped fixture --- tests/application_celery/test_task_methods.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/application_celery/test_task_methods.py b/tests/application_celery/test_task_methods.py index d95e00294..509129b09 100644 --- a/tests/application_celery/test_task_methods.py +++ b/tests/application_celery/test_task_methods.py @@ -31,11 +31,11 @@ FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)] -@pytest.fixture(scope="function", autouse=True, params=[False, True]) +@pytest.fixture(scope="module", autouse=True, params=[False, True], ids=["unpatched", "patched"]) def with_worker_optimizations(request, celery_worker_available): if request.param: celery.app.trace.setup_worker_optimizations(celery_worker_available.app) - + yield request.param celery.app.trace.reset_worker_optimizations() @@ -272,7 +272,7 @@ def test_celery_chord(): rollup_metrics=[("Function/_target_application.tsum", 2)], background_task=True, ) -@validate_code_level_metrics("_target_application", "tsum") +@validate_code_level_metrics("_target_application", "tsum", count=3) @validate_transaction_count(1) def test_celery_task_map(): """ @@ -290,7 +290,7 @@ def test_celery_task_map(): rollup_metrics=[("Function/_target_application.add", 2)], background_task=True, ) -@validate_code_level_metrics("_target_application", "tsum") +@validate_code_level_metrics("_target_application", "add", count=3) @validate_transaction_count(1) def test_celery_task_starmap(): """ @@ -316,8 +316,8 @@ def test_celery_task_starmap(): background_task=True, index=-2, ) -@validate_code_level_metrics("_target_application", "add") -@validate_code_level_metrics("_target_application", "add", index=-2) +@validate_code_level_metrics("_target_application", "add", count=2) +@validate_code_level_metrics("_target_application", "add", count=2, index=-2) @validate_transaction_count(2) def test_celery_task_chunks(): """ From 383a2cf610ec370acd1714cd5515c7ff283547c5 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Tue, 7 May 2024 11:09:16 -0700 Subject: [PATCH 12/12] Fix python 2 syntax --- tests/testing_support/validators/validate_transaction_count.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testing_support/validators/validate_transaction_count.py b/tests/testing_support/validators/validate_transaction_count.py index a5c72e261..ffd4567cf 100644 --- a/tests/testing_support/validators/validate_transaction_count.py +++ b/tests/testing_support/validators/validate_transaction_count.py @@ -31,7 +31,7 @@ def _validate_transaction_count(wrapped, instance, args, kwargs): result = _new_wrapped(*args, **kwargs) _transactions = list(transactions) - transactions.clear() + del transactions[:] # Clear list for subsequent test runs assert count == len(_transactions), (count, len(_transactions), _transactions)