Skip to content

Commit

Permalink
Snapshot contains a DirectoryDigest
Browse files Browse the repository at this point in the history
This better encapsulates the data, and will make it simpler to
differentiate between, and traverse across, places where we use each
type.

Also:
 * Stop double-hexifying fingerprints. We pass the hex version up from
   rust to python, so we shouldn't hexify that.
 * Use shared code for type checks.
 * Add proper error handling when lifting Digests and ExecuteProcesses.
 * Take dicts on the python side for env, not tuples - do the
   translation as a serialization stage, not as part of the interface.
  • Loading branch information
illicitonion authored and stuhood committed May 13, 2018
1 parent b994d45 commit 8b7dbf3
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 75 deletions.
46 changes: 30 additions & 16 deletions src/python/pants/engine/fs.py
Expand Up @@ -5,7 +5,6 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

from binascii import hexlify
from os.path import join

from pants.base.project_tree import Dir, File
Expand Down Expand Up @@ -50,19 +49,38 @@ def create(relative_to, include, exclude=tuple()):
tuple(join(relative_to, f) for f in exclude))


class Snapshot(datatype(['fingerprint', 'digest_length', 'path_stats'])):
class DirectoryDigest(datatype([('fingerprint', str), ('serialized_bytes_length', int)])):
  """A DirectoryDigest is an opaque handle to a set of files known about by the engine.

  The contents of files can be inspected by requesting a FilesContent for it.

  In the future, it will be possible to inspect the file metadata by requesting a Snapshot for it,
  but at the moment we can't install rules which go both:
   PathGlobs -> DirectoryDigest -> Snapshot
   PathGlobs -> Snapshot
  because it would lead to an ambiguity in the engine, and we have existing code which already
  relies on the latter existing. This can be resolved when ordering is removed from Snapshots. See
  https://github.com/pantsbuild/pants/issues/5802
  """

  def __repr__(self):
    # Only show a prefix of the (hex) fingerprint; the full digest is noisy in logs.
    return '''DirectoryDigest(fingerprint={}, serialized_bytes_length={})'''.format(
      self.fingerprint[:8],
      # Fixed: this datatype declares `serialized_bytes_length`, not `digest_length` —
      # referencing the nonexistent `self.digest_length` raised AttributeError on repr().
      self.serialized_bytes_length
    )

  def __str__(self):
    return repr(self)


class Snapshot(datatype([('directory_digest', DirectoryDigest), ('path_stats', tuple)])):
"""A Snapshot is a collection of Files and Dirs fingerprinted by their names/content.
Snapshots are used to make it easier to isolate process execution by fixing the contents
of the files being operated on and easing their movement to and from isolated execution
sandboxes.
"""

def __new__(cls, fingerprint, digest_length, path_stats):
# We get a unicode instance when this is instantiated, so ensure it is
# converted to a str.
return super(Snapshot, cls).__new__(cls, str(fingerprint), digest_length, path_stats)

@property
def dirs(self):
return [p for p in self.path_stats if type(p.stat) == Dir]
Expand All @@ -79,12 +97,6 @@ def files(self):
def file_stats(self):
return [p.stat for p in self.files]

def __repr__(self):
return '''Snapshot(fingerprint='{}', digest_length='{}', entries={})'''.format(hexlify(self.fingerprint)[:8], self.digest_length, len(self.path_stats))

def __str__(self):
return repr(self)


FilesContent = Collection.of(FileContent)

Expand All @@ -95,9 +107,11 @@ def __str__(self):


EMPTY_SNAPSHOT = Snapshot(
fingerprint=_EMPTY_FINGERPRINT,
digest_length=0,
path_stats=[],
directory_digest=DirectoryDigest(
fingerprint=str(_EMPTY_FINGERPRINT),
serialized_bytes_length=0
),
path_stats=(),
)


Expand Down
43 changes: 16 additions & 27 deletions src/python/pants/engine/isolated_process.py
Expand Up @@ -7,50 +7,39 @@

