Skip to content

Commit

Permalink
fix: support non UTF-8 logs (#6167)
Browse files Browse the repository at this point in the history
### Description

This PR adds support for non UTF-8 output from tasks. Previously we
would fail if a task produced UTF-8 when attempting to convert it into a
`String` via either
[`BufRead::lines`](https://doc.rust-lang.org/std/io/trait.BufRead.html#method.lines)
or
[`BufRead::read_line`](https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_line)

I'd suggest reviewing this by-commit as each commit should either:
 - add a failing test
 - fix the failing test that was added in the previous commit

### Testing Instructions

Added failing unit tests for dealing with bytes that aren't valid UTF-8.

Tested the behavior of how we handle non-utf8 output in `turbo`:
```
# Script that produces invalid utf8
[0 olszewski@chriss-mbp] /tmp/global $ cat packages/ui/nasty.sh
printf "%b" '\x00\x9f\x92\x96'
[0 olszewski@chriss-mbp] /tmp/global $ turbo_dev build --experimental-rust-codepath --filter=ui --force
ui:build: cache bypass, force executing c1a3e94329df105c
ui:build: 
ui:build: > ui@0.0.0 build /private/tmp/global/packages/ui
ui:build: > ./nasty.sh
ui:build: 
ui:build:
[0 olszewski@chriss-mbp] /tmp/global $ turbo_dev build --experimental-rust-codepath --filter=ui        
ui:build: cache hit, suppressing logs c1a3e94329df105c
ui:build: 
ui:build: > ui@0.0.0 build /private/tmp/global/packages/ui
ui:build: > ./nasty.sh
ui:build: 
ui:build:
```


Closes TURBO-1460

---------

Co-authored-by: Chris Olszewski <Chris Olszewski>
  • Loading branch information
chris-olszewski committed Oct 12, 2023
1 parent b18ee00 commit 18f190d
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 26 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ anyhow = { workspace = true, features = ["backtrace"] }
atty = { workspace = true }
axum = { workspace = true }
axum-server = { workspace = true }
bytelines = "2.4.0"
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive", "env"] }
clap_complete = { workspace = true }
Expand Down
87 changes: 66 additions & 21 deletions crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use std::{
time::Duration,
};

use bytelines::AsyncByteLines;
use command_group::AsyncCommandGroup;
use futures::future::{try_join3, try_join4};
use itertools::Itertools;
pub use tokio::process::Command;
use tokio::{
io::{AsyncBufReadExt, AsyncRead, BufReader},
io::{AsyncRead, BufReader},
join,
sync::{mpsc, watch, RwLock},
};
Expand Down Expand Up @@ -375,15 +376,12 @@ impl Child {
mut sink: W,
) -> std::io::Result<()> {
let Some(stream) = stream else { return Ok(()) };
let mut stream = BufReader::new(stream);
let mut buffer = String::new();
loop {
// If 0 bytes are read that indicates we've hit EOF
if stream.read_line(&mut buffer).await? == 0 {
break;
}
sink.write_all(buffer.as_bytes())?;
buffer.clear();
let stream = BufReader::new(stream);
let mut lines = AsyncByteLines::new(stream);
while let Some(line) = lines.next().await? {
sink.write_all(line)?;
// Line iterator doesn't return newline delimiter so we must add it here
sink.write_all(b"\n")?;
}
Ok(())
}
Expand All @@ -407,17 +405,22 @@ impl Child {
// but allows us to provide our own sinks instead of just writing to a buffers.
async fn pipe_lines<R: AsyncRead + Unpin>(
stream: Option<R>,
tx: mpsc::Sender<String>,
tx: mpsc::Sender<Vec<u8>>,
) -> std::io::Result<()> {
let Some(stream) = stream else { return Ok(()) };
let mut stream = BufReader::new(stream);
loop {
let mut buffer = String::new();
// If 0 bytes are read that indicates we've hit EOF
if stream.read_line(&mut buffer).await? == 0 {
break;
}
if tx.send(buffer).await.is_err() {
let stream = BufReader::new(stream);
let mut lines = AsyncByteLines::new(stream);
while let Some(line) = lines.next().await? {
let line = {
// Allocate vector with enough capacity for trailing newline
let mut buffer = Vec::with_capacity(line.len() + 1);
buffer.extend_from_slice(line);
// Line iterator doesn't return newline delimiter so we must add it here
buffer.push(b'\n');
buffer
};

if tx.send(line).await.is_err() {
// If the receiver is dropped then we have nothing to do with these bytes
break;
}
Expand All @@ -426,11 +429,11 @@ impl Child {
}

async fn write_lines<W: Write>(
mut rx: mpsc::Receiver<String>,
mut rx: mpsc::Receiver<Vec<u8>>,
mut writer: W,
) -> std::io::Result<()> {
while let Some(buffer) = rx.recv().await {
writer.write_all(buffer.as_bytes())?;
writer.write_all(&buffer)?;
}
Ok(())
}
Expand Down Expand Up @@ -733,4 +736,46 @@ mod test {
assert!(buffer == b"hello world\nhello moon\n" || buffer == b"hello moon\nhello world\n");
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}

#[tokio::test]
async fn test_wait_with_with_non_utf8_output() {
let script = find_script_dir().join_component("hello_non_utf8.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut out = Vec::new();
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, &mut err)
.await
.unwrap();

assert_eq!(out, &[0, 159, 146, 150, b'\n']);
assert!(err.is_empty());
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}

#[tokio::test]
async fn test_wait_with_non_utf8_single_output() {
let script = find_script_dir().join_component("hello_non_utf8.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut buffer = Vec::new();

let exit = child
.wait_with_single_piped_output(&mut buffer)
.await
.unwrap();

assert_eq!(buffer, &[0, 159, 146, 150, b'\n']);
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}
}
5 changes: 5 additions & 0 deletions crates/turborepo-lib/test/scripts/hello_non_utf8.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const process = require("node:process");
process.stdout.write(new Uint8Array([0, 159, 146, 150]), () => {
// Print newline
console.log();
});
1 change: 1 addition & 0 deletions crates/turborepo-ui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ test-case = { workspace = true }

[dependencies]
atty = { workspace = true }
bytelines = "2.4.0"
console = { workspace = true }
indicatif = { workspace = true }
lazy_static = { workspace = true }
Expand Down
35 changes: 30 additions & 5 deletions crates/turborepo-ui/src/logs.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{
fs::File,
io::{BufRead, BufReader, BufWriter, Write},
io::{BufReader, BufWriter, Write},
};

use bytelines::ByteLines;
use tracing::{debug, warn};
use turbopath::AbsoluteSystemPath;

Expand Down Expand Up @@ -88,11 +89,18 @@ pub fn replay_logs<W: Write>(
Error::CannotReadLogs(err)
})?;

// Construct a PrefixedWriter which allows for non UTF-8 bytes to be written to
// it.
let mut prefixed_writer = output.output_prefixed_writer();
let log_reader = BufReader::new(log_file);

for line in log_reader.lines() {
let line = line.map_err(Error::CannotReadLogs)?;
output.output(line);
let lines = ByteLines::new(log_reader);

for line in lines.into_iter() {
let mut line = line.map_err(Error::CannotReadLogs)?;
line.push(b'\n');
prefixed_writer
.write_all(&line)
.map_err(Error::CannotReadLogs)?;
}

debug!("finish replaying logs");
Expand Down Expand Up @@ -172,4 +180,21 @@ mod tests {

Ok(())
}

#[test]
fn test_replay_logs_invalid_utf8() -> Result<()> {
let ui = UI::new(true);
let mut output = Vec::new();
let mut err = Vec::new();
let mut prefixed_ui = PrefixedUI::new(ui, &mut output, &mut err)
.with_output_prefix(CYAN.apply_to(">".to_string()))
.with_warn_prefix(BOLD.apply_to(">!".to_string()));
let dir = tempdir()?;
let log_file_path = AbsoluteSystemPathBuf::try_from(dir.path().join("test.txt"))?;
fs::write(&log_file_path, [0, 159, 146, 150, b'\n'])?;
replay_logs(&mut prefixed_ui, &log_file_path)?;

assert_eq!(output, [b'>', 0, 159, 146, 150, b'\n']);
Ok(())
}
}
13 changes: 13 additions & 0 deletions crates/turborepo-ui/src/prefixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ impl<W: Write> PrefixedUI<W> {
error!("cannot write to logs: {:?}", err);
}
}

/// Construct a PrefixedWriter which will behave the same as `output`, but
/// without the requirement that messages be valid UTF-8
pub(crate) fn output_prefixed_writer(&mut self) -> PrefixedWriter<&mut W> {
PrefixedWriter {
prefix: self
.output_prefix
.as_ref()
.map(|prefix| prefix.to_string())
.unwrap_or_default(),
writer: &mut self.out,
}
}
}

//
Expand Down

0 comments on commit 18f190d

Please sign in to comment.