Skip to content

Commit

Permalink
timeout option
Browse files Browse the repository at this point in the history
  • Loading branch information
sonots committed Apr 13, 2014
1 parent 6cd9915 commit 7453ed1
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 60 deletions.
1 change: 1 addition & 0 deletions ext/cool.io/cool.io.h
Expand Up @@ -32,6 +32,7 @@ struct Coolio_Event
struct Coolio_Loop
{
struct ev_loop *ev_loop;
struct ev_timer timer; /* for timeouts */

int running;
int events_received;
Expand Down
132 changes: 72 additions & 60 deletions ext/cool.io/loop.c
Expand Up @@ -11,14 +11,6 @@

#include "cool.io.h"

#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
# define Coolio_Loop_may_block_safely() (1)
#elif defined(HAVE_RB_THREAD_ALONE)
# define Coolio_Loop_may_block_safely() (rb_thread_alone())
#else /* just in case Ruby changes: */
# define Coolio_Loop_may_block_safely() (0)
#endif

static VALUE mCoolio = Qnil;
static VALUE cCoolio_Loop = Qnil;

Expand All @@ -28,12 +20,15 @@ static void Coolio_Loop_free(struct Coolio_Loop *loop);

static VALUE Coolio_Loop_initialize(VALUE self);
static VALUE Coolio_Loop_ev_loop_new(VALUE self, VALUE flags);
static VALUE Coolio_Loop_run_once(VALUE self);
static VALUE Coolio_Loop_run_once(int argc, VALUE *argv, VALUE self);
static VALUE Coolio_Loop_run_nonblock(VALUE self);

static void Coolio_Loop_ev_loop_oneshot(struct Coolio_Loop *loop_data);
static void Coolio_Loop_timeout_callback(struct ev_loop *ev_loop, struct ev_timer *timer, int revents);
static void Coolio_Loop_dispatch_events(struct Coolio_Loop *loop_data);

/* Ruby 1.8 needs us to busy wait and run the green threads scheduler every 10ms */
#define BUSYWAIT_INTERVAL 0.01

#define DEFAULT_EVENTBUF_SIZE 32
#define RUN_LOOP(loop_data, options) \
loop_data->running = 1; \
Expand All @@ -54,7 +49,7 @@ void Init_coolio_loop()

rb_define_method(cCoolio_Loop, "initialize", Coolio_Loop_initialize, 0);
rb_define_private_method(cCoolio_Loop, "ev_loop_new", Coolio_Loop_ev_loop_new, 1);
rb_define_method(cCoolio_Loop, "run_once", Coolio_Loop_run_once, 0);
rb_define_method(cCoolio_Loop, "run_once", Coolio_Loop_run_once, -1);
rb_define_method(cCoolio_Loop, "run_nonblock", Coolio_Loop_run_nonblock, 0);
}

Expand All @@ -63,6 +58,7 @@ static VALUE Coolio_Loop_allocate(VALUE klass)
struct Coolio_Loop *loop = (struct Coolio_Loop *)xmalloc(sizeof(struct Coolio_Loop));

loop->ev_loop = 0;
ev_init(&loop->timer, Coolio_Loop_timeout_callback);
loop->running = 0;
loop->events_received = 0;
loop->eventbuf_size = DEFAULT_EVENTBUF_SIZE;
Expand Down Expand Up @@ -174,79 +170,95 @@ void Coolio_Loop_process_event(VALUE watcher, int revents)
loop_data->events_received++;
}

/* Called whenever a timeout fires on the event loop */
static void Coolio_Loop_timeout_callback(struct ev_loop *ev_loop, struct ev_timer *timer, int revents)
{
/* We don't actually need to do anything here, the mere firing of the
timer is sufficient to interrupt the selector. However, libev still wants a callback */
}

