Skip to content

Commit

Permalink
Setup max open fd tracking in the VM
Browse files Browse the repository at this point in the history
This makes sure we track it atomically and always keep it up to date
independent from Ruby land code.
  • Loading branch information
dbussink committed Oct 29, 2012
1 parent 4d13643 commit 367ed96
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 16 deletions.
12 changes: 1 addition & 11 deletions kernel/common/io.rb
Expand Up @@ -454,25 +454,15 @@ def self.setup(io, fd, mode=nil, sync=false)
end
end

update_max_open_fd(fd)

io.descriptor = fd
io.mode = mode || cur_mode
io.sync = !!sync
io.sync ||= STDOUT.fileno == fd if STDOUT.respond_to?(:fileno)
io.sync ||= STDERR.fileno == fd if STDERR.respond_to?(:fileno)
end

@max_open_fd = 2

def self.update_max_open_fd(*fds)
fds.each do |fd|
@max_open_fd = fd if fd > @max_open_fd
end
end

def self.max_open_fd
@max_open_fd
@max_open_fd.get
end

##
Expand Down
2 changes: 0 additions & 2 deletions kernel/common/io18.rb
Expand Up @@ -275,11 +275,9 @@ def self.pipe

begin
connect_pipe(lhs, rhs)
::IO.update_max_open_fd(lhs.fileno, rhs.fileno)
rescue Errno::EMFILE
GC.run(true)
connect_pipe(lhs, rhs)
::IO.update_max_open_fd(lhs.fileno, rhs.fileno)
end

lhs.sync = true
Expand Down
2 changes: 0 additions & 2 deletions kernel/common/io19.rb
Expand Up @@ -791,11 +791,9 @@ def self.pipe(external=nil, internal=nil, options=nil)

begin
connect_pipe(lhs, rhs)
::IO.update_max_open_fd(lhs.fileno, rhs.fileno)
rescue Errno::EMFILE
GC.run(true)
connect_pipe(lhs, rhs)
::IO.update_max_open_fd(lhs.fileno, rhs.fileno)
end

lhs.set_encoding external, internal, options
Expand Down
6 changes: 6 additions & 0 deletions vm/builtin/atomic.cpp
Expand Up @@ -15,6 +15,12 @@ namespace rubinius {
return state->new_object<AtomicReference>(G(atomic_ref));
}

AtomicReference* AtomicReference::create(STATE, Object* obj) {
AtomicReference* ref = AtomicReference::allocate(state);
ref->set(state, obj);
return ref;
}

Object* AtomicReference::compare_and_set(STATE, Object* old, Object* new_) {
Object** pp = &value_;

Expand Down
1 change: 1 addition & 0 deletions vm/builtin/atomic.hpp
Expand Up @@ -21,6 +21,7 @@ namespace rubinius {
static void init(STATE);

static AtomicReference* allocate(STATE);
static AtomicReference* create(STATE, Object* val);

// Rubinius.primitive :atomic_get
Object* get(STATE) {
Expand Down
18 changes: 18 additions & 0 deletions vm/builtin/io.cpp
Expand Up @@ -20,6 +20,7 @@
#endif

#include "builtin/io.hpp"
#include "builtin/atomic.hpp"
#include "builtin/array.hpp"
#include "builtin/bytearray.hpp"
#include "builtin/channel.hpp"
Expand Down Expand Up @@ -51,6 +52,9 @@ namespace rubinius {
GO(iobuffer).set(ontology::new_class(state, "InternalBuffer",
G(object), G(io)));
G(iobuffer)->set_object_type(state, IOBufferType);

AtomicReference* ref = AtomicReference::create(state, Fixnum::from(-1));
G(io)->set_ivar(state, state->symbol("@max_open_fd"), ref);
}

IO* IO::create(STATE, int fd) {
Expand Down Expand Up @@ -103,9 +107,19 @@ namespace rubinius {

Fixnum* IO::open(STATE, String* path, Fixnum* mode, Fixnum* perm) {
int fd = ::open(path->c_str_null_safe(state), mode->to_native(), perm->to_native());
update_max_fd(state, fd);
return Fixnum::from(fd);
}

void IO::update_max_fd(STATE, native_int new_fd) {
AtomicReference* ref = as<AtomicReference>(G(io)->get_ivar(state, state->symbol("@max_open_fd")));
Fixnum* old_fd = as<Fixnum>(ref->get(state));

while(old_fd->to_native() < new_fd &&
ref->compare_and_set(state, old_fd, Fixnum::from(new_fd)) == cFalse) {
old_fd = as<Fixnum>(ref->get(state));
}
}

namespace {
/** Utility function used by IO::select, returns highest descriptor. */
Expand Down Expand Up @@ -305,6 +319,7 @@ namespace rubinius {
native_int cur_fd = to_fd();

int other_fd = ::open(path->c_str_null_safe(state), mode->to_native(), 0666);
update_max_fd(state, other_fd);

if(other_fd == -1) {
Exception::errno_error(state, "reopen");
Expand Down Expand Up @@ -348,6 +363,9 @@ namespace rubinius {
Exception::errno_error(state, "creating pipe");
}

update_max_fd(state, fds[0]);
update_max_fd(state, fds[1]);

lhs->descriptor(state, Fixnum::from(fds[0]));
rhs->descriptor(state, Fixnum::from(fds[1]));

Expand Down
2 changes: 2 additions & 0 deletions vm/builtin/io.hpp
Expand Up @@ -59,6 +59,8 @@ namespace rubinius {
// Rubinius.primitive :io_open
static Fixnum* open(STATE, String* path, Fixnum* mode, Fixnum* perm);

static void update_max_fd(STATE, native_int fd);

/**
* Perform select() on descriptors.
*
Expand Down
2 changes: 1 addition & 1 deletion vm/ontology.cpp
Expand Up @@ -296,6 +296,7 @@ namespace rubinius {
String::init(state);
Executable::init(state);
CompiledCode::init(state);
AtomicReference::init(state);
IO::init(state);
BlockEnvironment::init(state);
ConstantScope::init(state);
Expand Down Expand Up @@ -328,7 +329,6 @@ namespace rubinius {
Fiber::init(state);
Alias::init(state);
Randomizer::init(state);
AtomicReference::init(state);

Encoding::init(state);
kcode::init(state);
Expand Down

0 comments on commit 367ed96

Please sign in to comment.