diff --git a/crates/turborepo-ui/src/output.rs b/crates/turborepo-ui/src/output.rs index 10a8ccc8bb325..26c36130281fc 100644 --- a/crates/turborepo-ui/src/output.rs +++ b/crates/turborepo-ui/src/output.rs @@ -27,6 +27,7 @@ pub struct OutputClient { pub struct OutputWriter<'a, W> { logger: &'a OutputClient, destination: Destination, + buffer: Vec, } /// Enum for controlling the behavior of the client @@ -90,6 +91,7 @@ impl OutputClient { OutputWriter { logger: self, destination: Destination::Stdout, + buffer: Vec::new(), } } @@ -99,6 +101,7 @@ impl OutputClient { OutputWriter { logger: self, destination: Destination::Stderr, + buffer: Vec::new(), } } @@ -143,7 +146,7 @@ impl OutputClient { })) } - fn handle_bytes(&self, bytes: SinkBytes) -> io::Result { + fn handle_bytes(&self, bytes: SinkBytes) -> io::Result<()> { if matches!( self.behavior, OutputClientBehavior::InMemoryBuffer | OutputClientBehavior::Grouped @@ -163,11 +166,11 @@ impl OutputClient { self.write_bytes(bytes) } else { // If we only wrote to the buffer, then we consider it a successful write - Ok(bytes.buffer.len()) + Ok(()) } } - fn write_bytes(&self, bytes: SinkBytes) -> io::Result { + fn write_bytes(&self, bytes: SinkBytes) -> io::Result<()> { let SinkBytes { buffer: line, destination, @@ -177,7 +180,7 @@ impl OutputClient { Destination::Stdout => &mut writers.out, Destination::Stderr => &mut writers.err, }; - writer.write(&line) + writer.write_all(&line) } fn add_bytes_to_buffer(&self, bytes: SinkBytes<'static>) { @@ -191,20 +194,35 @@ impl OutputClient { impl<'a, W: Write> Write for OutputWriter<'a, W> { fn write(&mut self, buf: &[u8]) -> io::Result { - self.logger.handle_bytes(SinkBytes { - buffer: buf.into(), - destination: self.destination, - }) + for line in buf.split_inclusive(|b| *b == b'\n') { + self.buffer.extend_from_slice(line); + // If the line doesn't end in a newline we assume it isn't finished and add it + // to the buffer + if line.ends_with(b"\n") { + self.logger.handle_bytes(SinkBytes { + buffer: self.buffer.as_slice().into(), + destination: self.destination, + })?; + self.buffer.clear(); + } + } + Ok(buf.len()) } fn flush(&mut self) -> io::Result<()> { - // No buffer held by the logger writer so flush is a noop + self.logger.handle_bytes(SinkBytes { + buffer: self.buffer.as_slice().into(), + destination: self.destination, + })?; + self.buffer.clear(); Ok(()) } } #[cfg(test)] mod test { + use std::sync::Barrier; + use super::*; #[test] @@ -325,6 +343,37 @@ mod test { Ok(()) } + #[test] + fn test_loggers_wait_for_newline() { + let b1 = Arc::new(Barrier::new(2)); + let b2 = Arc::clone(&b1); + + let sink = OutputSink::new(Vec::new(), Vec::new()); + let logger1 = sink.logger(OutputClientBehavior::Passthrough); + let logger2 = sink.logger(OutputClientBehavior::Passthrough); + std::thread::scope(|s| { + s.spawn(move || { + let mut out = logger1.stdout(); + write!(&mut out, "task 1:").unwrap(); + b1.wait(); + writeln!(&mut out, " echo building").unwrap(); + assert!(logger1.finish().unwrap().is_none()); + }); + s.spawn(move || { + let mut out = logger2.stdout(); + write!(&mut out, "task 2:").unwrap(); + b2.wait(); + writeln!(&mut out, " echo failing").unwrap(); + assert!(logger2.finish().unwrap().is_none(),); + }); + }); + let SinkWriters { out, .. } = Arc::into_inner(sink.writers).unwrap().into_inner().unwrap(); + let out = String::from_utf8(out).unwrap(); + for line in out.lines() { + assert!(line.starts_with("task ")); + } + } + #[test] fn assert_output_writer_sync() { // This is the bound required for a value to be held across an await