-
Notifications
You must be signed in to change notification settings - Fork 13.9k
Closed as not planned
Description
Code
use clap::Parser;
use reqwest::{header::{HeaderValue, RANGE}, Client, StatusCode, redirect::Policy};
use std::path::PathBuf;
use tokio::{io::{AsyncWriteExt, AsyncSeekExt, AsyncReadExt}, time::Duration};
use futures::StreamExt;
use std::time::{Duration as StdDuration, Instant};
use std::sync::{Arc, atomic::{AtomicU64, AtomicBool, Ordering}};
use indicatif::{ProgressBar, ProgressStyle, MultiProgress};
use tokio::sync::{Semaphore, Mutex};
use sha2::{Sha256, Digest};
use std::io::SeekFrom;
use std::fs;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#[derive(Parser)]
#[command(author, version, about = "A blazingly fast concurrent downloader with resume capability and integrity checks")]
struct Cfg {
link: String,
#[arg(short, long)]
output: Option<PathBuf>,
#[arg(short, long, default_value_t = 8)]
threads: usize,
#[arg(short = 't', long, default_value_t = 30.0)]
timeout: f32,
#[arg(short, long)]
verify: bool,
#[arg(long)]
checksum: Option<String>,
#[arg(short = 'r', long, default_value_t = 3)]
retries: u32,
#[arg(short = 'c', long, default_value_t = 4)]
concurrent_writes: usize,
#[arg(long, default_value_t = 10737418240)]
max_size: u64,
#[arg(long)]
no_resume: bool,
}
struct ChunkInfo {
start: u64,
end: u64,
completed: Arc<AtomicBool>,
}
struct SpeedTracker {
last_instant: Instant,
last_bytes: u64,
}
impl SpeedTracker {
fn new() -> Self {
SpeedTracker { last_instant: Instant::now(), last_bytes: 0 }
}
}
fn validate_url(u: &str) -> Result<()> {
let p = url::Url::parse(u)?;
if p.scheme() != "https" && p.scheme() != "http" {
let e = std::io::Error::new(std::io::ErrorKind::InvalidInput, "Only HTTP/HTTPS allowed");
return Err(Box::new(e));
}
if let Some(h) = p.host_str() {
if h == "localhost" || h.starts_with("127.") || h.starts_with("192.168.") || h.starts_with("10.") {
let e = std::io::Error::new(std::io::ErrorKind::InvalidInput, "Local network URLs not allowed");
return Err(Box::new(e));
}
}
Ok(())
}
fn validate_path(p: &PathBuf) -> Result<()> {
let s = p.to_string_lossy();
if s.contains("..") || s.starts_with('/') || (cfg!(windows) && s.contains('\\')) {
let e = std::io::Error::new(std::io::ErrorKind::InvalidInput, "Path traversal detected");
return Err(Box::new(e));
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let cfg = Cfg::parse();
if cfg.threads == 0 || cfg.threads > 64 {
let e = std::io::Error::new(std::io::ErrorKind::InvalidInput, "Threads must be 1-64");
return Err(Box::new(e));
}
validate_url(&cfg.link)?;
let parsed_url = url::Url::parse(&cfg.link)?;
let timeout = StdDuration::from_secs_f32(cfg.timeout);
let client = Client::builder()
.timeout(timeout)
.redirect(Policy::limited(10))
.user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
.build()?;
let out_path = cfg.output.unwrap_or_else(|| {
PathBuf::from(
parsed_url
.path_segments()
.and_then(|s| s.last())
.filter(|s| !s.is_empty())
.unwrap_or("download")
)
});
validate_path(&out_path)?;
let mut total_size: u64 = 0;
let mut server_etag: Option<String> = None;
match client.head(parsed_url.clone()).send().await {
Ok(resp) => {
if let Some(len) = resp.headers().get(reqwest::header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
total_size = s.parse().unwrap_or(0);
}
}
if let Some(et) = resp.headers().get(reqwest::header::ETAG) {
if let Ok(s) = et.to_str() {
server_etag = Some(s.to_string());
}
}
}
Err(_) => {
if let Ok(resp) = client.get(parsed_url.clone()).header(RANGE, "bytes=0-0").send().await {
if let Some(cr) = resp.headers().get(reqwest::header::CONTENT_RANGE) {
if let Ok(s) = cr.to_str() {
if let Some(pos) = s.rfind('/') {
total_size = s[pos+1..].parse().unwrap_or(0);
}
}
}
}
}
}
if total_size > cfg.max_size {
let msg = format!("File size {} exceeds max allowed {}", total_size, cfg.max_size);
let e = std::io::Error::new(std::io::ErrorKind::InvalidInput, msg);
return Err(Box::new(e));
}
if total_size == 0 {
println!("Warning: could not determine total size. Falling back to single-stream download and integrity will be best-effort.");
}
let temp_path = out_path.with_extension("part");
let multi_progress = MultiProgress::new();
let main_pb = if total_size > 0 {
multi_progress.add(ProgressBar::new(total_size))
} else {
multi_progress.add(ProgressBar::new_spinner())
};
main_pb.enable_steady_tick(StdDuration::from_millis(1000));
let style = ProgressStyle::with_template(
"{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}) ({eta})"
);
if let Ok(s) = style {
main_pb.set_style(s.progress_chars("━╸─"));
}
let resume_from = if cfg.no_resume {
0
} else {
fs::metadata(&temp_path).map(|m| m.len()).unwrap_or(0)
};
if resume_from > total_size && total_size > 0 {
println!("Existing temp file larger than expected total size, truncating");
let _ = tokio::fs::OpenOptions::new().write(true).open(&temp_path).await?.set_len(total_size).await;
}
println!("Output: {}", out_path.display());
if total_size > 0 {
println!("Size: {:.2} MB", total_size as f64 / 1_048_576.0);
}
println!("Resuming from {} bytes", resume_from);
println!("Threads: {}", cfg.threads);
main_pb.set_position(resume_from);
let supports_ranges = true;
if total_size == 0 || !supports_ranges || cfg.threads == 1 {
let mut attempts_overall = 0;
loop {
attempts_overall += 1;
if attempts_overall > cfg.retries {
let e = std::io::Error::new(std::io::ErrorKind::Other, "Failed single-stream download after retries");
return Err(Box::new(e));
}
let mut resp = client.get(parsed_url.clone()).send().await?;
if !(resp.status().is_success()) {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
let mut file = tokio::fs::OpenOptions::new().create(true).write(true).truncate(true).open(&temp_path).await?;
let mut hasher = Sha256::new();
let mut downloaded: u64 = 0;
while let Some(item) = resp.bytes_stream().next().await {
let chunk = item?;
file.write_all(&chunk).await?;
downloaded += chunk.len() as u64;
if cfg.verify {
hasher.update(&chunk);
}
main_pb.inc(chunk.len() as u64);
}
file.sync_all().await.ok();
if cfg.verify || cfg.checksum.is_some() || server_etag.is_some() {
let computed = if cfg.verify || cfg.checksum.is_some() {
format!("{:x}", hasher.finalize())
} else {
String::new()
};
if let Some(expected) = cfg.checksum.as_ref() {
if !computed.eq_ignore_ascii_case(expected) {
println!("Checksum mismatch (expected provided checksum). Attempt {} failed", attempts_overall);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
if let Some(et) = server_etag.as_ref() {
let et_clean = et.trim_matches('"');
if et_clean.len() == computed.len() && !computed.eq_ignore_ascii_case(et_clean) {
println!("ETag/hash mismatch. Attempt {} failed", attempts_overall);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
}
tokio::fs::rename(&temp_path, &out_path).await?;
println!("Download completed successfully!");
break;
}
return Ok(());
}
let chunk_size = std::cmp::max(1024 * 1024, total_size / cfg.threads as u64);
let file_path = std::env::temp_dir().join("dl_temp_file.part");
let file = tokio::fs::OpenOptions::new().create(true).write(true).read(true).open(&file_path).await?;
file.set_len(total_size).await.ok();
let file_arc = Arc::new(Mutex::new(file));
let downloaded_counter = Arc::new(AtomicU64::new(resume_from));
let sem = Arc::new(Semaphore::new(cfg.concurrent_writes));
let mut chunks = Vec::new();
let mut start = 0u64;
while start < total_size {
let end = std::cmp::min(start + chunk_size - 1, total_size - 1);
chunks.push((start, end));
start = end + 1;
}
let mut tasks = Vec::new();
for (idx, (start, end)) in chunks.into_iter().enumerate() {
let client = client.clone();
let url = parsed_url.clone();
let file = file_arc.clone();
let downloaded = downloaded_counter.clone();
let sem = sem.clone();
let c_start = start;
let c_end = end;
let retries = cfg.retries;
let pb = main_pb.clone();
let task = tokio::spawn(async move {
let mut attempts = 0u32;
let mut backoff = Duration::from_millis(200);
loop {
attempts += 1;
if attempts > retries {
let e = std::io::Error::new(std::io::ErrorKind::Other, "chunk retries exceeded");
return Err::<(), Box<dyn std::error::Error + Send + Sync>>(Box::new(e));
}
let range_val = format!("bytes={}-{}", c_start, c_end);
let mut req = client.get(url.clone());
if let Ok(hv) = HeaderValue::from_str(&range_val) {
req = req.header(RANGE, hv);
}
match req.send().await {
Ok(mut resp) => {
if resp.status() != StatusCode::PARTIAL_CONTENT && resp.status() != StatusCode::OK {
tokio::time::sleep(backoff).await;
backoff = backoff.saturating_mul(2).min(Duration::from_secs(30));
continue;
}
let mut position = c_start;
while let Some(chunk) = resp.bytes_stream().next().await {
let bytes = match chunk {
Ok(b) => b,
Err(_e) => {
tokio::time::sleep(backoff).await;
backoff = backoff.saturating_mul(2).min(Duration::from_secs(30));
continue;
}
};
let _permit = sem.acquire().await.unwrap();
let mut f = file.lock().await;
if let Err(e) = f.seek(SeekFrom::Start(position)).await {
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
if let Err(e) = f.write_all(&bytes).await {
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
f.sync_data().await.ok();
let len = bytes.len() as u64;
downloaded.fetch_add(len, Ordering::SeqCst);
position += len;
pb.inc(len);
}
if position - c_start != (c_end - c_start + 1) {
tokio::time::sleep(backoff).await;
backoff = backoff.saturating_mul(2).min(Duration::from_secs(30));
continue;
}
return Ok::<(), Box<dyn std::error::Error + Send + Sync>>(());
}
Err(_) => {
tokio::time::sleep(backoff).await;
backoff = backoff.saturating_mul(2).min(Duration::from_secs(30));
continue;
}
}
}
});
tasks.push(task);
}
let results = futures::future::join_all(tasks).await;
for r in results {
match r {
Ok(Ok(())) => {}
Ok(Err(e)) => { return Err(e); }
Err(e) => {
let msg = format!("task panicked: {}", e);
let err = std::io::Error::new(std::io::ErrorKind::Other, msg);
return Err(Box::new(err));
}
}
}
{
let mut f = file_arc.lock().await;
f.sync_all().await.ok();
}
tokio::fs::copy(&file_path, &temp_path).await?;
tokio::fs::remove_file(&file_path).await.ok();
if cfg.verify || cfg.checksum.is_some() || server_etag.is_some() {
let mut file = tokio::fs::File::open(&temp_path).await?;
let mut hasher = Sha256::new();
let mut buf = vec![0u8; 8192];
loop {
let n = file.read(&mut buf).await?;
if n == 0 { break; }
hasher.update(&buf[..n]);
}
let computed = format!("{:x}", hasher.finalize());
if let Some(expected) = cfg.checksum.as_ref() {
if !computed.eq_ignore_ascii_case(expected) {
let msg = format!("Checksum mismatch: computed {} but expected {}", computed, expected);
let e = std::io::Error::new(std::io::ErrorKind::Other, msg);
return Err(Box::new(e));
}
}
if let Some(et) = server_etag.as_ref() {
let et_clean = et.trim_matches('"');
if (et_clean.len() == computed.len()) && !computed.eq_ignore_ascii_case(et_clean) {
let msg = format!("ETag/hash mismatch: computed {} vs etag {}", computed, et_clean);
let e = std::io::Error::new(std::io::ErrorKind::Other, msg);
return Err(Box::new(e));
}
}
}
let meta = tokio::fs::metadata(&temp_path).await?;
if total_size > 0 && meta.len() != total_size {
let msg = format!("Final file size {} does not equal expected {}", meta.len(), total_size);
let e = std::io::Error::new(std::io::ErrorKind::Other, msg);
return Err(Box::new(e));
}
tokio::fs::rename(&temp_path, &out_path).await?;
println!("Download completed successfully and verified!");
Ok(())
}Current output
error[E0277]: `?` couldn't convert the error to `Box<std::io::Error>`
--> src\main.rs:80:28
|
80 | validate_url(&cfg.link)?;
| -----------------------^ the trait `From<Box<dyn std::error::Error + std::marker::Send + Sync>>` is not implemented for `Box<std::io::Error>`
| |
| this can't be annotated with `?` because it has type `Result<_, Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>>`
|
= note: the question mark operation (`?`) implicitly performs a conversion on the error value using the `From` trait
= help: the following other types implement trait `From<T>`:
`Box<ByteStr>` implements `From<Box<[u8]>>`
`Box<CStr>` implements `From<&CStr>`
`Box<CStr>` implements `From<&mut CStr>`
`Box<CStr>` implements `From<CString>`
`Box<CStr>` implements `From<Cow<'_, CStr>>`
`Box<Path>` implements `From<&Path>`
`Box<Path>` implements `From<&mut Path>`
`Box<Path>` implements `From<Cow<'_, Path>>`
and 27 others
error[E0277]: `?` couldn't convert the error to `Box<std::io::Error>`
--> src\main.rs:81:48
|
81 | let parsed_url = url::Url::parse(&cfg.link)?;
| --------------------------^ the trait `From<ParseError>` is not implemented for `Box<std::io::Error>`
| |
| this can't be annotated with `?` because it has type `Result<_, ParseError>`
|
= note: the question mark operation (`?`) implicitly performs a conversion on the error value using the `From` trait
= help: the following other types implement trait `From<T>`:
`Box<ByteStr>` implements `From<Box<[u8]>>`
`Box<CStr>` implements `From<&CStr>`
`Box<CStr>` implements `From<&mut CStr>`
`Box<CStr>` implements `From<CString>`
`Box<CStr>` implements `From<Cow<'_, CStr>>`
`Box<Path>` implements `From<&Path>`
`Box<Path>` implements `From<&mut Path>`
`Box<Path>` implements `From<Cow<'_, Path>>`
and 27 others
error[E0277]: `?` couldn't convert the error to `Box<std::io::Error>`
--> src\main.rs:87:17
|
87 | .build()?;
| -------^ the trait `From<reqwest::Error>` is not implemented for `Box<std::io::Error>`
| |
| this can't be annotated with `?` because it has type `Result<_, reqwest::Error>`
|
= note: the question mark operation (`?`) implicitly performs a conversion on the error value using the `From` trait
= help: the following other types implement trait `From<T>`:
`Box<ByteStr>` implements `From<Box<[u8]>>`
`Box<CStr>` implements `From<&CStr>`
`Box<CStr>` implements `From<&mut CStr>`
`Box<CStr>` implements `From<CString>`
`Box<CStr>` implements `From<Cow<'_, CStr>>`
`Box<Path>` implements `From<&Path>`
`Box<Path>` implements `From<&mut Path>`
`Box<Path>` implements `From<Cow<'_, Path>>`
and 27 others
error[E0277]: `?` couldn't convert the error to `Box<std::io::Error>`
--> src\main.rs:97:29
|
97 | validate_path(&out_path)?;
| ------------------------^ the trait `From<Box<dyn std::error::Error + std::marker::Send + Sync>>` is not implemented for `Box<std::io::Error>`
| |
| this can't be annotated with `?` because it has type `Result<_, Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>>`
|
= note: the question mark operation (`?`) implicitly performs a conversion on the error value using the `From` trait
= help: the following other types implement trait `From<T>`:
`Box<ByteStr>` implements `From<Box<[u8]>>`
`Box<CStr>` implements `From<&CStr>`
`Box<CStr>` implements `From<&mut CStr>`
`Box<CStr>` implements `From<CString>`
`Box<CStr>` implements `From<Cow<'_, CStr>>`
`Box<Path>` implements `From<&Path>`
`Box<Path>` implements `From<&mut Path>`
`Box<Path>` implements `From<Cow<'_, Path>>`
and 27 others
error[E0277]: `?` couldn't convert the error to `Box<std::io::Error>`
--> src\main.rs:172:71
|
172 | let mut resp = client.get(parsed_url.clone()).send().await?;
| -------------------------------------------^ the trait `From<reqwest::Error>` is not implemented for `Box<std::io::Error>`
| |
| this can't be annotated with `?` because it has type `Result<_, reqwest::Error>`
|
= note: the question mark operation (`?`) implicitly performs a conversion on the error value using the `From` trait
= help: the following other types implement trait `From<T>`:
`Box<ByteStr>` implements `From<Box<[u8]>>`
`Box<CStr>` implements `From<&CStr>`
`Box<CStr>` implements `From<&mut CStr>`
`Box<CStr>` implements `From<CString>`
`Box<CStr>` implements `From<Cow<'_, CStr>>`
`Box<Path>` implements `From<&Path>`
`Box<Path>` implements `From<&mut Path>`
`Box<Path>` implements `From<Cow<'_, Path>>`
and 27 others
error[E0277]: `?` couldn't convert the error to `Box<std::io::Error>`
--> src\main.rs:181:33
|
181 | let chunk = item?;
| ----^ the trait `From<reqwest::Error>` is not implemented for `Box<std::io::Error>`
| |
| this can't be annotated with `?` because it has type `Result<_, reqwest::Error>`
|
= note: the question mark operation (`?`) implicitly performs a conversion on the error value using the `From` trait
= help: the following other types implement trait `From<T>`:
`Box<ByteStr>` implements `From<Box<[u8]>>`
`Box<CStr>` implements `From<&CStr>`
`Box<CStr>` implements `From<&mut CStr>`
`Box<CStr>` implements `From<CString>`
`Box<CStr>` implements `From<Cow<'_, CStr>>`
`Box<Path>` implements `From<&Path>`
`Box<Path>` implements `From<&mut Path>`
`Box<Path>` implements `From<Cow<'_, Path>>`
and 27 others
error[E0308]: mismatched types
--> src\main.rs:309:40
|
309 | Ok(Err(e)) => { return Err(e); }
| --- ^ expected `Box<Error>`, found `Box<dyn Error + Send + Sync>`
| |
| arguments to this enum variant are incorrect
|
= note: expected struct `Box<std::io::Error>`
found struct `Box<dyn std::error::Error + std::marker::Send + Sync>`
= help: `std::io::Error` implements `Error` so you could change the expected type to `Box<dyn Error>`
help: the type constructed contains `Box<dyn std::error::Error + std::marker::Send + Sync>` due to the type of the argument passed
--> src\main.rs:309:36
|
309 | Ok(Err(e)) => { return Err(e); }
| ^^^^-^
| |
| this argument influences the type of `Err`
note: tuple variant defined here
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588\library\core\src\result.rs:557:5
error[E0308]: mismatched types
--> src\main.rs:357:5
|
74 | async fn main() -> Result<()> {
| ---------- expected `Result<(), Box<dyn Error + Send + Sync>>` because of return type
...
357 | Ok(())
| ^^^^^^ expected `Result<(), Box<dyn Error + Send + Sync>>`, found `Result<(), Box<Error>>`
|
= note: expected enum `std::result::Result<_, Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>>`
found enum `std::result::Result<_, Box<std::io::Error>>`
= help: `std::io::Error` implements `Error` so you could box the found value and coerce it to the trait object `Box<dyn Error>`, you will have to change the expected type as well
= note: the full name for the type has been written to 'C:\Users\Pensa\finstaller\target\release\deps\finstaller.long-type-15736134538739630304.txt'
= note: consider using `--verbose` to print the full type name to the console
help: use `?` to coerce and return an appropriate `Err`, and wrap the resulting value in `Ok` so the expression remains of type `Result`
|
357 | Ok(Ok(())?)
| +++ ++Desired output
Rationale and extra context
I don't know whats wrong , but it doesn't feel right , i have been trying to fix the issue for hours now
Other cases
Rust Version
rustc 1.90.0 (1159e78c4 2025-09-14)
binary: rustc
commit-hash: 1159e78c4747b02ef996e55082b704c09b970588
commit-date: 2025-09-14
host: x86_64-pc-windows-msvc
release: 1.90.0
LLVM version: 20.1.8Anything else?
Metadata
Metadata
Assignees
Labels
No labels