Skip to content

Commit

Permalink
agent,runtime-rs,runk: fix fmt and clippy warnings
Browse files Browse the repository at this point in the history
Fix rustfmt and clippy warnings detected by CI.

Fixes: #6714
Signed-off-by: Zixuan Tan <tanzixuan.me@gmail.com>
  • Loading branch information
frezcirno committed Jan 15, 2024
1 parent a5d0ea8 commit 0ff8dd3
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/agent/rustjail/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ cgroups = { package = "cgroups-rs", version = "0.3.3" }
rlimit = "0.5.3"
cfg-if = "0.1.0"

tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt"] }
tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt", "fs"] }
tokio-vsock = "0.3.1"
futures = "0.3.17"
async-trait = "0.1.31"
Expand Down
17 changes: 11 additions & 6 deletions src/agent/rustjail/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ impl BaseContainer for LinuxContainer {
break;
}
Ok(n) => {
if let Err(_) = term_master.write_all(&buf[..n]).await {
if term_master.write_all(&buf[..n]).await.is_err() {
break;
}
}
Expand All @@ -1035,11 +1035,12 @@ impl BaseContainer for LinuxContainer {
});
}

// Copy from term_master to stdout
if let Some(mut stdout_stream) = proc_io.stdout.take() {
let wgw_output = proc_io.wg_output.worker();
let mut term_master = unsafe { File::from_raw_fd(pseudo.master) };
let logger = logger.clone();
let term_closer = term_closer.clone();
let term_closer = term_closer;
tokio::spawn(async move {
let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await;
debug!(logger, "copy from term_master to stdout end: {:?}", res);
Expand All @@ -1050,6 +1051,7 @@ impl BaseContainer for LinuxContainer {
}
}
} else {
// not using a terminal
let stdin = p.stdin.unwrap();
let stdout = p.stdout.unwrap();
let stderr = p.stderr.unwrap();
Expand All @@ -1058,8 +1060,11 @@ impl BaseContainer for LinuxContainer {
child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) };

if let Some(proc_io) = &mut p.proc_io {
// Copy from stdin to parent_stdin
// Here we copy from vsock stdin stream to parent_stdin manually.
// This is because we need to close the stdin fifo when the stdin stream
// is drained.
if let Some(mut stdin_stream) = proc_io.stdin.take() {
info!(logger, "copy from stdin to parent_stdin");
let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker();
Expand All @@ -1073,11 +1078,11 @@ impl BaseContainer for LinuxContainer {
res = stdin_stream.read(&mut buf) => {
match res {
Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
info!(logger, "copy from stdin to term_master end: {:?}", res);
break;
}
Ok(n) => {
if let Err(_) = parent_stdin.write_all(&buf[..n]).await {
if parent_stdin.write_all(&buf[..n]).await.is_err() {
break;
}
}
Expand All @@ -1086,7 +1091,7 @@ impl BaseContainer for LinuxContainer {
// As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin fifo here when explicit requested.
_ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested");
info!(logger, "copy ends as requested");
break
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/agent/rustjail/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use libc::pid_t;
use std::fs::File;
use std::os::unix::io::{AsRawFd, RawFd, IntoRawFd};
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use tokio::sync::mpsc::Sender;
use tokio_vsock::VsockStream;

Expand Down Expand Up @@ -195,7 +195,7 @@ impl Process {
p.parent_stdin = Some(pstdin);
p.stdin = Some(stdin);

if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() {
if let Some(stdout) = p.proc_io.as_mut().and_then(|io| io.stdout.take()) {
let fd = stdout.into_raw_fd();
// The stdout/stderr of the process should be blocking, otherwise
// the process may encounter EAGAIN error when writing to stdout/stderr.
Expand All @@ -207,7 +207,7 @@ impl Process {
p.stdout = Some(stdout);
}

if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() {
if let Some(stderr) = p.proc_io.as_mut().and_then(|io| io.stderr.take()) {
let fd = stderr.into_raw_fd();
set_blocking(fd)?;
p.stderr = Some(fd);
Expand Down Expand Up @@ -241,8 +241,8 @@ impl Process {
}

pub fn cleanup_process_stream(&mut self) {
if let Some(_) = self.proc_io.take() {
// taken
if let Some(proc_io) = self.proc_io.take() {
drop(proc_io);

return;
}
Expand Down
10 changes: 4 additions & 6 deletions src/agent/src/passfd_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ pub(crate) async fn start_listen(port: u32) -> Result<()> {
let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?;
tokio::spawn(async move {
loop {
if let Ok((stream, peer_addr)) = listener.accept().await {
if let Vsock(addr) = peer_addr {
let port = addr.port();
info!(sl(), "accept connection from peer port {}", port);
HVSOCK_STREAMS.lock().await.insert(port, stream);
}
if let Ok((stream, Vsock(addr))) = listener.accept().await {
let port = addr.port();
info!(sl(), "accept connection from peer port {}", port);
HVSOCK_STREAMS.lock().await.insert(port, stream);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,17 @@ impl Container {
.init_process
.passfd_io
.as_ref()
.map(|io| io.stdin_port)
.flatten(),
.and_then(|io| io.stdin_port),
stdout_port: inner
.init_process
.passfd_io
.as_ref()
.map(|io| io.stdout_port)
.flatten(),
.and_then(|io| io.stdout_port),
stderr_port: inner
.init_process
.passfd_io
.as_ref()
.map(|io| io.stderr_port)
.flatten(),
.and_then(|io| io.stderr_port),
..Default::default()
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,17 @@ impl ContainerInner {
process_id: process.clone().into(),
string_user: None,
process: Some(exec.oci_process.clone()),
stdin_port: exec
.process
.passfd_io
.as_ref()
.map(|io| io.stdin_port)
.flatten(),
stdin_port: exec.process.passfd_io.as_ref().and_then(|io| io.stdin_port),
stdout_port: exec
.process
.passfd_io
.as_ref()
.map(|io| io.stdout_port)
.flatten(),
.and_then(|io| io.stdout_port),
stderr_port: exec
.process
.passfd_io
.as_ref()
.map(|io| io.stderr_port)
.flatten(),
.and_then(|io| io.stderr_port),
})
.await
.context("exec process")?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl PassfdIo {
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(&stdin)
.open(stdin)
.context("open stdin")?;

let hostport = passfd_connect(uds_path, passfd_port, fin.into())
Expand All @@ -99,7 +99,7 @@ impl PassfdIo {
let fout = OpenOptions::new()
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(&stdout)
.open(stdout)
.context("open stdout")?;

let hostport = passfd_connect(uds_path, passfd_port, fout.into())
Expand All @@ -115,7 +115,7 @@ impl PassfdIo {
let ferr = OpenOptions::new()
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(&stderr)
.open(stderr)
.context("open stderr")?;

let hostport = passfd_connect(uds_path, passfd_port, ferr.into())
Expand Down
1 change: 1 addition & 0 deletions src/tools/runk/libcontainer/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ impl ContainerLauncher {
&self.id,
self.init,
0,
None,
)?)
} else {
Err(anyhow!("no process configuration"))
Expand Down

0 comments on commit 0ff8dd3

Please sign in to comment.