Skip to content

Commit

Permalink
Propagate IO readiness state to NIO::Monitor#readiness
Browse files Browse the repository at this point in the history
Previously information about the readiness of a particular monitor
wasn't being exposed. This commit implements readiness support for all 3
implementations.
  • Loading branch information
tarcieri committed Jan 8, 2012
1 parent 9d363f9 commit 449c36d
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 24 deletions.
62 changes: 55 additions & 7 deletions ext/nio4r/monitor.c
Expand Up @@ -20,8 +20,11 @@ static VALUE NIO_Monitor_close(VALUE self);
static VALUE NIO_Monitor_is_closed(VALUE self);
static VALUE NIO_Monitor_io(VALUE self);
static VALUE NIO_Monitor_interests(VALUE self);
static VALUE NIO_Monitor_is_readable(VALUE self);
static VALUE NIO_Monitor_is_writable(VALUE self);
static VALUE NIO_Monitor_value(VALUE self);
static VALUE NIO_Monitor_set_value(VALUE self, VALUE obj);
static VALUE NIO_Monitor_readiness(VALUE self);

/* Internal functions */
static void NIO_Monitor_callback(struct ev_loop *ev_loop, struct ev_io *io, int revents);
Expand All @@ -46,6 +49,10 @@ void Init_NIO_Monitor()
rb_define_method(cNIO_Monitor, "interests", NIO_Monitor_interests, 0);
rb_define_method(cNIO_Monitor, "value", NIO_Monitor_value, 0);
rb_define_method(cNIO_Monitor, "value=", NIO_Monitor_set_value, 1);
rb_define_method(cNIO_Monitor, "readiness", NIO_Monitor_readiness, 0);
rb_define_method(cNIO_Monitor, "readable?", NIO_Monitor_is_readable, 0);
rb_define_method(cNIO_Monitor, "writable?", NIO_Monitor_is_writable, 0);
rb_define_method(cNIO_Monitor, "writeable?", NIO_Monitor_is_writable, 0);
}