/**
* call-seq:
* Coolio::Loop.run_once -> nil
*
* Run the Coolio::Loop once, blocking until events are received.
*/
static VALUE Coolio_Loop_run_once(VALUE self)
static VALUE Coolio_Loop_run_once(int argc, VALUE *argv, VALUE self)
{
VALUE timeout;
VALUE nevents;
struct Coolio_Loop *loop_data;

if (Coolio_Loop_may_block_safely()) {
struct Coolio_Loop *loop_data;

Data_Get_Struct(self, struct Coolio_Loop, loop_data);
rb_scan_args(argc, argv, "01", &timeout);

assert(loop_data->ev_loop && !loop_data->events_received);
if(timeout != Qnil && NUM2DBL(timeout) < 0) {
rb_raise(rb_eArgError, "time interval must be positive");
}

Coolio_Loop_ev_loop_oneshot(loop_data);
Coolio_Loop_dispatch_events(loop_data);
Data_Get_Struct(self, struct Coolio_Loop, loop_data);

nevents = INT2NUM(loop_data->events_received);
loop_data->events_received = 0;
assert(loop_data->ev_loop && !loop_data->events_received);

#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) || defined(HAVE_RB_THREAD_ALONE)
/* Implement the optional timeout (if any) as a ev_timer */
if(timeout != Qnil) {
/* It seems libev is not a fan of timers being zero, so fudge a little */
loop_data->timer.repeat = NUM2DBL(timeout) + 0.0001;
ev_timer_again(loop_data->ev_loop, &loop_data->timer);
} else {
nevents = Coolio_Loop_run_nonblock(self);
rb_thread_schedule();
ev_timer_stop(loop_data->ev_loop, &loop_data->timer);
}

return nevents;
}
#endif
#if !defined(HAVE_RB_THREAD_BLOCKING_REGION) && !defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
/* Store when we started the loop so we can calculate the timeout */
ev_tstamp started_at = ev_now(loop_data->ev_loop);
#endif

#if defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
#define HAVE_EV_LOOP_ONESHOT

static void Coolio_Loop_ev_loop_oneshot(struct Coolio_Loop *loop_data)
{
/* libev is patched to release the GIL when it makes its system call */
RUN_LOOP(loop_data, EVLOOP_ONESHOT);
}
#endif

/* Ruby 1.8 requires us to periodically run the event loop then defer back to
* the green threads scheduler */
#ifndef HAVE_EV_LOOP_ONESHOT
#define BLOCKING_INTERVAL 0.01 /* Block for 10ms at a time */

/* Stub for scheduler's ev_timer callback */
static void timer_callback(struct ev_loop *ev_loop, struct ev_timer *timer, int revents)
{
ev_timer_again (ev_loop, timer);
}

/* Run the event loop, calling rb_thread_schedule every 10ms */
static void Coolio_Loop_ev_loop_oneshot(struct Coolio_Loop *loop_data)
{
struct ev_timer timer;
struct timeval tv;

/* Set up an ev_timer to unblock the loop every 10ms */
ev_timer_init(&timer, timer_callback, BLOCKING_INTERVAL, BLOCKING_INTERVAL);
ev_timer_start(loop_data->ev_loop, &timer);

/* Loop until we receive events */
while(!loop_data->events_received) {
#elif defined(HAVE_RB_THREAD_ALONE)
/* If we're the only thread we can make a blocking system call */
if(rb_thread_alone()) {
#else
/* If we don't have rb_thread_alone() we can't block */
if(0) {
#endif /* defined(HAVE_RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) */

#if !defined(HAVE_RB_THREAD_BLOCKING_REGION) && !defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
TRAP_BEG;
RUN_LOOP(loop_data, EVLOOP_ONESHOT);
TRAP_END;

rb_thread_schedule();
} else {
/* We need to busy wait as not to stall the green thread scheduler
Ruby 1.8: just say no! :( */
ev_timer_init(&loop_data->timer, Coolio_Loop_timeout_callback, BUSYWAIT_INTERVAL, BUSYWAIT_INTERVAL);
ev_timer_start(loop_data->ev_loop, &loop_data->timer);

/* Loop until we receive events */
while(!loop_data->events_received) {
TRAP_BEG;
RUN_LOOP(loop_data, EVLOOP_ONESHOT);
TRAP_END;

/* Run the next green thread */
rb_thread_schedule();

/* Break if the timeout has elapsed */
if(timeout != Qnil && ev_now(loop_data->ev_loop) - started_at >= NUM2DBL(timeout))
break;
}

ev_timer_stop(loop_data->ev_loop, &loop_data->timer);
}
#endif /* !defined(HAVE_RB_THREAD_BLOCKING_REGION) && !defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) */

ev_timer_stop(loop_data->ev_loop, &timer);
Coolio_Loop_dispatch_events(loop_data);
nevents = INT2NUM(loop_data->events_received);
loop_data->events_received = 0;

return nevents;
}
#endif

/**
* call-seq:
Expand Down
137 changes: 137 additions & 0 deletions spec/tcp_server_spec.rb
@@ -0,0 +1,137 @@
require File.expand_path('../spec_helper', __FILE__)

def unused_port
s = TCPServer.open(0)
port = s.addr[1]
s.close
port
end

TIMEOUT = 0.010
HOST = '127.0.0.1'
PORT = unused_port

def send_data(data)
io = TCPSocket.new('127.0.0.1', PORT)
begin
io.write data
ensure
io.close
end
end

class MyConnection < Coolio::Socket
attr_accessor :data, :connected, :closed

def initialize(io, on_message)
super(io)
@on_message = on_message
end

def on_connect
@connected = true
end

def on_close
@closed = true
end

def on_read(data)
@on_message.call(data)
end
end

@data = ""
def on_message(data)
@data = data
end

def test_run(data = nil)
reactor = Coolio::Loop.new
server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message))
reactor.attach(server)
thread = Thread.new { reactor.run }
send_data(data) if data
sleep TIMEOUT
reactor.stop
server.detach
send_data('') # to leave from blocking loop
thread.join
@data
ensure
server.close
end

def test_run_once(data = nil)
reactor = Coolio::Loop.new
server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message))
reactor.attach(server)
thread = Thread.new do
reactor.run_once # on_connect
reactor.run_once # on_read
end
send_data(data) if data
thread.join
server.detach
@data
ensure
server.close
end

def test_run_once_timeout(timeout = TIMEOUT)
reactor = Coolio::Loop.new
server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message))
reactor.attach(server)
running = true
thread = Thread.new { reactor.run_once(timeout) }
sleep timeout
server.detach
thread.join
@data
ensure
server.close
end

def test_run_timeout(data = nil, timeout = TIMEOUT)
reactor = Coolio::Loop.new
server = Cool.io::TCPServer.new(HOST, PORT, MyConnection, method(:on_message))
reactor.attach(server)
running = true
thread = Thread.new do
while running and reactor.has_active_watchers?
reactor.run_once(timeout)
end
end
send_data(data) if data
sleep timeout
server.detach
running = false # another send is not required
thread.join
@data
ensure
server.close
end

describe Coolio::TCPServer do

it '#run' do
test_run("hello").should == "hello"
end

it '#run_once' do
test_run_once("hello").should == "hello"
end

it '#run_once(timeout)' do
test_run_once_timeout # should not block
end

it '#run_once(-timeout)' do
expect { test_run_once_timeout(-0.1) }.to raise_error(ArgumentError)
end

it '#run(timeout)' do
test_run_timeout("hello").should == "hello"
end

end

0 comments on commit 7453ed1

Please sign in to comment.