Skip to content

Commit

Permalink
sdk: fix stop and start flow
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Oct 10, 2023
1 parent 8b94861 commit 8ee3cc4
Showing 1 changed file with 72 additions and 71 deletions.
143 changes: 72 additions & 71 deletions crates/nostr-sdk/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub struct Relay {
document: Arc<RwLock<RelayInformationDocument>>,
opts: RelayOptions,
stats: RelayConnectionStats,
auto_connect_loop_running: Arc<AtomicBool>,
// auto_connect_loop_running: Arc<AtomicBool>,
scheduled_for_stop: Arc<AtomicBool>,
scheduled_for_termination: Arc<AtomicBool>,
pool_sender: Sender<RelayPoolMessage>,
Expand Down Expand Up @@ -286,7 +286,7 @@ impl Relay {
document: Arc::new(RwLock::new(RelayInformationDocument::new())),
opts,
stats: RelayConnectionStats::new(),
auto_connect_loop_running: Arc::new(AtomicBool::new(false)),
// auto_connect_loop_running: Arc::new(AtomicBool::new(false)),
scheduled_for_stop: Arc::new(AtomicBool::new(false)),
scheduled_for_termination: Arc::new(AtomicBool::new(false)),
pool_sender,
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Relay {
document: Arc::new(RwLock::new(RelayInformationDocument::new())),
opts,
stats: RelayConnectionStats::new(),
auto_connect_loop_running: Arc::new(AtomicBool::new(false)),
// auto_connect_loop_running: Arc::new(AtomicBool::new(false)),
scheduled_for_stop: Arc::new(AtomicBool::new(false)),
scheduled_for_termination: Arc::new(AtomicBool::new(false)),
pool_sender,
Expand Down Expand Up @@ -419,15 +419,15 @@ impl Relay {
self.relay_sender.max_capacity() - self.relay_sender.capacity()
}

fn is_auto_connect_loop_running(&self) -> bool {
/* fn is_auto_connect_loop_running(&self) -> bool {
self.auto_connect_loop_running.load(Ordering::SeqCst)
}
fn set_auto_connect_loop_running(&self, value: bool) {
let _ =
self.auto_connect_loop_running
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(value));
}
} */

fn is_scheduled_for_stop(&self) -> bool {
self.scheduled_for_stop.load(Ordering::SeqCst)
Expand Down Expand Up @@ -463,84 +463,85 @@ impl Relay {
}
}

if !self.is_auto_connect_loop_running() {
self.set_auto_connect_loop_running(true);
// TODO: temp disable because cause stop-start issues
/* if !self.is_auto_connect_loop_running() {
self.set_auto_connect_loop_running(true); */

tracing::debug!("Auto connect loop started for {}", self.url);
tracing::debug!("Auto connect loop started for {}", self.url);

if !wait_for_connection {
self.set_status(RelayStatus::Initialized).await;
}
if !wait_for_connection {
self.set_status(RelayStatus::Initialized).await;
}

let relay = self.clone();
thread::spawn(async move {
loop {
let queue = relay.queue();
if queue > 0 {
tracing::info!(
"{} messages queued for {} (capacity: {})",
queue,
relay.url(),
relay.relay_sender.capacity()
);
}
let relay = self.clone();
thread::abortable(async move {
loop {
let queue = relay.queue();
if queue > 0 {
tracing::info!(
"{} messages queued for {} (capacity: {})",
queue,
relay.url(),
relay.relay_sender.capacity()
);
}

// Schedule relay for termination
// Needed to terminate the auto reconnect loop, also if the relay is not connected yet.
if relay.is_scheduled_for_stop() {
relay.set_status(RelayStatus::Stopped).await;
relay.schedule_for_stop(false);
tracing::debug!(
"Auto connect loop terminated for {} [stop - schedule]",
relay.url
);
break;
} else if relay.is_scheduled_for_termination() {
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
tracing::debug!(
"Auto connect loop terminated for {} [schedule]",
relay.url
);
// Schedule relay for termination
// Needed to terminate the auto reconnect loop, also if the relay is not connected yet.
if relay.is_scheduled_for_stop() {
relay.set_status(RelayStatus::Stopped).await;
relay.schedule_for_stop(false);
tracing::debug!(
"Auto connect loop terminated for {} [stop - schedule]",
relay.url
);
break;
} else if relay.is_scheduled_for_termination() {
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
tracing::debug!(
"Auto connect loop terminated for {} [schedule]",
relay.url
);
break;
}

// Check status
match relay.status().await {
RelayStatus::Initialized | RelayStatus::Disconnected => {
relay.try_connect().await
}
RelayStatus::Stopped | RelayStatus::Terminated => {
tracing::debug!("Auto connect loop terminated for {}", relay.url);
break;
}
_ => (),
};

// Check status
match relay.status().await {
RelayStatus::Initialized | RelayStatus::Disconnected => {
relay.try_connect().await
}
RelayStatus::Stopped | RelayStatus::Terminated => {
tracing::debug!("Auto connect loop terminated for {}", relay.url);
break;
}
_ => (),
};

let retry_sec: u64 = if relay.opts.get_adjust_retry_sec() {
let var: u64 =
relay.stats.attempts().saturating_sub(relay.stats.success()) as u64;
if var >= 3 {
let retry_interval: i64 =
cmp::min(MIN_RETRY_SEC * (1 + var), MAX_ADJ_RETRY_SEC) as i64;
let jitter: i64 = rand::thread_rng().gen_range(-1..=1);
retry_interval.saturating_add(jitter) as u64
} else {
relay.opts().get_retry_sec()
}
let retry_sec: u64 = if relay.opts.get_adjust_retry_sec() {
let var: u64 =
relay.stats.attempts().saturating_sub(relay.stats.success()) as u64;
if var >= 3 {
let retry_interval: i64 =
cmp::min(MIN_RETRY_SEC * (1 + var), MAX_ADJ_RETRY_SEC) as i64;
let jitter: i64 = rand::thread_rng().gen_range(-1..=1);
retry_interval.saturating_add(jitter) as u64
} else {
relay.opts().get_retry_sec()
};
}
} else {
relay.opts().get_retry_sec()
};

tracing::trace!("{} retry time set to {retry_sec} secs", relay.url);
thread::sleep(Duration::from_secs(retry_sec)).await;
}
tracing::trace!("{} retry time set to {retry_sec} secs", relay.url);
thread::sleep(Duration::from_secs(retry_sec)).await;
}

relay.set_auto_connect_loop_running(false);
});
} else {
//relay.set_auto_connect_loop_running(false);
});
/* } else {
tracing::warn!("Auto connect loop for {} is already running!", self.url)
}
} */
} else if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated =
self.status().await
{
Expand Down

0 comments on commit 8ee3cc4

Please sign in to comment.