diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 09fb9355bf1..16b1bda7013 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -33,11 +33,11 @@ CFFI_TYPEDEFS = ''' typedef uint64_t Id; -// Value is declared as a typedef rather than a wrapper struct because it avoids needing to wrap +// Handle is declared as a typedef rather than a wrapper struct because it avoids needing to wrap // the inner handle/`void*` in a tuple or datatype at the ffi boundary. For most types that -// overhead would not be worth worrying about, but Value is used often enough that it gives a 6% +// overhead would not be worth worrying about, but Handle is used often enough that it gives a 6% // speedup to avoid the wrapping. -typedef void* Value; +typedef void* Handle; typedef struct { Id id_; @@ -59,41 +59,40 @@ typedef struct { uint8_t* bytes_ptr; uint64_t bytes_len; - Value handle_; + Handle handle_; } Buffer; typedef struct { - Value* values_ptr; + Handle* values_ptr; uint64_t values_len; - Value handle_; -} ValueBuffer; + Handle handle_; +} HandleBuffer; typedef struct { TypeId* ids_ptr; uint64_t ids_len; - Value handle_; + Handle handle_; } TypeIdBuffer; typedef struct { Buffer* bufs_ptr; uint64_t bufs_len; - Value handle_; + Handle handle_; } BufferBuffer; typedef struct { _Bool is_throw; - Value value; + Handle value; } PyResult; typedef struct { - uint8_t tag; - ValueBuffer values; - ValueBuffer constraints; + uint8_t tag; + HandleBuffer values; + HandleBuffer constraints; } PyGeneratorResponse; typedef struct { int64_t hash_; - Value value; TypeId type_id; } Ident; @@ -102,22 +101,22 @@ // On the rust side the integration is defined in externs.rs typedef void (*extern_ptr_log)(ExternContext*, uint8_t, uint8_t*, uint64_t); typedef uint8_t extern_log_level; -typedef Ident (*extern_ptr_identify)(ExternContext*, Value*); -typedef _Bool (*extern_ptr_equals)(ExternContext*, Value*, Value*); -typedef Value (*extern_ptr_clone_val)(ExternContext*, Value*); -typedef void (*extern_ptr_drop_handles)(ExternContext*, Value*, uint64_t); +typedef Ident (*extern_ptr_identify)(ExternContext*, Handle*); +typedef _Bool (*extern_ptr_equals)(ExternContext*, Handle*, Handle*); +typedef Handle (*extern_ptr_clone_val)(ExternContext*, Handle*); +typedef void (*extern_ptr_drop_handles)(ExternContext*, Handle*, uint64_t); typedef Buffer (*extern_ptr_type_to_str)(ExternContext*, TypeId); -typedef Buffer (*extern_ptr_val_to_str)(ExternContext*, Value*); -typedef _Bool (*extern_ptr_satisfied_by)(ExternContext*, Value*, Value*); -typedef _Bool (*extern_ptr_satisfied_by_type)(ExternContext*, Value*, TypeId*); -typedef Value (*extern_ptr_store_tuple)(ExternContext*, Value*, uint64_t); -typedef Value (*extern_ptr_store_bytes)(ExternContext*, uint8_t*, uint64_t); -typedef Value (*extern_ptr_store_i64)(ExternContext*, int64_t); -typedef ValueBuffer (*extern_ptr_project_multi)(ExternContext*, Value*, uint8_t*, uint64_t); -typedef Value (*extern_ptr_project_ignoring_type)(ExternContext*, Value*, uint8_t*, uint64_t); -typedef Value (*extern_ptr_create_exception)(ExternContext*, uint8_t*, uint64_t); -typedef PyResult (*extern_ptr_call)(ExternContext*, Value*, Value*, uint64_t); -typedef PyGeneratorResponse (*extern_ptr_generator_send)(ExternContext*, Value*, Value*); +typedef Buffer (*extern_ptr_val_to_str)(ExternContext*, Handle*); +typedef _Bool (*extern_ptr_satisfied_by)(ExternContext*, Handle*, Handle*); +typedef _Bool (*extern_ptr_satisfied_by_type)(ExternContext*, Handle*, TypeId*); +typedef Handle (*extern_ptr_store_tuple)(ExternContext*, Handle**, uint64_t); +typedef Handle (*extern_ptr_store_bytes)(ExternContext*, uint8_t*, uint64_t); +typedef Handle (*extern_ptr_store_i64)(ExternContext*, int64_t); +typedef HandleBuffer (*extern_ptr_project_multi)(ExternContext*, Handle*, uint8_t*, uint64_t); +typedef Handle (*extern_ptr_project_ignoring_type)(ExternContext*, Handle*, uint8_t*, uint64_t); +typedef Handle (*extern_ptr_create_exception)(ExternContext*, uint8_t*, uint64_t); +typedef PyResult (*extern_ptr_call)(ExternContext*, Handle*, Handle**, uint64_t); +typedef PyGeneratorResponse (*extern_ptr_generator_send)(ExternContext*, Handle*, Handle*); typedef PyResult (*extern_ptr_eval)(ExternContext*, uint8_t*, uint64_t); typedef void Tasks; @@ -129,7 +128,7 @@ Key subject; TypeConstraint product; uint8_t state_tag; - Value state_value; + Handle state_value; } RawNode; typedef struct { @@ -163,8 +162,8 @@ extern_ptr_create_exception, TypeId); -Key externs_key_for(Value); -Value externs_val_for(Key); +Key key_for(Handle); +Handle val_for(Key); Tasks* tasks_create(void); void tasks_task_begin(Tasks*, Function, TypeConstraint); @@ -172,7 +171,7 @@ void tasks_add_select(Tasks*, TypeConstraint); void tasks_add_select_variant(Tasks*, TypeConstraint, Buffer); void tasks_task_end(Tasks*); -void tasks_singleton_add(Tasks*, Value, TypeConstraint); +void tasks_singleton_add(Tasks*, Handle, TypeConstraint); void tasks_destroy(Tasks*); Scheduler* scheduler_create(Tasks*, @@ -212,7 +211,7 @@ uint64_t, _Bool); void scheduler_pre_fork(Scheduler*); -Value scheduler_metrics(Scheduler*, Session*); +Handle scheduler_metrics(Scheduler*, Session*); RawNodes* scheduler_execute(Scheduler*, Session*, ExecutionRequest*); void scheduler_destroy(Scheduler*); @@ -230,13 +229,13 @@ PyResult execution_add_root_select(Scheduler*, ExecutionRequest*, Key, TypeConstraint); -PyResult capture_snapshots(Scheduler*, Value); +PyResult capture_snapshots(Scheduler*, Handle); -PyResult merge_directories(Scheduler*, Value); +PyResult merge_directories(Scheduler*, Handle); -PyResult materialize_directories(Scheduler*, Value); +PyResult materialize_directories(Scheduler*, Handle); -Value validator_run(Scheduler*); +PyResult validator_run(Scheduler*); void rule_graph_visualize(Scheduler*, TypeIdBuffer, char*); void rule_subgraph_visualize(Scheduler*, TypeId, TypeConstraint, char*); @@ -253,23 +252,23 @@ CFFI_EXTERNS = ''' extern "Python" { void extern_log(ExternContext*, uint8_t, uint8_t*, uint64_t); - PyResult extern_call(ExternContext*, Value*, Value*, uint64_t); - PyGeneratorResponse extern_generator_send(ExternContext*, Value*, Value*); + PyResult extern_call(ExternContext*, Handle*, Handle**, uint64_t); + PyGeneratorResponse extern_generator_send(ExternContext*, Handle*, Handle*); PyResult extern_eval(ExternContext*, uint8_t*, uint64_t); - Ident extern_identify(ExternContext*, Value*); - _Bool extern_equals(ExternContext*, Value*, Value*); - Value extern_clone_val(ExternContext*, Value*); - void extern_drop_handles(ExternContext*, Value*, uint64_t); + Ident extern_identify(ExternContext*, Handle*); + _Bool extern_equals(ExternContext*, Handle*, Handle*); + Handle extern_clone_val(ExternContext*, Handle*); + void extern_drop_handles(ExternContext*, Handle*, uint64_t); Buffer extern_type_to_str(ExternContext*, TypeId); - Buffer extern_val_to_str(ExternContext*, Value*); - _Bool extern_satisfied_by(ExternContext*, Value*, Value*); - _Bool extern_satisfied_by_type(ExternContext*, Value*, TypeId*); - Value extern_store_tuple(ExternContext*, Value*, uint64_t); - Value extern_store_bytes(ExternContext*, uint8_t*, uint64_t); - Value extern_store_i64(ExternContext*, int64_t); - Value extern_project_ignoring_type(ExternContext*, Value*, uint8_t*, uint64_t); - ValueBuffer extern_project_multi(ExternContext*, Value*, uint8_t*, uint64_t); - Value extern_create_exception(ExternContext*, uint8_t*, uint64_t); + Buffer extern_val_to_str(ExternContext*, Handle*); + _Bool extern_satisfied_by(ExternContext*, Handle*, Handle*); + _Bool extern_satisfied_by_type(ExternContext*, Handle*, TypeId*); + Handle extern_store_tuple(ExternContext*, Handle**, uint64_t); + Handle extern_store_bytes(ExternContext*, uint8_t*, uint64_t); + Handle extern_store_i64(ExternContext*, int64_t); + Handle extern_project_ignoring_type(ExternContext*, Handle*, uint8_t*, uint64_t); + HandleBuffer extern_project_multi(ExternContext*, Handle*, uint8_t*, uint64_t); + Handle extern_create_exception(ExternContext*, uint8_t*, uint64_t); } ''' @@ -364,28 +363,27 @@ def extern_log(context_handle, level, msg_ptr, msg_len): @ffi.def_extern() def extern_identify(context_handle, val): - """Return an Ident containing a clone of the Value with its __hash__ and TypeId.""" + """Return an Ident containing the __hash__ and TypeId for the given Handle.""" c = ffi.from_handle(context_handle) obj = ffi.from_handle(val[0]) hash_ = hash(obj) - cloned = c.to_value(obj) type_id = c.to_id(type(obj)) - return (hash_, cloned, TypeId(type_id)) + return (hash_, TypeId(type_id)) @ffi.def_extern() def extern_equals(context_handle, val1, val2): - """Return true if the given Values are __eq__.""" + """Return true if the given Handles are __eq__.""" return ffi.from_handle(val1[0]) == ffi.from_handle(val2[0]) @ffi.def_extern() def extern_clone_val(context_handle, val): - """Clone the given Value.""" + """Clone the given Handle.""" c = ffi.from_handle(context_handle) return c.to_value(ffi.from_handle(val[0])) @ffi.def_extern() def extern_drop_handles(context_handle, handles_ptr, handles_len): - """Drop the given Values.""" + """Drop the given Handles.""" c = ffi.from_handle(context_handle) handles = ffi.unpack(handles_ptr, handles_len) c.drop_handles(handles) @@ -398,13 +396,13 @@ def extern_type_to_str(context_handle, type_id): @ffi.def_extern() def extern_val_to_str(context_handle, val): - """Given a Value for `obj`, write str(obj) and return it.""" + """Given a Handle for `obj`, write str(obj) and return it.""" c = ffi.from_handle(context_handle) return c.utf8_buf(six.text_type(c.from_value(val[0]))) @ffi.def_extern() def extern_satisfied_by(context_handle, constraint_val, val): - """Given a TypeConstraint and a Value return constraint.satisfied_by(value).""" + """Given a TypeConstraint and a Handle return constraint.satisfied_by(value).""" constraint = ffi.from_handle(constraint_val[0]) return constraint.satisfied_by(ffi.from_handle(val[0])) @@ -417,25 +415,25 @@ def extern_satisfied_by_type(context_handle, constraint_val, cls_id): @ffi.def_extern() def extern_store_tuple(context_handle, vals_ptr, vals_len): - """Given storage and an array of Values, return a new Value to represent the list.""" + """Given storage and an array of Handles, return a new Handle to represent the list.""" c = ffi.from_handle(context_handle) - return c.to_value(tuple(c.from_value(val) for val in ffi.unpack(vals_ptr, vals_len))) + return c.to_value(tuple(c.from_value(val[0]) for val in ffi.unpack(vals_ptr, vals_len))) @ffi.def_extern() def extern_store_bytes(context_handle, bytes_ptr, bytes_len): - """Given a context and raw bytes, return a new Value to represent the content.""" + """Given a context and raw bytes, return a new Handle to represent the content.""" c = ffi.from_handle(context_handle) return c.to_value(bytes(ffi.buffer(bytes_ptr, bytes_len))) @ffi.def_extern() def extern_store_i64(context_handle, i64): - """Given a context and int32_t, return a new Value to represent the int32_t.""" + """Given a context and int32_t, return a new Handle to represent the int32_t.""" c = ffi.from_handle(context_handle) return c.to_value(i64) @ffi.def_extern() def extern_project_ignoring_type(context_handle, val, field_str_ptr, field_str_len): - """Given a Value for `obj`, and a field name, project the field as a new Value.""" + """Given a Handle for `obj`, and a field name, project the field as a new Handle.""" c = ffi.from_handle(context_handle) obj = c.from_value(val[0]) field_name = to_py_str(field_str_ptr, field_str_len) @@ -499,12 +497,12 @@ def extern_call(context_handle, func, args_ptr, args_len): """Given a callable, call it.""" c = ffi.from_handle(context_handle) runnable = c.from_value(func[0]) - args = tuple(c.from_value(arg) for arg in ffi.unpack(args_ptr, args_len)) + args = tuple(c.from_value(arg[0]) for arg in ffi.unpack(args_ptr, args_len)) return call(c, runnable, args) @ffi.def_extern() def extern_eval(context_handle, python_code_str_ptr, python_code_str_len): - """Given an evalable string, eval it and return a Value for its result.""" + """Given an evalable string, eval it and return a Handle for its result.""" c = ffi.from_handle(context_handle) return call(c, eval, [to_py_str(python_code_str_ptr, python_code_str_len)]) @@ -533,7 +531,7 @@ class ExternContext(object): """A wrapper around python objects used in static extern functions in this module. See comments in `src/rust/engine/src/interning.rs` for more information on the relationship - between `Key`s and `Value`s. + between `Key`s and `Handle`s. """ def __init__(self, ffi, lib): @@ -566,7 +564,7 @@ def utf8_buf_buf(self, strings): return (buf_buf, len(bufs), self.to_value(buf_buf)) def vals_buf(self, vals): - buf = self._ffi.new('Value[]', vals) + buf = self._ffi.new('Handle[]', vals) return (buf, len(vals), self.to_value(buf)) def type_ids_buf(self, types): @@ -581,6 +579,15 @@ def to_value(self, obj): def from_value(self, val): return self._ffi.from_handle(val) + def raise_or_return(self, pyresult): + """Consumes the given PyResult to raise/return the exception/value it represents.""" + value = self.from_value(pyresult.value) + self._handles.remove(pyresult.value) + if pyresult.is_throw: + raise value + else: + return value + def drop_handles(self, handles): self._handles -= set(handles) @@ -593,11 +600,11 @@ def from_id(self, type_id): return self._types[type_id] def to_key(self, obj): - cdata = self._lib.externs_key_for(self.to_value(obj)) + cdata = self._lib.key_for(self.to_value(obj)) return Key(cdata.id_, TypeId(cdata.type_id.id_)) def from_key(self, key): - return self._lib.externs_val_for(key) + return self._lib.val_for(key) class Native(object): @@ -662,7 +669,7 @@ def _ffi_module(self): @memoized_property def context(self): # We statically initialize a ExternContext to correspond to the queue of dropped - # Values that the native code maintains. + # Handles that the native code maintains. def init_externs(): context = ExternContext(self.ffi, self.lib) self.lib.externs_set(context._handle, diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 7aec846ea63..5c1b2546c6a 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -163,11 +163,7 @@ def graph_trace(self, execution_request): yield line.rstrip() def _assert_ruleset_valid(self): - raw_value = self._native.lib.validator_run(self._scheduler) - value = self._from_value(raw_value) - - if isinstance(value, Exception): - raise ValueError(str(value)) + self._raise_or_return(self._native.lib.validator_run(self._scheduler)) def _to_value(self, obj): return self._native.context.to_value(obj) @@ -176,11 +172,7 @@ def _from_value(self, val): return self._native.context.from_value(val) def _raise_or_return(self, pyresult): - value = self._from_value(pyresult.value) - if pyresult.is_throw: - raise value - else: - return value + return self._native.context.raise_or_return(pyresult) def _to_id(self, typ): return self._native.context.to_id(typ) diff --git a/src/rust/engine/fs/src/glob_matching.rs b/src/rust/engine/fs/src/glob_matching.rs index 8cc9e08e88d..392a448d3cf 100644 --- a/src/rust/engine/fs/src/glob_matching.rs +++ b/src/rust/engine/fs/src/glob_matching.rs @@ -25,7 +25,7 @@ pub trait GlobMatching: VFS { /// /// TODO: Should handle symlink loops (which would exhibit as an infinite loop in expand). /// - fn canonicalize(&self, symbolic_path: PathBuf, link: Link) -> BoxFuture, E> { + fn canonicalize(&self, symbolic_path: PathBuf, link: &Link) -> BoxFuture, E> { GlobMatchingImplementation::canonicalize(self, symbolic_path, link) } @@ -94,7 +94,8 @@ trait GlobMatchingImplementation: VFS { // Match any relevant Stats, and join them into PathStats. future::join_all( dir_listing - .into_iter() + .0 + .iter() .filter(|stat| { // Match relevant filenames. stat @@ -121,12 +122,14 @@ trait GlobMatchingImplementation: VFS { } else { match stat { Stat::Link(l) => context.canonicalize(stat_symbolic_path, l), - Stat::Dir(d) => { - future::ok(Some(PathStat::dir(stat_symbolic_path.to_owned(), d))).to_boxed() - } - Stat::File(f) => { - future::ok(Some(PathStat::file(stat_symbolic_path.to_owned(), f))).to_boxed() - } + Stat::Dir(d) => future::ok(Some(PathStat::dir( + stat_symbolic_path.to_owned(), + d.clone(), + ))).to_boxed(), + Stat::File(f) => future::ok(Some(PathStat::file( + stat_symbolic_path.to_owned(), + f.clone(), + ))).to_boxed(), } } }) @@ -134,7 +137,7 @@ trait GlobMatchingImplementation: VFS { ) }) .map(|path_stats| { - // See the TODO above. + // See the note above. path_stats.into_iter().filter_map(|pso| pso).collect() }) .to_boxed() @@ -369,7 +372,7 @@ trait GlobMatchingImplementation: VFS { } } - fn canonicalize(&self, symbolic_path: PathBuf, link: Link) -> BoxFuture, E> { + fn canonicalize(&self, symbolic_path: PathBuf, link: &Link) -> BoxFuture, E> { // Read the link, which may result in PathGlob(s) that match 0 or 1 Path. let context = self.clone(); self diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index 6601ba99067..2bb9f66df33 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -123,6 +123,9 @@ impl PathStat { } } +#[derive(Debug, Eq, PartialEq)] +pub struct DirectoryListing(pub Vec); + #[derive(Debug)] pub struct GitignoreStyleExcludes { patterns: Vec, @@ -649,23 +652,24 @@ impl PosixFS { PosixFS::stat_internal(relative_path, metadata.file_type(), &root, || Ok(metadata)) } - pub fn scandir(&self, dir: &Dir) -> BoxFuture, io::Error> { + pub fn scandir(&self, dir: &Dir) -> BoxFuture { let dir = dir.to_owned(); let root = self.root.0.clone(); self .pool .spawn_fn(move || PosixFS::scandir_sync(root, &dir)) + .map(DirectoryListing) .to_boxed() } } impl VFS for Arc { - fn read_link(&self, link: Link) -> BoxFuture { - PosixFS::read_link(self, &link) + fn read_link(&self, link: &Link) -> BoxFuture { + PosixFS::read_link(self, link) } - fn scandir(&self, dir: Dir) -> BoxFuture, io::Error> { - PosixFS::scandir(self, &dir) + fn scandir(&self, dir: Dir) -> BoxFuture, io::Error> { + PosixFS::scandir(self, &dir).map(Arc::new).to_boxed() } fn is_ignored(&self, stat: &Stat) -> bool { @@ -702,7 +706,7 @@ impl PathStatGetter for Arc { .and_then(move |maybe_stat| { match maybe_stat { // Note: This will drop PathStats for symlinks which don't point anywhere. - Some(Stat::Link(link)) => fs.canonicalize(link.0.clone(), link), + Some(Stat::Link(link)) => fs.canonicalize(link.0.clone(), &link), Some(Stat::Dir(dir)) => { future::ok(Some(PathStat::dir(dir.0.clone(), dir))).to_boxed() } @@ -722,8 +726,8 @@ impl PathStatGetter for Arc { /// A context for filesystem operations parameterized on an error type 'E'. /// pub trait VFS: Clone + Send + Sync + 'static { - fn read_link(&self, link: Link) -> BoxFuture; - fn scandir(&self, dir: Dir) -> BoxFuture, E>; + fn read_link(&self, link: &Link) -> BoxFuture; + fn scandir(&self, dir: Dir) -> BoxFuture, E>; fn is_ignored(&self, stat: &Stat) -> bool; fn mk_error(msg: &str) -> E; } @@ -782,7 +786,9 @@ mod posixfs_test { extern crate tempfile; extern crate testutil; - use super::{Dir, File, Link, PathStat, PathStatGetter, PosixFS, ResettablePool, Stat}; + use super::{ + Dir, DirectoryListing, File, Link, PathStat, PathStatGetter, PosixFS, ResettablePool, Stat, + }; use futures::Future; use std; use std::path::{Path, PathBuf}; @@ -914,7 +920,10 @@ mod posixfs_test { let posix_fs = new_posixfs(&dir.path()); let path = PathBuf::from("empty_enclosure"); std::fs::create_dir(dir.path().join(&path)).unwrap(); - assert_eq!(posix_fs.scandir(&Dir(path)).wait().unwrap(), vec![]); + assert_eq!( + posix_fs.scandir(&Dir(path)).wait().unwrap(), + DirectoryListing(vec![]) + ); } #[test] @@ -948,7 +957,7 @@ mod posixfs_test { assert_eq!( posix_fs.scandir(&Dir(path)).wait().unwrap(), - vec![ + DirectoryListing(vec![ Stat::File(File { path: a_marmoset, is_executable: false, @@ -963,7 +972,7 @@ mod posixfs_test { path: sneaky_marmoset, is_executable: false, }), - ] + ]) ); } @@ -1062,8 +1071,8 @@ mod posixfs_test { fn assert_only_file_is_executable(path: &Path, want_is_executable: bool) { let fs = new_posixfs(path); let stats = fs.scandir(&Dir(PathBuf::from("."))).wait().unwrap(); - assert_eq!(stats.len(), 1); - match stats.get(0).unwrap() { + assert_eq!(stats.0.len(), 1); + match stats.0.get(0).unwrap() { &super::Stat::File(File { is_executable: got, .. }) => assert_eq!(want_is_executable, got), diff --git a/src/rust/engine/fs/src/snapshot.rs b/src/rust/engine/fs/src/snapshot.rs index 76e47705bd4..da9ce1e306b 100644 --- a/src/rust/engine/fs/src/snapshot.rs +++ b/src/rust/engine/fs/src/snapshot.rs @@ -21,7 +21,7 @@ pub const EMPTY_FINGERPRINT: Fingerprint = Fingerprint([ ]); pub const EMPTY_DIGEST: Digest = Digest(EMPTY_FINGERPRINT, 0); -#[derive(Clone, Eq, Hash, PartialEq)] +#[derive(Eq, Hash, PartialEq)] pub struct Snapshot { pub digest: Digest, pub path_stats: Vec, @@ -614,10 +614,14 @@ mod tests { ); let merged_res = { - let snapshot = Snapshot::from_path_stats(store.clone(), digester.clone(), vec![file]) + let snapshot1 = + Snapshot::from_path_stats(store.clone(), digester.clone(), vec![file.clone()]) + .wait() + .unwrap(); + let snapshot2 = Snapshot::from_path_stats(store.clone(), digester.clone(), vec![file]) .wait() .unwrap(); - Snapshot::merge(store.clone(), &[snapshot.clone(), snapshot]).wait() + Snapshot::merge(store.clone(), &[snapshot1, snapshot2]).wait() }; match merged_res { diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 56d9f4c94d7..cec7cda4515 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -12,10 +12,9 @@ use futures::Future; use boxfuture::{BoxFuture, Boxable}; use core::{Failure, TypeId}; -use externs; use fs::{safe_create_dir_all_ioerror, PosixFS, ResettablePool, Store}; use graph::{EntryId, Graph, NodeContext}; -use handles::maybe_drain_handles; +use handles::maybe_drop_handles; use nodes::{NodeKey, TryInto, WrappedNode}; use process_execution::{self, BoundedCommandRunner, CommandRunner}; use resettable::Resettable; @@ -148,9 +147,7 @@ impl Context { /// pub fn get(&self, node: N) -> BoxFuture { // TODO: Odd place for this... could do it periodically in the background? - if let Some(handles) = maybe_drain_handles() { - externs::drop_handles(&handles); - } + maybe_drop_handles(); self .core .graph diff --git a/src/rust/engine/src/core.rs b/src/rust/engine/src/core.rs index d8a11f47fef..b03571a4505 100644 --- a/src/rust/engine/src/core.rs +++ b/src/rust/engine/src/core.rs @@ -4,11 +4,12 @@ use fnv::FnvHasher; use std::collections::HashMap; -use std::ops::Drop; +use std::ops::Deref; +use std::sync::Arc; use std::{fmt, hash}; use externs; -use handles::{enqueue_drop_handle, Handle}; +use handles::Handle; pub type FNV = hash::BuildHasherDefault; @@ -106,57 +107,47 @@ impl Key { } /// -/// Represents a handle to a python object, explicitly without equality or hashing. Whenever -/// the equality/identity of a Value matters, a Key should be computed for it and used instead. +/// A wrapper around a handle: soon to contain an Arc. /// -/// Value implements Clone by calling out to a python extern `clone_val` which clones the -/// underlying CFFI handle. -/// -#[repr(C)] -pub struct Value(Handle); +#[derive(Clone, Eq, PartialEq)] +pub struct Value(Arc); -// By default, Values would not be marked Send because of the raw pointer they hold. -// Because the handle is opaque and can't be cloned, we can safely implement Send. -unsafe impl Send for Value {} -unsafe impl Sync for Value {} - -impl Drop for Value { - fn drop(&mut self) { - enqueue_drop_handle(self.0); +impl Value { + pub fn new(handle: Handle) -> Value { + Value(Arc::new(handle)) } } -impl Value { - /// - /// An escape hatch to allow for cloning a Value without cloning its handle. You should generally - /// not do this unless you are certain the input Value has been mem::forgotten (otherwise it - /// will be `Drop`ed twice). - /// - pub unsafe fn clone_without_handle(&self) -> Value { - Value(self.0) +impl Deref for Value { + type Target = Handle; + + fn deref(&self) -> &Handle { + &self.0 } } -impl PartialEq for Value { - fn eq(&self, other: &Value) -> bool { - externs::equals(self, other) +impl fmt::Debug for Value { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", externs::val_to_str(&self)) } } -impl Eq for Value {} - /// -/// Implemented by calling back to python to clone the underlying Handle. +/// Creates a Handle (which represents exclusive access) from a Value (which might be shared), +/// cloning if necessary. /// -impl Clone for Value { - fn clone(&self) -> Value { - externs::clone_val(self) +impl From for Handle { + fn from(value: Value) -> Self { + match Arc::try_unwrap(value.0) { + Ok(handle) => handle, + Err(arc_handle) => externs::clone_val(&arc_handle), + } } } -impl fmt::Debug for Value { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", externs::val_to_str(&self)) +impl From for Value { + fn from(handle: Handle) -> Self { + Value::new(handle) } } diff --git a/src/rust/engine/src/externs.rs b/src/rust/engine/src/externs.rs index 8321a51f88f..4c7c3f44448 100644 --- a/src/rust/engine/src/externs.rs +++ b/src/rust/engine/src/externs.rs @@ -10,7 +10,7 @@ use std::sync::RwLock; use core::{Failure, Function, Key, TypeConstraint, TypeId, Value}; use enum_primitive::FromPrimitive; -use handles::Handle; +use handles::{DroppingHandle, Handle}; use interning::Interns; use log; @@ -19,11 +19,11 @@ pub fn eval(python: &str) -> Result { } pub fn identify(val: &Value) -> Ident { - with_externs(|e| (e.identify)(e.context, val)) + with_externs(|e| (e.identify)(e.context, val as &Handle)) } -pub fn equals(val1: &Value, val2: &Value) -> bool { - with_externs(|e| (e.equals)(e.context, val1, val2)) +pub fn equals(h1: &Handle, h2: &Handle) -> bool { + with_externs(|e| (e.equals)(e.context, h1, h2)) } pub fn key_for(val: Value) -> Key { @@ -36,45 +36,69 @@ pub fn val_for(key: &Key) -> Value { interns.get(key).clone() } -pub fn clone_val(val: &Value) -> Value { - with_externs(|e| (e.clone_val)(e.context, val)) +pub fn clone_val(handle: &Handle) -> Handle { + with_externs(|e| (e.clone_val)(e.context, handle)) } -pub fn drop_handles(handles: &[Handle]) { +pub fn drop_handles(handles: &[DroppingHandle]) { with_externs(|e| (e.drop_handles)(e.context, handles.as_ptr(), handles.len() as u64)) } pub fn satisfied_by(constraint: &TypeConstraint, obj: &Value) -> bool { let interns = INTERNS.read().unwrap(); - with_externs(|e| (e.satisfied_by)(e.context, interns.get(&constraint.0), obj)) + with_externs(|e| { + (e.satisfied_by)( + e.context, + interns.get(&constraint.0) as &Handle, + obj as &Handle, + ) + }) } pub fn satisfied_by_type(constraint: &TypeConstraint, cls: TypeId) -> bool { let interns = INTERNS.read().unwrap(); - with_externs(|e| (e.satisfied_by_type)(e.context, interns.get(&constraint.0), &cls)) + with_externs(|e| (e.satisfied_by_type)(e.context, interns.get(&constraint.0) as &Handle, &cls)) } pub fn store_tuple(values: &[Value]) -> Value { - with_externs(|e| (e.store_tuple)(e.context, values.as_ptr(), values.len() as u64)) + let handles: Vec<_> = values + .iter() + .map(|v| v as &Handle as *const Handle) + .collect(); + with_externs(|e| (e.store_tuple)(e.context, handles.as_ptr(), handles.len() as u64).into()) } pub fn store_bytes(bytes: &[u8]) -> Value { - with_externs(|e| (e.store_bytes)(e.context, bytes.as_ptr(), bytes.len() as u64)) + with_externs(|e| (e.store_bytes)(e.context, bytes.as_ptr(), bytes.len() as u64).into()) } pub fn store_i64(val: i64) -> Value { - with_externs(|e| (e.store_i64)(e.context, val)) + with_externs(|e| (e.store_i64)(e.context, val).into()) } /// /// Pulls out the value specified by the field name from a given Value /// pub fn project_ignoring_type(value: &Value, field: &str) -> Value { - with_externs(|e| (e.project_ignoring_type)(e.context, value, field.as_ptr(), field.len() as u64)) + with_externs(|e| { + (e.project_ignoring_type)( + e.context, + value as &Handle, + field.as_ptr(), + field.len() as u64, + ).into() + }) } pub fn project_multi(value: &Value, field: &str) -> Vec { - with_externs(|e| (e.project_multi)(e.context, value, field.as_ptr(), field.len() as u64).to_vec()) + with_externs(|e| { + (e.project_multi)( + e.context, + value as &Handle, + field.as_ptr(), + field.len() as u64, + ).to_vec() + }) } pub fn project_multi_strs(item: &Value, field: &str) -> Vec { @@ -86,7 +110,12 @@ pub fn project_multi_strs(item: &Value, field: &str) -> Vec { pub fn project_str(value: &Value, field: &str) -> String { let name_val = with_externs(|e| { - (e.project_ignoring_type)(e.context, value, field.as_ptr(), field.len() as u64) + (e.project_ignoring_type)( + e.context, + value as &Handle, + field.as_ptr(), + field.len() as u64, + ).into() }); val_to_str(&name_val) } @@ -105,14 +134,14 @@ pub fn type_to_str(type_id: TypeId) -> String { pub fn val_to_str(val: &Value) -> String { with_externs(|e| { - (e.val_to_str)(e.context, val) + (e.val_to_str)(e.context, val as &Handle) .to_string() .unwrap_or_else(|e| format!("", val, e)) }) } pub fn create_exception(msg: &str) -> Value { - with_externs(|e| (e.create_exception)(e.context, msg.as_ptr(), msg.len() as u64)) + with_externs(|e| (e.create_exception)(e.context, msg.as_ptr(), msg.len() as u64).into()) } pub fn call_method(value: &Value, method: &str, args: &[Value]) -> Result { @@ -120,11 +149,20 @@ pub fn call_method(value: &Value, method: &str, args: &[Value]) -> Result Result { - with_externs(|e| (e.call)(e.context, func, args.as_ptr(), args.len() as u64)).into() + let arg_handles: Vec<_> = args.iter().map(|v| v as &Handle as *const Handle).collect(); + with_externs(|e| { + (e.call)( + e.context, + func as &Handle, + arg_handles.as_ptr(), + args.len() as u64, + ) + }).into() } pub fn generator_send(generator: &Value, arg: &Value) -> Result { - let response = with_externs(|e| (e.generator_send)(e.context, generator, arg)); + let response = + with_externs(|e| (e.generator_send)(e.context, generator as &Handle, arg as &Handle)); match response.res_type { PyGeneratorResponseType::Break => Ok(GeneratorResponse::Break(response.values.unwrap_one())), PyGeneratorResponseType::Throw => Err(PyResult::failure_from(response.values.unwrap_one())), @@ -241,30 +279,34 @@ unsafe impl Send for Externs {} pub type LogExtern = extern "C" fn(*const ExternContext, u8, str_ptr: *const u8, str_len: u64); pub type SatisfiedByExtern = - extern "C" fn(*const ExternContext, *const Value, *const Value) -> bool; + extern "C" fn(*const ExternContext, *const Handle, *const Handle) -> bool; pub type SatisfiedByTypeExtern = - extern "C" fn(*const ExternContext, *const Value, *const TypeId) -> bool; + extern "C" fn(*const ExternContext, *const Handle, *const TypeId) -> bool; -pub type IdentifyExtern = extern "C" fn(*const ExternContext, *const Value) -> Ident; +pub type IdentifyExtern = extern "C" fn(*const ExternContext, *const Handle) -> Ident; -pub type EqualsExtern = extern "C" fn(*const ExternContext, *const Value, *const Value) -> bool; +pub type EqualsExtern = extern "C" fn(*const ExternContext, *const Handle, *const Handle) -> bool; -pub type CloneValExtern = extern "C" fn(*const ExternContext, *const Value) -> Value; +pub type CloneValExtern = extern "C" fn(*const ExternContext, *const Handle) -> Handle; -pub type DropHandlesExtern = extern "C" fn(*const ExternContext, *const Handle, u64); +pub type DropHandlesExtern = extern "C" fn(*const ExternContext, *const DroppingHandle, u64); -pub type StoreTupleExtern = extern "C" fn(*const ExternContext, *const Value, u64) -> Value; +pub type StoreTupleExtern = + extern "C" fn(*const ExternContext, *const *const Handle, u64) -> Handle; -pub type StoreBytesExtern = extern "C" fn(*const ExternContext, *const u8, u64) -> Value; +pub type StoreBytesExtern = extern "C" fn(*const ExternContext, *const u8, u64) -> Handle; -pub type StoreI64Extern = extern "C" fn(*const ExternContext, i64) -> Value; +pub type StoreI64Extern = extern "C" fn(*const ExternContext, i64) -> Handle; +/// +/// NB: When a PyResult is handed from Python to Rust, the Rust side destroys the handle. But when +/// it is passed from Rust to Python, Python must destroy the handle. +/// #[repr(C)] -#[derive(Debug)] pub struct PyResult { is_throw: bool, - value: Value, + handle: Handle, } impl PyResult { @@ -276,10 +318,11 @@ impl PyResult { impl From for Result { fn from(result: PyResult) -> Self { + let value = result.handle.into(); if result.is_throw { - Err(PyResult::failure_from(result.value)) + Err(PyResult::failure_from(value)) } else { - Ok(result.value) + Ok(value) } } } @@ -289,11 +332,11 @@ impl From> for PyResult { match res { Ok(v) => PyResult { is_throw: false, - value: v, + handle: v.into(), }, Err(msg) => PyResult { is_throw: true, - value: create_exception(&msg), + handle: create_exception(&msg).into(), }, } } @@ -318,8 +361,8 @@ pub enum PyGeneratorResponseType { #[repr(C)] pub struct PyGeneratorResponse { res_type: PyGeneratorResponseType, - values: ValueBuffer, - constraints: ValueBuffer, + values: HandleBuffer, + constraints: HandleBuffer, } #[derive(Debug)] @@ -331,11 +374,12 @@ pub enum GeneratorResponse { GetMulti(Vec), } -// The result of an `identify` call, including the __hash__ of a Value and its TypeId. +/// +/// The result of an `identify` call, including the __hash__ of a Handle and its TypeId. +/// #[repr(C)] pub struct Ident { pub hash: i64, - pub value: Value, pub type_id: TypeId, } @@ -343,49 +387,47 @@ pub struct Ident { /// Points to an array containing a series of values allocated by Python. /// /// TODO: An interesting optimization might be possible where we avoid actually -/// allocating the values array for values_len == 1, and instead store the Value in +/// allocating the values array for values_len == 1, and instead store the Handle in /// the `handle_` field. /// #[repr(C)] -pub struct ValueBuffer { - values_ptr: *mut Value, - values_len: u64, - // A Value handle to hold the underlying buffer alive. - handle_: Value, +pub struct HandleBuffer { + handles_ptr: *mut Handle, + handles_len: u64, + // A Handle to hold the underlying buffer alive. + handle_: Handle, } -impl ValueBuffer { +impl HandleBuffer { pub fn to_vec(&self) -> Vec { - with_vec( - self.values_ptr, - self.values_len as usize, - |value_vec| unsafe { value_vec.iter().map(|v| v.clone_without_handle()).collect() }, - ) + with_vec(self.handles_ptr, self.handles_len as usize, |handle_vec| { + handle_vec + .iter() + .map(|h| Value::new(unsafe { h.clone_shallow() })) + .collect() + }) } - /// Asserts that the ValueBuffer contains one value, and returns it. + /// Asserts that the HandleBuffer contains one value, and returns it. pub fn unwrap_one(&self) -> Value { assert!( - self.values_len == 1, - "ValueBuffer contained more than one value: {}", - self.values_len + self.handles_len == 1, + "HandleBuffer contained more than one value: {}", + self.handles_len ); - with_vec( - self.values_ptr, - self.values_len as usize, - |value_vec| unsafe { value_vec.iter().next().unwrap().clone_without_handle() }, - ) + with_vec(self.handles_ptr, self.handles_len as usize, |handle_vec| { + Value::new(unsafe { handle_vec.iter().next().unwrap().clone_shallow() }) + }) } } // Points to an array of TypeIds. #[repr(C)] -#[derive(Debug)] pub struct TypeIdBuffer { ids_ptr: *mut TypeId, ids_len: u64, - // handle to hold the underlying array alive - handle_: Value, + // A Handle to hold the underlying array alive. + handle_: Handle, } impl TypeIdBuffer { @@ -394,21 +436,26 @@ impl TypeIdBuffer { } } -pub type ProjectIgnoringTypeExtern = - extern "C" fn(*const ExternContext, *const Value, field_name_ptr: *const u8, field_name_len: u64) - -> Value; +pub type ProjectIgnoringTypeExtern = extern "C" fn( + *const ExternContext, + *const Handle, + field_name_ptr: *const u8, + field_name_len: u64, +) -> Handle; -pub type ProjectMultiExtern = - extern "C" fn(*const ExternContext, *const Value, field_name_ptr: *const u8, field_name_len: u64) - -> ValueBuffer; +pub type ProjectMultiExtern = extern "C" fn( + *const ExternContext, + *const Handle, + field_name_ptr: *const u8, + field_name_len: u64, +) -> HandleBuffer; #[repr(C)] -#[derive(Debug)] pub struct Buffer { bytes_ptr: *mut u8, bytes_len: u64, - // A Value handle to hold the underlying array alive. - handle_: Value, + // A Handle to hold the underlying array alive. + handle_: Handle, } impl Buffer { @@ -425,13 +472,18 @@ impl Buffer { } } -// Points to an array of (byte) Buffers. +/// +/// Points to an array of (byte) Buffers. +/// +/// TODO: Because this is only ever passed from Python to Rust, it could just use +/// `project_multi_strs`. +/// #[repr(C)] pub struct BufferBuffer { bufs_ptr: *mut Buffer, bufs_len: u64, - // handle to hold the underlying array alive - handle_: Value, + // A Handle to hold the underlying array alive. + handle_: Handle, } impl BufferBuffer { @@ -460,16 +512,16 @@ impl BufferBuffer { pub type TypeToStrExtern = extern "C" fn(*const ExternContext, TypeId) -> Buffer; -pub type ValToStrExtern = extern "C" fn(*const ExternContext, *const Value) -> Buffer; +pub type ValToStrExtern = extern "C" fn(*const ExternContext, *const Handle) -> Buffer; pub type CreateExceptionExtern = - extern "C" fn(*const ExternContext, str_ptr: *const u8, str_len: u64) -> Value; + extern "C" fn(*const ExternContext, str_ptr: *const u8, str_len: u64) -> Handle; pub type CallExtern = - extern "C" fn(*const ExternContext, *const Value, *const Value, u64) -> PyResult; + extern "C" fn(*const ExternContext, *const Handle, *const *const Handle, u64) -> PyResult; pub type GeneratorSendExtern = - extern "C" fn(*const ExternContext, *const Value, *const Value) -> PyGeneratorResponse; + extern "C" fn(*const ExternContext, *const Handle, *const Handle) -> PyGeneratorResponse; pub type EvalExtern = extern "C" fn(*const ExternContext, python_ptr: *const u8, python_len: u64) -> PyResult; diff --git a/src/rust/engine/src/handles.rs b/src/rust/engine/src/handles.rs index 973b47c819b..c39b4956690 100644 --- a/src/rust/engine/src/handles.rs +++ b/src/rust/engine/src/handles.rs @@ -4,14 +4,62 @@ use std::os::raw; use std::sync::Mutex; -pub type Handle = *const raw::c_void; +use externs; + +pub type RawHandle = *const raw::c_void; + +/// +/// Represents a handle to a python object, explicitly without equality or hashing. Whenever +/// the equality/identity of a Value matters, a Key should be computed for it and used instead. +/// +/// Handle implements Clone by calling out to a python extern `clone_val` which clones the +/// underlying CFFI handle. +/// +#[repr(C)] +pub struct Handle(RawHandle); + +impl Handle { + /// + /// An escape hatch to allow for cloning a Handle without cloning the value it points to. You + /// should generally not do this unless you are certain the input Handle has been mem::forgotten + /// (otherwise it will be `Drop`ed twice). + /// + pub unsafe fn clone_shallow(&self) -> Handle { + Handle(self.0) + } +} + +impl PartialEq for Handle { + fn eq(&self, other: &Handle) -> bool { + externs::equals(self, other) + } +} + +impl Eq for Handle {} + +impl Drop for Handle { + fn drop(&mut self) { + DROPPING_HANDLES + .lock() + .unwrap() + .push(DroppingHandle(self.0)); + } +} + +// By default, a Handle would not be marked Send because of the raw pointer it holds. +// Because Python objects are threadsafe, we can safely implement Send. +unsafe impl Send for Handle {} +unsafe impl Sync for Handle {} const MIN_DRAIN_HANDLES: usize = 256; -// A sendable wrapper around a Handle. -struct SendableHandle(Handle); +/// +/// A Handle that is currently being dropped. This wrapper exists to mark the pointer Send. +/// +#[repr(C)] +pub struct DroppingHandle(RawHandle); -unsafe impl Send for SendableHandle {} +unsafe impl Send for DroppingHandle {} /// /// A static queue of Handles which used to be owned by `Value`s. When a Value is dropped, its @@ -24,28 +72,22 @@ unsafe impl Send for SendableHandle {} /// TODO: This queue should likely move to `core` to allow `enqueue` to be private. /// lazy_static! { - static ref DROPPING_HANDLES: Mutex> = Mutex::new(Vec::new()); -} - -/// -/// Enqueue a handle to be dropped. -/// -pub fn enqueue_drop_handle(handle: Handle) { - DROPPING_HANDLES - .lock() - .unwrap() - .push(SendableHandle(handle)); + static ref DROPPING_HANDLES: Mutex> = Mutex::new(Vec::new()); } /// -/// If an appreciable number of Handles have been queued, drain them. +/// If an appreciable number of Handles have been queued, drop them. /// -pub fn maybe_drain_handles() -> Option> { - let mut q = DROPPING_HANDLES.lock().unwrap(); - if q.len() > MIN_DRAIN_HANDLES { - let handles: Vec<_> = q.drain(..).collect(); - Some(handles.iter().map(|sh| sh.0).collect()) - } else { - None +pub fn maybe_drop_handles() { + let handles: Option> = { + let mut q = DROPPING_HANDLES.lock().unwrap(); + if q.len() > MIN_DRAIN_HANDLES { + Some(q.drain(..).collect::>()) + } else { + None + } + }; + if let Some(handles) = handles { + externs::drop_handles(&handles); } } diff --git a/src/rust/engine/src/interning.rs b/src/rust/engine/src/interning.rs index 3c254ed8e21..f53bc8adca7 100644 --- a/src/rust/engine/src/interning.rs +++ b/src/rust/engine/src/interning.rs @@ -50,7 +50,7 @@ impl Interns { let id_generator = self.id_generator; let key = self .forward - .entry(InternKey(ident.hash, ident.value)) + .entry(InternKey(ident.hash, v.clone())) .or_insert_with(|| { inserted = true; Key::new(id_generator, type_id) diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 1434debc0e2..d24044a5024 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -50,6 +50,7 @@ use externs::{ TypeToStrExtern, ValToStrExtern, }; use futures::Future; +use handles::Handle; use hashing::Digest; use rule_graph::{GraphMaker, RuleGraph}; use scheduler::{ExecutionRequest, RootResult, Scheduler, Session}; @@ -68,9 +69,9 @@ enum RawStateTag { pub struct RawNode { subject: Key, product: TypeConstraint, - // The Value represents a union tagged with RawStateTag. + // The Handle represents a union tagged with RawStateTag. state_tag: u8, - state_value: Value, + state_handle: Handle, } impl RawNode { @@ -92,7 +93,7 @@ impl RawNode { subject: subject.clone(), product: product.clone(), state_tag: state_tag, - state_value: state_value, + state_handle: state_value.into(), } } } @@ -172,13 +173,13 @@ pub extern "C" fn externs_set( } #[no_mangle] -pub extern "C" fn externs_key_for(value: Value) -> Key { - externs::key_for(value) +pub extern "C" fn key_for(value: Handle) -> Key { + externs::key_for(value.into()) } #[no_mangle] -pub extern "C" fn externs_val_for(key: Key) -> Value { - externs::val_for(&key) +pub extern "C" fn val_for(key: Key) -> Handle { + externs::val_for(&key).into() } /// @@ -291,13 +292,13 @@ pub extern "C" fn scheduler_create( } /// -/// Returns a Value representing a tuple of tuples of metric name string and metric value int. +/// Returns a Handle representing a tuple of tuples of metric name string and metric value int. /// #[no_mangle] pub extern "C" fn scheduler_metrics( scheduler_ptr: *mut Scheduler, session_ptr: *mut Session, -) -> Value { +) -> Handle { with_scheduler(scheduler_ptr, |scheduler| { with_session(session_ptr, |session| { let values = scheduler @@ -310,7 +311,7 @@ pub extern "C" fn scheduler_metrics( ]) }) .collect::>(); - externs::store_tuple(&values) + externs::store_tuple(&values).into() }) }) } @@ -371,11 +372,11 @@ pub extern "C" fn tasks_create() -> *const Tasks { #[no_mangle] pub extern "C" fn tasks_singleton_add( tasks_ptr: *mut Tasks, - value: Value, + handle: Handle, output_constraint: TypeConstraint, ) { with_tasks(tasks_ptr, |tasks| { - tasks.singleton_add(value, output_constraint); + tasks.singleton_add(handle.into(), output_constraint); }) } @@ -519,12 +520,9 @@ pub extern "C" fn execution_request_destroy(ptr: *mut ExecutionRequest) { } #[no_mangle] -pub extern "C" fn validator_run(scheduler_ptr: *mut Scheduler) -> Value { +pub extern "C" fn validator_run(scheduler_ptr: *mut Scheduler) -> PyResult { with_scheduler(scheduler_ptr, |scheduler| { - match scheduler.core.rule_graph.validate() { - Ok(_) => externs::store_tuple(&[]), - Err(msg) => externs::create_exception(&msg), - } + scheduler.core.rule_graph.validate().into() }) } @@ -607,9 +605,9 @@ pub extern "C" fn lease_files_in_graph(scheduler_ptr: *mut Scheduler) { #[no_mangle] pub extern "C" fn capture_snapshots( scheduler_ptr: *mut Scheduler, - path_globs_and_root_tuple_wrapper: Value, + path_globs_and_root_tuple_wrapper: Handle, ) -> PyResult { - let values = externs::project_multi(&path_globs_and_root_tuple_wrapper, "dependencies"); + let values = externs::project_multi(&path_globs_and_root_tuple_wrapper.into(), "dependencies"); let path_globs_and_roots_result: Result, String> = values .iter() .map(|value| { @@ -649,10 +647,10 @@ pub extern "C" fn capture_snapshots( #[no_mangle] pub extern "C" fn merge_directories( scheduler_ptr: *mut Scheduler, - directories_value: Value, + directories_value: Handle, ) -> PyResult { let digests_result: Result, String> = - externs::project_multi(&directories_value, "dependencies") + externs::project_multi(&directories_value.into(), "dependencies") .iter() .map(|v| nodes::lift_digest(v)) .collect(); @@ -675,9 +673,9 @@ pub extern "C" fn merge_directories( #[no_mangle] pub extern "C" fn materialize_directories( scheduler_ptr: *mut Scheduler, - directories_paths_and_digests_value: Value, + directories_paths_and_digests_value: Handle, ) -> PyResult { - let values = externs::project_multi(&directories_paths_and_digests_value, "dependencies"); + let values = externs::project_multi(&directories_paths_and_digests_value.into(), "dependencies"); let directories_paths_and_digests_results: Result, String> = values .iter() .map(|value| { diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index ed61aca93a8..b1c83ddf63b 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -14,8 +14,8 @@ use context::{Context, Core}; use core::{throw, Failure, Key, Noop, TypeConstraint, Value, Variants}; use externs; use fs::{ - self, Dir, File, FileContent, GlobMatching, Link, PathGlobs, PathStat, StoreFileByDigest, - StrictGlobMatching, VFS, + self, Dir, DirectoryListing, File, FileContent, GlobMatching, Link, PathGlobs, PathStat, + StoreFileByDigest, StrictGlobMatching, VFS, }; use hashing; use process_execution::{self, CommandRunner}; @@ -47,12 +47,12 @@ fn was_required(failure: Failure) -> Failure { } impl VFS for Context { - fn read_link(&self, link: Link) -> NodeFuture { - self.get(ReadLink(link)).map(|res| res.0).to_boxed() + fn read_link(&self, link: &Link) -> NodeFuture { + self.get(ReadLink(link.clone())).map(|res| res.0).to_boxed() } - fn scandir(&self, dir: Dir) -> NodeFuture> { - self.get(Scandir(dir)).map(|res| res.0).to_boxed() + fn scandir(&self, dir: Dir) -> NodeFuture> { + self.get(Scandir(dir)) } fn is_ignored(&self, stat: &fs::Stat) -> bool { @@ -257,8 +257,12 @@ impl Select { } } - fn snapshot(&self, context: &Context, entry: &rule_graph::Entry) -> NodeFuture { - let edges = &context + fn snapshot( + &self, + context: &Context, + entry: &rule_graph::Entry, + ) -> NodeFuture> { + let edges = context .core .rule_graph .edges_for_inner(entry) @@ -269,7 +273,7 @@ impl Select { context.core.types.path_globs.clone(), self.subject, self.variants.clone(), - edges, + &edges, ).run(context.clone()) .and_then(move |path_globs_val| context.get(Snapshot(externs::key_for(path_globs_val)))) .to_boxed() @@ -613,20 +617,17 @@ impl From for NodeKey { #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct Scandir(Dir); -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct DirectoryListing(Vec); - impl WrappedNode for Scandir { - type Item = DirectoryListing; + type Item = Arc; - fn run(self, context: Context) -> NodeFuture { + fn run(self, context: Context) -> NodeFuture> { let dir = self.0.clone(); context .core .vfs .scandir(&self.0) .then(move |listing_res| match listing_res { - Ok(listing) => Ok(DirectoryListing(listing)), + Ok(listing) => Ok(Arc::new(listing)), Err(e) => Err(throw(&format!("Failed to scandir for {:?}: {:?}", dir, e))), }) .to_boxed() @@ -750,13 +751,14 @@ impl Snapshot { } impl WrappedNode for Snapshot { - type Item = fs::Snapshot; + type Item = Arc; - fn run(self, context: Context) -> NodeFuture { + fn run(self, context: Context) -> NodeFuture> { let lifted_path_globs = Self::lift_path_globs(&externs::val_for(&self.0)); future::result(lifted_path_globs) .map_err(|e| throw(&format!("Failed to parse PathGlobs: {}", e))) .and_then(move |path_globs| Self::create(context, path_globs)) + .map(Arc::new) .to_boxed() } } @@ -1063,10 +1065,10 @@ impl NodeError for Failure { #[derive(Clone, Debug, Eq, PartialEq)] pub enum NodeResult { Digest(hashing::Digest), - DirectoryListing(DirectoryListing), + DirectoryListing(Arc), LinkDest(LinkDest), ProcessResult(ProcessResult), - Snapshot(fs::Snapshot), + Snapshot(Arc), Value(Value), } @@ -1076,8 +1078,8 @@ impl From for NodeResult { } } -impl From for NodeResult { - fn from(v: fs::Snapshot) -> Self { +impl From> for NodeResult { + fn from(v: Arc) -> Self { NodeResult::Snapshot(v) } } @@ -1100,8 +1102,8 @@ impl From for NodeResult { } } -impl From for NodeResult { - fn from(v: DirectoryListing) -> Self { +impl From> for NodeResult { + fn from(v: Arc) -> Self { NodeResult::DirectoryListing(v) } } @@ -1148,7 +1150,7 @@ impl TryFrom for Value { } } -impl TryFrom for fs::Snapshot { +impl TryFrom for Arc { type Err = (); fn try_from(nr: NodeResult) -> Result { @@ -1192,7 +1194,7 @@ impl TryFrom for LinkDest { } } -impl TryFrom for DirectoryListing { +impl TryFrom for Arc { type Err = (); fn try_from(nr: NodeResult) -> Result { diff --git a/tests/python/pants_test/engine/test_rules.py b/tests/python/pants_test/engine/test_rules.py index e796dfa3319..26caf98db96 100644 --- a/tests/python/pants_test/engine/test_rules.py +++ b/tests/python/pants_test/engine/test_rules.py @@ -76,7 +76,7 @@ class RulesetValidatorTest(unittest.TestCase): def test_ruleset_with_missing_product_type(self): rules = _suba_root_rules + [TaskRule(A, [Select(B)], noop)] - with self.assertRaises(ValueError) as cm: + with self.assertRaises(Exception) as cm: create_scheduler(rules) self.assert_equal_with_printing(dedent(""" @@ -88,7 +88,7 @@ def test_ruleset_with_missing_product_type(self): def test_ruleset_with_rule_with_two_missing_selects(self): rules = _suba_root_rules + [TaskRule(A, [Select(B), Select(C)], noop)] - with self.assertRaises(ValueError) as cm: + with self.assertRaises(Exception) as cm: create_scheduler(rules) self.assert_equal_with_printing(dedent(""" @@ -110,7 +110,7 @@ def test_ruleset_with_superclass_of_selected_type_produced_fails(self): TaskRule(B, [Select(SubA)], noop) ] - with self.assertRaises(ValueError) as cm: + with self.assertRaises(Exception) as cm: create_scheduler(rules) self.assert_equal_with_printing(dedent(""" Rules with errors: 2 @@ -135,7 +135,7 @@ def test_ruleset_with_failure_due_to_incompatible_subject_for_singleton(self): SingletonRule(B, B()), ] - with self.assertRaises(ValueError) as cm: + with self.assertRaises(Exception) as cm: create_scheduler(rules) # This error message could note near matches like the singleton. @@ -155,7 +155,7 @@ def test_not_fulfillable_duplicated_dependency(self): TaskRule(A, [Select(SubA)], noop) ] - with self.assertRaises(ValueError) as cm: + with self.assertRaises(Exception) as cm: create_scheduler(rules) self.assert_equal_with_printing(dedent("""