From 729f9224d3f0b8ce2b8fba584e62fd3fcc1cb541 Mon Sep 17 00:00:00 2001 From: Vladislav Ivanov Date: Fri, 16 Dec 2022 16:01:27 +0100 Subject: [PATCH] Use unix domain sockets instead of pipes for SSH RPC Unline pipes, UDSs can be opened in a non-blocking way. And unlike pipes, when opened in a non-blocking way they will still signal EOF. This should solve the issue with early git stdout EOF commit-id:233e2cae --- Cargo.lock | 1 + josh-proxy/src/bin/josh-proxy.rs | 23 +++----- josh-rpc/src/calls.rs | 4 +- josh-rpc/src/lib.rs | 1 - josh-rpc/src/named_pipe.rs | 68 ------------------------ josh-ssh-shell/Cargo.toml | 1 + josh-ssh-shell/src/bin/josh-ssh-shell.rs | 62 ++++++++------------- 7 files changed, 32 insertions(+), 128 deletions(-) delete mode 100644 josh-rpc/src/named_pipe.rs diff --git a/Cargo.lock b/Cargo.lock index f4a49107b..6d36cbdb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1175,6 +1175,7 @@ dependencies = [ "reqwest", "serde_json", "shell-words", + "tempfile", "thiserror", "tokio", "tokio-util", diff --git a/josh-proxy/src/bin/josh-proxy.rs b/josh-proxy/src/bin/josh-proxy.rs index f0f370425..cc383a634 100644 --- a/josh-proxy/src/bin/josh-proxy.rs +++ b/josh-proxy/src/bin/josh-proxy.rs @@ -17,7 +17,6 @@ use hyper_reverse_proxy; use indoc::formatdoc; use josh::{josh_error, JoshError}; use josh_rpc::calls::RequestedCommand; -use josh_rpc::tokio_fd::IntoAsyncFd; use serde::Serialize; use std::collections::HashMap; use std::io; @@ -26,6 +25,7 @@ use std::process::Stdio; use std::str::FromStr; use std::sync::{Arc, RwLock}; use tokio::io::AsyncWriteExt; +use tokio::net::UnixStream; use tokio::process::Command; use tracing::{trace, Span}; use tracing_futures::Instrument; @@ -609,14 +609,10 @@ async fn serve_namespace( let mut stdout = stdout; // Dropping the handle at the end of this block will generate EOF at the other end - let mut stdout_pipe_handle = std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(¶ms.stdout_pipe)? - .into_async_fd()?; - - tokio::io::copy(&mut stdout, &mut stdout_pipe_handle).await?; - stdout_pipe_handle.flush().await + let mut stdout_stream = UnixStream::connect(¶ms.stdout_sock).await?; + + tokio::io::copy(&mut stdout, &mut stdout_stream).await?; + stdout_stream.flush().await }; copy_future.await.map_err(|e| ServeError::FifoError(e)) @@ -629,14 +625,9 @@ async fn serve_namespace( let copy_future = async { // See comment about stdout above let mut stdin = stdin; + let mut stdin_stream = UnixStream::connect(¶ms.stdin_sock).await?; - let mut stdin_pipe_handle = std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(¶ms.stdin_pipe)? - .into_async_fd()?; - - tokio::io::copy(&mut stdin_pipe_handle, &mut stdin).await?; + tokio::io::copy(&mut stdin_stream, &mut stdin).await?; // Flushing is necessary to ensure file handle is closed when // it goes out of scope / dropped diff --git a/josh-rpc/src/calls.rs b/josh-rpc/src/calls.rs index 1d821eed5..6bacfeed8 100644 --- a/josh-rpc/src/calls.rs +++ b/josh-rpc/src/calls.rs @@ -11,8 +11,8 @@ pub enum RequestedCommand { #[derive(Serialize, Deserialize, Debug)] pub struct ServeNamespace { pub command: RequestedCommand, - pub stdin_pipe: PathBuf, - pub stdout_pipe: PathBuf, + pub stdin_sock: PathBuf, + pub stdout_sock: PathBuf, pub ssh_socket: PathBuf, pub query: String, } diff --git a/josh-rpc/src/lib.rs b/josh-rpc/src/lib.rs index 3565d96ac..a6ecda363 100644 --- a/josh-rpc/src/lib.rs +++ b/josh-rpc/src/lib.rs @@ -1,3 +1,2 @@ pub mod calls; -pub mod named_pipe; pub mod tokio_fd; diff --git a/josh-rpc/src/named_pipe.rs b/josh-rpc/src/named_pipe.rs deleted file mode 100644 index b8a802293..000000000 --- a/josh-rpc/src/named_pipe.rs +++ /dev/null @@ -1,68 +0,0 @@ -extern crate libc; -extern crate rand; - -use rand::distributions::Alphanumeric; -use rand::{thread_rng, Rng}; -use std::ffi::CString; -use std::io::Error; -use std::path::{Path, PathBuf}; -use std::{env, io}; - -const TEMP_SUFFIX_LENGTH: usize = 32; -const PIPE_CREATE_ATTEMPTS: usize = 10; -const PIPE_FILEMODE: libc::mode_t = 0o660; - -pub struct NamedPipe { - pub path: PathBuf, -} - -impl Drop for NamedPipe { - fn drop(&mut self) { - std::fs::remove_file(&self.path).unwrap(); - } -} - -impl NamedPipe { - pub fn new(prefix: &str) -> Result { - let created_pipe = try_make_pipe(prefix)?; - Ok(NamedPipe { path: created_pipe }) - } -} - -fn make_fifo(path: &Path) -> Result<(), io::Error> { - let path_str = path.to_str().unwrap(); - let path = CString::new(path_str).unwrap(); - let return_code = unsafe { libc::mkfifo(path.as_ptr(), PIPE_FILEMODE) }; - - match return_code { - 0 => Ok(()), - _ => Err(Error::last_os_error()), - } -} - -fn make_random_path(prefix: &str) -> PathBuf { - let temp_path = env::temp_dir(); - let rand_string: String = thread_rng() - .sample_iter(&Alphanumeric) - .take(TEMP_SUFFIX_LENGTH) - .map(char::from) - .collect(); - - let fifo_name = format!("{}-{}", prefix, rand_string); - temp_path.join(fifo_name) -} - -fn try_make_pipe(prefix: &str) -> Result { - for _ in 0..PIPE_CREATE_ATTEMPTS { - let pipe_path = make_random_path(prefix); - match make_fifo(pipe_path.as_path()) { - Ok(_) => return Ok(pipe_path), - Err(e) => match e.kind() { - io::ErrorKind::AlreadyExists => continue, - _ => (), - }, - } - } - - Err(io::Error::from(io::ErrorKind::AlreadyExists)) -} diff --git a/josh-ssh-shell/Cargo.toml b/josh-ssh-shell/Cargo.toml index 29cdcd9da..7266c6f8a 100644 --- a/josh-ssh-shell/Cargo.toml +++ b/josh-ssh-shell/Cargo.toml @@ -18,3 +18,4 @@ tracing-opentelemetry = "0.18.0" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } tokio-util = "0.7.4" thiserror = "1.0.37" +tempfile = "3.3.0" diff --git a/josh-ssh-shell/src/bin/josh-ssh-shell.rs b/josh-ssh-shell/src/bin/josh-ssh-shell.rs index 680179f61..66858983e 100644 --- a/josh-ssh-shell/src/bin/josh-ssh-shell.rs +++ b/josh-ssh-shell/src/bin/josh-ssh-shell.rs @@ -5,8 +5,6 @@ extern crate shell_words; use clap::Parser; use josh_rpc::calls::{RequestedCommand, ServeNamespace}; -use josh_rpc::named_pipe::NamedPipe; -use josh_rpc::tokio_fd::IntoAsyncFd; use reqwest::header::CONTENT_TYPE; use reqwest::StatusCode; use std::convert::TryFrom; @@ -17,6 +15,7 @@ use std::process::ExitCode; use std::time::Duration; use std::{env, fs, io, process}; use tokio::io::AsyncWriteExt; +use tokio::net::UnixListener; use tracing_subscriber::Layer; #[derive(Parser, Debug)] @@ -88,20 +87,21 @@ async fn handle_command( ssh_socket: &Path, query: &str, ) -> Result<(), CallError> { - let stdout_pipe = NamedPipe::new("josh-stdout")?; - let stdin_pipe = NamedPipe::new("josh-stdin")?; + let sock_path = tempfile::Builder::new().prefix("josh").tempdir()?; + + // Has a drop guard, so we don't replace the variable yet + let sock_path_buf = sock_path.path().to_path_buf(); + + let stdout_sock = sock_path_buf.join("stdout"); + let stdin_sock = sock_path_buf.join("stdin"); let stdin_cancel_token = tokio_util::sync::CancellationToken::new(); let stdin_cancel_token_stdout = stdin_cancel_token.clone(); - let stdin_cancel_token_http = stdin_cancel_token.clone(); - - let stdout_cancel_token = tokio_util::sync::CancellationToken::new(); - let stdout_cancel_token_http = stdout_cancel_token.clone(); let rpc_payload = ServeNamespace { command, - stdout_pipe: stdout_pipe.path.clone(), - stdin_pipe: stdin_pipe.path.clone(), + stdout_sock: stdout_sock.clone(), + stdin_sock: stdin_sock.clone(), ssh_socket: ssh_socket.to_path_buf(), query: query.to_string(), }; @@ -109,43 +109,26 @@ async fn handle_command( let read_stdout = async move { let _guard_stdin = stdin_cancel_token_stdout.drop_guard(); - let copy_future = async { - let mut stdout = josh_rpc::tokio_fd::AsyncFd::try_from(libc::STDOUT_FILENO)?; - let mut stdout_pipe_handle = std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(stdout_pipe.path.as_path())? - .into_async_fd()?; - - tokio::io::copy(&mut stdout_pipe_handle, &mut stdout).await?; - stdout.flush().await?; + let mut stdout = josh_rpc::tokio_fd::AsyncFd::try_from(libc::STDOUT_FILENO)?; + let stdout_sock_handle = UnixListener::bind(&stdout_sock).unwrap(); + let (mut stdout_stream, _) = stdout_sock_handle.accept().await?; - Ok(()) - }; + tokio::io::copy(&mut stdout_stream, &mut stdout).await?; + stdout.flush().await?; - tokio::select! { - copy_result = copy_future => { - copy_result.map(|_| ()) - } - _ = stdout_cancel_token.cancelled() => { - Ok(()) - } - } + Ok(()) }; let write_stdin = async move { let copy_future = async { - // When the remote end sends EOF over the stdout_pipe, + // When the remote end sends EOF over the stdout_sock, // we should stop copying stuff here let mut stdin = josh_rpc::tokio_fd::AsyncFd::try_from(libc::STDIN_FILENO)?; - let mut stdin_pipe_handle = std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(stdin_pipe.path.as_path())? - .into_async_fd()?; + let stdin_sock_handle = UnixListener::bind(&stdin_sock)?; + let (mut stdin_stream, _) = stdin_sock_handle.accept().await?; - tokio::io::copy(&mut stdin, &mut stdin_pipe_handle).await?; - stdin_pipe_handle.flush().await?; + tokio::io::copy(&mut stdin, &mut stdin_stream).await?; + stdin_stream.flush().await?; Ok(()) }; @@ -161,9 +144,6 @@ async fn handle_command( }; let make_request = async move { - let _guard_stdin = stdin_cancel_token_http.drop_guard(); - let _guard_stdout = stdout_cancel_token_http.drop_guard(); - let client = reqwest::Client::new(); let response = client .post(format!("{}/serve_namespace", get_endpoint()))