Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions monarch_hyperactor/src/code_sync/rsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use tokio::process::Child;
use tokio::process::Command;
#[cfg(feature = "packaged_rsync")]
use tokio::sync::OnceCell;
use tracing::warn;

use crate::code_sync::WorkspaceLocation;

Expand All @@ -69,7 +70,7 @@ async fn get_rsync_bin_path() -> Result<&'static Path> {
Ok(RSYNC_BIN_PATH
.get_or_try_init(|| async {
tokio::task::spawn_blocking(|| {
let mut tmp = tempfile::NamedTempFile::new()?;
let mut tmp = tempfile::NamedTempFile::with_prefix("rsync.")?;
let rsync_bin = include_bytes!("rsync.bin");
tmp.write_all(rsync_bin)?;
let bin_path = tmp.into_temp_path();
Expand Down Expand Up @@ -239,7 +240,6 @@ pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<RsyncResult
#[derive(Debug)]
pub struct RsyncDaemon {
child: Child,
#[allow(unused)]
state: TempDir,
addr: SocketAddr,
}
Expand Down Expand Up @@ -284,8 +284,7 @@ impl RsyncDaemon {
.arg(format!("--address={}", addr.ip()))
.arg(format!("--port={}", addr.port()))
.arg(format!("--config={}", config.display()))
//.arg(format!("--log-file={}/log", state.path().display()))
.arg("--log-file=/dev/stderr")
.arg(format!("--log-file={}/log", state.path().display()))
.kill_on_drop(true)
.spawn()?;

Expand Down Expand Up @@ -315,14 +314,15 @@ impl RsyncDaemon {
&self.addr
}

pub async fn shutdown(mut self) -> Result<()> {
pub async fn shutdown(mut self) -> Result<String> {
let logs = fs::read_to_string(self.state.path().join("log")).await;
let id = self.child.id().context("missing pid")?;
let pid = Pid::from_raw(id as i32);
signal::kill(pid, Signal::SIGINT)?;
let status = self.child.wait().await?;
// rsync exists with 20 when sent SIGINT.
ensure!(status.code() == Some(20));
Ok(())
Ok(logs?)
}
}

Expand Down Expand Up @@ -412,7 +412,7 @@ where
let instance = actor_mesh.proc_mesh().client();
let (rsync_conns_tx, rsync_conns_rx) = instance.open_port::<Connect>();

let ((), results) = try_join!(
let res = try_join!(
rsync_conns_rx
.take(actor_mesh.shape().slice().len())
.err_into::<anyhow::Error>()
Expand Down Expand Up @@ -443,11 +443,25 @@ where
.await?;
anyhow::Ok(res)
},
)?;
);

daemon.shutdown().await?;
// Kill rsync server and attempt to grab the logs.
let logs = daemon.shutdown().await;

Ok(results)
// Return results, attaching rsync daemon logs on error.
match res {
Ok(((), results)) => {
let _ = logs?;
Ok(results)
}
Err(err) => match logs {
Ok(logs) => Err(err).with_context(|| format!("rsync server logs: {}", logs)),
Err(shutdown_err) => {
warn!("failed to read logs from rsync daemon: {:?}", shutdown_err);
Err(err)
}
},
}
}

#[cfg(test)]
Expand Down
Loading