From f6a60a9655849e7c7cf9971e28908667390226bb Mon Sep 17 00:00:00 2001
From: meryacine
Date: Wed, 14 Sep 2022 16:27:28 +0200
Subject: [PATCH] Rework the retrier

This is an attempt to rework the retrier logic to simplify how it works
and make it less error prone. This is done by making the retry manager
object responsible for both:
1- adding new retriers and extending current ones
2- removing retriers when they finish their work

This way, we don't need a mutex to guard the retriers hashmap, and we are
sure that adding/extending retriers never happens at the same time as
removing them, because only the retry manager does either of those and not
the individual retriers (i.e. retriers can't remove themselves from the
retriers hashmap).

The retry manager logic goes as follows:
1- Drain the unreachable towers channel until it's empty, storing the
pending appointments (locators, to be exact) in each retrier's pending
appointments set.
2- Remove any finished retriers (ones that succeeded and have no more
pending appointments) and any failed retriers (ones that failed to send
their appointments).
3- Start all the non-running retriers left after removing the failed and
finished ones.

Retriers signal their status so that the retry manager can determine which
retriers to keep, which to remove, and which to re-start.

We also set the tower as unreachable when destroying the tower's retrier,
not right after its backoff completes. This keeps the tower flagged as
temporarily unreachable until its retrier is destroyed, so a manual retry
by the user fails with an error until then. If we set the unreachable
status right after the backoff instead, manual user retries could be
discarded silently: the retrier would mark the tower unreachable too
early, letting the user issue a manual retry that never gets carried out,
since the retry manager will remove that retrier anyway because it failed
to deliver its pending appointments.
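For reviewers, here is a minimal, self-contained sketch of the manager loop
described above. It is only an illustration, not the plugin's code: `TowerId`
and `Locator` are reduced to plain strings, the per-tower exponential-backoff
task is replaced by a `running` flag, and error handling is omitted. It
assumes tokio with its full feature set.

// Simplified sketch of the retry manager loop (toy types, no real retrying).
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use tokio::sync::mpsc::{self, error::TryRecvError};

type TowerId = String;
type Locator = String;

#[derive(Default)]
struct Retrier {
    pending: HashSet<Locator>, // locators still to be delivered to this tower
    running: bool,             // a backoff task is currently retrying this tower
    failed: bool,              // set when the backoff gives up; the manager drops the entry
}

impl Retrier {
    fn should_start(&self) -> bool {
        !self.running && !self.failed && !self.pending.is_empty()
    }
}

async fn manage_retry(mut rx: mpsc::UnboundedReceiver<(TowerId, Locator)>) {
    let mut retriers: HashMap<TowerId, Retrier> = HashMap::new();
    loop {
        match rx.try_recv() {
            // 1. Drain the channel, accumulating pending locators per tower.
            Ok((tower_id, locator)) => {
                retriers.entry(tower_id).or_default().pending.insert(locator);
            }
            // 2. Channel empty: drop finished/failed retriers, start the ready ones.
            Err(TryRecvError::Empty) => {
                // The real code also flips the tower to Unreachable here for failed retriers.
                retriers.retain(|_, r| r.running || r.should_start());
                for r in retriers.values_mut().filter(|r| r.should_start()) {
                    r.running = true; // the real code spawns a backoff task per tower here
                }
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            // 3. All senders gone: shut the manager down.
            Err(TryRecvError::Disconnected) => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    tx.send(("tower_a".into(), "locator_1".into())).unwrap();
    drop(tx); // disconnect so the sketch terminates
    manage_retry(rx).await;
}

The property the rework is after shows up in the Empty branch: retriers are
only inserted, pruned, and started from this single task, so the map needs
no mutex.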
--- watchtower-plugin/src/main.rs | 4 +- watchtower-plugin/src/retrier.rs | 346 +++++++++++++++++++------------ watchtower-plugin/tests/test.py | 16 +- 3 files changed, 236 insertions(+), 130 deletions(-) diff --git a/watchtower-plugin/src/main.rs b/watchtower-plugin/src/main.rs index 74195127..07c883b1 100755 --- a/watchtower-plugin/src/main.rs +++ b/watchtower-plugin/src/main.rs @@ -615,8 +615,8 @@ async fn main() -> Result<(), Error> { 60 }; tokio::spawn(async move { - RetryManager::new(state_clone) - .manage_retry(max_elapsed_time, max_interval_time, rx) + RetryManager::new(state_clone, rx, max_elapsed_time, max_interval_time) + .manage_retry() .await }); plugin.join().await diff --git a/watchtower-plugin/src/retrier.rs b/watchtower-plugin/src/retrier.rs index cfc5afd3..923c30f2 100644 --- a/watchtower-plugin/src/retrier.rs +++ b/watchtower-plugin/src/retrier.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver}; use backoff::future::retry_notify; use backoff::{Error, ExponentialBackoff}; @@ -16,97 +16,74 @@ use crate::wt_client::WTClient; pub struct RetryManager { wt_client: Arc>, - retriers: Arc>>, + unreachable_towers: UnboundedReceiver<(TowerId, Locator)>, + max_elapsed_time_secs: u16, + max_interval_time_secs: u16, + retriers: HashMap, } impl RetryManager { - pub fn new(wt_client: Arc>) -> Self { + pub fn new( + wt_client: Arc>, + unreachable_towers: UnboundedReceiver<(TowerId, Locator)>, + max_elapsed_time_secs: u16, + max_interval_time_secs: u16, + ) -> Self { RetryManager { wt_client, - retriers: Arc::new(Mutex::new(HashMap::new())), + unreachable_towers, + max_elapsed_time_secs, + max_interval_time_secs, + retriers: HashMap::new(), } } - pub async fn manage_retry( - &mut self, - max_elapsed_time_secs: u16, - max_interval_time_secs: u16, - mut unreachable_towers: UnboundedReceiver<(TowerId, Locator)>, - ) { + + /// Starts the retry manager's main logic loop. + /// This method will keep running until the `unreachable_towers` sender disconnects. + /// + /// It will receive any `(tower_id, locator)` pair and try to send the appointment with `locator` to + /// the tower with `tower_id`. This is done by spawning a tokio thread for each `tower_id` that tries + /// to send all the pending appointments. 
+ pub async fn manage_retry(&mut self) { log::info!("Starting retry manager"); loop { - let (tower_id, locator) = unreachable_towers.recv().await.unwrap(); - // Not start a retry if the tower is flagged to be abandoned - { - let wt_client = self.wt_client.lock().unwrap(); - if !wt_client.towers.contains_key(&tower_id) { - log::info!("Skipping retrying abandoned tower {}", tower_id); - continue; + match self.unreachable_towers.try_recv() { + Ok((tower_id, locator)) => { + // Not start a retry if the tower is flagged to be abandoned + if !self + .wt_client + .lock() + .unwrap() + .towers + .contains_key(&tower_id) + { + log::info!("Skipping retrying abandoned tower {}", tower_id); + continue; + } + self.add_pending_appointment(tower_id, locator); } - } - - if let Some(retrier) = self.add_pending_appointment(tower_id, locator) { - log::info!("Retrying tower {}", tower_id); - let wt_client = self.wt_client.clone(); - let retriers = self.retriers.clone(); - - tokio::spawn(async move { - let r = retry_notify( - ExponentialBackoff { - max_elapsed_time: Some(Duration::from_secs( - max_elapsed_time_secs as u64, - )), - max_interval: Duration::from_secs(max_interval_time_secs as u64), - ..ExponentialBackoff::default() - }, - || async { retrier.retry_tower(tower_id).await }, - |err, _| { - log::warn!("Retry error happened with {}. {}", tower_id, err); - }, - ) - .await; - - let mut state = wt_client.lock().unwrap(); - let retrier = retriers.lock().unwrap().remove(&tower_id).unwrap(); - - match r { - Ok(_) => { - let pending_appointments = retrier.pending_appointments.lock().unwrap(); - if !pending_appointments.is_empty() { - // If there are pending appointments by the time we remove the retrier we send them back through the channel - // so they are not missed. Notice this is unlikely given the map is checked before exiting `retry_tower`, but it - // can happen. - log::info!( - "Some data was missed while retrying {}. Adding it back", - tower_id - ); - for locator in retrier.pending_appointments.lock().unwrap().drain() - { - state.unreachable_towers.send((tower_id, locator)).unwrap(); - } - } else { - log::info!("Retry strategy succeeded for {}", tower_id); - state.set_tower_status(tower_id, crate::TowerStatus::Reachable); - } - } - Err(e) => { - log::warn!("Retry strategy gave up for {}. {}", tower_id, e); - // Notice we'll end up here after a permanent error. That is, either after finishing the backoff strategy - // unsuccessfully or by manually raising such an error (like when facing a tower misbehavior) - if let Some(tower) = state.towers.get_mut(&tower_id) { - if tower.status.is_temporary_unreachable() { - log::warn!("Setting {} as unreachable", tower_id); - state.set_tower_status( - tower_id, - crate::TowerStatus::Unreachable, - ); - } - } else { - log::info!("Skipping retrying abandoned tower {}", tower_id); - } + Err(TryRecvError::Empty) => { + // Keep only running retriers and retriers ready to be started/re-started. + // This will remove failed ones and ones finished successfully and have no pending appointments. + // + // Note that a failed retrier could have received some new appointments to retry. In this case, we don't try to send + // them because we know that that tower is unreachable. We most likely received these new appointments while the tower + // was still flagged as temporarily unreachable when cleaning up after giving up retrying. 
+ self.retriers.retain(|_, retrier| { + retrier.set_tower_status_if_failed(); + retrier.is_running() || retrier.should_start() + }); + // Start all the ready retriers. + for retrier in self.retriers.values() { + if retrier.should_start() { + self.start_retrying(retrier); } } - }); + // Sleep to not waste a lot of CPU cycles. + tokio::time::sleep(Duration::from_secs(1)).await; + } + Err(TryRecvError::Disconnected) => break, } } } @@ -114,76 +91,167 @@ impl RetryManager { /// Adds an appointment to pending for a given tower. /// /// If the tower is not currently being retried, a new entry for it is created, otherwise, the data is appended to the existing entry. - /// - /// Returns true if a new entry is created, false otherwise. - fn add_pending_appointment(&mut self, tower_id: TowerId, locator: Locator) -> Option { - let mut retriers = self.retriers.lock().unwrap(); - if let std::collections::hash_map::Entry::Vacant(e) = retriers.entry(tower_id) { + fn add_pending_appointment(&mut self, tower_id: TowerId, locator: Locator) { + if let std::collections::hash_map::Entry::Vacant(e) = self.retriers.entry(tower_id) { log::debug!( "Creating a new entry for tower {} with locator {}", tower_id, locator ); - self.wt_client - .lock() - .unwrap() - .set_tower_status(tower_id, crate::TowerStatus::TemporaryUnreachable); - - let retrier = Retrier::new(self.wt_client.clone(), locator); - e.insert(retrier.clone()); - - Some(retrier) + e.insert(Retrier::new(self.wt_client.clone(), tower_id, locator)); } else { log::debug!( "Adding pending appointment {} to existing tower {}", locator, tower_id ); - retriers + self.retriers .get(&tower_id) .unwrap() .pending_appointments .lock() .unwrap() .insert(locator); - - None } } + + fn start_retrying(&self, retrier: &Retrier) { + log::info!("Retrying tower {}", retrier.tower_id); + retrier.start(self.max_elapsed_time_secs, self.max_interval_time_secs); + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum RetrierStatus { + /// Retrier is stopped. This could happen if the retrier was never started or it started and + /// finished successfully. If a retrier is stopped and has some pending appointments, it should be + /// started/re-started, otherwise, it can be deleted safely. + Stopped, + /// Retrier is currently retrying the tower. If the retrier receives new appointments, it will + /// **try** to send them along (but it might not send them). + /// + /// If a retrier status is `Running`, then its associated tower is temporary unreachable. + Running, + /// Retrier failed retrying the tower. Should not be re-started. + /// + /// If a retrier status is `Failed`, then its associated tower is neither reachable nor temporary unreachable. 
+ Failed, } #[derive(Clone)] pub struct Retrier { wt_client: Arc>, + tower_id: TowerId, pending_appointments: Arc>>, + status: Arc>, } impl Retrier { - pub fn new(wt_client: Arc>, locator: Locator) -> Self { + pub fn new(wt_client: Arc>, tower_id: TowerId, locator: Locator) -> Self { Self { wt_client, + tower_id, pending_appointments: Arc::new(Mutex::new(HashSet::from([locator]))), + status: Arc::new(Mutex::new(RetrierStatus::Stopped)), } } - async fn retry_tower(&self, tower_id: TowerId) -> Result<(), Error<&'static str>> { + fn has_pending_appointments(&self) -> bool { + !self.pending_appointments.lock().unwrap().is_empty() + } + + fn set_status(&self, status: RetrierStatus) { + *self.status.lock().unwrap() = status; + } + + pub fn is_running(&self) -> bool { + *self.status.lock().unwrap() == RetrierStatus::Running + } + + pub fn should_start(&self) -> bool { + // A retrier can be started/re-started if it is stopped (i.e. not running and not failed) + // and has some pending appointments. + *self.status.lock().unwrap() == RetrierStatus::Stopped && self.has_pending_appointments() + } + + pub fn start(&self, max_elapsed_time_secs: u16, max_interval_time_secs: u16) { + let retrier = self.clone(); + + // We shouldn't be retrying failed and running retriers. + debug_assert_eq!(*retrier.status.lock().unwrap(), RetrierStatus::Stopped); + + // Set the tower as temporary unreachable and the retrier status to running. + retrier + .wt_client + .lock() + .unwrap() + .set_tower_status(retrier.tower_id, crate::TowerStatus::TemporaryUnreachable); + retrier.set_status(RetrierStatus::Running); + + tokio::spawn(async move { + let r = retry_notify( + ExponentialBackoff { + max_elapsed_time: Some(Duration::from_secs(max_elapsed_time_secs as u64)), + max_interval: Duration::from_secs(max_interval_time_secs as u64), + ..ExponentialBackoff::default() + }, + || async { retrier.run().await }, + |err, _| { + log::warn!("Retry error happened with {}. {}", retrier.tower_id, err); + }, + ) + .await; + + let mut state = retrier.wt_client.lock().unwrap(); + + match r { + Ok(_) => { + log::info!("Retry strategy succeeded for {}", retrier.tower_id); + // Set the tower status now so new appointment doesn't go to the retry manager. + state.set_tower_status(retrier.tower_id, crate::TowerStatus::Reachable); + // Retrier succeeded and can be re-used by re-starting it. + retrier.set_status(RetrierStatus::Stopped); + } + Err(e) => { + // Notice we'll end up here after a permanent error. That is, either after finishing the backoff strategy + // unsuccessfully or by manually raising such an error (like when facing a tower misbehavior). + log::warn!("Retry strategy gave up for {}. {}", retrier.tower_id, e); + + // Retrier failed and should be given up on. Avoid setting the tower status until the retrier is + // deleted/dropped. This way users performing manual retry will get an error as the tower will be + // temporary unreachable. + // We don't need to set the tower status now. Any new appointments we receive will not be retried anyways. + retrier.set_status(RetrierStatus::Failed); + } + } + }); + } + + async fn run(&self) -> Result<(), Error<&'static str>> { // Create a new scope so we can get all the data only locking the WTClient once. - let (net_addr, user_sk, proxy) = { + let (tower_id, net_addr, user_sk, proxy) = { let wt_client = self.wt_client.lock().unwrap(); - if wt_client.towers.get(&tower_id).is_none() { + if wt_client.towers.get(&self.tower_id).is_none() { return Err(Error::permanent("Tower was abandoned. 
Skipping retry")); } - if self.pending_appointments.lock().unwrap().is_empty() { + if !self.has_pending_appointments() { + // will this ever happen ?? + // FIXME: success/Ok() here instead so not to mark the tower as unreachable. return Err(Error::permanent("Tower has no data pending for retry")); } - let net_addr = wt_client.towers.get(&tower_id).unwrap().net_addr.clone(); + let net_addr = wt_client + .towers + .get(&self.tower_id) + .unwrap() + .net_addr + .clone(); let user_sk = wt_client.user_sk; - (net_addr, user_sk, wt_client.proxy.clone()) + (self.tower_id, net_addr, user_sk, wt_client.proxy.clone()) }; - while !self.pending_appointments.lock().unwrap().is_empty() { + while self.has_pending_appointments() { let locators = self.pending_appointments.lock().unwrap().clone(); for locator in locators.into_iter() { let appointment = self @@ -231,6 +299,10 @@ impl Retrier { AddAppointmentError::ApiError(e) => match e.error_code { errors::INVALID_SIGNATURE_OR_SUBSCRIPTION_ERROR => { log::warn!("There is a subscription issue with {}", tower_id); + self.wt_client.lock().unwrap().set_tower_status( + tower_id, + crate::TowerStatus::SubscriptionError, + ); return Err(Error::permanent("Subscription error")); } _ => { @@ -265,6 +337,24 @@ impl Retrier { Ok(()) } + + /// Sets the correct tower status if the retrier status is failed. + /// + /// This method MUST be called before getting rid of a failed retrier, and has + /// no effect on non-failed retriers. + pub fn set_tower_status_if_failed(&self) { + if *self.status.lock().unwrap() == RetrierStatus::Failed { + let mut state = self.wt_client.lock().unwrap(); + if let Some(tower) = state.towers.get(&self.tower_id) { + if tower.status.is_temporary_unreachable() { + log::warn!("Setting {} as unreachable", self.tower_id); + state.set_tower_status(self.tower_id, crate::TowerStatus::Unreachable); + } + } else { + log::info!("Skipping retrying abandoned tower {}", self.tower_id); + } + } + } } #[cfg(test)] @@ -290,10 +380,12 @@ mod tests { const MAX_INTERVAL_TIME: u16 = 1; impl Retrier { - fn empty(wt_client: Arc>) -> Self { + fn empty(wt_client: Arc>, tower_id: TowerId) -> Self { Self { wt_client, + tower_id, pending_appointments: Arc::new(Mutex::new(HashSet::new())), + status: Arc::new(Mutex::new(RetrierStatus::Stopped)), } } } @@ -344,8 +436,8 @@ mod tests { // Start the task and send the tower to the channel for retry let wt_client_clone = wt_client.clone(); let task = tokio::spawn(async move { - RetryManager::new(wt_client_clone) - .manage_retry(MAX_ELAPSED_TIME, MAX_INTERVAL_TIME, rx) + RetryManager::new(wt_client_clone, rx, MAX_ELAPSED_TIME, MAX_INTERVAL_TIME) + .manage_retry() .await }); tx.send((tower_id, appointment.locator)).unwrap(); @@ -406,8 +498,8 @@ mod tests { let max_elapsed_time = MAX_ELAPSED_TIME + 1; let task = tokio::spawn(async move { - RetryManager::new(wt_client_clone) - .manage_retry(MAX_ELAPSED_TIME, MAX_INTERVAL_TIME, rx) + RetryManager::new(wt_client_clone, rx, MAX_ELAPSED_TIME, MAX_INTERVAL_TIME) + .manage_retry() .await }); tx.send((tower_id, appointment.locator)).unwrap(); @@ -477,8 +569,8 @@ mod tests { // Start the task and send the tower to the channel for retry let wt_client_clone = wt_client.clone(); let task = tokio::spawn(async move { - RetryManager::new(wt_client_clone) - .manage_retry(MAX_ELAPSED_TIME, MAX_INTERVAL_TIME, rx) + RetryManager::new(wt_client_clone, rx, MAX_ELAPSED_TIME, MAX_INTERVAL_TIME) + .manage_retry() .await }); tx.send((tower_id, appointment.locator)).unwrap(); @@ -561,8 +653,8 @@ mod tests { 
// Start the task and send the tower to the channel for retry let wt_client_clone = wt_client.clone(); let task = tokio::spawn(async move { - RetryManager::new(wt_client_clone) - .manage_retry(MAX_ELAPSED_TIME, MAX_INTERVAL_TIME, rx) + RetryManager::new(wt_client_clone, rx, MAX_ELAPSED_TIME, MAX_INTERVAL_TIME) + .manage_retry() .await }); tx.send((tower_id, appointment.locator)).unwrap(); @@ -607,8 +699,8 @@ mod tests { // Start the task and send the tower to the channel for retry let wt_client_clone = wt_client.clone(); let task = tokio::spawn(async move { - RetryManager::new(wt_client_clone) - .manage_retry(MAX_ELAPSED_TIME, MAX_INTERVAL_TIME, rx) + RetryManager::new(wt_client_clone, rx, MAX_ELAPSED_TIME, MAX_INTERVAL_TIME) + .manage_retry() .await }); @@ -662,8 +754,8 @@ mod tests { }); // Since we are retrying manually, we need to add the data to pending appointments manually too - let retrier = Retrier::new(wt_client, appointment.locator); - let r = retrier.retry_tower(tower_id).await; + let retrier = Retrier::new(wt_client, tower_id, appointment.locator); + let r = retrier.run().await; assert_eq!(r, Ok(())); api_mock.assert(); } @@ -687,7 +779,7 @@ mod tests { .unwrap(); // If there are no pending appointments the method will simply return - let r = Retrier::empty(wt_client).retry_tower(tower_id).await; + let r = Retrier::empty(wt_client, tower_id).run().await; assert_eq!( r, Err(Error::permanent("Tower has no data pending for retry")) @@ -735,8 +827,8 @@ mod tests { }); // Since we are retrying manually, we need to add the data to pending appointments manually too - let retrier = Retrier::new(wt_client, appointment.locator); - let r = retrier.retry_tower(tower_id).await; + let retrier = Retrier::new(wt_client, tower_id, appointment.locator); + let r = retrier.run().await; assert_eq!(r, Err(Error::permanent("Tower misbehaved"))); api_mock.assert(); } @@ -766,8 +858,8 @@ mod tests { .add_pending_appointment(tower_id, &appointment); // Since we are retrying manually, we need to add the data to pending appointments manually too - let retrier = Retrier::new(wt_client, appointment.locator); - let r = retrier.retry_tower(tower_id).await; + let retrier = Retrier::new(wt_client, tower_id, appointment.locator); + let r = retrier.run().await; assert_eq!(r, Err(Error::transient("Tower cannot be reached"))); } @@ -808,8 +900,8 @@ mod tests { .add_pending_appointment(tower_id, &appointment); // Since we are retrying manually, we need to add the data to pending appointments manually too - let retrier = Retrier::new(wt_client, appointment.locator); - let r = retrier.retry_tower(tower_id).await; + let retrier = Retrier::new(wt_client, tower_id, appointment.locator); + let r = retrier.run().await; assert_eq!(r, Err(Error::permanent("Subscription error"))); api_mock.assert(); @@ -851,8 +943,8 @@ mod tests { .add_pending_appointment(tower_id, &appointment); // Since we are retrying manually, we need to add the data to pending appointments manually too - let retrier = Retrier::new(wt_client.clone(), appointment.locator); - let r = retrier.retry_tower(tower_id).await; + let retrier = Retrier::new(wt_client.clone(), tower_id, appointment.locator); + let r = retrier.run().await; assert_eq!(r, Ok(())); api_mock.assert(); @@ -888,7 +980,7 @@ mod tests { wt_client.lock().unwrap().remove_tower(tower_id).unwrap(); // If there are no pending appointments the method will simply return - let r = Retrier::empty(wt_client).retry_tower(tower_id).await; + let r = Retrier::empty(wt_client, tower_id).run().await; 
assert_eq!( r, diff --git a/watchtower-plugin/tests/test.py b/watchtower-plugin/tests/test.py index 65c53310..735675fd 100644 --- a/watchtower-plugin/tests/test.py +++ b/watchtower-plugin/tests/test.py @@ -128,12 +128,26 @@ def test_retry_watchtower(node_factory, bitcoind, teosd): # Make a new payment with an unreachable tower l1.rpc.pay(l2.rpc.invoice(25000000, "lbl1", "desc1")["bolt11"]) + + # The retrier manager waits 1 second before spawning new retriers for unreachable towers, + # so we need to wait a little bit until a retrier is started for our tower. + while l2.rpc.gettowerinfo(tower_id)["status"] == "temporary_unreachable": + time.sleep(1) assert l2.rpc.gettowerinfo(tower_id)["status"] == "unreachable" assert l2.rpc.gettowerinfo(tower_id)["pending_appointments"] # Start the tower and retry it teosd.start() - l2.rpc.retrytower(tower_id) + + # Even though we set the max retry time to zero seconds, the retrier manager takes some time (1s) to recognize + # that the tower is unreachable. So manual retries might fail as the tower is marked as temporary unreachable. + while True: + try: + l2.rpc.retrytower(tower_id) + break + except Exception: + time.sleep(1) + while l2.rpc.gettowerinfo(tower_id)["pending_appointments"]: time.sleep(1)
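As a closing note on the retry semantics the patch relies on: the retrier
drives Retrier::run through the `backoff` crate's `retry_notify`, so
transient errors are retried with exponential backoff until
`max_elapsed_time` runs out, while permanent errors (tower abandoned,
misbehaviour, subscription error) abort immediately. The standalone sketch
below, which is not part of the patch, shows the give-up path; it assumes
the `backoff` crate with its tokio feature and a tokio runtime.

use std::time::Duration;
use backoff::{future::retry_notify, Error, ExponentialBackoff};

// An operation that never succeeds, standing in for a tower that stays offline.
async fn flaky() -> Result<(), Error<&'static str>> {
    Err(Error::transient("tower cannot be reached"))
}

#[tokio::main]
async fn main() {
    let policy = ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(2)), // give up after ~2s
        max_interval: Duration::from_secs(1),           // cap the wait between attempts
        ..ExponentialBackoff::default()
    };
    let r = retry_notify(
        policy,
        || async { flaky().await },
        // Called on each transient failure, like the retrier's "Retry error happened" log.
        |err, wait| eprintln!("retry failed ({}), next attempt in {:?}", err, wait),
    )
    .await;
    // Once max_elapsed_time is exceeded the last error is returned; this is the point
    // where the real retrier marks itself Failed and the manager later drops it.
    assert!(r.is_err());
}

Returning Error::permanent(...) from the operation instead makes
retry_notify stop right away, which is how the patch handles abandoned
towers, misbehaviour, and subscription errors.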