Skip to content

Commit

Permalink
Include API that will store target info in run_tracker (#4561)
Browse files Browse the repository at this point in the history
### Problem
We would like to collect stats about the targets and tests being run, so we need a way to track this information.

### Solution

This change includes some refactoring to ease code re-use, a method to parse all the test information from the xml files, and two additional methods that collect target info and test info.

### Result

The changes shouldn't affect the end-user behavior. This change is for future development.
  • Loading branch information
dotordogh authored and Stu Hood committed May 13, 2017
1 parent e513b99 commit d523736
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 11 deletions.
12 changes: 12 additions & 0 deletions src/docs/dev_tasks.md
Expand Up @@ -265,3 +265,15 @@ Pants allows more fine grained cache management, although it then becomes the re
if self.artifact_cache_writes_enabled():
self.update_artifact_cache((vt, [output_location]))

Recording Target Specific Data
------------------------------
If you would like to track target information such as the targets being run,
their run times, or some other target-specific piece of data, `run_tracker`
provides this ability via the `report_target_info` method. The data reported
will be stored in the `run_info` JSON blob along with timestamp, run id, etc.

There are various reasons you might want to collect target information. The
information could be used for things like tracking developer behavior (for
example, inferring what code developers are constantly changing by observing
which targets are run most often) or target health (for example, a historical
look at targets and their flakiness).
21 changes: 14 additions & 7 deletions src/python/pants/backend/jvm/tasks/junit_run.py
Expand Up @@ -351,6 +351,13 @@ def _collect_test_targets(self, targets):
return test_registry

def _run_tests(self, test_registry, output_dir, coverage=None):

def parse_error_handler(parse_error):
# Just log and move on since the result is only used to characterize failures, and raising
# an error here would just distract from the underlying test failures.
self.context.log.error('Error parsing test result file {path}: {cause}'
.format(path=parse_error.xml_path, cause=parse_error.cause))

if coverage:
extra_jvm_options = coverage.extra_jvm_options
classpath_prepend = coverage.classpath_prepend
Expand Down Expand Up @@ -423,17 +430,17 @@ def _run_tests(self, test_registry, output_dir, coverage=None):
create_synthetic_jar=self.synthetic_classpath,
))

tests_info = self.parse_test_info(output_dir, parse_error_handler, ['classname'])
for test_name, test_info in tests_info.items():
test_item = Test(test_info['classname'], test_name)
test_target = test_registry.get_owning_target(test_item)
self.report_test_info(self.options_scope, test_target, test_name, test_info)

if result != 0 and self._fail_fast:
break

if result != 0:
def error_handler(parse_error):
# Just log and move on since the result is only used to characterize failures, and raising
# an error here would just distract from the underlying test failures.
self.context.log.error('Error parsing test result file {path}: {cause}'
.format(path=parse_error.junit_xml_path, cause=parse_error.cause))

target_to_failed_test = parse_failed_targets(test_registry, output_dir, error_handler)
target_to_failed_test = parse_failed_targets(test_registry, output_dir, parse_error_handler)

def sort_owning_target(t):
return t.address.spec if t else None
Expand Down
27 changes: 24 additions & 3 deletions src/python/pants/backend/python/tasks2/pytest_run.py
Expand Up @@ -435,16 +435,21 @@ def _do_run_tests_with_args(self, pex, args):
self.context.log.info(traceback.format_exc())
return PythonTestResult.exception()

def _get_failed_targets_from_junitxml(self, junitxml, targets):
def _map_relsrc_to_targets(self, targets):
  """Build a mapping from source file path to owning target.

  Maps both the chrooted (pex-source-root-relative) path and the buildroot-relative
  path of every source, since pytest may report either form in its junit xml.

  :param targets: The targets whose sources should be mapped.
  :return: A dict of source path -> target.
  """
  gathered_sources = self.context.products.get_data(GatherSources.PYTHON_SOURCES)
  pex_src_root = os.path.relpath(gathered_sources.path(), get_buildroot())
  mapping = {}
  # Map chrooted sources back to their targets.
  for tgt in targets:
    for src in tgt.sources_relative_to_source_root():
      mapping[os.path.join(pex_src_root, src)] = tgt
  # Also map the source tree-rooted sources, because in some cases (e.g., a failure to
  # even eval the test file during test collection), that's the path pytest will use
  # in the junit xml.
  for tgt in targets:
    for src in tgt.sources_relative_to_buildroot():
      mapping[src] = tgt

  return mapping

