
Commit

Review fixes
Need to be squashed
sr-gi committed Sep 4, 2022
1 parent 552f199 commit 4fb7083
Showing 2 changed files with 49 additions and 45 deletions.
92 changes: 48 additions & 44 deletions watchtower-plugin/src/retrier.rs
@@ -46,49 +46,48 @@ impl Retrier {
{
let wt_client = self.wt_client.lock().unwrap();
if !wt_client.towers.contains_key(&tower_id) {
continue;
}
if wt_client.towers.get(&tower_id).is_none() {
log::info!("Skipping retrying abandoned tower {}", tower_id);
continue;
}
}

self.add_pending_appointment(tower_id, locator);

log::info!("Retrying tower {}", tower_id);
let r = retry_notify(
ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(self.max_elapsed_time_secs as u64)),
max_interval: Duration::from_secs(self.max_interval_time_secs as u64),
..ExponentialBackoff::default()
},
|| async { self.retry_tower(tower_id).await },
|err, _| {
log::warn!("Retry error happened with {}. {}", tower_id, err);
},
)
.await;

let mut state = self.wt_client.lock().unwrap();
self.pending_appointments.lock().unwrap().remove(&tower_id);

match r {
Ok(_) => {
log::info!("Retry strategy succeeded for {}", tower_id);
state.set_tower_status(tower_id, crate::TowerStatus::Reachable);
}
Err(e) => {
log::warn!("Retry strategy gave up for {}. {}", tower_id, e);
// Notice we'll end up here after a permanent error. That is, either after finishing the backoff strategy
// unsuccessfully or by manually raising such an error (like when facing a tower misbehavior)
if let Some(tower) = state.towers.get_mut(&tower_id) {
if tower.status.is_unreachable() {
log::warn!("Setting {} as unreachable", tower_id);
state.set_tower_status(tower_id, crate::TowerStatus::Unreachable);
if self.add_pending_appointment(tower_id, locator) {
log::info!("Retrying tower {}", tower_id);
let r = retry_notify(
ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(
self.max_elapsed_time_secs as u64,
)),
max_interval: Duration::from_secs(self.max_interval_time_secs as u64),
..ExponentialBackoff::default()
},
|| async { self.retry_tower(tower_id).await },
|err, _| {
log::warn!("Retry error happened with {}. {}", tower_id, err);
},
)
.await;

let mut state = self.wt_client.lock().unwrap();
self.pending_appointments.lock().unwrap().remove(&tower_id);

match r {
Ok(_) => {
log::info!("Retry strategy succeeded for {}", tower_id);
state.set_tower_status(tower_id, crate::TowerStatus::Reachable);
}
Err(e) => {
log::warn!("Retry strategy gave up for {}. {}", tower_id, e);
// Notice we'll end up here after a permanent error. That is, either after finishing the backoff strategy
// unsuccessfully or by manually raising such an error (like when facing a tower misbehavior)
if let Some(tower) = state.towers.get_mut(&tower_id) {
if tower.status.is_unreachable() {
log::warn!("Setting {} as unreachable", tower_id);
state.set_tower_status(tower_id, crate::TowerStatus::Unreachable);
}
} else {
log::info!("Skipping retrying abandoned tower {}", tower_id);
}
} else {
log::info!("Skipping retrying abandoned tower {}", tower_id);
}
}
}
@@ -98,7 +97,9 @@ impl Retrier {
/// Adds an appointment to pending for a given tower.
///
/// If the tower is not currently being retried, a new entry for it is created, otherwise, the data is appended to the existing entry.
fn add_pending_appointment(&self, tower_id: TowerId, locator: Locator) {
///
/// Returns true if a new entry is created, false otherwise.
fn add_pending_appointment(&self, tower_id: TowerId, locator: Locator) -> bool {
let mut pending_appointments = self.pending_appointments.lock().unwrap();
if let std::collections::hash_map::Entry::Vacant(e) = pending_appointments.entry(tower_id) {
log::debug!(
@@ -111,6 +112,8 @@ impl Retrier {
.unwrap()
.set_tower_status(tower_id, crate::TowerStatus::TemporaryUnreachable);
e.insert(HashSet::from([locator]));

true
} else {
log::debug!(
"Adding pending appointment {} to existing tower {}",
@@ -121,6 +124,8 @@ impl Retrier {
.get_mut(&tower_id)
.unwrap()
.insert(locator);

false
}
}
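
For reference, here is a minimal, self-contained sketch (not part of the commit) of the HashMap Entry pattern that add_pending_appointment relies on above: inserting through Entry::Vacant lets the function report whether a brand-new entry was created (true) or the locator was appended to an existing one (false). TowerId and Locator are replaced with plain strings for brevity, and the helper name add_pending is purely illustrative.

use std::collections::{hash_map::Entry, HashMap, HashSet};

// Mirrors the logic above: returns true when the tower was not being tracked yet
// (a brand-new entry is created), false when the locator is appended to an
// existing entry.
fn add_pending(
    pending: &mut HashMap<String, HashSet<String>>,
    tower_id: &str,
    locator: &str,
) -> bool {
    match pending.entry(tower_id.to_string()) {
        Entry::Vacant(e) => {
            e.insert(HashSet::from([locator.to_string()]));
            true
        }
        Entry::Occupied(mut e) => {
            e.get_mut().insert(locator.to_string());
            false
        }
    }
}

fn main() {
    let mut pending = HashMap::new();
    assert!(add_pending(&mut pending, "tower_a", "locator_1")); // new entry created
    assert!(!add_pending(&mut pending, "tower_a", "locator_2")); // appended to existing entry
    println!("{:?}", pending);
}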

@@ -132,12 +137,11 @@ impl Retrier {
return Err(Error::permanent("Tower was abandoned. Skipping retry"));
}

if self
if !self
.pending_appointments
.lock()
.unwrap()
.get(&tower_id)
.is_none()
.contains_key(&tower_id)
{
return Err(Error::permanent("Tower has no data pending for retry"));
}
@@ -213,7 +217,7 @@ impl Retrier {
AddAppointmentError::ApiError(e) => match e.error_code {
errors::INVALID_SIGNATURE_OR_SUBSCRIPTION_ERROR => {
log::warn!("There is a subscription issue with {}", tower_id);
return Err(Error::transient("Subscription error"));
return Err(Error::permanent("Subscription error"));
}
_ => {
log::warn!(
Expand All @@ -223,7 +227,7 @@ impl Retrier {
e.error_code
);
// We need to move the appointment from pending to invalid
// Add itn first to invalid and remove it from pending later so a cascade delete is not triggered
// Add it first to invalid and remove it from pending later so a cascade delete is not triggered
self.pending_appointments
.lock()
.unwrap()
@@ -807,7 +811,7 @@ mod tests {

let r = retrier.retry_tower(tower_id).await;

assert_eq!(r, Err(Error::transient("Subscription error")));
assert_eq!(r, Err(Error::permanent("Subscription error")));
api_mock.assert();
}

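For context, a minimal, self-contained sketch (not part of the commit) of how backoff's retry_notify treats the two error kinds used above: Error::transient keeps the exponential backoff schedule going (and triggers the notify callback), while Error::permanent makes the whole strategy give up immediately, which is why the subscription error is now reported as permanent. This assumes the backoff crate with its tokio feature plus tokio itself; the counter and the error messages are purely illustrative stand-ins for retry_tower.

use std::sync::{
    atomic::{AtomicU32, Ordering},
    Arc,
};
use std::time::Duration;

use backoff::{future::retry_notify, Error, ExponentialBackoff};

#[tokio::main]
async fn main() {
    let attempts = Arc::new(AtomicU32::new(0));
    let counter = attempts.clone();

    let r = retry_notify(
        // Same backoff shape as in the plugin: bounded total time and per-retry interval.
        ExponentialBackoff {
            max_elapsed_time: Some(Duration::from_secs(10)),
            max_interval: Duration::from_secs(1),
            ..ExponentialBackoff::default()
        },
        move || {
            let counter = counter.clone();
            async move {
                let n = counter.fetch_add(1, Ordering::SeqCst) + 1;
                if n < 3 {
                    // Transient errors are retried following the backoff schedule.
                    Err(Error::transient(format!("attempt {} failed", n)))
                } else {
                    // Returning Error::permanent(...) here instead (e.g. for a
                    // subscription issue or tower misbehavior) would make the
                    // strategy give up right away, before the schedule runs out.
                    Ok(n)
                }
            }
        },
        // Called on every transient failure, like the log::warn! above.
        |err, next| println!("retry error: {}; next attempt in {:?}", err, next),
    )
    .await;

    println!(
        "final result: {:?} (calls made: {})",
        r,
        attempts.load(Ordering::SeqCst)
    );
}
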
2 changes: 1 addition & 1 deletion watchtower-plugin/tests/test.py
@@ -134,7 +134,7 @@ def test_retry_watchtower(node_factory, bitcoind, teosd):
# Start the tower and retry it
teosd.start()
l2.rpc.retrytower(tower_id)
while l2.rpc.gettowerinfo(tower_id)["status"] != "reachable":
while l2.rpc.gettowerinfo(tower_id)["pending_appointments"]:
time.sleep(1)

assert l2.rpc.gettowerinfo(tower_id)["status"] == "reachable"
