Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that metrics are gathered during abnormal exits #11793

Merged
merged 2 commits into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 27 additions & 19 deletions src/python/pants/bin/local_pants_runner.py
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import logging
import sys
from dataclasses import dataclass
from typing import Optional, Tuple

Expand Down Expand Up @@ -217,14 +218,28 @@ def _get_workunits_callbacks(self) -> Tuple[WorkunitsCallback, ...]:
)
return tuple(wcf.callback_factory() for wcf in workunits_callback_factories)

def run(self, start_time: float) -> ExitCode:
spec_parser = SpecsParser(get_buildroot())
specs = [str(spec_parser.parse_spec(spec)) for spec in self.options.specs]
self.run_tracker.start(run_start_time=start_time, specs=specs)
def _run_inner(self) -> ExitCode:
goals = tuple(self.options.goals)
if self.options.help_request:
return self._print_help(self.options.help_request)
if not goals:
return PANTS_SUCCEEDED_EXIT_CODE

try:
return self._perform_run(goals)
except Exception as e:
ExceptionSink.log_exception(e)
return PANTS_FAILED_EXIT_CODE
except KeyboardInterrupt:
print("Interrupted by user.\n", file=sys.stderr)
return PANTS_FAILED_EXIT_CODE

def run(self, start_time: float) -> ExitCode:
with maybe_profiled(self.profile_path):
spec_parser = SpecsParser(get_buildroot())
specs = [str(spec_parser.parse_spec(spec)) for spec in self.options.specs]
self.run_tracker.start(run_start_time=start_time, specs=specs)
global_options = self.options.for_global_scope()
goals = tuple(self.options.goals)

streaming_reporter = StreamingWorkunitHandler(
self.graph_session.scheduler_session,
Expand All @@ -236,19 +251,12 @@ def run(self, start_time: float) -> ExitCode:
pantsd=global_options.pantsd,
)
with streaming_reporter:
if self.options.help_request:
return self._print_help(self.options.help_request)
if not goals:
return PANTS_SUCCEEDED_EXIT_CODE

engine_result = PANTS_FAILED_EXIT_CODE
try:
engine_result = self._perform_run(goals)
except Exception as e:
ExceptionSink.log_exception(e)
engine_result = PANTS_FAILED_EXIT_CODE

metrics = self.graph_session.scheduler_session.metrics()
self.run_tracker.set_pantsd_scheduler_metrics(metrics)
self.run_tracker.end_run(engine_result)
engine_result = self._run_inner()
finally:
metrics = self.graph_session.scheduler_session.metrics()
self.run_tracker.set_pantsd_scheduler_metrics(metrics)
self.run_tracker.end_run(engine_result)

return engine_result
return engine_result
@@ -1,23 +1,67 @@
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

import os
import signal
from typing import List, Mapping, Tuple

from workunit_logger.register import FINISHED_SUCCESSFULLY

from pants.testutil.pants_integration_test import run_pants, setup_tmpdir
from pants.testutil.pants_integration_test import (
PantsResult,
run_pants,
setup_tmpdir,
temporary_workdir,
)
from pants.util.dirutil import maybe_read_file
from pants_test.pantsd.pantsd_integration_test_base import attempts, launch_waiter


def workunit_logger_config(log_dest: str) -> Mapping:
return {
"GLOBAL": {
"backend_packages.add": ["workunit_logger", "pants.backend.python"],
},
"workunit-logger": {"dest": log_dest},
}

def test_workunits_logger() -> None:

def run(args: List[str], success: bool = True) -> Tuple[PantsResult, str | None]:
with setup_tmpdir({}) as tmpdir:
dest = os.path.join(tmpdir, "dest.log")
pants_run = run_pants(
[
"--backend-packages=+['workunit_logger','pants.backend.python']",
f"--workunit-logger-dest={dest}",
"list",
"3rdparty::",
]
)
pants_run.assert_success()
# Assert that the file was created and non-empty.
assert maybe_read_file(dest)
pants_run = run_pants(args, config=workunit_logger_config(dest))
log_content = maybe_read_file(dest)
if success:
pants_run.assert_success()
assert log_content
assert FINISHED_SUCCESSFULLY in log_content
else:
pants_run.assert_failure()
return pants_run, log_content


def test_list() -> None:
run(["list", "3rdparty::"])


