Skip to content

Commit

Permalink
dataflow: Track BackUnionState as part of FullWait
Browse files Browse the repository at this point in the history
To implement the bag-union semantics of an OR between filters, unions
internally track some state via a field on the union itself, and use
that state when processing records to optionally omit some extra rows.
However, when buffering full replays, we might receive different chunks
of state as part of entirely separate replays in different orders from
our parents - for example, we might receive a replay with tag 1 from our
left parent, then a replay with tag 2 from our right parent, then the
other half of the replay with tag 1 from our right parent. Previously,
since we were always using the BagUnionState on the Union itself, this
could cause us to emit the wrong results due to that BagUnionState
mistakenly thinking the first two chunks of state were part of the same
replay.

To fix that, this commit adds an `Option<BagUnionState>` field to
FulWait, and uses that when processing records (which is now broken out
from `on_input` into its own function) during buffering of full
replays *by tag*, to avoid getting BagUnionState mixed up between
replays of different tags (see 60f8a85c7 (dataflow: Buffer full replays
in unions by tag, 2023-07-14) for more info here). When we then
eventually release the full replay, the buffered records have become
"canonical" - so we also make the BagUnionState itself canonical, by
setting it as `self.bag_union_state` on the Union.

This fixes 10 previously-failing generated logictests

Change-Id: I0c8429eb991f56016d851f052f54cee5dc68b914
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5471
Tested-by: Buildkite CI
Reviewed-by: Dan Wilbanks <dan@readyset.io>
  • Loading branch information
glittershark committed Jul 18, 2023
1 parent ed2b33a commit f7ffbe6
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 47 deletions.
184 changes: 137 additions & 47 deletions readyset-dataflow/src/ops/union.rs
Expand Up @@ -65,6 +65,9 @@ struct FullWait {
finished: usize,
/// The records we've received and buffered as part of this full replay
buffered: Records,
/// State for implementing [`DuplicateMode::BagUnion`]. If this is None, then the duplicate
/// mode is [`DuplicateMode::UnionAll`]
bag_union_state: Option<BagUnionState>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -140,6 +143,57 @@ impl BagUnionState {
}
}

fn process_records(
from: LocalNodeIndex,
rs: Records,
emit: &Emit,
bag_union_state: Option<&mut BagUnionState>,
) -> ReadySetResult<Records> {
let mut results = match emit {
Emit::AllFrom(..) => rs,
Emit::Project { emit_l, .. } => {
rs.into_iter()
.map(move |rec| {
let (r, pos) = rec.extract();

// yield selected columns for this source
// TODO: if emitting all in same order then avoid clone
let res = emit_l[&from].iter().map(|&col| r[col].clone()).collect();

// return new row with appropriate sign
if pos {
Record::Positive(res)
} else {
Record::Negative(res)
}
})
.collect()
}
};

if let Some(bus) = bag_union_state {
if let Some(parent) = match emit {
Emit::Project { cols_l, .. } => {
let first_parent = cols_l.keys().next().ok_or_else(|| {
internal_err!(
"Union node with DuplicateMode::BagUnion must have exactly 2 parents",
)
})?;
if from == *first_parent {
Some(Side::Left)
} else {
Some(Side::Right)
}
}
_ => None,
} {
results.retain(|rec| bus.process(parent, rec));
}
}

Ok(results)
}

