From 3742e44948ce8276f26cc79190b66485e3b8fca6 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 24 Oct 2022 10:17:02 -0400 Subject: [PATCH 1/2] walk: Switch back to crossbeam-channel Fixes #933. Fixes #1060. Fixes #1113. --- Cargo.lock | 11 +++++++++++ Cargo.toml | 1 + src/exec/job.rs | 12 ++++-------- src/walk.rs | 14 ++++---------- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e856d0da3..0b21f8ef6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -162,6 +162,16 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.12" @@ -295,6 +305,7 @@ dependencies = [ "chrono", "clap", "clap_complete", + "crossbeam-channel", "ctrlc", "diff", "dirs-next", diff --git a/Cargo.toml b/Cargo.toml index 5f627fd62..fad68d532 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ dirs-next = "2.0" normpath = "0.3.2" chrono = "0.4" once_cell = "1.15.0" +crossbeam-channel = "0.5.6" [dependencies.clap] version = "3.1" diff --git a/src/exec/job.rs b/src/exec/job.rs index b803f79c8..1617ea752 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -1,6 +1,7 @@ -use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; +use crossbeam_channel::Receiver; + use crate::config::Config; use crate::dir_entry::DirEntry; use crate::error::print_error; @@ -13,7 +14,7 @@ use super::CommandSet; /// generate a command with the supplied command template. The generated command will then /// be executed, and this process will continue until the receiver's sender has closed. pub fn job( - rx: Arc>>, + rx: Receiver, cmd: Arc, out_perm: Arc>, config: &Config, @@ -23,12 +24,9 @@ pub fn job( let mut results: Vec = Vec::new(); loop { - // Create a lock on the shared receiver for this thread. - let lock = rx.lock().unwrap(); - // Obtain the next result from the receiver, else if the channel // has closed, exit from the loop - let dir_entry: DirEntry = match lock.recv() { + let dir_entry: DirEntry = match rx.recv() { Ok(WorkerResult::Entry(dir_entry)) => dir_entry, Ok(WorkerResult::Error(err)) => { if config.show_filesystem_errors { @@ -39,8 +37,6 @@ pub fn job( Err(_) => break, }; - // Drop the lock so that other threads can read from the receiver. - drop(lock); // Generate a command, execute it and store its exit code. results.push(cmd.execute( dir_entry.stripped_path(config), diff --git a/src/walk.rs b/src/walk.rs index 799828224..ef618ab83 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -3,13 +3,13 @@ use std::io; use std::mem; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use ignore::overrides::OverrideBuilder; use ignore::{self, WalkBuilder}; use regex::bytes::Regex; @@ -51,7 +51,7 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// path will simply be written to standard output. pub fn scan(paths: &[PathBuf], pattern: Arc, config: Arc) -> Result { let first_path = &paths[0]; - let (tx, rx) = channel(); + let (tx, rx) = unbounded(); let mut override_builder = OverrideBuilder::new(first_path); @@ -222,11 +222,7 @@ impl ReceiverBuffer { match self.mode { ReceiverMode::Buffering => { // Wait at most until we should switch to streaming - let now = Instant::now(); - self.deadline - .checked_duration_since(now) - .ok_or(RecvTimeoutError::Timeout) - .and_then(|t| self.rx.recv_timeout(t)) + self.rx.recv_deadline(self.deadline) } ReceiverMode::Streaming => { // Wait however long it takes for a result @@ -345,15 +341,13 @@ fn spawn_receiver( if cmd.in_batch_mode() { exec::batch(rx, cmd, &config) } else { - let shared_rx = Arc::new(Mutex::new(rx)); - let out_perm = Arc::new(Mutex::new(())); // Each spawned job will store it's thread handle in here. let mut handles = Vec::with_capacity(threads); for _ in 0..threads { let config = Arc::clone(&config); - let rx = Arc::clone(&shared_rx); + let rx = rx.clone(); let cmd = Arc::clone(cmd); let out_perm = Arc::clone(&out_perm); From 5cf0c66f6bbfa5c1b107f55b8982ecc422ae00f5 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 24 Oct 2022 10:20:46 -0400 Subject: [PATCH 2/2] walk: Use a bounded queue. Fixes #918. --- src/walk.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/walk.rs b/src/walk.rs index ef618ab83..b5b665070 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -9,7 +9,7 @@ use std::time::{Duration, Instant}; use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; -use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; +use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use ignore::overrides::OverrideBuilder; use ignore::{self, WalkBuilder}; use regex::bytes::Regex; @@ -51,7 +51,9 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// path will simply be written to standard output. pub fn scan(paths: &[PathBuf], pattern: Arc, config: Arc) -> Result { let first_path = &paths[0]; - let (tx, rx) = unbounded(); + + // Channel capacity was chosen empircally to perform similarly to an unbounded channel + let (tx, rx) = bounded(0x4000 * config.threads); let mut override_builder = OverrideBuilder::new(first_path);