Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 7 additions & 16 deletions josh-proxy/src/bin/josh-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use hyper_reverse_proxy;
use indoc::formatdoc;
use josh::{josh_error, JoshError};
use josh_rpc::calls::RequestedCommand;
use josh_rpc::tokio_fd::IntoAsyncFd;
use serde::Serialize;
use std::collections::HashMap;
use std::io;
Expand All @@ -26,6 +25,7 @@ use std::process::Stdio;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio::process::Command;
use tracing::{trace, Span};
use tracing_futures::Instrument;
Expand Down Expand Up @@ -609,14 +609,10 @@ async fn serve_namespace(
let mut stdout = stdout;

// Dropping the handle at the end of this block will generate EOF at the other end
let mut stdout_pipe_handle = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&params.stdout_pipe)?
.into_async_fd()?;

tokio::io::copy(&mut stdout, &mut stdout_pipe_handle).await?;
stdout_pipe_handle.flush().await
let mut stdout_stream = UnixStream::connect(&params.stdout_sock).await?;

tokio::io::copy(&mut stdout, &mut stdout_stream).await?;
stdout_stream.flush().await
};

copy_future.await.map_err(|e| ServeError::FifoError(e))
Expand All @@ -629,14 +625,9 @@ async fn serve_namespace(
let copy_future = async {
// See comment about stdout above
let mut stdin = stdin;
let mut stdin_stream = UnixStream::connect(&params.stdin_sock).await?;

let mut stdin_pipe_handle = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&params.stdin_pipe)?
.into_async_fd()?;

tokio::io::copy(&mut stdin_pipe_handle, &mut stdin).await?;
tokio::io::copy(&mut stdin_stream, &mut stdin).await?;

// Flushing is necessary to ensure file handle is closed when
// it goes out of scope / dropped
Expand Down
4 changes: 2 additions & 2 deletions josh-rpc/src/calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ pub enum RequestedCommand {
#[derive(Serialize, Deserialize, Debug)]
pub struct ServeNamespace {
pub command: RequestedCommand,
pub stdin_pipe: PathBuf,
pub stdout_pipe: PathBuf,
pub stdin_sock: PathBuf,
pub stdout_sock: PathBuf,
pub ssh_socket: PathBuf,
pub query: String,
}
1 change: 0 additions & 1 deletion josh-rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod calls;
pub mod named_pipe;
pub mod tokio_fd;
68 changes: 0 additions & 68 deletions josh-rpc/src/named_pipe.rs

This file was deleted.

1 change: 1 addition & 0 deletions josh-ssh-shell/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
tokio-util = "0.7.4"
thiserror = "1.0.37"
tempfile = "3.3.0"
62 changes: 21 additions & 41 deletions josh-ssh-shell/src/bin/josh-ssh-shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ extern crate shell_words;

use clap::Parser;
use josh_rpc::calls::{RequestedCommand, ServeNamespace};
use josh_rpc::named_pipe::NamedPipe;
use josh_rpc::tokio_fd::IntoAsyncFd;
use reqwest::header::CONTENT_TYPE;
use reqwest::StatusCode;
use std::convert::TryFrom;
Expand All @@ -17,6 +15,7 @@ use std::process::ExitCode;
use std::time::Duration;
use std::{env, fs, io, process};
use tokio::io::AsyncWriteExt;
use tokio::net::UnixListener;
use tracing_subscriber::Layer;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -88,64 +87,48 @@ async fn handle_command(
ssh_socket: &Path,
query: &str,
) -> Result<(), CallError> {
let stdout_pipe = NamedPipe::new("josh-stdout")?;
let stdin_pipe = NamedPipe::new("josh-stdin")?;
let sock_path = tempfile::Builder::new().prefix("josh").tempdir()?;

// Has a drop guard, so we don't replace the variable yet
let sock_path_buf = sock_path.path().to_path_buf();

let stdout_sock = sock_path_buf.join("stdout");
let stdin_sock = sock_path_buf.join("stdin");

let stdin_cancel_token = tokio_util::sync::CancellationToken::new();
let stdin_cancel_token_stdout = stdin_cancel_token.clone();
let stdin_cancel_token_http = stdin_cancel_token.clone();

let stdout_cancel_token = tokio_util::sync::CancellationToken::new();
let stdout_cancel_token_http = stdout_cancel_token.clone();

let rpc_payload = ServeNamespace {
command,
stdout_pipe: stdout_pipe.path.clone(),
stdin_pipe: stdin_pipe.path.clone(),
stdout_sock: stdout_sock.clone(),
stdin_sock: stdin_sock.clone(),
ssh_socket: ssh_socket.to_path_buf(),
query: query.to_string(),
};

let read_stdout = async move {
let _guard_stdin = stdin_cancel_token_stdout.drop_guard();

let copy_future = async {
let mut stdout = josh_rpc::tokio_fd::AsyncFd::try_from(libc::STDOUT_FILENO)?;
let mut stdout_pipe_handle = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(stdout_pipe.path.as_path())?
.into_async_fd()?;

tokio::io::copy(&mut stdout_pipe_handle, &mut stdout).await?;
stdout.flush().await?;
let mut stdout = josh_rpc::tokio_fd::AsyncFd::try_from(libc::STDOUT_FILENO)?;
let stdout_sock_handle = UnixListener::bind(&stdout_sock).unwrap();
let (mut stdout_stream, _) = stdout_sock_handle.accept().await?;

Ok(())
};
tokio::io::copy(&mut stdout_stream, &mut stdout).await?;
stdout.flush().await?;

tokio::select! {
copy_result = copy_future => {
copy_result.map(|_| ())
}
_ = stdout_cancel_token.cancelled() => {
Ok(())
}
}
Ok(())
};

let write_stdin = async move {
let copy_future = async {
// When the remote end sends EOF over the stdout_pipe,
// When the remote end sends EOF over the stdout_sock,
// we should stop copying stuff here
let mut stdin = josh_rpc::tokio_fd::AsyncFd::try_from(libc::STDIN_FILENO)?;
let mut stdin_pipe_handle = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(stdin_pipe.path.as_path())?
.into_async_fd()?;
let stdin_sock_handle = UnixListener::bind(&stdin_sock)?;
let (mut stdin_stream, _) = stdin_sock_handle.accept().await?;

tokio::io::copy(&mut stdin, &mut stdin_pipe_handle).await?;
stdin_pipe_handle.flush().await?;
tokio::io::copy(&mut stdin, &mut stdin_stream).await?;
stdin_stream.flush().await?;

Ok(())
};
Expand All @@ -161,9 +144,6 @@ async fn handle_command(
};

let make_request = async move {
let _guard_stdin = stdin_cancel_token_http.drop_guard();
let _guard_stdout = stdout_cancel_token_http.drop_guard();

let client = reqwest::Client::new();
let response = client
.post(format!("{}/serve_namespace", get_endpoint()))
Expand Down