diff --git a/Cargo.toml b/Cargo.toml index 717ded6..f370a29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rperf" -version = "0.1.5" +version = "0.1.6" description = "validates network throughput capacity and reliability" authors = ["Neil Tallim "] edition = "2018" @@ -29,7 +29,7 @@ uuid = {version = "0.8", features = ["v4"]} #then "cargo deb" to build simple Debian packages for this project [package.metadata.deb] maintainer-scripts = "debian-maintainer-scripts/" -copyright = "(C) 2021 Evtech Solutions, Ltd., dba 3D-P" +copyright = "(C) 2022 Evtech Solutions, Ltd., dba 3D-P" license-file = ["COPYING", "0"] extended-description = """ Rust-based iperf clone with a number of behavioural fixes and corrections, plus diff --git a/README.md b/README.md index dd9ac3c..506c567 100644 --- a/README.md +++ b/README.md @@ -77,34 +77,17 @@ welcome. ## usage -Everything is outlined in the output of `--help` and most users familiar with -similar tools should feel comfortable immediately. - -_rperf_ works much like _iperf3_, sharing a lot of concepts and even -command-line flags. One key area where it differs is that the client drives -all of the configuration process while the server just complies to the best -of its ability and provides a stream of results. This means that the server -will not present test-results directly via its interface and also that TCP -and UDP tests can be run against the same instance, potentially by many clients -simultaneously. - -In its normal mode of operation, the client will upload data to the server; -when the `reverse` flag is set, the client will receive data. - -Unlike _iperf3_, _rperf_ does not make use of a reserved port-range. This is -so it can support an arbitrary number of clients in parallel without -resource contention on what can only practically be a small number of -contiguous ports. In its intended capacity, this shouldn't be a problem, but -it does make `reverse` incompatible with most non-permissive firewalls and -NAT setups. - -There also isn't a concept of testing throughput relative to a fixed quantity -of data. Rather, the sole focus is on measuring throughput over a roughly -known period of time. - -Also of relevance is that, if the server is running in IPv6 mode and its -host supports IPv4-mapping in a dual-stack configuration, both IPv4 and IPv6 -clients can connect to the same instance. +Everything is outlined in the output of `--help` and most users familiar with similar tools should feel comfortable immediately. + +_rperf_ works much like _iperf3_, sharing a lot of concepts and even command-line flags. One key area where it differs is that the client drives all of the configuration process while the server just complies to the best of its ability and provides a stream of results. This means that the server will not present test-results directly via its interface and also that TCP and UDP tests can be run against the same instance, potentially by many clients simultaneously. + +In its normal mode of operation, the client will upload data to the server; when the `reverse` flag is set, the client will receive data. + +Unlike _iperf3_, _rperf_ does not make use of a reserved port-range by default. This is so it can support an arbitrary number of clients in parallel without resource contention on what can only practically be a small number of contiguous ports. In its intended capacity, this shouldn't be a problem, but where non-permissive firewalls and NAT setups are concerned, the `--tcp[6]-port-pool` and `--udp[6]-port-pool` options may be used to allocate non-continguous ports to the set that will be used to receive traffic. + +There also isn't a concept of testing throughput relative to a fixed quantity of data. Rather, the sole focus is on measuring throughput over a roughly known period of time. + +Also of relevance is that, if the server is running in IPv6 mode and its host supports IPv4-mapping in a dual-stack configuration, both IPv4 and IPv6 clients can connect to the same instance. ## building diff --git a/src/client.rs b/src/client.rs index 33dbbbc..797c540 100644 --- a/src/client.rs +++ b/src/client.rs @@ -99,6 +99,15 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> { let mut complete = false; //config-parsing and pre-connection setup + let mut tcp_port_pool = tcp::receiver::TcpPortPool::new( + args.value_of("tcp_port_pool").unwrap().to_string(), + args.value_of("tcp6_port_pool").unwrap().to_string(), + ); + let mut udp_port_pool = udp::receiver::UdpPortPool::new( + args.value_of("udp_port_pool").unwrap().to_string(), + args.value_of("udp6_port_pool").unwrap().to_string(), + ); + let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(args.value_of("affinity").unwrap())?)); let display_json:bool; @@ -199,7 +208,7 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> { log::debug!("preparing UDP-receiver for stream {}...", stream_idx); let test = udp::receiver::UdpReceiver::new( test_definition.clone(), &(stream_idx as u8), - &0, + &mut udp_port_pool, &server_addr.ip(), &(download_config["receive_buffer"].as_i64().unwrap() as usize), )?; @@ -214,7 +223,7 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> { log::debug!("preparing TCP-receiver for stream {}...", stream_idx); let test = tcp::receiver::TcpReceiver::new( test_definition.clone(), &(stream_idx as u8), - &0, + &mut tcp_port_pool, &server_addr.ip(), &(download_config["receive_buffer"].as_i64().unwrap() as usize), )?; diff --git a/src/main.rs b/src/main.rs index 48e50d4..56215a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -201,6 +201,38 @@ fn main() { .short("N") .required(false) ) + .arg( + Arg::with_name("tcp_port_pool") + .help("an optional pool of IPv4 TCP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21") + .takes_value(true) + .long("tcp-port-pool") + .required(false) + .default_value("") + ) + .arg( + Arg::with_name("tcp6_port_pool") + .help("an optional pool of IPv6 TCP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21") + .takes_value(true) + .long("tcp6-port-pool") + .required(false) + .default_value("") + ) + .arg( + Arg::with_name("udp_port_pool") + .help("an optional pool of IPv4 UDP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21") + .takes_value(true) + .long("udp-port-pool") + .required(false) + .default_value("") + ) + .arg( + Arg::with_name("udp6_port_pool") + .help("an optional pool of IPv6 UDP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21") + .takes_value(true) + .long("udp6-port-pool") + .required(false) + .default_value("") + ) .get_matches(); let mut env = env_logger::Env::default() diff --git a/src/server.rs b/src/server.rs index 64bc9ca..7256226 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,7 +17,7 @@ * You should have received a copy of the GNU General Public License * along with rperf. If not, see . */ - + use std::error::Error; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr}; @@ -53,7 +53,12 @@ static ALIVE:AtomicBool = AtomicBool::new(true); static CLIENTS:AtomicU16 = AtomicU16::new(0); -fn handle_client(stream:&mut TcpStream, cpu_affinity_manager:Arc>) -> BoxResult<()> { +fn handle_client( + stream:&mut TcpStream, + cpu_affinity_manager:Arc>, + tcp_port_pool:Arc>, + udp_port_pool:Arc>, +) -> BoxResult<()> { let mut started = false; let peer_addr = stream.peer_addr()?; @@ -95,12 +100,14 @@ fn handle_client(stream:&mut TcpStream, cpu_affinity_manager:Arc BoxResult<()> { //config-parsing and pre-connection setup + let tcp_port_pool = Arc::new(Mutex::new(tcp::receiver::TcpPortPool::new( + args.value_of("tcp_port_pool").unwrap().to_string(), + args.value_of("tcp6_port_pool").unwrap().to_string(), + ))); + let udp_port_pool = Arc::new(Mutex::new(udp::receiver::UdpPortPool::new( + args.value_of("udp_port_pool").unwrap().to_string(), + args.value_of("udp6_port_pool").unwrap().to_string(), + ))); + let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(args.value_of("affinity").unwrap())?)); let client_limit:u16 = args.value_of("client_limit").unwrap().parse()?; @@ -320,6 +338,8 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> { CLIENTS.fetch_sub(1, Ordering::Relaxed); } else { let c_cam = cpu_affinity_manager.clone(); + let c_tcp_port_pool = tcp_port_pool.clone(); + let c_udp_port_pool = udp_port_pool.clone(); let thread_builder = thread::Builder::new() .name(address.to_string().into()); thread_builder.spawn(move || { @@ -328,7 +348,7 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> { client_address: address.to_string(), }; - match handle_client(&mut stream, c_cam) { + match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) { Ok(_) => (), Err(e) => log::error!("error in client-handler: {}", e), } diff --git a/src/stream/mod.rs b/src/stream/mod.rs index fa507b1..62e768a 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -39,3 +39,26 @@ pub trait TestStream { /// stops a running test fn stop(&mut self); } + +fn parse_port_spec(port_spec:String) -> Vec { + let mut ports = Vec::::new(); + if !port_spec.is_empty() { + for range in port_spec.split(',') { + if range.contains('-') { + let mut range_spec = range.split('-'); + let range_first = range_spec.next().unwrap().parse::().unwrap(); + let range_last = range_spec.last().unwrap().parse::().unwrap(); + + for port in range_first..=range_last { + ports.push(port); + } + } else { + ports.push(range.parse::().unwrap()); + } + } + + ports.sort(); + } + + return ports; +} diff --git a/src/stream/tcp.rs b/src/stream/tcp.rs index 973566d..8aa71dd 100644 --- a/src/stream/tcp.rs +++ b/src/stream/tcp.rs @@ -17,14 +17,14 @@ * You should have received a copy of the GNU General Public License * along with rperf. If not, see . */ - + extern crate nix; use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; use crate::protocol::results::{IntervalResult, TcpReceiveResult, TcpSendResult, get_unix_timestamp}; -use super::{INTERVAL, TestStream}; +use super::{INTERVAL, TestStream, parse_port_spec}; use std::error::Error; type BoxResult = Result>; @@ -68,6 +68,7 @@ pub mod receiver { use std::io::Read; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::os::unix::io::AsRawFd; + use std::sync::{Mutex}; use std::time::{Duration, Instant}; use mio::net::{TcpListener, TcpStream}; @@ -76,6 +77,101 @@ pub mod receiver { const POLL_TIMEOUT:Duration = Duration::from_millis(250); const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); + pub struct TcpPortPool { + pub ports_ip4: Vec, + pub ports_ip6: Vec, + pos_ip4: usize, + pos_ip6: usize, + lock_ip4: Mutex, + lock_ip6: Mutex, + } + impl TcpPortPool { + pub fn new(port_spec:String, port_spec6:String) -> TcpPortPool { + let ports = super::parse_port_spec(port_spec); + if !ports.is_empty() { + log::debug!("configured IPv4 TCP port pool: {:?}", ports); + } else { + log::debug!("using OS assignment for IPv4 TCP ports"); + } + + let ports6 = super::parse_port_spec(port_spec6); + if !ports.is_empty() { + log::debug!("configured IPv6 TCP port pool: {:?}", ports6); + } else { + log::debug!("using OS assignment for IPv6 TCP ports"); + } + + TcpPortPool { + ports_ip4: ports, + pos_ip4: 0, + lock_ip4: Mutex::new(0), + + ports_ip6: ports6, + pos_ip6: 0, + lock_ip6: Mutex::new(0), + } + } + + pub fn bind(&mut self, peer_ip:&IpAddr) -> super::BoxResult { + match peer_ip { + IpAddr::V6(_) => { + if self.ports_ip6.is_empty() { + return Ok(TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv6 TCP socket").as_str())); + } else { + let _guard = self.lock_ip6.lock().unwrap(); + + for port_idx in (self.pos_ip6 + 1)..self.ports_ip6.len() { //iterate to the end of the pool; this will skip the first element in the pool initially, but that's fine + let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); + if listener_result.is_ok() { + self.pos_ip6 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv6 TCP port {}", self.ports_ip6[port_idx]); + } + } + for port_idx in 0..=self.pos_ip6 { //circle back to where the search started + let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); + if listener_result.is_ok() { + self.pos_ip6 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv6 TCP port {}", self.ports_ip6[port_idx]); + } + } + } + return Err(Box::new(simple_error::simple_error!("unable to allocate IPv6 TCP port"))); + }, + IpAddr::V4(_) => { + if self.ports_ip4.is_empty() { + return Ok(TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv4 TCP socket").as_str())); + } else { + let _guard = self.lock_ip4.lock().unwrap(); + + for port_idx in (self.pos_ip4 + 1)..self.ports_ip4.len() { //iterate to the end of the pool; this will skip the first element in the pool initially, but that's fine + let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); + if listener_result.is_ok() { + self.pos_ip4 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv4 TCP port {}", self.ports_ip4[port_idx]); + } + } + for port_idx in 0..=self.pos_ip4 { //circle back to where the search started + let listener_result = TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); + if listener_result.is_ok() { + self.pos_ip4 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv4 TCP port {}", self.ports_ip4[port_idx]); + } + } + } + return Err(Box::new(simple_error::simple_error!("unable to allocate IPv4 TCP port"))); + }, + }; + } + } + pub struct TcpReceiver { active: bool, test_definition: super::TcpTestDefinition, @@ -89,17 +185,9 @@ pub mod receiver { receive_buffer: usize, } impl TcpReceiver { - pub fn new(test_definition:super::TcpTestDefinition, stream_idx:&u8, port:&u16, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult { + pub fn new(test_definition:super::TcpTestDefinition, stream_idx:&u8, port_pool:&mut TcpPortPool, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult { log::debug!("binding TCP listener for stream {}...", stream_idx); - let listener:TcpListener; - match peer_ip { - IpAddr::V6(_) => { - listener = TcpListener::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port)).expect(format!("failed to bind TCP socket, port {}", port).as_str()); - }, - IpAddr::V4(_) => { - listener = TcpListener::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port)).expect(format!("failed to bind TCP socket, port {}", port).as_str()); - }, - }; + let listener:TcpListener = port_pool.bind(peer_ip).expect(format!("failed to bind TCP socket").as_str()); log::debug!("bound TCP listener for stream {}: {}", stream_idx, listener.local_addr()?); let mio_poll_token = Token(0); diff --git a/src/stream/udp.rs b/src/stream/udp.rs index 8696520..e7e54da 100644 --- a/src/stream/udp.rs +++ b/src/stream/udp.rs @@ -27,7 +27,7 @@ use nix::sys::socket::{setsockopt, sockopt::RcvBuf, sockopt::SndBuf}; use crate::protocol::results::{IntervalResult, UdpReceiveResult, UdpSendResult, get_unix_timestamp}; -use super::{INTERVAL, TestStream}; +use super::{INTERVAL, TestStream, parse_port_spec}; type BoxResult = Result>; @@ -70,6 +70,7 @@ pub mod receiver { use std::convert::TryInto; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::os::unix::io::AsRawFd; + use std::sync::{Mutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use chrono::{NaiveDateTime}; @@ -79,6 +80,102 @@ pub mod receiver { const READ_TIMEOUT:Duration = Duration::from_millis(50); const RECEIVE_TIMEOUT:Duration = Duration::from_secs(3); + pub struct UdpPortPool { + pub ports_ip4: Vec, + pos_ip4: usize, + lock_ip4: Mutex, + + pub ports_ip6: Vec, + pos_ip6: usize, + lock_ip6: Mutex, + } + impl UdpPortPool { + pub fn new(port_spec:String, port_spec6:String) -> UdpPortPool { + let ports = super::parse_port_spec(port_spec); + if !ports.is_empty() { + log::debug!("configured IPv4 UDP port pool: {:?}", ports); + } else { + log::debug!("using OS assignment for IPv4 UDP ports"); + } + + let ports6 = super::parse_port_spec(port_spec6); + if !ports.is_empty() { + log::debug!("configured IPv6 UDP port pool: {:?}", ports6); + } else { + log::debug!("using OS assignment for IPv6 UDP ports"); + } + + UdpPortPool { + ports_ip4: ports, + pos_ip4: 0, + lock_ip4: Mutex::new(0), + + ports_ip6: ports6, + pos_ip6: 0, + lock_ip6: Mutex::new(0), + } + } + + pub fn bind(&mut self, peer_ip:&IpAddr) -> super::BoxResult { + match peer_ip { + IpAddr::V6(_) => { + if self.ports_ip6.is_empty() { + return Ok(UdpSocket::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv6 UDP socket").as_str())); + } else { + let _guard = self.lock_ip6.lock().unwrap(); + + for port_idx in (self.pos_ip6 + 1)..self.ports_ip6.len() { //iterate to the end of the pool; this will skip the first element in the pool initially, but that's fine + let listener_result = UdpSocket::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); + if listener_result.is_ok() { + self.pos_ip6 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv6 UDP port {}", self.ports_ip6[port_idx]); + } + } + for port_idx in 0..=self.pos_ip6 { //circle back to where the search started + let listener_result = UdpSocket::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), self.ports_ip6[port_idx])); + if listener_result.is_ok() { + self.pos_ip6 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv6 UDP port {}", self.ports_ip6[port_idx]); + } + } + } + return Err(Box::new(simple_error::simple_error!("unable to allocate IPv6 UDP port"))); + }, + IpAddr::V4(_) => { + if self.ports_ip4.is_empty() { + return Ok(UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)).expect(format!("failed to bind OS-assigned IPv4 UDP socket").as_str())); + } else { + let _guard = self.lock_ip4.lock().unwrap(); + + for port_idx in (self.pos_ip4 + 1)..self.ports_ip4.len() { //iterate to the end of the pool; this will skip the first element in the pool initially, but that's fine + let listener_result = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); + if listener_result.is_ok() { + self.pos_ip4 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv4 UDP port {}", self.ports_ip4[port_idx]); + } + } + for port_idx in 0..=self.pos_ip4 { //circle back to where the search started + let listener_result = UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), self.ports_ip4[port_idx])); + if listener_result.is_ok() { + self.pos_ip4 = port_idx; + return Ok(listener_result.unwrap()); + } else { + log::warn!("unable to bind IPv4 UDP port {}", self.ports_ip4[port_idx]); + } + } + } + return Err(Box::new(simple_error::simple_error!("unable to allocate IPv4 UDP port"))); + }, + }; + } + } + struct UdpReceiverIntervalHistory { packets_received: u64, @@ -100,12 +197,9 @@ pub mod receiver { socket: UdpSocket, } impl UdpReceiver { - pub fn new(test_definition:super::UdpTestDefinition, stream_idx:&u8, port:&u16, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult { + pub fn new(test_definition:super::UdpTestDefinition, stream_idx:&u8, port_pool:&mut UdpPortPool, peer_ip:&IpAddr, receive_buffer:&usize) -> super::BoxResult { log::debug!("binding UDP receive socket for stream {}...", stream_idx); - let socket = match peer_ip { - IpAddr::V6(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()), - IpAddr::V4(_) => UdpSocket::bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port)).expect(format!("failed to bind UDP socket, port {}", port).as_str()), - }; + let socket:UdpSocket = port_pool.bind(peer_ip).expect(format!("failed to bind UDP socket").as_str()); socket.set_read_timeout(Some(READ_TIMEOUT))?; if !cfg!(windows) { //NOTE: features unsupported on Windows if *receive_buffer != 0 {