walk: Encapsulate the buffering behavior in a struct #895

Merged: 2 commits, Dec 5, 2021
11 changes: 11 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -49,6 +49,7 @@ dirs-next = "2.0"
normpath = "0.3"
chrono = "0.4"
once_cell = "1.8.0"
crossbeam-channel = "0.5.1"

[dependencies.clap]
version = "2.31.3"
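The diff does not say why crossbeam-channel is pulled in, but a likely reason shows up in walk.rs below: the new ReceiverBuffer waits on the channel with recv_deadline, which std::sync::mpsc only offers behind an unstable feature gate, while crossbeam-channel provides it (along with RecvTimeoutError) on stable Rust. A minimal standalone sketch of that API, separate from fd's code:

```rust
use std::time::{Duration, Instant};

use crossbeam_channel::{unbounded, RecvTimeoutError};

fn main() {
    let (tx, rx) = unbounded::<&str>();
    tx.send("first result").unwrap();
    drop(tx); // all senders gone, so the receiver will eventually see Disconnected

    // One fixed deadline shared by every recv call, as in ReceiverBuffer below.
    let deadline = Instant::now() + Duration::from_millis(100);
    loop {
        match rx.recv_deadline(deadline) {
            Ok(path) => println!("{}", path),
            Err(RecvTimeoutError::Timeout) => {
                println!("deadline reached; a real receiver would switch to streaming");
                break;
            }
            Err(RecvTimeoutError::Disconnected) => {
                println!("all senders dropped, done");
                break;
            }
        }
    }
}
```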
3 changes: 2 additions & 1 deletion src/exec/job.rs
@@ -1,7 +1,8 @@
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};

use crossbeam_channel::Receiver;

use crate::error::print_error;
use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::walk::WorkerResult;
245 changes: 160 additions & 85 deletions src/walk.rs
@@ -1,15 +1,16 @@
use std::ffi::OsStr;
use std::fs::{FileType, Metadata};
use std::io;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io::Write};

use anyhow::{anyhow, Result};
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use ignore::overrides::OverrideBuilder;
use ignore::{self, WalkBuilder};
use once_cell::unsync::OnceCell;
@@ -23,6 +24,7 @@ use crate::filesystem;
use crate::output;

/// The receiver thread can either be buffering results or directly streaming to the console.
#[derive(PartialEq)]
enum ReceiverMode {
/// Receiver is still buffering in order to sort the results, if the search finishes fast
/// enough.
@@ -41,7 +43,7 @@ pub enum WorkerResult {
/// Maximum size of the output buffer before flushing results to the console
pub const MAX_BUFFER_LENGTH: usize = 1000;
/// Default duration until output buffering switches to streaming.
pub const DEFAULT_MAX_BUFFER_TIME: time::Duration = time::Duration::from_millis(100);
pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100);

/// Recursively scan the given search path for files / pathnames matching the pattern.
///
@@ -53,7 +55,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
let first_path_buf = path_iter
.next()
.expect("Error: Path vector can not be empty");
let (tx, rx) = channel();
let (tx, rx) = unbounded();

let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());

@@ -160,6 +162,157 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
}
}

/// Wrapper for the receiver thread's buffering behavior.
struct ReceiverBuffer<W> {
/// The configuration.
config: Arc<Config>,
/// The ^C notifier.
wants_to_quit: Arc<AtomicBool>,
/// Receiver for worker results.
rx: Receiver<WorkerResult>,
/// Standard output.
stdout: W,
/// The current buffer mode.
mode: ReceiverMode,
/// The deadline to switch to streaming mode.
deadline: Instant,
/// The buffer of quickly received paths.
buffer: Vec<PathBuf>,
/// Result count.
num_results: usize,
}

impl<W: Write> ReceiverBuffer<W> {
/// Create a new receiver buffer.
fn new(
config: Arc<Config>,
wants_to_quit: Arc<AtomicBool>,
rx: Receiver<WorkerResult>,
stdout: W,
) -> Self {
let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);
let deadline = Instant::now() + max_buffer_time;

Self {
config,
wants_to_quit,
rx,
stdout,
mode: ReceiverMode::Buffering,
deadline,
buffer: Vec::with_capacity(MAX_BUFFER_LENGTH),
num_results: 0,
}
}

/// Process results until finished.
fn process(&mut self) -> ExitCode {
loop {
if let Err(ec) = self.poll() {
return ec;
}
}
}

/// Receive the next worker result.
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
match self.mode {
ReceiverMode::Buffering => {
// Wait at most until we should switch to streaming
self.rx.recv_deadline(self.deadline)
}
ReceiverMode::Streaming => {
// Wait however long it takes for a result
Ok(self.rx.recv()?)
}
}
}

/// Wait for a result or state change.
fn poll(&mut self) -> Result<(), ExitCode> {
[Review comments on this line]

Collaborator: It's a little weird that this function is poll but it isn't actually part of a Future. I don't have a better name though. It's just a little unfortunate that it happens to share a name with the most important function of a Future.

Collaborator:
Suggested change:
-    fn poll(&mut self) -> Result<(), ExitCode> {
+    fn poll(&mut self) -> ControlFlow<ExitCode> {
Would probably be a better return type here, but that would require increasing the MSRV to 1.55.

Collaborator (author): I didn't realize ControlFlow was stabilized. I'll leave it up to @sharkdp whether that MSRV bump would be acceptable.

Collaborator: Probably not worth bumping MSRV just for this, but may be worth changing when we do bump it.
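For reference, std::ops::ControlFlow (stable since Rust 1.55) carries the "stop looping" signal in its Break variant rather than overloading Result's Err. A small self-contained sketch of that shape, purely illustrative and not code from this PR, with Exit standing in for fd's ExitCode:

```rust
use std::ops::ControlFlow;

// `Exit` is a stand-in for fd's ExitCode; everything here is a sketch, not PR code.
#[derive(Debug)]
enum Exit {
    Success,
}

/// Handle one event; ask the caller to either keep looping or stop with a status.
fn poll(remaining: &mut u32) -> ControlFlow<Exit> {
    if *remaining == 0 {
        // Nothing left to receive: break out of the loop with the final status.
        return ControlFlow::Break(Exit::Success);
    }
    *remaining -= 1;
    ControlFlow::Continue(())
}

/// Drive `poll` until it breaks, mirroring the shape of ReceiverBuffer::process.
fn process(mut remaining: u32) -> Exit {
    loop {
        if let ControlFlow::Break(exit) = poll(&mut remaining) {
            return exit;
        }
    }
}

fn main() {
    println!("{:?}", process(3)); // prints: Success
}
```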

match self.recv() {
Ok(WorkerResult::Entry(path)) => {
if self.config.quiet {
return Err(ExitCode::HasResults(true));
}

match self.mode {
ReceiverMode::Buffering => {
self.buffer.push(path);
if self.buffer.len() > MAX_BUFFER_LENGTH {
self.stream()?;
}
}
ReceiverMode::Streaming => {
self.print(&path);
self.flush()?;
}
}

self.num_results += 1;
if let Some(max_results) = self.config.max_results {
if self.num_results >= max_results {
return self.stop();
}
}
}
Ok(WorkerResult::Error(err)) => {
if self.config.show_filesystem_errors {
print_error(err.to_string());
}
}
Err(RecvTimeoutError::Timeout) => {
self.stream()?;
}
Err(RecvTimeoutError::Disconnected) => {
return self.stop();
}
}

Ok(())
}

/// Output a path.
fn print(&mut self, path: &Path) {
output::print_entry(&mut self.stdout, path, &self.config, &self.wants_to_quit)
}

/// Switch ourselves into streaming mode.
fn stream(&mut self) -> Result<(), ExitCode> {
self.mode = ReceiverMode::Streaming;

let buffer = mem::take(&mut self.buffer);
for path in buffer {
self.print(&path);
}

self.flush()
}

/// Stop looping.
fn stop(&mut self) -> Result<(), ExitCode> {
if self.mode == ReceiverMode::Buffering {
self.buffer.sort();
self.stream()?;
}

if self.config.quiet {
Err(ExitCode::HasResults(self.num_results > 0))
} else {
Err(ExitCode::Success)
}
}

/// Flush stdout if necessary.
fn flush(&mut self) -> Result<(), ExitCode> {
if self.config.interactive_terminal && self.stdout.flush().is_err() {
// Probably a broken pipe. Exit gracefully.
return Err(ExitCode::GeneralError);
}
Ok(())
}
}

fn spawn_receiver(
config: &Arc<Config>,
wants_to_quit: &Arc<AtomicBool>,
@@ -218,90 +371,12 @@ fn spawn_receiver(
merge_exitcodes(exit_codes)
}
} else {
let start = time::Instant::now();

// Start in buffering mode
let mut mode = ReceiverMode::Buffering;

// Maximum time to wait before we start streaming to the console.
let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);

let stdout = io::stdout();
let stdout = stdout.lock();
let mut stdout = io::BufWriter::new(stdout);

let mut num_results = 0;
let is_interactive = config.interactive_terminal;
let mut buffer = Vec::with_capacity(MAX_BUFFER_LENGTH);
for worker_result in rx {
match worker_result {
WorkerResult::Entry(path) => {
if config.quiet {
return ExitCode::HasResults(true);
}
let stdout = io::BufWriter::new(stdout);

match mode {
ReceiverMode::Buffering => {
buffer.push(path);

// Have we reached the maximum buffer size or maximum buffering time?
if buffer.len() > MAX_BUFFER_LENGTH
|| start.elapsed() > max_buffer_time
{
// Flush the buffer
for path in &buffer {
output::print_entry(
&mut stdout,
path,
&config,
&wants_to_quit,
);
}
buffer.clear();
if is_interactive && stdout.flush().is_err() {
// Probably a broken pipe. Exit gracefully.
return ExitCode::GeneralError;
}
// Start streaming
mode = ReceiverMode::Streaming;
}
}
ReceiverMode::Streaming => {
output::print_entry(&mut stdout, &path, &config, &wants_to_quit);
if is_interactive && stdout.flush().is_err() {
// Probably a broken pipe. Exit gracefully.
return ExitCode::GeneralError;
}
}
}

num_results += 1;
if let Some(max_results) = config.max_results {
if num_results >= max_results {
break;
}
}
}
WorkerResult::Error(err) => {
if show_filesystem_errors {
print_error(err.to_string());
}
}
}
}

// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the results and print them:
buffer.sort();
for value in buffer {
output::print_entry(&mut stdout, &value, &config, &wants_to_quit);
}

if config.quiet {
ExitCode::HasResults(false)
} else {
ExitCode::Success
}
let mut rxbuffer = ReceiverBuffer::new(config, wants_to_quit, rx, stdout);
rxbuffer.process()
}
})
}
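A side note on the new struct's shape: stdout is kept generic over W: Write instead of hard-coding a locked, buffered stdout handle, so the same buffering logic can in principle be driven against an in-memory writer. The snippet below is a self-contained analogue of that pattern, not fd code; Printer and its print method are made up for illustration:

```rust
use std::io::{self, Write};

// A toy stand-in for ReceiverBuffer's `stdout: W` field.
struct Printer<W> {
    out: W,
}

impl<W: Write> Printer<W> {
    /// Write one path-like entry followed by a newline.
    fn print(&mut self, path: &str) -> io::Result<()> {
        writeln!(self.out, "{}", path)
    }
}

fn main() -> io::Result<()> {
    // Production-style use: a locked, buffered stdout, like spawn_receiver sets up.
    let stdout = io::stdout();
    let mut real = Printer { out: io::BufWriter::new(stdout.lock()) };
    real.print("src/walk.rs")?;
    real.out.flush()?;

    // Test-style use: capture output in memory instead of printing it.
    let mut captured = Printer { out: Vec::new() };
    captured.print("src/walk.rs")?;
    assert_eq!(captured.out, b"src/walk.rs\n".to_vec());
    Ok(())
}
```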