Stop caching console rules (related to #6146) #6516

Merged

Changes from 9 commits

2 changes: 1 addition & 1 deletion src/python/pants/engine/native.py
@@ -168,7 +168,7 @@
Handle val_for(Key);

Tasks* tasks_create(void);
void tasks_task_begin(Tasks*, Function, TypeConstraint);
void tasks_task_begin(Tasks*, Function, TypeConstraint, _Bool);
void tasks_add_get(Tasks*, TypeConstraint, TypeId);
void tasks_add_select(Tasks*, TypeConstraint);
void tasks_task_end(Tasks*);

12 changes: 6 additions & 6 deletions src/python/pants/engine/rules.py
@@ -71,7 +71,7 @@ def _terminated(generator, terminator):
yield terminator


def _make_rule(output_type, input_selectors, for_goal=None):
def _make_rule(output_type, input_selectors, for_goal=None, cacheable=True):
"""A @decorator that declares that a particular static function may be used as a TaskRule.

:param Constraint output_type: The return/output type for the Rule. This may be either a
@@ -118,7 +118,7 @@ def goal_and_return(*args, **kwargs):
else:
wrapped_func = func

wrapped_func._rule = TaskRule(output_type, input_selectors, wrapped_func, input_gets=list(gets))
wrapped_func._rule = TaskRule(output_type, input_selectors, wrapped_func, input_gets=list(gets), cacheable=cacheable)
wrapped_func.output_type = output_type
wrapped_func.goal = for_goal

@@ -132,7 +132,7 @@ def rule(output_type, input_selectors):

def console_rule(goal_name, input_selectors):
output_type = _GoalProduct.for_name(goal_name)
return _make_rule(output_type, input_selectors, goal_name)
return _make_rule(output_type, input_selectors, goal_name, False)


class Rule(AbstractClass):
@@ -147,13 +147,13 @@ def output_constraint(self):
"""An output Constraint type for the rule."""


class TaskRule(datatype(['output_constraint', 'input_selectors', 'input_gets', 'func']), Rule):
class TaskRule(datatype(['output_constraint', 'input_selectors', 'input_gets', 'func', 'cacheable']), Rule):
"""A Rule that runs a task function when all of its input selectors are satisfied.

TODO: Make input_gets non-optional when more/all rules are using them.
"""

def __new__(cls, output_type, input_selectors, func, input_gets=None):
def __new__(cls, output_type, input_selectors, func, input_gets=None, cacheable=True):
# Validate result type.
if isinstance(output_type, Exactly):
constraint = output_type
@@ -175,7 +175,7 @@ def __new__(cls, output_type, input_selectors, func, input_gets=None):
func.__name__, type(input_gets)))

# Create.
return super(TaskRule, cls).__new__(cls, constraint, tuple(input_selectors), tuple(input_gets), func)
return super(TaskRule, cls).__new__(cls, constraint, tuple(input_selectors), tuple(input_gets), func, cacheable)

def __str__(self):
return '({}, {!r}, {})'.format(type_or_constraint_repr(self.output_constraint),
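For illustration, here is a minimal sketch of what the new flag means at the declaration site. The subject and product types below (Specs, ListResult) are placeholders rather than anything from this PR; only the rule/console_rule decorators, Select, and the TaskRule.cacheable field come from the code above.

    from pants.engine.rules import console_rule, rule
    from pants.engine.selectors import Select

    class Specs(object): pass        # placeholder subject type, for illustration only
    class ListResult(object): pass   # placeholder product type, for illustration only

    @rule(ListResult, [Select(Specs)])
    def compute_list(specs):
      # Ordinary @rule functions keep the default cacheable=True.
      return ListResult()

    @console_rule('list', [Select(Specs)])
    def list_goal(specs):
      # @console_rule now passes cacheable=False through _make_rule.
      pass

    # The decorated functions carry TaskRules that differ only in the new field:
    assert compute_list._rule.cacheable
    assert not list_goal._rule.cacheable

Nothing changes for ordinary rules; only rules registered via console_rule opt out of memoization.
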
2 changes: 1 addition & 1 deletion src/python/pants/engine/scheduler.py
@@ -227,7 +227,7 @@ def _register_singleton(self, output_constraint, rule):
def _register_task(self, output_constraint, rule):
"""Register the given TaskRule with the native scheduler."""
func = Function(self._to_key(rule.func))
self._native.lib.tasks_task_begin(self._tasks, func, output_constraint)
self._native.lib.tasks_task_begin(self._tasks, func, output_constraint, rule.cacheable)
for selector in rule.input_selectors:
selector_type = type(selector)
product_constraint = self._to_constraint(selector.product)
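The flag crosses the FFI boundary as the new _Bool parameter declared in native.py above. A rough sketch of the flow, with names other than tasks_task_begin, Function, _to_key, _to_constraint, _tasks, and rule.cacheable being illustrative:

    # Python side: one call per registered TaskRule (see _register_task above).
    func = Function(scheduler._to_key(rule.func))
    output_constraint = scheduler._to_constraint(rule.output_constraint)
    scheduler._native.lib.tasks_task_begin(
      scheduler._tasks, func, output_constraint, rule.cacheable)
    # Selectors and gets are registered next, then the task is finalized.
    # On the Rust side the flag lands in Task { cacheable, .. } (tasks.rs below),
    # which NodeKey::Task reports from its new cacheable() method (nodes.rs below).
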
24 changes: 18 additions & 6 deletions src/rust/engine/graph/src/entry.rs
@@ -52,6 +52,7 @@ impl Generation {
}

#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
#[derive(Debug)]
pub(crate) enum EntryState<N: Node> {
// A node that has either been explicitly cleared, or has not yet started Running. In this state
// there is no need for a dirty bit because the RunToken is either in its initial state, or has
@@ -122,7 +123,7 @@ impl<N: Node> EntryKey<N> {
///
/// An Entry and its adjacencies.
///
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Entry<N: Node> {
// TODO: This is a clone of the Node, which is also kept in the `nodes` map. It would be
// nice to avoid keeping two copies of each Node, but tracking references between the two
@@ -243,7 +244,7 @@ impl<N: Node> Entry<N> {
start_time: Instant::now(),
run_token,
generation,
previous_result,
previous_result: previous_result,
dirty: false,
}
}
@@ -295,7 +296,7 @@ impl<N: Node> Entry<N> {
dirty,
..
}
if !dirty =>
if !dirty && self.node.content().cacheable() =>
{
return future::result(result.clone())
.map(move |res| (res, generation))
@@ -329,21 +330,32 @@ impl<N: Node> Entry<N> {
dirty,
} => {
assert!(
dirty,
dirty || !self.node.content().cacheable(),
"A clean Node should not reach this point: {:?}",
result
);
// The Node has already completed but is now marked dirty. This indicates that we are the
// first caller to request it since it was marked dirty. We attempt to clean it (which will
// cause it to re-run if the dep_generations mismatch).
// Note that if the node is uncacheable, we avoid storing a previous result, which will
// transitively invalidate every node that depends on us. This works because, in practice,
// the only uncacheable nodes are Select nodes and @console_rule Task nodes. See #6146 and #6598
Self::run(
context,
&self.node,
entry_id,
run_token,
generation,
Some(dep_generations),
Some(result),
if self.node.content().cacheable() {
Some(dep_generations)
} else {
None
},
if self.node.content().cacheable() {
Some(result)
} else {
None
},
)
}
EntryState::Running { .. } => {
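The comment added in this hunk is the core of the change: a completed-but-dirty entry only reuses its previous result and dependency generations when the node is cacheable. A language-agnostic toy sketch of that policy (plain Python, not the engine's actual graph types):

    memo = {}  # node -> (previous_result, previous_deps)

    def evaluate(node, compute_fn, deps, cacheable):
      """Re-run compute_fn unless a cacheable node already has an up-to-date result."""
      if cacheable and node in memo:
        previous_result, previous_deps = memo[node]
        if previous_deps == deps:
          return previous_result
      result = compute_fn()
      if cacheable:
        # Only cacheable nodes remember anything. An uncacheable node therefore
        # yields a fresh value on every request, which in turn forces everything
        # that depends on it to re-run as well.
        memo[node] = (result, deps)
      return result

In the engine this shows up as passing None for both the dep_generations and the previous result when the node is uncacheable, as the diff above does.
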
5 changes: 5 additions & 0 deletions src/rust/engine/graph/src/node.rs
@@ -36,6 +36,11 @@ pub trait Node: Clone + Debug + Eq + Hash + Send + 'static {
/// If the given Node output represents an FS operation, returns its Digest.
///
fn digest(result: Self::Item) -> Option<Digest>;

///
/// If the node result is cacheable, return true.
///
fn cacheable(&self) -> bool;
}

pub trait NodeError: Clone + Debug + Eq + Send {

3 changes: 2 additions & 1 deletion src/rust/engine/src/lib.rs
@@ -460,9 +460,10 @@ pub extern "C" fn tasks_task_begin(
tasks_ptr: *mut Tasks,
func: Function,
output_type: TypeConstraint,
cacheable: bool,
) {
with_tasks(tasks_ptr, |tasks| {
tasks.task_begin(func, output_type);
tasks.task_begin(func, output_type, cacheable);
})
}


15 changes: 13 additions & 2 deletions src/rust/engine/src/nodes.rs
@@ -870,6 +870,7 @@ impl NodeKey {
}
}

#[allow(dead_code)]
impl Node for NodeKey {
type Context = Context;

@@ -908,10 +909,11 @@ impl Node for NodeKey {
typstr(&s.selector.product)
),
&NodeKey::Task(ref s) => format!(
"Task({}, {}, {})",
"Task({}, {}, {}, {})",
externs::project_str(&externs::val_for(&s.task.func.0), "__name__"),
keystr(&s.params.expect_single()),
typstr(&s.product)
typstr(&s.product),
s.task.cacheable,
),
&NodeKey::Snapshot(ref s) => format!("Snapshot({})", keystr(&s.0)),
}
@@ -927,6 +929,15 @@ impl Node for NodeKey {
| NodeResult::Value(_) => None,
}
}

fn cacheable(&self) -> bool {

Review comment: As discussed in the ticket: it would be totally fine to mark this method (and anything else that needs it) as #[allow(dead_code)] and defer actually using the cacheable flag for another PR. Up to you!

match self {
&NodeKey::Task(ref s) => s.task.cacheable,
// TODO Select nodes are made uncacheable as a workaround to #6146. Will be worked on in #6598
&NodeKey::Select(_) => false,
_ => true,
}
}
}

impl NodeError for Failure {

4 changes: 2 additions & 2 deletions src/rust/engine/src/tasks.rs
@@ -125,14 +125,14 @@ impl Tasks {
///
/// The following methods define the Task registration lifecycle.
///
pub fn task_begin(&mut self, func: Function, product: TypeConstraint) {
pub fn task_begin(&mut self, func: Function, product: TypeConstraint, cacheable: bool) {
assert!(
self.preparing.is_none(),
"Must `end()` the previous task creation before beginning a new one!"
);

self.preparing = Some(Task {
cacheable: true,
cacheable: cacheable,
product: product,
clause: Vec::new(),
gets: Vec::new(),

@@ -16,21 +16,23 @@

class TestConsoleRuleIntegration(PantsDaemonIntegrationTestBase):

# TODO: Running this command a second time with the daemon will result in no output because
# of product caching. See #6146.
@ensure_daemon
def test_v2_list(self):
result = self.do_command(
'--no-v1',
'--v2',
'list',
'::',
success=True
)

output_lines = result.stdout_data.splitlines()
self.assertGreater(len(output_lines), 1000)
self.assertIn('3rdparty/python:psutil', output_lines)
with self.pantsd_successful_run_context() as (runner, checker, workdir, pantsd_config):
def run_list():
command = ['--no-v1',
'--v2',
'list',
'::',]

result = runner(command)
checker.assert_started()
return result

first_run = run_list()
first_run_output_lines = first_run.stdout_data.splitlines()
second_run = run_list()
second_run_output_lines = second_run.stdout_data.splitlines()
self.assertEqual(first_run_output_lines, second_run_output_lines)

@ensure_daemon
def test_v2_goal_validation(self):