Skip to content

Commit

Permalink
Handle stream results in main thread, not in task
Browse files Browse the repository at this point in the history
To stay as close to the pre-stream behaviour, we want to stop processing
as soon as an Err value appears in the stream. This is easiest when the
stream is consumed in the main thread.
Previously, the stream was consumed in a tokio task and the main thread
waited for responses.
Now, a tokio task waits for responses (and displays them/registers
response stats) and the main thread sends links to the ClientPool.
To ensure that the main thread waits for all responses to have arrived
before finishing the ProgressBar and printing the stats, it waits for
the show_results_task to finish.
  • Loading branch information
Timo Freiberg committed Oct 6, 2021
1 parent d4b9bad commit 98cdfba
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions lychee-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use std::iter::FromIterator;
use std::{collections::HashSet, fs, str::FromStr, time::Duration};

use anyhow::{anyhow, Context, Result};
use futures::stream::TryStreamExt;
use futures::{pin_mut, stream::TryStreamExt};
use headers::{authorization::Basic, Authorization, HeaderMap, HeaderMapExt, HeaderName};
use http::StatusCode;
use indicatif::{ProgressBar, ProgressStyle};
Expand Down Expand Up @@ -211,12 +211,16 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
let links = Collector::new(cfg.base.clone(), cfg.skip_missing, max_concurrency)
.collect_links(&inputs)
.await
.map_err(|e| anyhow!(e))
.collect::<Result<Vec<_>>>()
.await?;
.map_err(|e| anyhow!(e));

if cfg.dump {
let exit_code = dump_links(links.iter().filter(|link| !client.filtered(&link.uri)));
let exit_code = dump_links(
links
.collect::<Result<Vec<_>>>()
.await?
.iter()
.filter(|link| !client.filtered(&link.uri)),
);
return Ok(exit_code as i32);
}

Expand All @@ -238,14 +242,6 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
let mut stats = ResponseStats::new();

let bar = pb.clone();
tokio::spawn(async move {
for link in links {
if let Some(pb) = &bar {
pb.set_message(&link.to_string());
};
send_req.send(link).await.unwrap();
}
});

// Start receiving requests
tokio::spawn(async move {
Expand All @@ -254,11 +250,29 @@ async fn run(cfg: &Config, inputs: Vec<Input>) -> Result<i32> {
clients.listen().await;
});

while let Some(response) = recv_resp.recv().await {
show_progress(&pb, &response, cfg.verbose);
stats.add(response);
let show_results_task = tokio::spawn({
let verbose = cfg.verbose;
async move {
while let Some(response) = recv_resp.recv().await {
show_progress(&pb, &response, verbose);
stats.add(response);
}
(pb, stats)
}
});

pin_mut!(links);
while let Some(link) = links.next().await {
let link = link?;
if let Some(pb) = &bar {
pb.inc_length(1);
pb.set_message(&link.to_string());
};
send_req.send(link).await.unwrap();
}

let (pb, stats) = show_results_task.await?;

// Note that print statements may interfere with the progress bar, so this
// must go before printing the stats
if let Some(pb) = &pb {
Expand Down

0 comments on commit 98cdfba

Please sign in to comment.