Skip to content

Commit

Permalink
state: Stream records for full replay from persistent state
Browse files Browse the repository at this point in the history
Currently, before we launch the thread to chunk the records for a full
replay, we clone all rows out of the source materialization,
constructing an in-memory `Vec<Vec<DfValue>>` containing the rows. This
is inefficient - and in the case of persistent state, actually
unnecessary, since RocksDB allows multiple threads to concurrently read
from the same state. This commit tweaks the API of what was previously
called `State::cloned_records` (I've renamed it to `State::all_records`,
since it now only clones in the case of MemoryState) such that we no
longer have to collect the full vector of records for PersistentState.
This adds a little bit of complexity, since the state itself is behind an
RwLock, but has benefits both in terms of runtime - on my machine it speeds
up a full replay for a `COUNT(*)` from a 10GB table from about 1m 38s to
about 1m 19s - and more significantly in terms of maximum memory usage,
since we no longer have to keep the full contents of a base table in
memory in order to replay from it.

Fixes: REA-2972
Change-Id: I76f67e24a37c15fa8fdfbc5fdfbfeefa11c3a0e0
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5022
Reviewed-by: Luke Osborne <luke@readyset.io>
Reviewed-by: Dan Wilbanks <dan@readyset.io>
Reviewed-by: Fran Noriega <fran@readyset.io>
Tested-by: Buildkite CI
  • Loading branch information
glittershark committed Jul 14, 2023
1 parent a2d92e5 commit de3231f
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 127 deletions.
58 changes: 52 additions & 6 deletions dataflow-state/src/lib.rs
Expand Up @@ -18,6 +18,7 @@ use ahash::RandomState;
use common::{Records, SizeOf, Tag};
use derive_more::From;
use hashbag::HashBag;
use itertools::Either;
pub use partial_map::PartialMap;
use readyset_client::internal::Index;
use readyset_client::replication::ReplicationOffset;
Expand Down Expand Up @@ -210,8 +211,9 @@ pub trait State: SizeOf + Send {
/// state
fn row_count(&self) -> usize;

/// Return a copy of all records. Panics if the state is only partially materialized.
fn cloned_records(&self) -> Vec<Vec<DfValue>>;
/// Return a handle that allows streaming a consistent snapshot of all records within this
/// state. Panics if the state is only partially materialized.
fn all_records(&self) -> AllRecords;

/// Evict up to `bytes` by randomly selected keys, returning a struct representing the index
/// chosen to evict from along with the keys evicted and the number of bytes evicted.
Expand Down Expand Up @@ -392,11 +394,11 @@ impl State for MaterializedNodeState {
}
}

fn cloned_records(&self) -> Vec<Vec<DfValue>> {
fn all_records(&self) -> AllRecords {
match self {
MaterializedNodeState::Memory(ms) => ms.cloned_records(),
MaterializedNodeState::Persistent(ps) => ps.cloned_records(),
MaterializedNodeState::PersistentReadHandle(rh) => rh.cloned_records(),
MaterializedNodeState::Memory(ms) => ms.all_records(),
MaterializedNodeState::Persistent(ps) => ps.all_records(),
MaterializedNodeState::PersistentReadHandle(rh) => rh.all_records(),
}
}

Expand Down Expand Up @@ -441,6 +443,50 @@ impl State for MaterializedNodeState {
}
}

/// Handle to all the records in a state, as returned by [`State::all_records`].
///
/// Call [`AllRecords::read`] to obtain an [`AllRecordsGuard`], which in turn produces the
/// iterator over the rows.
///
/// This type exists as distinct from [`AllRecordsGuard`] to allow it to be sent between threads.
pub enum AllRecords {
/// Owned records taken from a [`MemoryState`].
///
/// The vector is moved out by the first call to [`AllRecords::read`], leaving an empty vector
/// behind for any later call.
Owned(Vec<Vec<DfValue>>),
/// Records streaming out of a [`PersistentState`]; wraps the persistent-state-specific handle.
Persistent(persistent_state::AllRecords),
}

/// RAII guard providing the ability to stream all the records out of a state.
///
/// Created by [`AllRecords::read`]; use [`AllRecordsGuard::iter`] to obtain the actual iterator
/// over rows.
pub enum AllRecordsGuard<'a> {
/// Iterator over owned records taken from a [`MemoryState`]
Owned(vec::IntoIter<Vec<DfValue>>),
/// Guard for records streaming out of a [`PersistentState`]; `'a` is the lifetime carried by
/// the wrapped persistent-state guard.
Persistent(persistent_state::AllRecordsGuard<'a>),
}

