Notifications out of order #108

Closed · gandro opened this issue Nov 22, 2017 · 18 comments

@gandro
Contributor

gandro commented Nov 22, 2017

I was under the assumption that the order in which notifications are delivered to an operator follows the partial order defined on timestamps. At least this seemed to be the case in past versions of timely, since our sessionization code relies on this.

However, the following code (which sometimes requests notifications for future times while processing a notification, as sessionization does) observes notifications in a weird order:

last: (Root, 0), curr: (Root, 0), frontier: [(Root, 4)]
last: (Root, 0), curr: (Root, 1), frontier: [(Root, 4)]
last: (Root, 1), curr: (Root, 2), frontier: [(Root, 4)]
last: (Root, 2), curr: (Root, 3), frontier: [(Root, 4)]
last: (Root, 3), curr: (Root, 2), frontier: [(Root, 4)]
thread 'worker thread 0' panicked at 'assertion failed: last_notification.less_equal(curr.time())', src/bin/bug.rs:22:24

Source code:

extern crate timely; // 0.3.0

use timely::PartialOrder;
use timely::dataflow::operators::{Input, Unary, Probe};
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;

fn main() {
    timely::execute_from_args(std::env::args(), move |computation| {
        let (mut input, probe) = computation.dataflow(move |scope| {
            let (input, stream) = scope.new_input::<()>();

            let mut last_notification = RootTimestamp::new(0);
            let probe = stream.unary_notify::<(), _, _>(Pipeline, "foo", Vec::new(),
                move |input, _, notificator| {
                    // Request a notification for each time we receive data at.
                    input.for_each(|time, _| {
                        notificator.notify_at(time);
                    });
                    // Expect notifications to arrive in non-decreasing order.
                    notificator.for_each(|curr, _, notif| {
                        println!("last: {:?}, curr: {:?}, frontier: {:?}",
                            last_notification, curr.time(), notif.frontier(0));
                        assert!(last_notification.less_equal(curr.time()));
                        last_notification = curr.time().clone();
                        // While processing time 0, re-request a notification
                        // for the future time (Root, 2).
                        if *curr == RootTimestamp::new(0) {
                            notif.notify_at(curr.delayed(&RootTimestamp::new(2)));
                        }
                    });
                }).probe();
            (input, probe)
        });

        for epoch in 0..5 {
            input.advance_to(epoch);
            input.send(());
        }

        computation.step_while(|| probe.less_than(input.time()))
    }).unwrap();
}
@utaal
Member

utaal commented Nov 22, 2017

Heyo.

I believe this is what's happening:

  • once you've read all the input, the input frontier is now (Root, 4);
  • when you call notificator.for_each(...), notificator looks at all the pending notifications and figures out which ones are now below the frontier (after sorting them) -- in this case these are: (Root, 0), (Root, 1), (Root, 2), (Root, 3), as requested by the input.for_each closure;
  • these are delivered by notificator.for_each in order -- in the closure, you ask for another notification at (Root, 2) but this will only be made available on the next invocation of the operator (when notificator.for_each is called again).
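
For concreteness, here is a minimal standalone sketch of that batching behavior (plain u64s standing in for timely's capabilities; this is not the notificator's actual code), which reproduces the delivery order from the panic above:

fn main() {
    // The input frontier has advanced to (Root, 4); every pending time
    // below it may be delivered.
    let mut pending: Vec<u64> = vec![0, 1, 2, 3]; // requested by input.for_each

    // Batch 1: sort and deliver the currently pending times, in order.
    pending.sort();
    let batch: Vec<u64> = pending.drain(..).collect();
    for t in &batch {
        println!("delivering {}", t);
        if *t == 0 {
            // Like notif.notify_at(curr.delayed(&RootTimestamp::new(2))):
            // recorded, but only examined on the next operator invocation.
            pending.push(2);
        }
    }

    // Batch 2 (next invocation): the re-requested time surfaces only now,
    // after (Root, 3) has already been delivered -- hence the panic.
    for t in pending.drain(..) {
        println!("delivering {} (out of order)", t);
    }
}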

@frankmcsherry
Member

frankmcsherry commented Nov 22, 2017

I've just checked out the code, and it is indeed currently possible for this to happen when user code re-issues notification requests for times that are less than the current frontier. I'll explain the trade-offs, and maybe we can talk through what answer is best for you; the good news is that it is all user-level code now, so if you need a certain tweak, it is much easier to do.

  1. I think this change came about when the infrastructure was switched over to "high throughput" timestamp processing. Previously, several timestamp operations were taking linear time, so that processing a large batch of timestamps would take quadratic time. Ensuring in-order delivery in the presence of re-requests was one of those tasks.

  2. If the timestamps were totally ordered, the right data structure to use here would be a heap: you keep popping the minimum off, and whenever a new request arrives you push it on. The timestamps aren't totally ordered, generally, but they do implement Ord for the purposes of being able to sort and such; we could use the Ord implementation as the delivery order, use a heap underneath, and get you back the properties that you want / need.

This does come at a bit of a cost, in that heaps are more expensive than sorting (which we are already doing) but it seems reasonable that if the Notificator is providing value (vs just looking at the frontier and managing your own capabilities) it should be doing something good. It may mean that the API for FrontierNotificator has to change a bit, as you should not be able to add a notification without specifying the current frontier (this is what indicates whether the notification is ready to go yet), but the main consumer of that is Notificator, so probably not much breakage.

I'll whip up something today and report back.

@frankmcsherry
Member

frankmcsherry commented Nov 22, 2017

So, some very minor complexities. The frontier_notificator currently has a drain method, which produces an iterator draining the available notifications with respect to the current frontier. Its return type is currently vec_deque::Drain.

If we switch over to a binary heap, Rust only has a max-heap, which means we will need to wrap each element in order to flip the comparison. It also becomes a fair bit harder to return a type from drain: while binary_heap::Drain exists, we want to unwrap the order-flipped elements, and we can't use map because we can't (easily) name the mapping function's type in the return type.

I'm poking at the code, and this might work out with a static function passed to map (whose type we can name), or we could write a custom UnwrapOrderFlipper iterator so as to be able to name its type in the return.
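
For reference, a hypothetical sketch of the fn-pointer route (not the actual branch code): a plain fn has a nameable pointer type, so drain can return a std::iter::Map over binary_heap::Drain.

use std::cmp::Reverse;
use std::collections::{binary_heap, BinaryHeap};
use std::iter::Map;

// Unwrap the order-flipping wrapper; as a fn pointer its type is nameable.
fn unwrap_reverse<T>(r: Reverse<T>) -> T { r.0 }

// Wrapping elements in Reverse turns Rust's max-heap into a min-heap.
struct MinHeap<T: Ord> { heap: BinaryHeap<Reverse<T>> }

impl<T: Ord> MinHeap<T> {
    fn push(&mut self, t: T) { self.heap.push(Reverse(t)); }

    // Caveat: binary_heap::Drain yields elements in arbitrary order; an
    // ordered drain would still need to pop repeatedly.
    fn drain<'a>(&'a mut self) -> Map<binary_heap::Drain<'a, Reverse<T>>, fn(Reverse<T>) -> T> {
        self.heap.drain().map(unwrap_reverse::<T> as fn(Reverse<T>) -> T)
    }
}

fn main() {
    let mut heap = MinHeap { heap: BinaryHeap::new() };
    for t in [3u64, 1, 2].iter() { heap.push(*t); }
    for t in heap.drain() { println!("{}", t); } // arbitrary order
}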

Otherwise the changes seem pretty easy, really. Just Rust getting in the way. ;)

@utaal
Member

utaal commented Nov 22, 2017

Something I noticed here, which I'm not clear on (and I don't think it's covered by this example): what do you think should be the semantics of requesting a notification at time t_n < t_i, while processing the notification at t_i?

Should the delivery be monotonic still, jumping back for the just-requested notification?

Example:

let index = worker.index();
let mut input = InputHandle::new();

let _probe = worker.dataflow(|scope| {
    let mut cap = None;
    scope.input_from(&mut input)
         .exchange(|x| *x)
         .unary_notify::<(), _, _>(Pipeline, "foo", Vec::new(), 
             move |input, _, notificator| {
                 input.for_each(|time, _| {
                     if cap.is_none() && *time == RootTimestamp::new(0) {
                         cap = Some(time.clone()); // keep a capability for (Root, 0)
                     }
                     notificator.notify_at(time);
                 });
                 notificator.for_each(|curr, _, notif| {
                     println!("curr: {:?}", curr.time());
                     if *curr == RootTimestamp::new(2) {
                         // Use that capability to ask for a notification at a
                         // timestamp lower than the one being delivered.
                         notif.notify_at(cap.take().unwrap());
                     }
                 });
             })
         .probe()
});

for round in 0..5 {
    if index == 0 {
        input.send(round);
    }
    input.advance_to(round + 1);
}

Should this be (as it is currently):

curr: (Root, 0)
curr: (Root, 1)
curr: (Root, 2)
curr: (Root, 3)
curr: (Root, 0)

or,

curr: (Root, 0)
curr: (Root, 1)
curr: (Root, 2)
curr: (Root, 0) <--
curr: (Root, 3)

@frankmcsherry
Member

frankmcsherry commented Nov 22, 2017

@utaal The thing I was just about to do would hop backward in time for you, but it does raise a good point: we can't actually guarantee that you see a non-decreasing sequence of notifications because you could interactively issue a decreasing sequence of notifications that will clear.

So it seems the question is mostly about "should notificator tell you about notifications relative to the input frontier, or should it tell you about notifications relative to some internal capability frontier?". E.g. perhaps the example output above should have been

curr: (Root, 0)

because we should deliver nothing beyond the time zero capability as long as you are still holding on to a time zero capability.

This is possibly hard to do, because the capabilities still exist (in the notificator), but we need to know about the existence of extra-notificator capabilities (e.g. the difference between the operator holding a 0 capability, and one sitting in the notificator ready to be delivered). Your example above would deadlock, I think, which is something we'd probably like to avoid.

It's a good question and worth thinking about. I believe we can address Sebastian's issue with a "strict improvement" (perhaps breaking drain; no one seems to use the method), so I'll do that and we can ponder whether an even more feature-rich notificator would be helpful.

@utaal
Member

utaal commented Nov 22, 2017

I believe we can address Sebastian's issue with a "strict improvement" (perhaps breaking drain; no one seems to use the method), so I'll do that and we can ponder whether an even more feature-rich notificator would be helpful.

Absolutely. I brought this up because, while thinking about the issue @gandro reported, I realised I didn't know exactly what to expect in this other case.

I guess we can also just say that the monotonic ordering is not guaranteed if you request a notification for a t ≤ the timestamp of the notification being processed. It feels like it's something that happens very rarely, if at all.

Not sure this is that rare, actually. Sessionisation could totally use this, for example (emit the transaction tree at the timestamp it began, but only once we've waited for all leaves, or a timeout).

@gandro
Contributor Author

gandro commented Nov 22, 2017

I had indeed not considered the fact that you can force a jump backwards in time by holding on to a capability. I would be absolutely fine with an alternative user-space way of getting monotonic notifications, even one I'd be required to write myself, but I do think it's a very useful thing to have for many stateful operators.

@frankmcsherry
Member

frankmcsherry commented Nov 22, 2017

I'm writing a bit of the code, and it seems I have underestimated the challenge in doing this correctly. :)

While we can use a heap for the available capabilities that are ready to clear, the pending list of capabilities not yet known to be ready is a bit more complicated. We want to let you put arbitrary notifications into this list (i.e. issue notification requests that aren't yet ready to land), but if we want to clear them in order, I am not sure how to avoid a full rescan on each call to next(). Right now we only rescan pending when available is empty, which wouldn't provide the in-order property you would like. We can't use a heap for pending because the "could clear" property is not necessarily monotonic with the timestamp order (it would be for total orders, which I suspect is a common case).

At the moment I see a few options:

  1. Things as they currently are (no in-order guarantee for re-issued notifications).
  2. Guarantee in-order, but next and for_each incur linear work.
  3. Special case things for totally ordered timestamps.

I think 2 + optional 3 sounds pretty good, and is a nice motivation to use for_each to boot: it can do the linear work once per call, because it knows that the current frontier stays fixed, whereas next would need to do it with each call (as the frontier is an argument).

I'm not aware of the downside of 2 + 3, in that I don't know what the workload is that wants to avoid the work linear in pending.len(). Perhaps if you have a very large available that you want to interactively drain with next, interleaving with re-notifications, in a way that for_each wouldn't support (perhaps: "drain 1,000 elements then stop").

Edit: Alternate option 4: provide the in-order property for for_each, but make no such promises for sequential calls to next. Thoughts?
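
To illustrate option 2 (with option 4's for_each-only guarantee), a sketch with u64 timestamps (hypothetical code, not the eventual implementation): one linear pass over pending per call, justified because the frontier is fixed for the call's duration, then in-order delivery from a min-heap that also absorbs re-requests that are already ready.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn for_each_ordered<F>(pending: &mut Vec<u64>, frontier: u64, mut logic: F)
where
    F: FnMut(u64, &mut Vec<u64>),
{
    // One linear scan, paid once per call: move ready times into a min-heap.
    let mut ready = BinaryHeap::new();
    pending.retain(|&t| {
        if t < frontier { ready.push(Reverse(t)); false } else { true }
    });

    // Deliver in order; logic may push new requests into its second argument.
    let mut requests = Vec::new();
    while let Some(Reverse(t)) = ready.pop() {
        logic(t, &mut requests);
        for r in requests.drain(..) {
            if r < frontier { ready.push(Reverse(r)); } else { pending.push(r); }
        }
    }
}

fn main() {
    let mut pending = vec![0, 1, 2, 3];
    for_each_ordered(&mut pending, 4, |t, requests| {
        println!("delivering {}", t);
        if t == 0 { requests.push(2); } // re-request lands in order: 0, 1, 2, 2, 3
    });
}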

@utaal
Member

utaal commented Nov 22, 2017

From some offline conversation:

@frankmcsherry:

[In FrontierNotificator] each time we call make_available, we are separating the current capabilities into two groups, available and pending, with the property that nothing in pending is less_equal anything in available. We didn't intentionally do that, but it is a consequence of using a frontier to split things. That means that as long as we stick with frontiers as the boundary, we shouldn't have out-of-order delivery problems (that is, if new notification requests are either inserted into the available priority queue [replacing available with a binary heap] or the pending unsorted list).

However, each time we change frontiers, because someone has issued a new call [to notificator.next(frontiers)] with perhaps a new frontiers argument, this invariant has the potential to break. Now, if we re-enqueue [requested notifications] with this new frontiers we might deliver a re-enqueued something before we get around to re-processing pending. This doesn't seem to be a problem for for_each, which eats a make_available call with each for_each, but it would be a problem for next, which might need to better understand re-enqueueing.

As long as we have the clear separation of available and pending using frontiers, there is no way anything in pending can come before anything in available.

And actually, I'm looking at Notificator and perhaps it can have in-order guarantees for next because it has a locked-down frontiers for its existence. It would just need to call make_available on construction, and then it could behave similarly [as long as available is a priority queue/binary heap].


@frankmcsherry:

FrontierNotificator doesn't own a frontier, so each call to next comes with a frontier that could be totally different from the last call. Not much it can do without more of a "session" structure.

Actually, one way to think of Notificator is as a FrontierNotificator session. I had previously thought of it as "FrontierNotificator that owns its frontiers", but really it borrows them and doesn't change them over the course of its existence.

@utaal:

We can add a FrontierNotificator.monotonic(&frontiers) function that locks down frontiers and produces a Notificator (convenience method that calls Notificator::new(frontiers, &mut self)).
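
To restate the quoted invariant concretely, a small sketch with u64 timestamps (a totally ordered stand-in; real frontiers are antichains of partially ordered times):

fn make_available(pending: &mut Vec<u64>, available: &mut Vec<u64>, frontier: u64) {
    let mut i = 0;
    while i < pending.len() {
        if pending[i] < frontier {
            available.push(pending.swap_remove(i)); // ready: below the frontier
        } else {
            i += 1; // not ready: at or beyond the frontier
        }
    }
    // Everything left in pending is >= the frontier and everything in
    // available is < it, so no pending time can precede an available one.
    debug_assert!(pending.iter().all(|p| available.iter().all(|a| !(p <= a))));
}

fn main() {
    let (mut pending, mut available) = (vec![0, 1, 5, 2, 7], Vec::new());
    make_available(&mut pending, &mut available, 4);
    println!("available: {:?}, pending: {:?}", available, pending);
}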

@frankmcsherry
Member

frankmcsherry commented Nov 22, 2017

I've pushed a branch that intends (and appears) to deliver the least available notification from Notificator in each call to next, and in order for for_each. This does mean that if you request a new notification earlier than an already delivered notification, the sequence may decrease, but otherwise the sequence should be non-decreasing.

https://github.com/frankmcsherry/timely-dataflow/tree/monotonic_notification

A thing worth checking out is the modified test, which attempts to exercise the out-of-order behavior that was previously problematic.

https://github.com/frankmcsherry/timely-dataflow/blob/monotonic_notification/src/dataflow/operators/generic/notificator.rs#L98-L165

I think most of this comes at near-zero incremental cost. The observation, trickling out of the conversation @utaal posted above, is that as a Notificator corresponds to a locked-down frontiers, we can process pending notifications once at creation time (which we do), and then any subsequent notify_at requests are either (i) deferred if in advance of frontiers, or (ii) integrated into the priority queue of available notifications. Also, there is now a priority queue for available notifications, rather than sorting the list once at construction.
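
Condensed into a sketch (u64 timestamps standing in for capabilities; see the branch for the real code), the scheme looks roughly like this:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

struct MonotonicNotificator {
    frontier: u64,                       // locked down for this "session"
    available: BinaryHeap<Reverse<u64>>, // min-heap of deliverable times
    pending: Vec<u64>,                   // requests at or beyond the frontier
}

impl MonotonicNotificator {
    // Pending requests are split against the frontier once, at construction.
    fn new(frontier: u64, requests: Vec<u64>) -> Self {
        let mut this = MonotonicNotificator {
            frontier,
            available: BinaryHeap::new(),
            pending: Vec::new(),
        };
        for t in requests { this.notify_at(t); }
        this
    }
    // Later requests are either deferred or integrated into the heap.
    fn notify_at(&mut self, time: u64) {
        if time < self.frontier {
            self.available.push(Reverse(time)); // deliverable: into the heap
        } else {
            self.pending.push(time);            // deferred past this session
        }
    }
    fn next(&mut self) -> Option<u64> {
        self.available.pop().map(|Reverse(t)| t)
    }
}

fn main() {
    // Frontier (Root, 4): requests at 0 through 3 are deliverable.
    let mut n = MonotonicNotificator::new(4, vec![0, 1, 2, 3]);
    while let Some(t) = n.next() {
        println!("delivering {}", t);
        // The re-request from the original example is now delivered in
        // order: 0, 1, 2, 2, 3 rather than 0, 1, 2, 3, 2.
        if t == 0 { n.notify_at(2); }
    }
}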

There are still several pending questions about notificators that could be shaken out. Here are a few:

  1. Should FrontierNotificator be public? It seems to have few guarantees, and is potentially easy to mis-use. Really, it seems to be NotificatorStorage more than anything, with some convenience methods for moving data around.

  2. Should Notificator make stronger guarantees about deduplication? Right now some deduplication is done, but I am not certain what we actually intend.

  3. Related: why do we still return the multiplicity of capabilities? I think this used to exist pre-capabilities, where it might have been important to know the amount by which you should decrement internal, because you, the operator implementor, were allowed to do crazy things like that. We should probably drop this, as it is confusing and always 1, and instead be clearer about whether we deduplicate capabilities for you, or explicitly do not (also defensible).

@gandro
Contributor Author

gandro commented Nov 22, 2017

This looks great! I'll give it a shot and try to move the sessionization code to the new notificator API.

@frankmcsherry
Member

frankmcsherry commented Nov 22, 2017

This looks great! I'll give it a shot and try to move the sessionization code to the new notificator API.

Ideally the API should be the same. If it seems like it is different, let me know! The intent was that Notificator stays the same, and folks that create notificators for you (e.g. unary_notify) automatically get the monotonic guarantees (now part of constructing a Notificator).

@gandro
Contributor Author

gandro commented Nov 22, 2017

Maybe I'm missing something, but the current version on the branch does not create the monotonic Notificator for unary_notify, does it?

@frankmcsherry
Member

frankmcsherry commented Nov 22, 2017

All instances of Notificator are now monotonic (or, they should be). Instances of FrontierNotificator may not be, depending on how you use them.

@gandro
Contributor Author

gandro commented Nov 22, 2017 via email

@frankmcsherry
Member

I've started up a PR (#109) to merge this, but I'd love to get @gandro's feedback on whether it sorts out the observed problems first, just to be sure.

@gandro
Contributor Author

gandro commented Nov 23, 2017

Sorry for the delay, but yes, it seems to do the trick. The old sessionization code works again without modifications.

Thanks!

@frankmcsherry
Member

Sweet; PR #109 has been merged, so I'll close down the issue!
