Improve concurrency with streams (#330)
* Move from vec to streams

Previously we collected all inputs into one vector
before checking the links. Especially when reading many
inputs (e.g. via a glob pattern), this could cause issues
such as running out of file handles.

By moving to streams we avoid that scenario. This is also the first
step towards improving performance for many inputs.
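
The core idea, as a minimal self-contained sketch (the link source and
the println! checker below are stand-ins, not lychee's actual types):
links are produced lazily by a stream and checked with bounded
concurrency instead of being collected into a Vec first.

    use futures::{stream, StreamExt};

    #[tokio::main]
    async fn main() {
        // Hypothetical link source: items are produced only as the stream
        // is polled, so nothing is buffered up front.
        let links = stream::iter(0..10_000u32)
            .map(|i| format!("https://example.com/{}", i));

        // Check with bounded concurrency instead of collecting into a Vec.
        links
            .for_each_concurrent(8, |link| async move {
                // A real checker would issue an HTTP request here.
                println!("checking {}", link);
            })
            .await;
    }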

To stay as close as possible to the pre-stream behaviour, we want to stop processing
as soon as an Err value appears in the stream. This is easiest when the
stream is consumed in the main thread.
Previously, the stream was consumed in a tokio task and the main thread
waited for responses.
Now, a tokio task waits for responses (and displays them/registers
response stats) and the main thread sends links to the ClientPool.
To ensure that all responses have arrived before the ProgressBar is
finished and the stats are printed, the main thread waits for the
show_results_task to complete.
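
A minimal sketch of that control flow, with plain integers and strings
standing in for lychee's Request and Response types (the real wiring is
in lychee-bin/src/commands/check.rs further down in this diff):

    use futures::StreamExt;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    #[tokio::main]
    async fn main() {
        // Sketch only: u32 stands in for Request, String for Response.
        let (send_req, recv_req) = mpsc::channel::<u32>(4);
        let (send_resp, mut recv_resp) = mpsc::channel::<String>(4);

        // Worker task: checks queued requests concurrently and forwards responses.
        tokio::spawn(async move {
            ReceiverStream::new(recv_req)
                .for_each_concurrent(4, |req| {
                    let send_resp = send_resp.clone();
                    async move {
                        send_resp.send(format!("checked {}", req)).await.unwrap();
                    }
                })
                .await;
            // send_resp is dropped when this task ends, closing the response channel.
        });

        // Results task: the only consumer of responses (display and stats go here).
        let show_results_task = tokio::spawn(async move {
            while let Some(resp) = recv_resp.recv().await {
                println!("{}", resp);
            }
        });

        // Main flow: drives the request queue (in lychee, this is where the
        // link stream is consumed and the first Err aborts processing).
        for req in 0..10u32 {
            send_req.send(req).await.unwrap();
        }
        drop(send_req); // ends the worker task, which in turn ends show_results_task
        show_results_task.await.unwrap();
    }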


* Return collected links as Stream
* Initialize ProgressBar without a length, because the number of links cannot be known up front without blocking
* Handle stream results in main thread, not in task
* Add basic directory support using jwalk
* Add test for HTTP protocol file type (http://)
* Remove deadpool (once again): Replaced with `futures::StreamExt::for_each_concurrent`.
* Refactor main; fix tests
* Move commands into separate submodule
* Simplify input handling
* Simplify collector
* Remove unnecessary unwrap
* Simplify main
* Clean up check command
* Clean up dump command
* Handle requests in parallel
* Fix formatting and lints

Co-authored-by: Timo Freiberg <self@timofreiberg.com>
mre and Timo Freiberg committed Dec 1, 2021
1 parent bcd1d67 commit 3d51356
Showing 22 changed files with 914 additions and 458 deletions.
445 changes: 361 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions examples/client_pool/Cargo.toml
@@ -8,5 +8,7 @@ name = "client_pool"
path = "client_pool.rs"

[dependencies]
futures = "0.3.17"
tokio-stream = "0.1.7"
lychee-lib = { path = "../../lychee-lib", version = "0.8.1" }
tokio = { version = "1.14.0", features = ["full"] }
25 changes: 15 additions & 10 deletions examples/client_pool/client_pool.rs
@@ -1,14 +1,14 @@
use lychee_lib::{ClientBuilder, ClientPool, Input, Request, Result, Uri};
use lychee_lib::{ClientBuilder, Input, Request, Result, Uri};
use std::convert::TryFrom;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

const CONCURRENT_REQUESTS: usize = 4;

#[tokio::main]
#[allow(clippy::trivial_regex)]
async fn main() -> Result<()> {
// These channels are used to send requests and receive responses to and
// from the lychee client pool
// from lychee
let (send_req, recv_req) = mpsc::channel(CONCURRENT_REQUESTS);
let (send_resp, mut recv_resp) = mpsc::channel(CONCURRENT_REQUESTS);

@@ -18,7 +18,7 @@ async fn main() -> Result<()> {
Input::Stdin,
)];

// Send requests to pool
// Queue requests
tokio::spawn(async move {
for request in requests {
println!("Sending {}", request);
@@ -29,13 +29,18 @@
// Create a default lychee client
let client = ClientBuilder::default().client()?;

// Create a pool with four lychee clients
let clients = vec![client; CONCURRENT_REQUESTS];
let mut clients = ClientPool::new(send_resp, recv_req, clients);

// Handle requests in a client pool
// Start receiving requests
// Requests get streamed into the client and run concurrently
tokio::spawn(async move {
clients.listen().await;
futures::StreamExt::for_each_concurrent(
ReceiverStream::new(recv_req),
CONCURRENT_REQUESTS,
|req| async {
let resp = client.check(req).await.unwrap();
send_resp.send(resp).await.unwrap();
},
)
.await;
});

// Finally, listen to incoming responses from lychee
1 change: 1 addition & 0 deletions examples/collect_links/Cargo.toml
@@ -12,4 +12,5 @@ lychee-lib = { path = "../../lychee-lib", version = "0.8.1" }
tokio = { version = "1.14.0", features = ["full"] }
regex = "1.4.6"
http = "0.2.5"
tokio-stream = "0.1.7"
reqwest = { version = "0.11.7", features = ["gzip"] }
5 changes: 4 additions & 1 deletion examples/collect_links/collect_links.rs
@@ -1,12 +1,13 @@
use lychee_lib::{Collector, Input, Result};
use reqwest::Url;
use std::path::PathBuf;
use tokio_stream::StreamExt;

#[tokio::main]
#[allow(clippy::trivial_regex)]
async fn main() -> Result<()> {
// Collect all links from the following inputs
let inputs: &[Input] = &[
let inputs = vec![
Input::RemoteUrl(Box::new(
Url::parse("https://github.com/lycheeverse/lychee").unwrap(),
)),
@@ -21,6 +22,8 @@
.collect_links(
inputs, // base url or directory
)
.await
.collect::<Result<Vec<_>>>()
.await?;

dbg!(links);
2 changes: 2 additions & 0 deletions lychee-bin/Cargo.toml
@@ -39,6 +39,8 @@ structopt = "0.3.25"
tabled = "0.3.0"
tokio = { version = "1.14.0", features = ["full"] }
toml = "0.5.8"
futures = "0.3.17"
tokio-stream = "0.1.7"
once_cell = "1.8.0"

[dev-dependencies]
52 changes: 52 additions & 0 deletions lychee-bin/src/client.rs
@@ -0,0 +1,52 @@
use crate::options::Config;
use crate::parse::{parse_basic_auth, parse_headers, parse_statuscodes, parse_timeout};
use anyhow::{Context, Result};
use headers::HeaderMapExt;
use lychee_lib::{Client, ClientBuilder};
use regex::RegexSet;
use std::iter::FromIterator;
use std::{collections::HashSet, str::FromStr};

/// Creates a client according to the command-line config
pub(crate) fn create(cfg: &Config) -> Result<Client> {
let mut headers = parse_headers(&cfg.headers)?;
if let Some(auth) = &cfg.basic_auth {
let auth_header = parse_basic_auth(auth)?;
headers.typed_insert(auth_header);
}

let accepted = cfg.accept.clone().and_then(|a| parse_statuscodes(&a).ok());
let timeout = parse_timeout(cfg.timeout);
let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?;
let include = RegexSet::new(&cfg.include)?;
let exclude = RegexSet::new(&cfg.exclude)?;

// Offline mode overrides the scheme
let schemes = if cfg.offline {
vec!["file".to_string()]
} else {
cfg.scheme.clone()
};

ClientBuilder::builder()
.includes(include)
.excludes(exclude)
.exclude_all_private(cfg.exclude_all_private)
.exclude_private_ips(cfg.exclude_private)
.exclude_link_local_ips(cfg.exclude_link_local)
.exclude_loopback_ips(cfg.exclude_loopback)
.exclude_mail(cfg.exclude_mail)
.max_redirects(cfg.max_redirects)
.user_agent(cfg.user_agent.clone())
.allow_insecure(cfg.insecure)
.custom_headers(headers)
.method(method)
.timeout(timeout)
.github_token(cfg.github_token.clone())
.schemes(HashSet::from_iter(schemes))
.accepted(accepted)
.require_https(cfg.require_https)
.build()
.client()
.context("Failed to create request client")
}
116 changes: 116 additions & 0 deletions lychee-bin/src/commands/check.rs
@@ -0,0 +1,116 @@
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use lychee_lib::Result;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

use crate::{
options::Config,
stats::{color_response, ResponseStats},
ExitCode,
};
use lychee_lib::{Client, Request, Response};

pub(crate) async fn check<S>(
client: Client,
requests: S,
cfg: &Config,
) -> Result<(ResponseStats, ExitCode)>
where
S: futures::Stream<Item = Result<Request>>,
{
let (send_req, recv_req) = mpsc::channel(cfg.max_concurrency);
let (send_resp, mut recv_resp) = mpsc::channel(cfg.max_concurrency);
let max_concurrency = cfg.max_concurrency;
let mut stats = ResponseStats::new();

// Start receiving requests
tokio::spawn(async move {
futures::StreamExt::for_each_concurrent(
ReceiverStream::new(recv_req),
max_concurrency,
|request: Result<Request>| async {
let request: Request = request.expect("cannot read request");
let response = client.check(request).await.expect("cannot check request");
send_resp
.send(response)
.await
.expect("cannot send response to queue");
},
)
.await;
});

let pb = if cfg.no_progress {
None
} else {
let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template(
"{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}",
));
bar.set_length(0);
bar.set_message("Extracting links");
bar.enable_steady_tick(100);
Some(bar)
};

let bar = pb.clone();
let show_results_task = tokio::spawn({
let verbose = cfg.verbose;
async move {
while let Some(response) = recv_resp.recv().await {
show_progress(&pb, &response, verbose);
stats.add(response);
}
(pb, stats)
}
});

tokio::pin!(requests);

while let Some(request) = requests.next().await {
let request = request?;
if let Some(pb) = &bar {
pb.inc_length(1);
pb.set_message(&request.to_string());
};
send_req
.send(Ok(request))
.await
.expect("Cannot send request");
}
// required for the receiver task to end, which closes send_resp, which allows
// the show_results_task to finish
drop(send_req);

let (pb, stats) = show_results_task.await?;

// Note that print statements may interfere with the progress bar, so this
// must go before printing the stats
if let Some(pb) = &pb {
pb.finish_and_clear();
}

let code = if stats.is_success() {
ExitCode::Success
} else {
ExitCode::LinkCheckFailure
};
Ok((stats, code))
}

fn show_progress(progress_bar: &Option<ProgressBar>, response: &Response, verbose: bool) {
let out = color_response(&response.1);
if let Some(pb) = progress_bar {
pb.inc(1);
pb.set_message(&out);
if verbose {
pb.println(out);
}
} else {
if (response.status().is_success() || response.status().is_excluded()) && !verbose {
return;
}
println!("{}", out);
}
}
52 changes: 52 additions & 0 deletions lychee-bin/src/commands/dump.rs
@@ -0,0 +1,52 @@
use std::io::{self, StdoutLock, Write};

use lychee_lib::Result;
use lychee_lib::{Client, Request};
use tokio_stream::StreamExt;

use crate::ExitCode;

/// Dump all detected links to stdout without checking them
pub(crate) async fn dump<'a, S>(client: Client, requests: S, verbose: bool) -> Result<ExitCode>
where
S: futures::Stream<Item = Result<Request>>,
{
// Lock stdout for better performance
let stdout = io::stdout();
let mut handle = stdout.lock();

tokio::pin!(requests);

while let Some(request) = requests.next().await {
let request = request?;

if client.filtered(&request.uri) {
continue;
}

// Avoid panic on broken pipe.
// See https://github.com/rust-lang/rust/issues/46016
// This can occur when piping the output of lychee
// to another program like `grep`.
if let Err(e) = write(&mut handle, &request, verbose) {
if e.kind() != io::ErrorKind::BrokenPipe {
eprintln!("{}", e);
return Ok(ExitCode::UnexpectedFailure);
}
}
}

Ok(ExitCode::Success)
}

/// Dump request to stdout
/// Only print source in verbose mode. This way the normal link output
/// can be fed into another tool without data mangling.
fn write(handle: &mut StdoutLock<'_>, request: &Request, verbose: bool) -> io::Result<()> {
let output = if verbose {
request.to_string()
} else {
request.uri.to_string()
};
writeln!(*handle, "{}", output)
}
5 changes: 5 additions & 0 deletions lychee-bin/src/commands/mod.rs
@@ -0,0 +1,5 @@
pub(crate) mod check;
pub(crate) mod dump;

pub(crate) use check::check;
pub(crate) use dump::dump;