Skip to content

Commit

Permalink
Switch from std::sync::mpsc to flume
Browse files Browse the repository at this point in the history
  • Loading branch information
tavianator committed Jan 11, 2022
1 parent 125cb81 commit 81ad851
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 4 deletions.
150 changes: 150 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dirs-next = "2.0"
normpath = "0.3.2"
chrono = "0.4"
once_cell = "1.9.0"
flume = "0.10.9"

[dependencies.clap]
version = "3.0"
Expand Down
3 changes: 2 additions & 1 deletion src/exec/job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};

use flume::Receiver;

use crate::error::print_error;
use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::walk::WorkerResult;
Expand Down
6 changes: 3 additions & 3 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::io;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io::Write};

use anyhow::{anyhow, Result};
use flume::{unbounded, Receiver, RecvTimeoutError, Sender};
use ignore::overrides::OverrideBuilder;
use ignore::{self, WalkBuilder};
use once_cell::unsync::OnceCell;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
let first_path_buf = path_iter
.next()
.expect("Error: Path vector can not be empty");
let (tx, rx) = channel();
let (tx, rx) = unbounded();

let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());

Expand Down Expand Up @@ -240,7 +240,7 @@ impl<W: Write> ReceiverBuffer<W> {
}
ReceiverMode::Streaming => {
// Wait however long it takes for a result
Ok(self.rx.recv()?)
self.rx.recv().map_err(|_| RecvTimeoutError::Disconnected)
}
}
}
Expand Down

0 comments on commit 81ad851

Please sign in to comment.