Permalink
Browse files

Fix a number of IO/Signal interaction issues

  • Loading branch information...
1 parent 9a3f572 commit e6bb96a2a313cbdb786d2b2ebe85a493b7bee51f Evan Phoenix committed May 1, 2009
Showing with 147 additions and 178 deletions.
  1. +2 −0 kernel/bootstrap/io.rb
  2. +69 −44 vm/builtin/io.cpp
  3. +2 −2 vm/builtin/io.hpp
  4. +3 −3 vm/builtin/system.cpp
  5. +1 −1 vm/drivers/cli.cpp
  6. +4 −6 vm/environment.cpp
  7. +1 −1 vm/environment.hpp
  8. +1 −0 vm/shared_state.cpp
  9. +6 −6 vm/shared_state.hpp
  10. +33 −68 vm/signal.cpp
  11. +10 −29 vm/signal.hpp
  12. +6 −17 vm/vm.cpp
  13. +9 −1 vm/vm.hpp
View
@@ -43,6 +43,7 @@ def self.connect_pipe(lhs, rhs)
def self.select_primitive(readables, writables, errorables, timeout)
Ruby.primitive :io_select
+ raise PrimitiveFailure, "IO#select_primitive primitive failed"
end
@@ -107,6 +108,7 @@ def close
#
def shutdown(how)
Ruby.primitive :io_shutdown
+ raise PrimitiveFailure, "IO#shutdown primitive failed"
end
end
View
@@ -4,6 +4,7 @@
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
+#include <sys/time.h>
#include "builtin/io.hpp"
#include "builtin/bytearray.hpp"
@@ -111,17 +112,7 @@ namespace rubinius {
*
* @todo This is highly unoptimised since we always rebuild the FD_SETs. --rue
*/
- Object* IO::select(STATE, Object* readables, Object* writables, Object* errorables, Object* timeout) {
- struct timeval limit;
- struct timeval* maybe_limit = NULL;
-
- if(!timeout->nil_p()) {
- unsigned long long microseconds = as<Integer>(timeout)->to_ulong_long();
- limit.tv_sec = microseconds / 1000000;
- limit.tv_usec = microseconds % 1000000;
- maybe_limit = &limit;
- }
-
+ Object* IO::select(STATE, Object* readables, Object* writables, Object* errorables, Object* timeout, CallFrame* calling_environment) {
fd_set read_set;
fd_set* maybe_read_set = readables->nil_p() ? NULL : &read_set;
@@ -143,45 +134,69 @@ namespace rubinius {
candidate = hidden_fd_set_from_array(state, errorables, maybe_error_set);
highest = candidate > highest ? candidate : highest;
- /* And the main event, pun intended */
+ struct timeval future;
+ struct timeval limit;
+ struct timeval* maybe_limit = NULL;
+
+ if(!timeout->nil_p()) {
+ unsigned long long microseconds = as<Integer>(timeout)->to_ulong_long();
+ limit.tv_sec = microseconds / 1000000;
+ limit.tv_usec = microseconds % 1000000;
+ maybe_limit = &limit;
+
+ // Get the current time to be used if select is interrupted and we
+ // have to recalculate the sleep time
+ assert(gettimeofday(&future, NULL) == 0);
+ timeradd(&future, &limit, &future);
+ }
+
WaitingForSignal waiter;
- state->install_waiter(waiter);
+ native_int events;
- {
- GlobalLock::UnlockGuard lock(state->global_lock());
+ /* And the main event, pun intended */
retry:
- native_int events = ::select((highest + 1), maybe_read_set,
+ state->install_waiter(waiter);
+ {
+ GlobalLock::UnlockGuard lock(state->global_lock());
+
+ events = ::select((highest + 1), maybe_read_set,
maybe_write_set,
maybe_error_set,
maybe_limit);
+ }
+ state->clear_waiter();
- if(events == -1) {
- if(errno == EAGAIN || errno == EINTR) {
- goto retry;
- }
+ if(events == -1) {
+ if(errno == EAGAIN || errno == EINTR) {
+ if(!state->check_async(calling_environment)) return NULL;
- Exception::errno_error(state, "::select() failed!");
- }
+ // Recalculate the limit and go again.
+ if(maybe_limit) {
+ struct timeval now;
+ assert(gettimeofday(&now, NULL) == 0);
+ timersub(&future, &now, &limit);
+ }
- /* Timeout expired */
- if(events == 0) {
- return Qnil;
+ goto retry;
}
+ Exception::errno_error(state, "select(2) failed");
+ return NULL;
}
- state->clear_waiter();
+ /* Timeout expired */
+ if(events == 0) return Qnil;
/* Build the results. */
- Array* events = Array::create(state, 3);
+ Array* output = Array::create(state, 3);
/* These handle NULL sets. */
- events->set(state, 0, hidden_reject_unset_fds(state, readables, maybe_read_set));
- events->set(state, 1, hidden_reject_unset_fds(state, writables, maybe_write_set));
- events->set(state, 2, hidden_reject_unset_fds(state, errorables, maybe_error_set));
+ output->set(state, 0, hidden_reject_unset_fds(state, readables, maybe_read_set));
+ output->set(state, 1, hidden_reject_unset_fds(state, writables, maybe_write_set));
+ output->set(state, 2, hidden_reject_unset_fds(state, errorables, maybe_error_set));
- return events;
+ return output;
}
@@ -341,23 +356,23 @@ namespace rubinius {
}
- Object* IO::sysread(STATE, Fixnum* number_of_bytes) {
+ Object* IO::sysread(STATE, Fixnum* number_of_bytes, CallFrame* calling_environment) {
std::size_t count = number_of_bytes->to_ulong();
String* buffer = String::create_pinned(state, number_of_bytes);
ssize_t bytes_read;
+ native_int fd = descriptor()->to_native();
OnStack<1> variables(state, buffer);
- retry:
WaitingForSignal waiter;
+
+ retry:
state->install_waiter(waiter);
{
GlobalLock::UnlockGuard lock(state->global_lock());
- bytes_read = ::read(descriptor()->to_native(),
- buffer->data()->bytes,
- count);
+ bytes_read = ::read(fd, buffer->data()->bytes, count);
}
state->clear_waiter();
@@ -366,7 +381,9 @@ namespace rubinius {
if(bytes_read == -1) {
if(errno == EAGAIN || errno == EINTR) {
- goto retry;
+ if(state->check_async(calling_environment)) goto retry;
+ } else {
+ Exception::errno_error(state, "read(2) failed");
}
return NULL;
@@ -456,27 +473,35 @@ namespace rubinius {
Object* IOBuffer::fill(STATE, IO* io, CallFrame* calling_environment) {
ssize_t bytes_read;
+ WaitingForSignal waiter;
+
+ IOBuffer* self = this;
+ OnStack<1> os(state, self);
retry:
- WaitingForSignal waiter;
state->install_waiter(waiter);
{
GlobalLock::UnlockGuard lock(state->global_lock());
bytes_read = read(io->descriptor()->to_native(),
- at_unused(),
- left());
+ self->at_unused(),
+ self->left());
}
state->clear_waiter();
- if(bytes_read == -1 && errno == EINTR) {
- if(!state->check_async(calling_environment)) return NULL;
- goto retry;
+ if(bytes_read == -1) {
+ if(errno == EAGAIN || errno == EINTR) {
+ if(state->check_async(calling_environment)) goto retry;
+ } else {
+ Exception::errno_error(state, "read(2) failed");
+ }
+
+ return NULL;
}
if(bytes_read > 0) {
- read_bytes(state, bytes_read);
+ self->read_bytes(state, bytes_read);
}
return Fixnum::from(bytes_read);
View
@@ -62,7 +62,7 @@ namespace rubinius {
* @todo Replace with an evented version when redoing events. --rue
*/
// Ruby.primitive :io_select
- static Object* select(STATE, Object* readables, Object* writables, Object* errorables, Object* timeout);
+ static Object* select(STATE, Object* readables, Object* writables, Object* errorables, Object* timeout, CallFrame* calling_environment);
/* Instance primitives */
@@ -76,7 +76,7 @@ namespace rubinius {
* Returns Qnil at EOF.
*/
// Ruby.primitive :io_sysread
- Object* sysread(STATE, Fixnum* number_of_bytes);
+ Object* sysread(STATE, Fixnum* number_of_bytes, CallFrame* calling_environment);
// Ruby.primitive :io_seek
Integer* seek(STATE, Integer* amount, Fixnum* whence);
View
@@ -297,9 +297,9 @@ namespace rubinius {
}
Object* System::vm_watch_signal(STATE, Fixnum* sig) {
- SignalThread* thr = state->shared.signal_thread();
- if(thr) {
- thr->add_signal(sig->to_native());
+ SignalHandler* h = state->shared.signal_handler();
+ if(h) {
+ h->add_signal(sig->to_native());
return Qtrue;
} else {
return Qfalse;
View
@@ -89,7 +89,7 @@ int main(int argc, char** argv) {
std::string loader = root + "/loader.rbc";
env.enable_preemption();
- env.start_signal_thread();
+ env.start_signals();
env.run_file(loader);
return env.exit_code();
View
@@ -51,17 +51,15 @@ namespace rubinius {
static void null_func(int sig) {}
- void Environment::start_signal_thread() {
- SignalThread* st = new SignalThread(state);
- st->run();
-
+ void Environment::start_signals() {
struct sigaction action;
action.sa_handler = null_func;
- action.sa_flags = 0;
+ action.sa_flags = SA_RESTART;
sigfillset(&action.sa_mask);
sigaction(7, &action, NULL);
- shared->set_signal_thread(st);
+ state->set_run_signals(true);
+ shared->set_signal_handler(new SignalHandler(state));
}
void Environment::load_argv(int argc, char** argv) {
View
@@ -31,7 +31,7 @@ namespace rubinius {
void enable_preemption();
void boot_vm();
int exit_code();
- void start_signal_thread();
+ void start_signals();
};
}
View
@@ -9,6 +9,7 @@
namespace rubinius {
SharedState::SharedState(Configuration& config, ConfigParser& cp)
: initialized_(false)
+ , signal_handler_(0)
, global_handles_(new capi::Handles)
, profiling_(false)
, profiler_collection_(0)
View
@@ -20,7 +20,7 @@ namespace rubinius {
class ProfilerCollection;
}
- class SignalThread;
+ class SignalHandler;
class ObjectMemory;
class GlobalCache;
class ConfigParser;
@@ -53,7 +53,7 @@ namespace rubinius {
private:
bool initialized_;
GlobalLock lock_;
- SignalThread* signal_thread_;
+ SignalHandler* signal_handler_;
CallFrameLocationList cf_locations_;
VariableRootBuffers root_buffers_;
capi::Handles* global_handles_;
@@ -84,12 +84,12 @@ namespace rubinius {
return lock_;
}
- SignalThread* signal_thread() {
- return signal_thread_;
+ SignalHandler* signal_handler() {
+ return signal_handler_;
}
- void set_signal_thread(SignalThread* thr) {
- signal_thread_ = thr;
+ void set_signal_handler(SignalHandler* thr) {
+ signal_handler_ = thr;
}
static SharedState* standalone(VM*);
Oops, something went wrong.

0 comments on commit e6bb96a

Please sign in to comment.