From 0be16beef8e8d24c6eab8f78e86a85beaf74f54b Mon Sep 17 00:00:00 2001
From: Alex Kladov
Date: Mon, 5 Feb 2024 12:16:53 +0000
Subject: [PATCH] vsr: fix races in client_replies

- It might be the case that the `writing` bitset is not set even though
  there's a write for the slot in the queue: reply writes can wait on a
  read to complete.
- When a faulty read completes, it might clobber the faulty bit that was
  unset by a write scheduled after the read.

The specific series of events here:

1. Replica receives a RequestReply and starts a reply read.
2. The read completes with a failure; the replica sets the faulty bit.
3. Replica receives a RequestReply and starts a reply read.
4. Replica receives a Reply and starts a reply write.
   - The write unsets the faulty bit.
   - The write doesn't start, because there's a read executing.
5. The read completes, setting the faulty bit _again_.
6. Replica receives a RequestReply.
   - It _doesn't_ start a reply read, because there's an in-progress
     write that can resolve the read.
   - But the faulty bit is set, tripping up an assertion.

The root issue here is the race between a read and a write for the same
reply. Remove the race by explicitly handling the interleaving:

* When submitting a read, resolve it immediately if there's a pending
  write (this was already handled by `read_reply_sync`).
* When submitting a write, resolve any pending reads for the same reply.
* Remove the code that blocked the write while a read was in progress,
  as this interleaving is no longer possible.

Note that it is still possible for a read and a write for the same slot
to race, if they target different replies. In this case there won't be
any clobbering: when the read completes, we double-check freshness by
consulting `client_sessions`.

SEED: 2517747396662708227

Closes: https://github.com/tigerbeetle/tigerbeetle/issues/1511
---
 src/vsr/client_replies.zig | 43 +++++++++++++++++++++++++++++---------
 1 file changed, 33 insertions(+), 10 deletions(-)

diff --git a/src/vsr/client_replies.zig b/src/vsr/client_replies.zig
index 7fabcf243d..f57cbc3d81 100644
--- a/src/vsr/client_replies.zig
+++ b/src/vsr/client_replies.zig
@@ -52,7 +52,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
         const Read = struct {
             client_replies: *ClientReplies,
             completion: Storage.Read,
-            callback: *const fn (
+            callback: ?*const fn (
                 client_replies: *ClientReplies,
                 reply_header: *const vsr.Header.Reply,
                 reply: ?*Message.Reply,
@@ -243,6 +243,15 @@ pub fn ClientRepliesType(comptime Storage: type) type {
                 client_replies.write_reply_next();
             }
 
+            if (callback == null) {
+                log.warn("{}: read_reply: already resolved (client={} reply={})", .{
+                    client_replies.replica,
+                    header.client,
+                    header.checksum,
+                });
+                return;
+            }
+
             if (!message.header.valid_checksum() or
                 !message.header.valid_checksum_body(message.body()))
             {
@@ -252,7 +261,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
                     header.checksum,
                 });
 
-                callback(client_replies, &header, null, destination_replica);
+                callback.?(client_replies, &header, null, destination_replica);
                 return;
             }
 
@@ -268,7 +277,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
                     message.header.checksum,
                 });
 
-                callback(client_replies, &header, null, destination_replica);
+                callback.?(client_replies, &header, null, destination_replica);
                 return;
             }
 
@@ -281,7 +290,7 @@ pub fn ClientRepliesType(comptime Storage: type) type {
                 header.checksum,
             });
 
-            callback(client_replies, &header, message, destination_replica);
+            callback.?(client_replies, &header, message, destination_replica);
         }
 
         pub fn ready_sync(client_replies: *ClientReplies) bool {
@@ -338,6 +347,26 @@ pub fn ClientRepliesType(comptime Storage: type) type {
                 },
             }
 
+            // Resolve any pending reads for this reply.
+            // If we don't do this, a read that started earlier can complete with an
+            // error and erroneously clobber the faulty bit.
+            // For simplicity, resolve the reads synchronously, instead of going
+            // through next-tick machinery.
+            var reads = client_replies.reads.iterate();
+            while (reads.next()) |read| {
+                if (read.callback == null) continue; // Already resolved.
+                if (read.header.checksum == message.header.checksum) {
+                    defer read.callback = null;
+
+                    read.callback.?(
+                        client_replies,
+                        &read.header,
+                        message,
+                        read.destination_replica,
+                    );
+                }
+            }
+
             // Clear the fault *before* the write completes, not after.
             // Otherwise, a replica exiting state sync might mark a reply as faulty, then the
             // ClientReplies clears that bit due to an unrelated write that was already queued.
@@ -371,11 +400,6 @@ pub fn ClientRepliesType(comptime Storage: type) type {
             while (client_replies.write_queue.head()) |write| {
                 if (client_replies.writing.isSet(write.slot.index)) return;
 
-                var reads = client_replies.reads.iterate();
-                while (reads.next()) |read| {
-                    if (read.slot.index == write.slot.index) return;
-                }
-
                 const message = write.message;
                 _ = client_replies.write_queue.pop();
 
@@ -383,7 +407,6 @@ pub fn ClientRepliesType(comptime Storage: type) type {
                 const size = message.header.size;
                 const size_ceil = vsr.sector_ceil(size);
                 @memset(message.buffer[size..size_ceil], 0);
-                client_replies.writing.set(write.slot.index);
 
                 client_replies.storage.write_sectors(
                     write_reply_callback,
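
To illustrate the interleaving rule the patch enforces, here is a minimal,
self-contained Zig sketch. The names (`submit_read`, `submit_write`,
`read_complete`, `on_reply`) and the fixed-size read table are hypothetical
simplifications, not the real ClientReplies API: a write resolves any pending
read for the same reply synchronously, so the read's later storage completion
becomes a harmless no-op instead of clobbering state the write already updated.

    const std = @import("std");

    const Callback = *const fn (checksum: u128, reply: ?[]const u8) void;

    const Read = struct {
        checksum: u128,
        // Nulled once the read is resolved, whether by a racing write or by
        // its own storage completion. Mirrors the `callback: ?*const fn` change.
        callback: ?Callback,
    };

    var reads: [4]Read = undefined;
    var reads_count: usize = 0;

    fn submit_read(checksum: u128, callback: Callback) void {
        reads[reads_count] = .{ .checksum = checksum, .callback = callback };
        reads_count += 1;
    }

    fn submit_write(checksum: u128, reply: []const u8) void {
        // Resolve pending reads for the same reply synchronously, so a read
        // that completes later (possibly with an error) cannot clobber state
        // this write already updated -- e.g. re-set a faulty bit it unset.
        for (reads[0..reads_count]) |*read| {
            if (read.callback == null) continue; // Already resolved.
            if (read.checksum == checksum) {
                defer read.callback = null;
                read.callback.?(checksum, reply);
            }
        }
        // ... the actual storage write would start here ...
    }

    fn read_complete(read: *Read, result: ?[]const u8) void {
        // The guard added to read_reply_callback: a read already resolved by
        // a write must not fire its callback (or mark faults) a second time.
        if (read.callback == null) return;
        defer read.callback = null;
        read.callback.?(read.checksum, result);
    }

    fn on_reply(checksum: u128, reply: ?[]const u8) void {
        std.debug.print("reply {x}: {?s}\n", .{ checksum, reply });
    }

    pub fn main() void {
        submit_read(0xabc, on_reply);
        submit_write(0xabc, "reply body"); // Resolves the pending read now.
        read_complete(&reads[0], null); // Later I/O failure: a no-op here.
    }

Because the read is resolved at write submission, a read and a write for the
same reply are never live at the same time, which is what allows the patch to
delete the code that blocked writes behind in-progress reads.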