From 5ebf21673e7eab0f85f0159599488dad7735e959 Mon Sep 17 00:00:00 2001 From: Jan Bujak Date: Thu, 9 Dec 2021 08:49:38 +0000 Subject: [PATCH 1/6] Automatically unsubscribe storage listeners when they're dropped --- client/api/src/notifications.rs | 154 +++++++++++++++++++++++++++++--- 1 file changed, 141 insertions(+), 13 deletions(-) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index 1346afd5e54d2..a760b9e45f9ff 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -20,13 +20,20 @@ use std::{ collections::{HashMap, HashSet}, + pin::Pin, sync::Arc, + task::Poll, }; use fnv::{FnvHashMap, FnvHashSet}; +use futures::Stream; +use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, Opts, Registry, U64}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_core::storage::{StorageData, StorageKey}; +use sp_core::{ + hexdisplay::HexDisplay, + storage::{StorageData, StorageKey}, +}; use sp_runtime::traits::Block as BlockT; /// Storage change set @@ -74,7 +81,33 @@ impl StorageChangeSet { } /// Type that implements `futures::Stream` of storage change events. -pub type StorageEventStream = TracingUnboundedReceiver<(H, StorageChangeSet)>; +pub struct StorageEventStream { + rx: TracingUnboundedReceiver<(H, StorageChangeSet)>, + unsubscribe: Option>, + was_triggered: bool, +} + +impl Stream for StorageEventStream { + type Item = as Stream>::Item; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let result = Stream::poll_next(Pin::new(&mut self.rx), cx); + if matches!(result, Poll::Ready(..)) { + self.was_triggered = true; + } + result + } +} + +impl Drop for StorageEventStream { + fn drop(&mut self) { + if let Some(unsubscribe) = self.unsubscribe.take() { + unsubscribe(self.was_triggered); + } + } +} type SubscriberId = u64; @@ -82,7 +115,10 @@ type SubscribersGauge = CounterVec; /// Manages storage listeners. #[derive(Debug)] -pub struct StorageNotifications { +pub struct StorageNotifications(Arc>>); + +#[derive(Debug)] +struct StorageNotificationsImpl { metrics: Option, next_id: SubscriberId, wildcard_listeners: FnvHashSet, @@ -102,6 +138,12 @@ pub struct StorageNotifications { } impl Default for StorageNotifications { + fn default() -> Self { + Self(Default::default()) + } +} + +impl Default for StorageNotificationsImpl { fn default() -> Self { Self { metrics: Default::default(), @@ -114,10 +156,101 @@ impl Default for StorageNotifications { } } +struct PrintKeys<'a>(&'a Option>); +impl<'a> std::fmt::Display for PrintKeys<'a> { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(keys) = self.0.as_ref() { + write!(fmt, "[")?; + let mut is_first = true; + for key in keys { + if is_first { + is_first = false; + } else { + write!(fmt, ", ")?; + } + write!(fmt, "{}", HexDisplay::from(key))?; + } + write!(fmt, "]") + } else { + write!(fmt, "None") + } + } +} + +struct PrintChildKeys<'a>(&'a Option>>>); +impl<'a> std::fmt::Display for PrintChildKeys<'a> { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(child_keys) = self.0.as_ref() { + write!(fmt, "{{")?; + let mut is_first = true; + for (key, values) in child_keys { + if is_first { + is_first = false; + } else { + write!(fmt, ", ")?; + } + write!(fmt, "{}={}", HexDisplay::from(key), PrintKeys(values))?; + } + write!(fmt, "}}") + } else { + write!(fmt, "None") + } + } +} + impl StorageNotifications { /// Initialize a new StorageNotifications /// optionally pass a prometheus registry to send subscriber metrics to pub fn new(prometheus_registry: Option) -> Self { + StorageNotifications(Arc::new(Mutex::new(StorageNotificationsImpl::new( + prometheus_registry, + )))) + } + + /// Trigger notification to all listeners. + /// + /// Note the changes are going to be filtered by listener's filter key. + /// In fact no event might be sent if clients are not interested in the changes. + pub fn trigger( + &mut self, + hash: &Block::Hash, + changeset: impl Iterator, Option>)>, + child_changeset: impl Iterator< + Item = (Vec, impl Iterator, Option>)>), + >, + ) { + self.0.lock().trigger(hash, changeset, child_changeset); + } + + /// Start listening for particular storage keys. + pub fn listen( + &mut self, + filter_keys: Option<&[StorageKey]>, + filter_child_keys: Option<&[(StorageKey, Option>)]>, + ) -> StorageEventStream { + let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys); + let storage_notifications = Arc::downgrade(&self.0); + StorageEventStream { + rx, + unsubscribe: Some(Box::new(move |was_triggered| { + if let Some(storage_notifications) = storage_notifications.upgrade() { + if !was_triggered { + if let Some((_, keys, child_keys)) = + storage_notifications.lock().sinks.get(&id) + { + log::trace!(target: "storage_notifications", "Listener was never triggered: id={}, keys={}, child_keys={}", id, PrintKeys(keys), PrintChildKeys(child_keys)); + } + } + storage_notifications.lock().remove_subscriber(id); + } + })), + was_triggered: false, + } + } +} + +impl StorageNotificationsImpl { + fn new(prometheus_registry: Option) -> Self { let metrics = prometheus_registry.and_then(|r| { CounterVec::new( Opts::new( @@ -130,7 +263,7 @@ impl StorageNotifications { .ok() }); - StorageNotifications { + StorageNotificationsImpl { metrics, next_id: Default::default(), wildcard_listeners: Default::default(), @@ -139,11 +272,7 @@ impl StorageNotifications { sinks: Default::default(), } } - /// Trigger notification to all listeners. - /// - /// Note the changes are going to be filtered by listener's filter key. - /// In fact no event might be sent if clients are not interested in the changes. - pub fn trigger( + fn trigger( &mut self, hash: &Block::Hash, changeset: impl Iterator, Option>)>, @@ -325,12 +454,11 @@ impl StorageNotifications { } } - /// Start listening for particular storage keys. - pub fn listen( + fn listen( &mut self, filter_keys: Option<&[StorageKey]>, filter_child_keys: Option<&[(StorageKey, Option>)]>, - ) -> StorageEventStream { + ) -> (u64, TracingUnboundedReceiver<(Block::Hash, StorageChangeSet)>) { self.next_id += 1; let current_id = self.next_id; @@ -364,7 +492,7 @@ impl StorageNotifications { m.with_label_values(&[&"added"]).inc(); } - rx + (current_id, rx) } } From 50d3ec2d1cc7371c1017a5cc5c1707aa5100c4f2 Mon Sep 17 00:00:00 2001 From: Jan Bujak Date: Thu, 9 Dec 2021 09:30:20 +0000 Subject: [PATCH 2/6] Fix tests' compilation in `sc-client-api` --- client/api/src/notifications.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index a760b9e45f9ff..db23da4c95d16 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -645,9 +645,9 @@ mod tests { let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None)); let _recv4 = futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter))); - assert_eq!(notifications.listeners.len(), 2); - assert_eq!(notifications.wildcard_listeners.len(), 2); - assert_eq!(notifications.child_listeners.len(), 1); + assert_eq!(notifications.0.lock().listeners.len(), 2); + assert_eq!(notifications.0.lock().wildcard_listeners.len(), 2); + assert_eq!(notifications.0.lock().child_listeners.len(), 1); } // when @@ -656,9 +656,9 @@ mod tests { notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset); // then - assert_eq!(notifications.listeners.len(), 0); - assert_eq!(notifications.wildcard_listeners.len(), 0); - assert_eq!(notifications.child_listeners.len(), 0); + assert_eq!(notifications.0.lock().listeners.len(), 0); + assert_eq!(notifications.0.lock().wildcard_listeners.len(), 0); + assert_eq!(notifications.0.lock().child_listeners.len(), 0); } #[test] From 1ab34963d74abd5057c5e07300ebcf2827dbe59e Mon Sep 17 00:00:00 2001 From: Jan Bujak Date: Thu, 9 Dec 2021 09:34:39 +0000 Subject: [PATCH 3/6] Add an extra test --- client/api/src/notifications.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index db23da4c95d16..30b1af1d9685f 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -661,6 +661,15 @@ mod tests { assert_eq!(notifications.0.lock().child_listeners.len(), 0); } + #[test] + fn should_cleanup_subscriber_if_stream_is_dropped() { + let mut notifications = StorageNotifications::::default(); + let stream = notifications.listen(None, None); + assert_eq!(notifications.0.lock().sinks.len(), 1); + std::mem::drop(stream); + assert_eq!(notifications.0.lock().sinks.len(), 0); + } + #[test] fn should_not_send_empty_notifications() { // given From 569608373d94067cf0c1bc774b0eca0d601c2605 Mon Sep 17 00:00:00 2001 From: Jan Bujak Date: Thu, 9 Dec 2021 12:14:34 +0000 Subject: [PATCH 4/6] Align to review comments; cleanups --- client/api/src/notifications.rs | 154 ++++++++++++++------------------ 1 file changed, 65 insertions(+), 89 deletions(-) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index 30b1af1d9685f..eb12028b3df1f 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -21,7 +21,7 @@ use std::{ collections::{HashMap, HashSet}, pin::Pin, - sync::Arc, + sync::{Arc, Weak}, task::Poll, }; @@ -41,8 +41,8 @@ use sp_runtime::traits::Block as BlockT; pub struct StorageChangeSet { changes: Arc)>>, child_changes: Arc)>)>>, - filter: Option>, - child_filters: Option>>>, + filter: Keys, + child_filters: ChildKeys, } impl StorageChangeSet { @@ -83,8 +83,9 @@ impl StorageChangeSet { /// Type that implements `futures::Stream` of storage change events. pub struct StorageEventStream { rx: TracingUnboundedReceiver<(H, StorageChangeSet)>, - unsubscribe: Option>, + storage_notifications: Weak>>, was_triggered: bool, + id: u64, } impl Stream for StorageEventStream { @@ -94,7 +95,7 @@ impl Stream for StorageEventStream { cx: &mut std::task::Context<'_>, ) -> Poll> { let result = Stream::poll_next(Pin::new(&mut self.rx), cx); - if matches!(result, Poll::Ready(..)) { + if result.is_ready() { self.was_triggered = true; } result @@ -103,8 +104,14 @@ impl Stream for StorageEventStream { impl Drop for StorageEventStream { fn drop(&mut self) { - if let Some(unsubscribe) = self.unsubscribe.take() { - unsubscribe(self.was_triggered); + if let Some(storage_notifications) = self.storage_notifications.upgrade() { + if let Some((keys, child_keys)) = + storage_notifications.lock().remove_subscriber(self.id) + { + if !self.was_triggered { + log::trace!(target: "storage_notifications", "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", self.id, PrintKeys(&keys), PrintChildKeys(&child_keys)); + } + } } } } @@ -115,10 +122,13 @@ type SubscribersGauge = CounterVec; /// Manages storage listeners. #[derive(Debug)] -pub struct StorageNotifications(Arc>>); +pub struct StorageNotifications(Arc>>); + +type Keys = Option>; +type ChildKeys = Option>>>; #[derive(Debug)] -struct StorageNotificationsImpl { +struct StorageNotificationsImpl { metrics: Option, next_id: SubscriberId, wildcard_listeners: FnvHashSet, @@ -129,11 +139,7 @@ struct StorageNotificationsImpl { >, sinks: FnvHashMap< SubscriberId, - ( - TracingUnboundedSender<(Block::Hash, StorageChangeSet)>, - Option>, - Option>>>, - ), + (TracingUnboundedSender<(Hash, StorageChangeSet)>, Keys, ChildKeys), >, } @@ -143,7 +149,7 @@ impl Default for StorageNotifications { } } -impl Default for StorageNotificationsImpl { +impl Default for StorageNotificationsImpl { fn default() -> Self { Self { metrics: Default::default(), @@ -156,42 +162,24 @@ impl Default for StorageNotificationsImpl { } } -struct PrintKeys<'a>(&'a Option>); -impl<'a> std::fmt::Display for PrintKeys<'a> { +struct PrintKeys<'a>(&'a Keys); +impl<'a> std::fmt::Debug for PrintKeys<'a> { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - if let Some(keys) = self.0.as_ref() { - write!(fmt, "[")?; - let mut is_first = true; - for key in keys { - if is_first { - is_first = false; - } else { - write!(fmt, ", ")?; - } - write!(fmt, "{}", HexDisplay::from(key))?; - } - write!(fmt, "]") + if let Some(keys) = self.0 { + fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish() } else { write!(fmt, "None") } } } -struct PrintChildKeys<'a>(&'a Option>>>); -impl<'a> std::fmt::Display for PrintChildKeys<'a> { +struct PrintChildKeys<'a>(&'a ChildKeys); +impl<'a> std::fmt::Debug for PrintChildKeys<'a> { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - if let Some(child_keys) = self.0.as_ref() { - write!(fmt, "{{")?; - let mut is_first = true; - for (key, values) in child_keys { - if is_first { - is_first = false; - } else { - write!(fmt, ", ")?; - } - write!(fmt, "{}={}", HexDisplay::from(key), PrintKeys(values))?; - } - write!(fmt, "}}") + if let Some(map) = self.0 { + fmt.debug_map() + .entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values)))) + .finish() } else { write!(fmt, "None") } @@ -230,26 +218,11 @@ impl StorageNotifications { ) -> StorageEventStream { let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys); let storage_notifications = Arc::downgrade(&self.0); - StorageEventStream { - rx, - unsubscribe: Some(Box::new(move |was_triggered| { - if let Some(storage_notifications) = storage_notifications.upgrade() { - if !was_triggered { - if let Some((_, keys, child_keys)) = - storage_notifications.lock().sinks.get(&id) - { - log::trace!(target: "storage_notifications", "Listener was never triggered: id={}, keys={}, child_keys={}", id, PrintKeys(keys), PrintChildKeys(child_keys)); - } - } - storage_notifications.lock().remove_subscriber(id); - } - })), - was_triggered: false, - } + StorageEventStream { rx, storage_notifications, was_triggered: false, id } } } -impl StorageNotificationsImpl { +impl StorageNotificationsImpl { fn new(prometheus_registry: Option) -> Self { let metrics = prometheus_registry.and_then(|r| { CounterVec::new( @@ -274,12 +247,14 @@ impl StorageNotificationsImpl { } fn trigger( &mut self, - hash: &Block::Hash, + hash: &Hash, changeset: impl Iterator, Option>)>, child_changeset: impl Iterator< Item = (Vec, impl Iterator, Option>)>), >, - ) { + ) where + Hash: Clone, + { let has_wildcard = !self.wildcard_listeners.is_empty(); // early exit if no listeners @@ -373,7 +348,7 @@ impl StorageNotificationsImpl { fn remove_subscriber_from( subscriber: &SubscriberId, - filters: &Option>, + filters: &Keys, listeners: &mut HashMap>, wildcards: &mut FnvHashSet, ) { @@ -398,34 +373,35 @@ impl StorageNotificationsImpl { } } - fn remove_subscriber(&mut self, subscriber: SubscriberId) { - if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) { - Self::remove_subscriber_from( - &subscriber, - &filters, - &mut self.listeners, - &mut self.wildcard_listeners, - ); - if let Some(child_filters) = child_filters.as_ref() { - for (c_key, filters) in child_filters { - if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) { - Self::remove_subscriber_from( - &subscriber, - &filters, - &mut *listeners, - &mut *wildcards, - ); - - if listeners.is_empty() && wildcards.is_empty() { - self.child_listeners.remove(&c_key); - } + fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> { + let (_, filters, child_filters) = self.sinks.remove(&subscriber)?; + Self::remove_subscriber_from( + &subscriber, + &filters, + &mut self.listeners, + &mut self.wildcard_listeners, + ); + if let Some(child_filters) = child_filters.as_ref() { + for (c_key, filters) in child_filters { + if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) { + Self::remove_subscriber_from( + &subscriber, + &filters, + &mut *listeners, + &mut *wildcards, + ); + + if listeners.is_empty() && wildcards.is_empty() { + self.child_listeners.remove(&c_key); } } } - if let Some(m) = self.metrics.as_ref() { - m.with_label_values(&[&"removed"]).inc(); - } } + if let Some(m) = self.metrics.as_ref() { + m.with_label_values(&[&"removed"]).inc(); + } + + Some((filters, child_filters)) } fn listen_from( @@ -433,7 +409,7 @@ impl StorageNotificationsImpl { filter_keys: &Option>, listeners: &mut HashMap>, wildcards: &mut FnvHashSet, - ) -> Option> { + ) -> Keys { match filter_keys { None => { wildcards.insert(current_id); @@ -458,7 +434,7 @@ impl StorageNotificationsImpl { &mut self, filter_keys: Option<&[StorageKey]>, filter_child_keys: Option<&[(StorageKey, Option>)]>, - ) -> (u64, TracingUnboundedReceiver<(Block::Hash, StorageChangeSet)>) { + ) -> (u64, TracingUnboundedReceiver<(Hash, StorageChangeSet)>) { self.next_id += 1; let current_id = self.next_id; From 9fd68b278bada805edf20bee5df2f26406628921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 9 Dec 2021 15:41:22 +0100 Subject: [PATCH 5/6] Update client/api/src/notifications.rs --- client/api/src/notifications.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index eb12028b3df1f..e15dca456b0cf 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -109,7 +109,13 @@ impl Drop for StorageEventStream { storage_notifications.lock().remove_subscriber(self.id) { if !self.was_triggered { - log::trace!(target: "storage_notifications", "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", self.id, PrintKeys(&keys), PrintChildKeys(&child_keys)); + log::trace!( + target: "storage_notifications", + "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", + self.id, + PrintKeys(&keys), + PrintChildKeys(&child_keys), + ); } } } From 5e618fb8430d7a8439753db4961951c17c25d9f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 9 Dec 2021 15:59:21 +0100 Subject: [PATCH 6/6] FMT --- client/api/src/notifications.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index e15dca456b0cf..25a0ea99af361 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -110,10 +110,10 @@ impl Drop for StorageEventStream { { if !self.was_triggered { log::trace!( - target: "storage_notifications", - "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", - self.id, - PrintKeys(&keys), + target: "storage_notifications", + "Listener was never triggered: id={}, keys={:?}, child_keys={:?}", + self.id, + PrintKeys(&keys), PrintChildKeys(&child_keys), ); }