Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@union / UnionRule for letting the engine figure out paths to products not known in advance #7116

merged 19 commits into from Mar 6, 2019
Changes from 17 commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
separate the declared type from the actual subject type of the Get
cosmicexplorer Jan 20, 2019
plumb a dict of type constraint -> list<type constraint> for unions i…
cosmicexplorer Jan 20, 2019
add testing for union rules which fails
cosmicexplorer Jan 20, 2019
union rules work now
cosmicexplorer Jan 20, 2019
introduce Get.create_statically_for_rule_graph() to avoid confusion
cosmicexplorer Jan 20, 2019
introduce a wrapper datatype for normalized_rules() for no particular…
cosmicexplorer Jan 20, 2019
flesh out docstrings
cosmicexplorer Jan 20, 2019
add union_rules to all Scheduler() constructions
cosmicexplorer Jan 21, 2019
add followup issue link for graph visualization
cosmicexplorer Jan 21, 2019
test the Get typechecking which we now do during rule execution
cosmicexplorer Jan 21, 2019
add further checking to Get() arguments to overcome ambiguity that le…
cosmicexplorer Jan 21, 2019
make bundles and sources fields use union rules
cosmicexplorer Mar 3, 2019
remove all knowledge of union rules from the engine
cosmicexplorer Mar 3, 2019
turn panics into throws and make union tests pass!
cosmicexplorer Mar 3, 2019
remove @union_rule for UnionRule() datatype and add docstrings
cosmicexplorer Mar 3, 2019
cleanup in response to review comments
cosmicexplorer Mar 3, 2019
remove now-unused native lib helper
cosmicexplorer Mar 3, 2019
flesh out the error message TODO!
cosmicexplorer Mar 5, 2019
fix error message test
cosmicexplorer Mar 5, 2019
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.


Just for now

@@ -6,7 +6,7 @@

import logging
from builtins import object, str
from collections import namedtuple
from collections import OrderedDict, namedtuple

from twitter.common.collections import OrderedSet

@@ -38,6 +38,7 @@ def __init__(self):
self._exposed_context_aware_object_factory_by_alias = {}
self._optionables = OrderedSet()
self._rules = OrderedSet()
self._union_rules = OrderedDict()

def registered_aliases(self):
"""Return the registered aliases exposed in BUILD files.
@@ -152,10 +153,13 @@ def register_rules(self, rules):
raise TypeError('The rules must be an iterable, given {!r}'.format(rules))

# "Index" the rules to normalize them and expand their dependencies.
indexed_rules = RuleIndex.create(rules).normalized_rules()
normalized_rules = RuleIndex.create(rules).normalized_rules()
indexed_rules = normalized_rules.rules
union_rules = normalized_rules.union_rules

# Store the rules and record their dependency Optionables.
dependency_optionables = {do
for rule in indexed_rules
for do in rule.dependency_optionables
@@ -165,10 +169,17 @@ def register_rules(self, rules):
def rules(self):
"""Returns the registered rules.
:rtype list
:rtype: list
return list(self._rules)

