Skip to content

Commit

Permalink
feat(ui): persist all logs (#7822)
Browse files Browse the repository at this point in the history
### Description

This PR changes UI behavior so all task output gets persisted to
terminal after the run is finished.

Non-goals of the PR: respecting `--output-mode` for what logs get
persisted.

This PR also ditches the "priority" channel for communicating with the
rendering thread. This was intended to avoid a scenario where tasks were
producing so many events that it took awhile to get to a "stop" event.
In reality this introduces so many race conditions of a "stop" event
causing events that happened not to be processed leading to missing
information e.g. dropping the last line of task output since the stop
event takes priority.

### Testing Instructions

Run `turbo` and verify that logs get persisted under various scenarios:
Cache misses:
<img width="1124" alt="Screenshot 2024-03-22 at 1 56 35 PM"
src="https://github.com/vercel/turbo/assets/4131117/313aa99f-72d8-49b9-b61b-d75d5c9632fa">


Failures (not displaying tasks that didn't start):
<img width="1130" alt="Screenshot 2024-03-22 at 1 55 59 PM"
src="https://github.com/vercel/turbo/assets/4131117/ee019c49-d7a4-4370-a44f-ef4da764bfda">

Cache hits:
<img width="1128" alt="Screenshot 2024-03-22 at 1 55 25 PM"
src="https://github.com/vercel/turbo/assets/4131117/097c1ea8-02c7-46b0-bb16-6fbcc82cf292">




Closes TURBO-2692
  • Loading branch information
chris-olszewski committed Mar 25, 2024
1 parent c7f8019 commit 4fac8a6
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 27 deletions.
6 changes: 0 additions & 6 deletions crates/turborepo-lib/src/task_graph/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,12 +982,6 @@ impl ExecContext {
if let Err(e) = stdout_writer.flush() {
error!("error flushing logs: {e}");
}
if let TaskOutput::UI(task) = output_client {
let mut persisted_ui = self.prefixed_ui(task.stdout(), task.stdout());
let _ = self
.task_cache
.replay_log_file(persisted_ui.output_prefixed_writer());
}
if let Err(e) = self
.task_cache
.on_error(prefixed_ui.output_prefixed_writer())
Expand Down
9 changes: 7 additions & 2 deletions crates/turborepo-ui/src/tui/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fn run_app_inner<B: Backend>(
let mut last_render = Instant::now();

while let Some(event) = poll(app.interact, &receiver, last_render + FRAMERATE) {
if let Some(message) = update(&mut app, event)? {
if let Some(message) = update(terminal, &mut app, event)? {
persist_bytes(terminal, &message)?;
}
if app.done {
Expand All @@ -132,6 +132,9 @@ fn run_app_inner<B: Backend>(
}
}

let started_tasks = app.table.tasks_started().collect();
app.pane.render_remaining(started_tasks, terminal)?;

Ok(())
}

Expand Down Expand Up @@ -175,7 +178,8 @@ fn cleanup<B: Backend + io::Write>(mut terminal: Terminal<B>) -> io::Result<()>
Ok(())
}

fn update(
fn update<B: Backend>(
terminal: &mut Terminal<B>,
app: &mut App<Box<dyn io::Write + Send>>,
event: Event,
) -> Result<Option<Vec<u8>>, Error> {
Expand All @@ -197,6 +201,7 @@ fn update(
}
Event::EndTask { task } => {
app.table.finish_task(&task)?;
app.pane.render_screen(&task, terminal)?;
}
Event::Up => {
app.previous();
Expand Down
23 changes: 6 additions & 17 deletions crates/turborepo-ui/src/tui/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ use crate::LineWriter;
#[derive(Debug, Clone)]
pub struct AppSender {
primary: mpsc::Sender<Event>,
priority: mpsc::Sender<Event>,
}

/// Struct for receiving app events
pub struct AppReceiver {
primary: mpsc::Receiver<Event>,
priority: mpsc::Receiver<Event>,
}

/// Struct for sending events related to a specific task
Expand Down Expand Up @@ -45,15 +43,12 @@ impl AppSender {
/// AppReceiver should be passed to `crate::tui::run_app`
pub fn new() -> (Self, AppReceiver) {
let (primary_tx, primary_rx) = mpsc::channel();
let (priority_tx, priority_rx) = mpsc::channel();
(
Self {
primary: primary_tx,
priority: priority_tx,
},
AppReceiver {
primary: primary_rx,
priority: priority_rx,
},
)
}
Expand All @@ -69,9 +64,8 @@ impl AppSender {

/// Stop rendering TUI and restore terminal to default configuration
pub fn stop(&self) {
// Send stop events in both channels, if receiver has dropped ignore error as
// Send stop event, if receiver has dropped ignore error as
// it'll be a no-op.
self.priority.send(Event::Stop).ok();
self.primary.send(Event::Stop).ok();
}
}
Expand All @@ -80,15 +74,10 @@ impl AppReceiver {
/// Receive an event, producing a tick event if no events are received by
/// the deadline.
pub fn recv(&self, deadline: Instant) -> Result<Event, mpsc::RecvError> {
// If there's an event in the priority queue take from that first
if let Ok(event) = self.priority.try_recv() {
Ok(event)
} else {
match self.primary.recv_deadline(deadline) {
Ok(event) => Ok(event),
Err(mpsc::RecvTimeoutError::Timeout) => Ok(Event::Tick),
Err(mpsc::RecvTimeoutError::Disconnected) => Err(mpsc::RecvError),
}
match self.primary.recv_deadline(deadline) {
Ok(event) => Ok(event),
Err(mpsc::RecvTimeoutError::Timeout) => Ok(Event::Tick),
Err(mpsc::RecvTimeoutError::Disconnected) => Err(mpsc::RecvError),
}
}
}
Expand Down Expand Up @@ -183,7 +172,7 @@ impl std::io::Write for PersistedWriterInner {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let bytes = buf.to_vec();
self.handle
.priority
.primary
.send(Event::Log { message: bytes })
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "receiver dropped"))?;
Ok(buf.len())
Expand Down
57 changes: 55 additions & 2 deletions crates/turborepo-ui/src/tui/pane.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
use std::{collections::BTreeMap, io::Write};
use std::{
collections::{BTreeMap, HashSet},
io::Write,
};

use ratatui::{
backend::Backend,
style::Style,
widgets::{Block, Borders, Widget},
widgets::{
block::{Position, Title},
Block, Borders, Widget,
},
Terminal,
};
use tracing::debug;
use tui_term::widget::PseudoTerminal;
Expand All @@ -23,6 +31,7 @@ struct TerminalOutput<W> {
cols: u16,
parser: vt100::Parser,
stdin: Option<W>,
has_been_persisted: bool,
}

impl<W> TerminalPane<W> {
Expand Down Expand Up @@ -102,6 +111,28 @@ impl<W> TerminalPane<W> {
Ok(())
}

pub fn render_screen<B: Backend>(
&mut self,
task_name: &str,
terminal: &mut Terminal<B>,
) -> Result<(), Error> {
let task = self.task_mut(task_name)?;
task.persist_screen(task_name, terminal)
}

pub fn render_remaining<B: Backend>(
&mut self,
started_tasks: HashSet<&str>,
terminal: &mut Terminal<B>,
) -> Result<(), Error> {
for (task_name, task) in self.tasks.iter_mut() {
if !task.has_been_persisted && started_tasks.contains(task_name.as_str()) {
task.persist_screen(task_name, terminal)?;
}
}
Ok(())
}

fn selected(&self) -> Option<(&String, &TerminalOutput<W>)> {
let task_name = self.displayed.as_deref()?;
self.tasks.get_key_value(task_name)
Expand Down Expand Up @@ -141,6 +172,7 @@ impl<W> TerminalOutput<W> {
stdin,
rows,
cols,
has_been_persisted: false,
}
}

Expand All @@ -151,6 +183,27 @@ impl<W> TerminalOutput<W> {
self.rows = rows;
self.cols = cols;
}

fn persist_screen<B: Backend>(
&mut self,
task_name: &str,
terminal: &mut Terminal<B>,
) -> Result<(), Error> {
let screen = self.parser.entire_screen();
let (rows, _) = screen.size();
let mut cursor = tui_term::widget::Cursor::default();
cursor.hide();
let title = format!(" {task_name} >");
let block = Block::default()
.borders(Borders::ALL)
.title(title.as_str())
.title(Title::from(title.as_str()).position(Position::Bottom));
let term = PseudoTerminal::new(&screen).cursor(cursor).block(block);
terminal.insert_before(rows as u16, |buf| term.render(buf.area, buf))?;
self.has_been_persisted = true;

Ok(())
}
}

impl<W> Widget for &TerminalPane<W> {
Expand Down
7 changes: 7 additions & 0 deletions crates/turborepo-ui/src/tui/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ impl TaskTable {
}
}

pub fn tasks_started(&self) -> impl Iterator<Item = &str> + '_ {
self.finished
.iter()
.map(|task| task.name())
.chain(self.running.iter().map(|task| task.name()))
}

fn finished_rows(&self, duration_width: u16) -> impl Iterator<Item = Row> + '_ {
self.finished.iter().map(move |task| {
Row::new(vec![
Expand Down

0 comments on commit 4fac8a6

Please sign in to comment.