impl AllRecords {
    /// Obtain a guard from which the records of the state can be iterated.
    ///
    /// For [`AllRecords::Owned`], the stored rows are moved into the returned guard, so this
    /// handle is left holding an empty vector and any later call yields a guard with no rows.
    /// For [`AllRecords::Persistent`], this delegates to the persistent handle's own `read`.
    pub fn read(&mut self) -> AllRecordsGuard<'_> {
        match self {
            Self::Persistent(handle) => AllRecordsGuard::Persistent(handle.read()),
            Self::Owned(rows) => {
                // Move the rows out rather than cloning them; an empty `Vec` is left in place.
                let taken = std::mem::take(rows);
                AllRecordsGuard::Owned(taken.into_iter())
            }
        }
    }
}

impl<'a> AllRecordsGuard<'a> {
    /// Build an iterator yielding the records reachable through this guard.
    ///
    /// Avoid calling this more than once on the same guard. In the [`AllRecordsGuard::Owned`]
    /// case every call draws from the same underlying `vec::IntoIter`, so rows consumed through
    /// an earlier iterator are not yielded again (and nothing at all is yielded once it has been
    /// exhausted). The [`AllRecordsGuard::Persistent`] case delegates to the wrapped guard's
    /// `iter`.
    pub fn iter<'b>(&'b mut self) -> impl Iterator<Item = Vec<DfValue>> + 'b
    where
        'a: 'b,
    {
        match self {
            Self::Persistent(inner) => Either::Right(inner.iter()),
            // A mutable reference to an iterator is itself an iterator.
            Self::Owned(rows) => Either::Left(rows),
        }
    }
}

#[derive(Debug, Hash, PartialEq, Eq)]
pub struct Row(Rc<Vec<DfValue>>);

Expand Down
8 changes: 4 additions & 4 deletions dataflow-state/src/memory_state.rs
Expand Up @@ -13,8 +13,8 @@ use tracing::trace;
use crate::keyed_state::KeyedState;
use crate::single_state::SingleState;
use crate::{
EvictBytesResult, EvictKeysResult, EvictRandomResult, LookupResult, PointKey, RangeKey,
RangeLookupResult, RecordResult, Row, Rows, State,
AllRecords, EvictBytesResult, EvictKeysResult, EvictRandomResult, LookupResult, PointKey,
RangeKey, RangeLookupResult, RecordResult, Row, Rows, State,
};

#[derive(Default)]
Expand Down Expand Up @@ -286,14 +286,14 @@ impl State for MemoryState {
self.state[index].lookup_range(key)
}

fn cloned_records(&self) -> Vec<Vec<DfValue>> {
fn all_records(&self) -> AllRecords {
#[allow(clippy::ptr_arg)]
fn fix(rs: &Rows) -> impl Iterator<Item = Vec<DfValue>> + '_ {
rs.iter().map(|r| Vec::clone(&**r))
}

assert!(!self.state[0].partial());
self.state[0].values().flat_map(fix).collect()
AllRecords::Owned(self.state[0].values().flat_map(fix).collect())
}

/// Evicts `bytes` by evicting random keys from the state. The key are first evicted from the
Expand Down
125 changes: 102 additions & 23 deletions dataflow-state/src/persistent_state.rs
Expand Up @@ -70,7 +70,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::{fmt, fs};
use std::{fmt, fs, mem};

use bincode::Options;
use clap::ValueEnum;
Expand Down Expand Up @@ -758,8 +758,8 @@ impl State for PersistentState {
}
}

fn cloned_records(&self) -> Vec<Vec<DfValue>> {
self.db.cloned_records()
fn all_records(&self) -> crate::AllRecords {
self.read_handle().all_records()
}

/// Returns a *row* count estimate from RocksDB (not a key count as the function name would
Expand Down Expand Up @@ -1029,13 +1029,8 @@ impl State for PersistentStateHandle {
.unwrap() as usize
}

fn cloned_records(&self) -> Vec<Vec<DfValue>> {
let inner = self.inner();
let db = &inner.db;
let cf = db.cf_handle(&inner.indices[0].column_family).unwrap();
db.full_iterator_cf(cf, IteratorMode::Start)
.map(|res| deserialize_row(res.unwrap().1))
.collect()
fn all_records(&self) -> crate::AllRecords {
crate::AllRecords::Persistent(AllRecords(self.clone()))
}

fn evict_bytes(&mut self, _: usize) -> Option<crate::EvictBytesResult> {
Expand Down Expand Up @@ -1358,6 +1353,40 @@ fn compact_cf(table: &str, db: &DB, index: &PersistentIndex, opts: &CompactOptio
}
}

/// Handle to all the records in a persistent state.
///
/// Constructing it only stores a [`PersistentStateHandle`]; rows are produced by the iterator
/// returned from [`AllRecordsGuard::iter`] on the guard obtained via [`AllRecords::read`].
///
/// This type exists as distinct from [`AllRecordsGuard`] to allow it to be sent between threads.
pub struct AllRecords(PersistentStateHandle);

/// RAII guard providing the ability to stream all the records out of a persistent state.
///
/// Holds a read guard on the handle's [`SharedState`] for as long as this value is alive.
pub struct AllRecordsGuard<'a>(RwLockReadGuard<'a, SharedState>);

impl AllRecords {
    /// Acquire a guard from which all the records of the persistent state can be streamed.
    ///
    /// The returned [`AllRecordsGuard`] wraps the read guard handed out by the underlying
    /// handle's `inner()`.
    pub fn read(&self) -> AllRecordsGuard<'_> {
        let shared = self.0.inner();
        AllRecordsGuard(shared)
    }
}

