Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broke io::read/write out into non-buffering and buffering solutions #51

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Cargo.lock
target
libs
.*.sw*
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ authors = ["Carl Lerche <me@carllerche.com>"]

git = "https://github.com/carllerche/nix-rust"

[dependencies.time]

git = "https://github.com/rust-lang/time"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled this in on master already.


[[test]]

name = "test"
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub type MioResult<T> = Result<T, MioError>;

#[deriving(Show, PartialEq, Clone)]
pub struct MioError {
kind: MioErrorKind,
pub kind: MioErrorKind,
sys: Option<SysError>
}

Expand Down
171 changes: 171 additions & 0 deletions src/event_ctx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::fmt;
use token::Token;
use handler::{ReadHint, DATAHINT, HUPHINT, ERRORHINT};


bitflags!(
flags IoEventKind: uint {
const IOREADABLE = 0x001,
const IOWRITABLE = 0x002,
const IOERROR = 0x004,
const IOHUPHINT = 0x008,
const IOHINTED = 0x010,
const IOONESHOT = 0x020,
const IOEDGE = 0x040,
const IOLEVEL = 0x080,
const IOALL = 0x001 | 0x002 | 0x004 | 0x008
}
)

impl fmt::Show for IoEventKind {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let mut one = false;
let flags = [
(IOREADABLE, "IoReadable"),
(IOWRITABLE, "IoWritable"),
(IOERROR, "IoError"),
(IOHUPHINT, "IoHupHint"),
(IOHINTED, "IoHinted"),
(IOEDGE, "IoEdgeTriggered")];

for &(flag, msg) in flags.iter() {
if self.contains(flag) {
if one { try!(write!(fmt, " | ")) }
try!(write!(fmt, "{}", msg));

one = true
}
}

Ok(())
}
}

#[deriving(Show)]
pub struct IoEventCtx {
kind: IoEventKind,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be some spacing issues

token: Token
}

/// IoEventCtx represents the raw event upon which the OS-specific selector
/// operates. It takes an IoEventCtx when registering and re-registering event
/// interest for Tokens. It also returns IoEventCtx when calling handlers.
/// An event can represent more than one kind (such as readable or writable) at a time.
///
/// These IoEventCtx objects are created by the OS-specific concrete
/// Selector when they have events to report.
impl IoEventCtx {
/// Create a new IoEvent.
pub fn new(kind: IoEventKind, token: Token) -> IoEventCtx {
IoEventCtx {
kind: kind,
token: token
}
}

pub fn token(&self) -> Token {
self.token
}

/// Return an optional hint for a readable IO handle. Currently,
/// this method supports the HupHint, which indicates that the
/// kernel reported that the remote side hung up. This allows a
/// consumer to avoid reading in order to discover the hangup.
pub fn read_hint(&self) -> ReadHint {
let mut hint = ReadHint::empty();

// The backend doesn't support hinting
if !self.kind.contains(IOHINTED) {
return hint;
}

if self.kind.contains(IOHUPHINT) {
hint = hint | HUPHINT
}

if self.kind.contains(IOREADABLE) {
hint = hint | DATAHINT
}

if self.kind.contains(IOERROR) {
hint = hint | ERRORHINT
}

hint
}

/// This event indicated that the IO handle is now readable
pub fn is_readable(&self) -> bool {
self.kind.contains(IOREADABLE) || self.kind.contains(IOHUPHINT)
}

/// This event indicated that the IO handle is now writable
pub fn is_writable(&self) -> bool {
self.kind.contains(IOWRITABLE)
}

/// This event indicated that the IO handle had an error
pub fn is_error(&self) -> bool {
self.kind.contains(IOERROR)
}

pub fn is_hangup(&self) -> bool {
self.kind.contains(IOHUPHINT)
}

pub fn is_edge_triggered(&self) -> bool {
self.kind.contains(IOEDGE)
}

/// This event indicated that the IO handle is now readable
pub fn set_readable(&mut self, flag: bool) -> &IoEventCtx {
match flag {
true => self.kind.insert(IOREADABLE),
false => self.kind.remove(IOREADABLE)
}
return self;
}

/// This event indicated that the IO handle is now writable
pub fn set_writable(&mut self, flag: bool) -> &IoEventCtx {
match flag {
true => self.kind.insert(IOWRITABLE),
false => self.kind.remove(IOWRITABLE)
}
return self;
}

/// This event indicated that the IO handle had an error
pub fn set_error(&mut self, flag: bool) -> &IoEventCtx {
match flag {
true => self.kind.insert(IOERROR),
false => self.kind.remove(IOERROR)
}
return self;
}

pub fn set_hangup(&mut self, flag: bool) -> &IoEventCtx {
match flag {
true => self.kind.insert(IOHUPHINT),
false => self.kind.remove(IOHUPHINT)
}
return self;
}

pub fn set_edge_triggered(&mut self, flag: bool) -> &IoEventCtx {
match flag {
true => self.kind.insert(IOEDGE),
false => self.kind.remove(IOEDGE)
}
return self;
}

pub fn set_all(&mut self, flag: bool) -> &IoEventCtx {
match flag {
true => self.kind.insert(IOALL),
false => self.kind.remove(IOALL)
}
return self;
}
}

