From de3231f370561ade60e817533dac3ec14fa0812f Mon Sep 17 00:00:00 2001
From: Aspen Smith
Date: Tue, 11 Jul 2023 15:06:03 -0400
Subject: [PATCH] state: Stream records for full replay from persistent state

Currently, before we launch the thread to chunk the records for a full
replay, we clone all rows out of the source materialization, constructing
an in-memory `Vec<Vec<DfValue>>` containing the rows. This is inefficient -
and in the case of persistent state, actually unnecessary, since RocksDB
allows multiple threads to concurrently read from the same state.

This commit tweaks the API of what was previously called
`State::cloned_records` (I've renamed it to `State::all_records`, since it
now only clones in the case of MemoryState) such that we no longer have to
collect the full vector of records for PersistentState. This adds a little
bit of complexity, since the state itself is behind an RwLock, but it has
benefits both in terms of runtime - on my machine it speeds up a full
replay for a `COUNT(*)` from a 10GB table from about 1m 38s to about
1m 19s - and, more significantly, in terms of maximum memory usage, since
we no longer have to keep the full contents of a base table in memory in
order to replay from it.

(A usage sketch of the new API follows the diff below.)

Fixes: REA-2972
Change-Id: I76f67e24a37c15fa8fdfbc5fdfbfeefa11c3a0e0
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5022
Reviewed-by: Luke Osborne
Reviewed-by: Dan Wilbanks
Reviewed-by: Fran Noriega
Tested-by: Buildkite CI
---
 dataflow-state/src/lib.rs                  |  58 +++++-
 dataflow-state/src/memory_state.rs         |   8 +-
 dataflow-state/src/persistent_state.rs     | 125 ++++++++++---
 readyset-dataflow/src/domain/mod.rs        | 203 +++++++++++----------
 readyset-dataflow/src/node/special/base.rs |   6 +-
 5 files changed, 273 insertions(+), 127 deletions(-)

diff --git a/dataflow-state/src/lib.rs b/dataflow-state/src/lib.rs
index 0baf27781c..25f1df6c36 100644
--- a/dataflow-state/src/lib.rs
+++ b/dataflow-state/src/lib.rs
@@ -18,6 +18,7 @@ use ahash::RandomState;
 use common::{Records, SizeOf, Tag};
 use derive_more::From;
 use hashbag::HashBag;
+use itertools::Either;
 pub use partial_map::PartialMap;
 use readyset_client::internal::Index;
 use readyset_client::replication::ReplicationOffset;
@@ -210,8 +211,9 @@
     /// state
     fn row_count(&self) -> usize;

-    /// Return a copy of all records. Panics if the state is only partially materialized.
-    fn cloned_records(&self) -> Vec<Vec<DfValue>>;
+    /// Return a handle that allows streaming a consistent snapshot of all records within this
+    /// state. Panics if the state is only partially materialized.
+    fn all_records(&self) -> AllRecords;

     /// Evict up to `bytes` by randomly selected keys, returning a struct representing the index
     /// chosen to evict from along with the keys evicted and the number of bytes evicted.
@@ -392,11 +394,11 @@ impl State for MaterializedNodeState {
         }
     }

-    fn cloned_records(&self) -> Vec<Vec<DfValue>> {
+    fn all_records(&self) -> AllRecords {
         match self {
-            MaterializedNodeState::Memory(ms) => ms.cloned_records(),
-            MaterializedNodeState::Persistent(ps) => ps.cloned_records(),
-            MaterializedNodeState::PersistentReadHandle(rh) => rh.cloned_records(),
+            MaterializedNodeState::Memory(ms) => ms.all_records(),
+            MaterializedNodeState::Persistent(ps) => ps.all_records(),
+            MaterializedNodeState::PersistentReadHandle(rh) => rh.all_records(),
         }
     }

@@ -441,6 +443,50 @@ impl State for MaterializedNodeState {
     }
 }

+/// Handle to all the records in a state.
+///
+/// This type exists as distinct from [`AllRecordsGuard`] to allow it to be sent between threads.
+pub enum AllRecords {
+    /// Owned records taken from a [`MemoryState`]
+    Owned(Vec<Vec<DfValue>>),
+    /// Records streaming out of a [`PersistentState`]
+    Persistent(persistent_state::AllRecords),
+}
+
+/// RAII guard providing the ability to stream all the records out of a state
+pub enum AllRecordsGuard<'a> {
+    /// Owned records taken from a [`MemoryState`]
+    Owned(vec::IntoIter<Vec<DfValue>>),
+    /// Records streaming out of a [`PersistentState`]
+    Persistent(persistent_state::AllRecordsGuard<'a>),
+}
+
+impl AllRecords {
+    /// Construct an RAII guard providing the ability to stream all the records out of a state
+    pub fn read(&mut self) -> AllRecordsGuard<'_> {
+        match self {
+            AllRecords::Owned(i) => AllRecordsGuard::Owned(std::mem::take(i).into_iter()),
+            AllRecords::Persistent(g) => AllRecordsGuard::Persistent(g.read()),
+        }
+    }
+}
+
+impl<'a> AllRecordsGuard<'a> {
+    /// Construct an iterator over all the records in a state.
+    ///
+    /// Do not call this method multiple times on the same `guard` - doing so will yield an empty
+    /// result set.
+    pub fn iter<'b>(&'b mut self) -> impl Iterator<Item = Vec<DfValue>> + 'b
+    where
+        'a: 'b,
+    {
+        match self {
+            AllRecordsGuard::Owned(v) => Either::Left(v),
+            AllRecordsGuard::Persistent(g) => Either::Right(g.iter()),
+        }
+    }
+}
+
 #[derive(Debug, Hash, PartialEq, Eq)]
 pub struct Row(Rc<Vec<DfValue>>);

diff --git a/dataflow-state/src/memory_state.rs b/dataflow-state/src/memory_state.rs
index 48513330d7..2bc152a97d 100644
--- a/dataflow-state/src/memory_state.rs
+++ b/dataflow-state/src/memory_state.rs
@@ -13,8 +13,8 @@ use tracing::trace;
 use crate::keyed_state::KeyedState;
 use crate::single_state::SingleState;
 use crate::{
-    EvictBytesResult, EvictKeysResult, EvictRandomResult, LookupResult, PointKey, RangeKey,
-    RangeLookupResult, RecordResult, Row, Rows, State,
+    AllRecords, EvictBytesResult, EvictKeysResult, EvictRandomResult, LookupResult, PointKey,
+    RangeKey, RangeLookupResult, RecordResult, Row, Rows, State,
 };

 #[derive(Default)]
@@ -286,14 +286,14 @@ impl State for MemoryState {
         self.state[index].lookup_range(key)
     }

-    fn cloned_records(&self) -> Vec<Vec<DfValue>> {
+    fn all_records(&self) -> AllRecords {
         #[allow(clippy::ptr_arg)]
         fn fix(rs: &Rows) -> impl Iterator<Item = Vec<DfValue>> + '_ {
             rs.iter().map(|r| Vec::clone(&**r))
         }

         assert!(!self.state[0].partial());
-        self.state[0].values().flat_map(fix).collect()
+        AllRecords::Owned(self.state[0].values().flat_map(fix).collect())
     }

     /// Evicts `bytes` by evicting random keys from the state. The key are first evicted from the

diff --git a/dataflow-state/src/persistent_state.rs b/dataflow-state/src/persistent_state.rs
index b233ce9582..5084f0cf2c 100644
--- a/dataflow-state/src/persistent_state.rs
+++ b/dataflow-state/src/persistent_state.rs
@@ -70,7 +70,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
-use std::{fmt, fs};
+use std::{fmt, fs, mem};

 use bincode::Options;
 use clap::ValueEnum;
@@ -758,8 +758,8 @@ impl State for PersistentState {
         }
     }

-    fn cloned_records(&self) -> Vec<Vec<DfValue>> {
-        self.db.cloned_records()
+    fn all_records(&self) -> crate::AllRecords {
+        self.read_handle().all_records()
     }

     /// Returns a *row* count estimate from RocksDB (not a key count as the function name would
@@ -1029,13 +1029,8 @@ impl State for PersistentStateHandle {
             .unwrap() as usize
     }

-    fn cloned_records(&self) -> Vec<Vec<DfValue>> {
-        let inner = self.inner();
-        let db = &inner.db;
-        let cf = db.cf_handle(&inner.indices[0].column_family).unwrap();
-        db.full_iterator_cf(cf, IteratorMode::Start)
-            .map(|res| deserialize_row(res.unwrap().1))
-            .collect()
+    fn all_records(&self) -> crate::AllRecords {
+        crate::AllRecords::Persistent(AllRecords(self.clone()))
     }

     fn evict_bytes(&mut self, _: usize) -> Option<EvictBytesResult> {
@@ -1358,6 +1353,40 @@ fn compact_cf(table: &str, db: &DB, index: &PersistentIndex, opts: &CompactOptio
     }
 }

+/// Handle to all the records in a persistent state.
+///
+/// This type exists as distinct from [`AllRecordsGuard`] to allow it to be sent between threads.
+pub struct AllRecords(PersistentStateHandle);
+
+/// RAII guard providing the ability to stream all the records out of a persistent state
+pub struct AllRecordsGuard<'a>(RwLockReadGuard<'a, SharedState>);
+
+impl AllRecords {
+    /// Construct an RAII guard providing the ability to stream all the records out of a persistent
+    /// state
+    pub fn read(&self) -> AllRecordsGuard<'_> {
+        AllRecordsGuard(self.0.inner())
+    }
+}
+
+impl<'a> AllRecordsGuard<'a> {
+    /// Construct an iterator over all the records in a persistent state
+    pub fn iter<'b>(&'a self) -> impl Iterator<Item = Vec<DfValue>> + 'b
+    where
+        'a: 'b,
+    {
+        let cf = self
+            .0
+            .db
+            .cf_handle(&self.0.indices[0].column_family)
+            .expect("Column families always exist for all indices");
+        self.0
+            .db
+            .full_iterator_cf(cf, IteratorMode::Start)
+            .map(|res| deserialize_row(res.unwrap().1))
+    }
+}
+
 impl PersistentState {
     #[instrument(name = "Creating persistent state", skip_all, fields(name))]
     pub fn new, K: IntoIterator>(
@@ -2019,7 +2048,7 @@ impl SizeOf for PersistentStateHandle {
     }

     fn size_of(&self) -> u64 {
-        std::mem::size_of::<Self>() as u64
+        mem::size_of::<Self>() as u64
     }

     fn is_empty(&self) -> bool {
@@ -2034,7 +2063,7 @@ impl SizeOf for PersistentStateHandle {

 impl SizeOf for PersistentState {
     fn size_of(&self) -> u64 {
-        std::mem::size_of::<Self>() as u64
+        mem::size_of::<Self>() as u64
     }

     #[allow(clippy::panic)] // Can't return a result, panicking is the best we can do
@@ -2739,18 +2768,68 @@ mod tests {
         assert!(count > 0 && count < rows.len() * 2);
     }

-    #[test]
-    fn persistent_state_cloned_records() {
-        let mut state = setup_persistent("persistent_state_cloned_records", None);
-        let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
-        let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
-        state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
-        state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
-        state
-            .process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
-            .unwrap();
+    mod all_records {
+        use pretty_assertions::assert_eq;
+
+        use super::*;
+
+        #[test]
+        fn simple_case() {
+            let mut state = setup_persistent("persistent_state_cloned_records", None);
+            let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
+            let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
+            state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
+            state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
+            state
+                .process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
+                .unwrap();
+
+            let mut all_records = state.all_records();
+            assert_eq!(
+                all_records.read().iter().collect::<Vec<_>>(),
+                vec![first, second]
+            );
+        }
+
+        #[test]
+        fn wonky_drop_order() {
+            let mut state = setup_persistent("persistent_state_cloned_records", None);
+            let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
+            let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
+            state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
+            state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
+            state
+                .process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
+                .unwrap();
+            let mut all_records = state.all_records();
+            drop(state);

-        assert_eq!(state.cloned_records(), vec![first, second]);
+            assert_eq!(
+                all_records.read().iter().collect::<Vec<_>>(),
+                vec![first, second]
+            );
+        }
+
+        #[test]
+        fn writes_during_iter() {
+            let mut state = setup_persistent("persistent_state_cloned_records", None);
+            let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
+            let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
+            state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
+            state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
+            state
+                .process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
+                .unwrap();
+            let mut all_records = state.all_records();
+            let mut guard = all_records.read();
+            let iter = guard.iter();
+            state
+                .process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
+                .unwrap();
+            drop(state);
+
+            assert_eq!(iter.collect::<Vec<_>>(), vec![first, second]);
+        }
     }

     #[test]

diff --git a/readyset-dataflow/src/domain/mod.rs b/readyset-dataflow/src/domain/mod.rs
index b9b33349dc..7c26638d22 100644
--- a/readyset-dataflow/src/domain/mod.rs
+++ b/readyset-dataflow/src/domain/mod.rs
@@ -36,7 +36,7 @@ use serde::{Deserialize, Serialize};
 use timekeeper::{RealTime, SimpleTracker, ThreadTime, Timer, TimerSet};
 use tokio::sync::mpsc::Sender;
 use tokio_stream::wrappers::UnboundedReceiverStream;
-use tracing::{debug, error, info, trace, warn, Instrument};
+use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 use vec1::Vec1;

 pub(crate) use self::replay_paths::ReplayPath;
@@ -1924,14 +1924,16 @@ impl Domain {
                 // case, we wouldn't be able to do the replay, and the entire migration
                 // would fail.
                 //
-                // we clone the entire state so that we can continue to occasionally
-                // process incoming updates to the domain without disturbing the state that
-                // is being replayed.
+                // In the case of memory state, we clone the entire state so that we can continue to
+                // occasionally process incoming updates to the domain without disturbing the state
+                // that is being replayed. For persistent state, we can stream records from a
+                // consistent snapshot and avoid the allocations
                 let state = self
                     .state
                     .get(from)
-                    .expect("migration replay path started with non-materialized node")
-                    .cloned_records();
+                    .expect("migration replay path started with non-materialized node");
+                let is_empty = state.is_empty();
+                let mut all_records = state.all_records();

                 debug!(
                     μs = %start.elapsed().as_micros(),
@@ -1953,105 +1955,122 @@ impl Domain {
                     tag,
                     link,
                     context: ReplayPieceContext::Full {
-                        last: state.is_empty(),
+                        // NOTE: If we're replaying from persistent state this might be wrong, since
+                        // it's backed by an *estimate* of the number of keys in the state
+                        last: is_empty,
                     },
                     data: Vec::<Record>::new().into(),
                 });

-                if !state.is_empty() {
-                    let added_cols = self.ingress_inject.get(from).cloned();
-                    let default = {
-                        let n = self
-                            .nodes
-                            .get(from)
-                            .ok_or_else(|| ReadySetError::NoSuchNode(from.id()))?
-                            .borrow();
-                        let mut default = None;
-                        if let Some(b) = n.get_base() {
-                            let mut row = Vec::new();
-                            b.fix(&mut row);
-                            default = Some(row);
-                        }
-                        default
-                    };
-                    let fix = move |mut r: Vec<DfValue>| -> Vec<DfValue> {
-                        if let Some((start, ref added)) = added_cols {
-                            let rlen = r.len();
-                            r.extend(added.iter().skip(rlen - start).cloned());
-                        } else if let Some(ref defaults) = default {
-                            let rlen = r.len();
-                            r.extend(defaults.iter().skip(rlen).cloned());
-                        }
-                        r
-                    };
-
-                    let replay_tx_desc = self.channel_coordinator.builder_for(&self.address())?;
-
-                    // Have to get metrics here so we can move them to the thread
-                    let (replay_time_counter, replay_time_histogram) =
-                        self.metrics.recorders_for_chunked_replay(link.dst);
+                let added_cols = self.ingress_inject.get(from).cloned();
+                let default = {
+                    let n = self
+                        .nodes
+                        .get(from)
+                        .ok_or_else(|| ReadySetError::NoSuchNode(from.id()))?
+                        .borrow();
+                    let mut default = None;
+                    if let Some(b) = n.get_base() {
+                        let mut row = Vec::new();
+                        b.fix(&mut row);
+                        default = Some(row);
+                    }
+                    default
+                };
+                let fix = move |mut r: Vec<DfValue>| -> Vec<DfValue> {
+                    if let Some((start, ref added)) = added_cols {
+                        let rlen = r.len();
+                        r.extend(added.iter().skip(rlen - start).cloned());
+                    } else if let Some(ref defaults) = default {
+                        let rlen = r.len();
+                        r.extend(defaults.iter().skip(rlen).cloned());
+                    }
+                    r
+                };
+
+                let replay_tx_desc = self.channel_coordinator.builder_for(&self.address())?;
+
+                // Have to get metrics here so we can move them to the thread
+                let (replay_time_counter, replay_time_histogram) =
+                    self.metrics.recorders_for_chunked_replay(link.dst);
+
+                let address = self.address();
+                thread::Builder::new()
+                    .name(format!("replay{}.{}", self.index(), link.src))
+                    .spawn(move || {
+                        let span = info_span!("full_replay", %address, src = %link.src);
+                        let _guard = span.enter();
+                        use itertools::Itertools;
+
+                        // TODO: make async
+                        let mut chunked_replay_tx = match replay_tx_desc.build_sync() {
+                            Ok(r) => r,
+                            Err(error) => {
+                                error!(%error, "Error building channel for chunked replay");
+                                return;
+                            }
+                        };
-                    thread::Builder::new()
-                        .name(format!(
-                            "replay{}.{}",
-                            #[allow(clippy::unwrap_used)] // self.nodes can't be empty
-                            self.nodes
-                                .values()
-                                .next()
-                                .unwrap()
-                                .borrow()
-                                .domain()
-                                .index(),
-                            link.src
-                        ))
-                        .spawn(move || {
-                            use itertools::Itertools;
+                        let start = time::Instant::now();
+                        debug!(node = %link.dst, "starting state chunker");
-                            // TODO: make async
-                            let mut chunked_replay_tx = match replay_tx_desc.build_sync() {
-                                Ok(r) => r,
-                                Err(error) => {
-                                    error!(%error, "Error building channel for chunked replay");
-                                    return;
-                                }
-                            };
+                        let mut guard = all_records.read();
+                        let iter = guard.iter().chunks(BATCH_SIZE);
+                        let mut iter = iter
+                            .into_iter()
+                            .map(|chunk| Records::from_iter(chunk.map(&fix)))
+                            .enumerate()
+                            .peekable();
+
+                        // process all records in state to completion within domain and then
+                        // forward on tx (if there is one)
+                        let mut sent_last = is_empty;
+                        while let Some((i, chunk)) = iter.next() {
+                            let len = chunk.len();
+                            let last = iter.peek().is_none();
+                            sent_last = last;
+                            let p = Box::new(Packet::ReplayPiece {
+                                tag,
+                                link, // to is overwritten by receiver
+                                context: ReplayPieceContext::Full { last },
+                                data: chunk,
+                            });
-                            let start = time::Instant::now();
-                            debug!(node = %link.dst, "starting state chunker");
+                            trace!(num = i, len, "sending batch");
+                            if let Err(error) = chunked_replay_tx.send(p) {
+                                warn!(%error, "replayer noticed domain shutdown");
+                                break;
+                            }
+                        }
-                            let iter = state.into_iter().chunks(BATCH_SIZE);
-                            let mut iter = iter.into_iter().enumerate().peekable();
+                        // Since we're using `is_empty` above to send a `last: true` packet
+                        // before launching the thread, and that's based on a potentially
+                        // inaccurate estimate, it might be the case that we started this thread
+                        // with no records - if so, we need to send a `last: true` packet to
+                        // tell the target domain we're done
+                        if !sent_last {
+                            trace!("Sending empty last batch");
+                            if let Err(error) =
+                                chunked_replay_tx.send(Box::new(Packet::ReplayPiece {
-                            // process all records in state to completion within domain
-                            // and then forward on tx (if there is one)
-                            while let Some((i, chunk)) = iter.next() {
-                                let chunk = Records::from_iter(chunk.map(&fix));
-                                let len = chunk.len();
-                                let last = iter.peek().is_none();
-                                let p = Box::new(Packet::ReplayPiece {
                                     tag,
-                                    link, // to is overwritten by receiver
-                                    context: ReplayPieceContext::Full { last },
-                                    data: chunk,
-                                });
-
-                                trace!(num = i, len, "sending batch");
-                                if chunked_replay_tx.send(p).is_err() {
-                                    warn!("replayer noticed domain shutdown");
-                                    break;
-                                }
+                                    link,
+                                    context: ReplayPieceContext::Full { last: true },
+                                    data: Default::default(),
+                                }))
+                            {
+                                warn!(%error, "replayer noticed domain shutdown");
                             }
+                        }
-                            debug!(
-                                node = %link.dst,
-                                μs = %start.elapsed().as_micros(),
-                                "state chunker finished"
-                            );
+                        debug!(
+                            node = %link.dst,
+                            μs = %start.elapsed().as_micros(),
+                            "state chunker finished"
+                        );
-                            replay_time_counter.increment(start.elapsed().as_micros() as u64);
-                            replay_time_histogram.record(start.elapsed().as_micros() as f64);
-                        })?;
-                }
+                        replay_time_counter.increment(start.elapsed().as_micros() as u64);
+                        replay_time_histogram.record(start.elapsed().as_micros() as f64);
+                    })?;

             self.handle_replay(*p, executor)?;
             self.total_replay_time.stop();

diff --git a/readyset-dataflow/src/node/special/base.rs b/readyset-dataflow/src/node/special/base.rs
index 5f4541b949..75ddbd3c87 100644
--- a/readyset-dataflow/src/node/special/base.rs
+++ b/readyset-dataflow/src/node/special/base.rs
@@ -201,7 +201,8 @@ impl Base {
             }
             TableOperation::Truncate => {
                 records.clear();
-                records.extend(db.cloned_records().into_iter().map(|r| Record::Negative(r)))
+                let mut all_records = db.all_records();
+                records.extend(all_records.read().iter().map(|r| Record::Negative(r)));
             }
             TableOperation::DeleteByKey { .. }
             | TableOperation::InsertOrUpdate { .. }
@@ -297,7 +298,8 @@ impl Base {
             if !truncated {
                 debug!("Truncating base");
                 truncated = true;
-                results.extend(db.cloned_records().into_iter().map(|r| Record::Negative(r)));
+                let mut all_records = db.all_records();
+                results.extend(all_records.read().iter().map(|r| Record::Negative(r)));
             }
         }
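
Usage sketch (not part of the patch): the snippet below is a minimal illustration of how a
caller might stream rows through the `all_records()` API introduced above, batching them the
way the chunked-replay thread does. It assumes the `dataflow_state` crate as modified by this
change plus `itertools`; the `BATCH_SIZE` constant and the `count_rows_in_batches` helper are
hypothetical names used only for this example.

    use dataflow_state::State;
    use itertools::Itertools;

    // Hypothetical batch size for the example; the real replay path has its own BATCH_SIZE.
    const BATCH_SIZE: usize = 512;

    /// Count every row in a fully materialized state, streaming in fixed-size batches rather
    /// than collecting everything into one Vec first (a sketch, not ReadySet's replay code).
    /// Panics if the state is only partially materialized, per the `all_records` docs.
    fn count_rows_in_batches<S: State>(state: &S) -> usize {
        // `all_records()` hands back a handle that can be moved to another thread; `read()`
        // produces the RAII guard (an RwLock read guard for persistent state), and `iter()`
        // then streams rows out of RocksDB, or out of the Vec cloned from a MemoryState.
        let mut all_records = state.all_records();
        let mut guard = all_records.read();
        let mut total = 0;
        for chunk in &guard.iter().chunks(BATCH_SIZE) {
            // Each `chunk` is itself an iterator over at most BATCH_SIZE rows.
            total += chunk.count();
        }
        total
    }

As the wonky_drop_order and writes_during_iter tests added above demonstrate, the handle and
its iterator remain usable even if the originating state is written to or dropped while the
records are still being streamed.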