import logging

from pants.engine.fs import EMPTY_SNAPSHOT
from pants.util.objects import datatype
from pants.engine.fs import EMPTY_SNAPSHOT, DirectoryDigest
from pants.util.objects import TypeCheckError, datatype


logger = logging.getLogger(__name__)


class ExecuteProcessRequest(datatype(['argv', 'env', 'input_files_digest', 'digest_length'])):
class ExecuteProcessRequest(datatype([('argv', tuple), ('env', tuple), ('input_files', DirectoryDigest)])):
"""Request for execution with args and snapshots to extract."""

@classmethod
def create_from_snapshot(cls, argv, env, snapshot):
cls._verify_env_is_dict(env)
return ExecuteProcessRequest(
argv=argv,
env=env,
input_files_digest=snapshot.fingerprint,
digest_length=snapshot.digest_length,
env=tuple(env.items()),
input_files=snapshot.directory_digest,
)

  @classmethod
  def create_with_empty_snapshot(cls, argv, env):
    # Convenience constructor for processes that read no input files: delegates to
    # create_from_snapshot with the canonical EMPTY_SNAPSHOT sentinel.
    return cls.create_from_snapshot(argv, env, EMPTY_SNAPSHOT)

def __new__(cls, argv, env, input_files_digest, digest_length):
"""
:param args: Arguments to the process being run.
:param env: A tuple of environment variables and values.
"""
if not isinstance(argv, tuple):
raise ValueError('argv must be a tuple.')

if not isinstance(env, tuple):
raise ValueError('env must be a tuple.')

if not isinstance(input_files_digest, str):
raise ValueError('input_files_digest must be a str.')

if not isinstance(digest_length, int):
raise ValueError('digest_length must be an int.')
if digest_length < 0:
raise ValueError('digest_length must be >= 0.')

return super(ExecuteProcessRequest, cls).__new__(cls, argv, env, input_files_digest, digest_length)
@classmethod
def _verify_env_is_dict(cls, env):
if not isinstance(env, dict):
raise TypeCheckError(
cls.__name__,
"arg 'env' was invalid: value {} (with type {}) must be a dict".format(
env,
type(env)
)
)


class ExecuteProcessResult(datatype(['stdout', 'stderr', 'exit_code'])):
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/legacy/graph.py
Expand Up @@ -365,7 +365,7 @@ def _eager_fileset_with_spec(spec_path, filespec, snapshot, include_dirs=False):
return EagerFilesetWithSpec(spec_path,
relpath_adjusted_filespec,
files=files,
files_hash=snapshot.fingerprint)
files_hash=snapshot.directory_digest.fingerprint)