def _get_failed_targets_from_junitxml(self, junitxml, targets):
relsrc_to_target = self._map_relsrc_to_targets(targets)

# Now find the sources that contained failing tests.
failed_targets = set()
Expand All @@ -465,6 +470,11 @@ def _get_failed_targets_from_junitxml(self, junitxml, targets):

return failed_targets

def _get_target_from_test(self, test_info, targets):
  """Look up the target that owns the source file recorded for a single test.

  :param dict test_info: Parsed test info; its 'file' entry is the reported source path.
  :param targets: Candidate targets to search.
  :return: The owning target, or None if the file maps to no known target.
  """
  source_path = test_info['file']
  return self._map_relsrc_to_targets(targets).get(source_path)

def _run_tests(self, targets):
if self.get_options().fast:
result = self._do_run_tests(targets)
Expand Down Expand Up @@ -528,6 +538,17 @@ def _do_run_tests(self, targets):
safe_mkdir(external_junit_xml_dir)
shutil.copy(junitxml_path, external_junit_xml_dir)
failed_targets = self._get_failed_targets_from_junitxml(junitxml_path, targets)

def parse_error_handler(parse_error):
# Simple error handler to pass to xml parsing function.
raise TaskError('Error parsing xml file at {}: {}'
.format(parse_error.xml_path, parse_error.cause))

all_tests_info = self.parse_test_info(junitxml_path, parse_error_handler, ['file', 'name'])
for test_name, test_info in all_tests_info.items():
test_target = self._get_target_from_test(test_info, targets)
self.report_test_info(self.options_scope, test_target, test_name, test_info)

return result.with_failed_targets(failed_targets)

def _pex_run(self, pex, workunit_name, args, env):
Expand Down
44 changes: 43 additions & 1 deletion src/python/pants/goal/run_tracker.py
Expand Up @@ -5,6 +5,7 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import ast
import json
import multiprocessing
import os
Expand Down Expand Up @@ -144,6 +145,20 @@ def __init__(self, *args, **kwargs):

self._aborted = False

# Data will be organized first by target and then scope.
# Eg:
# {
# 'target/address:name': {
# 'running_scope': {
# 'run_duration': 356.09
# },
# 'GLOBAL': {
# 'target_type': 'pants.test'
# }
# }
# }
self._target_to_data = {}

def register_thread(self, parent_workunit):
"""Register the parent workunit for all work in the calling thread.
Expand Down Expand Up @@ -285,8 +300,13 @@ def write_stats_to_json(cls, file_name, stats):

def store_stats(self):
"""Store stats about this run in local and optionally remote stats dbs."""
run_information = self.run_info.get_as_dict()
target_data = run_information.get('target_data', None)
if target_data:
run_information['target_data'] = ast.literal_eval(target_data)

stats = {
'run_info': self.run_info.get_as_dict(),
'run_info': run_information,
'cumulative_timings': self.cumulative_timings.get_all(),
'self_timings': self.self_timings.get_all(),
'artifact_cache_stats': self.artifact_cache_stats.get_all(),
Expand Down Expand Up @@ -353,6 +373,9 @@ def end(self):
# If the goal is clean-all then the run info dir no longer exists, so ignore that error.
self.run_info.add_info('outcome', outcome_str, ignore_errors=True)

if self._target_to_data:
self.run_info.add_info('target_data', self._target_to_data)

self.report.close()
self.store_stats()

Expand Down Expand Up @@ -390,3 +413,22 @@ def shutdown_worker_pool(self):
N.B. This exists only for internal use and to afford for fork()-safe operation in pantsd.
"""
SubprocPool.shutdown(self._aborted)

def report_target_info(self, scope, target, key, val):
  """Add target information to run_info under target_data.

  Data is stored first keyed by target and then by scope, e.g.:
    {
      'target/address:name': {
        'running_scope': {'run_duration': 356.09},
        'GLOBAL': {'target_type': 'pants.test'}
      }
    }

  :param string scope: The scope for which we are reporting the information.
  :param string target: The target for which we want to store information.
  :param string key: The key that will point to the information being stored.
  :param dict or string val: The value of the information being stored.

  :API: public
  """
  # setdefault creates the per-target and per-scope dicts on first use, replacing
  # the previous explicit None-checks; the resulting nesting is identical.
  self._target_to_data.setdefault(target, {}).setdefault(scope, {})[key] = val
