Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support ya sub subcommand for the Ya CLI #1004

Merged
merged 9 commits into from
Jun 16, 2024
9 changes: 9 additions & 0 deletions yazi-cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub(super) enum Command {
PubStatic(CommandPubStatic),
/// Manage packages.
Pack(CommandPack),
/// Subscribe to messages from all remote instance(s).
Sub(CommandSub),
}

#[derive(clap::Args)]
Expand Down Expand Up @@ -109,3 +111,10 @@ pub(super) struct CommandPack {
#[arg(short = 'u', long)]
pub(super) upgrade: bool,
}

#[derive(clap::Args)]
pub(super) struct CommandSub {
/// The kind of messages we are interested in.
#[arg(index = 1)]
pub(super) kinds: String,
}
11 changes: 11 additions & 0 deletions yazi-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod args;
mod package;

use std::collections::HashSet;

use args::*;
use clap::Parser;

Expand Down Expand Up @@ -46,6 +48,15 @@ async fn main() -> anyhow::Result<()> {
package::Package::add_to_config(repo).await?;
}
}

Command::Sub(cmd) => {
yazi_dds::init();
let kinds = cmd.kinds.split(',').map(|s| s.to_owned()).collect::<HashSet<_>>();
sxyazi marked this conversation as resolved.
Show resolved Hide resolved

yazi_dds::Client::echo_events_to_stdout(kinds).await?;

tokio::signal::ctrl_c().await?;
}
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions yazi-dds/src/body/hi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use serde::{Deserialize, Serialize};

use super::Body;

/// The handshake message
#[derive(Debug, Serialize, Deserialize)]
pub struct BodyHi<'a> {
/// Specifies the kinds of events that the client can handle
pub abilities: HashSet<Cow<'a, String>>,
pub version: String,
}
Expand Down
51 changes: 48 additions & 3 deletions yazi-dds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Peer {
}

impl Client {
/// Connect to an existing server or start a new one.
pub(super) fn serve() {
let mut rx = QUEUE_RX.drop();
while rx.try_recv().is_ok() {}
Expand All @@ -52,7 +53,7 @@ impl Client {
if line.is_empty() {
continue;
} else if line.starts_with("hey,") {
Self::handle_hey(line);
Self::handle_hey(&line);
} else {
Payload::from_str(&line).map(|p| p.emit()).ok();
}
Expand All @@ -62,6 +63,50 @@ impl Client {
});
}

/// Connect to an existing server and listen in on the messages that are being
/// sent by other yazi instances.
/// If no server is running, fail right away.
/// If a server is closed, attempt to reconnect forever.
pub async fn echo_events_to_stdout(kinds: HashSet<String>) -> Result<()> {
let mut lines = Self::connect_listener(&kinds).await?;

loop {
match lines.next_line().await {
Ok(Some(s)) => {
let kind = s.split(',').next();
if matches!(kind, Some(kind) if kinds.contains(kind)) {
println!("{}", s);
}
}
Ok(None) => loop {
println!("Connection closed");
sxyazi marked this conversation as resolved.
Show resolved Hide resolved
match Self::connect_listener(&kinds).await {
Ok(new_lines) => {
lines = new_lines;
break;
}
Err(_) => {
time::sleep(time::Duration::from_secs(1)).await;
}
};
},
Err(e) => {
// could not establish initial connection
return Err(e.into());
}
sxyazi marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

async fn connect_listener(kinds: &HashSet<String>) -> Result<ClientReader> {
let (lines, mut writer) = Stream::connect().await?;
let hi = Payload::new(BodyHi::borrowed(kinds.iter().collect()));
writer.write_all(format!("{}\n", hi).as_bytes()).await?;
writer.flush().await?;
Ok(lines)
}

/// Connect to an existing server to send a single message.
pub async fn shot(kind: &str, receiver: u64, severity: Option<u16>, body: &str) -> Result<()> {
Body::validate(kind)?;

Expand Down Expand Up @@ -136,8 +181,8 @@ impl Client {
Self::connect(server).await
}

fn handle_hey(s: String) {
if let Ok(Body::Hey(mut hey)) = Payload::from_str(&s).map(|p| p.body) {
fn handle_hey(s: &str) {
if let Ok(Body::Hey(mut hey)) = Payload::from_str(s).map(|p| p.body) {
hey.peers.retain(|&id, _| id != *ID);
*PEERS.write() = hey.peers;
}
Expand Down