Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions codex-rs/app-server-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ pub use outgoing_message::OutgoingError;
pub use outgoing_message::OutgoingMessage;
pub use outgoing_message::OutgoingResponse;
pub use outgoing_message::QueuedOutgoingMessage;
pub use transport::AppServerListenerStartup;
pub use transport::AppServerTransport;
pub use transport::AppServerTransportParseError;
pub use transport::CHANNEL_CAPACITY;
pub use transport::ConnectionOrigin;
pub use transport::EnsuredAppServerListener;
pub use transport::RemoteControlHandle;
pub use transport::RemoteControlStartConfig;
pub use transport::RemoteControlUnavailable;
pub use transport::TransportEvent;
pub use transport::app_server_control_socket_path;
pub use transport::app_server_startup_lock_path;
pub use transport::auth;
pub use transport::ensure_control_socket_listener;
pub use transport::start_control_socket_acceptor;
pub use transport::start_remote_control;
pub use transport::start_stdio_connection;
Expand Down
12 changes: 12 additions & 0 deletions codex-rs/app-server-transport/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ pub use remote_control::RemoteControlStartConfig;
pub use remote_control::RemoteControlUnavailable;
pub use remote_control::start_remote_control;
pub use stdio::start_stdio_connection;
pub use unix_socket::AppServerListenerStartup;
pub use unix_socket::EnsuredAppServerListener;
pub use unix_socket::ensure_control_socket_listener;
pub use unix_socket::start_control_socket_acceptor;
pub use websocket::start_websocket_acceptor;

const OVERLOADED_ERROR_CODE: i64 = -32001;

const APP_SERVER_CONTROL_SOCKET_DIR_NAME: &str = "app-server-control";
const APP_SERVER_CONTROL_SOCKET_FILE_NAME: &str = "app-server-control.sock";
const APP_SERVER_STARTUP_LOCK_FILE_NAME: &str = "app-server-startup.lock";

pub fn app_server_control_socket_path(codex_home: &Path) -> std::io::Result<AbsolutePathBuf> {
AbsolutePathBuf::from_absolute_path(
Expand All @@ -51,6 +55,14 @@ pub fn app_server_control_socket_path(codex_home: &Path) -> std::io::Result<Abso
)
}

pub fn app_server_startup_lock_path(codex_home: &Path) -> std::io::Result<AbsolutePathBuf> {
AbsolutePathBuf::from_absolute_path(
codex_home
.join(APP_SERVER_CONTROL_SOCKET_DIR_NAME)
.join(APP_SERVER_STARTUP_LOCK_FILE_NAME),
)
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AppServerTransport {
Stdio,
Expand Down
103 changes: 103 additions & 0 deletions codex-rs/app-server-transport/src/transport/unix_socket.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fs::OpenOptions;
use std::future::Future;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::Path;
Expand All @@ -11,6 +13,7 @@ use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::sleep;
use tokio_tungstenite::accept_async;
use tokio_util::sync::CancellationToken;
use tracing::error;
Expand All @@ -19,6 +22,18 @@ use tracing::warn;

#[cfg(unix)]
const CONTROL_SOCKET_MODE: u32 = 0o600;
const CONTROL_SOCKET_READY_POLL_INTERVAL: Duration = Duration::from_millis(50);

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EnsuredAppServerListener {
ReusedExisting,
StartedNew,
}

pub enum AppServerListenerStartup {
Untracked,
Child(std::process::Child),
}

pub async fn start_control_socket_acceptor(
socket_path: AbsolutePathBuf,
Expand All @@ -42,6 +57,33 @@ pub async fn start_control_socket_acceptor(
)))
}

pub async fn ensure_control_socket_listener<F, Fut>(
socket_path: AbsolutePathBuf,
startup_lock_path: AbsolutePathBuf,
start_listener: F,
) -> IoResult<EnsuredAppServerListener>
where
F: FnOnce() -> Fut,
Fut: Future<Output = IoResult<AppServerListenerStartup>>,
{
if probe_existing_control_socket(socket_path.as_path()).await? {
return Ok(EnsuredAppServerListener::ReusedExisting);
}

if let Some(parent) = startup_lock_path.as_path().parent() {
codex_uds::prepare_private_socket_directory(parent).await?;
}
let _startup_lock = acquire_app_server_startup_lock(startup_lock_path).await?;

if probe_existing_control_socket(socket_path.as_path()).await? {
return Ok(EnsuredAppServerListener::ReusedExisting);
}

let startup = start_listener().await?;
wait_for_control_socket_listener(socket_path.as_path(), startup).await?;
Ok(EnsuredAppServerListener::StartedNew)
}

async fn run_control_socket_acceptor(
mut listener: UnixListener,
transport_event_tx: mpsc::Sender<TransportEvent>,
Expand Down Expand Up @@ -130,6 +172,67 @@ async fn prepare_control_socket_path(socket_path: &Path) -> IoResult<()> {
tokio::fs::remove_file(socket_path).await
}

async fn probe_existing_control_socket(socket_path: &Path) -> IoResult<bool> {
match UnixStream::connect(socket_path).await {
Ok(_stream) => Ok(true),
Err(err)
if matches!(
err.kind(),
ErrorKind::ConnectionRefused | ErrorKind::NotFound
) =>
{
Ok(false)
}
Err(err) => {
if !socket_path.exists() {
return Ok(false);
}
Err(err)
}
}
}

async fn wait_for_control_socket_listener(
socket_path: &Path,
mut startup: AppServerListenerStartup,
) -> IoResult<()> {
loop {
if probe_existing_control_socket(socket_path).await? {
return Ok(());
}
if let AppServerListenerStartup::Child(child) = &mut startup
&& let Some(status) = child.try_wait()?
{
return Err(std::io::Error::other(format!(
"detached app-server listener exited before opening {} with status {status}",
socket_path.display()
)));
}
sleep(CONTROL_SOCKET_READY_POLL_INTERVAL).await;
Comment thread
efrazer-oai marked this conversation as resolved.
}
}

struct AppServerStartupLock {
_file: std::fs::File,
}

async fn acquire_app_server_startup_lock(
startup_lock_path: AbsolutePathBuf,
) -> IoResult<AppServerStartupLock> {
tokio::task::spawn_blocking(move || {
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(startup_lock_path.as_path())?;
file.lock()?;
Ok(AppServerStartupLock { _file: file })
})
.await
.map_err(|err| std::io::Error::other(format!("startup lock task failed: {err}")))?
}

#[cfg(unix)]
async fn set_control_socket_permissions(socket_path: &Path) -> IoResult<()> {
use std::os::unix::fs::PermissionsExt;
Expand Down
Loading
Loading