fix: Arc retriers so we reduce the cloning + antipattern
One thing I really dislike about talaia-labs#89 was having a method that cloned its caller
just to work around spawning a task inside it that called a method on the same struct.

Turns out you can take self as Arc<Self>, which completely avoids the need for that workaround,
and it also reduces the amount of data being cloned.
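
For illustration, a minimal standalone sketch of that pattern (assuming the tokio crate is available; `Worker` and its fields are hypothetical names, not the plugin's types):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the retrier; names are illustrative only.
struct Worker {
    attempts: Mutex<u32>,
}

impl Worker {
    // Taking `self: Arc<Self>` lets the method move its own handle into the
    // spawned task, with no `#[derive(Clone)]` or `let this = self.clone()`
    // workaround needed.
    fn start(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            *self.attempts.lock().unwrap() += 1;
        })
    }
}

#[tokio::main]
async fn main() {
    let worker = Arc::new(Worker {
        attempts: Mutex::new(0),
    });
    // Callers hold an Arc and clone the pointer (cheap) rather than the struct.
    worker.clone().start().await.unwrap();
    assert_eq!(*worker.attempts.lock().unwrap(), 1);
}
```

Since the whole struct is shared behind a single Arc, the interior fields no longer need their own Arc wrappers, which is what the diff below does to pending_appointments and status.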
sr-gi committed Sep 19, 2022
1 parent 8867de1 commit 7dfb091
Showing 1 changed file with 27 additions and 27 deletions.
54 changes: 27 additions & 27 deletions watchtower-plugin/src/retrier.rs
@@ -19,7 +19,7 @@ pub struct RetryManager {
     unreachable_towers: UnboundedReceiver<(TowerId, Locator)>,
     max_elapsed_time_secs: u16,
     max_interval_time_secs: u16,
-    retriers: HashMap<TowerId, Retrier>,
+    retriers: HashMap<TowerId, Arc<Retrier>>,
 }

 impl RetryManager {
@@ -77,7 +77,7 @@ impl RetryManager {
             // Start all the ready retriers.
             for retrier in self.retriers.values() {
                 if retrier.should_start() {
-                    self.start_retrying(retrier);
+                    self.start_retrying(retrier.clone());
                 }
             }
             // Sleep to not waste a lot of CPU cycles.
@@ -98,7 +98,11 @@ impl RetryManager {
                 tower_id,
                 locator
             );
-            e.insert(Retrier::new(self.wt_client.clone(), tower_id, locator));
+            e.insert(Arc::new(Retrier::new(
+                self.wt_client.clone(),
+                tower_id,
+                locator,
+            )));
         } else {
             log::debug!(
                 "Adding pending appointment {} to existing tower {}",
@@ -115,7 +119,7 @@
         }
     }

-    fn start_retrying(&self, retrier: &Retrier) {
+    fn start_retrying(&self, retrier: Arc<Retrier>) {
         log::info!("Retrying tower {}", retrier.tower_id);
         retrier.start(self.max_elapsed_time_secs, self.max_interval_time_secs);
     }
@@ -138,21 +142,20 @@ pub enum RetrierStatus {
     Failed,
 }

-#[derive(Clone)]
 pub struct Retrier {
     wt_client: Arc<Mutex<WTClient>>,
     tower_id: TowerId,
-    pending_appointments: Arc<Mutex<HashSet<Locator>>>,
-    status: Arc<Mutex<RetrierStatus>>,
+    pending_appointments: Mutex<HashSet<Locator>>,
+    status: Mutex<RetrierStatus>,
 }

 impl Retrier {
     pub fn new(wt_client: Arc<Mutex<WTClient>>, tower_id: TowerId, locator: Locator) -> Self {
         Self {
             wt_client,
             tower_id,
-            pending_appointments: Arc::new(Mutex::new(HashSet::from([locator]))),
-            status: Arc::new(Mutex::new(RetrierStatus::Stopped)),
+            pending_appointments: Mutex::new(HashSet::from([locator])),
+            status: Mutex::new(RetrierStatus::Stopped),
         }
     }

@@ -174,19 +177,16 @@ impl Retrier {
         *self.status.lock().unwrap() == RetrierStatus::Stopped && self.has_pending_appointments()
     }

-    pub fn start(&self, max_elapsed_time_secs: u16, max_interval_time_secs: u16) {
-        let retrier = self.clone();
-
+    pub fn start(self: Arc<Self>, max_elapsed_time_secs: u16, max_interval_time_secs: u16) {
         // We shouldn't be retrying failed and running retriers.
-        debug_assert_eq!(*retrier.status.lock().unwrap(), RetrierStatus::Stopped);
+        debug_assert_eq!(*self.status.lock().unwrap(), RetrierStatus::Stopped);

         // Set the tower as temporary unreachable and the retrier status to running.
-        retrier
-            .wt_client
+        self.wt_client
             .lock()
             .unwrap()
-            .set_tower_status(retrier.tower_id, crate::TowerStatus::TemporaryUnreachable);
-        retrier.set_status(RetrierStatus::Running);
+            .set_tower_status(self.tower_id, crate::TowerStatus::TemporaryUnreachable);
+        self.set_status(RetrierStatus::Running);

         tokio::spawn(async move {
             let r = retry_notify(
@@ -195,33 +195,33 @@ impl Retrier {
                     max_interval: Duration::from_secs(max_interval_time_secs as u64),
                     ..ExponentialBackoff::default()
                 },
-                || async { retrier.run().await },
+                || async { self.run().await },
                 |err, _| {
-                    log::warn!("Retry error happened with {}. {}", retrier.tower_id, err);
+                    log::warn!("Retry error happened with {}. {}", self.tower_id, err);
                 },
             )
             .await;

-            let mut state = retrier.wt_client.lock().unwrap();
+            let mut state = self.wt_client.lock().unwrap();

             match r {
                 Ok(_) => {
-                    log::info!("Retry strategy succeeded for {}", retrier.tower_id);
+                    log::info!("Retry strategy succeeded for {}", self.tower_id);
                     // Set the tower status now so new appointment doesn't go to the retry manager.
-                    state.set_tower_status(retrier.tower_id, crate::TowerStatus::Reachable);
+                    state.set_tower_status(self.tower_id, crate::TowerStatus::Reachable);
                     // Retrier succeeded and can be re-used by re-starting it.
-                    retrier.set_status(RetrierStatus::Stopped);
+                    self.set_status(RetrierStatus::Stopped);
                 }
                 Err(e) => {
                     // Notice we'll end up here after a permanent error. That is, either after finishing the backoff strategy
                     // unsuccessfully or by manually raising such an error (like when facing a tower misbehavior).
-                    log::warn!("Retry strategy gave up for {}. {}", retrier.tower_id, e);
+                    log::warn!("Retry strategy gave up for {}. {}", self.tower_id, e);

                     // Retrier failed and should be given up on. Avoid setting the tower status until the retrier is
                     // deleted/dropped. This way users performing manual retry will get an error as the tower will be
                     // temporary unreachable.
                     // We don't need to set the tower status now. Any new appointments we receive will not be retried anyways.
-                    retrier.set_status(RetrierStatus::Failed);
+                    self.set_status(RetrierStatus::Failed);
                 }
             }
         });
@@ -382,8 +382,8 @@ mod tests {
             Self {
                 wt_client,
                 tower_id,
-                pending_appointments: Arc::new(Mutex::new(HashSet::new())),
-                status: Arc::new(Mutex::new(RetrierStatus::Stopped)),
+                pending_appointments: Mutex::new(HashSet::new()),
+                status: Mutex::new(RetrierStatus::Stopped),
             }
         }
     }
