Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Automatically unsubscribe storage listeners when they're dropped (RCP node memory leak fix) #10454

Merged
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 156 additions & 19 deletions client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

use std::{
collections::{HashMap, HashSet},
pin::Pin,
sync::Arc,
task::Poll,
};

use fnv::{FnvHashMap, FnvHashSet};
use futures::Stream;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, Opts, Registry, U64};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::storage::{StorageData, StorageKey};
use sp_core::{
hexdisplay::HexDisplay,
storage::{StorageData, StorageKey},
};
use sp_runtime::traits::Block as BlockT;

/// Storage change set
Expand Down Expand Up @@ -74,15 +81,44 @@ impl StorageChangeSet {
}

/// Type that implements `futures::Stream` of storage change events.
pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>;
pub struct StorageEventStream<H> {
rx: TracingUnboundedReceiver<(H, StorageChangeSet)>,
unsubscribe: Option<Box<dyn FnOnce(bool) + Send + Sync>>,
was_triggered: bool,
}

impl<H> Stream for StorageEventStream<H> {
type Item = <TracingUnboundedReceiver<(H, StorageChangeSet)> as Stream>::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let result = Stream::poll_next(Pin::new(&mut self.rx), cx);
if matches!(result, Poll::Ready(..)) {
self.was_triggered = true;
}
koute marked this conversation as resolved.
Show resolved Hide resolved
result
}
}

impl<H> Drop for StorageEventStream<H> {
fn drop(&mut self) {
if let Some(unsubscribe) = self.unsubscribe.take() {
unsubscribe(self.was_triggered);
}
}
}

type SubscriberId = u64;

type SubscribersGauge = CounterVec<U64>;

/// Manages storage listeners.
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT> {
pub struct StorageNotifications<Block: BlockT>(Arc<Mutex<StorageNotificationsImpl<Block>>>);

#[derive(Debug)]
struct StorageNotificationsImpl<Block: BlockT> {
metrics: Option<SubscribersGauge>,
next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>,
Expand All @@ -102,6 +138,12 @@ pub struct StorageNotifications<Block: BlockT> {
}

impl<Block: BlockT> Default for StorageNotifications<Block> {
fn default() -> Self {
Self(Default::default())
}
}

impl<Block: BlockT> Default for StorageNotificationsImpl<Block> {
fn default() -> Self {
Self {
metrics: Default::default(),
Expand All @@ -114,10 +156,101 @@ impl<Block: BlockT> Default for StorageNotifications<Block> {
}
}

struct PrintKeys<'a>(&'a Option<HashSet<StorageKey>>);
impl<'a> std::fmt::Display for PrintKeys<'a> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(keys) = self.0.as_ref() {
write!(fmt, "[")?;
let mut is_first = true;
for key in keys {
if is_first {
is_first = false;
} else {
write!(fmt, ", ")?;
}
write!(fmt, "{}", HexDisplay::from(key))?;
}
write!(fmt, "]")
koute marked this conversation as resolved.
Show resolved Hide resolved
} else {
write!(fmt, "None")
}
}
}

struct PrintChildKeys<'a>(&'a Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>);
impl<'a> std::fmt::Display for PrintChildKeys<'a> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(child_keys) = self.0.as_ref() {
write!(fmt, "{{")?;
let mut is_first = true;
for (key, values) in child_keys {
if is_first {
is_first = false;
} else {
write!(fmt, ", ")?;
}
write!(fmt, "{}={}", HexDisplay::from(key), PrintKeys(values))?;
koute marked this conversation as resolved.
Show resolved Hide resolved
}
write!(fmt, "}}")
} else {
write!(fmt, "None")
}
}
}

impl<Block: BlockT> StorageNotifications<Block> {
/// Initialize a new StorageNotifications
/// optionally pass a prometheus registry to send subscriber metrics to
pub fn new(prometheus_registry: Option<Registry>) -> Self {
StorageNotifications(Arc::new(Mutex::new(StorageNotificationsImpl::new(
prometheus_registry,
))))
}

/// Trigger notification to all listeners.
///
/// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(
&mut self,
hash: &Block::Hash,
changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
child_changeset: impl Iterator<
Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
>,
) {
self.0.lock().trigger(hash, changeset, child_changeset);
}

/// Start listening for particular storage keys.
pub fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys);
let storage_notifications = Arc::downgrade(&self.0);
StorageEventStream {
rx,
unsubscribe: Some(Box::new(move |was_triggered| {
if let Some(storage_notifications) = storage_notifications.upgrade() {
if !was_triggered {
koute marked this conversation as resolved.
Show resolved Hide resolved
if let Some((_, keys, child_keys)) =
storage_notifications.lock().sinks.get(&id)
{
log::trace!(target: "storage_notifications", "Listener was never triggered: id={}, keys={}, child_keys={}", id, PrintKeys(keys), PrintChildKeys(child_keys));
}
}
storage_notifications.lock().remove_subscriber(id);
koute marked this conversation as resolved.
Show resolved Hide resolved
}
})),
was_triggered: false,
}
}
}

impl<Block: BlockT> StorageNotificationsImpl<Block> {
fn new(prometheus_registry: Option<Registry>) -> Self {
let metrics = prometheus_registry.and_then(|r| {
CounterVec::new(
Opts::new(
Expand All @@ -130,7 +263,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
.ok()
});

StorageNotifications {
StorageNotificationsImpl {
metrics,
next_id: Default::default(),
wildcard_listeners: Default::default(),
Expand All @@ -139,11 +272,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
sinks: Default::default(),
}
}
/// Trigger notification to all listeners.
bkchr marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(
fn trigger(
&mut self,
hash: &Block::Hash,
changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
Expand Down Expand Up @@ -325,12 +454,11 @@ impl<Block: BlockT> StorageNotifications<Block> {
}
}

/// Start listening for particular storage keys.
pub fn listen(
fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
) -> (u64, TracingUnboundedReceiver<(Block::Hash, StorageChangeSet)>) {
self.next_id += 1;
let current_id = self.next_id;

Expand Down Expand Up @@ -364,7 +492,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
m.with_label_values(&[&"added"]).inc();
}

rx
(current_id, rx)
}
}

Expand Down Expand Up @@ -517,9 +645,9 @@ mod tests {
let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None));
let _recv4 =
futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter)));
assert_eq!(notifications.listeners.len(), 2);
assert_eq!(notifications.wildcard_listeners.len(), 2);
assert_eq!(notifications.child_listeners.len(), 1);
assert_eq!(notifications.0.lock().listeners.len(), 2);
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 2);
assert_eq!(notifications.0.lock().child_listeners.len(), 1);
}

// when
Expand All @@ -528,9 +656,18 @@ mod tests {
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);

// then
assert_eq!(notifications.listeners.len(), 0);
assert_eq!(notifications.wildcard_listeners.len(), 0);
assert_eq!(notifications.child_listeners.len(), 0);
assert_eq!(notifications.0.lock().listeners.len(), 0);
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 0);
assert_eq!(notifications.0.lock().child_listeners.len(), 0);
}

#[test]
fn should_cleanup_subscriber_if_stream_is_dropped() {
let mut notifications = StorageNotifications::<Block>::default();
let stream = notifications.listen(None, None);
assert_eq!(notifications.0.lock().sinks.len(), 1);
std::mem::drop(stream);
assert_eq!(notifications.0.lock().sinks.len(), 0);
}

#[test]
Expand Down