Skip to content

Commit

Permalink
vsr: fix races in client_replies
Browse files Browse the repository at this point in the history
- it might be the case that `writing` bitset is not set, but there's a
  write for the slot in the queue: replies can wait on read to complete.
- when a faulty read completes, it might clobber faulty bit, unset by a
  a write which was scheduled after the read.

SEED: 2517747396662708227
Closes: #1511
  • Loading branch information
matklad committed Feb 5, 2024
1 parent 7f92126 commit be2091a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
29 changes: 21 additions & 8 deletions src/iops.zig
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,35 @@ pub fn IOPS(comptime T: type, comptime size: u6) type {
return size - self.available();
}

pub const Iterator = struct {
iops: *Self,
bitset_iterator: Map.Iterator(.{ .kind = .unset }),
fn IteratorType(comptime IOPSPointer: type, comptime ItemPointer: type) type {
return struct {
iops: IOPSPointer,
bitset_iterator: Map.Iterator(.{ .kind = .unset }),

pub fn next(iterator: *@This()) ?ItemPointer {
const i = iterator.bitset_iterator.next() orelse return null;
return &iterator.iops.items[i];
}
};
}

pub fn next(iterator: *@This()) ?*T {
const i = iterator.bitset_iterator.next() orelse return null;
return &iterator.iops.items[i];
}
};
pub const Iterator = IteratorType(*Self, *T);

pub fn iterate(self: *Self) Iterator {
return .{
.iops = self,
.bitset_iterator = self.free.iterator(.{ .kind = .unset }),
};
}

pub const IteratorConst = IteratorType(*const Self, *const T);

pub fn iterate_const(self: *const Self) IteratorConst {
return .{
.iops = self,
.bitset_iterator = self.free.iterator(.{ .kind = .unset }),
};
}
};
}

Expand Down
15 changes: 12 additions & 3 deletions src/vsr/client_replies.zig
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ pub fn ClientRepliesType(comptime Storage: type) type {
// Don't unref `write_queue`'s Writes — they are a subset of `writes`.
}

pub fn writing_or_queued(client_replies: *const ClientReplies, slot: Slot) bool {
var writes = client_replies.writes.iterate_const();
while (writes.next()) |write| {
if (write.slot.index == slot.index) return true;
}
assert(!client_replies.writing.isSet(slot.index));
return false;
}

/// Returns true if the reply at the given slot is durably persisted to disk. The
/// difference with `faulty` bit set is that `faulty` is cleared at the start of a write
/// when the reply is still in RAM. In contrast, `reply_durable` checks that the
Expand All @@ -138,7 +147,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
slot: Slot,
) bool {
return !client_replies.faulty.isSet(slot.index) and
!client_replies.writing.isSet(slot.index);
!client_replies.writing_or_queued(slot);
}

pub fn read_reply_sync(
Expand All @@ -148,8 +157,6 @@ pub fn ClientRepliesType(comptime Storage: type) type {
) ?*Message.Reply {
const client = session.header.client;

if (!client_replies.writing.isSet(slot.index)) return null;

var writes = client_replies.writes.iterate();
var write_latest: ?*const Write = null;
while (writes.next()) |write| {
Expand All @@ -162,6 +169,8 @@ pub fn ClientRepliesType(comptime Storage: type) type {
}
}

if (write_latest == null) return null;

if (write_latest.?.message.header.checksum != session.header.checksum) {
// We are writing a reply, but that's a wrong reply according to `client_sessions`.
// This happens after state sync, where we update `client_sessions` without
Expand Down
8 changes: 6 additions & 2 deletions src/vsr/replica.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2237,7 +2237,9 @@ pub fn ReplicaType(
});

if (self.client_sessions.get_slot_for_header(reply_header)) |slot| {
self.client_replies.faulty.set(slot.index);
if (!self.client_replies.writing_or_queued(slot)) {
self.client_replies.faulty.set(slot.index);
}
}
return;
};
Expand Down Expand Up @@ -4490,7 +4492,9 @@ pub fn ReplicaType(

const reply = reply_ orelse {
if (self.client_sessions.get_slot_for_header(reply_header)) |slot| {
self.client_replies.faulty.set(slot.index);
if (!self.client_replies.writing_or_queued(slot)) {
self.client_replies.faulty.set(slot.index);
}
} else {
// The read may have been a repair for an older op,
// or a newer op that we haven't seen yet.
Expand Down

0 comments on commit be2091a

Please sign in to comment.