Skip to content

Commit

Permalink
io_uring: add tests for automatic buffer selection
Browse files Browse the repository at this point in the history
  • Loading branch information
vrischmann committed May 20, 2022
1 parent 8ad71ba commit f2a1d6c
Showing 1 changed file with 384 additions and 0 deletions.
384 changes: 384 additions & 0 deletions lib/std/os/linux/io_uring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,390 @@ test "linkat" {
try testing.expectEqualStrings("hello", second_file_data[0..read]);
}

test "provide_buffers: read" {
if (builtin.os.tag != .linux) return error.SkipZigTest;

var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();

const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0);
defer os.close(fd);

const group_id = 1337;
const buffer_id = 0;

const buffer_len = 128;

var buffers: [4][buffer_len]u8 = undefined;

// Provide 4 buffers

{
const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id);
try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
try testing.expectEqual(@as(u32, buffers[0].len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
// Happens when the kernel is < 5.7
.INVAL => return error.SkipZigTest,
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
}

// Do 4 reads which should consume all buffers

var i: usize = 0;
while (i < buffers.len) : (i += 1) {
var sqe = try ring.read(0xdededede, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
try testing.expectEqual(@as(i32, fd), sqe.fd);
try testing.expectEqual(@as(u64, 0), sqe.addr);
try testing.expectEqual(@as(u32, buffer_len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}

try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
const used_buffer_id = cqe.flags >> 16;
try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
try testing.expectEqual(@as(i32, buffer_len), cqe.res);

try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]);
}

// This read should fail

{
var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
try testing.expectEqual(@as(i32, fd), sqe.fd);
try testing.expectEqual(@as(u64, 0), sqe.addr);
try testing.expectEqual(@as(u32, buffer_len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
// Expected
.NOBUFS => {},
.SUCCESS => std.debug.panic("unexpected success", .{}),
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
}

// Provide 1 buffer again

// Deliberately put something we don't expect in the buffers
mem.set(u8, mem.sliceAsBytes(&buffers), 42);

const reprovided_buffer_id = 2;

{
_ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
}

// Final read which should work

{
var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
try testing.expectEqual(@as(i32, fd), sqe.fd);
try testing.expectEqual(@as(u64, 0), sqe.addr);
try testing.expectEqual(@as(u32, buffer_len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}

try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
const used_buffer_id = cqe.flags >> 16;
try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
try testing.expectEqual(@as(i32, buffer_len), cqe.res);
try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]);
}
}

test "remove_buffers" {
if (builtin.os.tag != .linux) return error.SkipZigTest;

var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();

const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0);
defer os.close(fd);

const group_id = 1337;
const buffer_id = 0;

const buffer_len = 128;

var buffers: [4][buffer_len]u8 = undefined;

// Provide 4 buffers

{
_ = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
}

// Remove the first 3 buffers

{
var sqe = try ring.remove_buffers(0xbababababa, 3, group_id);
try testing.expectEqual(linux.IORING_OP.REMOVE_BUFFERS, sqe.opcode);
try testing.expectEqual(@as(i32, 3), sqe.fd);
try testing.expectEqual(@as(u64, 0), sqe.addr);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
try testing.expectEqual(@as(u64, 0xbababababa), cqe.user_data);
}

// This read should work

{
_ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}

try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
const used_buffer_id = cqe.flags >> 16;
try testing.expectEqual(used_buffer_id, 0);
try testing.expectEqual(@as(i32, buffer_len), cqe.res);
try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]);
}

// Final read should _not_ work

{
_ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
// Expected
.NOBUFS => {},
.SUCCESS => std.debug.panic("unexpected success", .{}),
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
}
}

test "provide_buffers: accept/connect/send/recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;

var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
};
defer ring.deinit();

const group_id = 1337;
const buffer_id = 0;

const buffer_len = 128;
var buffers: [4][buffer_len]u8 = undefined;

// Provide 4 buffers

{
const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id);
try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
try testing.expectEqual(@as(u32, buffer_len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
// Happens when the kernel is < 5.7
.INVAL => return error.SkipZigTest,
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
}

const socket_test_harness = try createSocketTestHarness(&ring);
defer socket_test_harness.close();

// Do 4 send on the socket

{
var i: usize = 0;
while (i < buffers.len) : (i += 1) {
_ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'z'} ** buffer_len), 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());
}

var cqes: [4]linux.io_uring_cqe = undefined;
try testing.expectEqual(@as(u32, 4), try ring.copy_cqes(&cqes, 4));
}

// Do 4 recv which should consume all buffers

// Deliberately put something we don't expect in the buffers
mem.set(u8, mem.sliceAsBytes(&buffers), 1);

var i: usize = 0;
while (i < buffers.len) : (i += 1) {
var sqe = try ring.recv(0xdededede, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
try testing.expectEqual(@as(u64, 0), sqe.addr);
try testing.expectEqual(@as(u32, buffer_len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}

try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
const used_buffer_id = cqe.flags >> 16;
try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
try testing.expectEqual(@as(i32, buffer_len), cqe.res);

try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)];
try testing.expectEqualSlices(u8, &([_]u8{'z'} ** buffer_len), buffer);
}

// This recv should fail

{
var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
try testing.expectEqual(@as(u64, 0), sqe.addr);
try testing.expectEqual(@as(u32, buffer_len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
// Expected
.NOBUFS => {},
.SUCCESS => std.debug.panic("unexpected success", .{}),
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
}

// Provide 1 buffer again

const reprovided_buffer_id = 2;

{
_ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}
}

// Redo 1 send on the server socket

{
_ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'w'} ** buffer_len), 0);
try testing.expectEqual(@as(u32, 1), try ring.submit());

_ = try ring.copy_cqe();
}

// Final recv which should work

// Deliberately put something we don't expect in the buffers
mem.set(u8, mem.sliceAsBytes(&buffers), 1);

{
var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
try testing.expectEqual(@as(u64, 0), sqe.addr);
try testing.expectEqual(@as(u32, buffer_len), sqe.len);
try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
try testing.expectEqual(@as(u32, 1), try ring.submit());

const cqe = try ring.copy_cqe();
switch (cqe.err()) {
.SUCCESS => {},
else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
}

try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
const used_buffer_id = cqe.flags >> 16;
try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
try testing.expectEqual(@as(i32, buffer_len), cqe.res);
try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)];
try testing.expectEqualSlices(u8, &([_]u8{'w'} ** buffer_len), buffer);
}
}

/// Used for testing server/client interactions.
const SocketTestHarness = struct {
listener: os.socket_t,
Expand Down

0 comments on commit f2a1d6c

Please sign in to comment.