Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,13 @@ pub async fn run_main_with_transport_options(

let feedback_layer = feedback.logger_layer();
let feedback_metadata_layer = feedback.metadata_layer();
let state_db = codex_state::StateRuntime::init(
let state_db_result = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.ok();
.await;
let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string);
let state_db = state_db_result.ok();
let log_db = state_db.clone().map(log_db::start);
let log_db_layer = log_db
.clone()
Expand All @@ -588,6 +589,9 @@ pub async fn run_main_with_transport_options(
None => error!("{}", warning.summary),
}
}
if let Some(err) = &state_db_init_error {
error!("failed to initialize sqlite state db: {err}");
}

let transport_shutdown_token = CancellationToken::new();
let mut transport_accept_handles = Vec::<JoinHandle<()>>::new();
Expand Down Expand Up @@ -633,11 +637,19 @@ pub async fn run_main_with_transport_options(
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;

let remote_control_enabled = config.features.enabled(Feature::RemoteControl);
let remote_control_config_enabled = config.features.enabled(Feature::RemoteControl);
let remote_control_enabled = remote_control_config_enabled && state_db.is_some();
if remote_control_config_enabled && state_db.is_none() {
error!("remote control disabled because sqlite state db is unavailable");
}
if transport_accept_handles.is_empty() && !remote_control_enabled {
return Err(std::io::Error::new(
ErrorKind::InvalidInput,
"no transport configured; use --listen or enable remote control",
if remote_control_config_enabled && state_db.is_none() {
"no transport configured; remote control disabled because sqlite state db is unavailable"
} else {
"no transport configured; use --listen or enable remote control"
},
));
}

Expand Down
55 changes: 32 additions & 23 deletions codex-rs/app-server/src/transport/remote_control/enroll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ pub(super) async fn load_persisted_remote_control_enrollment(
remote_control_target: &RemoteControlTarget,
account_id: &str,
app_server_client_name: Option<&str>,
) -> Option<RemoteControlEnrollment> {
) -> io::Result<Option<RemoteControlEnrollment>> {
let Some(state_db) = state_db else {
info!(
"remote control enrollment cache unavailable because sqlite state db is disabled: websocket_url={}, account_id={}, app_server_client_name={:?}",
remote_control_target.websocket_url, account_id, app_server_client_name
);
return None;
return Err(io::Error::new(
ErrorKind::NotFound,
format!(
"remote control enrollment cache unavailable because sqlite state db is disabled: websocket_url={}, account_id={}, app_server_client_name={:?}",
remote_control_target.websocket_url, account_id, app_server_client_name
),
));
};
let enrollment = match state_db
.get_remote_control_enrollment(
Expand All @@ -60,7 +62,7 @@ pub(super) async fn load_persisted_remote_control_enrollment(
"failed to load persisted remote control enrollment: websocket_url={}, account_id={}, app_server_client_name={:?}, err={err}",
remote_control_target.websocket_url, account_id, app_server_client_name
);
return None;
return Err(io::Error::other(err));
}
};

Expand All @@ -74,19 +76,19 @@ pub(super) async fn load_persisted_remote_control_enrollment(
enrollment.server_id,
enrollment.environment_id
);
Some(RemoteControlEnrollment {
Ok(Some(RemoteControlEnrollment {
account_id: enrollment.account_id,
environment_id: enrollment.environment_id,
server_id: enrollment.server_id,
server_name: enrollment.server_name,
})
}))
}
None => {
info!(
"no persisted remote control enrollment found: websocket_url={}, account_id={}, app_server_client_name={:?}",
remote_control_target.websocket_url, account_id, app_server_client_name
);
None
Ok(None)
}
}
}
Expand All @@ -99,14 +101,16 @@ pub(super) async fn update_persisted_remote_control_enrollment(
enrollment: Option<&RemoteControlEnrollment>,
) -> io::Result<()> {
let Some(state_db) = state_db else {
info!(
"skipping remote control enrollment persistence because sqlite state db is disabled: websocket_url={}, account_id={}, app_server_client_name={:?}, has_enrollment={}",
remote_control_target.websocket_url,
account_id,
app_server_client_name,
enrollment.is_some()
);
return Ok(());
return Err(io::Error::new(
ErrorKind::NotFound,
format!(
"remote control enrollment persistence unavailable because sqlite state db is disabled: websocket_url={}, account_id={}, app_server_client_name={:?}, has_enrollment={}",
remote_control_target.websocket_url,
account_id,
app_server_client_name,
enrollment.is_some()
),
));
};
if let &Some(enrollment) = &enrollment
&& enrollment.account_id != account_id
Expand Down Expand Up @@ -322,7 +326,8 @@ mod tests {
"account-a",
Some("desktop-client"),
)
.await,
.await
.expect("first enrollment should load"),
Some(first_enrollment.clone())
);
assert_eq!(
Expand All @@ -332,7 +337,8 @@ mod tests {
"account-b",
Some("desktop-client"),
)
.await,
.await
.expect("missing account should load"),
None
);
assert_eq!(
Expand All @@ -342,7 +348,8 @@ mod tests {
"account-a",
Some("desktop-client"),
)
.await,
.await
.expect("second enrollment should load"),
Some(second_enrollment)
);
}
Expand Down Expand Up @@ -405,7 +412,8 @@ mod tests {
"account-a",
/*app_server_client_name*/ None,
)
.await,
.await
.expect("cleared enrollment should load"),
None
);
assert_eq!(
Expand All @@ -415,7 +423,8 @@ mod tests {
"account-a",
/*app_server_client_name*/ None,
)
.await,
.await
.expect("remaining enrollment should load"),
Some(second_enrollment)
);
}
Expand Down
14 changes: 14 additions & 0 deletions codex-rs/app-server/src/transport/remote_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::warn;

