Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vsr: fix races in client_replies #1515

Merged
merged 1 commit into from Feb 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 46 additions & 7 deletions src/vsr/client_replies.zig
Expand Up @@ -52,7 +52,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
const Read = struct {
client_replies: *ClientReplies,
completion: Storage.Read,
callback: *const fn (
callback: ?*const fn (
client_replies: *ClientReplies,
reply_header: *const vsr.Header.Reply,
reply: ?*Message.Reply,
Expand Down Expand Up @@ -233,7 +233,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
const client_replies = read.client_replies;
const header = read.header;
const message = read.message;
const callback = read.callback;
const callback_or_null = read.callback;
const destination_replica = read.destination_replica;

client_replies.reads.release(read);
Expand All @@ -243,6 +243,15 @@ pub fn ClientRepliesType(comptime Storage: type) type {
client_replies.write_reply_next();
}

const callback = callback_or_null orelse {
log.debug("{}: read_reply: already resolved (client={} reply={})", .{
client_replies.replica,
header.client,
header.checksum,
});
return;
};

if (!message.header.valid_checksum() or
!message.header.valid_checksum_body(message.body()))
{
Expand Down Expand Up @@ -338,6 +347,26 @@ pub fn ClientRepliesType(comptime Storage: type) type {
},
}

// Resolve any pending reads for this reply.
// If we don't do this, an earlier started read can complete with an error, and
// erroneously clobber the faulty bit.
// For simplicity, resolve the reads synchronously, instead of going through next tick
// machinery.
var reads = client_replies.reads.iterate();
while (reads.next()) |read| {
if (read.callback == null) continue; // Already resolved.
if (read.header.checksum == message.header.checksum) {
defer read.callback = null;

read.callback.?(
client_replies,
&read.header,
message,
read.destination_replica,
);
}
}

// Clear the fault *before* the write completes, not after.
// Otherwise, a replica exiting state sync might mark a reply as faulty, then the
// ClientReplies clears that bit due to an unrelated write that was already queued.
Expand Down Expand Up @@ -365,17 +394,14 @@ pub fn ClientRepliesType(comptime Storage: type) type {
client_replies.write_queue.push_assume_capacity(write);
client_replies.write_reply_next();
}

assert(client_replies.writing.isSet(write.slot.index));
}

fn write_reply_next(client_replies: *ClientReplies) void {
while (client_replies.write_queue.head()) |write| {
if (client_replies.writing.isSet(write.slot.index)) return;

var reads = client_replies.reads.iterate();
while (reads.next()) |read| {
if (read.slot.index == write.slot.index) return;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in write_reply_callback we could iterate the reads and assert that none of them are to the slot that we just wrote?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There actually can be races after the state sync.

TIL: in our testing storage, we forbid write-write races, but allow write-read races. Which ... seems fine? At least in this case.

const message = write.message;
_ = client_replies.write_queue.pop();

Expand All @@ -402,6 +428,19 @@ pub fn ClientRepliesType(comptime Storage: type) type {
assert(client_replies.writing.isSet(write.slot.index));
maybe(client_replies.faulty.isSet(write.slot.index));

var reads = client_replies.reads.iterate();
while (reads.next()) |read| {
if (read.slot.index == write.slot.index) {
if (read.header.checksum == message.header.checksum) {
assert(read.callback == null);
} else {
// A read and a write can race on the slot if:
// - the write is from before the latest state sync (outdated write)
// - the read is from before the write (outdated read)
}
}
}

log.debug("{}: write_reply: wrote (client={} request={})", .{
client_replies.replica,
message.header.client,
Expand Down