85 changes: 85 additions & 0 deletions src/python/pants/task/testrunner_task_mixin.py
Expand Up @@ -5,11 +5,14 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import os
import re
from abc import abstractmethod
from threading import Timer

from pants.base.exceptions import ErrorWhileTesting
from pants.util.timeout import Timeout, TimeoutReached
from pants.util.xml_parser import XmlParser


class TestRunnerTaskMixin(object):
Expand Down Expand Up @@ -63,6 +66,88 @@ def execute(self):
all_targets = self._get_targets()
self._execute(all_targets)

def report_test_info(self, scope, target, test_name, test_info):
  """Add test information to target information.

  Reports the target's type under the 'GLOBAL' scope, then the per-test info under
  the given scope, via the run tracker.

  :param string scope: The scope for which we are reporting information.
  :param Target target: The target that we want to store the test information under.
  :param string test_name: The key (test name) for the information being stored.
  :param dict test_info: The information being stored.
  """
  # Nothing to report without both an owning target and a scope.
  if not (target and scope):
    return
  run_tracker = self.context.run_tracker
  address = target.address.spec
  run_tracker.report_target_info('GLOBAL', address, 'target_type', target.type_alias)
  run_tracker.report_target_info(scope, address, test_name, test_info)

@staticmethod
def parse_test_info(xml_path, error_handler, additional_testcase_attributes=None):
  """Parse junit xml report(s) for per-test information.

  For each testcase the following is collected:
  - test result code ('success', 'skipped', 'failure' or 'error')
  - test run time duration
  - any requested additional testcase attributes

  :param string xml_path: The path of the xml file (or a directory of TEST-*.xml
    files) to be parsed.
  :param function error_handler: The error handler function, invoked with a
    ParseError when a file cannot be parsed.
  :param list of string additional_testcase_attributes: A list of additional attributes
    belonging to each testcase that should be included in test information.
  :return: A dictionary mapping test name to its info dict.
  """
  collected = {}
  extra_attributes = additional_testcase_attributes or []

  SUCCESS = 'success'
  SKIPPED = 'skipped'
  FAILURE = 'failure'
  ERROR = 'error'

  report_file_pattern = re.compile(r'^TEST-.+\.xml$')

  class ParseError(Exception):
    """Indicates an error parsing a xml report file."""

    def __init__(self, xml_path, cause):
      super(ParseError, self).__init__('Error parsing test result file {}: {}'
                                       .format(xml_path, cause))
      self.xml_path = xml_path
      self.cause = cause

  def process_report(path):
    # Parse a single xml report, folding each testcase's info into `collected`.
    try:
      parsed = XmlParser.from_file(path)
      for testcase in parsed.parsed.getElementsByTagName('testcase'):
        info = {'time': float(testcase.getAttribute('time'))}
        for attribute in extra_attributes:
          info[attribute] = testcase.getAttribute(attribute)

        # A failure element outranks an error, which outranks a skip; no child
        # element at all means the test passed.
        if testcase.getElementsByTagName('failure'):
          info['result_code'] = FAILURE
        elif testcase.getElementsByTagName('error'):
          info['result_code'] = ERROR
        elif testcase.getElementsByTagName('skipped'):
          info['result_code'] = SKIPPED
        else:
          info['result_code'] = SUCCESS

        collected[testcase.getAttribute('name')] = info

    except (XmlParser.XmlError, ValueError) as e:
      # ValueError covers a malformed 'time' attribute.
      error_handler(ParseError(path, e))

  if os.path.isdir(xml_path):
    for file_name in os.listdir(xml_path):
      if report_file_pattern.match(file_name):
        process_report(os.path.join(xml_path, file_name))
  else:
    process_report(xml_path)

  return collected

def _get_test_targets_for_spawn(self):
"""Invoked by _spawn_and_wait to know targets being executed. Defaults to _get_test_targets().
Expand Down
3 changes: 3 additions & 0 deletions tests/python/pants_test/base/context_utils.py
Expand Up @@ -55,6 +55,9 @@ def add_misses(self, cache_name, targets, causes): pass

artifact_cache_stats = DummyArtifactCacheStats()

def report_target_info(self, scope, target, keys, val):
  """No-op stand-in for RunTracker.report_target_info, for use in test contexts."""


@contextmanager
def new_workunit(self, name, labels=None, cmd='', log_config=None):
"""
Expand Down

0 comments on commit d523736

Please sign in to comment.