Skip to content

Commit

Permalink
Support IO#pread / IO#pwrite using fiber scheduler. (#7594)
Browse files Browse the repository at this point in the history
* Skip test if non-blocking file IO is not supported.
  • Loading branch information
ioquatix committed Mar 30, 2023
1 parent 6f12296 commit 648870b
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 60 deletions.
78 changes: 53 additions & 25 deletions include/ruby/fiber/scheduler.h
Expand Up @@ -267,10 +267,10 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv);
* Non-blocking read from the passed IO.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[out] buffer Return buffer.
* @param[in] length Requested number of bytes to read.
* @param[in] offset The offset in the buffer to read to.
* @param[in] io An io object to read from.
* @param[in] buffer The buffer to read to.
* @param[in] length The minimum number of bytes to read.
* @param[in] offset The offset in the buffer to read to.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns `[-errno, size]`.
*/
Expand All @@ -280,9 +280,9 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
* Non-blocking write to the passed IO.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] buffer What to write.
* @param[in] length Number of bytes to write.
* @param[in] io An io object to write to.
* @param[in] buffer The buffer to write from.
* @param[in] length The minimum number of bytes to write.
* @param[in] offset The offset in the buffer to write from.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns `[-errno, size]`.
Expand All @@ -293,10 +293,10 @@ VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_
* Non-blocking read from the passed IO at the specified offset.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[in] from The offset in the given IO to read the data from.
* @param[out] buffer The buffer to read the data to.
* @param[in] length Requested number of bytes to read.
* @param[in] io An io object to read from.
* @param[in] from The offset to read from.
* @param[in] buffer The buffer to read to.
* @param[in] length The minimum number of bytes to read.
* @param[in] offset The offset in the buffer to read to.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pread`.
* @return otherwise What `scheduler.io_pread` returns.
Expand All @@ -307,10 +307,10 @@ VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALU
* Non-blocking write to the passed IO at the specified offset.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] from The offset in the given IO to write the data to.
* @param[in] buffer The buffer to write the data from.
* @param[in] length Number of bytes to write.
* @param[in] io An io object to write to.
* @param[in] from The offset to write to.
* @param[in] buffer The buffer to write from.
* @param[in] length The minimum number of bytes to write.
* @param[in] offset The offset in the buffer to write from.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pwrite`.
* @return otherwise What `scheduler.io_pwrite` returns.
Expand All @@ -321,27 +321,55 @@ VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VAL
* Non-blocking read from the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to read from.
* @param[out] buffer Return buffer.
* @param[in] size Size of the return buffer.
* @param[in] length Requested number of bytes to read.
* @param[in] io An io object to read from.
* @param[in] base The memory to read to.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
*/
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length);
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length);

/**
* Non-blocking write to the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[out] io An io object to write to.
* @param[in] buffer What to write.
* @param[in] size Size of the buffer.
* @param[in] length Number of bytes to write.
* @param[in] io An io object to write to.
* @param[in] base The memory to write from.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
*/
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length);

/**
* Non-blocking pread from the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[in] io An io object to read from.
* @param[in] from The offset to read from.
* @param[in] base The memory to read to.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pread`.
* @return otherwise What `scheduler.io_pread` returns.
*/
VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length);

/**
* Non-blocking pwrite to the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
* @param[in] io An io object to write to.
* @param[in] from The offset to write to.
* @param[in] base The memory to write from.
* @param[in] size Size of the memory.
* @param[in] length The minimum number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_pwrite`.
* @return otherwise What `scheduler.io_pwrite` returns.
*/
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length);
VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length);

