vsr: fix races in client_replies
- it might be the case that the `writing` bitset is not set, but there's a
  write for the slot in the queue: reply writes can wait on a read to
  complete.
- when a faulty read completes, it might clobber the faulty bit that was
  unset by a write scheduled after the read.

The specific series of events here:

1. Replica receives a RequestReply and starts a reply read.
2. The read completes with a failure; the replica sets the faulty bit.
3. Replica receives another RequestReply and starts a second reply read.
4. Replica receives the Reply and starts a reply write:
    - the write unsets the faulty bit,
    - the write doesn't start, because there's a read executing.
5. The read from step 3 completes, setting the faulty bit _again_.
6. Replica receives a RequestReply.
   - It _doesn't_ start a reply read, because there's an in-progress write
     that can resolve the read.
   - But the faulty bit is set, tripping up an assertion (see the sketch
     after this list).
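
A minimal, self-contained model of this interleaving, with hypothetical
names (the real state lives in `ClientReplies`), showing how the stale
read completion clobbers the faulty bit and trips the assertion:

const std = @import("std");

pub fn main() void {
    var faulty = false;
    var write_pending = false;

    // Steps 1-2: the first read fails, setting the faulty bit.
    faulty = true;

    // Step 3: a second read for the same reply is now in flight.
    const read_in_flight = true;

    // Step 4: a write for the reply is submitted. It unsets the faulty
    // bit, but is queued behind the in-flight read and doesn't start.
    faulty = false;
    write_pending = true;

    // Step 5: the in-flight read completes with a failure and sets the
    // faulty bit again, clobbering the write's update.
    if (read_in_flight) faulty = true;

    // Step 6: the next RequestReply skips the read (a write is pending),
    // then asserts that the slot isn't faulty.
    std.debug.assert(write_pending);
    std.debug.assert(!faulty); // Panics, reproducing the crash.
}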

The root issue here is the race between a read and a write for the same
reply. Remove the race by explicitly handling the interleaving:

* When submitting a read, resolve it immediately if there's a pending
  write (this was already handled by `read_reply_sync`).
* When submitting a write, resolve any pending reads for the same reply
  (see the sketch after this list).
* Remove the code that blocks the write while a read is in progress, as
  this situation is no longer possible.
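
A sketch of the second bullet, shape only: a simplified `Read` with an
optional callback stands in for the real `client_replies.reads` pool
(names here are hypothetical). Submitting a write resolves any in-flight
read for the same reply and clears its callback, so the eventual storage
completion becomes a no-op instead of clobbering state:

const std = @import("std");

const Read = struct {
    checksum: u128,
    callback: ?*const fn (checksum: u128) void,
};

fn on_read_resolved(checksum: u128) void {
    std.debug.print("read for reply {x} resolved by a pending write\n", .{checksum});
}

fn resolve_pending_reads(reads: []Read, checksum: u128) void {
    for (reads) |*read| {
        if (read.callback == null) continue; // Already resolved.
        if (read.checksum == checksum) {
            // Clear the callback after it runs; the read's eventual
            // storage completion then sees null and does nothing.
            defer read.callback = null;
            read.callback.?(read.checksum);
        }
    }
}

pub fn main() void {
    var reads = [_]Read{
        .{ .checksum = 0xabc, .callback = on_read_resolved },
        .{ .checksum = 0xdef, .callback = on_read_resolved },
    };
    resolve_pending_reads(&reads, 0xabc);
    std.debug.assert(reads[0].callback == null); // Resolved by the write.
    std.debug.assert(reads[1].callback != null); // Different reply: untouched.
}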

Note that a read and a write for the same slot can still race if they
target different replies. In that case there is no clobbering: when the
read completes, we double-check its freshness by consulting
`client_sessions` (sketched below).
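
A sketch of that freshness check, under assumed shapes (the real
`client_sessions` tracks the latest reply header per client): a completed
read is acted on only if its checksum still matches the client's latest
reply.

const std = @import("std");

const ClientSession = struct {
    client: u128,
    reply_checksum: u128, // Checksum of the client's latest reply.
};

// True if the completed read still matches the latest reply for the
// client, i.e. the slot was not reused while the read was in flight.
fn read_is_fresh(session: ClientSession, read_checksum: u128) bool {
    return session.reply_checksum == read_checksum;
}

pub fn main() void {
    const session = ClientSession{ .client = 42, .reply_checksum = 0xbeef };
    std.debug.assert(read_is_fresh(session, 0xbeef)); // Fresh: acted on.
    std.debug.assert(!read_is_fresh(session, 0xdead)); // Stale: dropped.
}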

SEED: 2517747396662708227
Closes: #1511
matklad committed Feb 5, 2024
1 parent 7f92126 commit 0be16be
Showing 1 changed file with 33 additions and 10 deletions.
src/vsr/client_replies.zig
@@ -52,7 +52,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
 const Read = struct {
     client_replies: *ClientReplies,
     completion: Storage.Read,
-    callback: *const fn (
+    callback: ?*const fn (
         client_replies: *ClientReplies,
         reply_header: *const vsr.Header.Reply,
         reply: ?*Message.Reply,
@@ -243,6 +243,15 @@ pub fn ClientRepliesType(comptime Storage: type) type {
     client_replies.write_reply_next();
 }

+if (callback == null) {
+    log.warn("{}: read_reply: already resolved (client={} reply={})", .{
+        client_replies.replica,
+        header.client,
+        header.checksum,
+    });
+    return;
+}
+
 if (!message.header.valid_checksum() or
     !message.header.valid_checksum_body(message.body()))
 {
@@ -252,7 +261,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
         header.checksum,
     });

-    callback(client_replies, &header, null, destination_replica);
+    callback.?(client_replies, &header, null, destination_replica);
     return;
 }

@@ -268,7 +277,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
         message.header.checksum,
     });

-    callback(client_replies, &header, null, destination_replica);
+    callback.?(client_replies, &header, null, destination_replica);
     return;
 }

@@ -281,7 +290,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
         header.checksum,
     });

-    callback(client_replies, &header, message, destination_replica);
+    callback.?(client_replies, &header, message, destination_replica);
 }

 pub fn ready_sync(client_replies: *ClientReplies) bool {
@@ -338,6 +347,26 @@ pub fn ClientRepliesType(comptime Storage: type) type {
     },
 }

+// Resolve any pending reads for this reply.
+// If we don't do this, an earlier started read can complete with an error, and
+// erroneously clobber the faulty bit.
+// For simplicity, resolve the reads synchronously, instead of going through next tick
+// machinery.
+var reads = client_replies.reads.iterate();
+while (reads.next()) |read| {
+    if (read.callback == null) continue; // Already resolved.
+    if (read.header.checksum == message.header.checksum) {
+        defer read.callback = null;
+
+        read.callback.?(
+            client_replies,
+            &read.header,
+            message,
+            read.destination_replica,
+        );
+    }
+}
+
 // Clear the fault *before* the write completes, not after.
 // Otherwise, a replica exiting state sync might mark a reply as faulty, then the
 // ClientReplies clears that bit due to an unrelated write that was already queued.
@@ -371,19 +400,13 @@ pub fn ClientRepliesType(comptime Storage: type) type {
 while (client_replies.write_queue.head()) |write| {
     if (client_replies.writing.isSet(write.slot.index)) return;

-    var reads = client_replies.reads.iterate();
-    while (reads.next()) |read| {
-        if (read.slot.index == write.slot.index) return;
-    }
-
     const message = write.message;
     _ = client_replies.write_queue.pop();

     // Zero sector padding to ensure deterministic storage.
     const size = message.header.size;
     const size_ceil = vsr.sector_ceil(size);
     @memset(message.buffer[size..size_ceil], 0);
-
     client_replies.writing.set(write.slot.index);
     client_replies.storage.write_sectors(
         write_reply_callback,
