From be75e554aa14774804bc949b2d77f424b74e23fd Mon Sep 17 00:00:00 2001 From: Koute Date: Fri, 10 Dec 2021 20:31:04 +0900 Subject: [PATCH] Automatically unsubscribe storage listeners when they're dropped (RCP node memory leak fix) (#10454) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Automatically unsubscribe storage listeners when they're dropped * Fix tests' compilation in `sc-client-api` * Add an extra test * Align to review comments; cleanups * Update client/api/src/notifications.rs * FMT Co-authored-by: Bastian Köcher Co-authored-by: Bastian Köcher --- client/api/src/notifications.rs | 229 ++++++++++++++++++++++++-------- 1 file changed, 174 insertions(+), 55 deletions(-) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index 9565fd673acf4..88a709beb1f0b 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -20,13 +20,20 @@ use std::{ collections::{HashMap, HashSet}, - sync::Arc, + pin::Pin, + sync::{Arc, Weak}, + task::Poll, }; use fnv::{FnvHashMap, FnvHashSet}; +use futures::Stream; +use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, Opts, Registry, U64}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_core::storage::{StorageData, StorageKey}; +use sp_core::{ + hexdisplay::HexDisplay, + storage::{StorageData, StorageKey}, +}; use sp_runtime::traits::Block as BlockT; /// Storage change set @@ -34,8 +41,8 @@ use sp_runtime::traits::Block as BlockT; pub struct StorageChangeSet { changes: Arc)>>, child_changes: Arc)>)>>, - filter: Option>, - child_filters: Option>>>, + filter: Keys, + child_filters: ChildKeys, } impl StorageChangeSet { @@ -74,7 +81,46 @@ impl StorageChangeSet { } /// Type that implements `futures::Stream` of storage change events. -pub type StorageEventStream = TracingUnboundedReceiver<(H, StorageChangeSet)>; +pub struct StorageEventStream { + rx: TracingUnboundedReceiver<(H, StorageChangeSet)>, + storage_notifications: Weak>>, + was_triggered: bool, + id: u64, +} + +impl Stream for StorageEventStream { + type Item = as Stream>::Item; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let result = Stream::poll_next(Pin::new(&mut self.rx), cx); + if result.is_ready() { + self.was_triggered = true; + } + result + } +} + +impl Drop for StorageEventStream { + fn drop(&mut self) { + if let Some(storage_notifications) = self.storage_notifications.upgrade() { + if let Some((keys, child_keys)) = + storage_notifications.lock().remove_subscriber(self.id) + { + if !self.was_triggered { + log::trace!( + target: "storage_notifications", + "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", + self.id, + PrintKeys(&keys), + PrintChildKeys(&child_keys), + ); + } + } + } + } +} type SubscriberId = u64; @@ -82,7 +128,13 @@ type SubscribersGauge = CounterVec; /// Manages storage listeners. #[derive(Debug)] -pub struct StorageNotifications { +pub struct StorageNotifications(Arc>>); + +type Keys = Option>; +type ChildKeys = Option>>>; + +#[derive(Debug)] +struct StorageNotificationsImpl { metrics: Option, next_id: SubscriberId, wildcard_listeners: FnvHashSet, @@ -93,15 +145,17 @@ pub struct StorageNotifications { >, sinks: FnvHashMap< SubscriberId, - ( - TracingUnboundedSender<(Block::Hash, StorageChangeSet)>, - Option>, - Option>>>, - ), + (TracingUnboundedSender<(Hash, StorageChangeSet)>, Keys, ChildKeys), >, } impl Default for StorageNotifications { + fn default() -> Self { + Self(Default::default()) + } +} + +impl Default for StorageNotificationsImpl { fn default() -> Self { Self { metrics: Default::default(), @@ -114,10 +168,68 @@ impl Default for StorageNotifications { } } +struct PrintKeys<'a>(&'a Keys); +impl<'a> std::fmt::Debug for PrintKeys<'a> { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(keys) = self.0 { + fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish() + } else { + write!(fmt, "None") + } + } +} + +struct PrintChildKeys<'a>(&'a ChildKeys); +impl<'a> std::fmt::Debug for PrintChildKeys<'a> { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(map) = self.0 { + fmt.debug_map() + .entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values)))) + .finish() + } else { + write!(fmt, "None") + } + } +} + impl StorageNotifications { /// Initialize a new StorageNotifications /// optionally pass a prometheus registry to send subscriber metrics to pub fn new(prometheus_registry: Option) -> Self { + StorageNotifications(Arc::new(Mutex::new(StorageNotificationsImpl::new( + prometheus_registry, + )))) + } + + /// Trigger notification to all listeners. + /// + /// Note the changes are going to be filtered by listener's filter key. + /// In fact no event might be sent if clients are not interested in the changes. + pub fn trigger( + &mut self, + hash: &Block::Hash, + changeset: impl Iterator, Option>)>, + child_changeset: impl Iterator< + Item = (Vec, impl Iterator, Option>)>), + >, + ) { + self.0.lock().trigger(hash, changeset, child_changeset); + } + + /// Start listening for particular storage keys. + pub fn listen( + &mut self, + filter_keys: Option<&[StorageKey]>, + filter_child_keys: Option<&[(StorageKey, Option>)]>, + ) -> StorageEventStream { + let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys); + let storage_notifications = Arc::downgrade(&self.0); + StorageEventStream { rx, storage_notifications, was_triggered: false, id } + } +} + +impl StorageNotificationsImpl { + fn new(prometheus_registry: Option) -> Self { let metrics = prometheus_registry.and_then(|r| { CounterVec::new( Opts::new( @@ -130,7 +242,7 @@ impl StorageNotifications { .ok() }); - StorageNotifications { + StorageNotificationsImpl { metrics, next_id: Default::default(), wildcard_listeners: Default::default(), @@ -139,18 +251,16 @@ impl StorageNotifications { sinks: Default::default(), } } - /// Trigger notification to all listeners. - /// - /// Note the changes are going to be filtered by listener's filter key. - /// In fact no event might be sent if clients are not interested in the changes. - pub fn trigger( + fn trigger( &mut self, - hash: &Block::Hash, + hash: &Hash, changeset: impl Iterator, Option>)>, child_changeset: impl Iterator< Item = (Vec, impl Iterator, Option>)>), >, - ) { + ) where + Hash: Clone, + { let has_wildcard = !self.wildcard_listeners.is_empty(); // early exit if no listeners @@ -244,7 +354,7 @@ impl StorageNotifications { fn remove_subscriber_from( subscriber: &SubscriberId, - filters: &Option>, + filters: &Keys, listeners: &mut HashMap>, wildcards: &mut FnvHashSet, ) { @@ -269,34 +379,35 @@ impl StorageNotifications { } } - fn remove_subscriber(&mut self, subscriber: SubscriberId) { - if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) { - Self::remove_subscriber_from( - &subscriber, - &filters, - &mut self.listeners, - &mut self.wildcard_listeners, - ); - if let Some(child_filters) = child_filters.as_ref() { - for (c_key, filters) in child_filters { - if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) { - Self::remove_subscriber_from( - &subscriber, - &filters, - &mut *listeners, - &mut *wildcards, - ); - - if listeners.is_empty() && wildcards.is_empty() { - self.child_listeners.remove(&c_key); - } + fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> { + let (_, filters, child_filters) = self.sinks.remove(&subscriber)?; + Self::remove_subscriber_from( + &subscriber, + &filters, + &mut self.listeners, + &mut self.wildcard_listeners, + ); + if let Some(child_filters) = child_filters.as_ref() { + for (c_key, filters) in child_filters { + if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) { + Self::remove_subscriber_from( + &subscriber, + &filters, + &mut *listeners, + &mut *wildcards, + ); + + if listeners.is_empty() && wildcards.is_empty() { + self.child_listeners.remove(&c_key); } } } - if let Some(m) = self.metrics.as_ref() { - m.with_label_values(&[&"removed"]).inc(); - } } + if let Some(m) = self.metrics.as_ref() { + m.with_label_values(&[&"removed"]).inc(); + } + + Some((filters, child_filters)) } fn listen_from( @@ -304,7 +415,7 @@ impl StorageNotifications { filter_keys: &Option>, listeners: &mut HashMap>, wildcards: &mut FnvHashSet, - ) -> Option> { + ) -> Keys { match filter_keys { None => { wildcards.insert(current_id); @@ -325,12 +436,11 @@ impl StorageNotifications { } } - /// Start listening for particular storage keys. - pub fn listen( + fn listen( &mut self, filter_keys: Option<&[StorageKey]>, filter_child_keys: Option<&[(StorageKey, Option>)]>, - ) -> StorageEventStream { + ) -> (u64, TracingUnboundedReceiver<(Hash, StorageChangeSet)>) { self.next_id += 1; let current_id = self.next_id; @@ -364,7 +474,7 @@ impl StorageNotifications { m.with_label_values(&[&"added"]).inc(); } - rx + (current_id, rx) } } @@ -517,9 +627,9 @@ mod tests { let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None)); let _recv4 = futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter))); - assert_eq!(notifications.listeners.len(), 2); - assert_eq!(notifications.wildcard_listeners.len(), 2); - assert_eq!(notifications.child_listeners.len(), 1); + assert_eq!(notifications.0.lock().listeners.len(), 2); + assert_eq!(notifications.0.lock().wildcard_listeners.len(), 2); + assert_eq!(notifications.0.lock().child_listeners.len(), 1); } // when @@ -528,9 +638,18 @@ mod tests { notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); // then - assert_eq!(notifications.listeners.len(), 0); - assert_eq!(notifications.wildcard_listeners.len(), 0); - assert_eq!(notifications.child_listeners.len(), 0); + assert_eq!(notifications.0.lock().listeners.len(), 0); + assert_eq!(notifications.0.lock().wildcard_listeners.len(), 0); + assert_eq!(notifications.0.lock().child_listeners.len(), 0); + } + + #[test] + fn should_cleanup_subscriber_if_stream_is_dropped() { + let mut notifications = StorageNotifications::::default(); + let stream = notifications.listen(None, None); + assert_eq!(notifications.0.lock().sinks.len(), 1); + std::mem::drop(stream); + assert_eq!(notifications.0.lock().sinks.len(), 0); } #[test]