Refactor StreamingWorkunitHandler to be a class-based context manager (#11685)

Before, it was possible to call `StreamingWorkunitHandler.session()` multiple times, meaning we'd be starting and stopping the same threads during the same Pants run. While this wasn't done in practice beyond a test, it complicated future changes and misrepresented how we expect the handler to be used.

Even though reuse is still technically possible with a class-based context manager, the class expresses the intent better than a reusable method does. Further, with this change, using the context manager a second time raises a RuntimeError.

[ci skip-rust]
[ci skip-build-wheels]
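As context for the change, here is a minimal sketch, not the real Pants implementation, of the pattern this commit adopts: the handler itself is the context manager, entering it starts a background reporting thread, exiting it stops the thread, and because threading.Thread.start() refuses to run twice, reusing the handler fails loudly. The ToyWorkunitHandler name and its internals are illustrative assumptions only.

import threading
import time


class _ReporterThread(threading.Thread):
    # Toy stand-in for the thread that periodically reports workunits.
    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._stop = threading.Event()

    def run(self) -> None:
        while not self._stop.wait(timeout=0.05):
            pass  # a real handler would poll the engine and report workunits here

    def join(self, timeout=None) -> None:
        self._stop.set()
        super().join(timeout)


class ToyWorkunitHandler:
    # Hypothetical, simplified handler: __enter__ starts the thread, __exit__ stops it.
    def __init__(self, *, pantsd: bool) -> None:
        self._pantsd = pantsd
        self._thread = _ReporterThread()

    def __enter__(self) -> "ToyWorkunitHandler":
        # threading.Thread.start() raises RuntimeError if called a second time,
        # which is what makes accidental reuse of the handler an error.
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self._thread.join()


handler = ToyWorkunitHandler(pantsd=False)
with handler:
    time.sleep(0.1)  # the actual run would happen here, with reporting in the background

# A second `with handler:` block would raise RuntimeError("threads can only be started once").
# By contrast, the old reusable session() method could be called repeatedly, starting and
# stopping reporting threads within the same run.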
Eric-Arellano committed Mar 12, 2021
1 parent 1902297 commit 0f27e41
Showing 3 changed files with 82 additions and 98 deletions.
14 changes: 7 additions & 7 deletions src/python/pants/bin/local_pants_runner.py
@@ -225,9 +225,7 @@ def run(self, start_time: float) -> ExitCode:

with maybe_profiled(self.profile_path):
global_options = self.options.for_global_scope()

if self.options.help_request:
return self._print_help(self.options.help_request)
goals = tuple(self.options.goals)

streaming_reporter = StreamingWorkunitHandler(
self.graph_session.scheduler_session,
@@ -236,17 +234,19 @@ def run(self, start_time: float) -> ExitCode:
options_bootstrapper=self.options_bootstrapper,
callbacks=self._get_workunits_callbacks(),
report_interval_seconds=global_options.streaming_workunits_report_interval,
pantsd=global_options.pantsd,
)

goals = tuple(self.options.goals)
with streaming_reporter.session(pantsd=global_options.pantsd):
with streaming_reporter:
if self.options.help_request:
return self._print_help(self.options.help_request)
if not goals:
return PANTS_SUCCEEDED_EXIT_CODE
engine_result = PANTS_FAILED_EXIT_CODE

try:
engine_result = self._perform_run(goals)
except Exception as e:
ExceptionSink.log_exception(e)
engine_result = PANTS_FAILED_EXIT_CODE

metrics = self.graph_session.scheduler_session.metrics()
self.run_tracker.set_pantsd_scheduler_metrics(metrics)
82 changes: 35 additions & 47 deletions src/python/pants/engine/internals/engine_test.py
@@ -304,7 +304,6 @@ def _fixture_for_rules(
self, rules, max_workunit_verbosity: LogLevel = LogLevel.INFO
) -> Tuple[SchedulerSession, WorkunitTracker, StreamingWorkunitHandler]:
scheduler = self.mk_scheduler(rules, include_trace_on_error=False)

tracker = WorkunitTracker()
handler = StreamingWorkunitHandler(
scheduler,
@@ -314,40 +313,39 @@ def _fixture_for_rules(
max_workunit_verbosity=max_workunit_verbosity,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
)
return scheduler, tracker, handler

def test_streaming_workunits_reporting(self):
scheduler, tracker, handler = self._fixture_for_rules([fib, QueryRule(Fib, (int,))])
with handler.session(pantsd=False):
with handler:
scheduler.product_request(Fib, subjects=[0])

flattened = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
# The execution of the single named @rule "fib" should be providing this one workunit.
self.assertEqual(len(flattened), 1)
assert len(flattened) == 1

tracker.finished_workunit_chunks = []
with handler.session(pantsd=False):
scheduler, tracker, handler = self._fixture_for_rules([fib, QueryRule(Fib, (int,))])
with handler:
scheduler.product_request(Fib, subjects=[10])

# Requesting a bigger fibonacci number will result in more rule executions and thus more reported workunits.
# In this case, we expect 10 invocations of the `fib` rule.
# Requesting a bigger fibonacci number will result in more rule executions and thus
# more reported workunits. In this case, we expect 11 invocations of the `fib` rule.
flattened = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
assert len(flattened) == 10
assert len(flattened) == 11
assert tracker.finished

def test_streaming_workunits_parent_id_and_rule_metadata(self):
scheduler, tracker, handler = self._fixture_for_rules(
[rule_one_function, rule_two, rule_three, rule_four, QueryRule(Beta, (Input,))]
)

with handler.session(pantsd=False):
with handler:
i = Input()
scheduler.product_request(Beta, subjects=[i])

assert tracker.finished

# rule_one should complete well-after the other rules because of the artificial delay in it caused by the sleep().
# rule_one should complete well-after the other rules because of the artificial delay in
# it caused by the sleep().
assert {item["name"] for item in tracker.finished_workunit_chunks[0]} == {
"pants.engine.internals.engine_test.rule_two",
"pants.engine.internals.engine_test.rule_three",
@@ -382,7 +380,8 @@ def test_streaming_workunits_parent_id_and_rule_metadata(self):
if item["name"] == "pants.engine.internals.engine_test.rule_four"
)

# rule_one should have no parent_id because its actual parent workunit was filted based on level
# rule_one should have no parent_id because its actual parent workunit was filtered based
# on level.
assert r1.get("parent_id", None) is None

assert r2["parent_id"] == r1["span_id"]
@@ -398,15 +397,15 @@ def test_streaming_workunit_log_levels(self) -> None:
[rule_one_function, rule_two, rule_three, rule_four, QueryRule(Beta, (Input,))],
max_workunit_verbosity=LogLevel.TRACE,
)

with handler.session(pantsd=False):
with handler:
i = Input()
scheduler.product_request(Beta, subjects=[i])

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

# With the max_workunit_verbosity set to TRACE, we should see the workunit corresponding to the Select node.
# With the max_workunit_verbosity set to TRACE, we should see the workunit corresponding
# to the Select node.
select = next(
item
for item in finished
@@ -428,8 +427,7 @@ def test_streaming_workunit_log_level_parent_rewrite(self) -> None:
rules = [rule_A, rule_B, rule_C, QueryRule(Alpha, (Input,))]

scheduler, tracker, info_level_handler = self._fixture_for_rules(rules)

with info_level_handler.session(pantsd=False):
with info_level_handler:
i = Input()
scheduler.product_request(Alpha, subjects=[i])

@@ -449,8 +447,7 @@ def test_streaming_workunit_log_level_parent_rewrite(self) -> None:
scheduler, tracker, debug_level_handler = self._fixture_for_rules(
rules, max_workunit_verbosity=LogLevel.TRACE
)

with debug_level_handler.session(pantsd=False):
with debug_level_handler:
i = Input()
scheduler.product_request(Alpha, subjects=[i])

@@ -485,8 +482,7 @@ def a_rule(n: int) -> ModifiedOutput:
scheduler, tracker, handler = self._fixture_for_rules(
[a_rule, QueryRule(ModifiedOutput, (int,))], max_workunit_verbosity=LogLevel.TRACE
)

with handler.session(pantsd=False):
with handler:
scheduler.product_request(ModifiedOutput, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
@@ -513,8 +509,7 @@ def a_rule(n: int) -> ModifiedOutput:
scheduler, tracker, handler = self._fixture_for_rules(
[a_rule, QueryRule(ModifiedOutput, (int,))], max_workunit_verbosity=LogLevel.TRACE
)

with handler.session(pantsd=False):
with handler:
scheduler.product_request(ModifiedOutput, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
@@ -538,8 +533,7 @@ def a_rule(n: int) -> Output:
scheduler, tracker, handler = self._fixture_for_rules(
[a_rule, QueryRule(Output, (int,))], max_workunit_verbosity=LogLevel.TRACE
)

with handler.session(pantsd=False):
with handler:
scheduler.product_request(Output, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
@@ -564,8 +558,7 @@ def a_rule(n: int) -> Output:
scheduler, tracker, handler = self._fixture_for_rules(
[a_rule, QueryRule(Output, (int,))], max_workunit_verbosity=LogLevel.TRACE
)

with handler.session(pantsd=False):
with handler:
scheduler.product_request(Output, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
@@ -595,8 +588,7 @@ def a_rule(n: int) -> Output:
scheduler, tracker, handler = self._fixture_for_rules(
[a_rule, QueryRule(Output, (int,))], max_workunit_verbosity=LogLevel.TRACE
)

with handler.session(pantsd=False):
with handler:
scheduler.product_request(Output, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
@@ -624,8 +616,7 @@ async def a_rule() -> TrueResult:
[a_rule, QueryRule(TrueResult, tuple()), *process_rules(), *platform_rules()],
max_workunit_verbosity=LogLevel.TRACE,
)

with handler.session(pantsd=False):
with handler:
scheduler.record_test_observation(128)
scheduler.product_request(TrueResult, subjects=[0])
histograms_info = scheduler.get_observation_histograms()
@@ -687,8 +678,9 @@ def test_more_complicated_engine_aware(rule_runner: RuleRunner, run_tracker: Run
max_workunit_verbosity=LogLevel.TRACE,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
)
with handler.session(pantsd=False):
with handler:
input_1 = CreateDigest(
(
FileContent(path="a.txt", content=b"alpha"),
Expand All @@ -711,13 +703,11 @@ def test_more_complicated_engine_aware(rule_runner: RuleRunner, run_tracker: Run
item for item in finished if item["name"] == "pants.engine.internals.engine_test.a_rule"
)

streaming_workunit_context = handler._context

artifacts = workunit["artifacts"]
output_snapshot_1 = artifacts["snapshot_1"]
output_snapshot_2 = artifacts["snapshot_2"]

output_contents_list = streaming_workunit_context.snapshots_to_file_contents(
output_contents_list = handler.context.snapshots_to_file_contents(
[output_snapshot_1, output_snapshot_2]
)
assert len(output_contents_list) == 2
@@ -748,13 +738,14 @@ def test_process_digests_on_streaming_workunits(
max_workunit_verbosity=LogLevel.INFO,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
)

stdout_process = Process(
argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
)

with handler.session(pantsd=False):
with handler:
result = rule_runner.request(ProcessResult, [stdout_process])

assert tracker.finished
@@ -780,13 +771,12 @@ def test_process_digests_on_streaming_workunits(
max_workunit_verbosity=LogLevel.INFO,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
)

stderr_process = Process(
argv=("/bin/bash", "-c", "1>&2 /bin/echo 'stderr output'"), description="Stderr process"
)

with handler.session(pantsd=False):
with handler:
result = rule_runner.request(ProcessResult, [stderr_process])

assert tracker.finished
@@ -846,13 +836,12 @@ def __call__(self, **kwargs) -> None:
max_workunit_verbosity=LogLevel.INFO,
specs=Specs.empty(),
options_bootstrapper=create_options_bootstrapper([]),
pantsd=False,
)

stdout_process = Process(
argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
)

with handler.session(pantsd=False):
with handler:
rule_runner.request(ProcessResult, [stdout_process])


@@ -909,11 +898,10 @@ def __call__(self, **kwargs) -> None:
options_bootstrapper=create_options_bootstrapper(
["--backend-packages=pants.backend.python"]
),
pantsd=False,
)

stdout_process = Process(
argv=("/bin/bash", "-c", "/bin/echo 'stdout output'"), description="Stdout process"
)

with handler.session(pantsd=False):
with handler:
rule_runner.request(ProcessResult, [stdout_process])
