Skip to content

Commit

Permalink
fix: reconnect to relay
Browse files Browse the repository at this point in the history
  • Loading branch information
thesimplekid committed May 16, 2023
1 parent 9c64a5f commit f47298f
Showing 1 changed file with 70 additions and 21 deletions.
91 changes: 70 additions & 21 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::net::TcpStream;
use std::ops::Add;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{bail, Result};
Expand All @@ -23,7 +24,9 @@ use nostr_sdk::{
};
use nostr_sdk::{EventId, RelayMessage, Tag, Url};
use tokio::io::{stdin, stdout};
use tungstenite::{connect, Message as WsMessage};
use tokio::sync::Mutex;
use tungstenite::stream::MaybeTlsStream;
use tungstenite::{connect, Message as WsMessage, WebSocket};

mod utils;

Expand Down Expand Up @@ -386,11 +389,35 @@ fn create_failure_note(
))
}

async fn connect_relay(url: Url) -> Result<WebSocket<MaybeTlsStream<TcpStream>>> {
// Attempt to reconnect
for _ in 0..100 {
if let Ok((new_socket, _response)) = connect(url.clone()) {
return Ok(new_socket);
} else {
info!("Attempted connection to {} failed", url);
// Delay before attempting reconnection
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
for _ in 0..100 {
if let Ok((new_socket, _response)) = connect(url.clone()) {
return Ok(new_socket);
} else {
info!("Attempted connection to {} failed", url);
// Delay before attempting reconnection
tokio::time::sleep(Duration::from_secs(30)).await;
}
}

bail!("Relay connection attempts exceeded");
}

async fn event_stream(
connect_client_pubkey: XOnlyPublicKey,
relay: Url,
) -> Result<impl Stream<Item = RelayMessage>> {
let (mut socket, _response) = connect(relay).expect("Can't connect");
let mut socket = connect_relay(relay.clone()).await?;

// Subscription filter
let subscribe_to_requests = ClientMessage::new_req(
Expand All @@ -404,27 +431,49 @@ async fn event_stream(

let socket = Arc::new(Mutex::new(socket));

Ok(futures::stream::unfold(socket, |socket| async move {
loop {
let msg = socket
.lock()
.unwrap()
.read_message()
.expect("Error reading message");
let msg_text = msg.to_text().expect("Failed to convert message to text");
if let Ok(handled_message) = RelayMessage::from_json(msg_text) {
match &handled_message {
RelayMessage::Event { .. } | RelayMessage::Auth { .. } => {
break Some((handled_message, socket));
Ok(
futures::stream::unfold((socket, relay), |(mut socket, relay)| async move {
loop {
let msg = match socket.clone().lock().await.read_message() {
Ok(msg) => msg,
Err(err) => {
// Handle disconnection
info!("WebSocket disconnected: {}", err);
info!("Attempting to reconnect ...");
match connect_relay(relay.clone()).await {
Ok(new_socket) => socket = Arc::new(Mutex::new(new_socket)),
Err(err) => {
info!("{}", err);
return None;
}
}

continue;
}
_ => continue,
};

let msg_text = match msg.to_text() {
Ok(msg_test) => msg_test,
Err(_) => {
info!("Failed to convert message to text");
continue;
}
};

if let Ok(handled_message) = RelayMessage::from_json(msg_text) {
match &handled_message {
RelayMessage::Event { .. } | RelayMessage::Auth { .. } => {
break Some((handled_message, (socket, relay.clone())));
}
_ => continue,
}
} else {
info!("Got unexpected message: {}", msg_text);
}
} else {
info!("Got unexpected message: {}", msg_text);
}
}
})
.boxed())
})
.boxed(),
)
}

struct Limits {
Expand Down

0 comments on commit f47298f

Please sign in to comment.