/// Key type for [`Union::replay_pieces`]. See the documentation on that field for more information
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct BufferedReplayKey {
Expand Down Expand Up @@ -437,47 +491,7 @@ impl Ingredient for Union {
_: &StateMap,
_: &mut AuxiliaryNodeStateMap,
) -> ReadySetResult<ProcessingResult> {
let mut results = match self.emit {
Emit::AllFrom(..) => rs,
Emit::Project { ref emit_l, .. } => {
rs.into_iter()
.map(move |rec| {
let (r, pos) = rec.extract();

// yield selected columns for this source
// TODO: if emitting all in same order then avoid clone
let res = emit_l[&from].iter().map(|&col| r[col].clone()).collect();

// return new row with appropriate sign
if pos {
Record::Positive(res)
} else {
Record::Negative(res)
}
})
.collect()
}
};

if let Some(bus) = &mut self.bag_union_state {
if let Some(parent) = match self.emit {
Emit::Project { ref cols_l, .. } => {
let first_parent = cols_l.keys().next().ok_or_else(|| {
internal_err!(
"Union node with DuplicateMode::BagUnion must have exactly 2 parents",
)
})?;
if from == *first_parent {
Some(Side::Left)
} else {
Some(Side::Right)
}
}
_ => None,
} {
results.retain(|rec| bus.process(parent, rec));
}
}
let results = process_records(from, rs, &self.emit, self.bag_union_state.as_mut())?;

Ok(ProcessingResult {
results,
Expand Down Expand Up @@ -715,24 +729,32 @@ impl Ingredient for Union {
// arm). feel free to go check. interestingly enough, it's also fine for us to
// still emit 2 (i.e., not capture it), since it'll just be dropped by the target
// domain.
let mut rs = self.on_input(from, rs, &replay, n, s, ans)?.results;
match self.full_wait_state.entry(tag) {
hash_map::Entry::Vacant(e) => {
if self.required == 1 {
// no need to ever buffer
return Ok(RawProcessingResult::FullReplay(rs, last));
return Ok(RawProcessingResult::FullReplay(
process_records(from, rs, &self.emit, None)?,
last,
));
}

debug!(
"union captured start of full replay; has: {}, need: {}",
1, self.required
);

let mut bag_union_state =
self.bag_union_state.is_some().then(BagUnionState::default);
let buffered =
process_records(from, rs, &self.emit, bag_union_state.as_mut())?;

// we need to hold this back until we've received one from every ancestor
e.insert(FullWait {
started: HashSet::from([from]),
finished: usize::from(last),
buffered: rs,
buffered,
bag_union_state,
});
Ok(RawProcessingResult::CapturedFull)
}
Expand All @@ -741,26 +763,31 @@ impl Ingredient for Union {
started,
finished,
buffered,
bag_union_state,
} = wait.get_mut();
if last {
*finished += 1;
}

let rs = process_records(from, rs, &self.emit, bag_union_state.as_mut())?;

if *finished == self.required {
// we can just send everything and we're done!
// make sure to include what's in *this* replay.
buffered.append(&mut *rs);
buffered.extend(rs);
debug!("union releasing end of full replay");
let res =
RawProcessingResult::FullReplay(buffered.split_off(0).into(), true);
self.bag_union_state = bag_union_state.take();
wait.remove();
Ok(res)
} else {
if started.len() != self.required {
if started.insert(from) && started.len() == self.required {
// we can release all buffered replays!
debug!("union releasing full replay");
buffered.append(&mut *rs);
self.bag_union_state = bag_union_state.take();
buffered.extend(rs);
return Ok(RawProcessingResult::FullReplay(
buffered.split_off(0).into(),
false,
Expand All @@ -781,7 +808,7 @@ impl Ingredient for Union {

// if we fell through here, it means we're still missing the first
// replay from at least one ancestor, so we need to buffer
buffered.append(&mut *rs);
buffered.extend(rs);
Ok(RawProcessingResult::CapturedFull)
}
}
Expand Down Expand Up @@ -1525,6 +1552,69 @@ mod tests {
_ => panic!(),
}
}

#[test]
fn captured_full_replay_bag_union() {
let (mut g, left, right) = setup(DuplicateMode::BagUnion);
let t0 = Tag::new(0);
let t1 = Tag::new(1);

g.input_raw(
left,
vec![
vec![DfValue::from(1), DfValue::from("a")],
vec![DfValue::from(2), DfValue::from("a")],
],
ReplayContext::Full {
last: true,
tag: t0,
},
false,
);
g.input_raw(
right,
vec![vec![DfValue::from(1), DfValue::None, DfValue::from("a")]],
ReplayContext::Full {
last: true,
tag: t1,
},
false,
);
let res = g.input_raw(
right,
vec![vec![DfValue::from(1), DfValue::None, DfValue::from("a")]],
ReplayContext::Full {
last: true,
tag: t0,
},
false,
);

match res {
RawProcessingResult::FullReplay(records, last) => {
assert!(last);
assert_eq!(
*records,
vec![
vec![DfValue::from(1), DfValue::from("a")].into(),
vec![DfValue::from(2), DfValue::from("a")].into()
]
);
}
_ => panic!(),
}

// A subsequent regular update (eg a delete) should now pass through as expected
let res = g.input(
left,
vec![(vec![DfValue::from(2), DfValue::from("a")], false)],
false,
);
assert_eq!(
*res.results,
vec![(vec![DfValue::from(2), DfValue::from("a")], false).into()]
)
}
}

/// An oracle implementation of the "bag union" algorithm that unions implement for
Expand Down

0 comments on commit f7ffbe6

Please sign in to comment.