diff --git a/src/stream/src/common/cache/state_cache/mod.rs b/src/stream/src/common/cache/state_cache/mod.rs new file mode 100644 index 000000000000..1e393c258990 --- /dev/null +++ b/src/stream/src/common/cache/state_cache/mod.rs @@ -0,0 +1,69 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub use ordered::*; +use risingwave_common::array::Op; +pub use top_n::*; + +mod ordered; +mod top_n; + +/// A common interface for state table cache. +pub trait StateCache { + type Key: Ord; + type Value; + + /// Type of state cache filler, for syncing the cache with the state table. + type Filler<'a>: StateCacheFiller + 'a + where + Self: 'a; + + /// Check if the cache is synced with the state table. + fn is_synced(&self) -> bool; + + /// Begin syncing the cache with the state table. + fn begin_syncing(&mut self) -> Self::Filler<'_>; + + /// Insert an entry into the cache. Should not break cache validity. + fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option; + + /// Delete an entry from the cache. Should not break cache validity. + fn delete(&mut self, key: &Self::Key) -> Option; + + /// Apply a batch of operations to the cache. Should not break cache validity. + fn apply_batch(&mut self, batch: impl IntoIterator); + + /// Clear the cache. + fn clear(&mut self); + + /// Iterate over the values in the cache. + fn values(&self) -> impl Iterator; + + /// Get the reference of first key-value pair in the cache. + fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)>; +} + +pub trait StateCacheFiller { + type Key: Ord; + type Value; + + /// Get the capacity of the cache. + fn capacity(&self) -> Option; + + /// Insert an entry into the cache without cache validity check. + fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value); + + /// Finish syncing the cache with the state table. This should mark the cache as synced. + fn finish(self); +} diff --git a/src/stream/src/common/cache/state_cache/ordered.rs b/src/stream/src/common/cache/state_cache/ordered.rs new file mode 100644 index 000000000000..78bbea6a9619 --- /dev/null +++ b/src/stream/src/common/cache/state_cache/ordered.rs @@ -0,0 +1,120 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use risingwave_common::array::Op; + +use super::{StateCache, StateCacheFiller}; + +/// An implementation of [`StateCache`] that uses a [`BTreeMap`] as the underlying cache, with no +/// capacity limit. +pub struct OrderedStateCache { + cache: BTreeMap, + synced: bool, +} + +impl OrderedStateCache { + pub fn new() -> Self { + Self { + cache: BTreeMap::new(), + synced: false, + } + } +} + +impl Default for OrderedStateCache { + fn default() -> Self { + Self::new() + } +} + +impl StateCache for OrderedStateCache { + type Filler<'a> = &'a mut Self where Self: 'a; + type Key = K; + type Value = V; + + fn is_synced(&self) -> bool { + self.synced + } + + fn begin_syncing(&mut self) -> Self::Filler<'_> { + self.synced = false; + self.cache.clear(); + self + } + + fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option { + if self.synced { + self.cache.insert(key, value) + } else { + None + } + } + + fn delete(&mut self, key: &Self::Key) -> Option { + if self.synced { + self.cache.remove(key) + } else { + None + } + } + + fn apply_batch(&mut self, batch: impl IntoIterator) { + if self.synced { + for (op, key, value) in batch { + match op { + Op::Insert | Op::UpdateInsert => { + self.cache.insert(key, value); + } + Op::Delete | Op::UpdateDelete => { + self.cache.remove(&key); + } + } + } + } + } + + fn clear(&mut self) { + self.cache.clear(); + self.synced = false; + } + + fn values(&self) -> impl Iterator { + assert!(self.synced); + self.cache.values() + } + + fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> { + assert!(self.synced); + self.cache.first_key_value() + } +} + +impl StateCacheFiller for &mut OrderedStateCache { + type Key = K; + type Value = V; + + fn capacity(&self) -> Option { + None + } + + fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) { + self.cache.insert(key, value); + } + + fn finish(self) { + self.synced = true; + } +} diff --git a/src/stream/src/common/cache/state_cache.rs b/src/stream/src/common/cache/state_cache/top_n.rs similarity index 53% rename from src/stream/src/common/cache/state_cache.rs rename to src/stream/src/common/cache/state_cache/top_n.rs index 8eaf35b503c6..6829341f1d07 100644 --- a/src/stream/src/common/cache/state_cache.rs +++ b/src/stream/src/common/cache/state_cache/top_n.rs @@ -12,60 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - use risingwave_common::array::Op; -use super::TopNCache; - -/// A common interface for state table cache. -pub trait StateCache { - type Key: Ord; - type Value; - - /// Type of state cache filler, for syncing the cache with the state table. - type Filler<'a>: StateCacheFiller + 'a - where - Self: 'a; - - /// Check if the cache is synced with the state table. - fn is_synced(&self) -> bool; - - /// Begin syncing the cache with the state table. - fn begin_syncing(&mut self) -> Self::Filler<'_>; - - /// Insert an entry into the cache. Should not break cache validity. - fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option; - - /// Delete an entry from the cache. Should not break cache validity. - fn delete(&mut self, key: &Self::Key) -> Option; - - /// Apply a batch of operations to the cache. Should not break cache validity. - fn apply_batch(&mut self, batch: impl IntoIterator); - - /// Clear the cache. - fn clear(&mut self); - - /// Iterate over the values in the cache. - fn values(&self) -> impl Iterator; - - /// Get the reference of first key-value pair in the cache. - fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)>; -} - -pub trait StateCacheFiller { - type Key: Ord; - type Value; - - /// Get the capacity of the cache. - fn capacity(&self) -> Option; - - /// Insert an entry into the cache without cache validity check. - fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value); - - /// Finish syncing the cache with the state table. This should mark the cache as synced. - fn finish(self); -} +use super::{StateCache, StateCacheFiller}; +use crate::common::cache::TopNCache; /// An implementation of [`StateCache`] that uses a [`TopNCache`] as the underlying cache, with /// limited capacity. @@ -212,104 +162,3 @@ impl StateCacheFiller for &mut TopNStateCache { self.synced = true; } } - -/// An implementation of [`StateCache`] that uses a [`BTreeMap`] as the underlying cache, with no -/// capacity limit. -pub struct OrderedStateCache { - cache: BTreeMap, - synced: bool, -} - -impl OrderedStateCache { - pub fn new() -> Self { - Self { - cache: BTreeMap::new(), - synced: false, - } - } -} - -impl Default for OrderedStateCache { - fn default() -> Self { - Self::new() - } -} - -impl StateCache for OrderedStateCache { - type Filler<'a> = &'a mut Self where Self: 'a; - type Key = K; - type Value = V; - - fn is_synced(&self) -> bool { - self.synced - } - - fn begin_syncing(&mut self) -> Self::Filler<'_> { - self.synced = false; - self.cache.clear(); - self - } - - fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option { - if self.synced { - self.cache.insert(key, value) - } else { - None - } - } - - fn delete(&mut self, key: &Self::Key) -> Option { - if self.synced { - self.cache.remove(key) - } else { - None - } - } - - fn apply_batch(&mut self, batch: impl IntoIterator) { - if self.synced { - for (op, key, value) in batch { - match op { - Op::Insert | Op::UpdateInsert => { - self.cache.insert(key, value); - } - Op::Delete | Op::UpdateDelete => { - self.cache.remove(&key); - } - } - } - } - } - - fn clear(&mut self) { - self.cache.clear(); - self.synced = false; - } - - fn values(&self) -> impl Iterator { - assert!(self.synced); - self.cache.values() - } - - fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> { - assert!(self.synced); - self.cache.first_key_value() - } -} - -impl StateCacheFiller for &mut OrderedStateCache { - type Key = K; - type Value = V; - - fn capacity(&self) -> Option { - None - } - - fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) { - self.cache.insert(key, value); - } - - fn finish(self) { - self.synced = true; - } -}