Skip to content

Commit

Permalink
Improve EventLoop IO interest handling
Browse files Browse the repository at this point in the history
* Change io::read/write to not use a loop
* Return the count of bytes read / written
* Allow specifying IO interests when registering
* Add EventLoop#reregister and EventLoop#register_opt
  • Loading branch information
Rick Richardson authored and carllerche committed Nov 26, 2014
1 parent 07ad77e commit 0122914
Show file tree
Hide file tree
Showing 21 changed files with 676 additions and 423 deletions.
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub type MioResult<T> = Result<T, MioError>;

#[deriving(Show, PartialEq, Clone)]
pub struct MioError {
kind: MioErrorKind,
pub kind: MioErrorKind,
sys: Option<SysError>
}

Expand Down
46 changes: 16 additions & 30 deletions src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use std::uint;
use error::{MioResult, MioError};
use handler::Handler;
use io::IoHandle;
use net::{Socket, SockAddr};
use notify::Notify;
use os;
use poll::{Poll, IoEvent};
use os::event;
use poll::{Poll};
use timer::{Timer, Timeout, TimerResult};
use token::Token;
use os::token::Token;

/// A lightweight event loop.
///
Expand Down Expand Up @@ -74,7 +73,7 @@ impl<T, M: Send> EventLoop<T, M> {
let notify = try!(Notify::with_capacity(config.notify_capacity));

// Register the notification wakeup FD with the IO poller
try!(poll.register(&notify, NOTIFY));
try!(poll.register(&notify, NOTIFY, event::READABLE | event::WRITABLE, event::EDGE));

// Set the timer's starting time reference point
timer.setup();
Expand Down Expand Up @@ -114,31 +113,17 @@ impl<T, M: Send> EventLoop<T, M> {

/// Registers an IO handle with the event loop.
pub fn register<H: IoHandle>(&mut self, io: &H, token: Token) -> MioResult<()> {
self.poll.register(io, token)
self.poll.register(io, token, event::READABLE, event::LEVEL)
}

/// Connects the socket to the specified address. When the operation
/// completes, the handler will be notified with the supplied token.
///
/// The goal of this method is to ensure that the event loop will always
/// notify about the connection, even if the connection happens
/// immediately. Otherwise, every consumer of the event loop would have
/// to worry about possibly-immediate connection.
pub fn connect<S: Socket>(&mut self, io: &S, addr: &SockAddr, token: Token) -> MioResult<()> {
debug!("socket connect; addr={}", addr);

// Attempt establishing the context. This may not complete immediately.
if try!(os::connect(io.desc(), addr)) {
// On some OSs, connecting to localhost succeeds immediately. In
// this case, queue the writable callback for execution during the
// next event loop tick.
debug!("socket connected immediately; addr={}", addr);
}

// Register interest with socket on the event loop
try!(self.register(io, token));
/// Registers an IO handle with the event loop.
pub fn register_opt<H: IoHandle>(&mut self, io: &H, token: Token, interest: event::Interest, opt: event::PollOpt) -> MioResult<()> {
self.poll.register(io, token, interest, opt)
}

Ok(())
/// Re-Registers an IO handle with the event loop.
pub fn reregister<H: IoHandle>(&mut self, io: &H, token: Token, interest: event::Interest, opt: event::PollOpt) -> MioResult<()> {
self.poll.reregister(io, token, interest, opt)
}

/// Keep spinning the event loop indefinitely, and notify the handler whenever
Expand Down Expand Up @@ -239,7 +224,7 @@ impl<T, M: Send> EventLoop<T, M> {
}
}

fn io_event<H: Handler<T, M>>(&mut self, handler: &mut H, evt: IoEvent) {
fn io_event<H: Handler<T, M>>(&mut self, handler: &mut H, evt: event::IoEvent) {
let tok = evt.token();

if evt.is_readable() {
Expand Down Expand Up @@ -315,7 +300,8 @@ mod tests {
use std::sync::atomic::{AtomicInt, SeqCst};
use super::EventLoop;
use io::{IoWriter, IoReader};
use {io, buf, Buf, Handler, Token, ReadHint};
use {io, buf, Buf, Handler, Token};
use os::event;

type TestEventLoop = EventLoop<uint, ()>;

Expand All @@ -334,7 +320,7 @@ mod tests {
}

impl Handler<uint, ()> for Funtimes {
fn readable(&mut self, _event_loop: &mut TestEventLoop, token: Token, _hint: ReadHint) {
fn readable(&mut self, _event_loop: &mut TestEventLoop, token: Token, _hint: event::ReadHint) {
(*self.rcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
}
Expand Down
35 changes: 3 additions & 32 deletions src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,10 @@
use std::fmt;
use event_loop::EventLoop;
use token::Token;

bitflags!(
flags ReadHint: uint {
const DATAHINT = 0x001,
const HUPHINT = 0x002,
const ERRORHINT = 0x004
}
)

impl fmt::Show for ReadHint {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let mut one = false;
let flags = [
(DATAHINT, "DataHint"),
(HUPHINT, "HupHint"),
(ERRORHINT, "ErrorHint")];

for &(flag, msg) in flags.iter() {
if self.contains(flag) {
if one { try!(write!(fmt, " | ")) }
try!(write!(fmt, "{}", msg));

one = true
}
}

Ok(())
}
}
use os::token::Token;
use os::event;

#[allow(unused_variables)]
pub trait Handler<T, M: Send> {
fn readable(&mut self, event_loop: &mut EventLoop<T, M>, token: Token, hint: ReadHint) {
fn readable(&mut self, event_loop: &mut EventLoop<T, M>, token: Token, hint: event::ReadHint) {
}

fn writable(&mut self, event_loop: &mut EventLoop<T, M>, token: Token) {
Expand Down
78 changes: 32 additions & 46 deletions src/io.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use buf::{Buf, MutBuf};
use os;
use error::MioResult;

use self::NonBlock::{Ready, WouldBlock};
use error::MioErrorKind as mek;

#[deriving(Show)]
pub enum NonBlock<T> {
Expand Down Expand Up @@ -31,11 +31,11 @@ pub trait IoHandle {
}

pub trait IoReader {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<()>>;
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<(uint)>>;
}

pub trait IoWriter {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<()>>;
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<(uint)>>;
}

pub trait IoAcceptor<T> {
Expand Down Expand Up @@ -68,64 +68,50 @@ impl IoHandle for PipeWriter {
}

impl IoReader for PipeReader {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<()>> {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<(uint)>> {
read(self, buf)
}
}

impl IoWriter for PipeWriter {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<()>> {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<(uint)>> {
write(self, buf)
}
}

pub fn read<I: IoHandle>(io: &mut I, buf: &mut MutBuf) -> MioResult<NonBlock<()>> {
let mut first_iter = true;

while buf.has_remaining() {
match os::read(io.desc(), buf.mut_bytes()) {
// Successfully read some bytes, advance the cursor
Ok(cnt) => {
buf.advance(cnt);
first_iter = false;
}
Err(e) => {
if e.is_would_block() {
return Ok(WouldBlock);
}

// If the EOF is hit the first time around, then propagate
if e.is_eof() {
if first_iter {
return Err(e);
}

return Ok(Ready(()));
}

// Indicate that the read was successful
return Err(e);
/// Reads the length of the slice supplied by buf.mut_bytes into the buffer
/// This is not guaranteed to consume an entire datagram or segment.
/// If your protocol is msg based (instead of continuous stream) you should
/// ensure that your buffer is large enough to hold an entire segment (1532 bytes if not jumbo
/// frames)
#[inline]
pub fn read<I: IoHandle>(io: &mut I, buf: &mut MutBuf) -> MioResult<NonBlock<uint>> {

match os::read(io.desc(), buf.mut_bytes()) {
// Successfully read some bytes, advance the cursor
Ok(cnt) => {
buf.advance(cnt);
Ok(Ready(cnt))
}
Err(e) => {
match e.kind {
mek::WouldBlock => Ok(WouldBlock),
_ => Err(e)
}
}
}

Ok(Ready(()))
}

pub fn write<O: IoHandle>(io: &mut O, buf: &mut Buf) -> MioResult<NonBlock<()>> {
while buf.has_remaining() {
match os::write(io.desc(), buf.bytes()) {
Ok(cnt) => buf.advance(cnt),
Err(e) => {
if e.is_would_block() {
return Ok(WouldBlock);
}

return Err(e);
///writes the length of the slice supplied by Buf.bytes into the socket
#[inline]
pub fn write<O: IoHandle>(io: &mut O, buf: &mut Buf) -> MioResult<NonBlock<uint>> {
match os::write(io.desc(), buf.bytes()) {
Ok(cnt) => { buf.advance(cnt); Ok(Ready(cnt)) }
Err(e) => {
match e.kind {
mek::WouldBlock => Ok(WouldBlock),
_ => Err(e)
}
}
}

Ok(Ready(()))
}

14 changes: 5 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ pub use buf::{
pub use error::{
MioResult,
MioError,
MioErrorKind
};
pub use handler::{
Handler,
ReadHint,
DATAHINT,
HUPHINT,
ERRORHINT,
};
pub use io::{
pipe,
Expand All @@ -37,9 +34,7 @@ pub use io::{
PipeWriter,
};
pub use poll::{
Poll,
IoEvent,
IoEventKind,
Poll
};
pub use event_loop::{
EventLoop,
Expand All @@ -50,10 +45,12 @@ pub use event_loop::{
pub use timer::{
Timeout,
};
pub use token::{
pub use os::token::{
Token,
};

pub use os::event;

pub mod buf;
pub mod net;
pub mod util;
Expand All @@ -66,4 +63,3 @@ mod notify;
mod os;
mod poll;
mod timer;
mod token;
33 changes: 27 additions & 6 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,27 @@ pub mod tcp {
Ok(TcpSocket { desc: try!(os::socket(family, Stream)) })
}

/// Connects the socket to the specified address. When the operation
/// completes, the handler will be notified with the supplied token.
///
/// The goal of this method is to ensure that the event loop will always
/// notify about the connection, even if the connection happens
/// immediately. Otherwise, every consumer of the event loop would have
/// to worry about possibly-immediate connection.
pub fn connect(&self, addr: &SockAddr) -> MioResult<()> {
debug!("socket connect; addr={}", addr);

// Attempt establishing the context. This may not complete immediately.
if try!(os::connect(&self.desc, addr)) {
// On some OSs, connecting to localhost succeeds immediately. In
// this case, queue the writable callback for execution during the
// next event loop tick.
debug!("socket connected immediately; addr={}", addr);
}

Ok(())
}

pub fn bind(self, addr: &SockAddr) -> MioResult<TcpListener> {
try!(os::bind(&self.desc, addr))
Ok(TcpListener { desc: self.desc })
Expand All @@ -169,13 +190,13 @@ pub mod tcp {
}

impl IoReader for TcpSocket {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<()>> {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<(uint)>> {
io::read(self, buf)
}
}

impl IoWriter for TcpSocket {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<()>> {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<(uint)>> {
io::write(self, buf)
}
}
Expand Down Expand Up @@ -293,11 +314,11 @@ pub mod udp {
}

impl IoReader for UdpSocket {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<()>> {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<NonBlock<(uint)>> {
match os::read(&self.desc, buf.mut_bytes()) {
Ok(cnt) => {
buf.advance(cnt);
Ok(Ready(()))
Ok(Ready((cnt)))
}
Err(e) => {
if e.is_would_block() {
Expand All @@ -311,11 +332,11 @@ pub mod udp {
}

impl IoWriter for UdpSocket {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<()>> {
fn write(&mut self, buf: &mut Buf) -> MioResult<NonBlock<(uint)>> {
match os::write(&self.desc, buf.bytes()) {
Ok(cnt) => {
buf.advance(cnt);
Ok(Ready(()))
Ok(Ready((cnt)))
}
Err(e) => {
if e.is_would_block() {
Expand Down

0 comments on commit 0122914

Please sign in to comment.