Skip to content

Commit

Permalink
breaking: std.os read/write functions + sendfile
Browse files Browse the repository at this point in the history
 * rework os.sendfile and add macosx support, and a fallback
   implementation for any OS.
 * fix sendto compile error
 * std.os write functions support partial writes. closes #3443.
 * std.os pread / pwrite functions can now return `error.Unseekable`.
 * std.fs.File read/write functions now have readAll/writeAll variants
   which loop to complete operations even when partial reads/writes
   happen.
 * Audit std.os read/write functions with respect to Linux returning
   EINVAL for lengths greater than 0x7fff0000.
 * std.os read/write shim functions do not unnecessarily loop. Since
   partial reads/writes are part of the API, the caller will be forced
   to loop anyway, and so that would just be code bloat.
 * Improve doc comments
 * Add a non-trivial test for std.os.sendfile
 * Fix std.os.pread on 32 bit Linux
 * Add missing SYS_sendfile bit on aarch64
  • Loading branch information
andrewrk committed Mar 3, 2020
1 parent bd287dd commit c81345c
Show file tree
Hide file tree
Showing 20 changed files with 816 additions and 352 deletions.
2 changes: 1 addition & 1 deletion lib/std/c.zig
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub extern "c" fn sendto(
buf: *const c_void,
len: usize,
flags: u32,
dest_addr: *const sockaddr,
dest_addr: ?*const sockaddr,
addrlen: socklen_t,
) isize;

