dataflow: Buffer full replays in unions by tag
If multiple diamond unions exist in a chain, each of those unions will
receive a number of replays equal to the number of replay paths (the
product of the number of path splits). Previously, each union would
buffer replays until it had received a number of replays equal to its
number of parents, but in the case of successive diamonds that would
break if the replays were received out of order. For example, a union of
two nodes could receive two replays from its left parent and release
those as a complete replay before ever receiving a replay from its right
parent. f8c3582 (Added replay path sorting to ensure we traverse them
correctly., 2021-10-26) attempted to work around this by sorting the
replay paths so that earlier unions would receive replays from all
parents (and then successive replays would just overwrite the downstream
state with the same set of records). That happened to work in the case
that was tested, where all the nodes were in the same domain and replays
all happened synchronously (we fully processed each replay through the
nodes before starting another), but it doesn't work when replays are
processed concurrently, for the same reason as before.
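
As a rough illustration of the failure mode described above, here is a
minimal sketch of a union that only counts replays. `CountingUnion`,
`on_replay`, and `premature_release_demo` are hypothetical names made up
for this example, and records are simplified to `u32`; this is not the
actual union code.

```rust
/// Hypothetical stand-in for a union that counts buffered replays without
/// regard to which parent or replay path they came from.
struct CountingUnion {
    num_parents: usize,
    buffered: Vec<Vec<u32>>,
}

impl CountingUnion {
    fn new(num_parents: usize) -> Self {
        Self {
            num_parents,
            buffered: Vec::new(),
        }
    }

    /// Buffers one replay. Once `num_parents` replays have been buffered,
    /// releases their concatenation downstream and clears the buffer.
    fn on_replay(&mut self, records: Vec<u32>) -> Option<Vec<u32>> {
        self.buffered.push(records);
        if self.buffered.len() == self.num_parents {
            Some(self.buffered.drain(..).flatten().collect())
        } else {
            None
        }
    }
}

/// Two replays arrive from the left parent before anything arrives from the
/// right parent, and the union releases them as if they were complete.
fn premature_release_demo() -> Option<Vec<u32>> {
    let mut union = CountingUnion::new(2);
    // First replay from the left parent: buffered, nothing released yet.
    let first = union.on_replay(vec![1, 2]);
    debug_assert!(first.is_none());
    // Second replay, also from the left parent: the count now matches the
    // number of parents, so the union releases without the right parent.
    union.on_replay(vec![1, 2])
}
```

In this sketch the released "replay" contains the left parent's records
twice and none of the right parent's, which is the kind of incomplete
state the out-of-order case produces.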

Instead of that approach, this commit fixes the issue more robustly
by buffering full replays within unions on a *per tag* basis, rather
than overall. This works because the "replay grouping" code in the
materialization planner already groups replay paths with identical
suffixes under the same tag, so we know that if we've received a number
of replays equal to the number of parents with the same tag, then we've
received a complete picture of our upstream state and can release the
replay downstream. A comment in the code also explains, in different
terms, how all of this currently works.
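
The per-tag buffering described above can be sketched roughly as
follows. This is a simplified model, not the code in this commit:
`TaggedUnion`, `Tag` (a plain `u32` here), `on_replay`, and
`interleaved_demo` are hypothetical names, records are simplified to
`u32`, and the sketch assumes that replays belonging to different replay
path groups arrive with different tags.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a replay path tag.
type Tag = u32;

/// Hypothetical union that buffers full replays separately for each tag.
struct TaggedUnion {
    num_parents: usize,
    /// Replays buffered so far, keyed by the tag they arrived with.
    buffered: HashMap<Tag, Vec<Vec<u32>>>,
}

impl TaggedUnion {
    fn new(num_parents: usize) -> Self {
        Self {
            num_parents,
            buffered: HashMap::new(),
        }
    }

    /// Buffers one replay under `tag`. Releases the concatenated records
    /// only once `num_parents` replays with that same tag have arrived.
    fn on_replay(&mut self, tag: Tag, records: Vec<u32>) -> Option<Vec<u32>> {
        let pieces = self.buffered.entry(tag).or_default();
        pieces.push(records);
        if pieces.len() < self.num_parents {
            return None;
        }
        // Every parent has reported for this tag: release and forget it.
        self.buffered
            .remove(&tag)
            .map(|pieces| pieces.into_iter().flatten().collect())
    }
}

/// Replays for two tags arrive interleaved; each tag is released only once
/// both of its replays have been received.
fn interleaved_demo() -> Vec<Option<Vec<u32>>> {
    let mut union = TaggedUnion::new(2);
    vec![
        union.on_replay(1, vec![1, 2]), // tag 1, left parent
        union.on_replay(2, vec![1, 2]), // tag 2, left parent
        union.on_replay(1, vec![3]),    // tag 1, right parent: tag 1 complete
        union.on_replay(2, vec![3]),    // tag 2, right parent: tag 2 complete
    ]
}
```

Keying the buffer by tag means two replays from the same parent no
longer add up to a "complete" replay, since in this model they sit under
different tags.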

Since this is a better fix for the same issue, this also reverts the
changes in f8c3582 (Added replay path sorting to ensure we traverse
them correctly., 2021-10-26).

This doesn't fix any test cases as of this commit (getting a failing
test here for the nodes-in-different-domains case is somewhat annoying
given node assignment heuristics) but paves the way for making full
replays stream asynchronously, which was another way to hit this issue.

Along the way, this also adds some new trace logging which I used to
debug this issue.

Fixes: REA-2989
Change-Id: Iea0adb55d7f7d7139b51319c04ad88c464297e35
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5391
Tested-by: Buildkite CI
Reviewed-by: Dan Wilbanks <dan@readyset.io>
glittershark committed Jul 14, 2023
1 parent cf40990 commit a2d92e5
Showing 5 changed files with 295 additions and 129 deletions.
6 changes: 6 additions & 0 deletions readyset-dataflow/src/domain/mod.rs
@@ -2925,6 +2925,12 @@ impl Domain {
     // We would have bailed in a previous iteration (break 'outer, below) if
     // it wasn't Some
     if let Packet::ReplayPiece { ref mut tag, .. } = m.as_deref_mut().unwrap() {
+        trace!(
+            %force_tag,
+            original_tag = %tag,
+            node = %segment.node,
+            "Forcing tag",
+        );
         *tag = force_tag;
     }
 }
5 changes: 3 additions & 2 deletions readyset-dataflow/src/node/process.rs
@@ -287,10 +287,11 @@ impl Node {
 Packet::ReplayPiece {
     ref mut data,
     context: payload::ReplayPieceContext::Full { last },
+    tag,
     ..
 } => {
-    trace!(?data, last, "received full replay");
-    (data, ReplayContext::Full { last })
+    trace!(?data, %tag, last, "received full replay");
+    (data, ReplayContext::Full { last, tag })
 }
 Packet::Message { ref mut data, .. } => {
     trace!(?data, "received regular message");