def test_help() -> None:
run(["help"])
run(["--version"])


def test_ctrl_c() -> None:
with temporary_workdir() as workdir:
dest = os.path.join(workdir, "dest.log")

# Start a pantsd run that will wait forever, then kill the pantsd client.
client_handle, _, _ = launch_waiter(workdir=workdir, config=workunit_logger_config(dest))
client_pid = client_handle.process.pid
os.kill(client_pid, signal.SIGINT)

# Confirm that finish is still called (even though it may be backgrounded in the server).
for _ in attempts("The log should eventually show that the SWH shut down."):
content = maybe_read_file(dest)
if content and FINISHED_SUCCESSFULLY in content:
break
16 changes: 13 additions & 3 deletions src/python/pants/option/config.py
Expand Up @@ -620,10 +620,20 @@ def normalize(self) -> Dict:
for section, section_values in self.parsed.items():
# With TOML, we store dict values as strings to avoid ambiguity between sections/option
# scopes vs. dict values.
section_values = {
option: str(option_value) if isinstance(option_value, dict) else option_value
def normalize_section_value(option, option_value) -> Tuple[str, Any]:
option_value = str(option_value) if isinstance(option_value, dict) else option_value
if option.endswith(".add"):
option = option.rsplit(".", 1)[0]
option_value = f"+{option_value!r}"
elif option.endswith(".remove"):
option = option.rsplit(".", 1)[0]
option_value = f"-{option_value!r}"
return option, option_value

section_values = dict(
normalize_section_value(option, option_value)
for option, option_value in section_values.items()
}
)

def add_section_values(
section_component: str,
Expand Down
13 changes: 13 additions & 0 deletions src/python/pants/option/config_test.py
Expand Up @@ -281,3 +281,16 @@ def test_toml_serializer() -> None:
"cache": {"java": {"o": ""}},
"inception": {"nested": {"nested-again": {"one-more": {"o": ""}}}},
}


def test_toml_serializer_add_remove() -> None:
original_values: Dict = {
"GLOBAL": {
"backend_packages.add": ["added"],
},
}
assert TomlSerializer(original_values).normalize() == {
"GLOBAL": {
"backend_packages": "+['added']",
},
}
22 changes: 19 additions & 3 deletions testprojects/pants-plugins/src/python/workunit_logger/register.py
@@ -1,19 +1,26 @@
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import logging
from dataclasses import dataclass
from typing import Tuple

from pants.engine.internals.scheduler import Workunit
from pants.engine.rules import collect_rules, rule
from pants.engine.streaming_workunit_handler import (
StreamingWorkunitContext,
WorkunitsCallback,
WorkunitsCallbackFactory,
WorkunitsCallbackFactoryRequest,
)
from pants.engine.unions import UnionRule
from pants.option.subsystem import Subsystem

logger = logging.getLogger(__name__)


FINISHED_SUCCESSFULLY = "$Finished Successfully$"


class WorkunitsLoggerOptions(Subsystem):
options_scope = "workunit-logger"
Expand All @@ -34,12 +41,21 @@ class WorkunitsLogger(WorkunitsCallback):

@property
def can_finish_async(self) -> bool:
# Because we don't write to the console, it's safe to finalize in the background.
return True
# We'd like to synchronously fail the run on the final call if need be.
return False

def __call__(self, *, completed_workunits: Tuple[Workunit, ...], **kwargs) -> None:
def __call__(
self,
*,
completed_workunits: Tuple[Workunit, ...],
finished: bool,
context: StreamingWorkunitContext,
**kwargs
) -> None:
with open(self.dest, "a") as dest:
print(str(completed_workunits), file=dest)
if finished and context.run_tracker.has_ended():
print(FINISHED_SUCCESSFULLY, file=dest)


