Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Release 0.12.1
- *WebSocket* now can returns when send correctly a `SendStatus::MaxPacketSizeExceeded` instead of `ResourceNotFound` if the packet size is exceeded.
- *UDP* has increases the packet size when send.
Now more bytes per packet can be sent if the OS let it.
- Exported some adapter constants dependent.
- `Transport::max_message_size()` now represents the teorical maximum size (see its related docs).

## Release 0.12.0
- Node concept: `NodeHandler` and `NodeListener`.
- Non-mutable and shared network operations.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "message-io"
version = "0.12.0"
version = "0.12.1"
authors = ["lemunozm <lemunozm@gmail.com>"]
edition = "2018"
readme = "README.md"
Expand Down
55 changes: 2 additions & 53 deletions benches/performance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use message_io::network::{self, Transport, NetworkController, NetworkProcessor, Endpoint};
use message_io::util::thread::{NamespacedThread};

use criterion::{criterion_group, criterion_main, Criterion, BenchmarkId, Throughput};
use criterion::{criterion_group, criterion_main, Criterion};

use std::time::{Duration};
use std::sync::{
Expand Down Expand Up @@ -49,44 +49,6 @@ fn latency_by(c: &mut Criterion, transport: Transport) {
});
}

fn throughput_by(c: &mut Criterion, transport: Transport) {
let sizes = [1, 2, 4, 8, 16, 32, 64, 128]
.iter()
.map(|i| i * 1024)
.filter(|&size| size < transport.max_message_size());

for block_size in sizes {
let mut group = c.benchmark_group(format!("throughput by {}", transport));
group.throughput(Throughput::Bytes(block_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(block_size), &block_size, |b, &size| {
let (controller, mut processor, endpoint) = init_connection(transport);

let thread_running = Arc::new(AtomicBool::new(true));
let running = thread_running.clone();
let (tx, rx) = std::sync::mpsc::channel();
let mut thread = NamespacedThread::spawn("perf-sender", move || {
let message = (0..size).map(|_| 0xFF).collect::<Vec<u8>>();
tx.send(()).unwrap(); // receiving thread ready
while running.load(Ordering::Relaxed) {
controller.send(endpoint, &message);
}
});
rx.recv().unwrap();

b.iter(|| {
// FIX IT:
// Because the sender do not stop sends, the receiver has always data.
// This means that only one poll event is generated for all messages, and
// process_poll_event will call the callback continuously without ends.
processor.process_poll_event(Some(*TIMEOUT), |_| ());
});

thread_running.store(true, Ordering::Relaxed);
thread.join();
});
}
}

fn latency(c: &mut Criterion) {
#[cfg(feature = "udp")]
latency_by(c, Transport::Udp);
Expand All @@ -98,18 +60,5 @@ fn latency(c: &mut Criterion) {
latency_by(c, Transport::Ws);
}

#[allow(dead_code)] //TODO: remove when the throughput test works fine
fn throughput(c: &mut Criterion) {
#[cfg(feature = "udp")]
throughput_by(c, Transport::Udp);
// TODO: Fix this test: How to read inside of criterion iter() an stream protocol?
// #[cfg(feature = "tcp")]
// throughput_by(c, Transport::Tcp);
#[cfg(feature = "tcp")]
throughput_by(c, Transport::FramedTcp);
#[cfg(feature = "websocket")]
throughput_by(c, Transport::Ws);
}

criterion_group!(benches, latency /*throughput*/,);
criterion_group!(benches, latency);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion src/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ pub mod framed_tcp;
#[cfg(feature = "udp")]
pub mod udp;
#[cfg(feature = "websocket")]
pub mod web_socket;
pub mod ws;
// Add new adapters here
// ...
13 changes: 4 additions & 9 deletions src/adapters/framed_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@ use std::ops::{Deref};
use std::cell::{RefCell};
use std::mem::{MaybeUninit};

const INPUT_BUFFER_SIZE: usize = 65535; // 2^16 - 1
const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1

/// The max packet value for tcp.
/// Although this size is very high, it is preferred send data in smaller chunks with a rate
/// to not saturate the receiver thread in the endpoint.
pub const MAX_TCP_PAYLOAD_LEN: usize = usize::MAX;

pub struct FramedTcpAdapter;
pub(crate) struct FramedTcpAdapter;
impl Adapter for FramedTcpAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
stream: TcpStream,
decoder: RefCell<Decoder>,
}
Expand Down Expand Up @@ -119,7 +114,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
listener: TcpListener,
}

Expand Down
11 changes: 7 additions & 4 deletions src/adapters/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ use std::io::{self, ErrorKind, Read, Write};
use std::ops::{Deref};
use std::mem::{MaybeUninit};

pub const INPUT_BUFFER_SIZE: usize = 65535; // 2^16 - 1
/// Size of the internal reading buffer.
/// It implies that at most the generated [`crate::network::NetEvent::Message`]
/// will contains a chunk of data of this value.
pub const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1

pub struct TcpAdapter;
pub(crate) struct TcpAdapter;
impl Adapter for TcpAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
stream: TcpStream,
}

