Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: process manager tests tty #7156

Merged
merged 2 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/turborepo-lib/src/process/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ impl From<Command> for portable_pty::CommandBuilder {
cmd.args(args);
if let Some(cwd) = cwd {
cmd.cwd(cwd.as_std_path());
} else if let Ok(cwd) = std::env::current_dir() {
// portably_pty defaults to a users home directory instead of cwd if one isn't
// configured on the command builder.
// We explicitly set the cwd if one exists to avoid this behavior
cmd.cwd(&cwd);
}
for (key, value) in env {
cmd.env(key, value);
Expand Down
55 changes: 33 additions & 22 deletions crates/turborepo-lib/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub use self::child::{Child, ChildExit};
/// using `spawn`. When the manager is Closed, all currently-running children
/// will be closed, and no new children can be spawned.
#[derive(Debug, Clone)]
pub struct ProcessManager(Arc<Mutex<ProcessManagerInner>>);
pub struct ProcessManager {
state: Arc<Mutex<ProcessManagerInner>>,
use_pty: bool,
}

#[derive(Debug)]
struct ProcessManagerInner {
Expand All @@ -39,11 +42,22 @@ struct ProcessManagerInner {
}

impl ProcessManager {
pub fn new() -> Self {
Self(Arc::new(Mutex::new(ProcessManagerInner {
is_closing: false,
children: Vec::new(),
})))
pub fn new(use_pty: bool) -> Self {
Self {
state: Arc::new(Mutex::new(ProcessManagerInner {
is_closing: false,
children: Vec::new(),
})),
use_pty,
}
}

/// Construct a process manager and infer if pty should be used
pub fn infer() -> Self {
// Only use PTY if we're not on windows and we're currently hooked up to a
// in a TTY
let use_pty = !cfg!(windows) && atty::is(atty::Stream::Stdout);
Self::new(use_pty)
}
}

Expand All @@ -61,17 +75,14 @@ impl ProcessManager {
command: Command,
stop_timeout: Duration,
) -> Option<io::Result<child::Child>> {
let mut lock = self.0.lock().unwrap();
let mut lock = self.state.lock().unwrap();
if lock.is_closing {
return None;
}
// Only use PTY if we're not on windows and we're currently hooked up to a
// in a TTY
let use_pty = !cfg!(windows) && atty::is(atty::Stream::Stdout);
let child = child::Child::spawn(
command,
child::ShutdownStyle::Graceful(stop_timeout),
use_pty,
self.use_pty,
);
if let Ok(child) = &child {
lock.children.push(child.clone());
Expand Down Expand Up @@ -108,7 +119,7 @@ impl ProcessManager {
let mut set = JoinSet::new();

{
let mut lock = self.0.lock().expect("not poisoned");
let mut lock = self.state.lock().expect("not poisoned");
lock.is_closing = true;
for child in lock.children.iter() {
let child = child.clone();
Expand All @@ -123,7 +134,7 @@ impl ProcessManager {
}

{
let mut lock = self.0.lock().expect("not poisoned");
let mut lock = self.state.lock().expect("not poisoned");

// just allocate a new vec rather than clearing the old one
lock.children = vec![];
Expand Down Expand Up @@ -155,7 +166,7 @@ mod test {

#[tokio::test]
async fn test_basic() {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);
let mut child = manager
.spawn(get_script_command("hello_world.js"), Duration::from_secs(2))
.unwrap()
Expand All @@ -168,7 +179,7 @@ mod test {

#[tokio::test]
async fn test_multiple() {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);

let children = (0..2)
.map(|_| {
Expand All @@ -191,7 +202,7 @@ mod test {

#[tokio::test]
async fn test_closed() {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);
let mut child = manager
.spawn(get_command(), Duration::from_secs(2))
.unwrap()
Expand All @@ -218,7 +229,7 @@ mod test {

#[tokio::test]
async fn test_exit_code() {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);
let mut child = manager
.spawn(get_script_command("hello_world.js"), Duration::from_secs(2))
.unwrap()
Expand All @@ -235,7 +246,7 @@ mod test {
#[tokio::test]
#[traced_test]
async fn test_message_after_stop() {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);
let mut child = manager
.spawn(get_script_command("hello_world.js"), Duration::from_secs(2))
.unwrap()
Expand All @@ -258,14 +269,14 @@ mod test {

#[tokio::test]
async fn test_reuse_manager() {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);
manager.spawn(get_command(), Duration::from_secs(2));

sleep(Duration::from_millis(100)).await;

manager.stop().await;

assert!(manager.0.lock().unwrap().children.is_empty());
assert!(manager.state.lock().unwrap().children.is_empty());

// TODO: actually do some check that this is idempotent
// idempotent
Expand All @@ -280,7 +291,7 @@ mod test {
script: &str,
expected: Option<ChildExit>,
) {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);
let tasks = FuturesUnordered::new();

for _ in 0..10 {
Expand Down Expand Up @@ -315,7 +326,7 @@ mod test {

#[tokio::test]
async fn test_wait_multiple_tasks() {
let manager = ProcessManager::new();
let manager = ProcessManager::new(false);

let mut out = Vec::new();
let mut child = manager
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct Run {

impl Run {
pub fn new(base: CommandBase, api_auth: Option<APIAuth>) -> Result<Self, Error> {
let processes = ProcessManager::new();
let processes = ProcessManager::infer();
let mut opts: Opts = base.args().try_into()?;
let config = base.config()?;
let is_linked = turborepo_api_client::is_linked(&api_auth);
Expand Down