Skip to content

Commit

Permalink
fix(rpc): recreate dead and uncleaned subscriptions (#22281)
Browse files Browse the repository at this point in the history
  • Loading branch information
00nktk committed Jan 5, 2022
1 parent 5bb376f commit c1995c6
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions rpc/src/rpc_subscription_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,10 @@ pub struct SignatureSubscriptionParams {

#[derive(Clone)]
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);

struct SubscriptionControlInner {
subscriptions: DashMap<SubscriptionParams, Weak<SubscriptionTokenInner>>,
subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
next_id: AtomicU64,
max_active_subscriptions: usize,
sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
Expand Down Expand Up @@ -216,33 +217,44 @@ impl SubscriptionControl {
self.0.subscriptions.len()
);
let count = self.0.subscriptions.len();
match self.0.subscriptions.entry(params) {
DashEntry::Occupied(entry) => Ok(SubscriptionToken(
entry
.get()
.upgrade()
.expect("dead subscription encountered in SubscriptionControl"),
let create_token_and_weak_ref = |id, params| {
let token = SubscriptionToken(
Arc::new(SubscriptionTokenInner {
control: Arc::clone(&self.0),
params,
id,
}),
self.0.counter.create_token(),
)),
);
let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
(token, weak_ref)
};

match self.0.subscriptions.entry(params) {
DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
// This means the last Arc for this Weak pointer entered the drop just before us,
// but could not remove the entry since we are holding the write lock.
// See `Drop` implementation for `SubscriptionTokenInner` for further info.
None => {
let (token, weak_ref) =
create_token_and_weak_ref(entry.get().1, entry.key().clone());
entry.insert(weak_ref);
Ok(token)
}
},
DashEntry::Vacant(entry) => {
if count >= self.0.max_active_subscriptions {
inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
return Err(Error::TooManySubscriptions);
}
let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
let token = SubscriptionToken(
Arc::new(SubscriptionTokenInner {
control: Arc::clone(&self.0),
params: entry.key().clone(),
id,
}),
self.0.counter.create_token(),
);
let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
let _ = self
.0
.sender
.send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
entry.insert(Arc::downgrade(&token.0));
entry.insert(weak_ref);
datapoint_info!(
"rpc-subscription",
("total", self.0.subscriptions.len(), i64)
Expand Down Expand Up @@ -529,7 +541,9 @@ impl Drop for SubscriptionTokenInner {
DashEntry::Vacant(_) => {
warn!("Subscriptions inconsistency (missing entry in by_params)");
}
DashEntry::Occupied(entry) => {
// Check the strong refs count to ensure no other thread recreated this subscription (not token)
// while we were acquiring the lock.
DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
let _ = self
.control
.sender
Expand All @@ -540,6 +554,9 @@ impl Drop for SubscriptionTokenInner {
("total", self.control.subscriptions.len(), i64)
);
}
// This branch handles the case in which this entry got recreated
// while we were waiting for the lock (inside the `DashMap::entry` method).
DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (),
}
}
}
Expand Down

0 comments on commit c1995c6

Please sign in to comment.