Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ensure PrefixedWriter is line buffered #7728

Merged
merged 2 commits into from Mar 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 114 additions & 14 deletions crates/turborepo-ui/src/prefixed.rs
Expand Up @@ -86,14 +86,13 @@ impl<W: Write> PrefixedUI<W> {
/// Construct a PrefixedWriter which will behave the same as `output`, but
/// without the requirement that messages be valid UTF-8
pub fn output_prefixed_writer(&mut self) -> PrefixedWriter<&mut W> {
PrefixedWriter {
prefix: self
.output_prefix
.as_ref()
.map(|prefix| prefix.to_string())
.unwrap_or_default(),
writer: &mut self.out,
}
PrefixedWriter::new(
self.ui,
self.output_prefix
.clone()
.unwrap_or_else(|| Style::new().apply_to(String::new())),
&mut self.out,
)
}
}

Expand All @@ -107,26 +106,50 @@ enum Command {

/// Wraps a writer with a prefix before the actual message.
pub struct PrefixedWriter<W> {
prefix: String,
writer: W,
inner: LineWriter<PrefixedWriterInner<W>>,
}

impl<W> Debug for PrefixedWriter<W> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrefixedWriter")
.field("prefix", &self.prefix)
.field("prefix", &self.inner.writer.prefix)
.finish()
}
}

impl<W: Write> PrefixedWriter<W> {
pub fn new(ui: UI, prefix: StyledObject<impl Display>, writer: W) -> Self {
Self {
inner: LineWriter::new(PrefixedWriterInner::new(ui, prefix, writer)),
}
}
}

impl<W: Write> Write for PrefixedWriter<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.inner.write(buf)
}

fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}

/// Wraps a writer so that a prefix will be added at the start of each line.
/// Expects to only be called with complete lines.
struct PrefixedWriterInner<W> {
prefix: String,
writer: W,
}

impl<W: Write> PrefixedWriterInner<W> {
pub fn new(ui: UI, prefix: StyledObject<impl Display>, writer: W) -> Self {
let prefix = ui.apply(prefix).to_string();
Self { prefix, writer }
}
}

impl<W: Write> Write for PrefixedWriter<W> {
impl<W: Write> Write for PrefixedWriterInner<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut is_first = true;
for chunk in buf.split_inclusive(|c| *c == b'\r') {
Expand All @@ -142,6 +165,7 @@ impl<W: Write> Write for PrefixedWriter<W> {
self.writer.write_all(chunk)?;
is_first = false;
}

// We do end up writing more bytes than this to the underlying writer, but we
// cannot report this to the callers as the amount of bytes we report
// written must be less than or equal to the number of bytes in the buffer.
Expand All @@ -153,6 +177,48 @@ impl<W: Write> Write for PrefixedWriter<W> {
}
}

/// Writer that will buffer writes so the underlying writer is only called with
/// writes that end in a newline
struct LineWriter<W> {
writer: W,
buffer: Vec<u8>,
}

impl<W: Write> LineWriter<W> {
pub fn new(writer: W) -> Self {
Self {
writer,
buffer: Vec::with_capacity(512),
}
}
}

impl<W: Write> Write for LineWriter<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
for line in buf.split_inclusive(|c| *c == b'\n') {
if line.ends_with(b"\n") {
if self.buffer.is_empty() {
self.writer.write_all(line)?;
} else {
self.buffer.extend_from_slice(line);
self.writer.write_all(&self.buffer)?;
self.buffer.clear();
}
} else {
// This should only happen on the last chunk?
self.buffer.extend_from_slice(line)
}
}

Ok(buf.len())
}

fn flush(&mut self) -> std::io::Result<()> {
// We don't flush our buffer as that would lead to a write without a newline
self.writer.flush()
}
}

#[cfg(test)]
mod test {
use test_case::test_case;
Expand Down Expand Up @@ -196,7 +262,7 @@ mod test {
#[test_case(false, "\u{1b}[1mfoo#build: \u{1b}[0mcool!")]
fn test_prefixed_writer(strip_ansi: bool, expected: &str) {
let mut buffer = Vec::new();
let mut writer = PrefixedWriter::new(
let mut writer = PrefixedWriterInner::new(
UI::new(strip_ansi),
crate::BOLD.apply_to("foo#build: "),
&mut buffer,
Expand All @@ -213,7 +279,7 @@ mod test {
#[test_case("\n", "turbo > \n" ; "leading new line")]
fn test_prefixed_writer_cr(input: &str, expected: &str) {
let mut buffer = Vec::new();
let mut writer = PrefixedWriter::new(
let mut writer = PrefixedWriterInner::new(
UI::new(false),
Style::new().apply_to("turbo > "),
&mut buffer,
Expand All @@ -222,4 +288,38 @@ mod test {
writer.write_all(input.as_bytes()).unwrap();
assert_eq!(String::from_utf8(buffer).unwrap(), expected);
}

#[test_case(&["foo"], "" ; "no newline")]
#[test_case(&["foo\n"], "foo\n" ; "single newline")]
#[test_case(&["foo ", "bar ", "baz\n"], "foo bar baz\n" ; "building line")]
#[test_case(&["multiple\nlines\nin\none"], "multiple\nlines\nin\n" ; "multiple lines")]
fn test_line_writer(inputs: &[&str], expected: &str) {
let mut buffer = Vec::new();
let mut writer = LineWriter::new(&mut buffer);
for input in inputs {
writer.write_all(input.as_bytes()).unwrap();
}

assert_eq!(String::from_utf8(buffer).unwrap(), expected);
}

#[test]
fn test_prefixed_writer_split_lines() {
let mut buffer = Vec::new();
let mut writer = PrefixedWriter::new(
UI::new(false),
Style::new().apply_to("turbo > "),
&mut buffer,
);

writer.write_all(b"not a line yet").unwrap();
writer
.write_all(b", now\nbut \ranother one starts")
.unwrap();
writer.write_all(b" done\n").unwrap();
assert_eq!(
String::from_utf8(buffer).unwrap(),
"turbo > not a line yet, now\nturbo > but \rturbo > another one starts done\n"
);
}
}