Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4092,6 +4092,7 @@ dependencies = [
name = "mesh_remote"
version = "0.0.0"
dependencies = [
"event-listener",
"futures",
"futures-concurrency",
"libc",
Expand Down
1 change: 1 addition & 0 deletions support/mesh/mesh_remote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ libc.workspace = true
socket2.workspace = true

[dev-dependencies]
event-listener.workspace = true
test_with_tracing.workspace = true

[target.'cfg(windows)'.dev-dependencies]
Expand Down
14 changes: 14 additions & 0 deletions support/mesh/mesh_remote/src/alpc_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,13 +741,15 @@ impl Connection {

impl SendEvent for Connection {
fn event(&self, event: OutgoingEvent<'_>) {
let len = event.len();
match self.send_event(event) {
Ok(_) => (),
Err(err) => {
tracing::error!(
node = ?self.local_id,
remote_node = ?self.remote_id,
error = err.as_error(),
len,
"error sending packet"
);
// Notify the connection task of the failure.
Expand Down Expand Up @@ -825,6 +827,18 @@ mod tests {
node2.shutdown().await;
}

#[async_test]
async fn test_message_sizes(driver: DefaultDriver) {
let (p1, p2) = mesh_node::local_node::Port::new_pair();
let (p3, p4) = mesh_node::local_node::Port::new_pair();
let node1 = AlpcNode::new(driver.clone()).unwrap();
let (invitation, _handle) = node1.invite(p2).unwrap();
let _node2 = AlpcNode::join(driver.clone(), invitation, p3).unwrap();

crate::test_common::test_message_sizes(p1, p4, 0..=super::MAX_SMALL_EVENT_SIZE + 0x1000)
.await;
}

#[async_test]
async fn test_three(driver: DefaultDriver) {
let (p1, p2) = channel::<u32>();
Expand Down
1 change: 1 addition & 0 deletions support/mesh/mesh_remote/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod alpc_node;
mod common;
mod point_to_point;
mod protocol;
mod test_common;
mod unix_node;

#[cfg(windows)]
Expand Down
68 changes: 68 additions & 0 deletions support/mesh/mesh_remote/src/test_common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#![cfg(test)]

use mesh_node::local_node::HandlePortEvent;
use mesh_node::local_node::Port;
use mesh_node::message::Message;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;

#[cfg_attr(not(any(windows, target_os = "linux")), expect(dead_code))]
pub(crate) async fn test_message_sizes(left: Port, right: Port, range: RangeInclusive<usize>) {
let counter = Arc::new(Counter {
count: AtomicUsize::new(*range.start()),
event: event_listener::Event::new(),
});
let _right = right.set_handler(CountHandler(counter.clone()));
let buf = vec![0; *range.end()];
for i in range {
if i % 0x1000 == 0 {
tracing::info!(i, "at message");
}
left.send(Message::serialized(&buf[..i], Vec::new()));
loop {
let listener = counter.event.listen();
if counter.count.load(Relaxed) == i + 1 {
break;
}
listener.await;
}
}
}

struct Counter {
count: AtomicUsize,
event: event_listener::Event,
}

struct CountHandler(Arc<Counter>);

impl HandlePortEvent for CountHandler {
fn message<'a>(
&mut self,
_control: &mut mesh_node::local_node::PortControl<'_, 'a>,
_message: Message<'a>,
) -> Result<(), mesh_node::local_node::HandleMessageError> {
self.0.count.fetch_add(1, Relaxed);
self.0.event.notify(1);
Ok(())
}

fn close(&mut self, _control: &mut mesh_node::local_node::PortControl<'_, '_>) {}

fn fail(
&mut self,
_control: &mut mesh_node::local_node::PortControl<'_, '_>,
_err: mesh_node::local_node::NodeError,
) {
panic!("failed after {} messages", self.0.count.load(Relaxed));
}

fn drain(&mut self) -> Vec<mesh_node::message::OwnedMessage> {
Vec::new()
}
}
15 changes: 15 additions & 0 deletions support/mesh/mesh_remote/src/unix_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,21 @@ mod tests {
leader.shutdown().await;
}

#[cfg(target_os = "linux")]
#[async_test]
async fn test_message_sizes(driver: DefaultDriver) {
let (p1, p2) = mesh_node::local_node::Port::new_pair();
let (p3, p4) = mesh_node::local_node::Port::new_pair();
let node1 = UnixNode::new(driver.clone());
let invitation = node1.invite(p2).await.unwrap();
let _node2 = UnixNode::join(driver.clone(), invitation, p3)
.await
.unwrap();

crate::test_common::test_message_sizes(p1, p4, 0..=super::MAX_SMALL_EVENT_SIZE + 0x1000)
.await;
}

#[async_test]
async fn test_dropped_shutdown(driver: DefaultDriver) {
let leader = UnixNode::new(driver.clone());
Expand Down
2 changes: 1 addition & 1 deletion support/pal/src/windows/alpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl PortConfig {
| ALPC_PORFLG_ACCEPT_INDIRECT_HANDLES
| ALPC_PORFLG_ACCEPT_REQUESTS,
DupObjectTypes: OB_ALL_OBJECT_TYPE_CODES,
MaxMessageLength: self.max_message_len,
MaxMessageLength: self.max_message_len + size_of::<PORT_MESSAGE>(),
MaxPoolUsage: usize::MAX,
MaxSectionSize: usize::MAX,
MaxTotalSectionSize: usize::MAX,
Expand Down