Expand Down
16 changes: 16 additions & 0 deletions lib/std/c/darwin.zig
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ pub extern "c" fn clock_get_time(clock_serv: clock_serv_t, cur_time: *mach_times
pub extern "c" fn host_get_clock_service(host: host_t, clock_id: clock_id_t, clock_serv: ?[*]clock_serv_t) kern_return_t;
pub extern "c" fn mach_port_deallocate(task: ipc_space_t, name: mach_port_name_t) kern_return_t;

pub const sf_hdtr = extern struct {
headers: [*]iovec_const,
hdr_cnt: c_int,
trailers: [*]iovec_const,
trl_cnt: c_int,
};

pub extern "c" fn sendfile(
out_fd: fd_t,
in_fd: fd_t,
offset: off_t,
len: *off_t,
sf_hdtr: ?*sf_hdtr,
flags: u32,
) c_int;

pub fn sigaddset(set: *sigset_t, signo: u5) void {
set.* |= @as(u32, 1) << (signo - 1);
}
Expand Down
10 changes: 9 additions & 1 deletion lib/std/c/freebsd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@ pub const sf_hdtr = extern struct {
trailers: [*]iovec_const,
trl_cnt: c_int,
};
pub extern "c" fn sendfile(fd: c_int, s: c_int, offset: u64, nbytes: usize, sf_hdtr: ?*sf_hdtr, sbytes: ?*u64, flags: c_int) c_int;
pub extern "c" fn sendfile(
out_fd: fd_t,
in_fd: fd_t,
offset: ?*off_t,
nbytes: usize,
sf_hdtr: ?*sf_hdtr,
sbytes: ?*off_t,
flags: u32,
) c_int;

pub const dl_iterate_phdr_callback = extern fn (info: *dl_phdr_info, size: usize, data: ?*c_void) c_int;
pub extern "c" fn dl_iterate_phdr(callback: dl_iterate_phdr_callback, data: ?*c_void) c_int;
Expand Down
7 changes: 7 additions & 0 deletions lib/std/c/linux.zig
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ pub extern "c" fn sigaltstack(ss: ?*stack_t, old_ss: ?*stack_t) c_int;

pub extern "c" fn memfd_create(name: [*:0]const u8, flags: c_uint) c_int;

pub extern "c" fn sendfile(
out_fd: fd_t,
in_fd: fd_t,
offset: ?*off_t,
count: usize,
) isize;

pub const pthread_attr_t = extern struct {
__size: [56]u8,
__align: c_long,
Expand Down
22 changes: 12 additions & 10 deletions lib/std/event/loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ pub const Loop = struct {
var extra_thread_index: usize = 0;
errdefer {
// writing 8 bytes to an eventfd cannot fail
noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
const amt = noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
assert(amt == wakeup_bytes.len);
while (extra_thread_index != 0) {
extra_thread_index -= 1;
self.extra_threads[extra_thread_index].wait();
Expand Down Expand Up @@ -682,7 +683,8 @@ pub const Loop = struct {
.linux => {
self.posixFsRequest(&self.os_data.fs_end_request);
// writing 8 bytes to an eventfd cannot fail
noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
const amt = noasync os.write(self.os_data.final_eventfd, &wakeup_bytes) catch unreachable;
assert(amt == wakeup_bytes.len);
return;
},
.macosx, .freebsd, .netbsd, .dragonfly => {
Expand Down Expand Up @@ -831,7 +833,7 @@ pub const Loop = struct {

/// Performs an async `os.write` using a separate thread.
/// `fd` must block and not return EAGAIN.
pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!void {
pub fn write(self: *Loop, fd: os.fd_t, bytes: []const u8) os.WriteError!usize {
var req_node = Request.Node{
.data = .{
.msg = .{
Expand All @@ -852,7 +854,7 @@ pub const Loop = struct {

/// Performs an async `os.writev` using a separate thread.
/// `fd` must block and not return EAGAIN.
pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) os.WriteError!void {
pub fn writev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const) os.WriteError!usize {
var req_node = Request.Node{
.data = .{
.msg = .{
Expand All @@ -873,7 +875,7 @@ pub const Loop = struct {

/// Performs an async `os.pwritev` using a separate thread.
/// `fd` must block and not return EAGAIN.
pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) os.WriteError!void {
pub fn pwritev(self: *Loop, fd: os.fd_t, iov: []const os.iovec_const, offset: u64) os.WriteError!usize {
var req_node = Request.Node{
.data = .{
.msg = .{
Expand Down Expand Up @@ -1137,15 +1139,15 @@ pub const Loop = struct {
pub const Write = struct {
fd: os.fd_t,
bytes: []const u8,
result: Error!void,
result: Error!usize,

pub const Error = os.WriteError;
};

pub const WriteV = struct {
fd: os.fd_t,
iov: []const os.iovec_const,
result: Error!void,
result: Error!usize,

pub const Error = os.WriteError;
};
Expand All @@ -1154,9 +1156,9 @@ pub const Loop = struct {
fd: os.fd_t,
iov: []const os.iovec_const,
offset: usize,
result: Error!void,
result: Error!usize,

pub const Error = os.WriteError;
pub const Error = os.PWriteError;
};

pub const PReadV = struct {
Expand All @@ -1165,7 +1167,7 @@ pub const Loop = struct {
offset: usize,
result: Error!usize,

pub const Error = os.ReadError;
pub const Error = os.PReadError;
};

pub const Open = struct {
Expand Down
4 changes: 2 additions & 2 deletions lib/std/fs.zig
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub fn updateFileMode(source_path: []const u8, dest_path: []const u8, mode: ?Fil
var buf: [mem.page_size * 6]u8 = undefined;
while (true) {
const amt = try in_stream.readFull(buf[0..]);
try atomic_file.file.write(buf[0..amt]);
try atomic_file.file.writeAll(buf[0..amt]);
if (amt != buf.len) {
try atomic_file.file.updateTimes(src_stat.atime, src_stat.mtime);
try atomic_file.finish();
Expand Down Expand Up @@ -1329,7 +1329,7 @@ pub const Dir = struct {
pub fn writeFile(self: Dir, sub_path: []const u8, data: []const u8) !void {
var file = try self.createFile(sub_path, .{});
defer file.close();
try file.write(data);
try file.writeAll(data);
}

pub const AccessError = os.AccessError;
Expand Down
140 changes: 123 additions & 17 deletions lib/std/fs/file.zig
Original file line number Diff line number Diff line change
Expand Up @@ -228,63 +228,169 @@ pub const File = struct {
}

pub const ReadError = os.ReadError;
pub const PReadError = os.PReadError;

pub fn read(self: File, buffer: []u8) ReadError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.read(self.handle, buffer);
} else {
return os.read(self.handle, buffer);
}
return os.read(self.handle, buffer);
}

pub fn pread(self: File, buffer: []u8, offset: u64) ReadError!usize {
pub fn readAll(self: File, buffer: []u8) ReadError!void {
var index: usize = 0;
while (index < buffer.len) {
index += try self.read(buffer[index..]);
}
}

pub fn pread(self: File, buffer: []u8, offset: u64) PReadError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.pread(self.handle, buffer);
return std.event.Loop.instance.?.pread(self.handle, buffer, offset);
} else {
return os.pread(self.handle, buffer, offset);
}
}

pub fn preadAll(self: File, buffer: []u8, offset: u64) PReadError!void {
var index: usize = 0;
while (index < buffer.len) {
index += try self.pread(buffer[index..], offset + index);
}
return os.pread(self.handle, buffer, offset);
}

pub fn readv(self: File, iovecs: []const os.iovec) ReadError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.readv(self.handle, iovecs);
} else {
return os.readv(self.handle, iovecs);
}
return os.readv(self.handle, iovecs);
}

pub fn preadv(self: File, iovecs: []const os.iovec, offset: u64) ReadError!usize {
/// The `iovecs` parameter is mutable because this function needs to mutate the fields in
/// order to handle partial reads from the underlying OS layer.
pub fn readvAll(self: File, iovecs: []os.iovec) ReadError!void {
var i: usize = 0;
while (true) {
var amt = try self.readv(iovecs[i..]);
while (amt >= iovecs[i].iov_len) {
amt -= iovecs[i].iov_len;
i += 1;
if (i >= iovecs.len) return;
}
iovecs[i].iov_base += amt;
iovecs[i].iov_len -= amt;
}
}

pub fn preadv(self: File, iovecs: []const os.iovec, offset: u64) PReadError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.preadv(self.handle, iovecs, offset);
} else {
return os.preadv(self.handle, iovecs, offset);
}
}

/// The `iovecs` parameter is mutable because this function needs to mutate the fields in
/// order to handle partial reads from the underlying OS layer.
pub fn preadvAll(self: File, iovecs: []const os.iovec, offset: u64) PReadError!void {
var i: usize = 0;
var off: usize = 0;
while (true) {
var amt = try self.preadv(iovecs[i..], offset + off);
off += amt;
while (amt >= iovecs[i].iov_len) {
amt -= iovecs[i].iov_len;
i += 1;
if (i >= iovecs.len) return;
}
iovecs[i].iov_base += amt;
iovecs[i].iov_len -= amt;
}
return os.preadv(self.handle, iovecs, offset);
}

pub const WriteError = os.WriteError;
pub const PWriteError = os.PWriteError;

pub fn write(self: File, bytes: []const u8) WriteError!void {
pub fn write(self: File, bytes: []const u8) WriteError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.write(self.handle, bytes);
} else {
return os.write(self.handle, bytes);
}
return os.write(self.handle, bytes);
}

pub fn pwrite(self: File, bytes: []const u8, offset: u64) WriteError!void {
pub fn writeAll(self: File, bytes: []const u8) WriteError!void {
var index: usize = 0;
while (index < bytes.len) {
index += try self.write(bytes[index..]);
}
}

pub fn pwrite(self: File, bytes: []const u8, offset: u64) PWriteError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.pwrite(self.handle, bytes, offset);
} else {
return os.pwrite(self.handle, bytes, offset);
}
return os.pwrite(self.handle, bytes, offset);
}

pub fn writev(self: File, iovecs: []const os.iovec_const) WriteError!void {
pub fn pwriteAll(self: File, bytes: []const u8, offset: u64) PWriteError!void {
var index: usize = 0;
while (index < bytes.len) {
index += try self.pwrite(bytes[index..], offset + index);
}
}

pub fn writev(self: File, iovecs: []const os.iovec_const) WriteError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.writev(self.handle, iovecs);
} else {
return os.writev(self.handle, iovecs);
}
return os.writev(self.handle, iovecs);
}

pub fn pwritev(self: File, iovecs: []const os.iovec_const, offset: usize) WriteError!void {
/// The `iovecs` parameter is mutable because this function needs to mutate the fields in
/// order to handle partial writes from the underlying OS layer.
pub fn writevAll(self: File, iovecs: []os.iovec_const) WriteError!void {
var i: usize = 0;
while (true) {
var amt = try self.writev(iovecs[i..]);
while (amt >= iovecs[i].iov_len) {
amt -= iovecs[i].iov_len;
i += 1;
if (i >= iovecs.len) return;
}
iovecs[i].iov_base += amt;
iovecs[i].iov_len -= amt;
}
}

pub fn pwritev(self: File, iovecs: []os.iovec_const, offset: usize) PWriteError!usize {
if (need_async_thread and self.io_mode == .blocking and !self.async_block_allowed) {
return std.event.Loop.instance.?.pwritev(self.handle, iovecs);
return std.event.Loop.instance.?.pwritev(self.handle, iovecs, offset);
} else {
return os.pwritev(self.handle, iovecs, offset);
}
}

/// The `iovecs` parameter is mutable because this function needs to mutate the fields in
/// order to handle partial writes from the underlying OS layer.
pub fn pwritevAll(self: File, iovecs: []os.iovec_const, offset: usize) PWriteError!void {
var i: usize = 0;
var off: usize = 0;
while (true) {
var amt = try self.pwritev(iovecs[i..], offset + off);
off += amt;
while (amt >= iovecs[i].iov_len) {
amt -= iovecs[i].iov_len;
i += 1;
if (i >= iovecs.len) return;
}
iovecs[i].iov_base += amt;
iovecs[i].iov_len -= amt;
}
return os.pwritev(self.handle, iovecs);
}

pub fn inStream(file: File) InStream {
Expand Down Expand Up @@ -335,7 +441,7 @@ pub const File = struct {
pub const Error = WriteError;
pub const Stream = io.OutStream(Error);

fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
fn writeFn(out_stream: *Stream, bytes: []const u8) Error!usize {
const self = @fieldParentPtr(OutStream, "stream", out_stream);
return self.file.write(bytes);
}
Expand Down

0 comments on commit c81345c

Please sign in to comment.