From d2194f143d467b3fbd5b4fe18bc59753bbcb8d01 Mon Sep 17 00:00:00 2001 From: Maor Kleinberger Date: Thu, 31 Mar 2022 14:44:39 +0300 Subject: [PATCH 1/2] Support celery app id with queue workers --- .gitignore | 3 +++ gprofiler/metadata/application_identifiers.py | 8 ++++++ tests/test_appids.py | 25 +++++++++++++++++-- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index aa1b954f1..4cd3a9d63 100644 --- a/.gitignore +++ b/.gitignore @@ -145,3 +145,6 @@ cython_debug/ # Resources gprofiler/resources/* !gprofiler/resources/flamegraph + +# IDEs +.idea diff --git a/gprofiler/metadata/application_identifiers.py b/gprofiler/metadata/application_identifiers.py index f6ef65e7b..a593c4e5c 100644 --- a/gprofiler/metadata/application_identifiers.py +++ b/gprofiler/metadata/application_identifiers.py @@ -204,6 +204,14 @@ def get_application_name(self, process: Process) -> Optional[str]: app_name = _get_cli_arg_by_name(process.cmdline(), "-A") or _get_cli_arg_by_name( process.cmdline(), "--app", check_for_equals_arg=True ) + if app_name is _NON_AVAILABLE_ARG: + queue_name = _get_cli_arg_by_name(process.cmdline(), "-Q") or _get_cli_arg_by_name( + process.cmdline(), "--queues", check_for_equals_arg=True + ) + # TODO: One worker can handle multiple queues, it could be useful to encode that into the app id. + if queue_name is not _NON_AVAILABLE_ARG: + # The queue handler routing is defined in the directory where the worker is run + return f'celery queue: {queue_name} ({process.cwd()})' if app_name is _NON_AVAILABLE_ARG: _logger.warning( f"{self.__class__.__name__}: Couldn't find positional argument -A or --app for application indication", diff --git a/tests/test_appids.py b/tests/test_appids.py index d716460c5..034c43762 100644 --- a/tests/test_appids.py +++ b/tests/test_appids.py @@ -81,7 +81,7 @@ def get_uwsgi_config(process: Process, config_file: str) -> TextIO: assert "uwsgi: my.ini" == get_application_name(process_with_cmdline(["uwsgi", "a", "b", "--ini", "my.ini"])) -def test_celery() -> None: +def test_celery_with_app() -> None: # celery -A assert f"celery: app1 ({PROCESS_CWD}/app1.py)" == get_application_name( process_with_cmdline(["celery", "a", "b", "-A", "app1"]) @@ -110,7 +110,28 @@ def test_celery() -> None: assert "celery: /path/to/app3 (/path/to/app3.py)" == get_application_name( process_with_cmdline(["celery", "a", "b", "--app=/path/to/app3"]) ) - # No app + + +def test_celery_with_queue() -> None: + # celery -Q + assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( + process_with_cmdline(["celery", "a", "b", "-Q", "qqq"]) + ) + # python celery -Q + assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( + process_with_cmdline(["python", "/path/to/celery", "a", "b", "-Q", "qqq"]) + ) + # --queues queue + assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( + process_with_cmdline(["celery", "a", "b", "--queues", "qqq"]) + ) + # --queues=queue + assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( + process_with_cmdline(["celery", "a", "b", "--queues=qqq"]) + ) + + +def test_celery_without_app() -> None: assert get_application_name(process_with_cmdline(["celery", "a", "b"])) is None From b8232affa1be5b9ce9e7d327b68f37d86830eb42 Mon Sep 17 00:00:00 2001 From: Maor Kleinberger Date: Thu, 31 Mar 2022 15:43:23 +0300 Subject: [PATCH 2/2] Support short prefix args and add test case for multiple queues --- gprofiler/metadata/application_identifiers.py | 24 ++++++++++++------- tests/test_appids.py | 15 ++++++++++-- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/gprofiler/metadata/application_identifiers.py b/gprofiler/metadata/application_identifiers.py index a593c4e5c..5d6f60b71 100644 --- a/gprofiler/metadata/application_identifiers.py +++ b/gprofiler/metadata/application_identifiers.py @@ -18,7 +18,6 @@ _logger = get_logger_adapter(__name__) - _PYTHON_BIN_RE = re.compile(r"^python([23](\.\d{1,2})?)?$") @@ -47,7 +46,9 @@ def _is_python_bin(bin_name: str) -> bool: return _PYTHON_BIN_RE.match(os.path.basename(bin_name)) is not None -def _get_cli_arg_by_name(args: List[str], arg_name: str, check_for_equals_arg: bool = False) -> str: +def _get_cli_arg_by_name( + args: List[str], arg_name: str, check_for_equals_arg: bool = False, check_for_short_prefix_arg: bool = False +) -> str: if arg_name in args: return args[args.index(arg_name) + 1] @@ -57,6 +58,11 @@ def _get_cli_arg_by_name(args: List[str], arg_name: str, check_for_equals_arg: b if arg_key == arg_name: return arg_val + if check_for_short_prefix_arg: + for arg in args: + if arg.startswith(arg_name): + return arg[len(arg_name) :] + return _NON_AVAILABLE_ARG @@ -201,17 +207,17 @@ def get_application_name(self, process: Process) -> Optional[str]: if not self.is_celery_process(process): return None - app_name = _get_cli_arg_by_name(process.cmdline(), "-A") or _get_cli_arg_by_name( - process.cmdline(), "--app", check_for_equals_arg=True - ) + app_name = _get_cli_arg_by_name( + process.cmdline(), "-A", check_for_short_prefix_arg=True + ) or _get_cli_arg_by_name(process.cmdline(), "--app", check_for_equals_arg=True) if app_name is _NON_AVAILABLE_ARG: - queue_name = _get_cli_arg_by_name(process.cmdline(), "-Q") or _get_cli_arg_by_name( - process.cmdline(), "--queues", check_for_equals_arg=True - ) + queue_name = _get_cli_arg_by_name( + process.cmdline(), "-Q", check_for_short_prefix_arg=True + ) or _get_cli_arg_by_name(process.cmdline(), "--queues", check_for_equals_arg=True) # TODO: One worker can handle multiple queues, it could be useful to encode that into the app id. if queue_name is not _NON_AVAILABLE_ARG: # The queue handler routing is defined in the directory where the worker is run - return f'celery queue: {queue_name} ({process.cwd()})' + return f"celery queue: {queue_name} ({process.cwd()})" if app_name is _NON_AVAILABLE_ARG: _logger.warning( f"{self.__class__.__name__}: Couldn't find positional argument -A or --app for application indication", diff --git a/tests/test_appids.py b/tests/test_appids.py index 034c43762..8987e90bb 100644 --- a/tests/test_appids.py +++ b/tests/test_appids.py @@ -89,6 +89,9 @@ def test_celery_with_app() -> None: assert "celery: /path/to/app1 (/path/to/app1.py)" == get_application_name( process_with_cmdline(["celery", "a", "b", "-A", "/path/to/app1"]) ) + assert "celery: /path/to/app1 (/path/to/app1.py)" == get_application_name( + process_with_cmdline(["celery", "a", "b", "-A/path/to/app1"]) + ) # python celery -A assert f"celery: app1 ({PROCESS_CWD}/app1.py)" == get_application_name( process_with_cmdline(["python", "/path/to/celery", "a", "b", "-A", "app1"]) @@ -113,11 +116,15 @@ def test_celery_with_app() -> None: def test_celery_with_queue() -> None: - # celery -Q + # celery -Q queue assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( process_with_cmdline(["celery", "a", "b", "-Q", "qqq"]) ) - # python celery -Q + # celery -Qqueue + assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( + process_with_cmdline(["celery", "a", "b", "-Qqqq"]) + ) + # python celery -Q queue assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( process_with_cmdline(["python", "/path/to/celery", "a", "b", "-Q", "qqq"]) ) @@ -129,6 +136,10 @@ def test_celery_with_queue() -> None: assert f"celery queue: qqq ({PROCESS_CWD})" == get_application_name( process_with_cmdline(["celery", "a", "b", "--queues=qqq"]) ) + # multiple queues + assert f"celery queue: qqq,www ({PROCESS_CWD})" == get_application_name( + process_with_cmdline(["celery", "a", "b", "-Q", "qqq,www"]) + ) def test_celery_without_app() -> None: