From 3d5135668b35f2c22f51b6a638bd8e42b02ca322 Mon Sep 17 00:00:00 2001 From: Matthias Date: Wed, 1 Dec 2021 18:25:11 +0100 Subject: [PATCH] Improve concurrency with streams (#330) * Move from vec to streams Previously we collected all inputs in one vector before checking the links, which is not ideal. Especially when reading many inputs (e.g. by using a glob pattern), this could cause issues like running out of file handles. By moving to streams we avoid that scenario. This is also the first step towards improving performance for many inputs. To stay as close as possible to the pre-stream behaviour, we want to stop processing as soon as an Err value appears in the stream. This is easiest when the stream is consumed in the main thread. Previously, the stream was consumed in a tokio task and the main thread waited for responses. Now, a tokio task waits for responses (and displays them/registers response stats) and the main thread sends links to the ClientPool. To ensure that the main thread waits for all responses to have arrived before finishing the ProgressBar and printing the stats, it waits for the show_results_task to finish. * Return collected links as Stream * Initialize ProgressBar without length because we can't know the number of links without blocking * Handle stream results in main thread, not in task * Add basic directory support using jwalk * Add test for HTTP protocol file type (http://) * Remove deadpool (once again): Replaced with `futures::StreamExt::for_each_concurrent`. 
* Refactor main; fix tests * Move commands into separate submodule * Simplify input handling * Simplify collector * Remove unnecessary unwrap * Simplify main * cleanup check * clean up dump command * Handle requests in parallel * Fix formatting and lints Co-authored-by: Timo Freiberg --- Cargo.lock | 445 +++++++++++++++++++----- examples/client_pool/Cargo.toml | 2 + examples/client_pool/client_pool.rs | 25 +- examples/collect_links/Cargo.toml | 1 + examples/collect_links/collect_links.rs | 5 +- lychee-bin/Cargo.toml | 2 + lychee-bin/src/client.rs | 52 +++ lychee-bin/src/commands/check.rs | 116 ++++++ lychee-bin/src/commands/dump.rs | 52 +++ lychee-bin/src/commands/mod.rs | 5 + lychee-bin/src/main.rs | 204 ++--------- lychee-bin/src/parse.rs | 5 +- lychee-bin/tests/local_files.rs | 28 ++ lychee-lib/Cargo.toml | 8 +- lychee-lib/src/client_pool.rs | 49 --- lychee-lib/src/collector.rs | 96 +++-- lychee-lib/src/extract.rs | 92 ++--- lychee-lib/src/helpers/path.rs | 3 +- lychee-lib/src/lib.rs | 4 +- lychee-lib/src/types/error.rs | 11 +- lychee-lib/src/types/file.rs | 15 +- lychee-lib/src/types/input.rs | 152 ++++++-- 22 files changed, 914 insertions(+), 458 deletions(-) create mode 100644 lychee-bin/src/client.rs create mode 100644 lychee-bin/src/commands/check.rs create mode 100644 lychee-bin/src/commands/dump.rs create mode 100644 lychee-bin/src/commands/mod.rs delete mode 100644 lychee-lib/src/client_pool.rs diff --git a/Cargo.lock b/Cargo.lock index 41ff10a8c5..58d0490428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" + [[package]] name = "aho-corasick" version = "0.7.18" @@ -178,7 +184,7 @@ checksum = 
"b21b63ab5a0db0369deb913540af2892750e42d949faacc7a61495ac418a1692" dependencies = [ "async-io", "blocking", - "cfg-if", + "cfg-if 1.0.0", "event-listener", "futures-lite", "libc", @@ -232,7 +238,7 @@ dependencies = [ "async-io", "async-lock", "async-process", - "crossbeam-utils", + "crossbeam-utils 0.8.5", "futures-channel", "futures-core", "futures-io", @@ -263,6 +269,27 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.0.3" @@ -378,6 +405,12 @@ version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c" +[[package]] +name = "by_address" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e245704f60eb4eb45810d65cf14eb54d2eb50a6f3715fe2d7cd01ee905c2944f" + [[package]] name = "bytes" version = "1.1.0" @@ -430,6 +463,12 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -490,8 +529,19 @@ dependencies = [ name = "client_pool" version = "0.1.0" dependencies = [ + "futures", "lychee-lib", "tokio", + "tokio-stream", 
+] + +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", ] [[package]] @@ -503,6 +553,7 @@ dependencies = [ "regex", "reqwest", "tokio", + "tokio-stream", ] [[package]] @@ -571,7 +622,70 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", +] + +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch 0.9.5", + "crossbeam-queue", + "crossbeam-utils 0.8.5", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils 0.8.5", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-epoch 0.9.5", + "crossbeam-utils 0.8.5", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "crossbeam-utils 0.7.2", + "lazy_static", + "maybe-uninit", + "memoffset 0.5.6", + "scopeguard", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils 0.8.5", + "lazy_static", + "memoffset 0.6.4", + "scopeguard", ] [[package]] @@ -580,8 +694,19 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" dependencies = [ - "cfg-if", - "crossbeam-utils", + "cfg-if 1.0.0", + "crossbeam-utils 0.8.5", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", ] [[package]] @@ -590,7 +715,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "lazy_static", ] @@ -660,23 +785,16 @@ dependencies = [ ] [[package]] -name = "deadpool" -version = "0.9.2" +name = "derivative" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bf0c5365c0925c80a838a6810a1bf38d3304ca6b4eb25829e29e33da12de786" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ - "async-trait", - "deadpool-runtime", - "num_cpus", - "tokio", + "proc-macro2", + "quote", + "syn", ] -[[package]] -name = "deadpool-runtime" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" - [[package]] name = "diff" version = "0.1.12" @@ -704,7 +822,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "dirs-sys-next", ] @@ -743,7 +861,7 
@@ version = "0.8.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a74ea89a0a1b98f6332de42c95baff457ada66d1cb4030f9ff151b2041a1c746" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -801,7 +919,7 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crc32fast", "libc", "miniz_oxide", @@ -816,6 +934,31 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24c3fd473b3a903a62609e413ed7538f99e10b665ecb502b5e481a95283f8ab4" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.2", +] + +[[package]] +name = "flurry" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c0a35f7b50e99185a2825541946252f669f3c3ca77801357cd682a1b356bb3e" +dependencies = [ + "ahash", + "crossbeam-epoch 0.8.2", + "num_cpus", + "parking_lot 0.10.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -859,9 +1002,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca" +checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e" dependencies = [ "futures-channel", "futures-core", @@ -874,9 +1017,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888" +checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27" dependencies = [ "futures-core", "futures-sink", @@ -884,15 +1027,15 @@ dependencies = [ 
[[package]] name = "futures-core" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" +checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445" [[package]] name = "futures-executor" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c" +checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97" dependencies = [ "futures-core", "futures-task", @@ -901,9 +1044,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" +checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11" [[package]] name = "futures-lite" @@ -922,12 +1065,10 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb" +checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd" dependencies = [ - "autocfg", - "proc-macro-hack", "proc-macro2", "quote", "syn", @@ -935,15 +1076,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11" +checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af" [[package]] name = "futures-task" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" +checksum 
= "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12" [[package]] name = "futures-timer" @@ -953,11 +1094,10 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481" +checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e" dependencies = [ - "autocfg", "futures-channel", "futures-core", "futures-io", @@ -967,8 +1107,6 @@ dependencies = [ "memchr", "pin-project-lite", "pin-utils", - "proc-macro-hack", - "proc-macro-nested", "slab", ] @@ -997,7 +1135,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi 0.9.0+wasi-snapshot-preview1", ] @@ -1008,9 +1146,11 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", + "js-sys", "libc", "wasi 0.10.2+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1315,7 +1455,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -1374,6 +1514,16 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "jwalk" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "172752e853a067cbce46427de8470ddf308af7fd8ceaf9b682ef31a5021b6bb9" +dependencies = [ + "crossbeam", + "rayon", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -1403,7 +1553,7 @@ checksum = 
"6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" dependencies = [ "arrayvec", "bitflags", - "cfg-if", + "cfg-if 1.0.0", "ryu", "static_assertions", ] @@ -1429,6 +1579,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] + [[package]] name = "lock_api" version = "0.4.5" @@ -1444,7 +1603,7 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "value-bag", ] @@ -1464,6 +1623,7 @@ dependencies = [ "anyhow", "assert_cmd", "console", + "futures", "headers", "http", "indicatif", @@ -1483,6 +1643,7 @@ dependencies = [ "tabled", "tempfile", "tokio", + "tokio-stream", "toml", "tracing-subscriber", "uuid", @@ -1493,20 +1654,23 @@ dependencies = [ name = "lychee-lib" version = "0.8.1" dependencies = [ + "async-stream", "cached", "check-if-email-exists", - "deadpool 0.9.2", "doc-comment", "fast_chemail", + "futures", "glob", "html5ever", "http", "hubcaps", + "jwalk", "linkify", "log", "markup5ever_rcdom", "once_cell", "openssl-sys", + "par-stream", "path-clean", "percent-encoding", "pretty_assertions", @@ -1586,12 +1750,36 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = 
"memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + +[[package]] +name = "memoffset" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" @@ -1630,6 +1818,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nanorand" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" +dependencies = [ + "getrandom 0.2.3", +] + [[package]] name = "native-tls" version = "0.2.8" @@ -1745,7 +1942,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" dependencies = [ "bitflags", - "cfg-if", + "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", @@ -1808,12 +2005,39 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "par-stream" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f9c49089732ef484eee337df6312564f73bbce4e2d5fde5a823730f86038390" +dependencies = [ + "by_address", + "derivative", + "flume", + "flurry", + "futures", + "num_cpus", + "pin-project", + "tokio", + "tokio-stream", +] + [[package]] name = "parking" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" +[[package]] +name = "parking_lot" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" +dependencies = [ + "lock_api 0.3.4", + "parking_lot_core 0.7.2", +] + [[package]] name = "parking_lot" 
version = "0.11.2" @@ -1821,8 +2045,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", - "lock_api", - "parking_lot_core", + "lock_api 0.4.5", + "parking_lot_core 0.8.5", +] + +[[package]] +name = "parking_lot_core" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi", + "libc", + "redox_syscall 0.1.57", + "smallvec", + "winapi", ] [[package]] @@ -1831,10 +2069,10 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "instant", "libc", - "redox_syscall", + "redox_syscall 0.2.10", "smallvec", "winapi", ] @@ -1944,7 +2182,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92341d779fa34ea8437ef4d82d440d5e1ce3f3ff7f824aa64424cd481f9a1f25" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "log", "wepoll-ffi", @@ -2029,18 +2267,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" - -[[package]] -name = "proc-macro-nested" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" - [[package]] name = "proc-macro2" version = "1.0.32" @@ -2168,6 +2394,37 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rayon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" 
+dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils 0.8.5", + "lazy_static", + "num_cpus", +] + +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + [[package]] name = "redox_syscall" version = "0.2.10" @@ -2184,7 +2441,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" dependencies = [ "getrandom 0.2.3", - "redox_syscall", + "redox_syscall 0.2.10", ] [[package]] @@ -2279,7 +2536,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -2391,7 +2648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", "opaque-debug", @@ -2477,7 +2734,7 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "winapi", ] @@ -2498,6 +2755,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" +dependencies = [ + "lock_api 0.4.5", +] + 
[[package]] name = "static_assertions" version = "1.1.0" @@ -2512,7 +2778,7 @@ checksum = "923f0f39b6267d37d23ce71ae7235602134b250ace715dd2c90421998ddac0c6" dependencies = [ "lazy_static", "new_debug_unreachable", - "parking_lot", + "parking_lot 0.11.2", "phf_shared", "precomputed-hash", "serde", @@ -2604,10 +2870,10 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "rand 0.8.4", - "redox_syscall", + "redox_syscall 0.2.10", "remove_dir_all", "winapi", ] @@ -2715,7 +2981,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot", + "parking_lot 0.11.2", "pin-project-lite", "signal-hook-registry", "tokio-macros", @@ -2755,6 +3021,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.9" @@ -2790,7 +3067,7 @@ version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "pin-project-lite", "tracing-core", ] @@ -2826,7 +3103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0d7f5db438199a6e2609debe3f69f808d074e0a2888ee0bccb45fe234d03f4" dependencies = [ "async-trait", - "cfg-if", + "cfg-if 1.0.0", "data-encoding", "enum-as-inner", "futures-channel", @@ -2850,13 +3127,13 @@ version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ad17b608a64bd0735e67bde16b0636f8aa8591f831a25d18443ed00a699770" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "futures-util", "ipconfig", "lazy_static", 
"log", "lru-cache", - "parking_lot", + "parking_lot 0.11.2", "resolv-conf", "smallvec", "thiserror", @@ -3033,7 +3310,7 @@ version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -3058,7 +3335,7 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "wasm-bindgen", "web-sys", @@ -3174,7 +3451,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c3fe7c6af90383100cd1486ef0467c2ebead0303ed7aa3dc6e51173ee3ff8ba" dependencies = [ "async-trait", - "deadpool 0.7.0", + "deadpool", "futures", "futures-timer", "http-types", diff --git a/examples/client_pool/Cargo.toml b/examples/client_pool/Cargo.toml index 8643457053..a6a57688a2 100644 --- a/examples/client_pool/Cargo.toml +++ b/examples/client_pool/Cargo.toml @@ -8,5 +8,7 @@ name = "client_pool" path = "client_pool.rs" [dependencies] +futures = "0.3.17" +tokio-stream = "0.1.7" lychee-lib = { path = "../../lychee-lib", version = "0.8.1" } tokio = { version = "1.14.0", features = ["full"] } diff --git a/examples/client_pool/client_pool.rs b/examples/client_pool/client_pool.rs index ef4bc8d46b..a55e7cbb4f 100644 --- a/examples/client_pool/client_pool.rs +++ b/examples/client_pool/client_pool.rs @@ -1,14 +1,14 @@ -use lychee_lib::{ClientBuilder, ClientPool, Input, Request, Result, Uri}; +use lychee_lib::{ClientBuilder, Input, Request, Result, Uri}; use std::convert::TryFrom; use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; const CONCURRENT_REQUESTS: usize = 4; #[tokio::main] -#[allow(clippy::trivial_regex)] async fn main() -> Result<()> { // These channels are used to send requests and receive responses to and - // from the lychee client 
pool + // from lychee let (send_req, recv_req) = mpsc::channel(CONCURRENT_REQUESTS); let (send_resp, mut recv_resp) = mpsc::channel(CONCURRENT_REQUESTS); @@ -18,7 +18,7 @@ async fn main() -> Result<()> { Input::Stdin, )]; - // Send requests to pool + // Queue requests tokio::spawn(async move { for request in requests { println!("Sending {}", request); @@ -29,13 +29,18 @@ async fn main() -> Result<()> { // Create a default lychee client let client = ClientBuilder::default().client()?; - // Create a pool with four lychee clients - let clients = vec![client; CONCURRENT_REQUESTS]; - let mut clients = ClientPool::new(send_resp, recv_req, clients); - - // Handle requests in a client pool + // Start receiving requests + // Requests get streamed into the client and run concurrently tokio::spawn(async move { - clients.listen().await; + futures::StreamExt::for_each_concurrent( + ReceiverStream::new(recv_req), + CONCURRENT_REQUESTS, + |req| async { + let resp = client.check(req).await.unwrap(); + send_resp.send(resp).await.unwrap(); + }, + ) + .await; }); // Finally, listen to incoming responses from lychee diff --git a/examples/collect_links/Cargo.toml b/examples/collect_links/Cargo.toml index 9dd99c8787..12856fc9a4 100644 --- a/examples/collect_links/Cargo.toml +++ b/examples/collect_links/Cargo.toml @@ -12,4 +12,5 @@ lychee-lib = { path = "../../lychee-lib", version = "0.8.1" } tokio = { version = "1.14.0", features = ["full"] } regex = "1.4.6" http = "0.2.5" +tokio-stream = "0.1.7" reqwest = { version = "0.11.7", features = ["gzip"] } \ No newline at end of file diff --git a/examples/collect_links/collect_links.rs b/examples/collect_links/collect_links.rs index 60c37f9291..c225c3e46a 100644 --- a/examples/collect_links/collect_links.rs +++ b/examples/collect_links/collect_links.rs @@ -1,12 +1,13 @@ use lychee_lib::{Collector, Input, Result}; use reqwest::Url; use std::path::PathBuf; +use tokio_stream::StreamExt; #[tokio::main] #[allow(clippy::trivial_regex)] async fn 
main() -> Result<()> { // Collect all links from the following inputs - let inputs: &[Input] = &[ + let inputs = vec![ Input::RemoteUrl(Box::new( Url::parse("https://github.com/lycheeverse/lychee").unwrap(), )), @@ -21,6 +22,8 @@ async fn main() -> Result<()> { .collect_links( inputs, // base url or directory ) + .await + .collect::>>() .await?; dbg!(links); diff --git a/lychee-bin/Cargo.toml b/lychee-bin/Cargo.toml index ea422501f3..0f49ccabc6 100644 --- a/lychee-bin/Cargo.toml +++ b/lychee-bin/Cargo.toml @@ -39,6 +39,8 @@ structopt = "0.3.25" tabled = "0.3.0" tokio = { version = "1.14.0", features = ["full"] } toml = "0.5.8" +futures = "0.3.17" +tokio-stream = "0.1.7" once_cell = "1.8.0" [dev-dependencies] diff --git a/lychee-bin/src/client.rs b/lychee-bin/src/client.rs new file mode 100644 index 0000000000..c9ff4cbdfd --- /dev/null +++ b/lychee-bin/src/client.rs @@ -0,0 +1,52 @@ +use crate::options::Config; +use crate::parse::{parse_basic_auth, parse_headers, parse_statuscodes, parse_timeout}; +use anyhow::{Context, Result}; +use headers::HeaderMapExt; +use lychee_lib::{Client, ClientBuilder}; +use regex::RegexSet; +use std::iter::FromIterator; +use std::{collections::HashSet, str::FromStr}; + +/// Creates a client according to the command-line config +pub(crate) fn create(cfg: &Config) -> Result { + let mut headers = parse_headers(&cfg.headers)?; + if let Some(auth) = &cfg.basic_auth { + let auth_header = parse_basic_auth(auth)?; + headers.typed_insert(auth_header); + } + + let accepted = cfg.accept.clone().and_then(|a| parse_statuscodes(&a).ok()); + let timeout = parse_timeout(cfg.timeout); + let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?; + let include = RegexSet::new(&cfg.include)?; + let exclude = RegexSet::new(&cfg.exclude)?; + + // Offline mode overrides the scheme + let schemes = if cfg.offline { + vec!["file".to_string()] + } else { + cfg.scheme.clone() + }; + + ClientBuilder::builder() + .includes(include) + 
.excludes(exclude) + .exclude_all_private(cfg.exclude_all_private) + .exclude_private_ips(cfg.exclude_private) + .exclude_link_local_ips(cfg.exclude_link_local) + .exclude_loopback_ips(cfg.exclude_loopback) + .exclude_mail(cfg.exclude_mail) + .max_redirects(cfg.max_redirects) + .user_agent(cfg.user_agent.clone()) + .allow_insecure(cfg.insecure) + .custom_headers(headers) + .method(method) + .timeout(timeout) + .github_token(cfg.github_token.clone()) + .schemes(HashSet::from_iter(schemes)) + .accepted(accepted) + .require_https(cfg.require_https) + .build() + .client() + .context("Failed to create request client") +} diff --git a/lychee-bin/src/commands/check.rs b/lychee-bin/src/commands/check.rs new file mode 100644 index 0000000000..014ddecfb1 --- /dev/null +++ b/lychee-bin/src/commands/check.rs @@ -0,0 +1,116 @@ +use indicatif::ProgressBar; +use indicatif::ProgressStyle; +use lychee_lib::Result; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::StreamExt; + +use crate::{ + options::Config, + stats::{color_response, ResponseStats}, + ExitCode, +}; +use lychee_lib::{Client, Request, Response}; + +pub(crate) async fn check( + client: Client, + requests: S, + cfg: &Config, +) -> Result<(ResponseStats, ExitCode)> +where + S: futures::Stream>, +{ + let (send_req, recv_req) = mpsc::channel(cfg.max_concurrency); + let (send_resp, mut recv_resp) = mpsc::channel(cfg.max_concurrency); + let max_concurrency = cfg.max_concurrency; + let mut stats = ResponseStats::new(); + + // Start receiving requests + tokio::spawn(async move { + futures::StreamExt::for_each_concurrent( + ReceiverStream::new(recv_req), + max_concurrency, + |request: Result| async { + let request: Request = request.expect("cannot read request"); + let response = client.check(request).await.expect("cannot check request"); + send_resp + .send(response) + .await + .expect("cannot send response to queue"); + }, + ) + .await; + }); + + let pb = if cfg.no_progress { + None + } 
else { + let bar = ProgressBar::new_spinner().with_style(ProgressStyle::default_bar().template( + "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}", + )); + bar.set_length(0); + bar.set_message("Extracting links"); + bar.enable_steady_tick(100); + Some(bar) + }; + + let bar = pb.clone(); + let show_results_task = tokio::spawn({ + let verbose = cfg.verbose; + async move { + while let Some(response) = recv_resp.recv().await { + show_progress(&pb, &response, verbose); + stats.add(response); + } + (pb, stats) + } + }); + + tokio::pin!(requests); + + while let Some(request) = requests.next().await { + let request = request?; + if let Some(pb) = &bar { + pb.inc_length(1); + pb.set_message(&request.to_string()); + }; + send_req + .send(Ok(request)) + .await + .expect("Cannot send request"); + } + // required for the receiver task to end, which closes send_resp, which allows + // the show_results_task to finish + drop(send_req); + + let (pb, stats) = show_results_task.await?; + + // Note that print statements may interfere with the progress bar, so this + // must go before printing the stats + if let Some(pb) = &pb { + pb.finish_and_clear(); + } + + let code = if stats.is_success() { + ExitCode::Success + } else { + ExitCode::LinkCheckFailure + }; + Ok((stats, code)) +} + +fn show_progress(progress_bar: &Option, response: &Response, verbose: bool) { + let out = color_response(&response.1); + if let Some(pb) = progress_bar { + pb.inc(1); + pb.set_message(&out); + if verbose { + pb.println(out); + } + } else { + if (response.status().is_success() || response.status().is_excluded()) && !verbose { + return; + } + println!("{}", out); + } +} diff --git a/lychee-bin/src/commands/dump.rs b/lychee-bin/src/commands/dump.rs new file mode 100644 index 0000000000..da1b791d5b --- /dev/null +++ b/lychee-bin/src/commands/dump.rs @@ -0,0 +1,52 @@ +use std::io::{self, StdoutLock, Write}; + +use lychee_lib::Result; +use lychee_lib::{Client, Request}; +use 
tokio_stream::StreamExt; + +use crate::ExitCode; + +/// Dump all detected links to stdout without checking them +pub(crate) async fn dump<'a, S>(client: Client, requests: S, verbose: bool) -> Result +where + S: futures::Stream>, +{ + // Lock stdout for better performance + let stdout = io::stdout(); + let mut handle = stdout.lock(); + + tokio::pin!(requests); + + while let Some(request) = requests.next().await { + let request = request?; + + if client.filtered(&request.uri) { + continue; + } + + // Avoid panic on broken pipe. + // See https://github.com/rust-lang/rust/issues/46016 + // This can occur when piping the output of lychee + // to another program like `grep`. + if let Err(e) = write(&mut handle, &request, verbose) { + if e.kind() != io::ErrorKind::BrokenPipe { + eprintln!("{}", e); + return Ok(ExitCode::UnexpectedFailure); + } + } + } + + Ok(ExitCode::Success) +} + +/// Dump request to stdout +/// Only print source in verbose mode. This way the normal link output +/// can be fed into another tool without data mangling. 
+fn write(handle: &mut StdoutLock<'_>, request: &Request, verbose: bool) -> io::Result<()> { + let output = if verbose { + request.to_string() + } else { + request.uri.to_string() + }; + writeln!(*handle, "{}", output) +} diff --git a/lychee-bin/src/commands/mod.rs b/lychee-bin/src/commands/mod.rs new file mode 100644 index 0000000000..eccd0d97a6 --- /dev/null +++ b/lychee-bin/src/commands/mod.rs @@ -0,0 +1,5 @@ +pub(crate) mod check; +pub(crate) mod dump; + +pub(crate) use check::check; +pub(crate) use dump::dump; diff --git a/lychee-bin/src/main.rs b/lychee-bin/src/main.rs index af9778c924..66a18eef78 100644 --- a/lychee-bin/src/main.rs +++ b/lychee-bin/src/main.rs @@ -58,32 +58,25 @@ #![deny(anonymous_parameters, macro_use_extern_crate, pointer_structural_match)] #![deny(missing_docs)] +use lychee_lib::Collector; // required for apple silicon use ring as _; -use stats::color_response; -use std::fs::File; -use std::io::{self, BufRead, BufReader, Write}; -use std::iter::FromIterator; -use std::{collections::HashSet, fs, str::FromStr}; - -use anyhow::{anyhow, Context, Result}; -use headers::HeaderMapExt; -use indicatif::{ProgressBar, ProgressStyle}; -use lychee_lib::{ClientBuilder, ClientPool, Collector, Input, Request, Response}; +use anyhow::{Context, Result}; use openssl_sys as _; // required for vendored-openssl feature -use regex::RegexSet; -use ring as _; // required for apple silicon +use ring as _; +use std::fs::{self, File}; +use std::io::{BufRead, BufReader}; use structopt::StructOpt; -use tokio::sync::mpsc; +mod client; mod color; +mod commands; mod options; mod parse; mod stats; mod writer; -use crate::parse::{parse_basic_auth, parse_headers, parse_statuscodes, parse_timeout}; use crate::{ options::{Config, Format, LycheeOptions}, stats::ResponseStats, @@ -107,7 +100,7 @@ fn main() -> Result<()> { #[cfg(feature = "tokio-console")] console_subscriber::init(); // std::process::exit doesn't guarantee that all destructors will be ran, - // therefore we wrap 
"main" code in another function to guarantee that. + // therefore we wrap "main" code in another function to ensure that. // See: https://doc.rust-lang.org/stable/std/process/fn.exit.html // Also see: https://www.youtube.com/watch?v=zQC8T71Y8e4 let exit_code = run_main()?; @@ -120,7 +113,9 @@ fn read_lines(file: &File) -> Result> { Ok(lines.into_iter().filter(|line| !line.is_empty()).collect()) } -fn run_main() -> Result { +/// Merge all provided config options into one +/// This includes a potential config file, command-line- and environment variables +fn load_config() -> Result { let mut opts = LycheeOptions::from_args(); // Load a potentially existing config file and merge it into the config from the CLI @@ -138,9 +133,12 @@ fn run_main() -> Result { opts.config.exclude.append(&mut read_lines(&file)?); } - let cfg = &opts.config; + Ok(opts) +} - let runtime = match cfg.threads { +fn run_main() -> Result { + let opts = load_config()?; + let runtime = match opts.config.threads { Some(threads) => { // We define our own runtime instead of the `tokio::main` attribute // since we want to make the number of threads configurable @@ -152,125 +150,34 @@ fn run_main() -> Result { None => tokio::runtime::Runtime::new()?, }; - runtime.block_on(run(cfg, opts.inputs())) + runtime.block_on(run(&opts)) } -fn show_progress(progress_bar: &Option, response: &Response, verbose: bool) { - let out = color_response(&response.1); - if let Some(pb) = progress_bar { - pb.inc(1); - pb.set_message(&out); - if verbose { - pb.println(out); - } - } else { - if (response.status().is_success() || response.status().is_excluded()) && !verbose { - return; - } - println!("{}", out); - } -} +async fn run(opts: &LycheeOptions) -> Result { + let inputs = opts.inputs(); + let requests = Collector::new( + opts.config.base.clone(), + opts.config.skip_missing, + opts.config.max_concurrency, + ) + .collect_links(inputs) + .await; -async fn run(cfg: &Config, inputs: Vec) -> Result { - let mut headers = 
parse_headers(&cfg.headers)?; - if let Some(auth) = &cfg.basic_auth { - let auth_header = parse_basic_auth(auth)?; - headers.typed_insert(auth_header); - } + let client = client::create(&opts.config)?; - let accepted = cfg.accept.clone().and_then(|a| parse_statuscodes(&a).ok()); - let timeout = parse_timeout(cfg.timeout); - let max_concurrency = cfg.max_concurrency; - let method: reqwest::Method = reqwest::Method::from_str(&cfg.method.to_uppercase())?; - let include = RegexSet::new(&cfg.include)?; - let exclude = RegexSet::new(&cfg.exclude)?; - - // Offline mode overrides the scheme - let schemes = if cfg.offline { - vec!["file".to_string()] - } else { - cfg.scheme.clone() - }; - - let client = ClientBuilder::builder() - .includes(include) - .excludes(exclude) - .exclude_all_private(cfg.exclude_all_private) - .exclude_private_ips(cfg.exclude_private) - .exclude_link_local_ips(cfg.exclude_link_local) - .exclude_loopback_ips(cfg.exclude_loopback) - .exclude_mail(cfg.exclude_mail) - .max_redirects(cfg.max_redirects) - .user_agent(cfg.user_agent.clone()) - .allow_insecure(cfg.insecure) - .custom_headers(headers) - .method(method) - .timeout(timeout) - .github_token(cfg.github_token.clone()) - .schemes(HashSet::from_iter(schemes)) - .accepted(accepted) - .require_https(cfg.require_https) - .build() - .client() - .map_err(|e| anyhow!("Failed to create request client: {}", e))?; - - let links = Collector::new(cfg.base.clone(), cfg.skip_missing, max_concurrency) - .collect_links(&inputs) - .await - .map_err(|e| anyhow!(e))?; - - if cfg.dump { - let exit_code = dump_links( - links.iter().filter(|link| !client.filtered(&link.uri)), - cfg.verbose, - ); - return Ok(exit_code as i32); - } - - let pb = if cfg.no_progress { - None + let exit_code = if opts.config.dump { + commands::dump(client, requests, opts.config.verbose).await? 
} else { - let bar = - ProgressBar::new(links.len() as u64).with_style(ProgressStyle::default_bar().template( - "{spinner:.red.bright} {pos}/{len:.dim} [{elapsed_precise}] {bar:25} {wide_msg}", - )); - bar.enable_steady_tick(100); - Some(bar) + let (stats, code) = commands::check(client, requests, &opts.config).await?; + write_stats(stats, &opts.config)?; + code }; - let (send_req, recv_req) = mpsc::channel(max_concurrency); - let (send_resp, mut recv_resp) = mpsc::channel(max_concurrency); - - let mut stats = ResponseStats::new(); - - let bar = pb.clone(); - tokio::spawn(async move { - for link in links { - if let Some(pb) = &bar { - pb.set_message(&link.to_string()); - }; - send_req.send(link).await.unwrap(); - } - }); - - // Start receiving requests - tokio::spawn(async move { - let clients = vec![client; max_concurrency]; - let mut clients = ClientPool::new(send_resp, recv_req, clients); - clients.listen().await; - }); - - while let Some(response) = recv_resp.recv().await { - show_progress(&pb, &response, cfg.verbose); - stats.add(response); - } - - // Note that print statements may interfere with the progress bar, so this - // must go before printing the stats - if let Some(pb) = &pb { - pb.finish_and_clear(); - } + Ok(exit_code as i32) +} +/// Write final statistics to stdout or to file +fn write_stats(stats: ResponseStats, cfg: &Config) -> Result<()> { let writer: Box = match cfg.format { Format::Compact => Box::new(writer::Compact::new()), Format::Detailed => Box::new(writer::Detailed::new()), @@ -278,45 +185,6 @@ async fn run(cfg: &Config, inputs: Vec) -> Result { Format::Markdown => Box::new(writer::Markdown::new()), }; - let code = if stats.is_success() { - ExitCode::Success - } else { - ExitCode::LinkCheckFailure - }; - - write_stats(&*writer, stats, cfg)?; - - Ok(code as i32) -} - -/// Dump all detected links to stdout without checking them -fn dump_links<'a>(links: impl Iterator, verbose: bool) -> ExitCode { - let mut stdout = io::stdout(); - for link 
in links { - // Avoid panic on broken pipe. - // See https://github.com/rust-lang/rust/issues/46016 - // This can occur when piping the output of lychee - // to another program like `grep`. - - // Only print source in verbose mode. This way the normal link output - // can be fed into another tool without data mangling. - let output = if verbose { - link.to_string() - } else { - link.uri.to_string() - }; - if let Err(e) = writeln!(stdout, "{}", output) { - if e.kind() != io::ErrorKind::BrokenPipe { - eprintln!("{}", e); - return ExitCode::UnexpectedFailure; - } - } - } - ExitCode::Success -} - -/// Write final statistics to stdout or to file -fn write_stats(writer: &dyn StatsWriter, stats: ResponseStats, cfg: &Config) -> Result<()> { let is_empty = stats.is_empty(); let formatted = writer.write(stats)?; diff --git a/lychee-bin/src/parse.rs b/lychee-bin/src/parse.rs index 29fbb0757d..d3e15771ba 100644 --- a/lychee-bin/src/parse.rs +++ b/lychee-bin/src/parse.rs @@ -22,10 +22,7 @@ pub(crate) fn parse_headers>(headers: &[T]) -> Result { let mut out = HeaderMap::new(); for header in headers { let (key, val) = read_header(header.as_ref())?; - out.insert( - HeaderName::from_bytes(key.as_bytes())?, - val.parse().unwrap(), - ); + out.insert(HeaderName::from_bytes(key.as_bytes())?, val.parse()?); } Ok(out) } diff --git a/lychee-bin/tests/local_files.rs b/lychee-bin/tests/local_files.rs index 2353f328dc..c0e8021ef8 100644 --- a/lychee-bin/tests/local_files.rs +++ b/lychee-bin/tests/local_files.rs @@ -33,4 +33,32 @@ mod cli { Ok(()) } + + #[tokio::test] + async fn test_local_dir() -> Result<()> { + let dir = tempfile::tempdir()?; + let index_path = dir.path().join("index.html"); + let mut index = File::create(&index_path)?; + writeln!(index, r#"Foo"#)?; + writeln!(index, r#"Bar"#)?; + + let foo_path = dir.path().join("foo.html"); + File::create(&foo_path)?; + let bar_path = dir.path().join("bar.md"); + File::create(&bar_path)?; + + let mut cmd = main_command(); + 
cmd.arg(dir.path()) + .arg("--no-progress") + .arg("--verbose") + .env_clear() + .assert() + .success() + .stdout(contains("2 Total")) + .stdout(contains("2 OK")) + .stdout(contains("foo.html")) + .stdout(contains("bar.md")); + + Ok(()) + } } diff --git a/lychee-lib/Cargo.toml b/lychee-lib/Cargo.toml index 0e3593faa8..d7a8fe15f1 100644 --- a/lychee-lib/Cargo.toml +++ b/lychee-lib/Cargo.toml @@ -18,7 +18,6 @@ version = "0.8.1" [dependencies] check-if-email-exists = "0.8.25" -deadpool = "0.9.2" fast_chemail = "0.9.6" glob = "0.3.0" html5ever = "0.25.1" @@ -43,9 +42,16 @@ url = { version = "2.2.2", features = ["serde"] } log = "0.4.14" path-clean = "0.1.0" percent-encoding = "2.1.0" +async-stream = "0.3.2" +jwalk = "0.6.0" cached = "0.26.2" once_cell = "1.8.0" thiserror = "1.0" +futures = "0.3.18" + +[dependencies.par-stream] +version = "0.7.0" +features = ["runtime-tokio"] [dev-dependencies] doc-comment = "0.3.3" diff --git a/lychee-lib/src/client_pool.rs b/lychee-lib/src/client_pool.rs deleted file mode 100644 index 85f3ef08ed..0000000000 --- a/lychee-lib/src/client_pool.rs +++ /dev/null @@ -1,49 +0,0 @@ -use client::Client; -use deadpool::unmanaged::Pool; -use tokio::sync::mpsc; - -use crate::{client, types}; - -#[allow(missing_debug_implementations)] -/// Manages a channel for incoming requests -/// and a pool of lychee clients to handle them -/// -/// Note: Although `reqwest` has its own pool, -/// it only works for connections to the same host, so -/// a single client can still be blocked until a request is done. 
-pub struct ClientPool { - tx: mpsc::Sender, - rx: mpsc::Receiver, - pool: deadpool::unmanaged::Pool, -} - -impl ClientPool { - #[must_use] - /// Creates a new client pool - pub fn new( - tx: mpsc::Sender, - rx: mpsc::Receiver, - clients: Vec, - ) -> Self { - let pool = Pool::from(clients); - ClientPool { tx, rx, pool } - } - - #[allow(clippy::missing_panics_doc)] - /// Start listening for incoming requests and send each of them - /// asynchronously to a client from the pool - pub async fn listen(&mut self) { - while let Some(req) = self.rx.recv().await { - let client = self.pool.get().await.unwrap(); - let tx = self.tx.clone(); - tokio::spawn(async move { - // Client::check() may fail only because Request::try_from() may fail - // here request is already Request, so it never fails - let resp = client.check(req).await.unwrap(); - tx.send(resp) - .await - .expect("Cannot send response to channel"); - }); - } - } -} diff --git a/lychee-lib/src/collector.rs b/lychee-lib/src/collector.rs index 2089e304f7..9c3dbd0cc1 100644 --- a/lychee-lib/src/collector.rs +++ b/lychee-lib/src/collector.rs @@ -1,7 +1,13 @@ use crate::{extract::Extractor, Base, Input, Request, Result}; +use futures::{ + stream::{self, Stream}, + StreamExt, TryStreamExt, +}; +use par_stream::ParStreamExt; use std::collections::HashSet; /// Collector keeps the state of link collection +/// It drives the link extraction from inputs #[derive(Debug, Clone)] pub struct Collector { base: Option, @@ -24,56 +30,33 @@ impl Collector { } } - /// Fetch all unique links from a slice of inputs - /// All relative URLs get prefixed with `base` if given. + /// Fetch all unique links from inputs + /// All relative URLs get prefixed with `base` (if given). 
/// (This can be a directory or a base URL) /// /// # Errors /// /// Will return `Err` if links cannot be extracted from an input - pub async fn collect_links(self, inputs: &[Input]) -> Result> { - let (contents_tx, mut contents_rx) = tokio::sync::mpsc::channel(self.max_concurrency); - - // extract input contents - for input in inputs.iter().cloned() { - let sender = contents_tx.clone(); - - let skip_missing_inputs = self.skip_missing_inputs; - tokio::spawn(async move { - let contents = input.get_contents(None, skip_missing_inputs).await; - sender.send(contents).await - }); - } - - // receiver will get None once all tasks are done - drop(contents_tx); - - // extract links from input contents - let mut extract_links_handles = vec![]; - - while let Some(result) = contents_rx.recv().await { - for input_content in result? { - let base = self.base.clone(); - let handle = tokio::task::spawn_blocking(move || { - let mut extractor = Extractor::new(base); - extractor.extract(&input_content) - }); - extract_links_handles.push(handle); - } - } - - // Note: we could dispatch links to be checked as soon as we get them, - // instead of building a HashSet with all links. - // This optimization would speed up cases where there's - // a lot of inputs and/or the inputs are large (e.g. big files). 
- let mut links: HashSet = HashSet::new(); - - for handle in extract_links_handles { - let new_links = handle.await?; - links.extend(new_links?); - } - - Ok(links) + pub async fn collect_links(self, inputs: Vec) -> impl Stream> { + let skip_missing_inputs = self.skip_missing_inputs; + let contents = stream::iter(inputs) + .par_then_unordered(None, move |input| async move { + input.get_contents(None, skip_missing_inputs).await + }) + .flatten(); + + let extractor = Extractor::new(self.base); + contents + .par_then_unordered(None, move |content| { + let mut extractor = extractor.clone(); + // send to parallel worker + async move { + let content = content?; + let requests: HashSet = extractor.extract(&content)?; + Result::Ok(stream::iter(requests.into_iter().map(Ok))) + } + }) + .try_flatten() } } @@ -100,27 +83,34 @@ mod test { const TEST_GLOB_2_MAIL: &str = "test@glob-2.io"; #[tokio::test] - #[ignore] async fn test_file_without_extension_is_plaintext() -> Result<()> { let temp_dir = tempfile::tempdir()?; // Treat as plaintext file (no extension) let file_path = temp_dir.path().join("README"); let _file = File::create(&file_path)?; let input = Input::new(&file_path.as_path().display().to_string(), true); - let contents = input.get_contents(None, true).await?; + let contents: Vec<_> = input + .get_contents(None, true) + .await + .collect::>() + .await; assert_eq!(contents.len(), 1); - assert_eq!(contents[0].file_type, FileType::Plaintext); + assert_eq!(contents[0].as_ref().unwrap().file_type, FileType::Plaintext); Ok(()) } #[tokio::test] async fn test_url_without_extension_is_html() -> Result<()> { let input = Input::new("https://example.org/", true); - let contents = input.get_contents(None, true).await?; + let contents: Vec<_> = input + .get_contents(None, true) + .await + .collect::>() + .await; assert_eq!(contents.len(), 1); - assert_eq!(contents[0].file_type, FileType::Html); + assert_eq!(contents[0].as_ref().unwrap().file_type, FileType::Html); Ok(()) } @@ 
-155,10 +145,8 @@ mod test { }, ]; - let responses = Collector::new(None, false, 8) - .collect_links(&inputs) - .await?; - let mut links = responses.into_iter().map(|r| r.uri).collect::>(); + let responses = Collector::new(None, false, 8).collect_links(inputs).await; + let mut links: Vec = responses.map(|r| r.unwrap().uri).collect().await; let mut expected_links = vec![ website(TEST_STRING), diff --git a/lychee-lib/src/extract.rs b/lychee-lib/src/extract.rs index f5849e3b4d..3c69207671 100644 --- a/lychee-lib/src/extract.rs +++ b/lychee-lib/src/extract.rs @@ -19,37 +19,36 @@ use crate::{ /// A handler for extracting links from various input formats like Markdown and /// HTML. Allocations are avoided if possible as this is a performance-critical /// section of the library. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Extractor { - /// URLs extracted from input - pub urls: Vec, /// Base URL or Path pub base: Option, } impl Extractor { pub(crate) const fn new(base: Option) -> Self { - Extractor { - urls: Vec::new(), - base, - } + Extractor { base } } /// Main entrypoint for extracting links from various sources /// (Markdown, HTML, and plaintext) pub(crate) fn extract(&mut self, input_content: &InputContent) -> Result> { - match input_content.file_type { + let urls = match input_content.file_type { FileType::Markdown => self.extract_markdown(&input_content.content), - FileType::Html => self.extract_html(&input_content.content), + FileType::Html => self.extract_html(&input_content.content)?, FileType::Plaintext => self.extract_plaintext(&input_content.content), }; - self.create_requests(input_content) + self.create_requests(&urls, input_content) } /// Create requests out of the collected URLs. - /// Only keeps legit URLs. For example this filters out anchors. - fn create_requests(&self, input_content: &InputContent) -> Result> { - let mut requests: HashSet = HashSet::with_capacity(self.urls.len()); + /// Only keeps "valid" URLs. 
This filters out anchors for example. + fn create_requests( + &self, + urls: &[StrTendril], + input_content: &InputContent, + ) -> Result> { + let mut requests: HashSet = HashSet::with_capacity(urls.len()); let base_input = match &input_content.input { Input::RemoteUrl(url) => Some(Url::parse(&format!( @@ -61,7 +60,7 @@ impl Extractor { // other inputs do not have a URL to extract a base }; - for url in &self.urls { + for url in urls { let req = if let Ok(uri) = Uri::try_from(url.as_ref()) { Request::new(uri, input_content.input.clone()) } else if let Some(url) = self.base.as_ref().and_then(|u| u.join(url)) { @@ -94,36 +93,39 @@ impl Extractor { } /// Extract unparsed URL strings from a Markdown string. - fn extract_markdown(&mut self, input: &str) { + fn extract_markdown(&self, input: &str) -> Vec { let parser = Parser::new(input); - for event in parser { - match event { + parser + .flat_map(|event| match event { MDEvent::Start(Tag::Link(_, url, _) | Tag::Image(_, url, _)) => { - self.urls.push(StrTendril::from(url.as_ref())); + vec![StrTendril::from(url.as_ref())] } MDEvent::Text(txt) => self.extract_plaintext(&txt), - MDEvent::Html(html) => self.extract_html(&html.to_string()), - _ => {} - } - } + MDEvent::Html(html) => self.extract_plaintext(&html.to_string()), + _ => vec![], + }) + .collect() } /// Extract unparsed URL strings from an HTML string. - fn extract_html(&mut self, input: &str) { - let tendril = StrTendril::from(input); - let rc_dom = parse_document(RcDom::default(), html5ever::ParseOpts::default()).one(tendril); + fn extract_html(&mut self, input: &str) -> Result> { + let rc_dom = parse_document(RcDom::default(), html5ever::ParseOpts::default()) + .from_utf8() + .read_from(&mut input.as_bytes())?; - self.walk_html_links(&rc_dom.document); + Ok(self.walk_html_links(&rc_dom.document)) } /// Recursively walk links in a HTML document, aggregating URL strings in `urls`. 
- fn walk_html_links(&mut self, node: &Handle) { + fn walk_html_links(&mut self, node: &Handle) -> Vec { + let mut all_urls = Vec::new(); match node.data { NodeData::Text { ref contents } => { - self.extract_plaintext(&contents.borrow()); + all_urls.append(&mut self.extract_plaintext(&contents.borrow())); } + NodeData::Comment { ref contents } => { - self.extract_plaintext(contents); + all_urls.append(&mut self.extract_plaintext(contents)); } NodeData::Element { ref name, @@ -140,7 +142,7 @@ impl Extractor { if urls.is_empty() { self.extract_plaintext(&attr.value); } else { - self.urls.extend(urls.into_iter().map(StrTendril::from)); + all_urls.extend(urls.into_iter().map(StrTendril::from).collect::>()); } } } @@ -150,14 +152,20 @@ impl Extractor { // recursively traverse the document's nodes -- this doesn't need any extra // exit conditions, because the document is a tree for child in node.children.borrow().iter() { - self.walk_html_links(child); + let urls = self.walk_html_links(child); + all_urls.extend(urls); } + + all_urls } /// Extract unparsed URL strings from plaintext - fn extract_plaintext(&mut self, input: &str) { - self.urls - .extend(url::find_links(input).map(|l| StrTendril::from(l.as_str()))); + // Allow &self here for consistency with the other extractors + #[allow(clippy::unused_self)] + fn extract_plaintext(&self, input: &str) -> Vec { + url::find_links(input) + .map(|l| StrTendril::from(l.as_str())) + .collect() } fn create_uri_from_path(&self, src: &Path, dst: &str) -> Result> { @@ -263,18 +271,16 @@ mod test { fn test_extract_link_at_end_of_line() { let input = "https://www.apache.org/licenses/LICENSE-2.0\n"; let link = input.trim_end(); - let mut extractor = Extractor::new(None); - extractor.extract_markdown(input); - assert_eq!(vec![StrTendril::from(link)], extractor.urls); - let mut extractor = Extractor::new(None); - extractor.extract_plaintext(input); - assert_eq!(vec![StrTendril::from(link)], extractor.urls); + let urls = 
extractor.extract_markdown(input); + assert_eq!(vec![StrTendril::from(link)], urls); - let mut extractor = Extractor::new(None); - extractor.extract_html(input); - assert_eq!(vec![StrTendril::from(link)], extractor.urls); + let urls = extractor.extract_plaintext(input); + assert_eq!(vec![StrTendril::from(link)], urls); + + let urls = extractor.extract_html(input).unwrap(); + assert_eq!(vec![StrTendril::from(link)], urls); } #[test] diff --git a/lychee-lib/src/helpers/path.rs b/lychee-lib/src/helpers/path.rs index fc5f085656..37daa04589 100644 --- a/lychee-lib/src/helpers/path.rs +++ b/lychee-lib/src/helpers/path.rs @@ -5,7 +5,8 @@ use path_clean::PathClean; use std::env; use std::path::{Path, PathBuf}; -static CURRENT_DIR: Lazy = Lazy::new(|| env::current_dir().unwrap()); +static CURRENT_DIR: Lazy = + Lazy::new(|| env::current_dir().expect("cannot get current dir from environment")); // Returns the base if it is a valid `PathBuf` fn get_base_dir(base: &Option) -> Option { diff --git a/lychee-lib/src/lib.rs b/lychee-lib/src/lib.rs index 58c0a6abc6..e4d1de057a 100644 --- a/lychee-lib/src/lib.rs +++ b/lychee-lib/src/lib.rs @@ -47,7 +47,6 @@ doc_comment::doctest!("../../README.md"); mod client; -mod client_pool; /// A pool of clients, to handle concurrent checks pub mod collector; mod helpers; @@ -73,8 +72,7 @@ use ring as _; // required for apple silicon #[doc(inline)] pub use crate::{ - client::{check, ClientBuilder}, - client_pool::ClientPool, + client::{check, Client, ClientBuilder}, collector::Collector, filter::{Excludes, Filter, Includes}, types::{Base, ErrorKind, Input, Request, Response, ResponseBody, Result, Status, Uri}, diff --git a/lychee-lib/src/types/error.rs b/lychee-lib/src/types/error.rs index 8a91aa46c4..fbe5fd5633 100644 --- a/lychee-lib/src/types/error.rs +++ b/lychee-lib/src/types/error.rs @@ -4,9 +4,10 @@ use std::hash::Hash; use std::{convert::Infallible, path::PathBuf}; use thiserror::Error; +use super::InputContent; use crate::Uri; -/// 
Possible Errors when interacting with `lychee_lib` +/// Kinds of status errors. #[derive(Error, Debug)] #[non_exhaustive] pub enum ErrorKind { @@ -50,6 +51,9 @@ pub enum ErrorKind { /// The given path does not resolve to a valid file #[error("Cannot find local file {0}")] FileNotFound(PathBuf), + /// Error while traversing an input directory + #[error("Cannot traverse input directory")] + DirTraversal(#[from] jwalk::Error), /// The given glob pattern is not valid #[error("UNIX glob pattern is invalid")] InvalidGlobPattern(#[from] glob::PatternError), @@ -59,6 +63,9 @@ pub enum ErrorKind { /// Used an insecure URI where a secure variant was reachable #[error("This URI is available in HTTPS protocol, but HTTP is provided, use '{0}' instead")] InsecureURL(Uri), + /// Error while sending/receiving messages from MPSC channel + #[error("Cannot send/receive message from channel")] + ChannelError(#[from] tokio::sync::mpsc::error::SendError), /// An URL with an invalid host was found #[error("URL is missing a host")] InvalidUrlHost, @@ -97,6 +104,7 @@ impl Hash for ErrorKind { Self::IoError(p, e) => (p, e.kind()).hash(state), Self::ReqwestError(e) => e.to_string().hash(state), Self::HubcapsError(e) => e.to_string().hash(state), + Self::DirTraversal(e) => e.to_string().hash(state), Self::FileNotFound(e) => e.to_string_lossy().hash(state), Self::UrlParseError(s, e) => (s, e.type_id()).hash(state), Self::InvalidURI(u) => u.hash(state), @@ -108,6 +116,7 @@ impl Hash for ErrorKind { Self::InvalidBase(base, e) => (base, e).hash(state), Self::InvalidHeader(e) => e.to_string().hash(state), Self::InvalidGlobPattern(e) => e.to_string().hash(state), + Self::ChannelError(e) => e.to_string().hash(state), Self::MissingGitHubToken | Self::InvalidUrlHost => { std::mem::discriminant(self).hash(state); } diff --git a/lychee-lib/src/types/file.rs b/lychee-lib/src/types/file.rs index 6ba8d89012..05ff157289 100644 --- a/lychee-lib/src/types/file.rs +++ b/lychee-lib/src/types/file.rs @@ -28,6 
+28,9 @@ impl> From

for FileType { // Unfortunately that's not possible without refactoring, as // `AsRef` could be implemented for `Url` in the future, which is why // `From for FileType` is not allowed. + // As a workaround, we check if we got a known web-protocol + let is_url = path.starts_with("http"); + match path .extension() .and_then(std::ffi::OsStr::to_str) @@ -35,8 +38,9 @@ impl> From

for FileType { .as_deref() { Some("md" | "markdown") => FileType::Markdown, - Some("htm" | "html") | None => FileType::Html, - Some(_) => FileType::Plaintext, + Some("htm" | "html") => FileType::Html, + None if is_url => FileType::Html, + _ => FileType::Plaintext, } } } @@ -54,10 +58,15 @@ mod tests { FileType::from(Path::new("test.unknown")), FileType::Plaintext ); + assert_eq!(FileType::from(Path::new("test")), FileType::Plaintext); assert_eq!(FileType::from(Path::new("test.txt")), FileType::Plaintext); assert_eq!(FileType::from(Path::new("README.TXT")), FileType::Plaintext); assert_eq!(FileType::from(Path::new("test.htm")), FileType::Html); - assert_eq!(FileType::from(Path::new("test")), FileType::Html); + assert_eq!(FileType::from(Path::new("index.html")), FileType::Html); + assert_eq!( + FileType::from(Path::new("http://foo.com/index.html")), + FileType::Html + ); } } diff --git a/lychee-lib/src/types/input.rs b/lychee-lib/src/types/input.rs index ad97355dd2..cbb3df2c8b 100644 --- a/lychee-lib/src/types/input.rs +++ b/lychee-lib/src/types/input.rs @@ -1,14 +1,24 @@ use crate::types::FileType; use crate::Result; +use async_stream::try_stream; +use futures::stream::Stream; use glob::glob_with; +use jwalk::WalkDir; use reqwest::Url; use serde::Serialize; use shellexpand::tilde; +use std::fmt::Display; use std::path::{Path, PathBuf}; -use std::{fmt::Display, fs::read_to_string}; use tokio::io::{stdin, AsyncReadExt}; const STDIN: &str = "-"; + +// Check the extension of the given path against the list of known/accepted +// file extensions +fn valid_extension(p: &Path) -> bool { + matches!(FileType::from(p), FileType::Markdown | FileType::Html) +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[non_exhaustive] /// An exhaustive list of input sources, which lychee accepts @@ -109,27 +119,75 @@ impl Input { /// because of an underlying I/O error (e.g. 
an error while making a /// network request or retrieving the contents from the file system) pub async fn get_contents( - &self, + self, file_type_hint: Option, skip_missing: bool, - ) -> Result> { - match *self { - // TODO: should skip_missing also affect URLs? - Input::RemoteUrl(ref url) => Ok(vec![Self::url_contents(url).await?]), - Input::FsGlob { - ref pattern, - ignore_case, - } => Ok(Self::glob_contents(pattern, ignore_case).await?), - Input::FsPath(ref path) => { - let content = Self::path_content(path); - match content { - Ok(input_content) => Ok(vec![input_content]), - Err(_) if skip_missing => Ok(vec![]), - Err(e) => Err(e), + ) -> impl Stream> { + try_stream! { + match self { + Input::RemoteUrl(ref url) => { + let contents: InputContent = Self::url_contents(url).await?; + yield contents; + }, + Input::FsGlob { + ref pattern, + ignore_case, + } => { + for await content in Self::glob_contents(pattern, ignore_case).await { + let content = content?; + yield content; + } } + Input::FsPath(ref path) => { + if path.is_dir() { + for entry in WalkDir::new(path).skip_hidden(true) + .process_read_dir(|_, _, _, children| { + children.retain(|child| { + let entry = match child.as_ref() { + Ok(x) => x, + Err(_) => return true, + }; + + let file_type = entry.file_type(); + + if file_type.is_dir() { + // Required for recursion + return true; + } + if file_type.is_symlink() { + return false; + } + if !file_type.is_file() { + return false; + } + return valid_extension(&entry.path()); + }); + }) { + let entry = entry?; + if entry.file_type().is_dir() { + continue; + } + let content = Self::path_content(entry.path()).await?; + yield content + } + } else { + let content = Self::path_content(path).await; + match content { + Err(_) if skip_missing => (), + Err(e) => Err(e)?, + Ok(content) => yield content, + }; + } + }, + Input::Stdin => { + let content = Self::stdin_content(file_type_hint).await?; + yield content; + }, + Input::String(ref s) => { + let content = 
Self::string_content(s, file_type_hint); + yield content; + }, } - Input::Stdin => Ok(vec![Self::stdin_content(file_type_hint).await?]), - Input::String(ref s) => Ok(vec![Self::string_content(s, file_type_hint)]), } } @@ -151,40 +209,46 @@ impl Input { Ok(input_content) } - async fn glob_contents(path_glob: &str, ignore_case: bool) -> Result> { - let mut contents = vec![]; - let glob_expanded = tilde(&path_glob); + async fn glob_contents( + path_glob: &str, + ignore_case: bool, + ) -> impl Stream> + '_ { + let glob_expanded = tilde(&path_glob).to_string(); let mut match_opts = glob::MatchOptions::new(); match_opts.case_sensitive = !ignore_case; - for entry in glob_with(&glob_expanded, match_opts)? { - match entry { - Ok(path) => { - if path.is_dir() { - // Directories can still have a suffix which looks like - // a file extension like `foo.html`. This can lead to + try_stream! { + for entry in glob_with(&glob_expanded, match_opts)? { + match entry { + Ok(path) => { + // Directories can have a suffix which looks like + // a file extension (like `foo.html`). This can lead to // unexpected behavior with glob patterns like // `**/*.html`. Therefore filter these out. 
- // https://github.com/lycheeverse/lychee/pull/262#issuecomment-913226819 - continue; + // See https://github.com/lycheeverse/lychee/pull/262#issuecomment-913226819 + if path.is_dir() { + continue; + } + let content: InputContent = Self::path_content(&path).await?; + yield content; } - let content = Self::path_content(&path)?; - contents.push(content); + Err(e) => println!("{:?}", e), } - Err(e) => println!("{:?}", e), } } - - Ok(contents) } /// Get the input content of a given path /// # Errors /// /// Will return `Err` if file contents can't be read - pub fn path_content + AsRef + Clone>(path: P) -> Result { - let content = read_to_string(&path).map_err(|e| (path.clone().into(), e))?; + pub async fn path_content + AsRef + Clone>( + path: P, + ) -> Result { + let content = tokio::fs::read_to_string(&path) + .await + .map_err(|e| (path.clone().into(), e))?; let input_content = InputContent { file_type: FileType::from(path.as_ref()), content, @@ -212,3 +276,19 @@ impl Input { InputContent::from_string(s, file_type_hint.unwrap_or_default()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_extension() { + assert!(valid_extension(Path::new("file.md"))); + assert!(valid_extension(Path::new("file.markdown"))); + assert!(valid_extension(Path::new("file.html"))); + assert!(valid_extension(Path::new("file.htm"))); + assert!(valid_extension(Path::new("file.HTM"))); + assert!(!valid_extension(Path::new("file.txt"))); + assert!(!valid_extension(Path::new("file"))); + } +}