From be2091a5671a65b41b9df97132b7ba65ba4203ae Mon Sep 17 00:00:00 2001 From: Alex Kladov Date: Fri, 2 Feb 2024 19:31:13 +0000 Subject: [PATCH] vsr: fix races in client_replies - it might be the case that `writing` bitset is not set, but there's a write for the slot in the queue: replies can wait on read to complete. - when a faulty read completes, it might clobber faulty bit, unset by a a write which was scheduled after the read. SEED: 2517747396662708227 Closes: #1511 --- src/iops.zig | 29 +++++++++++++++++++++-------- src/vsr/client_replies.zig | 15 ++++++++++++--- src/vsr/replica.zig | 8 ++++++-- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/iops.zig b/src/iops.zig index c896f0f8bb..4f5430d3cc 100644 --- a/src/iops.zig +++ b/src/iops.zig @@ -40,15 +40,19 @@ pub fn IOPS(comptime T: type, comptime size: u6) type { return size - self.available(); } - pub const Iterator = struct { - iops: *Self, - bitset_iterator: Map.Iterator(.{ .kind = .unset }), + fn IteratorType(comptime IOPSPointer: type, comptime ItemPointer: type) type { + return struct { + iops: IOPSPointer, + bitset_iterator: Map.Iterator(.{ .kind = .unset }), + + pub fn next(iterator: *@This()) ?ItemPointer { + const i = iterator.bitset_iterator.next() orelse return null; + return &iterator.iops.items[i]; + } + }; + } - pub fn next(iterator: *@This()) ?*T { - const i = iterator.bitset_iterator.next() orelse return null; - return &iterator.iops.items[i]; - } - }; + pub const Iterator = IteratorType(*Self, *T); pub fn iterate(self: *Self) Iterator { return .{ @@ -56,6 +60,15 @@ pub fn IOPS(comptime T: type, comptime size: u6) type { .bitset_iterator = self.free.iterator(.{ .kind = .unset }), }; } + + pub const IteratorConst = IteratorType(*const Self, *const T); + + pub fn iterate_const(self: *const Self) IteratorConst { + return .{ + .iops = self, + .bitset_iterator = self.free.iterator(.{ .kind = .unset }), + }; + } }; } diff --git a/src/vsr/client_replies.zig b/src/vsr/client_replies.zig index 7fabcf243d..5d24cdbfd8 100644 --- a/src/vsr/client_replies.zig +++ b/src/vsr/client_replies.zig @@ -129,6 +129,15 @@ pub fn ClientRepliesType(comptime Storage: type) type { // Don't unref `write_queue`'s Writes — they are a subset of `writes`. } + pub fn writing_or_queued(client_replies: *const ClientReplies, slot: Slot) bool { + var writes = client_replies.writes.iterate_const(); + while (writes.next()) |write| { + if (write.slot.index == slot.index) return true; + } + assert(!client_replies.writing.isSet(slot.index)); + return false; + } + /// Returns true if the reply at the given slot is durably persisted to disk. The /// difference with `faulty` bit set is that `faulty` is cleared at the start of a write /// when the reply is still in RAM. In contrast, `reply_durable` checks that the @@ -138,7 +147,7 @@ pub fn ClientRepliesType(comptime Storage: type) type { slot: Slot, ) bool { return !client_replies.faulty.isSet(slot.index) and - !client_replies.writing.isSet(slot.index); + !client_replies.writing_or_queued(slot); } pub fn read_reply_sync( @@ -148,8 +157,6 @@ pub fn ClientRepliesType(comptime Storage: type) type { ) ?*Message.Reply { const client = session.header.client; - if (!client_replies.writing.isSet(slot.index)) return null; - var writes = client_replies.writes.iterate(); var write_latest: ?*const Write = null; while (writes.next()) |write| { @@ -162,6 +169,8 @@ pub fn ClientRepliesType(comptime Storage: type) type { } } + if (write_latest == null) return null; + if (write_latest.?.message.header.checksum != session.header.checksum) { // We are writing a reply, but that's a wrong reply according to `client_sessions`. // This happens after state sync, where we update `client_sessions` without diff --git a/src/vsr/replica.zig b/src/vsr/replica.zig index 96459e2545..cb8c95f91e 100644 --- a/src/vsr/replica.zig +++ b/src/vsr/replica.zig @@ -2237,7 +2237,9 @@ pub fn ReplicaType( }); if (self.client_sessions.get_slot_for_header(reply_header)) |slot| { - self.client_replies.faulty.set(slot.index); + if (!self.client_replies.writing_or_queued(slot)) { + self.client_replies.faulty.set(slot.index); + } } return; }; @@ -4490,7 +4492,9 @@ pub fn ReplicaType( const reply = reply_ orelse { if (self.client_sessions.get_slot_for_header(reply_header)) |slot| { - self.client_replies.faulty.set(slot.index); + if (!self.client_replies.writing_or_queued(slot)) { + self.client_replies.faulty.set(slot.index); + } } else { // The read may have been a repair for an older op, // or a newer op that we haven't seen yet.