Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve concurrency with streams #330

Merged
merged 43 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
613bf5f
Move to from vec to streams
mre Sep 13, 2021
aee5c9f
Merge branch 'master' of github.com:lycheeverse/lychee into stream
mre Sep 13, 2021
0c05acb
Fix formatting and lints
mre Sep 14, 2021
eecd8e2
Merge branch 'master' of github.com:lycheeverse/lychee into stream
mre Sep 20, 2021
18448ce
Merge remote-tracking branch 'upstream/master' into stream
Sep 26, 2021
a05400b
Return collected links as Stream
Sep 26, 2021
d4b9bad
Initialize ProgressBar without length
Oct 6, 2021
98cdfba
Handle stream results in main thread, not in task
Oct 6, 2021
23df173
Merge branch 'master' of github.com:lycheeverse/lychee into stream
mre Oct 7, 2021
1471725
Cleanup
mre Oct 7, 2021
d111c0e
Merge branch 'master' of github.com:lycheeverse/lychee into stream
mre Oct 10, 2021
dfd0735
Add basic directory support
mre Oct 10, 2021
9ef2b7d
Merge branch 'master' of github.com:lycheeverse/lychee into stream
mre Oct 10, 2021
5f790bf
Fix deadlock
Oct 9, 2021
d42bf3e
Clippy
mre Oct 10, 2021
5bea0c8
Add test for http protocol file type
mre Oct 10, 2021
8ea4de6
Remove deadpool (once again)
mre Oct 10, 2021
f33468e
Refactor main; fix tests
mre Oct 10, 2021
1bfeb0e
Move commands into separate submodule
mre Oct 10, 2021
fb2dde2
Reintegrate changes from master
mre Nov 24, 2021
fec6f8f
Simplify input handling
mre Nov 26, 2021
a69ea63
Simplify collector
mre Nov 26, 2021
c83429c
Remove unnecessary unwrap
mre Nov 26, 2021
b231175
Simplify main
mre Nov 26, 2021
1d80866
cleanup check
mre Nov 26, 2021
ee4dd9c
clean up dump command
mre Nov 26, 2021
52e52bf
Move to String, which is Send
mre Nov 28, 2021
89d7566
Revert "Move to String, which is Send"
mre Nov 28, 2021
faf40a8
Revert "Revert "Move to String, which is Send""
mre Nov 28, 2021
63a8370
Parallel stream awesomeness
mre Nov 28, 2021
562f112
cleanup
mre Nov 28, 2021
e57bd6c
Add back Result
mre Nov 30, 2021
954ba0d
fmt
mre Nov 30, 2021
7945421
Fix wording in test
mre Nov 30, 2021
34858f8
Adjust test to new style
mre Nov 30, 2021
e20228c
fmt
mre Nov 30, 2021
ea371dd
clippy
mre Nov 30, 2021
d7f347a
wording
mre Nov 30, 2021
e1930cf
fmt
mre Nov 30, 2021
1f36ab1
clippy
mre Nov 30, 2021
1acdd24
map_err -> context
mre Nov 30, 2021
ef7d760
Replace FuturesUnordered with ParThenUnordered
mre Nov 30, 2021
6c3be1c
Merge branch 'master' into stream
mre Dec 1, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 105 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions examples/client_pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ path = "client_pool.rs"
[dependencies]
lychee-lib = { path = "../../lychee-lib", version = "0.7.2" }
tokio = { version = "1.12.0", features = ["full"] }
futures = "0.3.17"
tokio-stream = "0.1.7"
25 changes: 15 additions & 10 deletions examples/client_pool/client_pool.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use lychee_lib::{ClientBuilder, ClientPool, Input, Request, Result, Uri};
use lychee_lib::{ClientBuilder, Input, Request, Result, Uri};
use std::convert::TryFrom;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

const CONCURRENT_REQUESTS: usize = 4;

#[tokio::main]
#[allow(clippy::trivial_regex)]
async fn main() -> Result<()> {
// These channels are used to send requests and receive responses to and
// from the lychee client pool
// from lychee
let (send_req, recv_req) = mpsc::channel(CONCURRENT_REQUESTS);
let (send_resp, mut recv_resp) = mpsc::channel(CONCURRENT_REQUESTS);

Expand All @@ -18,7 +18,7 @@ async fn main() -> Result<()> {
Input::Stdin,
)];