/**
* Non-blocking close the given IO.
Expand Down
2 changes: 2 additions & 0 deletions include/ruby/io/buffer.h
Expand Up @@ -75,7 +75,9 @@ VALUE rb_io_buffer_map(VALUE io, size_t size, rb_off_t offset, enum rb_io_buffer
VALUE rb_io_buffer_lock(VALUE self);
VALUE rb_io_buffer_unlock(VALUE self);
int rb_io_buffer_try_unlock(VALUE self);

VALUE rb_io_buffer_free(VALUE self);
VALUE rb_io_buffer_free_locked(VALUE self);

int rb_io_buffer_get_bytes(VALUE self, void **base, size_t *size);
void rb_io_buffer_get_bytes_for_reading(VALUE self, const void **base, size_t *size);
Expand Down
42 changes: 32 additions & 10 deletions io.c
Expand Up @@ -6066,6 +6066,7 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io)

#if defined(HAVE_PREAD) || defined(HAVE_PWRITE)
struct prdwr_internal_arg {
VALUE io;
int fd;
void *buf;
size_t count;
Expand All @@ -6075,17 +6076,28 @@ struct prdwr_internal_arg {

#if defined(HAVE_PREAD)
/*
 * Callback that performs the actual pread(2) call.
 *
 * Takes a pointer to a struct prdwr_internal_arg and reads `count` bytes
 * from `fd` at `offset` into `buf`. The ssize_t result of pread() is
 * returned cast to VALUE. pread_internal_call passes this function to
 * rb_thread_io_blocking_region.
 */
static VALUE
internal_pread_func(void *arg)
internal_pread_func(void *_arg)
{
struct prdwr_internal_arg *p = arg;
return (VALUE)pread(p->fd, p->buf, p->count, p->offset);
struct prdwr_internal_arg *arg = _arg;

return (VALUE)pread(arg->fd, arg->buf, arg->count, arg->offset);
}

/*
 * Dispatches a pread request described by a struct prdwr_internal_arg
 * (passed as a VALUE-sized pointer).
 *
 * When a fiber scheduler is current, the read is first offered to it via
 * rb_fiber_scheduler_io_pread_memory with a minimum length of 0. If the
 * scheduler produced a result (i.e. not undef), that result is converted
 * with rb_fiber_scheduler_io_result_apply and returned. Otherwise the call
 * falls back to internal_pread_func run through
 * rb_thread_io_blocking_region.
 */
static VALUE
pread_internal_call(VALUE arg)
pread_internal_call(VALUE _arg)
{
struct prdwr_internal_arg *p = (struct prdwr_internal_arg *)arg;
return rb_thread_io_blocking_region(internal_pread_func, p, p->fd);
struct prdwr_internal_arg *arg = (struct prdwr_internal_arg *)_arg;

/* Give the fiber scheduler, if any, the first chance to service the read. */
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pread_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0);

/* An undef result means no scheduler result was produced; fall through. */
if (!UNDEF_P(result)) {
return rb_fiber_scheduler_io_result_apply(result);
}
}

return rb_thread_io_blocking_region(internal_pread_func, arg, arg->fd);
}

/*
Expand Down Expand Up @@ -6122,7 +6134,7 @@ rb_io_pread(int argc, VALUE *argv, VALUE io)
VALUE len, offset, str;
rb_io_t *fptr;
ssize_t n;
struct prdwr_internal_arg arg;
struct prdwr_internal_arg arg = {.io = io};
int shrinkable;

rb_scan_args(argc, argv, "21", &len, &offset, &str);
Expand Down Expand Up @@ -6158,9 +6170,19 @@ rb_io_pread(int argc, VALUE *argv, VALUE io)

#if defined(HAVE_PWRITE)
/*
 * Performs a pwrite described by a struct prdwr_internal_arg.
 *
 * When a fiber scheduler is current, the write is first offered to it via
 * rb_fiber_scheduler_io_pwrite_memory with a minimum length of 0; a result
 * other than undef is converted with rb_fiber_scheduler_io_result_apply and
 * returned. Otherwise pwrite(2) is called directly and its ssize_t result
 * is returned cast to VALUE.
 *
 * NOTE(review): unlike the pread path, where the scheduler hook lives in
 * pread_internal_call (before rb_thread_io_blocking_region is entered),
 * the hook here sits inside the `void *` callback itself. If this function
 * is still handed to rb_thread_io_blocking_region by rb_io_pwrite, the
 * scheduler would be invoked from inside the blocking region -- confirm
 * against the caller, and consider a pwrite_internal_call wrapper that
 * mirrors pread_internal_call.
 */
