
Assess integrating ownership #77

Open
frankmcsherry opened this issue Aug 11, 2017 · 26 comments

@frankmcsherry
Member

frankmcsherry commented Aug 11, 2017

Timely dataflow exclusively moves owned data around. Although operators are defined on &Stream references, the closures and such they take as arguments can rely on getting buffers of owned data. The map operator will, no matter what, be handed owned data, and if this means that the contents of a stream need to be cloned, timely will do that.

This seems a bit aggressive in cases where you only want to determine the lengths of some strings, or where you have a stream of large structs (e.g. 75-field DB records) and various consumers each want to pick out a few fields that interest them.

It seems possible (though not obviously a good idea) that this ownership could be promoted so that timely dataflow programmers can control cloning and such by having

  1. Owned streams, Stream<Data> with methods that consume the stream and provide owned elements of type Data.

  2. Reference streams, &Stream<Data> whose methods provide only access to &Data elements, which the observer could then immediately clone if they want to reconstruct the old behavior. Maybe a .cloned() method analogous to Iterator's .cloned() method?
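
For reference, Rust's iterators already expose exactly this split between borrowing and re-owning views; a minimal, standard-library-only illustration of the analogy:

fn main() {
    let data = vec![String::from("a"), String::from("bb")];

    // Borrowing view: compute lengths without cloning any String.
    let lens: Vec<usize> = data.iter().map(|s| s.len()).collect();

    // Re-owning view: opt back into owned data explicitly via .cloned().
    let owned: Vec<String> = data.iter().cloned().collect();

    assert_eq!(lens, vec![1, 2]);
    assert_eq!(owned, data);
}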

I think this sounds good in principle, but it involves a lot of careful detail in the innards of timely. Disruption is fine if it fixes or improves things, but there are several consequences of this sort of change. For example,

  1. Right now Exchange pacts require shuffling the data, and this essentially requires owned data, when we want to shuffle to other threads in process. It does not require it for single thread execution (because the exchange is a no-op) nor for inter-process exchange (because we serialize the data, which we can do from a reference).

  2. Similarly, exchanged data emerges as "unowned" references when it comes out of Abomonation. Most operators will still hit this with a clone, though for large value types the compiler can in principle optimize this down. To determine the length of a String, the code would almost certainly still clone the string to check its length (it does this already, so this isn't a defect so much as a missed opportunity).

  3. Anything we do with references needs to happen as the data are produced at their source, or involve Rc<_> types wrapping buffers and preventing the transfer of owned data. This is because once we enqueue buffers for other operators, we shift the flow of control upward and lose an understanding of whether the references remain valid. This could be fine, because we can register actions the way we currently register "listeners" with the Tee pusher, but would need some thought.

  4. Operators like concat currently just forward batches of owned data. If instead they wanted to move references around, it seems like they would need to act as a proxy for any interested listeners, forwarding their interests upstream to each of their sources (requiring that whatever closure is used to handle the data be cloneable, or something, if we want this behavior).

  5. Such a "register listeners" approach could work very well for operators like map, filter, flat_map and maybe others, where their behavior would be to wrap a supplied closure with another closure, so that a sequence of such operators turn in to a (complicated) closure that the compiler could at least work to optimize down.

Mostly, shifting around where computation happens, as well as whether parts of the timely dataflow graph are just fake elements that are actually a stack of closures, is a bit of a disruption for timely dataflow, but it could be that doing it sanely ends up with a system where we do less copying, more per-batch work with data in the cache, eager filtering and projection, and lots of other good stuff.

@frankmcsherry
Member Author

One concrete option is to put this as a v.next goal, and use it as motivation to cut a sane version of timely that we can leave out there before breaking all sorts of things.

@frankmcsherry
Member Author

frankmcsherry commented Aug 11, 2017

Historical note: this is almost how timely used to work, back when it was just getting off the ground. If you dig into the operator construction that unary and binary do, they take a Stream as an argument and register a "pusher" with its Tee pusher. The Tee pusher has a list of boxed "things to push batches of owned data at", which used to be either give or show, based on whether ownership transferred or not, respectively.

It wouldn't be too hard to sneak this back in, if only as a hack, by enriching the Tee feature set to allow show again, and rewriting filter and map to sneak custom pushers into place, rather than building operators with queues.

There is some technical complexity to worry about, in that the progress tracking logic currently assumes (and this should probably be corrected) that all uses of an output produce the same number of messages. If we sneak a filter on to one of the outputs, or worse a flat_map, we have to worry about whether we've violated that assumption (for filter we could send the filtered messages, empty or not, but a flat_map could produce epically sized messages, which is just a bad idea). Alternately, we could leave virtual nodes in the timely dataflow graph whose role is just to resolve this issue, producing the appropriate progress messages but doing no other work.
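
As a self-contained toy of that hack (the Push trait and types here are stand-ins, not timely's), a filter can be spliced in as a custom pusher that forwards every batch, even ones that filter down to empty, so that downstream message counts stay consistent:

trait Push<T> {
    fn push(&mut self, batch: &mut Vec<T>);
}

struct FilterPusher<T, P: Push<T>, F: FnMut(&T) -> bool> {
    pred: F,
    downstream: P,
    _marker: std::marker::PhantomData<T>,
}

impl<T, P: Push<T>, F: FnMut(&T) -> bool> Push<T> for FilterPusher<T, P, F> {
    fn push(&mut self, batch: &mut Vec<T>) {
        let pred = &mut self.pred;
        batch.retain(|x| pred(x));
        // Forward the batch even if it is now empty, so downstream
        // progress accounting sees the same number of messages.
        self.downstream.push(batch);
    }
}

struct Collect<T>(Vec<T>);

impl<T> Push<T> for Collect<T> {
    fn push(&mut self, batch: &mut Vec<T>) {
        self.0.append(batch);
    }
}

fn main() {
    let mut filtered = FilterPusher {
        pred: |x: &u64| x % 2 == 0,
        downstream: Collect(Vec::new()),
        _marker: std::marker::PhantomData,
    };
    filtered.push(&mut vec![1, 2, 3, 4]);
    assert_eq!(filtered.downstream.0, vec![2, 4]);
}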

@milibopp
Contributor

I like the idea, as it moves timely closer to Rust's ideal of providing zero-cost abstractions. I have to admit that I struggled making a similar transition while working on carboxyl – it still clones a lot internally. All the lifetime handling suddenly becomes much harder to get right. However, once it's done it should allow one to write more efficient code.

@frankmcsherry
Member Author

I've pondered this a bit more, with my hands in the guts for other reasons, and here are some thoughts:

The main place that ownership is misused is in Tee, where multiple listeners have expressed an interest in owned data by registering a "pusher" with a Stream. These pushers are boxed and put in a now-homogeneous list of Box<Push<(T, Content<D>)>>. As batches of messages arrive, they are cloned and handed out, with the last listener getting the original owned data.

It seems totally reasonable to have one listener who gets owned data, and an arbitrary number of listeners who implement a different trait (rather than Push<(T, Content<D>)>) who get shown references to data. Anyone who needs owned data can be adapted pretty easily (by simply cloning the data), and folks who can take advantage of the references could tap in earlier and avoid full clones (e.g. a filter_ref which clones subsets, or a map_ref that only clones the relevant fields, etc). Also, if we have an exchange edge downstream it could be that most of the data is just going to get serialized, and a reference is fine for that (if we can do streaming serialization; it is done in batches at the moment).
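
A self-contained toy of that shape (the traits here are stand-ins, not timely's actual Tee or Push): reference listeners are shown each batch, and the single ownership-taking listener receives it by value at the end, so the common single-consumer case does no cloning at all:

trait Show<D> {
    fn show(&mut self, batch: &[D]);
}

trait Give<D> {
    fn give(&mut self, batch: Vec<D>);
}

struct Tee<D> {
    observers: Vec<Box<dyn Show<D>>>, // reference listeners: see, don't own
    owner: Option<Box<dyn Give<D>>>,  // at most one ownership-taking listener
}

impl<D> Tee<D> {
    fn push(&mut self, batch: Vec<D>) {
        for observer in self.observers.iter_mut() {
            observer.show(&batch); // no clone; just a reference
        }
        if let Some(owner) = self.owner.as_mut() {
            owner.give(batch); // ownership transfers exactly once
        }
    }
}

struct LenPrinter; // a reference listener: needs no ownership

impl Show<String> for LenPrinter {
    fn show(&mut self, batch: &[String]) {
        for s in batch {
            println!("len = {}", s.len());
        }
    }
}

struct Sink(Vec<String>); // the one listener that keeps the data

impl Give<String> for Sink {
    fn give(&mut self, mut batch: Vec<String>) {
        self.0.append(&mut batch);
    }
}

fn main() {
    let mut tee: Tee<String> = Tee {
        observers: vec![Box::new(LenPrinter)],
        owner: Some(Box::new(Sink(Vec::new()))),
    };
    tee.push(vec!["big".to_string(), "data".to_string()]);
}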

Another advantage to this is that we could have the type of the owning listener as part of the Tee, so that we don't need a virtual call or anything like that, especially in the common case of single consumer.

We could also try and be fancier and have two types of Tee: if no reference listeners have attached, we don't need to do any batching before the owned listener; it was only needed to amortize the overhead of the virtual calls. In principle, a Tee that does not permit reference listeners could skip buffering, inline the listener, and potentially avoid a round of copies (e.g. in an exchange, where we never need the input in batches, as we build our own output batches).

I think the two types of Tee could totally happen with some ergonomic horror: imagine each method on streams takes a stream as input and returns two streams as output, the original and the new output. This would allow each operator to change the type of a stream. Or we could have a .share() method which essentially adds a Tee and changes the type to whatever the reference folks want to use. Both of these would be disruptive breaking changes, but I could imagine that e.g. the pagerank example gets even faster with fewer copies.

@milibopp
Contributor

milibopp commented Sep 9, 2017

A general question about this: at the moment, I have not really processed any large data. However, it is quite possible I might end up with allocations of a couple of megabytes in dataflow streams. An extreme case would be a data cube with a volume of, say, 128^3 elements each containing a few dozen bytes or so.

Thus, eventually I really want to make sure there is no cloning, unless I say so in the code. Can this be entirely controlled under this proposal or would there be any points left, where data is cloned implicitly?

@frankmcsherry
Member Author

frankmcsherry commented Sep 9, 2017

At the moment the only time you should experience clones is if you re-use a stream of data. That is, if you have

let stream_of_big_things = ...;

stream_of_big_things
    .map( ... )
    .other_stuff( ... );

stream_of_big_things
    .different_stuff( ... );

In this case, your data would be cloned by timely. But if you comment out either of the uses (leaving just one) there shouldn't be any clones.

We would have to talk about serialization and stuff like that, in which case there will be some effective copying when the serialization and possible deserialization happens.

In this proposal (that of the issue) I would love to remove the Clone requirement from the Data trait, and impose the requirement that people explicitly clone things. I'm not sure how to do this ergonomically, as one consumer of the stream can take ownership of the data, and the others can only get a reference, which feels like it means either (i) requiring different methods that break the illusion, or (ii) putting lots of magic in place.

Edit: Yup, just checked and if I remove the Clone requirement, the only thing that breaks is the implementation of the Push trait for the Tee type, which is the place where a stream is allowed to be re-used.

Edit 2: There is also a Clone requirement on the implementation of DerefMut for the type wrapping serialized data, as the only way it knows how to go from a &T (which abomonation provides) to a T is by cloning. This could be fine if you just want to look at serialized data (e.g. a &BigData is fine), but if you want to take ownership of it, the only way that we can (currently) guarantee this happens when you have serialized data is with clone.

@milibopp
Contributor

milibopp commented Sep 9, 2017

Okay, so while one has to be careful, it is possible to avoid the clones today. That's good to know.

Serialization is a different story. Usually with problems like this, you try to keep as much data local as possible and keep communication with other workers to a minimum either way. Does serialization happen on its own, without calls to exchange and such?

About ergonomics, it seems similar to an iterator, does it not? One consumer being able to take ownership sounds like passing the stream by value (like map and filter on iterators). If you can only get a reference without consuming the upstream stream, perhaps use a method like Option's as_ref: fn as_ref<'a>(&'a self) -> Stream<G, &'a T>.

I am sure there are some implementation details that might make that change tricky. But in an ideal world it would be awesome to simply use the same kind of API as an iterator.

Is it possible to help out with experimentation or implementation in a useful way?

@frankmcsherry
Member Author

frankmcsherry commented Sep 9, 2017

I think it could end up totally tolerable, but it breaks an abstraction barrier and asks users to be aware of whether they are the final consumer of a stream or not. Proposal later on in this comment.

First, though: I think it is a bit different from iterators, in that if you call e.g. my_vec.iter(), which gives you an iterator over references, that iterator needs to be dropped before you can move ownership with .into_iter() or the like. Timely streams are setting up computation that will be long-lived, and while we can show produced data to immediate consumers, it isn't clear that we can propagate it along chains the way iterators can easily do.

In particular, if you hand out a stream of &'a T that means that for any datum T, consumers of stream of references need to be run to completion before we can run the consumer of the actual data. This isn't hard to do "one hop", but if you have a pipeline of anything other than an iterator, there would be a coordination issue to respect Rust's borrowing semantics.


Instead, I think there is a pretty easy stop-gap (which may address most cases).

Right now there are lots of methods on streams that take a &Stream argument, and use Stream::connect_to, which takes an arbitrary P: Push thing that accepts owned Vec<D> data and adds it (the P thing) to a list. We can change these to take a Stream argument instead, which means that each one can only be called once, consuming the stream.

At the same time, we can add a new method on &Stream which looks like

fn flat_map_ref<I, F>(stream: &Stream<G, D>, func: F) -> Stream<G, I::Item>
where
    I: IntoIterator,
    I::Item: Data,
    F: Fn(G::Timestamp, &[D]) -> I;

which allows pretty much anyone to look at stream and produce whatever iterator of data they want from the references they see.

Perhaps we add convenience methods like cloned() analogous to the iterator method that just clones underlying references and gives back a new owned stream that you can take advantage of, but which requires D: Clone. If you'd like to tap in more directly and just work off of the references themselves (perhaps you only want to pull out boundaries of the cube, for example), you can do that too.
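
To make the proposed shape concrete, here is a self-contained model of the per-batch semantics, with plain vectors of (time, batch) pairs standing in for timely streams; nothing below is actual timely API, and cloned falls out as the special case that clones every element:

fn flat_map_ref<T: Copy, D, I: IntoIterator>(
    batches: Vec<(T, Vec<D>)>,
    func: impl Fn(T, &[D]) -> I,
) -> Vec<(T, Vec<I::Item>)> {
    batches
        .iter()
        .map(|(time, data)| (*time, func(*time, data).into_iter().collect()))
        .collect()
}

fn main() {
    let input = vec![(0u64, vec!["big".to_string(), "data".to_string()])];

    // cloned(): reconstruct an owned stream explicitly.
    let owned = flat_map_ref(input.clone(), |_t, data| data.to_vec());

    // Or tap the references directly and keep only what you need.
    let lens = flat_map_ref(input, |_t, data| {
        data.iter().map(|s| s.len()).collect::<Vec<_>>()
    });

    assert_eq!(owned[0].1, vec!["big".to_string(), "data".to_string()]);
    assert_eq!(lens[0].1, vec![3, 4]);
}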


Changing most of the methods to take a self argument would be massively breaking, which I don't really mind, but which we would want to do carefully; adding in the cloned and flat_map_ref methods, though, should be backward compatible. We could start with those, and the guarantee that if you program as if the other methods were self-consuming, then we won't do clones.

I do really like the idea that timely exposes ownership and lets you be certain about it, even if it means that you occasionally have to write

let stream_of_big_things = ...;

stream_of_big_things
    .cloned()
    .map( ... )
    .other_stuff( ... );

stream_of_big_things
    .different_stuff( ... );

which doesn't really seem all that horrible, written like that.

@frankmcsherry
Member Author

frankmcsherry commented Sep 9, 2017

Does serialization happen on its own without calls to exchange and such?

Right now it is done automatically, but only when data move between processes (that is, only for operators that require the Exchange pact, including exchange but also some others). It is an interesting question whether it should happen more often (e.g. between NUMA regions), but I suspect we won't ever want to bake that in, for reasons like yours.

There is a separate trait, ExchangeData, required by operators that need to be able to exchange data between workers. The normal trait, Data, has fewer requirements (e.g. does not include Send). If an operator only requires Data, it shouldn't serialize data.

Is it possible to help out with experimentation or implementation in a useful way?

What do you think of me prototyping cloned() and flat_map_ref(), and you telling me if that is a horrible interface? I think these two get pretty close to the intended experience (at least, as I was imagining it). Alternately, if you think they suck and have a different API in mind, we could try and hash out whether we could get the lifetimes to work with guarantees about what executes when.

For example, even with flat_map_ref there might be some questions about whether I: 'static, so that we can capture the iterator and hold it, or whether we should play out the iterator completely in the course of processing the batch of &D.

@frankmcsherry
Member Author

frankmcsherry commented Sep 9, 2017

Independently, it is a fine question to ask: "could we have a stream of &D types?".

I think this gets into some sticky details, where the operators that can use these references will be fairly restricted, and probably wouldn't be able to stash the references, for example. But I could imagine you wanting to define a dataflow fragment that can observe and fully retire a bunch of references, with the expectation that, batch by batch, it runs to completion before ownership is handed off to the consumer of the stream.

I'm not clear if there are dataflow fragments that (i) make sense, and (ii) can't be expressed with something like flat_map_ref. Most Rust iterators are relatively simple, and we could try and give a programming experience that presented itself as an iterator that you work with. In the flat_map_ref world, that would be inside the closure you pass to flat_map_ref, like

stream.flat_map_ref(|time, data|
    data.iter()
        .filter(...)
        .map(...)
        .cycle()
        .take(100)
        .zip(0 .. 100)
);

but if there are better idioms, we could try and present them. I don't know that we'll actually be able to expose an actual iterator type that you act on, because we need to eventually go monadic on it and turn it back to a stream.

@milibopp
Contributor

milibopp commented Sep 9, 2017

What do you think of me prototyping cloned() and flat_map_ref(), and you telling me if that is a horrible interface? I think these two get pretty close to the intended experience (at least, as I was imagining it). Alternately, if you think they suck and have a different API in mind, we could try and hash out whether we could get the lifetimes to work with guarantees about what executes when.

Sounds good. Happy to provide feedback. The flat_map_ref sounds like it could solve most practical problems.

The question of non-'static streams may still be relevant though, I think. Continuing with the large data cube use case, you mentioned just sending boundaries, which is exactly what needs to happen. While I think it would generally be tolerable to clone just a small subset of the data, not cloning at all would likely still benefit the scaling efficiency of a particular algorithm on a data set of a given size.

To be more concrete, it could look like this with cloning:

struct Cube {...};
struct Boundary {...};
fn boundaries(cube: &Cube) -> Boundary { /* copies data internally */ }

stream
    .flat_map_ref(|_time, data| data.iter().map(boundaries))
    .exchange(|boundary| neighbour_index(boundary))

And I think this is fine already; everything that comes now may be me getting carried away with premature optimization…

That said, if cloning could be avoided, then it may look like this:

struct Boundary<'a> {...};
fn boundaries<'a>(cube: &'a Cube) -> Boundary<'a> { /* borrows */ }

The same streaming code as above would then produce a Stream<_, Boundary<'a>>. Now that I've spelled it out, it seems quite strange, as there is no plausible way to serialize and deserialize something with a lifetime on it. After all, what would it borrow from on the receiving end?

There probably still is a case for being able to send part of a data structure over the network without copying, and only materializing it into an owned structure on the receiving end. But instead of streams containing non-'static entities, it may be better to solve this with a specialized operator and a more involved serialization method. I guess one would have to express a relation between two datatypes like &'a [T] and Vec<T> via a trait (similar to Abomonation) that describes that you can serialize &'a [T] to some binary data, from which a Vec<T> can be built.
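
Here is a self-contained sketch of such a relation, with a hypothetical trait whose names and (naive, element-by-element) byte format are made up purely for illustration:

use std::convert::TryInto;

// Hypothetical trait: a borrowed type (here [u32]) serializes to bytes from
// which a related owned type (Vec<u32>) can be rebuilt on the receiving end.
trait SerializeOwned {
    type Owned;
    fn encode(&self, bytes: &mut Vec<u8>);
    fn decode(bytes: &[u8]) -> Self::Owned;
}

impl SerializeOwned for [u32] {
    type Owned = Vec<u32>;
    fn encode(&self, bytes: &mut Vec<u8>) {
        for x in self {
            bytes.extend_from_slice(&x.to_le_bytes());
        }
    }
    fn decode(bytes: &[u8]) -> Vec<u32> {
        bytes
            .chunks_exact(4)
            .map(|chunk| u32::from_le_bytes(chunk.try_into().unwrap()))
            .collect()
    }
}

fn main() {
    let cube = vec![1u32, 2, 3, 4];
    let boundary = &cube[..2]; // borrow just the part we want to send
    let mut wire = Vec::new();
    boundary.encode(&mut wire); // sender: serialize from the borrow, no clone
    let received = <[u32] as SerializeOwned>::decode(&wire); // receiver: owned
    assert_eq!(received, vec![1, 2]);
}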

It sounds fairly complicated to build this, and implementing that new trait will probably be fairly unsafe for users… Perhaps we should not bother too much. Still, I'm curious what you think about this.

@frankmcsherry
Member Author

frankmcsherry commented Sep 9, 2017

Now that I've spelled it out, it seems quite strange, as there is no plausible way to serialize and deserialize something with a lifetime on it. After all, what would it borrow from on the receiving end?

Ahaha. You can totally do this with Abomonation. :) (( unfun caveat: would only be safe if you use lifetimes bound by that of the &mut [u8] you are deserializing from, or if you wait for the Abomonation<'a> trait that will happen as soon as anyone needs to deserialize types with borrows in them. ))

But I think realistically, as soon as you hit an exchange, you are going to want to transfer ownership. If the Boundary<'a> goes to a different (peer) worker, on a different machine or even a different thread, it will be hard to maintain the lifetime property.

To flesh out your example a bit, do you imagine wanting:

struct Cube {...};
struct BoundaryRef<'a> {...};

cube_stream
    .as_ref()
    .map(|x: &Cube| BoundaryRef::from(x))
    .do_some_stuff()
    .maybe_eventually_exchange();

cube_stream
    .take_ownership()
    .exchange(|boundary| neighbour_index(boundary))

The do_some_stuff part is the part that I'm not clear is (i) easily supported, and (ii) much distinct from

cube_stream
    .flat_map_ref(|cube_refs|
        cube_refs
            .map(|cube_ref| BoundaryRef::from(cube_ref))
            .do_some_stuff()
    )
    .maybe_eventually_exchange();

I could imagine some do_some_stuff involving timely operators, but if we impose the constraint "must be able to run to completion on each batch", we probably get something that looks a bit like an iterator?

@frankmcsherry
Member Author

There probably still is a case for being able to send part of a data structure over the network without copying, and only materializing it into an owned structure on the receiving end. But instead of streams containing non-'static entities, it may be better to solve this with a specialized operator and a more involved serialization method. I guess one would have to express a relation between two datatypes like &'a [T] and Vec<T> via a trait (similar to Abomonation) that describes that you can serialize &'a [T] to some binary data, from which a Vec<T> can be built.

For what it is worth, the way this works at the moment is that you can serialize anything implementing Abomonation, which will require a copy (of bytes, into buffers pointed at the network). In principle you could do this with a reference &T as that is all serialization requires, but in practice most serialization is just on the other end of an exchange, and it wants owned data because it puts the data in buffers first (so the lifetime of the exchanger is bound by that of the types it exchanges).

On the receive side things are a bit better. When you deserialize a T you actually just get a &T after some light pointer manipulation (imagine: std::mem::transmute), which means you are at liberty to do whatever cleverness you want here. You could call clone to get a T, or you could do whatever read-only work you need without copying/cloning. This isn't the best time to take a boundary, though, as you've already sent the whole cube in that case.

@frankmcsherry
Member Author

frankmcsherry commented Sep 9, 2017

I could totally imagine providing an exchange_serialized method that acts on a &Stream and produces a shuffled stream of serialized data. This works because serialization only needs references (and can't make use of ownership) and produces owned byte arrays (that look like references).

This removes the opportunistic ability to exploit ownership and avoid serialization when within a process (e.g. if you are running at a smaller scale), but ensures that you don't do a clone just to serialize the data (and then drop the clone).

I could also imagine something like flat_map_ref for the other side, where by only providing references as closure arguments it can ensure that it will not do a clone post-deserialization. Right now a map post-exchange will automatically clone so that it can hand you owned data.

It isn't conceptually hard to do this, but it does break a few abstractions and makes you wish things were clearer. E.g. when you write your map, should you expect owned data or borrowed data? In each case you give up something if you received a different input (e.g. you got a reference expecting owned and must clone, or you got owned data expecting a reference and cannot take ownership). It would be interesting to think about Cow, or similar "maybe owned, maybe borrowed" abstractions, to see if it could be easy to write code that can accept either, and exploit either, without having to know which communication strategy the system has chosen.
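
For instance (a minimal sketch, independent of timely's types), logic written against Cow reads either flavor for free, and pays for a clone only when it actually takes ownership:

use std::borrow::Cow;

// Read-only work needs no clone, whether the data arrived owned or borrowed.
fn logic(input: Cow<'_, String>) -> usize {
    input.len()
}

// Clones only in the borrowed case (e.g. freshly deserialized data).
fn take_ownership(input: Cow<'_, String>) -> String {
    input.into_owned()
}

fn main() {
    let local = String::from("owned, same process");
    let remote = String::from("deserialized elsewhere");
    assert_eq!(logic(Cow::Owned(local)), 19);
    assert_eq!(logic(Cow::Borrowed(&remote)), 22);
    assert_eq!(take_ownership(Cow::Borrowed(&remote)), remote);
}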

Edit: Right now you can do this in custom timely operators by doing a match on the type of Content<D> you receive; it will either be owned or a wrapper around a Vec<u8>. So, if your first touch after the exchange is hand-written, you should be able to do pretty much whatever. No solution at the moment for the pre-exchange side, though.

@milibopp
Contributor

milibopp commented Sep 9, 2017

Ahaha. You can totally do this with Abomonation. :)

Okay, I am starting to understand the scary name :)

To flesh out your example a bit, do you imagine wanting: [...]
The do_some_stuff part is what I'm not clear is (i) easily supported, and (ii) much distinct from [...]

No, as I said, for this case the flat_map_ref approach is more than fine. Leveraging iterators like that is actually a big help, as it allows pushing code inside the stream abstraction. From working on reactive streams I found that it usually makes sense to keep the stream transformations at the outer layer of an application, and just write as much conventional code as possible, simply because it gets you into familiar territory and avoids computational and mental overhead. Plus, the monadic nature of streams can become a bit of a maintenance burden sometimes.

@milibopp
Contributor

milibopp commented Sep 9, 2017

This removes the opportunistic ability to exploit ownership and avoid serialization when within a process (e.g. if you are running at a smaller scale), but ensures that you don't do a copy just to serialize the data (and then drop the copy).

It's hard to tell this without benchmarking, but I could well imagine that this ability is worth more than avoiding the clone for serialization. In a typical HPC setup you have single nodes with dozens of CPU cores and you likely want to use multi-threading on each node. So communication between processes should be somewhat rarer than communication within a process.

It would be interesting to think about Cow, or similar "maybe owned, maybe borrowed" abstractions, to see if it could be easy to write code that can accept either, and exploit either, without having to know which communication strategy the system has chosen.

This sounds more promising. Maybe combine exchange and map using cows like so:

fn exchange_cow<R, L, T, U, V>(stream: &Stream<G, T>, route: R, logic: L) -> Stream<G, V>
    where R: Fn(&T) -> (u64, &U),
          L: Fn(Cow<U>) -> V,
          U: ExchangeData, T: Data, V: Data

On the sending worker, route determines the index and borrows some data. Then on the receiving end, in logic, you are free to just take a reference if that is enough, or to clone conditionally (when you are on the same thread) if you need ownership.

Could this be implemented as a custom operator in today's timely?

More radically, it would still be useful to just leave out logic and write it like this:

fn exchange_cow<'a, R, T, U>(stream: &'a Stream<G, T>, route: R) -> Stream<G, Cow<'a, U>>
    where R: Fn(&T) -> (u64, &U),
          T: ExchangeData, U: Data

But here goes the non-'static stream again…

@milibopp
Contributor

milibopp commented Sep 9, 2017

I could also imagine something like flat_map_ref for the other side, where by only providing references as an argument it ensures that it will not do a clone post-deserialization. Right now a map post-exchange will automatically clone so that it can hand you owned data.

I am starting to see the complexity here. flat_map_ref on the receiving side cannot quite solve it, as I understand it, because there will be no way to combine the reference with data from another stream without cloning.

What I am trying to get to is roughly like this:

fn combine<G: Scope>(cubes: Stream<G, Cube>, boundaries: Stream<G, Boundary>) -> Stream<G, Cube> { ... }

let cube: Stream<G, Cube> = ...;
let boundaries = cube.map(get_boundary).exchange(route);
let new_cube = combine(cube, boundaries);

I guess what I am worried about is that I can write the non-stream logic for combine here as an fn(&Cube, &Boundary) -> Cube or, more generally, an fn(&[Cube], &[Boundary]) -> Cube to handle multiple events per timestamp – in my case the [Cube] instance will always have length one, but we need to handle this more generally.

Anyway, the essence is: I can take the exchanged data by reference for my domain logic. By the time the data has been exchanged, though, the clone has already happened, so this does not buy me anything.

The kind of operator that I would need to avoid the clones is probably like this:

fn exchange_and_combine<G: Scope, T: Data, U: ExchangeData, V: Data>(
    stream: &Stream<G, T>,
    route_and_borrow: impl Fn(&T) -> (u64, &U),
    combine: impl Fn(&[T], Cow<[U]>) -> V
)
    -> Stream<G, V>

(impl Trait notation here is not required, but makes it more readable)

The more I think about this, the more feasible it seems to write custom operators like above. The open question is whether I can exploit the cow semantics that way.

PS: I hope I am not making this too much about my own use case, but I feel there is a general concern to be addressed here.

@frankmcsherry
Member Author

frankmcsherry commented Sep 9, 2017

Let's look at your example and see what I would imagine doing:

fn combine<G: Scope>(cubes: Stream<G, Cube>, boundaries: Stream<G, Boundary>) -> Stream<G, Cube> { ... }

let cube: Stream<G, Cube> = ...;
let boundaries = cube.map(get_boundary).exchange(route);
let new_cube = combine(cube, boundaries);

I think what you would probably go for is replacing the map(get_boundary) with flat_map_ref(get_boundary), where get_boundary takes a &Cube as input and produces an owned Boundary as output. It means copying a Boundary's amount of data, but you are going to do it again in just a moment in exchange one way or the other (if within process, you'll need an owned copy to send to other workers; if between processes you'll need to serialize, and perhaps might fairly lament the double copy but you will need at least one).

I think it will be tricky to avoid a Boundary clone or copy, if you want to exchange boundary information with other workers that are not coordinated. Especially if your plan is to update the Cube objects soon thereafter by mutating their backing memory (which we would not ever clone).

Edit: I may have misunderstood. Based on the signature fn(&[Cube], &[Boundary]) -> Cube, it looks like you might generate new Cubes from immutable old cubes. I was imagining what you actually wanted to do was

// update in place
fn(&mut Cube, &[Boundary]) 

which would make it more clear that we would need to copy some Boundarys. If you really plan on keeping old Cubes around immutable, my claim about "needing to do a boundary copy" is less defensible.

PS: I hope I am not making this too much about my own use case, but I feel there is a general concern to be addressed here.

No, this is great. A variety of requirements are helpful!

@milibopp
Contributor

milibopp commented Sep 9, 2017

It means copying a Boundary's amount of data, but you are going to do it again in just a moment in exchange one way or the other (if within process, you'll need an owned copy to send to other workers; if between processes you'll need to serialize, and perhaps might fairly lament the double copy but you will need at least one).

I think it will be tricky to avoid a Boundary clone or copy, if you want to exchange boundary information with other workers that are not coordinated. Especially if your plan is to update the Cube objects soon thereafter by mutating their backing memory (which we would not ever clone).

Ah, yes, there is a trade-off here, even in an ideal case. When I make a new Cube instead of mutating the original's memory, I should be able to avoid the copy within process though, right? The extra allocation would not be too bad, if I have to compute every element anyway.

Workers not being coordinated means that the computation happens in detached background threads? I imagine it would be hard to use scoped threads instead, would it not?

@frankmcsherry
Member Author

When I make a new Cube instead of mutating the original's memory, I should be able to avoid the copy within process though, right?

Well, the clone isn't much more expensive than allocating a new Cube (that plus memcpy). If you used &mut Cube you would have neither Cube copies nor allocations, which strikes me as pretty handy, unless it is much harder to write the Fn(&mut Cube, &[Boundary]) logic than the Fn(&Cube, &[Boundary]) -> Cube logic.

I guess I would say, if you are allocating a new Cube you are already in clone territory.

Workers not being coordinated means that the computation happens in detached background threads, whose scope is not coupled to anything. I imagine it would be hard to use scoped threads instead?

Yes, scoped threads make little sense because the scope that (I think) you are hoping for is somehow based on a borrow of some Cubes, and we don't plan on running the worker threads to completion for each message (the unit of "borrow").

In the dataflow setting, relatively little of the data are rooted on some common stack, but rather are rooted in the stacks of the worker threads. A worker with a Cube wants (in timely) to be free to process it, move it along, drop it, whatever without being blocked by all other outstanding processes that might have a borrow on it. I think if you want that, Arc<Cube> may be a better abstraction, but I also suspect that you probably don't want that (i.e. a boundary copy is probably a lot cheaper, if I understand things correctly).

@milibopp
Copy link
Contributor

milibopp commented Sep 9, 2017

Okay, makes sense. I guess I am underestimating the cost of allocation there. Well, the clones are probably not as bad as I think. ;)

@frankmcsherry
Member Author

frankmcsherry commented Sep 13, 2017

I have some more observations about ownership and cloning in particular. I've started playing around with this in a branch, and there are some other places where clone was used, and I'll see if I can explain them and whether it's workable or not (they were missed because they get noticed in a different rustc pass).

  1. Tee is the main place that clone was used. This is where multiple streams could "tee" off of one output stream. I swapped this over to multiple "observers" who get to see references but not take ownership, where one can additionally add an ownership-taking observer if the data implement Clone (we clone the data, and then hand it to them). There is an unpleasant aspect here: the old version re-used buffers a lot better, in that it cloned into the same backing buffer and re-used it for each consumer. It is a bit more horrible now because the cloning happens further away, but maybe it can be fixed.

  2. Input clones data when the same input is connected to multiple dataflows. This ability could just be removed. I can't remember exactly why it exists, other than that it was an easy way of doing InputHandle::to_stream; that is, mutating rather than consuming the handle, and so having to deal with the possibility it is called more than once. This could be a run-time error ("can't use twice") or we could revert to "you get the handle only when you call to_stream".

  3. Both capture and replay want to clone, because: (i) capture needs access to a Vec<D> rather than a Content<D>, but is only provided the latter. This comes down to "deserialization requires Clone", but we could also make the Event::Message variant wrap a Content<D> rather than a Vec<D>. (ii) replay uses clone because EventIterator provides references to events, rather than owned events, which it does so that multiple folks can "listen" to the same stream. The replay case seems "harder", in that we would need to push ownership up to the replayable streams too, perhaps again having the two cases of "reference to data" and "ownership of data".

  4. Various operators require access to clone because they use DerefMut, to deal with the receipt of serialized data. E.g. map provides owned data to its closure, but if it receives serialized data it needs to do something like clone.

There are possibly some other cases hiding out there. Removing Clone is a bit less simple than I had imagined, largely due to the possibility of serialized data. Arguably Abomonation is a pretty light touch in that it doesn't require clone, but somewhere along the way we need to be able to produce a new instance of data where we didn't previously have one.


One first step might be to try and make Stream generic in the representation of the data, in addition to the data itself. So we might have Stream<G, Vec<D>> and Stream<G, Content<D>>, where the former wouldn't need to be serializable (nor Clone), but wouldn't be appropriate for e.g. inter-process exchange.
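
As a rough shape for that first step (hypothetical types, not a working change to timely), the stream could be made generic over a container trait, with Vec<D> as the representation that needs no extra bounds:

use std::marker::PhantomData;

// Hypothetical container abstraction: anything that can show its items.
trait Container {
    type Item;
    fn items(&self) -> &[Self::Item];
}

// Vec<D> needs neither Clone nor serializability to act as a container.
impl<D> Container for Vec<D> {
    type Item = D;
    fn items(&self) -> &[D] {
        self
    }
}

// A stream generic over scope G and container C, so Stream<G, Vec<D>> and
// Stream<G, Content<D>> could coexist with different trait bounds.
#[allow(dead_code)]
struct Stream<G, C: Container> {
    scope: G,
    _container: PhantomData<C>,
}

fn main() {
    let batch: Vec<u32> = vec![1, 2, 3];
    assert_eq!(batch.items(), &[1, 2, 3]);
}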

@frankmcsherry
Member Author

Another observation, possibly related: in writing various interfaces to the outside world, I often find that I start with a &[D] of data, e.g. from a file or from Kafka or from wherever. In many cases the immediate consumers do not need a whole Vec<D>, but rather can start from &[D] data and clone whatever they need (often a subset).

If we insist that data ingestion happens in an operator, we need to hand owned data somewhere just to get things going, which means we've fundamentally cloned all of the &[D] data, even if we don't need all of it.

This suggests that we might want the type Stream to turn into a trait, so that multiple types could implement connect_to in various ways. Of course, it would be delightful (to at least one of us) if the data type were simply &D, and maybe that can work for the first hop of data ingestion?

@milibopp
Contributor

One first step might be to try and make Stream generic in the representation of the data, in addition to the data itself. So we might have Stream<G, Vec<D>> and Stream<G, Content<D>>, where the former wouldn't need to be serializable (nor Clone), but wouldn't be appropriate for e.g. inter-process exchange.

Would this mean the inner type is now the container of events within a timestamp? And would it allow streams that are statically guaranteed to only have a single piece of data per timestamp, in this context a Stream<G, D>? Or could you simply not construct such a thing?

Anecdotally, I occasionally find myself mentally stumbling over the nested event structure of timely's streams. There are multiple timestamps, each of which contains multiple events. If I understand the implication of your suggestion above, such a change in type signature would make the structure somewhat more obvious.

@frankmcsherry
Member Author

frankmcsherry commented Sep 13, 2017

I think it is roughly this (the "container"), but I don't believe that Stream<G, D> would imply a single element per time. At least, even now you can receive multiple Vec<D> for each time.

It's good to note what the stumbling blocks are. I'm not sure what can be done about this one, short of a type of stream that is known to have single elements per time (and .. maybe no data exchange, otherwise hard to ensure?).

I think some things might be getting worse for you in the future, too; there is a good mathematical reason that each "batch" Vec<D> should come with a set of times, rather than a single time. Of course, you are welcome to just use one everywhere, but there are sane reasons to want multiple times, and to want zero times (roughly: the "progress" timely tracks generalizes to multiple times, and some operators require multiple incomparable times and currently do contortions).

Perhaps the right way to think about single-element or single-batch per time streams is as wrappers around Stream, like how String is a wrapper around Vec<u8>?


Alternately, one could certainly write a (thin) layer on top of timely stuffs in which all operators stash their inputs until they are complete, and then deliver one burst of data at each time. E.g. you have a notificator that holds on to Vec<D> data for each T, which is a pretty common pattern (but not one we should insist on always imposing).

This wouldn't be too hard an API to prototype; what isn't immediately clear is what the API is. Perhaps similar to how we have now, but with the implied guarantee that each time T appears only once, and in order?
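
A self-contained sketch of that layer, using ordinary collections and a vastly simplified notion of frontier (nothing here is timely's notificator API):

use std::collections::BTreeMap;

struct Staging<T: Ord, D> {
    stash: BTreeMap<T, Vec<D>>,
}

impl<T: Ord + Copy, D> Staging<T, D> {
    fn new() -> Self {
        Staging { stash: BTreeMap::new() }
    }
    // Buffer incoming batches by timestamp.
    fn receive(&mut self, time: T, mut batch: Vec<D>) {
        self.stash.entry(time).or_insert_with(Vec::new).append(&mut batch);
    }
    // Deliver each completed time (strictly before the frontier) once, in order.
    fn advance(&mut self, frontier: T, mut deliver: impl FnMut(T, Vec<D>)) {
        let ready: Vec<T> = self.stash.range(..frontier).map(|(t, _)| *t).collect();
        for t in ready {
            let data = self.stash.remove(&t).unwrap();
            deliver(t, data);
        }
    }
}

fn main() {
    let mut staging = Staging::new();
    staging.receive(0u64, vec!["a"]);
    staging.receive(1u64, vec!["b"]);
    staging.receive(0u64, vec!["c"]);
    // Frontier at 1: time 0 is complete and arrives as one burst.
    staging.advance(1, |t, data| println!("time {}: {:?}", t, data));
}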

@milibopp
Contributor

Oh, I was just curious about the superficial similarity to my mental model. But it seems I was wrong about that.

I would probably rather file this under getting used to the concept rather than a problem with the API itself. It did make sense to me though, as soon as I had to think about non-trivial data exchange patterns.
