diff --git a/ext/cool.io/cool.io.h b/ext/cool.io/cool.io.h index 364d4ae..c1a168c 100644 --- a/ext/cool.io/cool.io.h +++ b/ext/cool.io/cool.io.h @@ -32,6 +32,7 @@ struct Coolio_Event struct Coolio_Loop { struct ev_loop *ev_loop; + struct ev_timer timer; /* for timeouts */ int running; int events_received; diff --git a/ext/cool.io/loop.c b/ext/cool.io/loop.c index c0527fc..8ae0445 100644 --- a/ext/cool.io/loop.c +++ b/ext/cool.io/loop.c @@ -11,14 +11,6 @@ #include "cool.io.h" -#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) -# define Coolio_Loop_may_block_safely() (1) -#elif defined(HAVE_RB_THREAD_ALONE) -# define Coolio_Loop_may_block_safely() (rb_thread_alone()) -#else /* just in case Ruby changes: */ -# define Coolio_Loop_may_block_safely() (0) -#endif - static VALUE mCoolio = Qnil; static VALUE cCoolio_Loop = Qnil; @@ -28,12 +20,15 @@ static void Coolio_Loop_free(struct Coolio_Loop *loop); static VALUE Coolio_Loop_initialize(VALUE self); static VALUE Coolio_Loop_ev_loop_new(VALUE self, VALUE flags); -static VALUE Coolio_Loop_run_once(VALUE self); +static VALUE Coolio_Loop_run_once(int argc, VALUE *argv, VALUE self); static VALUE Coolio_Loop_run_nonblock(VALUE self); -static void Coolio_Loop_ev_loop_oneshot(struct Coolio_Loop *loop_data); +static void Coolio_Loop_timeout_callback(struct ev_loop *ev_loop, struct ev_timer *timer, int revents); static void Coolio_Loop_dispatch_events(struct Coolio_Loop *loop_data); +/* Ruby 1.8 needs us to busy wait and run the green threads scheduler every 10ms */ +#define BUSYWAIT_INTERVAL 0.01 + #define DEFAULT_EVENTBUF_SIZE 32 #define RUN_LOOP(loop_data, options) \ loop_data->running = 1; \ @@ -54,7 +49,7 @@ void Init_coolio_loop() rb_define_method(cCoolio_Loop, "initialize", Coolio_Loop_initialize, 0); rb_define_private_method(cCoolio_Loop, "ev_loop_new", Coolio_Loop_ev_loop_new, 1); - rb_define_method(cCoolio_Loop, "run_once", Coolio_Loop_run_once, 0); + rb_define_method(cCoolio_Loop, "run_once", Coolio_Loop_run_once, -1); rb_define_method(cCoolio_Loop, "run_nonblock", Coolio_Loop_run_nonblock, 0); } @@ -63,6 +58,7 @@ static VALUE Coolio_Loop_allocate(VALUE klass) struct Coolio_Loop *loop = (struct Coolio_Loop *)xmalloc(sizeof(struct Coolio_Loop)); loop->ev_loop = 0; + ev_init(&loop->timer, Coolio_Loop_timeout_callback); loop->running = 0; loop->events_received = 0; loop->eventbuf_size = DEFAULT_EVENTBUF_SIZE; @@ -174,79 +170,95 @@ void Coolio_Loop_process_event(VALUE watcher, int revents) loop_data->events_received++; } +/* Called whenever a timeout fires on the event loop */ +static void Coolio_Loop_timeout_callback(struct ev_loop *ev_loop, struct ev_timer *timer, int revents) +{ + /* We don't actually need to do anything here, the mere firing of the + timer is sufficient to interrupt the selector. However, libev still wants a callback */ +} + /** * call-seq: * Coolio::Loop.run_once -> nil * * Run the Coolio::Loop once, blocking until events are received. */ -static VALUE Coolio_Loop_run_once(VALUE self) +static VALUE Coolio_Loop_run_once(int argc, VALUE *argv, VALUE self) { + VALUE timeout; VALUE nevents; + struct Coolio_Loop *loop_data; - if (Coolio_Loop_may_block_safely()) { - struct Coolio_Loop *loop_data; - - Data_Get_Struct(self, struct Coolio_Loop, loop_data); + rb_scan_args(argc, argv, "01", &timeout); - assert(loop_data->ev_loop && !loop_data->events_received); + if(timeout != Qnil && NUM2DBL(timeout) < 0) { + rb_raise(rb_eArgError, "time interval must be positive"); + } - Coolio_Loop_ev_loop_oneshot(loop_data); - Coolio_Loop_dispatch_events(loop_data); + Data_Get_Struct(self, struct Coolio_Loop, loop_data); - nevents = INT2NUM(loop_data->events_received); - loop_data->events_received = 0; + assert(loop_data->ev_loop && !loop_data->events_received); +#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) || defined(HAVE_RB_THREAD_ALONE) + /* Implement the optional timeout (if any) as a ev_timer */ + if(timeout != Qnil) { + /* It seems libev is not a fan of timers being zero, so fudge a little */ + loop_data->timer.repeat = NUM2DBL(timeout) + 0.0001; + ev_timer_again(loop_data->ev_loop, &loop_data->timer); } else { - nevents = Coolio_Loop_run_nonblock(self); - rb_thread_schedule(); + ev_timer_stop(loop_data->ev_loop, &loop_data->timer); } - - return nevents; -} +#endif +#if !defined(HAVE_RB_THREAD_BLOCKING_REGION) && !defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) + /* Store when we started the loop so we can calculate the timeout */ + ev_tstamp started_at = ev_now(loop_data->ev_loop); +#endif #if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) -#define HAVE_EV_LOOP_ONESHOT - -static void Coolio_Loop_ev_loop_oneshot(struct Coolio_Loop *loop_data) -{ + /* libev is patched to release the GIL when it makes its system call */ RUN_LOOP(loop_data, EVLOOP_ONESHOT); -} -#endif - -/* Ruby 1.8 requires us to periodically run the event loop then defer back to - * the green threads scheduler */ -#ifndef HAVE_EV_LOOP_ONESHOT -#define BLOCKING_INTERVAL 0.01 /* Block for 10ms at a time */ - -/* Stub for scheduler's ev_timer callback */ -static void timer_callback(struct ev_loop *ev_loop, struct ev_timer *timer, int revents) -{ - ev_timer_again (ev_loop, timer); -} - -/* Run the event loop, calling rb_thread_schedule every 10ms */ -static void Coolio_Loop_ev_loop_oneshot(struct Coolio_Loop *loop_data) -{ - struct ev_timer timer; - struct timeval tv; - - /* Set up an ev_timer to unblock the loop every 10ms */ - ev_timer_init(&timer, timer_callback, BLOCKING_INTERVAL, BLOCKING_INTERVAL); - ev_timer_start(loop_data->ev_loop, &timer); - - /* Loop until we receive events */ - while(!loop_data->events_received) { +#elif defined(HAVE_RB_THREAD_ALONE) + /* If we're the only thread we can make a blocking system call */ + if(rb_thread_alone()) { +#else + /* If we don't have rb_thread_alone() we can't block */ + if(0) { +#endif /* defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) */ + +#if !defined(HAVE_RB_THREAD_BLOCKING_REGION) && !defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) TRAP_BEG; RUN_LOOP(loop_data, EVLOOP_ONESHOT); TRAP_END; - - rb_thread_schedule(); + } else { + /* We need to busy wait as not to stall the green thread scheduler + Ruby 1.8: just say no! :( */ + ev_timer_init(&loop_data->timer, Coolio_Loop_timeout_callback, BUSYWAIT_INTERVAL, BUSYWAIT_INTERVAL); + ev_timer_start(loop_data->ev_loop, &loop_data->timer); + + /* Loop until we receive events */ + while(!loop_data->events_received) { + TRAP_BEG; + RUN_LOOP(loop_data, EVLOOP_ONESHOT); + TRAP_END; + + /* Run the next green thread */ + rb_thread_schedule(); + + /* Break if the timeout has elapsed */ + if(timeout != Qnil && ev_now(loop_data->ev_loop) - started_at >= NUM2DBL(timeout)) + break; + } + + ev_timer_stop(loop_data->ev_loop, &loop_data->timer); } +#endif /* !defined(HAVE_RB_THREAD_BLOCKING_REGION) && !defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) */ - ev_timer_stop(loop_data->ev_loop, &timer); + Coolio_Loop_dispatch_events(loop_data); + nevents = INT2NUM(loop_data->events_received); + loop_data->events_received = 0; + + return nevents; } -#endif /** * call-seq: diff --git a/spec/tcp_server_spec.rb b/spec/tcp_server_spec.rb new file mode 100644 index 0000000..29ad05e --- /dev/null +++ b/spec/tcp_server_spec.rb @@ -0,0 +1,137 @@ +require File.expand_path('../spec_helper', __FILE__) + +def unused_port + s = TCPServer.open(0) + port = s.addr[1] + s.close + port +end + +TIMEOUT = 0.010 +HOST = '127.0.0.1' +PORT = unused_port + +def send_data(data) + io = TCPSocket.new('127.0.0.1', PORT) + begin + io.write data + ensure + io.close + end +end + +class MyConnection < Coolio::Socket + attr_accessor :data, :connected, :closed + + def initialize(io, on_message) + super(io) + @on_message = on_message + end + + def on_connect + @connected = true + end + + def on_close + @closed = true + end + + def on_read(data) + @on_message.call(data) + end +end + +@data = "" +def on_message(data) + @data = data +end + +def test_run(data = nil) + reactor = Coolio::Loop.new + server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message)) + reactor.attach(server) + thread = Thread.new { reactor.run } + send_data(data) if data + sleep TIMEOUT + reactor.stop + server.detach + send_data('') # to leave from blocking loop + thread.join + @data +ensure + server.close +end + +def test_run_once(data = nil) + reactor = Coolio::Loop.new + server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message)) + reactor.attach(server) + thread = Thread.new do + reactor.run_once # on_connect + reactor.run_once # on_read + end + send_data(data) if data + thread.join + server.detach + @data +ensure + server.close +end + +def test_run_once_timeout(timeout = TIMEOUT) + reactor = Coolio::Loop.new + server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message)) + reactor.attach(server) + running = true + thread = Thread.new { reactor.run_once(timeout) } + sleep timeout + server.detach + thread.join + @data +ensure + server.close +end + +def test_run_timeout(data = nil, timeout = TIMEOUT) + reactor = Coolio::Loop.new + server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message)) + reactor.attach(server) + running = true + thread = Thread.new do + while running and reactor.has_active_watchers? + reactor.run_once(timeout) + end + end + send_data(data) if data + sleep timeout + server.detach + running = false # another send is not required + thread.join + @data +ensure + server.close +end + +describe Coolio::TCPServer do + + it '#run' do + test_run("hello").should == "hello" + end + + it '#run_once' do + test_run_once("hello").should == "hello" + end + + it '#run_once(timeout)' do + test_run_once_timeout # should not block + end + + it '#run_once(-timeout)' do + expect { test_run_once_timeout(-0.1) }.to raise_error(ArgumentError) + end + + it '#run(timeout)' do + test_run_timeout("hello").should == "hello" + end + +end