pub(super) struct QueuedServerEnvelope {
pub(super) event: ServerEvent,
Expand All @@ -32,10 +33,16 @@ pub(super) struct QueuedServerEnvelope {
#[derive(Clone)]
pub(crate) struct RemoteControlHandle {
enabled_tx: Arc<watch::Sender<bool>>,
state_db_available: bool,
}

impl RemoteControlHandle {
pub(crate) fn set_enabled(&self, enabled: bool) {
let requested_enabled = enabled;
let enabled = enabled && self.state_db_available;
if requested_enabled && !self.state_db_available {
warn!("remote control cannot be enabled because sqlite state db is unavailable");
}
self.enabled_tx.send_if_modified(|state| {
let changed = *state != enabled;
*state = enabled;
Expand All @@ -53,6 +60,12 @@ pub(crate) async fn start_remote_control(
app_server_client_name_rx: Option<oneshot::Receiver<String>>,
initial_enabled: bool,
) -> io::Result<(JoinHandle<()>, RemoteControlHandle)> {
let state_db_available = state_db.is_some();
let requested_initial_enabled = initial_enabled;
let initial_enabled = initial_enabled && state_db_available;
if requested_initial_enabled && !state_db_available {
warn!("remote control disabled because sqlite state db is unavailable");
}
let remote_control_target = if initial_enabled {
Some(normalize_remote_control_url(&remote_control_url)?)
} else {
Expand All @@ -78,6 +91,7 @@ pub(crate) async fn start_remote_control(
join_handle,
RemoteControlHandle {
enabled_tx: Arc::new(enabled_tx),
state_db_available,
},
))
}
Expand Down
45 changes: 42 additions & 3 deletions codex-rs/app-server/src/transport/remote_control/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async fn remote_control_start_allows_missing_auth_when_enabled() {
let shutdown_token = CancellationToken::new();
let (remote_task, _remote_handle) = start_remote_control(
remote_control_url,
/*state_db*/ None,
Some(remote_control_state_runtime(&codex_home).await),
auth_manager,
transport_event_tx,
shutdown_token.clone(),
Expand All @@ -525,6 +525,43 @@ async fn remote_control_start_allows_missing_auth_when_enabled() {
.expect("remote control task should join");
}

#[tokio::test]
async fn remote_control_start_disables_remote_control_without_state_db() {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("listener should bind");
let remote_control_url = remote_control_url_for_listener(&listener);
let (transport_event_tx, _transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let shutdown_token = CancellationToken::new();
let (remote_task, remote_handle) = start_remote_control(
remote_control_url,
/*state_db*/ None,
remote_control_auth_manager(),
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
)
.await
.expect("remote control should start disabled without sqlite state db");

timeout(Duration::from_millis(100), listener.accept())
.await
.expect_err("remote control should not connect without sqlite state db");

remote_handle.set_enabled(/*enabled*/ true);
timeout(Duration::from_millis(100), listener.accept())
.await
.expect_err("remote control should remain disabled without sqlite state db");

shutdown_token.cancel();
timeout(Duration::from_secs(1), remote_task)
.await
.expect("remote control task should stop")
.expect("remote control task should join");
}

#[tokio::test]
async fn remote_control_handle_set_enabled_stops_and_restarts_connections() {
let listener = TcpListener::bind("127.0.0.1:0")
Expand Down Expand Up @@ -1001,7 +1038,8 @@ async fn remote_control_http_mode_reuses_persisted_enrollment_before_reenrolling
"account_id",
/*app_server_client_name*/ None,
)
.await,
.await
.expect("persisted enrollment should load"),
Some(persisted_enrollment)
);

Expand Down Expand Up @@ -1231,7 +1269,8 @@ async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404()
"account_id",
/*app_server_client_name*/ None,
)
.await,
.await
.expect("refreshed enrollment should load"),
Some(refreshed_enrollment)
);

Expand Down
50 changes: 45 additions & 5 deletions codex-rs/app-server/src/transport/remote_control/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,14 @@ pub(super) async fn connect_remote_control_websocket(
)> {
ensure_rustls_crypto_provider();

let Some(state_db) = state_db else {
*enrollment = None;
return Err(io::Error::new(
ErrorKind::NotFound,
"remote control requires sqlite state db",
));
};

let auth = load_remote_control_auth(auth_manager).await?;
let enrollment_account_id = enrollment.as_ref().map(|enrollment| &enrollment.account_id);
if enrollment_account_id.is_some_and(|account_id| account_id != &auth.account_id) {
Expand All @@ -769,12 +777,12 @@ pub(super) async fn connect_remote_control_websocket(

if enrollment.is_none() {
*enrollment = load_persisted_remote_control_enrollment(
state_db,
Some(state_db),
remote_control_target,
&auth.account_id,
app_server_client_name,
)
.await;
.await?;
}

if enrollment.is_none() {
Expand All @@ -796,15 +804,17 @@ pub(super) async fn connect_remote_control_websocket(
Err(err) => return Err(err),
};
if let Err(err) = update_persisted_remote_control_enrollment(
state_db,
Some(state_db),
remote_control_target,
&auth.account_id,
app_server_client_name,
Some(&new_enrollment),
)
.await
{
warn!("failed to persist remote control enrollment in sqlite state db: {err}");
return Err(io::Error::other(format!(
"failed to persist remote control enrollment in sqlite state db: {err}"
)));
}
info!(
"created new remote control enrollment: websocket_url={}, account_id={}, server_id={}, environment_id={}",
Expand Down Expand Up @@ -839,7 +849,7 @@ pub(super) async fn connect_remote_control_websocket(
enrollment_ref.environment_id
);
if let Err(clear_err) = update_persisted_remote_control_enrollment(
state_db,
Some(state_db),
remote_control_target,
&auth.account_id,
app_server_client_name,
Expand Down Expand Up @@ -1214,6 +1224,36 @@ mod tests {
);
}

#[tokio::test]
async fn connect_remote_control_websocket_requires_sqlite_state_db() {
let remote_control_target = normalize_remote_control_url("http://127.0.0.1:9/backend-api/")
.expect("target should parse");
let auth_manager = remote_control_auth_manager();
let mut auth_recovery = auth_manager.unauthorized_recovery();
let mut enrollment = Some(RemoteControlEnrollment {
account_id: "account_id".to_string(),
environment_id: "env_test".to_string(),
server_id: "srv_e_test".to_string(),
server_name: "test-server".to_string(),
});

let err = connect_remote_control_websocket(
&remote_control_target,
/*state_db*/ None,
&auth_manager,
&mut auth_recovery,
&mut enrollment,
/*subscribe_cursor*/ None,
/*app_server_client_name*/ None,
)
.await
.expect_err("missing sqlite state db should fail remote control");

assert_eq!(err.kind(), ErrorKind::NotFound);
assert_eq!(err.to_string(), "remote control requires sqlite state db");
assert_eq!(enrollment, None);
}

#[tokio::test]
async fn run_remote_control_websocket_loop_shutdown_cancels_reconnect_backoff() {
let listener = TcpListener::bind("127.0.0.1:0")
Expand Down
Loading