Skip to content

Commit

Permalink
Make io_binwrite atomic.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed May 28, 2022
1 parent 8a13a2e commit ce23cfa
Showing 1 changed file with 124 additions and 88 deletions.
212 changes: 124 additions & 88 deletions io.c
Expand Up @@ -1581,71 +1581,111 @@ struct write_arg {

#ifdef HAVE_WRITEV
static VALUE
io_binwrite_string(VALUE arg)
io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
{
struct binwrite_arg *p = (struct binwrite_arg *)arg;
rb_io_t *fptr = p->fptr;
long r;

if (fptr->wbuf.len) {
struct iovec iov[2];
struct iovec iov[2];

iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off;
iov[0].iov_len = fptr->wbuf.len;
iov[1].iov_base = (char *)p->ptr;
iov[1].iov_len = p->length;
iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off;
iov[0].iov_len = fptr->wbuf.len;
iov[1].iov_base = (void*)ptr;
iov[1].iov_len = length;

r = rb_writev_internal(fptr, iov, 2);
long result = rb_writev_internal(fptr, iov, 2);

if (r < 0)
return r;
if (result < 0)
return result;

if (fptr->wbuf.len <= r) {
r -= fptr->wbuf.len;
fptr->wbuf.off = 0;
fptr->wbuf.len = 0;
}
else {
fptr->wbuf.off += (int)r;
fptr->wbuf.len -= (int)r;
r = 0L;
}
if (result >= fptr->wbuf.len) {
// We wrote more than the internal buffer:
result -= fptr->wbuf.len;
fptr->wbuf.off = 0;
fptr->wbuf.len = 0;
}
else {
// We only wrote less data than the internal buffer:
fptr->wbuf.off += (int)result;
fptr->wbuf.len -= (int)result;

return 0L;
}

return result;
}
else {
r = rb_write_internal(fptr, p->ptr, p->length);
return rb_write_internal(fptr, ptr, length);
}

return r;
}
#else
static VALUE
io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
{
long remaining = length;

if (fptr->wbuf.len) {
if (fptr->wbuf.len+length <= fptr->wbuf.capa) {
if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+length) {
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}

MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, length);
fptr->wbuf.len += (int)length;

// We copied the entire incoming data to the internal buffer:
remaining = 0;
}

// Flush the internal buffer:
if (io_fflush(fptr) < 0) {
return -1;
}

// If all the data was buffered, we are done:
if (remaining == 0) {
return length;
}
}

// Otherwise, we should write the data directly:
return rb_write_internal(fptr, ptr, length);
}
#endif

static VALUE
io_binwrite_string(VALUE arg)
{
struct binwrite_arg *p = (struct binwrite_arg *)arg;
rb_io_t *fptr = p->fptr;
long l, len;

l = len = p->length;
const char *ptr = p->ptr;
long remaining = p->length;

if (fptr->wbuf.len) {
if (fptr->wbuf.len+len <= fptr->wbuf.capa) {
if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+len) {
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}
MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, p->ptr, char, len);
fptr->wbuf.len += (int)len;
l = 0;
}
if (io_fflush(fptr) < 0)
return -2L; /* fail in fflush */
if (l == 0)
return len;
while (remaining) {
// Write as much as possible:
long result = (long)io_binwrite_string_internal(p->fptr, ptr, remaining);

// Finished:
if (result == remaining) {
break;
}

if (result >= 0) {
ptr += result;
remaining -= result;
errno = EAGAIN;
}

// Wait for it to become writable:
if (rb_io_maybe_wait_writable(errno, p->fptr->self, Qnil)) {
rb_io_check_closed(p->fptr);
} else {
// The error was unrelated to waiting for it to become writable, so we fail:
return -1;
}
}

return rb_write_internal(p->fptr, p->ptr, p->length);
return p->length;
}
#endif

inline static void
io_allocate_write_buffer(rb_io_t *fptr, int sync)
Expand All @@ -1660,65 +1700,57 @@ io_allocate_write_buffer(rb_io_t *fptr, int sync)
}
}

static inline int
io_binwrite_requires_flush_write(rb_io_t *fptr, long len, int nosync)
{
// If the requested operation was synchronous and the output mode is synchronus or a TTY:
if (!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY)))
return 1;

// If the amount of data we want to write exceeds the internal buffer:
if (fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)
return 1;

// Otherwise, we can append to the internal buffer:
return 0;
}

static long
io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
{
long n, r, offset = 0;
if (len <= 0) return len;

/* don't write anything if current thread has a pending interrupt. */
// Don't write anything if current thread has a pending interrupt:
rb_thread_check_ints();

if ((n = len) <= 0) return n;

io_allocate_write_buffer(fptr, !nosync);

if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) ||
(fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) {
if (io_binwrite_requires_flush_write(fptr, len, nosync)) {
struct binwrite_arg arg;

arg.fptr = fptr;
arg.str = str;
retry:
arg.ptr = ptr + offset;
arg.length = n;
arg.ptr = ptr;
arg.length = len;

if (fptr->write_lock) {
r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
return rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
}
else {
r = io_binwrite_string((VALUE)&arg);
return io_binwrite_string((VALUE)&arg);
}

/* xxx: other threads may modify given string. */
if (r == n) return len;
if (0 <= r) {
offset += r;
n -= r;
errno = EAGAIN;
} else {
if (fptr->wbuf.off) {
if (fptr->wbuf.len)
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}

if (r == -2L)
return -1L;
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
rb_io_check_closed(fptr);

if (offset < len)
goto retry;
}

return -1L;
}
MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, len);
fptr->wbuf.len += (int)len;

if (fptr->wbuf.off) {
if (fptr->wbuf.len)
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
return len;
}

MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len);
fptr->wbuf.len += (int)len;

return len;
}

# define MODE_BTMODE(a,b,c) ((fmode & FMODE_BINMODE) ? (b) : \
Expand Down Expand Up @@ -1792,15 +1824,17 @@ io_fwrite(VALUE str, rb_io_t *fptr, int nosync)
VALUE tmp;
long n, len;
const char *ptr;

#ifdef _WIN32
if (fptr->mode & FMODE_TTY) {
long len = rb_w32_write_console(str, fptr->fd);
if (len > 0) return len;
long len = rb_w32_write_console(str, fptr->fd);
if (len > 0) return len;
}
#endif

str = do_writeconv(str, fptr, &converted);
if (converted)
OBJ_FREEZE(str);
OBJ_FREEZE(str);

tmp = rb_str_tmp_frozen_acquire(str);
RSTRING_GETMEM(tmp, ptr, len);
Expand Down Expand Up @@ -1830,10 +1864,12 @@ io_write(VALUE io, VALUE str, int nosync)
io = GetWriteIO(io);
str = rb_obj_as_string(str);
tmp = rb_io_check_io(io);

if (NIL_P(tmp)) {
/* port is not IO, call write method for it. */
return rb_funcall(io, id_write, 1, str);
/* port is not IO, call write method for it. */
return rb_funcall(io, id_write, 1, str);
}

io = tmp;
if (RSTRING_LEN(str) == 0) return INT2FIX(0);

Expand Down

0 comments on commit ce23cfa

Please sign in to comment.