@rule
Expand Down
4 changes: 3 additions & 1 deletion tests/python/pants_test/pantsd/BUILD
Expand Up @@ -8,14 +8,16 @@ python_tests(

python_library(
name = 'pantsd_integration_test_base',
dependencies = [
'testprojects/src/python:coordinated_runs_directory',
],
)

python_integration_tests(
name = 'pantsd_integration',
dependencies = [
'testprojects/src/python:hello_directory',
'testprojects/src/python:bad_requirements_directory',
'testprojects/src/python:coordinated_runs_directory',
'testprojects/src/python:print_env_directory',
],
uses_pants_run=True,
Expand Down
75 changes: 8 additions & 67 deletions tests/python/pants_test/pantsd/pantsd_integration_test.py
Expand Up @@ -8,28 +8,18 @@
import time
import unittest
from textwrap import dedent
from typing import List, Optional, Tuple
from typing import List, Optional

import psutil
import pytest

from pants.testutil.pants_integration_test import (
PantsJoinHandle,
read_pants_log,
setup_tmpdir,
temporary_workdir,
)
from pants.testutil.pants_integration_test import read_pants_log, setup_tmpdir, temporary_workdir
from pants.util.contextutil import environment_as, temporary_dir, temporary_file
from pants.util.dirutil import (
maybe_read_file,
rm_rf,
safe_file_dump,
safe_mkdir,
safe_open,
safe_rmtree,
touch,
from pants.util.dirutil import rm_rf, safe_file_dump, safe_mkdir, safe_open, safe_rmtree, touch
from pants_test.pantsd.pantsd_integration_test_base import (
PantsDaemonIntegrationTestBase,
launch_waiter,
)
from pants_test.pantsd.pantsd_integration_test_base import PantsDaemonIntegrationTestBase, attempts


def launch_file_toucher(f):
Expand Down Expand Up @@ -440,55 +430,6 @@ def test_pantsd_parse_exception_success(self):
finally:
rm_rf(test_path)

@unittest.skip("TODO https://github.com/pantsbuild/pants/issues/7654")
def test_pantsd_multiple_parallel_runs(self):
with self.pantsd_test_context() as (workdir, config, checker):
file_to_make = os.path.join(workdir, "some_magic_file")
waiter_handle = self.run_pants_with_workdir_without_waiting(
["run", "testprojects/src/python/coordinated_runs:waiter", "--", file_to_make],
workdir=workdir,
config=config,
)

checker.assert_started()

creator_handle = self.run_pants_with_workdir_without_waiting(
["run", "testprojects/src/python/coordinated_runs:creator", "--", file_to_make],
workdir=workdir,
config=config,
)

creator_handle.join().assert_success()
waiter_handle.join().assert_success()

@classmethod
def _launch_waiter(cls, workdir: str, config) -> Tuple[PantsJoinHandle, int, str]:
"""Launch a process via pantsd that will wait forever for the a file to be created.

Returns the pid of the pantsd client, the pid of the waiting child process, and the file to
create to cause the waiting child to exit.
"""
file_to_make = os.path.join(workdir, "some_magic_file")
waiter_pid_file = os.path.join(workdir, "pid_file")

argv = [
"run",
"testprojects/src/python/coordinated_runs:waiter",
"--",
file_to_make,
waiter_pid_file,
]
client_handle = cls.run_pants_with_workdir_without_waiting(
argv, workdir=workdir, config=config
)
waiter_pid = -1
for _ in attempts("The waiter process should have written its pid."):
waiter_pid_str = maybe_read_file(waiter_pid_file)
if waiter_pid_str:
waiter_pid = int(waiter_pid_str)
break
return client_handle, waiter_pid, file_to_make

def _assert_pantsd_keyboardinterrupt_signal(
self, signum: int, regexps: Optional[List[str]] = None
):
Expand All @@ -498,7 +439,7 @@ def _assert_pantsd_keyboardinterrupt_signal(
:param regexps: Assert that all of these regexps match somewhere in stderr.
"""
with self.pantsd_test_context() as (workdir, config, checker):
client_handle, waiter_process_pid, _ = self._launch_waiter(workdir, config)
client_handle, waiter_process_pid, _ = launch_waiter(workdir=workdir, config=config)
client_pid = client_handle.process.pid
waiter_process = psutil.Process(waiter_process_pid)

Expand Down Expand Up @@ -531,7 +472,7 @@ def test_sigint_kills_request_waiting_for_lock(self):
config = {"GLOBAL": {"pantsd_timeout_when_multiple_invocations": -1, "level": "debug"}}
with self.pantsd_test_context(extra_config=config) as (workdir, config, checker):
# Run a process that will wait forever.
first_run_handle, _, file_to_create = self._launch_waiter(workdir, config)
first_run_handle, _, file_to_create = launch_waiter(workdir=workdir, config=config)

checker.assert_started()
checker.assert_running()
Expand Down