// Send requests to pool
// Queue requests
tokio::spawn(async move {
for request in requests {
println!("Sending {}", request);
Expand All @@ -29,13 +29,18 @@ async fn main() -> Result<()> {
// Create a default lychee client
let client = ClientBuilder::default().client()?;

// Create a pool with four lychee clients
let clients = vec![client; CONCURRENT_REQUESTS];
let mut clients = ClientPool::new(send_resp, recv_req, clients);

// Handle requests in a client pool
// Start receiving requests
// Requests get streamed into the client and run concurrently
tokio::spawn(async move {
clients.listen().await;
futures::StreamExt::for_each_concurrent(
ReceiverStream::new(recv_req),
CONCURRENT_REQUESTS,
|req| async {
let resp = client.check(req).await.unwrap();
send_resp.send(resp).await.unwrap();
},
)
.await;
});

// Finally, listen to incoming responses from lychee
Expand Down
3 changes: 2 additions & 1 deletion examples/collect_links/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ lychee-lib = { path = "../../lychee-lib", version = "0.7.2" }
tokio = { version = "1.12.0", features = ["full"] }
regex = "1.4.6"
http = "0.2.5"
reqwest = { version = "0.11.5", features = ["gzip"] }
reqwest = { version = "0.11.5", features = ["gzip"] }
tokio-stream = "0.1.7"
3 changes: 3 additions & 0 deletions examples/collect_links/collect_links.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use lychee_lib::{Collector, Input, Result};
use reqwest::Url;
use std::path::PathBuf;
use tokio_stream::StreamExt;

#[tokio::main]
#[allow(clippy::trivial_regex)]
Expand All @@ -21,6 +22,8 @@ async fn main() -> Result<()> {
.collect_links(
inputs, // base url or directory
)
.await
.collect::<Result<Vec<_>>>()
.await?;

dbg!(links);
Expand Down
2 changes: 2 additions & 0 deletions lychee-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ serde_json = "1.0.68"
structopt = "0.3.21"
tokio = { version = "1.12.0", features = ["full"] }
toml = "0.5.8"
futures = "0.3.17"
tokio-stream = "0.1.7"
once_cell = "1.8.0"

[dev-dependencies]
Expand Down
52 changes: 52 additions & 0 deletions lychee-bin/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::options::Config;
use crate::parse::{parse_basic_auth, parse_headers, parse_statuscodes, parse_timeout};
use anyhow::{anyhow, Result};
use headers::HeaderMapExt;
use lychee_lib::{Client, ClientBuilder};
use regex::RegexSet;
use std::iter::FromIterator;
use std::{collections::HashSet, str::FromStr};

/// Creates a lychee [`Client`] according to the command-line config.
///
/// Headers, basic auth, accepted status codes, timeout, request method and
/// the include/exclude patterns are parsed from `cfg` and handed to the
/// [`ClientBuilder`]. In offline mode only the `file` scheme is checked,
/// regardless of the schemes given in the config.
///
/// # Errors
///
/// Returns an error if
/// - the custom headers or the basic auth credentials cannot be parsed,
/// - the accepted status codes cannot be parsed,
/// - the request method is not a valid HTTP method,
/// - an include or exclude pattern is not a valid regular expression,
/// - the underlying request client cannot be created.
pub(crate) fn create(cfg: &Config) -> Result<Client> {
    let mut headers = parse_headers(&cfg.headers)?;
    if let Some(auth) = &cfg.basic_auth {
        let auth_header = parse_basic_auth(auth)?;
        headers.typed_insert(auth_header);
    }

    // Report malformed status codes to the user instead of silently
    // discarding them and falling back to the default set.
    let accepted = cfg
        .accept
        .as_ref()
        .map(|a| parse_statuscodes(a))
        .transpose()
        .map_err(|e| anyhow!("Failed to parse accepted status codes: {}", e))?;
    let timeout = parse_timeout(cfg.timeout);
    // HTTP methods are case-sensitive; normalize user input to uppercase
    let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?;
    let include = RegexSet::new(&cfg.include)?;
    let exclude = RegexSet::new(&cfg.exclude)?;

    // Offline mode overrides the scheme
    let schemes = if cfg.offline {
        vec!["file".to_string()]
    } else {
        cfg.scheme.clone()
    };

    ClientBuilder::builder()
        .includes(include)
        .excludes(exclude)
        .exclude_all_private(cfg.exclude_all_private)
        .exclude_private_ips(cfg.exclude_private)
        .exclude_link_local_ips(cfg.exclude_link_local)
        .exclude_loopback_ips(cfg.exclude_loopback)
        .exclude_mail(cfg.exclude_mail)
        .max_redirects(cfg.max_redirects)
        .user_agent(cfg.user_agent.clone())
        .allow_insecure(cfg.insecure)
        .custom_headers(headers)
        .method(method)
        .timeout(timeout)
        .github_token(cfg.github_token.clone())
        .schemes(HashSet::from_iter(schemes))
        .accepted(accepted)
        .require_https(cfg.require_https)
        .build()
        .client()
        .map_err(|e| anyhow!("Failed to create request client: {}", e))
}
Loading