Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node API: zero-copy write/read message #61

Closed
lemunozm opened this issue Mar 20, 2021 · 0 comments · Fixed by #66
Closed

Node API: zero-copy write/read message #61

lemunozm opened this issue Mar 20, 2021 · 0 comments · Fixed by #66
Labels
enhancement New feature or request

Comments

@lemunozm
Copy link
Owner

lemunozm commented Mar 20, 2021

Problem

To read a message, you use:

let (mut network, mut events) = Network::split();
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
loop {
    match events.receive() {
        NetEvent::Message(endpoint, data) => { /* data has been copied here */ },
        _ => (),
    }
}

Although the current API is quite simple, it has a drawback: in order to pass messages into the EventQueue, you need to perform a copy of that message. This is why the signature of the NetEvent::Message<Endpoint, Vec<u8>> has an allocated vector instead of a reference like &[u8]. This copy is necessary because once you send data into the queue, the lifetime of the referenced data is lost. The internal socket buffer can be overwritten with a new incoming message before you read the previous one.

To avoid this issue you can avoid sending the data into EventQueue in order to process the message directly from the AdapterEvent which signature reference the internal input buffer: AdapterEvent::Data(Endpoint, &[u8]). You can archieve this using the Network::new() constructor:

let mut network = Network::new(|adapter_event| {
    match adapter_event { 
        AdapterEvent::Data(endpoint, data) => { /* data direclty from adapter*/ },
        _ => (),
    }
});
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();

Although this works with the desired performance, it reduces the API usage, for example:

  • How I can send a message just after read it?. I can not access to the Network instance in its own callback, so I need to push this "action" as an event and send it into an EventQueue to read it out of the callback, in the EventQueue loop, and then call Network::send() properly.
  • If I want to send some signal based on the message read, I need an EventQueue.

These problems forced you to divide your application logic, offuscating the code: some events will be processed in the Network callback and other events will be processed in the EventQueue loop:

let events = EventQueue::new();
let sender = events.sender().clone();
let mut network = Network::new(move |adapter_event| {
    match adapter_event { 
        AdapterEvent::Data(endpoint, data) => { 
           // data directly from adapter
          let response = process_data(data);
          // Here I can not send by the network, I need to perform this action out of the callback.
          sender.send(UserEvent::SendResponse(endpoint, response));
        },
        _ => (),
    }
});

network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();

loop {
    match events.receive() {
        UserEvent::SendResponse(endpoint, response) => { 
              network.send(endpoint, response);
        },
        _ => (),
    }
}

Solution

To solve this problem, and allow the user to process all their events only in the callback, it is needed some additions:

  • Add network in the own Network callback.
  • Add to the network the possibility to react to timer signals, to avoid completely the use of an EventQueue.
  • Allow natively two types of events, network events, and custom signal events.

Example 1

Signals as part of the network.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let node = Network::new(|network, event|{
   match event { 
        NetEvent::Data(endpoint, data) => { 
              // data direclty from adapter
              network.send(endpoint, data);
              network.self_signal(UserSignal::Tick(1), Duration::from_millis(50));
        },
       NetEvent::Signal(signal) => match signal {
             UserSignal::Tick => { /* send other signal, call network action, etc... */ }
             UserSignal::Close => network.stop(), // The callback never be called again.
        }
       NetEvent::Connected(..) => (),
       NetEvent::Disconnected(..) => (),
    }
});
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
network.self_signal(Close, Duration::from_secs(3));
network.wait_to_close();
// You still can make any network call and send signals outside the callback.

Example 2

Node concept: the node, contains network, signals and handles the internal thread . The node can be used inside and outside the callback.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let node = Node::new(|node, event| {
   match event { 
        Event::Network(net_event) => match net_event {
             NetEvent::Data(endpoint, data) => { 
                 // data direclty from adapter
                 node.network.send(endpoint, data);
                 node.signals.send(UserSignal::Tick(1), Duration::from_millis(50));
             },
            NetEvent::Connected(..) => (),
            NetEvent::Disconnected(..) => (),
        }
        Event::Signal(signal) => match signal {
             UserSignal::Tick => { /* send other signal, call network action, etc... */ }
             UserSIgnal::Close => node.stop()
        }
    }
});
// In this moment the node is already running.
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
node.signals.send(UserSignal::Close, Duration::from_secs(3));
node.await(); //waiting until the node be stoped.
// You still can make any network call and send signals outside the callback.
// ... 
let node = Node::new(...);
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
std::thread::sleep(Duration::from_secs(3));
node.stop();

Example 3 candidate

Split node into a NodeHandler and a NodeListener. The handler manages the network, signals and can stop the internal thread. The NodeListener dispatch received events. The NodeHandler can be used both, inside and outside the callback.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let (handler, listener) = Node::split();
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
node.signals.send(UserSignal::Close, Duration::from_secs(3));
let node_task = listener.fo_each(|event| {
   match event { 
        NodeEvent::Network(net_event) => match net_event {
             NetEvent::Data(endpoint, data) => { 
                 // data direclty from adapter
                 node.network.send(endpoint, data);
                 node.signals.send(UserSignal::Tick(1), Duration::from_millis(50));
             },
            NetEvent::Connected(..) => (),
            NetEvent::Disconnected(..) => (),
        }
        NodeEvent::Signal(signal) => match signal {
             UserSignal::Tick => { /* send other signal, call network action, etc... */ }
             UserSIgnal::Close => node.stop()
        }
    }
});
// In this moment the node is already running.
// You can still make any network call and send signals outside the callback.
node.network.listen(Transport::Udp, "0.0.0.0:0").unwrap();
node.signals.send(UserSignal::Tick, Duration::from_secs(1));
drop(node_task); //waits until node.stop() be called.
@lemunozm lemunozm added the enhancement New feature or request label Mar 20, 2021
This was referenced Mar 22, 2021
@lemunozm lemunozm changed the title API improvement ideas (focused on increase performance) Node API: focused on increase performance. Mar 24, 2021
@lemunozm lemunozm changed the title Node API: focused on increase performance. Node API: zero-copy write/read message Apr 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant