Skip to content

Commit

Permalink
Added port-pool assignment support, version increased to 0.1.6
Browse files Browse the repository at this point in the history
  • Loading branch information
flan committed Jun 20, 2022
1 parent b999bd5 commit f9163dc
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 55 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rperf"
version = "0.1.5"
version = "0.1.6"
description = "validates network throughput capacity and reliability"
authors = ["Neil Tallim <neiltallim@3d-p.com>"]
edition = "2018"
Expand Down Expand Up @@ -29,7 +29,7 @@ uuid = {version = "0.8", features = ["v4"]}
#then "cargo deb" to build simple Debian packages for this project
[package.metadata.deb]
maintainer-scripts = "debian-maintainer-scripts/"
copyright = "(C) 2021 Evtech Solutions, Ltd., dba 3D-P"
copyright = "(C) 2022 Evtech Solutions, Ltd., dba 3D-P"
license-file = ["COPYING", "0"]
extended-description = """
Rust-based iperf clone with a number of behavioural fixes and corrections, plus
Expand Down
39 changes: 11 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,34 +77,17 @@ welcome.

## usage

Everything is outlined in the output of `--help` and most users familiar with
similar tools should feel comfortable immediately.

_rperf_ works much like _iperf3_, sharing a lot of concepts and even
command-line flags. One key area where it differs is that the client drives
all of the configuration process while the server just complies to the best
of its ability and provides a stream of results. This means that the server
will not present test-results directly via its interface and also that TCP
and UDP tests can be run against the same instance, potentially by many clients
simultaneously.

In its normal mode of operation, the client will upload data to the server;
when the `reverse` flag is set, the client will receive data.

Unlike _iperf3_, _rperf_ does not make use of a reserved port-range. This is
so it can support an arbitrary number of clients in parallel without
resource contention on what can only practically be a small number of
contiguous ports. In its intended capacity, this shouldn't be a problem, but
it does make `reverse` incompatible with most non-permissive firewalls and
NAT setups.

There also isn't a concept of testing throughput relative to a fixed quantity
of data. Rather, the sole focus is on measuring throughput over a roughly
known period of time.

Also of relevance is that, if the server is running in IPv6 mode and its
host supports IPv4-mapping in a dual-stack configuration, both IPv4 and IPv6
clients can connect to the same instance.
Everything is outlined in the output of `--help` and most users familiar with similar tools should feel comfortable immediately.

_rperf_ works much like _iperf3_, sharing a lot of concepts and even command-line flags. One key area where it differs is that the client drives all of the configuration process while the server just complies to the best of its ability and provides a stream of results. This means that the server will not present test-results directly via its interface and also that TCP and UDP tests can be run against the same instance, potentially by many clients simultaneously.

In its normal mode of operation, the client will upload data to the server; when the `reverse` flag is set, the client will receive data.

Unlike _iperf3_, _rperf_ does not make use of a reserved port-range by default. This is so it can support an arbitrary number of clients in parallel without resource contention on what can only practically be a small number of contiguous ports. In its intended capacity, this shouldn't be a problem, but where non-permissive firewalls and NAT setups are concerned, the `--tcp[6]-port-pool` and `--udp[6]-port-pool` options may be used to allocate non-continguous ports to the set that will be used to receive traffic.

There also isn't a concept of testing throughput relative to a fixed quantity of data. Rather, the sole focus is on measuring throughput over a roughly known period of time.

Also of relevance is that, if the server is running in IPv6 mode and its host supports IPv4-mapping in a dual-stack configuration, both IPv4 and IPv6 clients can connect to the same instance.


## building
Expand Down
13 changes: 11 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> {
let mut complete = false;

//config-parsing and pre-connection setup
let mut tcp_port_pool = tcp::receiver::TcpPortPool::new(
args.value_of("tcp_port_pool").unwrap().to_string(),
args.value_of("tcp6_port_pool").unwrap().to_string(),
);
let mut udp_port_pool = udp::receiver::UdpPortPool::new(
args.value_of("udp_port_pool").unwrap().to_string(),
args.value_of("udp6_port_pool").unwrap().to_string(),
);

let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(args.value_of("affinity").unwrap())?));

let display_json:bool;
Expand Down Expand Up @@ -199,7 +208,7 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> {
log::debug!("preparing UDP-receiver for stream {}...", stream_idx);
let test = udp::receiver::UdpReceiver::new(
test_definition.clone(), &(stream_idx as u8),
&0,
&mut udp_port_pool,
&server_addr.ip(),
&(download_config["receive_buffer"].as_i64().unwrap() as usize),
)?;
Expand All @@ -214,7 +223,7 @@ pub fn execute(args:ArgMatches) -> BoxResult<()> {
log::debug!("preparing TCP-receiver for stream {}...", stream_idx);
let test = tcp::receiver::TcpReceiver::new(
test_definition.clone(), &(stream_idx as u8),
&0,
&mut tcp_port_pool,
&server_addr.ip(),
&(download_config["receive_buffer"].as_i64().unwrap() as usize),
)?;
Expand Down
32 changes: 32 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,38 @@ fn main() {
.short("N")
.required(false)
)
.arg(
Arg::with_name("tcp_port_pool")
.help("an optional pool of IPv4 TCP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21")
.takes_value(true)
.long("tcp-port-pool")
.required(false)
.default_value("")
)
.arg(
Arg::with_name("tcp6_port_pool")
.help("an optional pool of IPv6 TCP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21")
.takes_value(true)
.long("tcp6-port-pool")
.required(false)
.default_value("")
)
.arg(
Arg::with_name("udp_port_pool")
.help("an optional pool of IPv4 UDP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21")
.takes_value(true)
.long("udp-port-pool")
.required(false)
.default_value("")
)
.arg(
Arg::with_name("udp6_port_pool")
.help("an optional pool of IPv6 UDP ports over which data will be accepted; if omitted, any OS-assignable port is used; format: 1-10,19,21")
.takes_value(true)
.long("udp6-port-pool")
.required(false)
.default_value("")
)
.get_matches();

let mut env = env_logger::Env::default()
Expand Down
30 changes: 25 additions & 5 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with rperf. If not, see <https://www.gnu.org/licenses/>.
*/

use std::error::Error;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr};
Expand Down Expand Up @@ -53,7 +53,12 @@ static ALIVE:AtomicBool = AtomicBool::new(true);
static CLIENTS:AtomicU16 = AtomicU16::new(0);


fn handle_client(stream:&mut TcpStream, cpu_affinity_manager:Arc<Mutex<crate::utils::cpu_affinity::CpuAffinityManager>>) -> BoxResult<()> {
fn handle_client(
stream:&mut TcpStream,
cpu_affinity_manager:Arc<Mutex<crate::utils::cpu_affinity::CpuAffinityManager>>,
tcp_port_pool:Arc<Mutex<tcp::receiver::TcpPortPool>>,
udp_port_pool:Arc<Mutex<udp::receiver::UdpPortPool>>,
) -> BoxResult<()> {
let mut started = false;
let peer_addr = stream.peer_addr()?;

Expand Down Expand Up @@ -95,12 +100,14 @@ fn handle_client(stream:&mut TcpStream, cpu_affinity_manager:Arc<Mutex<crate::ut
if payload.get("family").unwrap_or(&serde_json::json!("tcp")).as_str().unwrap() == "udp" {
log::info!("[{}] preparing for UDP test with {} streams...", &peer_addr, stream_count);

let mut c_udp_port_pool = udp_port_pool.lock().unwrap();

let test_definition = udp::UdpTestDefinition::new(&payload)?;
for stream_idx in 0..stream_count {
log::debug!("[{}] preparing UDP-receiver for stream {}...", &peer_addr, stream_idx);
let test = udp::receiver::UdpReceiver::new(
test_definition.clone(), &(stream_idx as u8),
&0,
&mut c_udp_port_pool,
&peer_addr.ip(),
&(payload["receive_buffer"].as_i64().unwrap() as usize),
)?;
Expand All @@ -110,12 +117,14 @@ fn handle_client(stream:&mut TcpStream, cpu_affinity_manager:Arc<Mutex<crate::ut
} else { //TCP
log::info!("[{}] preparing for TCP test with {} streams...", &peer_addr, stream_count);

let mut c_tcp_port_pool = tcp_port_pool.lock().unwrap();

let test_definition = tcp::TcpTestDefinition::new(&payload)?;
for stream_idx in 0..stream_count {
log::debug!("[{}] preparing TCP-receiver for stream {}...", &peer_addr, stream_idx);
let test = tcp::receiver::TcpReceiver::new(
test_definition.clone(), &(stream_idx as u8),
&0,
&mut c_tcp_port_pool,
&peer_addr.ip(),
&(payload["receive_buffer"].as_i64().unwrap() as usize),
)?;
Expand Down Expand Up @@ -274,6 +283,15 @@ impl Drop for ClientThreadMonitor {

pub fn serve(args:ArgMatches) -> BoxResult<()> {
//config-parsing and pre-connection setup
let tcp_port_pool = Arc::new(Mutex::new(tcp::receiver::TcpPortPool::new(
args.value_of("tcp_port_pool").unwrap().to_string(),
args.value_of("tcp6_port_pool").unwrap().to_string(),
)));
let udp_port_pool = Arc::new(Mutex::new(udp::receiver::UdpPortPool::new(
args.value_of("udp_port_pool").unwrap().to_string(),
args.value_of("udp6_port_pool").unwrap().to_string(),
)));

let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(args.value_of("affinity").unwrap())?));

let client_limit:u16 = args.value_of("client_limit").unwrap().parse()?;
Expand Down Expand Up @@ -320,6 +338,8 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> {
CLIENTS.fetch_sub(1, Ordering::Relaxed);
} else {
let c_cam = cpu_affinity_manager.clone();
let c_tcp_port_pool = tcp_port_pool.clone();
let c_udp_port_pool = udp_port_pool.clone();
let thread_builder = thread::Builder::new()
.name(address.to_string().into());
thread_builder.spawn(move || {
Expand All @@ -328,7 +348,7 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> {
client_address: address.to_string(),
};

match handle_client(&mut stream, c_cam) {
match handle_client(&mut stream, c_cam, c_tcp_port_pool, c_udp_port_pool) {
Ok(_) => (),
Err(e) => log::error!("error in client-handler: {}", e),
}
Expand Down
23 changes: 23 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,26 @@ pub trait TestStream {
/// stops a running test
fn stop(&mut self);
}

fn parse_port_spec(port_spec:String) -> Vec<u16> {
let mut ports = Vec::<u16>::new();
if !port_spec.is_empty() {
for range in port_spec.split(',') {
if range.contains('-') {
let mut range_spec = range.split('-');
let range_first = range_spec.next().unwrap().parse::<u16>().unwrap();
let range_last = range_spec.last().unwrap().parse::<u16>().unwrap();

for port in range_first..=range_last {
ports.push(port);
}
} else {
ports.push(range.parse::<u16>().unwrap());
}
}

ports.sort();
}

return ports;
}

0 comments on commit f9163dc

Please sign in to comment.