impl<'a> AllRecordsGuard<'a> {
    /// Build an iterator over every record in the persistent state.
    ///
    /// Rows are read from the column family of the first index (`indices[0]`), starting at the
    /// beginning, and each stored value is passed through `deserialize_row`.
    ///
    /// # Panics
    ///
    /// Panics if the column family for the first index is missing, or if the underlying
    /// iterator yields an error for any entry.
    pub fn iter<'b>(&'a self) -> impl Iterator<Item = Vec<DfValue>> + 'b
    where
        'a: 'b,
    {
        let shared = &self.0;
        let primary = &shared.indices[0];
        let cf = shared
            .db
            .cf_handle(&primary.column_family)
            .expect("Column families always exist for all indices");

        shared
            .db
            .full_iterator_cf(cf, IteratorMode::Start)
            // Each entry is a (key, value) pair; only the value holds the serialized row.
            .map(|entry| deserialize_row(entry.unwrap().1))
    }
}

impl PersistentState {
#[instrument(name = "Creating persistent state", skip_all, fields(name))]
pub fn new<C: AsRef<[usize]>, K: IntoIterator<Item = C>>(
Expand Down Expand Up @@ -2019,7 +2048,7 @@ impl SizeOf for PersistentStateHandle {
}

fn size_of(&self) -> u64 {
std::mem::size_of::<Self>() as u64
mem::size_of::<Self>() as u64
}

fn is_empty(&self) -> bool {
Expand All @@ -2034,7 +2063,7 @@ impl SizeOf for PersistentStateHandle {

impl SizeOf for PersistentState {
fn size_of(&self) -> u64 {
std::mem::size_of::<Self>() as u64
mem::size_of::<Self>() as u64
}

#[allow(clippy::panic)] // Can't return a result, panicking is the best we can do
Expand Down Expand Up @@ -2739,18 +2768,68 @@ mod tests {
assert!(count > 0 && count < rows.len() * 2);
}

#[test]
fn persistent_state_cloned_records() {
let mut state = setup_persistent("persistent_state_cloned_records", None);
let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
state
.process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
.unwrap();
mod all_records {
use pretty_assertions::assert_eq;

use super::*;

#[test]
fn simple_case() {
let mut state = setup_persistent("persistent_state_cloned_records", None);
let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
state
.process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
.unwrap();

let mut all_records = state.all_records();
assert_eq!(
all_records.read().iter().collect::<Vec<_>>(),
vec![first, second]
);
}

#[test]
fn wonky_drop_order() {
let mut state = setup_persistent("persistent_state_cloned_records", None);
let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
state
.process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
.unwrap();
let mut all_records = state.all_records();
drop(state);

assert_eq!(state.cloned_records(), vec![first, second]);
assert_eq!(
all_records.read().iter().collect::<Vec<_>>(),
vec![first, second]
);
}

#[test]
fn writes_during_iter() {
let mut state = setup_persistent("persistent_state_cloned_records", None);
let first: Vec<DfValue> = vec![10.into(), "Cat".into()];
let second: Vec<DfValue> = vec![20.into(), "Cat".into()];
state.add_key(Index::new(IndexType::HashMap, vec![0]), None);
state.add_key(Index::new(IndexType::HashMap, vec![1]), None);
state
.process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
.unwrap();
let mut all_records = state.all_records();
let mut guard = all_records.read();
let iter = guard.iter();
state
.process_records(&mut vec![first.clone(), second.clone()].into(), None, None)
.unwrap();
drop(state);

assert_eq!(iter.collect::<Vec<_>>(), vec![first, second]);
}
}

#[test]
Expand Down

0 comments on commit de3231f

Please sign in to comment.