48 changes: 17 additions & 31 deletions src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::uint;
use error::{MioResult, MioError};
use handler::Handler;
use io::IoHandle;
use net::{Socket, SockAddr};
use notify::Notify;
use os;
use poll::{Poll, IoEvent};
use poll::Poll;
use timer::{Timer, Timeout, TimerResult};
use token::Token;
use event_ctx::IoEventCtx;
use event_ctx as evt;

/// A lightweight event loop.
///
Expand Down Expand Up @@ -73,7 +73,8 @@ impl<T, M: Send> EventLoop<T, M> {
let notify = try!(Notify::with_capacity(config.notify_capacity));

// Register the notification wakeup FD with the IO poller
try!(poll.register(&notify, NOTIFY));
let ctx = evt::IoEventCtx::new(evt::IOREADABLE, NOTIFY);
try!(poll.register(&notify, &ctx));

// Set the timer's starting time reference point
timer.setup();
Expand Down Expand Up @@ -112,32 +113,13 @@ impl<T, M: Send> EventLoop<T, M> {
}

/// Registers an IO handle with the event loop.
pub fn register<H: IoHandle>(&mut self, io: &H, token: Token) -> MioResult<()> {
self.poll.register(io, token)
pub fn register<H: IoHandle>(&mut self, io: &H, eventctx: &IoEventCtx) -> MioResult<()> {
self.poll.register(io, eventctx)
}

/// Connects the socket to the specified address. When the operation
/// completes, the handler will be notified with the supplied token.
///
/// The goal of this method is to ensure that the event loop will always
/// notify about the connection, even if the connection happens
/// immediately. Otherwise, every consumer of the event loop would have
/// to worry about possibly-immediate connection.
pub fn connect<S: Socket>(&mut self, io: &S, addr: &SockAddr, token: Token) -> MioResult<()> {
debug!("socket connect; addr={}", addr);

// Attempt establishing the context. This may not complete immediately.
if try!(os::connect(io.desc(), addr)) {
// On some OSs, connecting to localhost succeeds immediately. In
// this case, queue the writable callback for execution during the
// next event loop tick.
debug!("socket connected immediately; addr={}", addr);
}

// Register interest with socket on the event loop
try!(self.register(io, token));

Ok(())
/// Re-Registers an IO handle with the event to change/update interest
pub fn reregister<H: IoHandle>(&mut self, io: &H, eventctx: &IoEventCtx) -> MioResult<()> {
self.poll.reregister(io, eventctx)
}

/// Keep spinning the event loop indefinitely, and notify the handler whenever
Expand Down Expand Up @@ -238,7 +220,7 @@ impl<T, M: Send> EventLoop<T, M> {
}
}

fn io_event<H: Handler<T, M>>(&mut self, handler: &mut H, evt: IoEvent) {
fn io_event<H: Handler<T, M>>(&mut self, handler: &mut H, evt: IoEventCtx) {
let tok = evt.token();

if evt.is_readable() {
Expand Down Expand Up @@ -269,7 +251,7 @@ impl<T, M: Send> EventLoop<T, M> {

loop {
match self.timer.tick_to(now) {
Some(t) => handler.timeout(self, t),
Some(t) => { debug!("received timeout at {}", now); handler.timeout(self, t); }
_ => return
}
}
Expand Down Expand Up @@ -315,6 +297,8 @@ mod tests {
use super::EventLoop;
use io::{IoWriter, IoReader};
use {io, buf, Buf, Handler, Token, ReadHint};
use event_ctx::IoEventCtx;
use event_ctx as evt;

type TestEventLoop = EventLoop<uint, ()>;

Expand Down Expand Up @@ -350,7 +334,9 @@ mod tests {
let handler = Funtimes::new(rcount.clone(), wcount.clone());

writer.write(&mut buf::wrap("hello".as_bytes())).unwrap();
event_loop.register(&reader, Token(10)).unwrap();

let evt = IoEventCtx::new(evt::IOREADABLE, Token(10));
event_loop.register(&reader, &evt).unwrap();

let _ = event_loop.run_once(handler);
let mut b = buf::ByteBuf::new(16);
Expand Down