static VALUE
internal_pwrite_func(void *ptr)
internal_pwrite_func(void *_arg)
{
struct prdwr_internal_arg *arg = ptr;
struct prdwr_internal_arg *arg = _arg;

VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pwrite_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0);

if (!UNDEF_P(result)) {
return rb_fiber_scheduler_io_result_apply(result);
}
}


return (VALUE)pwrite(arg->fd, arg->buf, arg->count, arg->offset);
}
Expand Down Expand Up @@ -6195,7 +6217,7 @@ rb_io_pwrite(VALUE io, VALUE str, VALUE offset)
{
rb_io_t *fptr;
ssize_t n;
struct prdwr_internal_arg arg;
struct prdwr_internal_arg arg = {.io = io};
VALUE tmp;

if (!RB_TYPE_P(str, T_STRING))
Expand Down
27 changes: 22 additions & 5 deletions io_buffer.c
Expand Up @@ -1001,17 +1001,23 @@ rb_io_buffer_lock(VALUE self)
return self;
}

/*
 * Clears the RB_IO_BUFFER_LOCKED flag on an already-unwrapped buffer.
 *
 * Raises rb_eIOBufferLockedError ("Buffer not locked!") when the flag is
 * not currently set. Operates directly on the struct so that callers which
 * have already extracted it (rb_io_buffer_unlock, rb_io_buffer_free_locked)
 * can share the check.
 */
VALUE
rb_io_buffer_unlock(VALUE self)
static void
io_buffer_unlock(struct rb_io_buffer *data)
{
struct rb_io_buffer *data = NULL;
TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);

if (!(data->flags & RB_IO_BUFFER_LOCKED)) {
rb_raise(rb_eIOBufferLockedError, "Buffer not locked!");
}

data->flags &= ~RB_IO_BUFFER_LOCKED;
}

/*
 * Unlock the given IO::Buffer object.
 *
 * Unwraps the typed data behind `self` and delegates to io_buffer_unlock,
 * which raises if the buffer is not currently locked. Returns `self`.
 */
VALUE
rb_io_buffer_unlock(VALUE self)
{
    struct rb_io_buffer *buffer = NULL;

    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, buffer);
    io_buffer_unlock(buffer);

    return self;
}
Expand Down Expand Up @@ -1123,6 +1129,17 @@ rb_io_buffer_free(VALUE self)
return self;
}

/*
 * Unlock and then free the given IO::Buffer object in one step.
 *
 * Unwraps the typed data behind `self`, clears its lock via
 * io_buffer_unlock (which raises if the buffer is not locked) and then
 * releases it with io_buffer_free. Returns `self`.
 */
VALUE
rb_io_buffer_free_locked(VALUE self)
{
    struct rb_io_buffer *buffer = NULL;

    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, buffer);

    /* Order matters: the lock flag is cleared before the buffer is freed. */
    io_buffer_unlock(buffer);
    io_buffer_free(buffer);

    return self;
}

// Validate that access to the buffer is within bounds, assuming you want to
// access length bytes from the specified offset.
static inline void
Expand Down
76 changes: 56 additions & 20 deletions scheduler.c
Expand Up @@ -458,15 +458,15 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)

/*
* Document-method: Fiber::Scheduler#io_read
* call-seq: io_read(io, buffer, minimum_length) -> read length or -errno
* call-seq: io_read(io, buffer, length, offset) -> read length or -errno
*
* Invoked by IO#read or IO::Buffer#read to read +length+ bytes from +io+ into a
* specified +buffer+ (see IO::Buffer).
* specified +buffer+ (see IO::Buffer) at the given +offset+.
*
* The +minimum_length+ argument is the "minimum length to be read". If the IO
* buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be
* read, but at least 1KiB will be. Generally, the only case where less data
* than +length+ will be read is if there is an error reading the data.
* The +length+ argument is the "minimum length to be read". If the IO buffer
* size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
* but at least 1KiB will be. Generally, the only case where less data than
* +length+ will be read is if there is an error reading the data.
*
* Specifying a +length+ of 0 is valid and means try reading at least once and
* return any available data.
Expand All @@ -492,13 +492,19 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
return rb_check_funcall(scheduler, id_io_read, 4, arguments);
}


/*
* Document-method: Fiber::Scheduler#io_pread
* call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
*
* Invoked by IO::Buffer#pread. See that method for description of arguments.
* Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
* at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
* +offset+.
*
* This method is semantically the same as #io_read, but it allows specifying
* the offset to read from and is often better for asynchronous IO on the same
* file.
*
* The method should be considered _experimental_.
*/
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Expand All @@ -512,16 +518,16 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff

