Send extracted links from extractor pool via channel #193

Closed
18 changes: 10 additions & 8 deletions src/bin/lychee/main.rs
@@ -96,7 +96,7 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
let accepted = cfg.accept.clone().and_then(|a| parse_statuscodes(&a).ok());
let timeout = parse_timeout(cfg.timeout);
let max_concurrency = cfg.max_concurrency;
- let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?;
+ let method: http::Method = http::Method::from_str(&cfg.method.to_uppercase())?;
let include = RegexSet::new(&cfg.include)?;
let exclude = RegexSet::new(&cfg.exclude)?;

@@ -118,21 +118,22 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
.accepted(accepted)
.build()?;

- let links = collector::collect_links(
-     &inputs,
+ let mut links = collector::collect_links(
+     inputs,
cfg.base_url.clone(),
cfg.skip_missing,
- max_concurrency,
+ cfg.max_concurrency,
)
.await?;

let pb = match cfg.no_progress {
true => None,
false => {
- let bar = ProgressBar::new(links.len() as u64)
-     .with_style(ProgressStyle::default_bar().template(
+ let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template(
      "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}",
  ));
+ bar.set_length(0);
Member:
Noticed that the progress bar is a bit "bumpy" in the beginning. (There's a better word for what I mean, but I can't seem to find it, heh.)
Keeping the initial length of links.len() is not possible, of course, because at this point we don't have the length (since links is a channel now, not a HashSet). But I wonder if there is another way to avoid that "bumpy" behavior. Maybe we can get the initial length from the channel somehow to give a better estimate?

Contributor Author:

Are you talking about the initial period where no links have been extracted yet? Or that, when links come streaming in, the progress bar can jump around?
I don't see a way to get the number of messages already waiting in the stream - did you mean using the capacity of the stream as the initial value?

Member:

Yeah, I meant the jumping when links start streaming in. I thought there was a length method for mpsc, but it doesn't exist - probably mixed it up with another channel impl. Anyway, if we can't find a solution for the jumpiness, that's fine, too. 🤷‍♀️

Member (@MichaIng, Sep 5, 2021):

The downside of concurrency 😄. I think it's fine when the bar represents the n/m checked/gathered URLs in the front; otherwise it may be even more confusing. The only other idea would be to not show any bar until all inputs have been processed, but this example (video in OP) shows that sometimes the checker is waiting for the extractor, so there would be no progress bar at all then 😢.

Should be fine as it is.
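The jumpiness discussed above follows from the design: the bar's total is discovered incrementally via pb.inc_length(1) as links arrive on the channel, rather than being known up front. A minimal std-only sketch of that consumer pattern (hypothetical drain_links name; lychee itself uses tokio's mpsc and indicatif):

```rust
use std::sync::mpsc;
use std::thread;

// Drain a channel of links, growing the running total as items
// arrive. The final total is only known once the sender hangs up,
// which is why a bar driven this way "jumps" at the start.
fn drain_links(links: Vec<String>) -> u64 {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for link in links {
            tx.send(link).unwrap(); // extractor side of the channel
        }
        // tx is dropped here, which ends the receiver loop below
    });

    let mut bar_len = 0u64;
    for link in rx {
        bar_len += 1; // stands in for pb.inc_length(1)
        let _ = link; // stands in for pb.set_message(..) and dispatching the check
    }
    bar_len
}

fn main() {
    let n = drain_links(vec![
        "https://example.com/a".into(),
        "https://example.com/b".into(),
    ]);
    assert_eq!(n, 2);
}
```

The same shape applies to any producer/consumer progress display where the producer's item count is unknown in advance.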

bar.set_message("Extracting links");
bar.enable_steady_tick(100);
Some(bar)
}
@@ -145,8 +146,9 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {

let bar = pb.clone();
tokio::spawn(async move {
- for link in links {
+ while let Some(link) = links.recv().await {
      if let Some(pb) = &bar {
+         pb.inc_length(1);
pb.set_message(&link.to_string());
};
send_req.send(link).await.unwrap();
@@ -219,7 +221,7 @@ fn parse_headers<T: AsRef<str>>(headers: &[T]) -> Result<HeaderMap> {
fn parse_statuscodes<T: AsRef<str>>(accept: T) -> Result<HashSet<http::StatusCode>> {
let mut statuscodes = HashSet::new();
for code in accept.as_ref().split(',').into_iter() {
- let code: reqwest::StatusCode = reqwest::StatusCode::from_bytes(code.as_bytes())?;
+ let code: http::StatusCode = http::StatusCode::from_bytes(code.as_bytes())?;
TimoFreiberg marked this conversation as resolved.
statuscodes.insert(code);
}
Ok(statuscodes)
2 changes: 1 addition & 1 deletion src/client_pool.rs
@@ -7,7 +7,7 @@ use crate::{client, types};
pub struct ClientPool {
tx: mpsc::Sender<types::Response>,
rx: mpsc::Receiver<types::Request>,
- pool: deadpool::unmanaged::Pool<client::Client>,
+ pool: deadpool::unmanaged::Pool<Client>,
}

impl ClientPool {
52 changes: 33 additions & 19 deletions src/collector.rs
@@ -197,11 +197,11 @@ impl Input {
/// Fetch all unique links from a slice of inputs
/// All relative URLs get prefixed with `base_url` if given.
pub async fn collect_links(
- inputs: &[Input],
+ inputs: Vec<Input>,
base_url: Option<String>,
skip_missing_inputs: bool,
max_concurrency: usize,
- ) -> Result<HashSet<Request>> {
+ ) -> Result<tokio::sync::mpsc::Receiver<Request>> {
let base_url = match base_url {
Some(url) => Some(Url::parse(&url)?),
_ => None,
@@ -210,7 +210,7 @@ pub async fn collect_links(
let (contents_tx, mut contents_rx) = tokio::sync::mpsc::channel(max_concurrency);

// extract input contents
- for input in inputs.iter().cloned() {
+ for input in inputs {
let sender = contents_tx.clone();

tokio::spawn(async move {
@@ -234,18 +234,32 @@
}
}

- // Note: we could dispatch links to be checked as soon as we get them,
- // instead of building a HashSet with all links.
- // This optimization would speed up cases where there's
- // a lot of inputs and/or the inputs are large (e.g. big files).
- let mut collected_links: HashSet<Request> = HashSet::new();
-
- for handle in extract_links_handles {
-     let links = handle.await?;
-     collected_links.extend(links);
- }
+ let (links_tx, links_rx) = tokio::sync::mpsc::channel(max_concurrency);
+ tokio::spawn(async move {
+     let mut collected_links = HashSet::new();
+
+     for handle in extract_links_handles {
+         // Unwrap should be fine, because joining only fails:
+         // * if the Task was dropped (which we don't do)
+         // * if the Task panicked. Propagating panics is correct here.
+         let requests = handle
+             .await
+             .expect("Awaiting termination of link handle failed");
+         for request in requests {
+             if !collected_links.contains(&request) {
+                 collected_links.insert(request.clone());
+                 // Unwrap should be fine, because sending only fails
+                 // if the receiver was closed - in which case we can't continue anyway
+                 links_tx
+                     .send(request)
+                     .await
+                     .expect("Extractor could not send link to channel");
+             }
+         }
+     }
+ });

- Ok(collected_links)
+ Ok(links_rx)
}

#[cfg(test)]
@@ -292,11 +306,11 @@ mod test {
},
];

- let responses = collect_links(&inputs, None, false, 8).await?;
- let links = responses
-     .into_iter()
-     .map(|r| r.uri)
-     .collect::<HashSet<Uri>>();
+ let mut responses = collect_links(inputs, None, false, 8).await?;
+ let mut links = HashSet::new();
+ while let Some(request) = responses.recv().await {
+     links.insert(request.uri);
+ }

let mut expected_links: HashSet<Uri> = HashSet::new();
expected_links.insert(website(TEST_STRING));
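The new collect_links body pairs a HashSet with the channel so that each unique link is forwarded exactly once while duplicates are dropped. That guard can be sketched with std types (hypothetical dedup_stream name; the real code uses tokio::sync::mpsc and Request values, not strings):

```rust
use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;

// Forward only the first occurrence of each extracted link,
// mirroring the contains/insert/send sequence in collect_links.
fn dedup_stream(extracted: Vec<String>) -> Vec<String> {
    let (links_tx, links_rx) = mpsc::channel();
    thread::spawn(move || {
        let mut collected_links = HashSet::new();
        for request in extracted {
            if !collected_links.contains(&request) {
                collected_links.insert(request.clone());
                // In the PR this is links_tx.send(request).await
                links_tx.send(request).unwrap();
            }
        }
        // links_tx dropped here; the receiver below sees end-of-stream
    });
    links_rx.into_iter().collect()
}

fn main() {
    let out = dedup_stream(vec![
        "https://a.example".into(),
        "https://b.example".into(),
        "https://a.example".into(),
    ]);
    assert_eq!(out, ["https://a.example", "https://b.example"]);
}
```

Unlike the old HashSet-returning version, consumers start receiving links before extraction has finished; deduplication happens on the producer side of the channel.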