
Arrangement batch formation costs in proportion to outstanding updates #460

Open
frankmcsherry opened this issue Feb 19, 2024 · 3 comments


@frankmcsherry
Member

When an arrangement builder is presented with (far) future updates, they linger in the holding pen and are reconsidered for each batch that is formed. This increases the cost of "extracting" a batch from "proportional to the batch size" to "proportional to all outstanding updates". This is especially noticeable in Materialize with its mz_now() temporal filters, which introduce (far) future retractions.

To see this in the existing codebase, consider the following modification to examples/hello.rs:

--- a/examples/hello.rs
+++ b/examples/hello.rs
@@ -51,7 +51,7 @@ fn main() {
 
         // Load up graph data. Round-robin among workers.
         for _ in 0 .. (edges / peers) + if index < (edges % peers) { 1 } else { 0 } {
-            input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1)
+            input.update_at((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1_000_000, 1)
         }
 
         input.advance_to(1);

This does the bulk loading of data at the start at a future time (1_000_000), where it lingers indefinitely for our purposes.

Unmodified,

cargo run --example hello -- 1000000 10000000 1 foo

produces output that looks like

...
round 5664 finished after 215.5µs
round 5665 finished after 245.666µs
round 5666 finished after 221.375µs
round 5667 finished after 213.542µs
round 5668 finished after 243.666µs
round 5669 finished after 208.583µs
round 5670 finished after 189.417µs
round 5671 finished after 216.375µs
round 5672 finished after 219.292µs
round 5673 finished after 231.959µs
round 5674 finished after 233.958µs
^C

whereas the modified version produces

...
round 18 finished after 85.603625ms
round 19 finished after 98.8575ms
round 20 finished after 96.948833ms
round 21 finished after 100.601833ms
round 22 finished after 74.549875ms
round 23 finished after 94.3985ms
round 24 finished after 75.045083ms
^C

So there is clearly an orders-of-magnitude difference between the two.

The best remedy at the moment seems to be to do the batch organization "by time" rather than "by data". This is unfortunate because batches want the updates organized by data, while the frontier-based extraction would prefer to have them organized by time. However, due to the partial order on times, there's no guarantee that we can do anything quickly; binary search does not apply in the same way it does for totally ordered times.

Nonetheless, it seems like a good idea to build the partially robust operator, which orders by time and, if we want to support partially ordered times, probably maintains runs of less_equal times that do admit binary search (though our updates may form an arbitrarily large number of runs).
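
For concreteness, here is a minimal sketch of that idea, under the assumption that a simplified stand-in for a timely Product timestamp and its partial order is good enough (the real differential dataflow types and operator look different): pending updates are kept as runs of less_equal times, runs are segmented once when updates arrive, and each extraction does one binary search per run.

```rust
// Sketch only: `Time` stands in for a two-level Product timestamp; the
// partial order requires both coordinates to be less-or-equal.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Time { outer: u64, inner: u64 }

impl Time {
    fn less_equal(&self, other: &Time) -> bool {
        self.outer <= other.outer && self.inner <= other.inner
    }
}

/// Outstanding updates, held as runs in which consecutive times are
/// `less_equal` (and hence each run is a chain).
struct Pending<D> { runs: Vec<Vec<(D, Time, i64)>> }

impl<D> Pending<D> {
    fn new() -> Self { Pending { runs: Vec::new() } }

    /// Appends updates, sorting by the total (lexicographic) order and
    /// cutting them into maximal runs of comparable times.
    fn append(&mut self, mut updates: Vec<(D, Time, i64)>) {
        updates.sort_by_key(|u| u.1);
        for update in updates {
            let extends = self
                .runs
                .last()
                .and_then(|run| run.last())
                .map_or(false, |last| last.1.less_equal(&update.1));
            if extends {
                self.runs.last_mut().unwrap().push(update);
            } else {
                self.runs.push(vec![update]);
            }
        }
    }

    /// Extracts all updates not in advance of `frontier`. Within a run the
    /// ready updates form a prefix, so a binary search finds the split; a run
    /// of far-future updates costs one (logarithmic) probe, not a full scan.
    fn extract(&mut self, frontier: &[Time]) -> Vec<(D, Time, i64)> {
        let beyond = |t: &Time| frontier.iter().any(|f| f.less_equal(t));
        let mut ready = Vec::new();
        for run in self.runs.iter_mut() {
            let split = run.partition_point(|u| !beyond(&u.1));
            ready.extend(run.drain(..split));
        }
        self.runs.retain(|run| !run.is_empty());
        ready
    }
}

fn main() {
    let mut pending = Pending::new();
    pending.append(vec![
        ("far future", Time { outer: 1_000_000, inner: 0 }, 1),
        ("now-ish", Time { outer: 1, inner: 0 }, 1),
        ("now-ish too", Time { outer: 2, inner: 0 }, 1),
    ]);
    // Frontier at (3, 0): the two early updates are ready; the far-future one stays put.
    let ready = pending.extract(&[Time { outer: 3, inner: 0 }]);
    assert_eq!(ready.len(), 2);
}
```

With totally ordered times everything collapses into a single run and this degenerates to the familiar sorted-prefix extraction; with partially ordered times nothing bounds the number of runs, which is exactly the caveat above.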

Future work might look into Sorting and Selection in Posets, which aims for bounds related to the "width" of the poset: the size of its largest antichain. There will be no such bound with the above technique.

cc: @antiguru, @petrosagg

@petrosagg
Contributor

> Nonetheless, it seems like a good idea to build the partially robust operator, which orders by time and, if we want to support partially ordered times, probably maintains runs of less_equal times that do admit binary search (though our updates may form an arbitrarily large number of runs).

I implemented this idea as part of MaterializeInc/materialize#25720 and also studied the linked paper out of interest. I noticed an interesting property that I thought I'd share.

In the paper the authors talk about whether a chain decomposition of a poset is optimal, with the lower bound being w chains, where w is the width of the poset: the size of its largest antichain. What I observed is that for the types we are mostly concerned with (arbitrarily nested Product<_, _>s) we can always get the optimal decomposition if we extract chains after having sorted the data with a linear extension of the poset.

As an example, if we take Product<u8, u8> we can see that the widest antichain that can be constructed has width 256 and is the following: {(255, 0), (254, 1), ..., (0, 255)}. Because the linear extension sorts things lexicographically, the maximum number of chains we can ever end up with is 256, since each update is grouped into the chain corresponding to its first coordinate. This seems to generalize to additional levels of nesting.
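
A small standalone check of that observation (hypothetical code, not taken from the linked PR), using plain (u8, u8) tuples in place of Product<u8, u8>: when the times are sorted lexicographically, a greedy chain decomposition can only start a new chain where the first coordinate changes, so it never produces more than 256 chains.

```rust
/// The partial order on (outer, inner) pairs: less_equal in both coordinates.
fn less_equal(a: (u8, u8), b: (u8, u8)) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Greedy chain decomposition of a lexicographically sorted slice: start a
/// new chain whenever the next time does not follow the previous one in the
/// partial order.
fn count_chains(sorted: &[(u8, u8)]) -> usize {
    let mut chains = 0;
    let mut prev: Option<(u8, u8)> = None;
    for &time in sorted {
        if !prev.map_or(false, |p| less_equal(p, time)) {
            chains += 1;
        }
        prev = Some(time);
    }
    chains
}

fn main() {
    // The widest antichain in (u8, u8): {(255, 0), (254, 1), ..., (0, 255)}.
    let mut antichain: Vec<(u8, u8)> = (0..=255u8).map(|i| (255 - i, i)).collect();
    antichain.sort();
    assert_eq!(count_chains(&antichain), 256);

    // Every element of the domain, sorted lexicographically: a new chain can
    // start only where the first coordinate increases, so at most 256 chains.
    let mut dense: Vec<(u8, u8)> = (0..=255u8)
        .flat_map(|a| (0..=255u8).map(move |b| (a, b)))
        .collect();
    dense.sort();
    assert!(count_chains(&dense) <= 256);
}
```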

@frankmcsherry
Member Author

The challenge with using this generally is that you don't end up with the optimal-width chains using this technique for common patterns with nested scopes (vs the Kafka part timestamps I think you are looking at). Specifically, if timestamps have the form (time, iter), where time is "wall-clock" and iter is "iteration", then even with just two iterations you can end up segmenting the sorted times into arbitrarily many runs of just two elements:

[(time+0, 0), (time+1, 1), (time+2, 0), (time+3, 1), (time+4, 0), (time+5, 1), ...]

These are already sorted, but the runs are very short and very numerous.

@frankmcsherry
Member Author

But you are totally right: if you have a reason to think one of the domains is small, great, order by that first. For example, if we had a cunning way to know that we were looking at (time, iter) pairs, we could sort by the second coordinate first, because this ordering doesn't have any requirements other than comporting with the partial order.
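
A hypothetical illustration of both points, again with plain tuples standing in for the Product timestamps and the same greedy chain count as in the sketch above (repeated so this compiles on its own): with two iterations, sorting (time, iter) pairs lexicographically leaves one chain per pair of updates, while sorting by the small coordinate first collapses everything into two chains.

```rust
fn less_equal(a: (u64, u64), b: (u64, u64)) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Greedy chain decomposition of an already-sorted slice.
fn count_chains(sorted: &[(u64, u64)]) -> usize {
    let mut chains = 0;
    let mut prev: Option<(u64, u64)> = None;
    for &time in sorted {
        if !prev.map_or(false, |p| less_equal(p, time)) {
            chains += 1;
        }
        prev = Some(time);
    }
    chains
}

fn main() {
    // [(time+0, 0), (time+1, 1), (time+2, 0), (time+3, 1), ...]
    let times: Vec<(u64, u64)> = (0..1000u64).map(|i| (i, i % 2)).collect();

    // Lexicographic by (time, iter): runs of length two, 500 chains.
    let mut by_time_iter = times.clone();
    by_time_iter.sort();
    assert_eq!(count_chains(&by_time_iter), 500);

    // Sorted by the small coordinate first, (iter, time): just two chains.
    // This order still comports with the partial order on (time, iter).
    let mut by_iter_time = times.clone();
    by_iter_time.sort_by_key(|&(time, iter)| (iter, time));
    assert_eq!(count_chains(&by_iter_time), 2);
}
```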