/*
* Document-method: Fiber::Scheduler#io_write
* call-seq: io_write(io, buffer, minimum_length) -> written length or -errno
* call-seq: io_write(io, buffer, length, offset) -> written length or -errno
*
* Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
* from a specified +buffer+ (see IO::Buffer).
* a specified +buffer+ (see IO::Buffer) at the given +offset+.
*
* The +minimum_length+ argument is the "minimum length to be written". If the
* IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most
* 8KiB will be written, but at least 1KiB will be. Generally, the only case
* where less data than +minimum_length+ will be written is if there is an
* error writing the data.
* The +length+ argument is the "minimum length to be written". If the IO
* buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
* will be written, but at least 1KiB will be. Generally, the only case where
* less data than +length+ will be written is if there is an error writing the
* data.
*
* Specifying a +length+ of 0 is valid and means try writing at least once, as
* much data as possible.
Expand Down Expand Up @@ -552,7 +558,15 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
* Document-method: Fiber::Scheduler#io_pwrite
* call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
*
* Invoked by IO::Buffer#pwrite. See that method for description of arguments.
* Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
* at offset +from+ from a specified +buffer+ (see IO::Buffer) at the given
* +offset+.
*
* This method is semantically the same as #io_write, but it allows specifying
* the offset to write to and is often better for asynchronous IO on the same
* file.
*
* The method should be considered _experimental_.
*
*/
VALUE
Expand All @@ -572,8 +586,7 @@ rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t

VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);

rb_io_buffer_unlock(buffer);
rb_io_buffer_free(buffer);
rb_io_buffer_free_locked(buffer);

return result;
}
Expand All @@ -585,8 +598,31 @@ rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base,

VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);

rb_io_buffer_unlock(buffer);
rb_io_buffer_free(buffer);
rb_io_buffer_free_locked(buffer);

return result;
}

/*
 * Non-blocking pread into native memory via the fiber scheduler.
 *
 * Wraps `base`/`size` in a temporary, locked IO::Buffer, forwards the
 * request to rb_fiber_scheduler_io_pread (buffer offset 0), then unlocks
 * and frees the wrapper with rb_io_buffer_free_locked. The value returned
 * by rb_fiber_scheduler_io_pread is passed back unchanged.
 *
 * NOTE(review): the wrapper is released only on the normal return path; if
 * the scheduler hook can raise, the release is presumably skipped --
 * confirm whether that matters here.
 */
VALUE
rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
{
    VALUE wrapper = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
    VALUE outcome = rb_fiber_scheduler_io_pread(scheduler, io, from, wrapper, length, 0);

    rb_io_buffer_free_locked(wrapper);

    return outcome;
}

/*
 * Non-blocking pwrite from native memory via the fiber scheduler.
 *
 * Wraps `base`/`size` in a temporary IO::Buffer flagged both locked and
 * read-only, forwards the request to rb_fiber_scheduler_io_pwrite (buffer
 * offset 0), then unlocks and frees the wrapper with
 * rb_io_buffer_free_locked. The value returned by
 * rb_fiber_scheduler_io_pwrite is passed back unchanged.
 *
 * NOTE(review): the wrapper is released only on the normal return path; if
 * the scheduler hook can raise, the release is presumably skipped --
 * confirm whether that matters here.
 */
VALUE
rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
{
    /* const is cast away only to satisfy rb_io_buffer_new; the READONLY flag marks the buffer as not writable. */
    VALUE wrapper = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
    VALUE outcome = rb_fiber_scheduler_io_pwrite(scheduler, io, from, wrapper, length, 0);

    rb_io_buffer_free_locked(wrapper);

    return outcome;
}
Expand Down

0 comments on commit 648870b

Please sign in to comment.