vsr: fix races in client_replies
- it might be the case that the `writing` bitset is not set, but there's a
  write for the slot in the queue: replies can wait on a read to complete.
- when a faulty read completes, it might clobber the faulty bit that was
  unset by a write scheduled after the read.

The specific series of events here:

1. Replica receives a RequestReply and starts a reply read.
2. The read completes with a failure; the replica sets the faulty bit.
3. Replica receives a RequestReply and starts a reply read.
4. Replica receives a Reply and starts a reply write
    - the write unsets the faulty bit
    - the write doesn't start, because there's a read executing
5. The read completes, setting the faulty bit _again_.
6. Replica receives a RequestReply
   - It _doesn't_ start a reply read, because there's an in-progress write
     that can resolve a read.
   - But the faulty bit is set, tripping up an assertion.

The root issue here is the race between a read and a write for the same
reply. Remove the race by explicitly handling the interleaving:

* When submitting a read, resolve it immediately if there's a pending
  write (this was already handled by `read_reply_sync`)
* When submitting a write, resolve any pending reads for the same reply
  (see the sketch after this list).
* Remove the code to block the write while the read is in-progress, as
  this is no longer possible.
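
A minimal sketch of the write-side resolution, assuming a stripped-down
stand-in for the real types (the `Read` struct, `on_read`, and `submit_write`
below are illustrative, not the actual ClientReplies API):

```zig
// A simplified model (not the real ClientReplies API) of resolving pending
// reads for a reply when the corresponding write is submitted.
const std = @import("std");

const Read = struct {
    checksum: u64,
    // Nulled once resolved, so a later (possibly failed) disk read for the
    // same reply cannot clobber state a second time.
    callback: ?*const fn (checksum: u64) void = &on_read,
};

fn on_read(checksum: u64) void {
    std.debug.print("read resolved: checksum={}\n", .{checksum});
}

fn submit_write(reads: []Read, reply_checksum: u64) void {
    // Resolve any pending reads for the same reply synchronously, instead of
    // letting them complete (and possibly fail) later.
    for (reads) |*read| {
        const callback = read.callback orelse continue; // Already resolved.
        if (read.checksum == reply_checksum) {
            read.callback = null;
            callback(read.checksum);
        }
    }
}

pub fn main() void {
    var reads = [_]Read{ .{ .checksum = 1 }, .{ .checksum = 2 } };
    submit_write(&reads, 1); // Resolves the read for reply 1; reply 2 stays pending.
}
```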

Note that it is still possible for a read and a write for the same slot to
race, if they target different replies. In that case there is no clobbering:
when the read completes, we double-check freshness by consulting
`client_sessions` (sketched below).
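
A sketch of that freshness check, with illustrative stand-ins (`Header`,
`session_header`, `read_complete`) rather than the real `client_sessions`
structure:

```zig
// A simplified model of the freshness check: a read that completes for a
// stale reply is discarded rather than clobbering state for the slot.
const std = @import("std");

const Header = struct { checksum: u64 };

// Stand-in for the reply currently recorded in `client_sessions` for a slot.
const session_header: Header = .{ .checksum = 2 };

fn read_complete(read_header: Header) void {
    if (read_header.checksum != session_header.checksum) {
        // The slot was overwritten by a newer reply while the read was in
        // flight: drop the stale result.
        std.debug.print("stale read dropped: checksum={}\n", .{read_header.checksum});
        return;
    }
    std.debug.print("fresh read applied: checksum={}\n", .{read_header.checksum});
}

pub fn main() void {
    read_complete(.{ .checksum = 1 }); // Stale: a newer reply now owns the slot.
    read_complete(.{ .checksum = 2 }); // Fresh: matches the session's header.
}
```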

SEED: 2517747396662708227
Closes: #1511
matklad committed Feb 5, 2024
1 parent 7f92126 commit 9db2919
Showing 1 changed file with 44 additions and 7 deletions.
51 changes: 44 additions & 7 deletions src/vsr/client_replies.zig
@@ -52,7 +52,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
const Read = struct {
client_replies: *ClientReplies,
completion: Storage.Read,
callback: *const fn (
callback: ?*const fn (
client_replies: *ClientReplies,
reply_header: *const vsr.Header.Reply,
reply: ?*Message.Reply,
@@ -233,7 +233,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
const client_replies = read.client_replies;
const header = read.header;
const message = read.message;
const callback = read.callback;
const callback_or_null = read.callback;
const destination_replica = read.destination_replica;

client_replies.reads.release(read);
@@ -243,6 +243,15 @@ pub fn ClientRepliesType(comptime Storage: type) type {
client_replies.write_reply_next();
}

const callback = callback_or_null orelse {
log.debug("{}: read_reply: already resolved (client={} reply={})", .{
client_replies.replica,
header.client,
header.checksum,
});
return;
};

if (!message.header.valid_checksum() or
!message.header.valid_checksum_body(message.body()))
{
@@ -338,6 +347,26 @@ pub fn ClientRepliesType(comptime Storage: type) type {
},
}

// Resolve any pending reads for this reply.
// If we don't do this, an earlier started read can complete with an error, and
// erroneously clobber the faulty bit.
// For simplicity, resolve the reads synchronously, instead of going through next tick
// machinery.
var reads = client_replies.reads.iterate();
while (reads.next()) |read| {
if (read.callback == null) continue; // Already resolved.
if (read.header.checksum == message.header.checksum) {
defer read.callback = null;

read.callback.?(
client_replies,
&read.header,
message,
read.destination_replica,
);
}
}

// Clear the fault *before* the write completes, not after.
// Otherwise, a replica exiting state sync might mark a reply as faulty, then the
// ClientReplies clears that bit due to an unrelated write that was already queued.
@@ -365,17 +394,14 @@ pub fn ClientRepliesType(comptime Storage: type) type {
client_replies.write_queue.push_assume_capacity(write);
client_replies.write_reply_next();
}

assert(client_replies.writing.isSet(write.slot.index));
}

fn write_reply_next(client_replies: *ClientReplies) void {
while (client_replies.write_queue.head()) |write| {
if (client_replies.writing.isSet(write.slot.index)) return;

var reads = client_replies.reads.iterate();
while (reads.next()) |read| {
if (read.slot.index == write.slot.index) return;
}

const message = write.message;
_ = client_replies.write_queue.pop();

@@ -402,6 +428,17 @@ pub fn ClientRepliesType(comptime Storage: type) type {
assert(client_replies.writing.isSet(write.slot.index));
maybe(client_replies.faulty.isSet(write.slot.index));

var reads = client_replies.reads.iterate();
while (reads.next()) |read| {
if (read.slot.index == write.slot.index) {
if (read.header.checksum == message.header.checksum) {
assert(read.callback == null);
} else {
// A read and a write can race on the slot after state sync.
}
}
}

log.debug("{}: write_reply: wrote (client={} request={})", .{
client_replies.replica,
message.header.client,