static VALUE NIO_Monitor_allocate(VALUE klass)
Expand All @@ -68,7 +75,6 @@ static VALUE NIO_Monitor_initialize(VALUE self, VALUE selector_obj, VALUE io, VA
{
struct NIO_Monitor *monitor;
struct NIO_Selector *selector;
int events;
ID interests_id;

#if HAVE_RB_IO_T
Expand All @@ -79,21 +85,21 @@ static VALUE NIO_Monitor_initialize(VALUE self, VALUE selector_obj, VALUE io, VA

interests_id = SYM2ID(interests);

Data_Get_Struct(self, struct NIO_Monitor, monitor);

if(interests_id == rb_intern("r")) {
events = EV_READ;
monitor->interests = EV_READ;
} else if(interests_id == rb_intern("w")) {
events = EV_WRITE;
monitor->interests = EV_WRITE;
} else if(interests_id == rb_intern("rw")) {
events = EV_READ | EV_WRITE;
monitor->interests = EV_READ | EV_WRITE;
} else {
rb_raise(rb_eArgError, "invalid event type %s (must be :r, :w, or :rw)",
RSTRING_PTR(rb_funcall(interests, rb_intern("inspect"), 0, 0)));
}

Data_Get_Struct(self, struct NIO_Monitor, monitor);

GetOpenFile(rb_convert_type(io, T_FILE, "IO", "to_io"), fptr);
ev_io_init(&monitor->ev_io, NIO_Monitor_callback, FPTR_TO_FD(fptr), events);
ev_io_init(&monitor->ev_io, NIO_Monitor_callback, FPTR_TO_FD(fptr), monitor->interests);

rb_ivar_set(self, rb_intern("selector"), selector_obj);
rb_ivar_set(self, rb_intern("io"), io);
Expand Down Expand Up @@ -154,11 +160,53 @@ static VALUE NIO_Monitor_set_value(VALUE self, VALUE obj)
return rb_ivar_set(self, rb_intern("value"), obj);
}

static VALUE NIO_Monitor_readiness(VALUE self)
{
struct NIO_Monitor *monitor;
Data_Get_Struct(self, struct NIO_Monitor, monitor);

if((monitor->revents & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) {
return ID2SYM(rb_intern("rw"));
} else if(monitor->revents & EV_READ) {
return ID2SYM(rb_intern("r"));
} else if(monitor->revents & EV_WRITE) {
return ID2SYM(rb_intern("w"));
} else {
return Qnil;
}
}

static VALUE NIO_Monitor_is_readable(VALUE self)
{
struct NIO_Monitor *monitor;
Data_Get_Struct(self, struct NIO_Monitor, monitor);

if(monitor->revents & EV_READ) {
return Qtrue;
} else {
return Qfalse;
}
}

static VALUE NIO_Monitor_is_writable(VALUE self)
{
struct NIO_Monitor *monitor;
Data_Get_Struct(self, struct NIO_Monitor, monitor);

if(monitor->revents & EV_WRITE) {
return Qtrue;
} else {
return Qfalse;
}
}

/* libev callback fired whenever this monitor gets events */
static void NIO_Monitor_callback(struct ev_loop *ev_loop, struct ev_io *io, int revents)
{
struct NIO_Monitor *monitor = (struct NIO_Monitor *)io->data;

assert(monitor->selector != 0);
monitor->revents = revents;

NIO_Selector_handle_event(monitor->selector, monitor->self, revents);
}
1 change: 1 addition & 0 deletions ext/nio4r/nio4r.h
Expand Up @@ -31,6 +31,7 @@ struct NIO_callback_data
struct NIO_Monitor
{
VALUE self;
int interests, revents;
struct ev_io ev_io;
struct NIO_Selector *selector;
};
Expand Down
16 changes: 16 additions & 0 deletions lib/nio/jruby/monitor.rb
Expand Up @@ -15,6 +15,22 @@ def interests
Selector.iops2sym @key.interestOps
end

# What is the IO object ready for?
def readiness
Selector.iops2sym @key.readyOps
end

# Is the IO object readable?
def readable?
readiness == :r || readiness == :rw
end

# Is the IO object writable?
def writable?
readiness == :w || readiness == :rw
end
alias_method :writeable?, :writable?

# Is this monitor closed?
def closed?; @closed; end

Expand Down
13 changes: 12 additions & 1 deletion lib/nio/monitor.rb
Expand Up @@ -2,14 +2,25 @@ module NIO
# Monitors watch IO objects for specific events
class Monitor
attr_reader :io, :interests
attr_accessor :value
attr_accessor :value, :readiness

# :nodoc
def initialize(io, interests)
@io, @interests = io, interests
@closed = false
end

# Is the IO object readable?
def readable?
readiness == :r || readiness == :rw
end

# Is the IO object writable?
def writable?
readiness == :w || readiness == :rw
end
alias_method :writeable?, :writable?

# Is this monitor closed?
def closed?; @closed; end

Expand Down
25 changes: 19 additions & 6 deletions lib/nio/selector.rb
Expand Up @@ -54,10 +54,8 @@ def select(timeout = nil)
ready_readers, ready_writers = Kernel.select readers, writers, [], timeout
return unless ready_readers # timeout or wakeup

results = ready_readers
results.concat ready_writers if ready_writers

results.map! do |io|
results = []
ready_readers.each do |io|
if io == @wakeup
# Clear all wakeup signals we've received by reading them
# Wakeups should have level triggered behavior
Expand All @@ -71,12 +69,27 @@ def select(timeout = nil)

return
else
@selectables[io]
monitor = @selectables[io]
monitor.readiness = :r
results << monitor
end
end

ready_readwriters = ready_readers & ready_writers
ready_writers = ready_writers - ready_readwriters

[[ready_writers, :w], [ready_readwriters, :rw]].each do |ios, readiness|
ios.each do |io|
monitor = @selectables[io]
monitor.readiness = readiness
results << monitor
end
end

results
end
end

# Select for ready monitors, successively yielding each one in a block
def select_each(timeout = nil, &block)
selected = select(timeout)
Expand Down
30 changes: 20 additions & 10 deletions spec/nio/monitor_spec.rb
@@ -1,23 +1,18 @@
require 'spec_helper'

describe NIO::Monitor do
let :readable_pipe do
pipe, peer = IO.pipe
peer << "data"
pipe
let :readable do
reader, writer = IO.pipe
writer << "have some data"
reader
end

# let :unreadable_pipe do
# pipe, _ = IO.pipe
# pipe
# end

let :selector do
NIO::Selector.new
end

# Monitors are created by registering IO objects or channels with a selector
subject { selector.register(readable_pipe, :r) }
subject { selector.register(readable, :r) }

it "knows its interests" do
subject.interests.should == :r
Expand All @@ -28,6 +23,21 @@
subject.value.should == 42
end

it "knows what IO objects are ready for" do
# Perhaps let bindings are just confusing me but they're not producing
# what I want. Manually doing the setup here does
# FIXME: Hey RSpec wizards! Fix this!
reader, writer = IO.pipe
writer << "loldata"
selector = NIO::Selector.new
subject = selector.register(reader, :r)

# Here's where the spec really begins
selector.select(1).should include(subject)
subject.readiness.should == :r
subject.should be_readable
end

it "closes" do
subject.should_not be_closed
subject.close
Expand Down

0 comments on commit 449c36d

Please sign in to comment.