Skip to content

Commit

Permalink
feat: reconnect after a connection is lost
Browse files Browse the repository at this point in the history
If the connection cannot be made when starting, the client will fail and
not reconnect. However, if the connection is lost after the client has
started, it will forever attempt to reconnect with a 1s delay.

Also clean up the implementation by removing previous mistakes
  • Loading branch information
mikavilpas committed May 12, 2024
1 parent 910de24 commit c46b247
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 46 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 0 additions & 21 deletions yazi-boot/src/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,27 +130,6 @@ impl Boot {
);
}
}

/// Allows for initialization without forcing the reading of command line
/// arguments.
pub fn init_with(local_events: HashSet<String>, remote_events: HashSet<String>) -> Boot {
let config_dir = Xdg::config_dir();

let (cwd, file) = Self::parse_entry(None);

Self {
cwd,
file,

local_events,
remote_events,

flavor_dir: config_dir.join("flavors"),
plugin_dir: config_dir.join("plugins"),
config_dir,
state_dir: Xdg::state_dir(),
}
}
}

impl Default for Boot {
Expand Down
1 change: 0 additions & 1 deletion yazi-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ homepage = "https://yazi-rs.github.io"
repository = "https://github.com/sxyazi/yazi"

[dependencies]
yazi-boot = { path = "../yazi-boot", version = "0.2.5" }
yazi-dds = { path = "../yazi-dds", version = "0.2.5" }
yazi-shared = { path = "../yazi-shared", version = "0.2.5" }

Expand Down
13 changes: 1 addition & 12 deletions yazi-cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(super) enum Command {
/// Manage packages.
Pack(CommandPack),
/// Subscribe to messages from all remote instance(s).
SubStatic(CommandSubStatic),
Sub(CommandSub),
}

#[derive(clap::Args)]
Expand Down Expand Up @@ -100,17 +100,6 @@ pub(super) struct CommandPack {

#[derive(clap::Args)]
pub(super) struct CommandSub {
/// The sender ID whose messages we want to monitor.
#[arg(index = 1)]
pub(super) sender: u64,

/// The kind of messages we are interested in.
#[arg(index = 2)]
pub(super) kinds: String,
}

#[derive(clap::Args)]
pub(super) struct CommandSubStatic {
/// The kind of messages we are interested in.
#[arg(index = 1)]
pub(super) kinds: String,
Expand Down
2 changes: 1 addition & 1 deletion yazi-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
}
}

Command::SubStatic(cmd) => {
Command::Sub(cmd) => {
yazi_dds::init();
let kinds = cmd.kinds.split(',').map(|s| s.to_owned()).collect::<HashSet<_>>();

Expand Down
50 changes: 40 additions & 10 deletions yazi-dds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,51 @@ impl Client {

/// Connect to an existing server and listen in on the messages that are being
/// sent by other yazi instances.
/// If no server is running, fail.
/// If no server is running, fail right away.
/// If a server is closed, attempt to reconnect forever.
pub async fn echo_events_to_stdout(kinds: HashSet<String>) -> Result<()> {
let (mut lines, mut writer) = Stream::connect().await?;
let hi = Payload::new(BodyHi::borrowed(kinds.iter().collect()));
writer.write_all(format!("{}\n", hi).as_bytes()).await?;
writer.flush().await?;
let mut lines = Self::connect_listener(&kinds).await?;
let mut msg_counter = 0;

while let Ok(Some(s)) = lines.next_line().await {
let kind = s.split(',').next();
if matches!(kind, Some(kind) if kinds.contains(kind)) {
println!("{}", s);
loop {
println!("Waiting for messages...");
match lines.next_line().await {
Ok(Some(s)) => {
println!("Received: message {} '{}'", msg_counter, s);
msg_counter += 1;
let kind = s.split(',').next();
if matches!(kind, Some(kind) if kinds.contains(kind)) {
println!("{}", s);
}
}
Ok(None) => loop {
println!("Connection closed");
match Self::connect_listener(&kinds).await {
Ok(new_lines) => {
lines = new_lines;
println!("Reconnected");
break;
}
Err(_) => {
println!("Reconnecting...");
time::sleep(time::Duration::from_secs(1)).await;
}
};
},
Err(e) => {
// could not establish initial connection
return Err(e.into());
}
}
}
}

Ok(())
async fn connect_listener(kinds: &HashSet<String>) -> Result<ClientReader> {
let (lines, mut writer) = Stream::connect().await?;
let hi = Payload::new(BodyHi::borrowed(kinds.iter().collect()));
writer.write_all(format!("{}\n", hi).as_bytes()).await?;
writer.flush().await?;
Ok(lines)
}

/// Connect to an existing server to send a single message.
Expand Down

0 comments on commit c46b247

Please sign in to comment.