def union_rules(self):
"""Returns a mapping of registered union base types -> [a list of union member types].
:rtype: OrderedDict
return self._union_rules

def _get_addressable_factory(self, target_type, alias):
return TargetAddressable.factory(target_type=target_type, alias=alias)
@@ -5,7 +5,7 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import logging
from builtins import str, zip
from builtins import object, str, zip
from collections import defaultdict, deque
from contextlib import contextmanager
from os.path import dirname
@@ -24,7 +24,8 @@
from pants.engine.addressable import BuildFileAddresses
from pants.engine.fs import PathGlobs, Snapshot
from pants.engine.legacy.address_mapper import LegacyAddressMapper
from pants.engine.legacy.structs import BundleAdaptor, BundlesField, SourcesField, TargetAdaptor
from pants.engine.legacy.structs import (BundleAdaptor, BundlesField, HydrateableField,
SourcesField, TargetAdaptor)
from pants.engine.mapper import AddressMapper
from pants.engine.objects import Collection
from pants.engine.parser import SymbolTable, TargetAdaptorContainer
@@ -506,9 +507,7 @@ def hydrate_target(target_adaptor_container):
target_adaptor = target_adaptor_container.value
"""Construct a HydratedTarget from a TargetAdaptor and hydrated versions of its adapted fields."""
# Hydrate the fields of the adaptor and re-construct it.
hydrated_fields = yield [(Get(HydratedField, BundlesField, fa)
if type(fa) is BundlesField
else Get(HydratedField, SourcesField, fa))
hydrated_fields = yield [Get(HydratedField, HydrateableField, fa)

This comment has been minimized.

Copy link

stuhood Mar 3, 2019



for fa in target_adaptor.field_adaptors]
kwargs = target_adaptor.kwargs()
for field in hydrated_fields:
@@ -15,6 +15,7 @@
from pants.engine.addressable import addressable_list
from pants.engine.fs import GlobExpansionConjunction, PathGlobs
from pants.engine.objects import Locatable
from pants.engine.rules import UnionRule, union
from pants.engine.struct import Struct, StructWithDeps
from pants.source import wrapped_globs
from pants.util.collections_abc_backport import MutableSequence, MutableSet
@@ -129,6 +130,10 @@ class Field(object):
"""A marker for Target(Adaptor) fields for which the engine might perform extra construction."""

class HydrateableField(object): pass

class SourcesField(
datatype(['address', 'arg', 'filespecs', 'base_globs', 'path_globs', 'validate_fn']),
@@ -426,3 +431,10 @@ class GlobsWithConjunction(datatype([
def for_literal_files(cls, file_paths, spec_path):
return cls(Files(*file_paths, spec_path=spec_path), GlobExpansionConjunction.all_match)

def rules():
return [
UnionRule(HydrateableField, SourcesField),
UnionRule(HydrateableField, BundlesField),
@@ -423,30 +423,30 @@ def extern_generator_send(self, context_handle, func, arg):
if isinstance(res, Get):
# Get.
values = [res.subject]
constraints = [constraint_for(res.product)]
products = [constraint_for(res.product)]
tag = 2
elif type(res) in (tuple, list):
# GetMulti.
values = [g.subject for g in res]
constraints = [constraint_for(g.product) for g in res]
products = [constraint_for(g.product) for g in res]
tag = 3
# Break.
values = [res]
constraints = []
products = []
tag = 0
except Exception as e:
# Throw.
val = e
val._formatted_exc = traceback.format_exc()
values = [val]
constraints = []
products = []
tag = 1

return (
c.vals_buf([c.to_value(v) for v in values]),
c.vals_buf([c.to_value(v) for v in constraints])
c.vals_buf([c.to_value(v) for v in products]),

@_extern_decl('PyResult', ['ExternContext*', 'Handle*', 'Handle**', 'uint64_t'])
@@ -275,7 +275,9 @@ def resolve_type(name):
gets.update(Get(resolve_type(p), resolve_type(s)) for p, s in rule_visitor.gets)
Get.create_statically_for_rule_graph(resolve_type(p), resolve_type(s))
for p, s in rule_visitor.gets)

# For @console_rule, redefine the function to avoid needing a literal return of the output type.
if for_goal:
@@ -314,6 +316,50 @@ def console_rule(goal_name, input_selectors):
return _make_rule(output_type, input_selectors, goal_name, False)

def union(cls):
"""A class decorator which other classes can specify that they can resolve to with `UnionRule`.
Annotating a class with @union allows other classes to use a UnionRule() instance to indicate that
they can be resolved to this base union class. This class will never be instantiated, and should
have no members -- it is used as a tag only, and will be replaced with whatever object is passed
in as the subject of a `yield Get(...)`. See the following example:
class UnionBase(object): pass
@rule(B, [Select(X)])
def get_some_union_type(x):
result = yield Get(ResultType, UnionBase, x.f())
# ...
If there exists a single path from (whatever type the expression `x.f()` returns) -> `ResultType`
in the rule graph, the engine will retrieve and execute that path to produce a `ResultType` from
`x.f()`. This requires also that whatever type `x.f()` returns was registered as a union member of
`UnionBase` with a `UnionRule`.
Unions allow @rule bodies to be written without knowledge of what types may eventually be provided
as input -- rather, they let the engine check that there is a valid path to the desired result.
# TODO: Check that the union base type is used as a tag and nothing else (e.g. no attributes)!
assert isinstance(cls, type)
return type(cls.__name__, (cls,), {
'_is_union': True,

class UnionRule(datatype([
('union_base', type),
('union_member', type),
"""Specify that an instance of `union_member` can be substituted wherever `union_base` is used."""

def __new__(cls, union_base, union_member):
if not getattr(union_base, '_is_union', False):
raise cls.make_type_error('union_base must be a type annotated with @union: was {} (type {})'
.format(union_base, type(union_base).__name__))
return super(UnionRule, cls).__new__(cls, union_base, union_member)

class Rule(AbstractClass):
"""Rules declare how to produce products for the product graph.
@@ -375,9 +421,12 @@ def __new__(cls,

def __str__(self):
return '({}, {!r}, {})'.format(type_or_constraint_repr(self.output_constraint),
return ('({}, {!r}, {}, gets={}, opts={})'

class SingletonRule(datatype(['output_constraint', 'value']), Rule):
@@ -420,49 +469,68 @@ def dependency_optionables(self):
return tuple()

class RuleIndex(datatype(['rules', 'roots'])):
class RuleIndex(datatype(['rules', 'roots', 'union_rules'])):
"""Holds a normalized index of Rules used to instantiate Nodes."""

def create(cls, rule_entries):
def create(cls, rule_entries, union_rules=None):
"""Creates a RuleIndex with tasks indexed by their output type."""
serializable_rules = OrderedDict()
serializable_roots = OrderedSet()
union_rules = OrderedDict(union_rules or ())

def add_task(product_type, rule):
if product_type not in serializable_rules:
serializable_rules[product_type] = OrderedSet()

def add_root_rule(root_rule):

def add_rule(rule):
if isinstance(rule, RootRule):
# TODO: Ensure that interior types work by indexing on the list of types in
# the constraint. This heterogenity has some confusing implications:
# see
for kind in rule.output_constraint.types:
add_task(kind, rule)
add_task(rule.output_constraint, rule)
# TODO: Ensure that interior types work by indexing on the list of types in
# the constraint. This heterogenity has some confusing implications:
# see
for kind in rule.output_constraint.types:
add_task(kind, rule)
add_task(rule.output_constraint, rule)

def add_type_transition_rule(union_rule):
# NB: This does not require that union bases be supplied to `def rules():`, as the union type
# is never instantiated!
union_base = union_rule.union_base
assert union_base._is_union
union_member = union_rule.union_member
if union_base not in union_rules:
union_rules[union_base] = OrderedSet()

for entry in rule_entries:
if isinstance(entry, Rule):
elif isinstance(entry, UnionRule):
elif hasattr(entry, '__call__'):
rule = getattr(entry, 'rule', None)
if rule is None:
raise TypeError("Expected callable {} to be decorated with @rule.".format(entry))
# TODO: update this message!

This comment has been minimized.

Copy link

illicitonion Mar 5, 2019


Either do this, or flesh out the TODO explaining what update should happen.

This comment has been minimized.

Copy link

cosmicexplorer Mar 5, 2019

Author Contributor

Oops! Will do!

raise TypeError("Unexpected rule type: {}. "
"Rules either extend Rule, or are static functions "
"decorated with @rule.".format(type(entry)))

return cls(serializable_rules, serializable_roots)
return cls(serializable_rules, serializable_roots, union_rules)

class NormalizedRules(datatype(['rules', 'union_rules'])): pass

def normalized_rules(self):
rules = OrderedSet(rule
for ruleset in self.rules.values()
for rule in ruleset)
return rules
return self.NormalizedRules(rules, self.union_rules)
@@ -55,6 +55,7 @@ def __init__(
@@ -66,6 +67,8 @@ def __init__(
:param work_dir: The pants work dir.
:param local_store_dir: The directory to use for storing the engine's LMDB store in.
:param rules: A set of Rules which is used to compute values in the graph.
:param union_rules: A dict mapping union base types to member types so that rules can be written
against abstract union types without knowledge of downstream rulesets.
:param execution_options: Execution options for (remote) processes.
:param include_trace_on_error: Include the trace through the graph upon encountering errors.
:type include_trace_on_error: bool
@@ -79,7 +82,7 @@ def __init__(
self.include_trace_on_error = include_trace_on_error
self._visualize_to_dir = visualize_to_dir
# Validate and register all provided and intrinsic tasks.
rule_index = RuleIndex.create(list(rules))
rule_index = RuleIndex.create(list(rules), union_rules)
self._root_subject_types = [r.output_constraint for r in rule_index.roots]

# Create the native Scheduler and Session.
@@ -188,7 +191,7 @@ def _register_rules(self, rule_index):
if type(rule) is SingletonRule:
self._register_singleton(output_constraint, rule)
elif type(rule) is TaskRule:
self._register_task(output_constraint, rule)
self._register_task(output_constraint, rule, rule_index.union_rules)
raise ValueError('Unexpected Rule type: {}'.format(rule))

@@ -201,7 +204,7 @@ def _register_singleton(self, output_constraint, rule):

def _register_task(self, output_constraint, rule):
def _register_task(self, output_constraint, rule, union_rules):
"""Register the given TaskRule with the native scheduler."""
func = Function(self._to_key(rule.func))
self._native.lib.tasks_task_begin(self._tasks, func, output_constraint, rule.cacheable)
@@ -212,10 +215,21 @@ def _register_task(self, output_constraint, rule):
self._native.lib.tasks_add_select(self._tasks, product_constraint)
raise ValueError('Unrecognized Selector type: {}'.format(selector))
for get in rule.input_gets:

def add_get_edge(product, subject):

for get in rule.input_gets:
union_members = union_rules.get(get.subject_declared_type, None)
if union_members:
# If the registered subject type is a union, add get edges to all registered union members.
for union_member in union_members:
add_get_edge(get.product, union_member)
# Otherwise, the Get subject is a "concrete" type, so add a single get edge.
add_get_edge(get.product, get.subject_declared_type)

def visualize_graph_to_file(self, session, filename):
@@ -490,7 +504,6 @@ def run_console_rule(self, product, subject):
:param product: product type for the request.
:param subject: subject for the request.
:param v2_ui: whether to render the v2 engine UI
request = self.execution_request([product], [subject])
returns, throws = self.execute(request)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.