Skip to content

Commit

Permalink
Merge pull request psychon#438 from psychon/write-buf-vec-deque
Browse files Browse the repository at this point in the history
Use `VecDeque` instead of `Vec` in `BufWriteFD`
  • Loading branch information
mergify[bot] committed May 23, 2020
2 parents 10c4d2e + 52194cb commit e3d7ab5
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions src/rust_connection/fd_read_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! This module contains variants of `std::io::Read` and `std::io::Write` that also support passing
//! file descriptors.

use std::io::{Error, ErrorKind, IoSlice, Read, Result, Write};
use std::collections::VecDeque;
use std::io::{Error, ErrorKind, IoSlice, Read, Result};

use crate::utils::RawFdContainer;

Expand Down Expand Up @@ -80,7 +81,7 @@ pub trait WriteFD: Poll {
#[derive(Debug)]
pub struct BufWriteFD<W: WriteFD> {
inner: W,
data_buf: Vec<u8>,
data_buf: VecDeque<u8>,
fd_buf: Vec<RawFdContainer>,
}

Expand All @@ -96,7 +97,7 @@ impl<W: WriteFD> BufWriteFD<W> {
pub fn with_capacity(capacity: usize, inner: W) -> Self {
Self {
inner,
data_buf: Vec::with_capacity(capacity),
data_buf: VecDeque::with_capacity(capacity),
fd_buf: Vec::new(),
}
}
Expand All @@ -116,40 +117,34 @@ impl<W: WriteFD> BufWriteFD<W> {
}

fn flush_buffer(&mut self) -> Result<()> {
let mut written = 0;
let mut ret = Ok(());
while written < self.data_buf.len() || !self.fd_buf.is_empty() {
match self
.inner
.write(&self.data_buf[written..], &mut self.fd_buf)
{
while !self.data_buf.is_empty() || !self.fd_buf.is_empty() {
let data_bufs = self.data_buf.as_slices();
let data_bufs = [IoSlice::new(data_bufs.0), IoSlice::new(data_bufs.1)];
match self.inner.write_vectored(&data_bufs, &mut self.fd_buf) {
Ok(0) => {
if written == self.data_buf.len() {
if self.data_buf.is_empty() {
assert!(!self.fd_buf.is_empty());
ret = Err(Error::new(
return Err(Error::new(
ErrorKind::WriteZero,
"failed to write the buffered FDs",
));
} else {
ret = Err(Error::new(
return Err(Error::new(
ErrorKind::WriteZero,
"failed to write the buffered data",
));
}
break;
}
Ok(n) => written += n,
Ok(n) => {
let _ = self.data_buf.drain(..n);
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => {
ret = Err(e);
break;
return Err(e);
}
}
}
if written > 0 {
let _ = self.data_buf.drain(..written);
}
ret
Ok(())
}

fn write_helper<F, G>(
Expand All @@ -161,7 +156,7 @@ impl<W: WriteFD> BufWriteFD<W> {
to_write_length: usize,
) -> Result<usize>
where
F: FnOnce(&mut Vec<u8>) -> Result<usize>,
F: FnOnce(&mut VecDeque<u8>),
G: FnOnce(&mut W, &mut Vec<RawFdContainer>) -> Result<usize>,
{
self.fd_buf.append(fds);
Expand All @@ -180,7 +175,7 @@ impl<W: WriteFD> BufWriteFD<W> {
return Err(e);
} else {
let n_to_write = first_buffer.len().min(available_buf);
let _ = self.data_buf.write(&first_buffer[..n_to_write]).unwrap();
self.data_buf.extend(&first_buffer[..n_to_write]);
// Return `Ok` because some or all data has been buffered,
// so from the outside it is seen as a successful write.
return Ok(n_to_write);
Expand All @@ -201,7 +196,8 @@ impl<W: WriteFD> BufWriteFD<W> {
write_inner(&mut self.inner, &mut self.fd_buf)
} else {
// At this point there is enough space available in the buffer.
write_buffer(&mut self.data_buf)
write_buffer(&mut self.data_buf);
Ok(to_write_length)
}
}
}
Expand All @@ -210,7 +206,7 @@ impl<W: WriteFD> WriteFD for BufWriteFD<W> {
fn write(&mut self, buf: &[u8], fds: &mut Vec<RawFdContainer>) -> Result<usize> {
self.write_helper(
fds,
|w| w.write(buf),
|w| w.extend(buf),
|w, fd| w.write(buf, fd),
buf,
buf.len(),
Expand All @@ -229,7 +225,11 @@ impl<W: WriteFD> WriteFD for BufWriteFD<W> {
let total_len = bufs.iter().map(|b| b.len()).sum();
self.write_helper(
fds,
|w| w.write_vectored(bufs),
|w| {
for buf in bufs.iter() {
w.extend(&**buf);
}
},
|w, fd| w.write_vectored(bufs, fd),
first_nonempty,
total_len,
Expand Down

0 comments on commit e3d7ab5

Please sign in to comment.