Skip to content

Commit

Permalink
Remove Socket token generic in favor of a struct
Browse files Browse the repository at this point in the history
Token has become a tuple struct wrapping a single uint. Using a generic
is not much use because supporting epoll and kqueue require that:

a) implement Copy
b) fit in the same number of bytes as uint

Given these requirements, it seems easier to just use a uint based
token.
  • Loading branch information
carllerche committed Sep 30, 2014
1 parent d717457 commit 49e99b3
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 166 deletions.
69 changes: 7 additions & 62 deletions src/handler.rs
@@ -1,72 +1,17 @@
use reactor::{Reactor};
use reactor::Reactor;
use token::Token;

#[allow(unused_variable)]
pub trait Handler<T: Token, T2, M: Send> {
fn readable(&mut self, reactor: &mut Reactor<T, T2, M>, token: T) {
pub trait Handler<T, M: Send> {
fn readable(&mut self, reactor: &mut Reactor<T, M>, token: Token) {
}

fn writable(&mut self, reactor: &mut Reactor<T, T2, M>, token: T) {
fn writable(&mut self, reactor: &mut Reactor<T, M>, token: Token) {
}

fn notify(&mut self, reactor: &mut Reactor<T, T2, M>, msg: M) {
fn notify(&mut self, reactor: &mut Reactor<T, M>, msg: M) {
}

fn timeout(&mut self, reactor: &mut Reactor<T, T2, M>, timeout: T2) {
}
}

pub trait Token : Copy {
fn from_u64(val: u64) -> Self;

fn to_u64(self) -> u64;
}

impl Token for int {
fn from_u64(val: u64) -> int {
val as int
}

fn to_u64(self) -> u64 {
self as u64
}
}

impl Token for uint {
fn from_u64(val: u64) -> uint {
val as uint
}

fn to_u64(self) -> u64 {
self as u64
}
}

impl Token for i64 {
fn from_u64(val: u64) -> i64 {
val as i64
}

fn to_u64(self) -> u64 {
self as u64
}
}

impl Token for u64 {
fn from_u64(val: u64) -> u64 {
val
}

fn to_u64(self) -> u64 {
self
}
}

impl Token for () {
fn from_u64(_: u64) -> () {
()
}

fn to_u64(self) -> u64 {
0
fn timeout(&mut self, reactor: &mut Reactor<T, M>, timeout: T) {
}
}
6 changes: 6 additions & 0 deletions src/lib.rs
Expand Up @@ -53,6 +53,11 @@ pub use timer::{
Timer,
Timeout,
};
pub use token::{
Token,
TOKEN_0,
TOKEN_1,
};

pub mod buf;
mod error;
Expand All @@ -65,3 +70,4 @@ mod reactor;
mod slab;
mod socket;
mod timer;
mod token;
4 changes: 2 additions & 2 deletions src/os/epoll.rs
Expand Up @@ -28,12 +28,12 @@ impl Selector {
}

/// Register event interests for the given IO handle with the OS
pub fn register(&mut self, io: &IoDesc, token: u64) -> MioResult<()> {
pub fn register(&mut self, io: &IoDesc, token: uint) -> MioResult<()> {
let interests = EPOLLIN | EPOLLOUT | EPOLLERR;

let info = EpollEvent {
events: interests | EPOLLET,
data: token
data: token as u64
};

epoll_ctl(self.epfd, EpollCtlAdd, io.fd, &info)
Expand Down
10 changes: 4 additions & 6 deletions src/os/kqueue.rs
Expand Up @@ -29,7 +29,7 @@ impl Selector {
Ok(())
}

pub fn register(&mut self, io: &IoDesc, token: u64) -> MioResult<()> {
pub fn register(&mut self, io: &IoDesc, token: uint) -> MioResult<()> {
let flag = EV_ADD | EV_CLEAR;

try!(self.ev_push(io, EVFILT_READ, flag, FilterFlag::empty(), token));
Expand All @@ -45,16 +45,15 @@ impl Selector {
filter: EventFilter,
flags: EventFlag,
fflags: FilterFlag,
token: u64) -> MioResult<()> {
token: uint) -> MioResult<()> {

// If the change buffer is full, flush it
try!(self.maybe_flush_changes());

let idx = self.changes.len;
let ev = &mut self.changes.events[idx];

// TODO: Don't cast to uint
ev_set(ev, io.fd as uint, filter, flags, fflags, token as uint);
ev_set(ev, io.fd as uint, filter, flags, fflags, token);

self.changes.len += 1;

Expand Down Expand Up @@ -121,8 +120,7 @@ impl Events {
}
}

// TODO: Don't cast
IoEvent::new(kind, token as u64)
IoEvent::new(kind, token)
}

#[inline]
Expand Down
13 changes: 7 additions & 6 deletions src/poll.rs
@@ -1,6 +1,7 @@
use error::MioResult;
use io::IoHandle;
use os;
use token::Token;

pub struct Poll {
selector: os::Selector,
Expand All @@ -15,11 +16,11 @@ impl Poll {
})
}

pub fn register<H: IoHandle>(&mut self, io: &H, token: u64) -> MioResult<()> {
pub fn register<H: IoHandle>(&mut self, io: &H, token: Token) -> MioResult<()> {
debug!("registering IO with poller");

// Register interests for this socket
try!(self.selector.register(io.desc(), token));
try!(self.selector.register(io.desc(), token.as_uint()));

Ok(())
}
Expand Down Expand Up @@ -47,7 +48,7 @@ bitflags!(
#[deriving(Show)]
pub struct IoEvent {
kind: IoEventKind,
token: u64
token: Token
}

/// IoEvent represents the raw event that the OS-specific selector
Expand All @@ -58,14 +59,14 @@ pub struct IoEvent {
/// Selector when they have events to report.
impl IoEvent {
/// Create a new IoEvent.
pub fn new(kind: IoEventKind, token: u64) -> IoEvent {
pub fn new(kind: IoEventKind, token: uint) -> IoEvent {
IoEvent {
kind: kind,
token: token
token: Token(token)
}
}

pub fn token(&self) -> u64 {
pub fn token(&self) -> Token {
self.token
}

Expand Down
59 changes: 29 additions & 30 deletions src/reactor.rs
@@ -1,13 +1,14 @@
use std::default::Default;
use std::u64;
use std::uint;
use error::{MioResult, MioError};
use handler::{Handler, Token};
use handler::Handler;
use io::{IoAcceptor, IoHandle};
use notify::Notify;
use os;
use poll::{Poll, IoEvent};
use socket::{Socket, SockAddr};
use timer::{Timer, Timeout, TimerResult};
use token::Token;

/// A lightweight IO reactor.
///
Expand Down Expand Up @@ -41,24 +42,24 @@ impl Default for ReactorConfig {
}
}

pub struct Reactor<T, T2, M: Send> {
pub struct Reactor<T, M: Send> {
run: bool,
poll: Poll,
timer: Timer<T2>,
timer: Timer<T>,
notify: Notify<M>,
config: ReactorConfig,
}

// Token used to represent notifications
static NOTIFY: u64 = u64::MAX;
static NOTIFY: Token = Token(uint::MAX);

impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
impl<T, M: Send> Reactor<T, M> {
/// Initializes a new reactor. The reactor will not be running yet.
pub fn new() -> MioResult<Reactor<T, T2, M>> {
pub fn new() -> MioResult<Reactor<T, M>> {
Reactor::configured(Default::default())
}

pub fn configured(config: ReactorConfig) -> MioResult<Reactor<T, T2, M>> {
pub fn configured(config: ReactorConfig) -> MioResult<Reactor<T, M>> {
// Create the IO poller
let mut poll = try!(Poll::new());

Expand Down Expand Up @@ -94,7 +95,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {

/// After the requested time interval, the handler's `timeout` function
/// will be called with the supplied token.
pub fn timeout_ms(&mut self, token: T2, delay: u64) -> TimerResult<Timeout> {
pub fn timeout_ms(&mut self, token: T, delay: u64) -> TimerResult<Timeout> {
self.timer.timeout_ms(token, delay)
}

Expand All @@ -116,8 +117,8 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}

/// Registers an IO handle with the reactor.
pub fn register<H: IoHandle>(&mut self, io: &H, token: T) -> MioResult<()> {
self.poll.register(io, token.to_u64())
pub fn register<H: IoHandle>(&mut self, io: &H, token: Token) -> MioResult<()> {
self.poll.register(io, token)
}

/// Connects the socket to the specified address. When the operation
Expand All @@ -127,9 +128,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
/// notify about the connection, even if the connection happens
/// immediately. Otherwise, every consumer of the reactor would have
/// to worry about possibly-immediate connection.
pub fn connect<S: Socket>(&mut self, io: &S,
addr: &SockAddr, token: T) -> MioResult<()> {

pub fn connect<S: Socket>(&mut self, io: &S, addr: &SockAddr, token: Token) -> MioResult<()> {
debug!("socket connect; addr={}", addr);

// Attempt establishing the context. This may not complete immediately.
Expand All @@ -146,8 +145,8 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
Ok(())
}

pub fn listen<S, A: IoHandle + IoAcceptor<S>>(&mut self, io: &A, backlog: uint,
token: T) -> MioResult<()> {
pub fn listen<S, A: IoHandle + IoAcceptor<S>>(
&mut self, io: &A, backlog: uint, token: Token) -> MioResult<()> {

debug!("socket listen");

Expand All @@ -162,7 +161,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {

/// Keep spinning the reactor indefinitely, and notify the handler whenever
/// any of the registered handles are ready.
pub fn run<H: Handler<T, T2, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
pub fn run<H: Handler<T, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
self.run = true;

while self.run {
Expand All @@ -179,7 +178,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
/// Spin the reactor once, with a timeout of one second, and notify the
/// handler if any of the registered handles become ready during that
/// time.
pub fn run_once<H: Handler<T, T2, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
pub fn run_once<H: Handler<T, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
// Execute a single tick
match self.tick(&mut handler) {
Err(e) => return Err(ReactorError::new(handler, e)),
Expand All @@ -190,7 +189,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}

// Executes a single run of the reactor loop
fn tick<H: Handler<T, T2, M>>(&mut self, handler: &mut H) -> MioResult<()> {
fn tick<H: Handler<T, M>>(&mut self, handler: &mut H) -> MioResult<()> {
let mut messages;
let mut pending;

Expand Down Expand Up @@ -237,7 +236,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}

// Process IO events that have been previously polled
fn io_process<H: Handler<T, T2, M>>(&mut self, handler: &mut H, cnt: uint) {
fn io_process<H: Handler<T, M>>(&mut self, handler: &mut H, cnt: uint) {
let mut i = 0u;

// Iterate over the notifications. Each event provides the token
Expand All @@ -258,8 +257,8 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}
}

fn io_event<H: Handler<T, T2, M>>(&mut self, handler: &mut H, evt: IoEvent) {
let tok = Token::from_u64(evt.token());
fn io_event<H: Handler<T, M>>(&mut self, handler: &mut H, evt: IoEvent) {
let tok = evt.token();

if evt.is_readable() {
handler.readable(self, tok);
Expand All @@ -274,7 +273,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}
}

fn notify<H: Handler<T, T2, M>>(&mut self, handler: &mut H, mut cnt: uint) {
fn notify<H: Handler<T, M>>(&mut self, handler: &mut H, mut cnt: uint) {
while cnt > 0 {
let msg = self.notify.poll()
.expect("[BUG] at this point there should always be a message");
Expand All @@ -284,7 +283,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}
}

fn timer_process<H: Handler<T, T2, M>>(&mut self, handler: &mut H) {
fn timer_process<H: Handler<T, M>>(&mut self, handler: &mut H) {
let now = self.timer.now();

loop {
Expand Down Expand Up @@ -334,9 +333,9 @@ mod tests {
use std::sync::atomics::{AtomicInt, SeqCst};
use super::Reactor;
use io::{IoWriter, IoReader};
use {io, buf, Buf, Handler};
use {io, buf, Buf, Handler, Token};

type TestReactor = Reactor<u64, uint, ()>;
type TestReactor = Reactor<uint, ()>;

struct Funtimes {
rcount: Arc<AtomicInt>,
Expand All @@ -352,10 +351,10 @@ mod tests {
}
}

impl Handler<u64, uint, ()> for Funtimes {
fn readable(&mut self, _reactor: &mut TestReactor, token: u64) {
impl Handler<uint, ()> for Funtimes {
fn readable(&mut self, _reactor: &mut TestReactor, token: Token) {
(*self.rcount).fetch_add(1, SeqCst);
assert_eq!(token, 10u64);
assert_eq!(token, Token(10));
}
}

Expand All @@ -370,7 +369,7 @@ mod tests {
let handler = Funtimes::new(rcount.clone(), wcount.clone());

writer.write(&mut buf::wrap("hello".as_bytes())).unwrap();
reactor.register(&reader, 10u64).unwrap();
reactor.register(&reader, Token(10)).unwrap();

let _ = reactor.run_once(handler);
let mut b = buf::ByteBuf::new(16);
Expand Down

0 comments on commit 49e99b3

Please sign in to comment.