Expand Down Expand Up @@ -97,7 +100,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
listener: TcpListener,
}

Expand Down
6 changes: 3 additions & 3 deletions src/adapters/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use mio::event::{Source};
use std::net::{SocketAddr};
use std::io::{self};

pub struct MyAdapter;
pub(crate) struct MyAdapter;
impl Adapter for MyAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource;
pub(crate) struct RemoteResource;
impl Resource for RemoteResource {
fn source(&mut self) -> &mut dyn Source {
todo!();
Expand All @@ -38,7 +38,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource;
pub(crate) struct LocalResource;
impl Resource for LocalResource {
fn source(&mut self) -> &mut dyn Source {
todo!();
Expand Down
66 changes: 34 additions & 32 deletions src/adapters/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,26 @@ use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::io::{self, ErrorKind};
use std::mem::{MaybeUninit};

/// Maximun payload that a UDP packet can send safety in main OS.
/// - 9216: MTU of the OS with the minimun MTU: OSX
/// - 20: max IP header
/// - 8: max udp header
/// The serialization of your message must not exceed this value.
pub const MAX_UDP_PAYLOAD_LEN: usize = 9216 - 20 - 8;
/// Maximun payload that UDP can send.
/// The following payload works on Linux and Windows, but overcome the MacOS limits.
/// To more safety limit, see: `MAX_COMPATIBLE_UDP_PAYLOAD_LEN`.
// - 20: max IP header
// - 8: max udp header
pub const MAX_PAYLOAD_LEN: usize = 65535 - 20 - 8;

// The reception buffer can reach the UDP standard size.
const INPUT_BUFFER_SIZE: usize = 65535 - 20 - 8;
/// Maximun payload that UDP can send safety in main OS.
// 9216: MTU of the OS with the minimun MTU: OSX
pub const MAX_COMPATIBLE_PAYLOAD_LEN: usize = 9216 - 20 - 8;

pub struct UdpAdapter;
const INPUT_BUFFER_SIZE: usize = 65535; // 2^16 - 1

pub(crate) struct UdpAdapter;
impl Adapter for UdpAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
socket: UdpSocket,
}

Expand Down Expand Up @@ -75,7 +78,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
socket: UdpSocket,
}

Expand Down Expand Up @@ -136,28 +139,27 @@ impl Drop for LocalResource {
}

fn send_packet(data: &[u8], send_method: impl Fn(&[u8]) -> io::Result<usize>) -> SendStatus {
if data.len() > MAX_UDP_PAYLOAD_LEN {
log::error!(
"The UDP message could not be sent because it exceeds the MTU. \
Current size: {}, MTU: {}",
data.len(),
MAX_UDP_PAYLOAD_LEN
);
SendStatus::MaxPacketSizeExceeded(data.len(), MAX_UDP_PAYLOAD_LEN)
}
else {
loop {
match send_method(data) {
Ok(_) => break SendStatus::Sent,
// Avoid ICMP generated error to be logged
Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
break SendStatus::ResourceNotFound
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue,
Err(err) => {
log::error!("UDP send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
loop {
match send_method(data) {
Ok(_) => break SendStatus::Sent,
// Avoid ICMP generated error to be logged
Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
break SendStatus::ResourceNotFound
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue,
Err(ref err) if err.kind() == ErrorKind::Other => {
let expected_assumption = if data.len() > MAX_PAYLOAD_LEN {
MAX_PAYLOAD_LEN
}
else {
// e.g. MacOS do not support the MAX UDP MTU.
MAX_COMPATIBLE_PAYLOAD_LEN
};
break SendStatus::MaxPacketSizeExceeded(data.len(), expected_assumption)
}
Err(err) => {
log::error!("UDP send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/adapters/web_socket.rs → src/adapters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use std::ops::{DerefMut};

/// Max message size for default config
// From https://docs.rs/tungstenite/0.13.0/src/tungstenite/protocol/mod.rs.html#65
pub const MAX_WS_PAYLOAD_LEN: usize = 32 << 20;
pub const MAX_PAYLOAD_LEN: usize = 32 << 20;

pub struct WsAdapter;
pub(crate) struct WsAdapter;
impl Adapter for WsAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
Expand All @@ -44,7 +44,7 @@ enum RemoteState {
Handshake(Option<PendingHandshake>),
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
state: Mutex<RemoteState>,
}

Expand Down Expand Up @@ -181,6 +181,9 @@ impl RemoteResource {
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
result = web_socket.write_pending();
}
Err(Error::Capacity(_)) => {
break SendStatus::MaxPacketSizeExceeded(data.len(), MAX_PAYLOAD_LEN)
}
Err(err) => {
log::error!("WS send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
Expand All @@ -190,7 +193,7 @@ impl RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
listener: TcpListener,
}

Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
// Tells rustdoc where is the README to compile and test the rust code found there
doc_comment::doctest!("../README.md");

mod adapters;
/// Adapter related information.
/// If some adapter has special values or configuration, it is specified here.
pub mod adapters;

/// Main API. Create connections, send and receive message, signals,...
pub mod node;
Expand Down
Loading