Skip to content

Commit cecca5a

Browse files
Improve Windows process management edge cases (#19211)
## Summary Some improvements to Windows process-management issues from #15578 - bound the elevated runner pipe-connect handshake instead of waiting forever on blocking pipe connects - terminate the spawned runner if that handshake fails, so timeout/error paths do not leave a stray `codex-command-runner.exe` - loop on partial `WriteFile` results when forwarding stdin in the elevated runner, so input is not silently truncated - fix the concrete HANDLE/SID cleanup paths in the runner setup code - keep draining driver-backed stdout/stderr after exit until the backend closes, instead of dropping the tail after a fixed 200ms grace period - reuse `LocalSid` for SID ownership and add more explanatory comments around the ownership/concurrency-sensitive code paths ## Why The original PR fixed a lot of Windows session plumbing, but there were still a few sharp process-lifecycle edges: - some elevated runner handshakes could block forever - the new timeout path could still orphan the spawned runner process - stdin forwarding still assumed a single `WriteFile` consumed the whole buffer - a few raw HANDLE/SID error paths still leaked - driver-backed output could still lose the last chunk of stdout/stderr on slower backends ## Validation - `cargo fmt -p codex-windows-sandbox -p codex-utils-pty` - `cargo test -p codex-utils-pty` - `cargo test -p codex-windows-sandbox finish_driver_spawn` - `cargo test -p codex-windows-sandbox runner_` Ran a local test matrix of unified-exec and shell_tool tests, all passing
1 parent 1c420a9 commit cecca5a

8 files changed

Lines changed: 385 additions & 89 deletions

File tree

codex-rs/utils/pty/src/process.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -330,22 +330,20 @@ pub fn spawn_from_driver(driver: ProcessDriver) -> SpawnedProcess {
330330
output_tx: mpsc::Sender<Vec<u8>>,
331331
mut exit_seen_rx: watch::Receiver<bool>| {
332332
tokio::spawn(async move {
333-
let mut process_exited = false;
334333
loop {
335-
let recv_result = if process_exited {
336-
match tokio::time::timeout(
337-
std::time::Duration::from_millis(200),
338-
output_rx.recv(),
339-
)
340-
.await
341-
{
342-
Ok(result) => result,
343-
Err(_) => break,
344-
}
334+
let recv_result = if *exit_seen_rx.borrow() {
335+
// Once exit has been observed, we no longer want a timer here. Some
336+
// backends publish the exit code before their final stdout/stderr bytes
337+
// have been forwarded through the broadcast channel, so a fixed grace
338+
// period can still drop the tail of the stream under load.
339+
//
340+
// Instead, keep waiting until the driver closes the broadcast sender.
341+
// That makes the shutdown contract explicit: the backend is responsible
342+
// for dropping its sender when it has truly finished forwarding output.
343+
output_rx.recv().await
345344
} else {
346345
tokio::select! {
347346
_ = exit_seen_rx.changed() => {
348-
process_exited = *exit_seen_rx.borrow();
349347
continue;
350348
}
351349
result = output_rx.recv() => result,

codex-rs/utils/pty/src/tests.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,51 @@ async fn driver_backed_process_can_resize_via_resizer_hook() -> anyhow::Result<(
688688
Ok(())
689689
}
690690

691+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
692+
async fn driver_backed_process_drains_output_that_arrives_after_exit_signal() -> anyhow::Result<()>
693+
{
694+
let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
695+
let (stdout_tx, stdout_driver_rx) = tokio::sync::broadcast::channel::<Vec<u8>>(8);
696+
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel::<i32>();
697+
698+
let spawned = spawn_from_driver(ProcessDriver {
699+
writer_tx,
700+
stdout_rx: stdout_driver_rx,
701+
stderr_rx: None,
702+
exit_rx,
703+
terminator: None,
704+
writer_handle: None,
705+
resizer: None,
706+
});
707+
708+
let SpawnedProcess {
709+
session: _session,
710+
stdout_rx,
711+
stderr_rx: _stderr_rx,
712+
exit_rx,
713+
} = spawned;
714+
let stdout_task = tokio::spawn(async move { collect_split_output(stdout_rx).await });
715+
716+
exit_tx.send(0).expect("send exit code");
717+
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
718+
stdout_tx.send(b"tail".to_vec())?;
719+
drop(stdout_tx);
720+
721+
let timeout = tokio::time::Duration::from_secs(2);
722+
let code = tokio::time::timeout(timeout, exit_rx)
723+
.await
724+
.map_err(|_| anyhow::anyhow!("timed out waiting for driver exit"))?
725+
.unwrap_or(-1);
726+
let stdout = tokio::time::timeout(timeout, stdout_task)
727+
.await
728+
.map_err(|_| anyhow::anyhow!("timed out waiting to drain driver stdout"))??;
729+
730+
assert_eq!(stdout, b"tail".to_vec());
731+
assert_eq!(code, 0);
732+
733+
Ok(())
734+
}
735+
691736
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
692737
async fn pipe_terminate_aborts_detached_readers() -> anyhow::Result<()> {
693738
if !setsid_available() {

codex-rs/windows-sandbox-rs/src/elevated/command_runner_win.rs

Lines changed: 117 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use codex_windows_sandbox::ErrorPayload;
1616
use codex_windows_sandbox::ExitPayload;
1717
use codex_windows_sandbox::FramedMessage;
1818
use codex_windows_sandbox::LaunchDesktop;
19+
use codex_windows_sandbox::LocalSid;
1920
use codex_windows_sandbox::Message;
2021
use codex_windows_sandbox::OutputPayload;
2122
use codex_windows_sandbox::OutputStream;
@@ -27,7 +28,6 @@ use codex_windows_sandbox::SpawnRequest;
2728
use codex_windows_sandbox::StderrMode;
2829
use codex_windows_sandbox::StdinMode;
2930
use codex_windows_sandbox::allow_null_device;
30-
use codex_windows_sandbox::convert_string_sid_to_sid;
3131
use codex_windows_sandbox::create_readonly_token_with_caps_from;
3232
use codex_windows_sandbox::create_workspace_write_token_with_caps_from;
3333
use codex_windows_sandbox::decode_bytes;
@@ -41,7 +41,6 @@ use codex_windows_sandbox::read_handle_loop;
4141
use codex_windows_sandbox::spawn_process_with_pipes;
4242
use codex_windows_sandbox::to_wide;
4343
use codex_windows_sandbox::write_frame;
44-
use std::ffi::c_void;
4544
use std::fs::File;
4645
use std::os::windows::io::FromRawHandle;
4746
use std::path::Path;
@@ -52,8 +51,7 @@ use std::sync::Mutex as StdMutex;
5251
use windows_sys::Win32::Foundation::CloseHandle;
5352
use windows_sys::Win32::Foundation::GetLastError;
5453
use windows_sys::Win32::Foundation::HANDLE;
55-
use windows_sys::Win32::Foundation::HLOCAL;
56-
use windows_sys::Win32::Foundation::LocalFree;
54+
use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
5755
use windows_sys::Win32::Storage::FileSystem::CreateFileW;
5856
use windows_sys::Win32::Storage::FileSystem::FILE_GENERIC_READ;
5957
use windows_sys::Win32::Storage::FileSystem::FILE_GENERIC_WRITE;
@@ -94,23 +92,58 @@ struct IpcSpawnedProcess {
9492
_pipe_handles: Option<PipeSpawnHandles>,
9593
}
9694

95+
/// Small RAII wrapper for raw Win32 handles.
96+
///
97+
/// The elevated runner has a few early-return paths where we acquire a token, job, or pipe
98+
/// handle and then may fail while preparing the child. Keeping those handles in a guard makes
99+
/// the error paths read more directly and closes the gaps that were previously leaking them.
100+
struct OwnedWinHandle(HANDLE);
101+
102+
impl OwnedWinHandle {
103+
fn new(handle: HANDLE) -> Self {
104+
Self(handle)
105+
}
106+
107+
fn raw(&self) -> HANDLE {
108+
self.0
109+
}
110+
111+
fn into_raw(mut self) -> HANDLE {
112+
// Transfer ownership to the caller. After this point the caller is responsible for
113+
// eventually closing the returned HANDLE.
114+
let handle = self.0;
115+
self.0 = 0;
116+
handle
117+
}
118+
}
119+
120+
impl Drop for OwnedWinHandle {
121+
fn drop(&mut self) {
122+
if self.0 != 0 && self.0 != INVALID_HANDLE_VALUE {
123+
unsafe {
124+
CloseHandle(self.0);
125+
}
126+
}
127+
}
128+
}
129+
97130
unsafe fn create_job_kill_on_close() -> Result<HANDLE> {
98-
let h = CreateJobObjectW(std::ptr::null_mut(), std::ptr::null());
99-
if h == 0 {
131+
let h_job = OwnedWinHandle::new(CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()));
132+
if h_job.raw() == 0 {
100133
return Err(anyhow::anyhow!("CreateJobObjectW failed"));
101134
}
102135
let mut limits: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed();
103136
limits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
104137
let ok = SetInformationJobObject(
105-
h,
138+
h_job.raw(),
106139
JobObjectExtendedLimitInformation,
107140
&mut limits as *mut _ as *mut _,
108141
std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
109142
);
110143
if ok == 0 {
111144
return Err(anyhow::anyhow!("SetInformationJobObject failed"));
112145
}
113-
Ok(h)
146+
Ok(h_job.into_raw())
114147
}
115148

116149
/// Open a named pipe created by the parent process.
@@ -190,45 +223,42 @@ fn spawn_ipc_process(req: &SpawnRequest) -> Result<IpcSpawnedProcess> {
190223
let log_dir = req.codex_home.clone();
191224
hide_current_user_profile_dir(req.codex_home.as_path());
192225
let policy = parse_policy(&req.policy_json_or_preset).context("parse policy_json_or_preset")?;
193-
let mut cap_psids: Vec<*mut c_void> = Vec::new();
226+
let mut cap_psids: Vec<LocalSid> = Vec::new();
194227
for sid in &req.cap_sids {
195-
let Some(psid) = (unsafe { convert_string_sid_to_sid(sid) }) else {
196-
anyhow::bail!("ConvertStringSidToSidW failed for capability SID");
197-
};
198-
cap_psids.push(psid);
228+
cap_psids.push(
229+
LocalSid::from_string(sid)
230+
.context("ConvertStringSidToSidW failed for capability SID")?,
231+
);
199232
}
200233
if cap_psids.is_empty() {
201234
anyhow::bail!("runner: empty capability SID list");
202235
}
203236

204-
let base = unsafe { get_current_token_for_restriction()? };
205-
let token_res: Result<(HANDLE, *mut c_void)> = unsafe {
237+
// The token helpers still take raw SID pointers, but we keep ownership in `LocalSid`
238+
// wrappers for as long as possible. That way any failure after SID parsing but before the
239+
// child is fully spawned still releases the backing LocalAlloc memory automatically.
240+
let cap_psid_ptrs: Vec<*mut _> = cap_psids.iter().map(LocalSid::as_ptr).collect();
241+
let base = OwnedWinHandle::new(unsafe { get_current_token_for_restriction()? });
242+
let h_token = OwnedWinHandle::new(unsafe {
206243
match &policy {
207244
SandboxPolicy::ReadOnly { .. } => {
208-
create_readonly_token_with_caps_from(base, &cap_psids)
209-
.map(|h_token| (h_token, cap_psids[0]))
245+
create_readonly_token_with_caps_from(base.raw(), &cap_psid_ptrs)
210246
}
211247
SandboxPolicy::WorkspaceWrite { .. } => {
212-
create_workspace_write_token_with_caps_from(base, &cap_psids)
213-
.map(|h_token| (h_token, cap_psids[0]))
248+
create_workspace_write_token_with_caps_from(base.raw(), &cap_psid_ptrs)
214249
}
215250
SandboxPolicy::DangerFullAccess | SandboxPolicy::ExternalSandbox { .. } => {
216251
unreachable!()
217252
}
218253
}
219-
};
220-
let (h_token, psid_to_use) = token_res?;
254+
}?);
221255
unsafe {
222-
CloseHandle(base);
223-
allow_null_device(psid_to_use);
224-
for psid in &cap_psids {
256+
// These ACL adjustments need the raw SID values, but ownership stays with `cap_psids`.
257+
// We do not manually `LocalFree` anything here; the wrappers handle every return path.
258+
allow_null_device(cap_psid_ptrs[0]);
259+
for psid in &cap_psid_ptrs {
225260
allow_null_device(*psid);
226261
}
227-
for psid in cap_psids {
228-
if !psid.is_null() {
229-
LocalFree(psid as HLOCAL);
230-
}
231-
}
232262
}
233263

234264
let effective_cwd = effective_cwd(&req.cwd, Some(log_dir.as_path()));
@@ -238,7 +268,7 @@ fn spawn_ipc_process(req: &SpawnRequest) -> Result<IpcSpawnedProcess> {
238268
let mut pipe_handles = None;
239269
let (pi, stdout_handle, stderr_handle, stdin_handle) = if req.tty {
240270
let (pi, conpty) = codex_windows_sandbox::spawn_conpty_process_as_user(
241-
h_token,
271+
h_token.raw(),
242272
&req.command,
243273
&effective_cwd,
244274
&req.env,
@@ -269,7 +299,7 @@ fn spawn_ipc_process(req: &SpawnRequest) -> Result<IpcSpawnedProcess> {
269299
StdinMode::Closed
270300
};
271301
let spawned_pipes: PipeSpawnHandles = spawn_process_with_pipes(
272-
h_token,
302+
h_token.raw(),
273303
&req.command,
274304
&effective_cwd,
275305
&req.env,
@@ -287,10 +317,6 @@ fn spawn_ipc_process(req: &SpawnRequest) -> Result<IpcSpawnedProcess> {
287317
pipe_handles = Some(spawned_pipes);
288318
(pi, stdout_handle, stderr_handle, stdin_handle)
289319
};
290-
291-
unsafe {
292-
CloseHandle(h_token);
293-
}
294320
Ok(IpcSpawnedProcess {
295321
log_dir,
296322
pi,
@@ -337,7 +363,7 @@ fn spawn_input_loop(
337363
stdin_handle: Option<HANDLE>,
338364
hpc_handle: Arc<StdMutex<Option<HANDLE>>>,
339365
process_handle: Arc<StdMutex<Option<HANDLE>>>,
340-
_log_dir: Option<PathBuf>,
366+
log_dir: Option<PathBuf>,
341367
) -> std::thread::JoinHandle<()> {
342368
std::thread::spawn(move || {
343369
let mut stdin_handle = stdin_handle;
@@ -353,15 +379,54 @@ fn spawn_input_loop(
353379
continue;
354380
};
355381
if let Some(handle) = stdin_handle {
356-
let mut written: u32 = 0;
357-
unsafe {
358-
let _ = windows_sys::Win32::Storage::FileSystem::WriteFile(
359-
handle,
360-
bytes.as_ptr(),
361-
bytes.len() as u32,
362-
&mut written,
363-
ptr::null_mut(),
364-
);
382+
let mut offset = 0usize;
383+
// `WriteFile` can report success after consuming only part of the buffer
384+
// when the target is a pipe. Treat this like a normal partial write and
385+
// keep advancing until every decoded stdin byte has been forwarded.
386+
//
387+
// If the child closes stdin or the pipe enters an error state, we log
388+
// that fact, close our local HANDLE, and stop trying to forward later
389+
// `Stdin` frames. That prevents silent truncation while also avoiding an
390+
// endless stream of failing writes after the child is already gone.
391+
while offset < bytes.len() {
392+
let chunk = &bytes[offset..];
393+
let chunk_len = chunk.len().min(u32::MAX as usize);
394+
let mut written = 0u32;
395+
let ok = unsafe {
396+
windows_sys::Win32::Storage::FileSystem::WriteFile(
397+
handle,
398+
chunk.as_ptr(),
399+
chunk_len as u32,
400+
&mut written,
401+
ptr::null_mut(),
402+
)
403+
};
404+
if ok == 0 {
405+
log_note(
406+
&format!(
407+
"runner stdin write failed after {offset} bytes: {}",
408+
unsafe { GetLastError() }
409+
),
410+
log_dir.as_deref(),
411+
);
412+
unsafe {
413+
CloseHandle(handle);
414+
}
415+
stdin_handle = None;
416+
break;
417+
}
418+
if written == 0 {
419+
log_note(
420+
"runner stdin write made no progress; closing child stdin",
421+
log_dir.as_deref(),
422+
);
423+
unsafe {
424+
CloseHandle(handle);
425+
}
426+
stdin_handle = None;
427+
break;
428+
}
429+
offset += written as usize;
365430
}
366431
}
367432
}
@@ -432,11 +497,14 @@ pub fn main() -> Result<()> {
432497
anyhow::bail!("runner: no pipe-out provided");
433498
};
434499

435-
let h_pipe_in = open_pipe(&pipe_in, FILE_GENERIC_READ)?;
436-
let h_pipe_out = open_pipe(&pipe_out, FILE_GENERIC_WRITE)?;
437-
let mut pipe_read = unsafe { File::from_raw_handle(h_pipe_in as _) };
500+
// Open both pipe ends under guards first so a failure on the second open cannot leak the
501+
// first HANDLE. Only after both opens succeed do we transfer ownership into `File`, which
502+
// then becomes responsible for closing them.
503+
let h_pipe_in = OwnedWinHandle::new(open_pipe(&pipe_in, FILE_GENERIC_READ)?);
504+
let h_pipe_out = OwnedWinHandle::new(open_pipe(&pipe_out, FILE_GENERIC_WRITE)?);
505+
let mut pipe_read = unsafe { File::from_raw_handle(h_pipe_in.into_raw() as _) };
438506
let pipe_write = Arc::new(StdMutex::new(unsafe {
439-
File::from_raw_handle(h_pipe_out as _)
507+
File::from_raw_handle(h_pipe_out.into_raw() as _)
440508
}));
441509

442510
let req = match read_spawn_request(&mut pipe_read) {

0 commit comments

Comments
 (0)