From 648870b5c577239b3274b0b48c82fb74910dfabf Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 31 Mar 2023 00:48:55 +1300 Subject: [PATCH] Support `IO#pread` / `IO#pwrite` using fiber scheduler. (#7594) * Skip test if non-blocking file IO is not supported. --- include/ruby/fiber/scheduler.h | 78 +++++++++++++++++++++++----------- include/ruby/io/buffer.h | 2 + io.c | 42 +++++++++++++----- io_buffer.c | 27 +++++++++--- scheduler.c | 76 ++++++++++++++++++++++++--------- test/fiber/scheduler.rb | 58 +++++++++++++++++++++++++ test/fiber/test_io_buffer.rb | 41 ++++++++++++++++++ 7 files changed, 264 insertions(+), 60 deletions(-) diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h index 250b39b6df5782..ad3d2d74833ede 100644 --- a/include/ruby/fiber/scheduler.h +++ b/include/ruby/fiber/scheduler.h @@ -267,10 +267,10 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv); * Non-blocking read from the passed IO. * * @param[in] scheduler Target scheduler. - * @param[out] io An io object to read from. - * @param[out] buffer Return buffer. - * @param[in] length Requested number of bytes to read. - * @param[in] offset The offset in the buffer to read to. + * @param[in] io An io object to read from. + * @param[in] buffer The buffer to read to. + * @param[in] length The minimum number of bytes to read. + * @param[in] offset The offset in the buffer to read from. * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. * @return otherwise What `scheduler.io_read` returns `[-errno, size]`. */ @@ -280,9 +280,9 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t * Non-blocking write to the passed IO. * * @param[in] scheduler Target scheduler. - * @param[out] io An io object to write to. - * @param[in] buffer What to write. - * @param[in] length Number of bytes to write. + * @param[in] io An io object to write to. + * @param[in] buffer The buffer to write from. + * @param[in] length The minimum number of bytes to write. * @param[in] offset The offset in the buffer to write from. * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. * @return otherwise What `scheduler.io_write` returns `[-errno, size]`. @@ -293,10 +293,10 @@ VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_ * Non-blocking read from the passed IO at the specified offset. * * @param[in] scheduler Target scheduler. - * @param[out] io An io object to read from. - * @param[in] from The offset in the given IO to read the data from. - * @param[out] buffer The buffer to read the data to. - * @param[in] length Requested number of bytes to read. + * @param[in] io An io object to read from. + * @param[in] from The offset to read from. + * @param[in] buffer The buffer to read to. + * @param[in] length The minimum number of bytes to read. * @param[in] offset The offset in the buffer to read to. * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. * @return otherwise What `scheduler.io_read` returns. @@ -307,10 +307,10 @@ VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALU * Non-blocking write to the passed IO at the specified offset. * * @param[in] scheduler Target scheduler. - * @param[out] io An io object to write to. - * @param[in] from The offset in the given IO to write the data to. - * @param[in] buffer The buffer to write the data from. - * @param[in] length Number of bytes to write. + * @param[in] io An io object to write to. + * @param[in] from The offset to write to. + * @param[in] buffer The buffer to write from. + * @param[in] length The minimum number of bytes to write. * @param[in] offset The offset in the buffer to write from. * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. * @return otherwise What `scheduler.io_write` returns. @@ -321,27 +321,55 @@ VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VAL * Non-blocking read from the passed IO using a native buffer. * * @param[in] scheduler Target scheduler. - * @param[out] io An io object to read from. - * @param[out] buffer Return buffer. - * @param[in] size Size of the return buffer. - * @param[in] length Requested number of bytes to read. + * @param[in] io An io object to read from. + * @param[in] base The memory to read to. + * @param[in] size Size of the memory. + * @param[in] length The minimum number of bytes to read. * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. * @return otherwise What `scheduler.io_read` returns. */ -VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *buffer, size_t size, size_t length); +VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length); /** * Non-blocking write to the passed IO using a native buffer. * * @param[in] scheduler Target scheduler. - * @param[out] io An io object to write to. - * @param[in] buffer What to write. - * @param[in] size Size of the buffer. - * @param[in] length Number of bytes to write. + * @param[in] io An io object to write to. + * @param[in] base The memory to write from. + * @param[in] size Size of the memory. + * @param[in] length The minimum number of bytes to write. + * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. + * @return otherwise What `scheduler.io_write` returns. + */ +VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length); + +/** + * Non-blocking pread from the passed IO using a native buffer. + * + * @param[in] scheduler Target scheduler. + * @param[in] io An io object to read from. + * @param[in] from The offset to read from. + * @param[in] base The memory to read to. + * @param[in] size Size of the memory. + * @param[in] length The minimum number of bytes to read. + * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. + * @return otherwise What `scheduler.io_read` returns. + */ +VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length); + +/** + * Non-blocking pwrite to the passed IO using a native buffer. + * + * @param[in] scheduler Target scheduler. + * @param[in] io An io object to write to. + * @param[in] from The offset to write from. + * @param[in] base The memory to write from. + * @param[in] size Size of the memory. + * @param[in] length The minimum number of bytes to write. * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. * @return otherwise What `scheduler.io_write` returns. */ -VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *buffer, size_t size, size_t length); +VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length); /** * Non-blocking close the given IO. diff --git a/include/ruby/io/buffer.h b/include/ruby/io/buffer.h index 88e55980668111..737fafe518e01e 100644 --- a/include/ruby/io/buffer.h +++ b/include/ruby/io/buffer.h @@ -75,7 +75,9 @@ VALUE rb_io_buffer_map(VALUE io, size_t size, rb_off_t offset, enum rb_io_buffer VALUE rb_io_buffer_lock(VALUE self); VALUE rb_io_buffer_unlock(VALUE self); int rb_io_buffer_try_unlock(VALUE self); + VALUE rb_io_buffer_free(VALUE self); +VALUE rb_io_buffer_free_locked(VALUE self); int rb_io_buffer_get_bytes(VALUE self, void **base, size_t *size); void rb_io_buffer_get_bytes_for_reading(VALUE self, const void **base, size_t *size); diff --git a/io.c b/io.c index a023039209b914..be5d47909942e1 100644 --- a/io.c +++ b/io.c @@ -6066,6 +6066,7 @@ rb_io_sysread(int argc, VALUE *argv, VALUE io) #if defined(HAVE_PREAD) || defined(HAVE_PWRITE) struct prdwr_internal_arg { + VALUE io; int fd; void *buf; size_t count; @@ -6075,17 +6076,28 @@ struct prdwr_internal_arg { #if defined(HAVE_PREAD) static VALUE -internal_pread_func(void *arg) +internal_pread_func(void *_arg) { - struct prdwr_internal_arg *p = arg; - return (VALUE)pread(p->fd, p->buf, p->count, p->offset); + struct prdwr_internal_arg *arg = _arg; + + return (VALUE)pread(arg->fd, arg->buf, arg->count, arg->offset); } static VALUE -pread_internal_call(VALUE arg) +pread_internal_call(VALUE _arg) { - struct prdwr_internal_arg *p = (struct prdwr_internal_arg *)arg; - return rb_thread_io_blocking_region(internal_pread_func, p, p->fd); + struct prdwr_internal_arg *arg = (struct prdwr_internal_arg *)_arg; + + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + VALUE result = rb_fiber_scheduler_io_pread_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0); + + if (!UNDEF_P(result)) { + return rb_fiber_scheduler_io_result_apply(result); + } + } + + return rb_thread_io_blocking_region(internal_pread_func, arg, arg->fd); } /* @@ -6122,7 +6134,7 @@ rb_io_pread(int argc, VALUE *argv, VALUE io) VALUE len, offset, str; rb_io_t *fptr; ssize_t n; - struct prdwr_internal_arg arg; + struct prdwr_internal_arg arg = {.io = io}; int shrinkable; rb_scan_args(argc, argv, "21", &len, &offset, &str); @@ -6158,9 +6170,19 @@ rb_io_pread(int argc, VALUE *argv, VALUE io) #if defined(HAVE_PWRITE) static VALUE -internal_pwrite_func(void *ptr) +internal_pwrite_func(void *_arg) { - struct prdwr_internal_arg *arg = ptr; + struct prdwr_internal_arg *arg = _arg; + + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + VALUE result = rb_fiber_scheduler_io_pwrite_memory(scheduler, arg->io, arg->offset, arg->buf, arg->count, 0); + + if (!UNDEF_P(result)) { + return rb_fiber_scheduler_io_result_apply(result); + } + } + return (VALUE)pwrite(arg->fd, arg->buf, arg->count, arg->offset); } @@ -6195,7 +6217,7 @@ rb_io_pwrite(VALUE io, VALUE str, VALUE offset) { rb_io_t *fptr; ssize_t n; - struct prdwr_internal_arg arg; + struct prdwr_internal_arg arg = {.io = io}; VALUE tmp; if (!RB_TYPE_P(str, T_STRING)) diff --git a/io_buffer.c b/io_buffer.c index 2fc7ac8a80f698..4a088111851535 100644 --- a/io_buffer.c +++ b/io_buffer.c @@ -1001,17 +1001,23 @@ rb_io_buffer_lock(VALUE self) return self; } -VALUE -rb_io_buffer_unlock(VALUE self) +static void +io_buffer_unlock(struct rb_io_buffer *data) { - struct rb_io_buffer *data = NULL; - TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); - if (!(data->flags & RB_IO_BUFFER_LOCKED)) { rb_raise(rb_eIOBufferLockedError, "Buffer not locked!"); } data->flags &= ~RB_IO_BUFFER_LOCKED; +} + +VALUE +rb_io_buffer_unlock(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + io_buffer_unlock(data); return self; } @@ -1123,6 +1129,17 @@ rb_io_buffer_free(VALUE self) return self; } +VALUE rb_io_buffer_free_locked(VALUE self) +{ + struct rb_io_buffer *data = NULL; + TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data); + + io_buffer_unlock(data); + io_buffer_free(data); + + return self; +} + // Validate that access to the buffer is within bounds, assuming you want to // access length bytes from the specified offset. static inline void diff --git a/scheduler.c b/scheduler.c index 477f11c03c32b7..866e53993fff1e 100644 --- a/scheduler.c +++ b/scheduler.c @@ -458,15 +458,15 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv) /* * Document-method: Fiber::Scheduler#io_read - * call-seq: io_read(io, buffer, minimum_length) -> read length or -errno + * call-seq: io_read(io, buffer, length) -> read length or -errno * * Invoked by IO#read or IO#Buffer.read to read +length+ bytes from +io+ into a - * specified +buffer+ (see IO::Buffer). + * specified +buffer+ (see IO::Buffer) at the given +offset+. * - * The +minimum_length+ argument is the "minimum length to be read". If the IO - * buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be - * read, but at least 1KiB will be. Generally, the only case where less data - * than +length+ will be read is if there is an error reading the data. + * The +length+ argument is the "minimum length to be read". If the IO buffer + * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read, + * but at least 1KiB will be. Generally, the only case where less data than + * +length+ will be read is if there is an error reading the data. * * Specifying a +length+ of 0 is valid and means try reading at least once and * return any available data. @@ -492,13 +492,19 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt return rb_check_funcall(scheduler, id_io_read, 4, arguments); } - /* * Document-method: Fiber::Scheduler#io_read * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno * - * Invoked by IO::Buffer#pread. See that method for description of arguments. + * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+ + * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given + * +offset+. + * + * This method is semantically the same as #io_read, but it allows to specify + * the offset to read from and is often better for asynchronous IO on the same + * file. * + * The method should be considered _experimental_. */ VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) @@ -512,16 +518,16 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff /* * Document-method: Scheduler#io_write - * call-seq: io_write(io, buffer, minimum_length) -> written length or -errno + * call-seq: io_write(io, buffer, length) -> written length or -errno * * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from - * from a specified +buffer+ (see IO::Buffer). + * from a specified +buffer+ (see IO::Buffer) at the given +offset+. * - * The +minimum_length+ argument is the "minimum length to be written". If the - * IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most - * 8KiB will be written, but at least 1KiB will be. Generally, the only case - * where less data than +minimum_length+ will be written is if there is an - * error writing the data. + * The +length+ argument is the "minimum length to be written". If the IO + * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB + * will be written, but at least 1KiB will be. Generally, the only case where + * less data than +length+ will be written is if there is an error writing the + * data. * * Specifying a +length+ of 0 is valid and means try writing at least once, as * much data as possible. @@ -552,7 +558,15 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng * Document-method: Fiber::Scheduler#io_pwrite * call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno * - * Invoked by IO::Buffer#pwrite. See that method for description of arguments. + * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+ + * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given + * +offset+. + * + * This method is semantically the same as #io_write, but it allows to specify + * the offset to write to and is often better for asynchronous IO on the same + * file. + * + * The method should be considered _experimental_. * */ VALUE @@ -572,8 +586,7 @@ rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0); - rb_io_buffer_unlock(buffer); - rb_io_buffer_free(buffer); + rb_io_buffer_free_locked(buffer); return result; } @@ -585,8 +598,31 @@ rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0); - rb_io_buffer_unlock(buffer); - rb_io_buffer_free(buffer); + rb_io_buffer_free_locked(buffer); + + return result; +} + +VALUE +rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length) +{ + VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED); + + VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0); + + rb_io_buffer_free_locked(buffer); + + return result; +} + +VALUE +rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length) +{ + VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY); + + VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0); + + rb_io_buffer_free_locked(buffer); return result; } diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 71ca4d27899979..5090271db157db 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -366,6 +366,64 @@ def io_write(io, buffer, length, offset) return total end + def io_pread(io, buffer, from, length, offset) + total = 0 + io.nonblock = true + + while true + maximum_size = buffer.size - offset + result = blocking{buffer.pread(io, from, maximum_size, offset)} + + if result > 0 + total += result + offset += result + from += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::READABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + + def io_pwrite(io, buffer, from, length, offset) + total = 0 + io.nonblock = true + + while true + maximum_size = buffer.size - offset + result = blocking{buffer.pwrite(io, from, maximum_size, offset)} + + if result > 0 + total += result + offset += result + from += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::WRITABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + def blocking(&block) Fiber.blocking(&block) end diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb index 3de70200d5c075..a08b1ce1a9d0cf 100644 --- a/test/fiber/test_io_buffer.rb +++ b/test/fiber/test_io_buffer.rb @@ -155,4 +155,45 @@ def test_io_buffer_read_write i&.close o&.close end + + def nonblockable?(io) + io.nonblock{} + true + rescue + false + end + + def test_io_buffer_pread_pwrite + file = Tempfile.new("test_io_buffer_pread_pwrite") + + omit "Non-blocking file IO is not supported" unless nonblockable?(file) + + source_buffer = IO::Buffer.for("Hello World!") + destination_buffer = IO::Buffer.new(source_buffer.size) + + # Test non-scheduler code path: + source_buffer.pwrite(file, 1, source_buffer.size) + destination_buffer.pread(file, 1, source_buffer.size) + assert_equal source_buffer, destination_buffer + + # Test scheduler code path: + destination_buffer.clear + file.truncate(0) + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + source_buffer.pwrite(file, 1, source_buffer.size) + destination_buffer.pread(file, 1, source_buffer.size) + end + end + + thread.join + + assert_equal source_buffer, destination_buffer + ensure + file&.close! + end end