Skip to content

Commit

Permalink
Create new process group for rebuilder backend
Browse files Browse the repository at this point in the history
  • Loading branch information
kpcyrd committed Dec 10, 2021
1 parent 9b58682 commit dcaeef9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
2 changes: 1 addition & 1 deletion worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async fn main() -> Result<()> {
}],
input_url: build.input_url,
backend,
build: config::Build::default(),
build: config.build,
diffoscope,
privkey: &profile.privkey,
}).await?;
Expand Down
40 changes: 32 additions & 8 deletions worker/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,18 @@ impl Capture {
Ok(())
}

fn kill(pid: u32, signal: Signal) -> Result<()> {
// multiply with -1 to kill grand-children too
let pid = pid as i32 * -1;
info!("Sending {} to child(pid={})", signal, pid);
signal::kill(Pid::from_raw(pid), signal)?;
Ok(())
}

async fn truncate(&mut self, child: &mut Child, reason: &str, kill: bool) -> Result<()> {
if kill {
if let Some(pid) = child.id() {
info!("Sending SIGTERM to child(pid={})", pid);
signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM)?;
Self::kill(pid, Signal::SIGTERM)?;
}
self.sigterm_sent = Some(Instant::now());
}
Expand All @@ -87,14 +94,16 @@ impl Capture {
Ok(())
}

pub async fn next_wakeup(&mut self, child: &mut Child) -> Result<Duration> {
pub async fn next_wakeup(&mut self, child: &mut Child, stdout_open: &mut bool, stderr_open: &mut bool) -> Result<Duration> {
// check if we need to SIGKILL due to SIGTERM timeout
if let Some(sigterm_sent) = self.sigterm_sent {
if sigterm_sent.elapsed() > Duration::from_secs(SIGKILL_DELAY) {
if let Some(pid) = child.id() {
warn!("child(pid={}) didn't terminate {}s after SIGTERM, sending SIGKILL", pid, SIGKILL_DELAY);
// child.id is going to return None after this
child.kill().await?;
Self::kill(pid, Signal::SIGKILL)?;
*stdout_open = false;
*stderr_open = false;
}
}
}
Expand All @@ -118,13 +127,26 @@ pub async fn run<I, S>(bin: &Path, args: I, opts: Options) -> Result<(bool, Stri
S: AsRef<OsStr>,
{
info!("Running {:?} {:?}", bin, args);
let mut child = Command::new(bin)
let mut cmd = Command::new(bin);
cmd
.args(args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.envs(&opts.envs)
.spawn()?;
.envs(&opts.envs);

unsafe {
cmd.pre_exec(|| {
// create a new process group
let pid = nix::unistd::getpid();
if let Err(err) = nix::unistd::setpgid(pid, Pid::from_raw(0)) {
warn!("Failed to create new process group: {:#?}", err);
}
Ok(())
});
}

let mut child = cmd.spawn()?;

let mut child_stdout = child.stdout.take().unwrap();
let mut child_stderr = child.stderr.take().unwrap();
Expand All @@ -140,12 +162,13 @@ pub async fn run<I, S>(bin: &Path, args: I, opts: Options) -> Result<(bool, Stri
let mut stderr_open = true;
let mut cap = capture(opts);
let (success, output) = loop {
let remaining = cap.next_wakeup(&mut child).await?;
let remaining = cap.next_wakeup(&mut child, &mut stdout_open, &mut stderr_open).await?;

if stdout_open || stderr_open {
select! {
n = child_stdout.read(&mut buf_stdout).fuse() => {
let n = n?;
trace!("read stdout: {}", n);
if n == 0 {
stdout_open = false;
} else {
Expand All @@ -157,6 +180,7 @@ pub async fn run<I, S>(bin: &Path, args: I, opts: Options) -> Result<(bool, Stri
},
n = child_stderr.read(&mut buf_stderr).fuse() => {
let n = n?;
trace!("read stderr: {}", n);
if n == 0 {
stderr_open = false;
} else {
Expand Down

0 comments on commit dcaeef9

Please sign in to comment.