@rule(HydratedField, [Select(SourcesField)])
Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/engine/native.py
Expand Up @@ -183,6 +183,8 @@
Function,
Function,
Function,
Function,
TypeConstraint,
TypeConstraint,
TypeConstraint,
TypeConstraint,
Expand Down Expand Up @@ -705,6 +707,7 @@ def new_scheduler(self,
build_root,
work_dir,
ignore_patterns,
construct_directory_digest,
construct_snapshot,
construct_file_content,
construct_files_content,
Expand All @@ -717,6 +720,7 @@ def new_scheduler(self,
constraint_address,
constraint_variants,
constraint_path_globs,
constraint_directory_digest,
constraint_snapshot,
constraint_files_content,
constraint_dir,
Expand All @@ -735,6 +739,7 @@ def tc(constraint):
scheduler = self.lib.scheduler_create(
tasks,
# Constructors/functions.
func(construct_directory_digest),
func(construct_snapshot),
func(construct_file_content),
func(construct_files_content),
Expand All @@ -748,6 +753,7 @@ def tc(constraint):
tc(constraint_has_products),
tc(constraint_variants),
tc(constraint_path_globs),
tc(constraint_directory_digest),
tc(constraint_snapshot),
tc(constraint_files_content),
tc(constraint_dir),
Expand Down
4 changes: 3 additions & 1 deletion src/python/pants/engine/scheduler.py
Expand Up @@ -14,7 +14,7 @@
from pants.base.exceptions import TaskError
from pants.base.project_tree import Dir, File, Link
from pants.build_graph.address import Address
from pants.engine.fs import FileContent, FilesContent, Path, PathGlobs, Snapshot
from pants.engine.fs import DirectoryDigest, FileContent, FilesContent, Path, PathGlobs, Snapshot
from pants.engine.isolated_process import ExecuteProcessRequest, ExecuteProcessResult
from pants.engine.native import Function, TypeConstraint, TypeId
from pants.engine.nodes import Return, State, Throw
Expand Down Expand Up @@ -88,6 +88,7 @@ def __init__(self, native, build_root, work_dir, ignore_patterns, rule_index):
build_root,
work_dir,
ignore_patterns,
DirectoryDigest,
Snapshot,
FileContent,
FilesContent,
Expand All @@ -100,6 +101,7 @@ def __init__(self, native, build_root, work_dir, ignore_patterns, rule_index):
constraint_for(Address),
constraint_for(Variants),
constraint_for(PathGlobs),
constraint_for(DirectoryDigest),
constraint_for(Snapshot),
constraint_for(FilesContent),
constraint_for(Dir),
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/src/lib.rs
Expand Up @@ -185,6 +185,7 @@ pub extern "C" fn externs_val_for(key: Key) -> Value {
#[no_mangle]
pub extern "C" fn scheduler_create(
tasks_ptr: *mut Tasks,
construct_directory_digest: Function,
construct_snapshot: Function,
construct_file_content: Function,
construct_files_content: Function,
Expand All @@ -197,6 +198,7 @@ pub extern "C" fn scheduler_create(
type_has_products: TypeConstraint,
type_has_variants: TypeConstraint,
type_path_globs: TypeConstraint,
type_directory_digest: TypeConstraint,
type_snapshot: TypeConstraint,
type_files_content: TypeConstraint,
type_dir: TypeConstraint,
Expand All @@ -217,6 +219,7 @@ pub extern "C" fn scheduler_create(
.to_strings()
.unwrap_or_else(|e| panic!("Failed to decode ignore patterns as UTF8: {:?}", e));
let types = Types {
construct_directory_digest: construct_directory_digest,
construct_snapshot: construct_snapshot,
construct_file_content: construct_file_content,
construct_files_content: construct_files_content,
Expand All @@ -229,6 +232,7 @@ pub extern "C" fn scheduler_create(
has_products: type_has_products,
has_variants: type_has_variants,
path_globs: type_path_globs,
directory_digest: type_directory_digest,
snapshot: type_snapshot,
files_content: type_files_content,
dir: type_dir,
Expand Down
52 changes: 36 additions & 16 deletions src/rust/engine/src/nodes.rs
Expand Up @@ -296,7 +296,11 @@ impl Select {
self.variants.clone(),
edges,
).run(context.clone())
.and_then(move |process_request_val| context.get(ExecuteProcess::lift(&process_request_val)))
.and_then(|process_request_val| {
ExecuteProcess::lift(&process_request_val)
.map_err(|str| throw(&format!("Error lifting ExecuteProcess: {}", str)))
})
.and_then(move |process_request| context.get(process_request))
.to_boxed()
}

Expand Down Expand Up @@ -535,34 +539,41 @@ impl SelectDependencies {
pub struct ExecuteProcess(process_execution::ExecuteProcessRequest);

impl ExecuteProcess {
fn lift_digest(digest: &Value) -> Result<hashing::Digest, String> {
let fingerprint = externs::project_str(&digest, "fingerprint");
let digest_length = externs::project_str(&digest, "serialized_bytes_length");
let digest_length_as_usize = digest_length
.parse::<usize>()
.map_err(|err| format!("Length was not a usize: {:?}", err))?;
Ok(hashing::Digest(
hashing::Fingerprint::from_hex_string(&fingerprint)?,
digest_length_as_usize,
))
}

///
/// Lifts a Key representing a python ExecuteProcessRequest value into a ExecuteProcess Node.
///
fn lift(value: &Value) -> ExecuteProcess {
fn lift(value: &Value) -> Result<ExecuteProcess, String> {
let mut env: BTreeMap<String, String> = BTreeMap::new();
let env_var_parts = externs::project_multi_strs(&value, "env");
// TODO: Error if env_var_parts.len() % 2 != 0
if env_var_parts.len() % 2 != 0 {
return Err(format!("Error parsing env: odd number of parts"));
}
for i in 0..(env_var_parts.len() / 2) {
env.insert(
env_var_parts[2 * i].clone(),
env_var_parts[2 * i + 1].clone(),
);
}
let digest = Self::lift_digest(&externs::project_ignoring_type(&value, "input_files"))
.map_err(|err| format!("Error parsing digest {}", err))?;

// TODO: Make this much less unwrap-happy with https://github.com/pantsbuild/pants/issues/5502
let fingerprint = externs::project_str(&value, "input_files_digest");
let digest_length = externs::project_str(&value, "digest_length");
let digest_length_as_usize = digest_length.parse::<usize>().unwrap();
let digest = hashing::Digest(
hashing::Fingerprint::from_hex_string(&fingerprint).unwrap(),
digest_length_as_usize,
);

ExecuteProcess(process_execution::ExecuteProcessRequest {
Ok(ExecuteProcess(process_execution::ExecuteProcessRequest {
argv: externs::project_multi_strs(&value, "argv"),
env: env,
input_files: digest,
})
}))
}
}

Expand Down Expand Up @@ -738,6 +749,16 @@ impl Snapshot {
})
}

fn store_directory(context: &Context, item: &hashing::Digest) -> Value {
externs::unsafe_call(
&context.core.types.construct_directory_digest,
&[
externs::store_bytes(item.0.to_hex().as_bytes()),
externs::store_i32(item.1 as i32),
],
)
}

fn store_snapshot(context: &Context, item: &fs::Snapshot) -> Value {
let path_stats: Vec<_> = item
.path_stats
Expand All @@ -747,8 +768,7 @@ impl Snapshot {
externs::unsafe_call(
&context.core.types.construct_snapshot,
&[
externs::store_bytes(&(item.digest.0).to_hex().as_bytes()),
externs::store_i32(item.digest.1 as i32),
Self::store_directory(context, &item.digest),
externs::store_tuple(&path_stats),
],
)
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/types.rs
@@ -1,6 +1,7 @@
use core::{Function, TypeConstraint, TypeId};

pub struct Types {
pub construct_directory_digest: Function,
pub construct_snapshot: Function,
pub construct_file_content: Function,
pub construct_files_content: Function,
Expand All @@ -13,6 +14,7 @@ pub struct Types {
pub has_products: TypeConstraint,
pub has_variants: TypeConstraint,
pub path_globs: TypeConstraint,
pub directory_digest: TypeConstraint,
pub snapshot: TypeConstraint,
pub files_content: TypeConstraint,
pub dir: TypeConstraint,
Expand Down
2 changes: 1 addition & 1 deletion tests/python/pants_test/engine/test_fs.py
Expand Up @@ -65,7 +65,7 @@ def assert_digest(self, filespecs, expected_files):
result = self.execute(scheduler, Snapshot, self.specs('', *filespecs))[0]
# Confirm all expected files were digested.
self.assertEquals(set(expected_files), set(f.path for f in result.files))
self.assertTrue(result.fingerprint is not None)
self.assertTrue(result.directory_digest.fingerprint is not None)

def assert_fsnodes(self, filespecs, subject_product_pairs):
with self.mk_project_tree() as project_tree:
Expand Down

0 comments on commit 8b7dbf3

Please sign in to comment.