Skip to content

Commit

Permalink
Add wget/curl support and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
taoky committed May 3, 2023
1 parent e80567d commit 28cb39d
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 57 deletions.
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "rsync-speedtest"
version = "0.2.0"
name = "admirror-speedtest"
version = "0.3.0"
edition = "2021"
description = "Test speed (bandwidth) of different bind IP to rsync upstream"
description = "Test speed (bandwidth) of different bind IP to rsync, http(s) and git upstream. Alleviate mirror site admin's trouble choosing fastest bind IP."

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
136 changes: 93 additions & 43 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
mod spawner;

use std::{
cmp::min,
fs::File,
io::{BufRead, BufReader},
net,
os::unix::process::CommandExt,
process::{self, ExitStatus, Stdio},
process::{self, ExitStatus},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant},
};

use clap::Parser;
use clap::{clap_derive::ArgEnum, Parser};
use signal_hook::consts::{SIGINT, SIGTERM};

use crate::spawner::{get_child, get_program_name};

#[derive(Debug, ArgEnum, Clone, Copy)]
pub enum Program {
Rsync,
Wget,
Curl,
Git,
}

#[derive(Parser, Debug)]
#[clap(about, version)]
struct Args {
/// Config file (IP list) path. Default to ~/.rsync-speedtest
/// Config file (IP list) path. Default to ~/.admirror-speedtest or (if not exist) ~/.rsync-speedtest
#[clap(short, long)]
config: Option<String>,

Expand All @@ -34,13 +45,17 @@ struct Args {
#[clap(long)]
tmp_dir: Option<String>,

/// Rsync log file. Default to /dev/null
/// Log file. Default to /dev/null
#[clap(long)]
log: Option<String>,

/// Upstream path. Will be given to rsync
/// Upstream path. Will be given to specified program
#[clap(value_parser)]
upstream: String,

/// Program to use. It will try to detect by default (here curl will be used default for http(s))
#[clap(long, arg_enum)]
program: Option<Program>,
}

struct Ip {
Expand All @@ -56,7 +71,7 @@ fn create_tmp(tmp_dir: &Option<String>) -> mktemp::Temp {
.expect("tmp file created failed")
}

struct RsyncStatus {
struct ProgramStatus {
status: ExitStatus,
time: Duration,
}
Expand All @@ -71,7 +86,7 @@ fn reap_all_children() {
}
}

fn kill_rsync(proc: &mut process::Child) -> ExitStatus {
fn kill_children(proc: &mut process::Child) -> ExitStatus {
// Soundness requirement: the latest try_wait() should return Ok(None)
// Elsewhere libc::kill may kill unrelated processes

Expand All @@ -88,15 +103,19 @@ fn kill_rsync(proc: &mut process::Child) -> ExitStatus {
libc::kill(proc.id() as i32, SIGTERM);
}

let res = proc.wait().expect("rsync wait failed");
let res = proc.wait().expect("program wait() failed");
// if receiver died before generator, the SIGCHLD handler of generator will help reap it
// but we cannot rely on race condition to help do things right
reap_all_children();

res
}

fn wait_timeout(mut proc: process::Child, timeout: Duration, term: Arc<AtomicBool>) -> RsyncStatus {
fn wait_timeout(
mut proc: process::Child,
timeout: Duration,
term: Arc<AtomicBool>,
) -> ProgramStatus {
// Reference adaptable timeout algorithm from
// https://github.com/hniksic/rust-subprocess/blob/5e89ac093f378bcfc03c69bdb1b4bcacf4313ce4/src/popen.rs#L778
// Licensed under MIT & Apache-2.0
Expand All @@ -112,23 +131,23 @@ fn wait_timeout(mut proc: process::Child, timeout: Duration, term: Arc<AtomicBoo
.expect("try waiting for rsync process failed");
match status {
Some(status) => {
return RsyncStatus {
return ProgramStatus {
status,
time: start.elapsed(),
}
}
None => {
if term.load(Ordering::SeqCst) {
let time = start.elapsed();
let status = kill_rsync(&mut proc);
return RsyncStatus { status, time };
let status = kill_children(&mut proc);
return ProgramStatus { status, time };
}

let now = Instant::now();
if now >= deadline {
let time = start.elapsed();
let status = kill_rsync(&mut proc);
return RsyncStatus { status, time };
let status = kill_children(&mut proc);
return ProgramStatus { status, time };
}

let remaining = deadline.duration_since(now);
Expand All @@ -142,19 +161,41 @@ fn wait_timeout(mut proc: process::Child, timeout: Duration, term: Arc<AtomicBoo
fn main() {
let args = Args::parse();
let log = File::create(args.log.unwrap_or_else(|| "/dev/null".to_string()))
.expect("Cannot open rsync log file");
.expect("Cannot open log file");
let term = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(SIGINT, Arc::clone(&term)).expect("Register SIGINT handler failed");
signal_hook::flag::register(SIGTERM, Arc::clone(&term))
.expect("Register SIGTERM handler failed");

// 1. read IP list from args.config
let mut ips: Vec<Ip> = Vec::new();
let config_path = args.config.unwrap_or_else(|| {
let config_path = args.config.clone().unwrap_or_else(|| {
let mut path = dirs::home_dir().unwrap();
path.push(".rsync-speedtest");
path.push(".admirror-speedtest");
path.to_str().unwrap().to_string()
});
let ips_file = File::open(config_path).expect("Cannot open IP list file.");
let mut config_paths = vec![config_path];
if args.config.is_none() {
// Add .rsync-speedtest for backward compatibility
let mut path = dirs::home_dir().unwrap();
path.push(".rsync-speedtest");
config_paths.push(path.to_str().unwrap().to_string());
}

let mut ips_file = None;
for config in config_paths {
if let Ok(file) = File::open(&config) {
ips_file = Some(file);
break;
}
}
let ips_file = match ips_file {
Some(ips_file) => ips_file,
None => {
panic!("Cannot open IP list file.")
}
};

let iterator = BufReader::new(ips_file).lines();
for line in iterator {
let line = line.unwrap();
Expand All @@ -169,7 +210,30 @@ fn main() {
comment: comment.to_string(),
});
}
// 2. run rsync for passes times and collect results
// 2. Detect which program should we run
let program = match args.program {
Some(program) => program,
None => {
// We need to detect by upstream

// Though I don't think anyone will use ALL UPPERCASE here...
let upstream = args.upstream.to_lowercase();
if upstream.starts_with("rsync://") || upstream.contains("::") {
Program::Rsync
} else if upstream.starts_with("http://") || upstream.starts_with("https://") {
if upstream.ends_with(".git") {
Program::Git
} else {
Program::Curl
}
} else if upstream.starts_with("git://") {
Program::Git
} else {
panic!("Cannot detect upstream program. Please specify with --program.")
}
}
};
// 3. run rsync for passes times and collect results
for pass in 0..args.pass {
println!("Pass {}:", pass);
for ip in &ips {
Expand All @@ -180,40 +244,26 @@ fn main() {
}
// create tmp file
let tmp_file = create_tmp(&args.tmp_dir);
let proc = std::process::Command::new("rsync")
.arg("-avP")
.arg("--inplace")
.arg("--address")
.arg(ip.ip.clone())
.arg(args.upstream.clone())
.arg(tmp_file.as_os_str().to_string_lossy().to_string())
.stdin(Stdio::null())
.stdout(Stdio::from(
log.try_clone()
.expect("Clone log file descriptor failed (stdout)"),
))
.stderr(Stdio::from(
log.try_clone()
.expect("Clone log file descriptor failed (stderr)"),
))
.process_group(0) // Don't let rsync receive SIGINT from tty: we handle it ourselves
.spawn()
.expect("Failed to spawn rsync with timeout.");
let proc = get_child(&program, &ip.ip, &args.upstream, &tmp_file, &log);
let rsync_status =
wait_timeout(proc, Duration::from_secs(args.timeout as u64), term.clone());
let status = rsync_status.status;
let duration = rsync_status.time;
let duration_seconds = duration.as_secs_f64();
let mut state_str = {
if duration_seconds > args.timeout as f64 {
"✅ Rsync timeout as expected".to_owned()
format!("✅ {} timeout as expected", get_program_name(&program))
} else {
match status.code() {
Some(code) => match code {
0 => "✅ OK".to_owned(),
_ => format!("❌ Rsync failed with code {}", code),
_ => format!(
"❌ {} failed with code {}",
get_program_name(&program),
code
),
},
None => "❌ Rsync killed by signal".to_owned(),
None => format!("❌ {} killed by signal", get_program_name(&program)),
}
}
};
Expand All @@ -222,7 +272,7 @@ fn main() {
}
// check file size
let size = tmp_file.metadata().unwrap().len();
let bandwidth = size as f64 / duration_seconds as f64; // Bytes / Seconds
let bandwidth = size as f64 / duration_seconds; // Bytes / Seconds
let bandwidth = bandwidth / 1024_f64; // KB/s
println!(
"{} ({}): {} KB/s ({})",
Expand Down
76 changes: 76 additions & 0 deletions src/spawner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::{
fs::File,
os::unix::process::CommandExt,
path::Path,
process::{Child, Command, Stdio},
};

use crate::Program;

#[inline]
pub fn get_program_name(program: &Program) -> String {
match program {
Program::Rsync => "rsync",
Program::Wget => "wget",
Program::Curl => "curl",
Program::Git => "git",
}
.to_owned()
}

pub fn get_child(
program: &Program,
bind_ip: &str,
upstream: &str,
tmp_path: &Path,
log_file: &File,
) -> Child {
let mut cmd: Command;
match program {
Program::Rsync => {
cmd = std::process::Command::new("rsync");
cmd.arg("-avP")
.arg("--inplace")
.arg("--address")
.arg(bind_ip)
.arg(upstream)
.arg(tmp_path.as_os_str().to_string_lossy().to_string())
}
Program::Curl => {
cmd = std::process::Command::new("curl");
cmd.arg("-o")
.arg(tmp_path.as_os_str().to_string_lossy().to_string())
.arg("--interface")
.arg(bind_ip)
.arg(upstream)
}
Program::Wget => {
cmd = std::process::Command::new("wget");
cmd.arg("-O")
.arg(tmp_path.as_os_str().to_string_lossy().to_string())
.arg("--bind-address")
.arg(bind_ip)
.arg(upstream)
}
Program::Git => unimplemented!("Git is not supported yet."),
}
.stdin(Stdio::null())
.stdout(Stdio::from(
log_file
.try_clone()
.expect("Clone log file descriptor failed (stdout)"),
))
.stderr(Stdio::from(
log_file
.try_clone()
.expect("Clone log file descriptor failed (stderr)"),
))
.process_group(0) // Don't receive SIGINT from tty: we handle it ourselves (for rsync)
.spawn()
.unwrap_or_else(|_| {
panic!(
"Failed to spawn {} with timeout.",
get_program_name(program)
)
})
}

0 comments on commit 28cb39d

Please sign in to comment.