Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support ya sub subcommand for the Ya CLI #1004

Merged
merged 9 commits into from
Jun 16, 2024
9 changes: 9 additions & 0 deletions yazi-cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub(super) enum Command {
PubStatic(CommandPubStatic),
/// Manage packages.
Pack(CommandPack),
/// Subscribe to messages from all remote instances.
Sub(CommandSub),
}

#[derive(clap::Args)]
Expand Down Expand Up @@ -109,3 +111,10 @@ pub(super) struct CommandPack {
#[arg(short = 'u', long)]
pub(super) upgrade: bool,
}

#[derive(clap::Args)]
pub(super) struct CommandSub {
/// The kind of messages to subscribe to, separated by commas if multiple.
#[arg(index = 1)]
pub(super) kinds: String,
}
7 changes: 7 additions & 0 deletions yazi-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ async fn main() -> anyhow::Result<()> {
package::Package::add_to_config(repo).await?;
}
}

Command::Sub(cmd) => {
yazi_dds::init();
yazi_dds::Client::draw(cmd.kinds.split(',').collect()).await?;

tokio::signal::ctrl_c().await?;
}
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions yazi-config/preset/yazi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ ueberzug_offset = [ 0, 0, 0, 0 ]
[opener]
edit = [
{ run = '${EDITOR:=vi} "$@"', desc = "$EDITOR", block = true, for = "unix" },
{ run = 'code "%*"', orphan = true, desc = "code", for = "windows" },
{ run = 'code -w "%*"', block = true, desc = "code (block)", for = "windows" },
{ run = 'code %*', orphan = true, desc = "code", for = "windows" },
{ run = 'code -w %*', block = true, desc = "code (block)", for = "windows" },
]
open = [
{ run = 'xdg-open "$1"', desc = "Open", for = "linux" },
Expand Down
6 changes: 4 additions & 2 deletions yazi-dds/src/body/hi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ use serde::{Deserialize, Serialize};

use super::Body;

/// The client handshake
#[derive(Debug, Serialize, Deserialize)]
pub struct BodyHi<'a> {
pub abilities: HashSet<Cow<'a, String>>,
/// Specifies the kinds of events that the client can handle
pub abilities: HashSet<Cow<'a, str>>,
pub version: String,
}

impl<'a> BodyHi<'a> {
#[inline]
pub fn borrowed(abilities: HashSet<&'a String>) -> Body<'a> {
pub fn borrowed(abilities: HashSet<&'a str>) -> Body<'a> {
Self {
abilities: abilities.into_iter().map(Cow::Borrowed).collect(),
version: Self::version(),
Expand Down
44 changes: 40 additions & 4 deletions yazi-dds/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::{HashMap, HashSet}, mem, str::FromStr};

use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, select, sync::mpsc, task::JoinHandle, time};
Expand All @@ -27,6 +27,7 @@ pub struct Peer {
}

impl Client {
/// Connect to an existing server or start a new one.
pub(super) fn serve() {
let mut rx = QUEUE_RX.drop();
while rx.try_recv().is_ok() {}
Expand All @@ -52,7 +53,7 @@ impl Client {
if line.is_empty() {
continue;
} else if line.starts_with("hey,") {
Self::handle_hey(line);
Self::handle_hey(&line);
} else {
Payload::from_str(&line).map(|p| p.emit()).ok();
}
Expand All @@ -62,6 +63,7 @@ impl Client {
});
}

/// Connect to an existing server to send a single message.
pub async fn shot(kind: &str, receiver: u64, severity: Option<u16>, body: &str) -> Result<()> {
Body::validate(kind)?;

Expand Down Expand Up @@ -100,6 +102,40 @@ impl Client {
Ok(())
}

/// Connect to an existing server and listen in on the messages that are being
/// sent by other yazi instances:
/// - If no server is running, fail right away;
/// - If a server is closed, attempt to reconnect forever.
pub async fn draw(kinds: HashSet<&str>) -> Result<()> {
async fn make(kinds: &HashSet<&str>) -> Result<ClientReader> {
let (lines, mut writer) = Stream::connect().await?;
let hi = Payload::new(BodyHi::borrowed(kinds.clone()));
writer.write_all(format!("{hi}\n").as_bytes()).await?;
writer.flush().await?;
Ok(lines)
}

let mut lines = make(&kinds).await.context("No running Yazi instance found")?;
loop {
match lines.next_line().await? {
Some(s) => {
let kind = s.split(',').next();
if matches!(kind, Some(kind) if kinds.contains(kind)) {
println!("{s}");
}
}
None => loop {
if let Ok(new) = make(&kinds).await {
lines = new;
break;
} else {
time::sleep(time::Duration::from_secs(1)).await;
}
},
}
}
}

#[inline]
pub(super) fn push<'a>(payload: impl Into<Payload<'a>>) {
QUEUE_TX.send(format!("{}\n", payload.into())).ok();
Expand Down Expand Up @@ -136,8 +172,8 @@ impl Client {
Self::connect(server).await
}

fn handle_hey(s: String) {
if let Ok(Body::Hey(mut hey)) = Payload::from_str(&s).map(|p| p.body) {
fn handle_hey(s: &str) {
if let Ok(Body::Hey(mut hey)) = Payload::from_str(s).map(|p| p.body) {
hey.peers.retain(|&id, _| id != *ID);
*PEERS.write() = hey.peers;
}
Expand Down
2 changes: 1 addition & 1 deletion yazi-dds/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Pubsub {

pub fn pub_from_hi() -> bool {
let abilities = REMOTE.read().keys().cloned().collect();
let abilities = BOOT.remote_events.union(&abilities).collect();
let abilities = BOOT.remote_events.union(&abilities).map(|s| s.as_str()).collect();

Client::push(BodyHi::borrowed(abilities));
true
Expand Down