diff --git a/lib/vector-buffers/src/variants/disk_v2/reader.rs b/lib/vector-buffers/src/variants/disk_v2/reader.rs index 2587160f3becab..7d4da1faafa1e9 100644 --- a/lib/vector-buffers/src/variants/disk_v2/reader.rs +++ b/lib/vector-buffers/src/variants/disk_v2/reader.rs @@ -901,12 +901,8 @@ where while self.last_reader_record_id < ledger_last { match self.next().await { Ok(maybe_record) => { - if maybe_record.is_none() && self.last_reader_record_id == 0 { - // We've hit a point where there's no more data to read. If our "last reader record - // ID" hasn't moved at all, that means the buffer was already empty and we're caught - // up, so we just pin ourselves to where the ledger says we left off, and we're good - // to go. - self.last_reader_record_id = ledger_last; + if maybe_record.is_none() { + // We've hit the end of the current data file so we've gone as far as we can. break; } } diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs b/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs index 8c44937563170f..217057d3e0c7ec 100644 --- a/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs +++ b/lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs @@ -87,3 +87,98 @@ async fn reader_doesnt_block_from_partial_write_on_last_record() { let parent = trace_span!("reader_doesnt_block_from_partial_write_on_last_record"); fut.instrument(parent.or_current()).await; } + +#[tokio::test] +async fn reader_doesnt_block_when_ahead_of_last_record_in_current_data_file() { + // When initializing, the reader will be catching up to the last record it read, which involves + // reading individual records in the current reader data file until a record is returned whose + // record ID matches the "last record ID read" field from the ledger. + // + // If the current data file contains a valid last record when we initialize, but that last + // record is _behind_ the last record read as tracked by the ledger, then we need to ensure we + // can break out of the catch-up loop when we get to the end of the current data file. + // + // Our existing logic for corrupted event detection, and the writer's own initialization logic, + // will emit an error message when we realize that data is missing based on record ID gaps. + let _a = install_tracing_helpers(); + + let fut = with_temp_dir(|dir| { + let data_dir = dir.to_path_buf(); + + async move { + // Create a regular buffer, no customizations required. + let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await; + + // Write two records, and then read and acknowledge both. + // + // This puts the buffer into a state where there's data in the current data file, and + // the ledger has a non-zero record ID for where it thinks the reader needs to be. This + // ensures that the reader actually does at least two calls to `Reader::next` during + // `Reader::seek_to_next_record`, which is necessary to ensure that the reader leaves + // the default state of `self.last_reader_record_id == 0`. 
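+            //
+            // Concretely: the two records written below get record IDs 1 and 2, so once both have
+            // been read and acknowledged, the ledger records a last reader record ID of 2. After
+            // we truncate the second record, the catch-up loop can only replay record 1 before
+            // hitting EOF, so it has to break out rather than wait forever for record 2.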
+            let first_bytes_written = writer
+                .write_record(SizedRecord::new(64))
+                .await
+                .expect("should not fail to write");
+            writer.flush().await.expect("flush should not fail");
+
+            let second_bytes_written = writer
+                .write_record(SizedRecord::new(68))
+                .await
+                .expect("should not fail to write");
+            writer.flush().await.expect("flush should not fail");
+
+            writer.close();
+
+            let first_read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain first record");
+            assert_eq!(SizedRecord::new(64), first_read);
+            acknowledge(first_read).await;
+
+            let second_read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain second record");
+            assert_eq!(SizedRecord::new(68), second_read);
+            acknowledge(second_read).await;
+
+            let third_read = reader.next().await.expect("should not fail to read record");
+            assert!(third_read.is_none());
+
+            ledger.flush().expect("should not fail to flush ledger");
+
+            // Grab the current writer data file path before dropping the buffer.
+            let data_file_path = ledger.get_current_writer_data_file_path();
+            drop(reader);
+            drop(writer);
+            drop(ledger);
+
+            // Open the data file and truncate the second record. This will ensure that the reader
+            // hits EOF after the first read, which we need to do in order to exercise the logic
+            // that breaks out of the loop.
+            let initial_len = first_bytes_written as u64 + second_bytes_written as u64;
+            let target_len = first_bytes_written as u64;
+            set_file_length(&data_file_path, initial_len, target_len)
+                .await
+                .expect("should not fail to truncate data file");
+
+            // Now reopen the buffer, which should complete in a timely fashion without an
+            // immediate error.
+            let reopen = timeout(
+                Duration::from_millis(500),
+                create_default_buffer_v2::<_, SizedRecord>(data_dir),
+            )
+            .await;
+            assert!(
+                reopen.is_ok(),
+                "failed to reopen buffer in a timely fashion; likely deadlock"
+            );
+        }
+    });
+
+    let parent = trace_span!("reader_doesnt_block_when_ahead_of_last_record_in_current_data_file");
+    fut.instrument(parent.or_current()).await;
+}
diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs b/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
index ef56bb19d3f4e1..6e0c7a251e5469 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
@@ -5,7 +5,8 @@ use tracing::Instrument;
 use super::{create_buffer_v2_with_max_data_file_size, read_next, read_next_some};
 use crate::{
     assert_buffer_is_empty, assert_buffer_records, assert_buffer_size, assert_enough_bytes_written,
-    assert_reader_writer_v2_file_positions, await_timeout, set_data_file_length,
+    assert_reader_last_writer_next_positions, assert_reader_writer_v2_file_positions,
+    await_timeout, set_data_file_length,
     test::{acknowledge, install_tracing_helpers, with_temp_dir, MultiEventRecord, SizedRecord},
     variants::disk_v2::{
         common::{DEFAULT_FLUSH_INTERVAL, MAX_FILE_ID},
@@ -820,3 +821,107 @@ async fn writer_updates_ledger_when_buffered_writer_reports_implicit_flush() {
     })
     .await;
 }
+
+#[tokio::test]
+async fn reader_writer_positions_aligned_through_multiple_files_and_records() {
+    // This test ensures that the reader/writer positions stay aligned through multiple records and
+    // data files. That is to say, if we write 5 records, each with 10 events, and then read
+    // and acknowledge all of those events...
the writer's next record ID should be 51 (the 50th
+    // event would correspond to ID 50, so the next ID would be 51) and the reader's last read
+    // record ID should be 50.
+    //
+    // Testing this across multiple data files isn't super germane to the position logic, but it
+    // just ensures we're also testing that aspect.
+
+    let _a = install_tracing_helpers();
+    let fut = with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+
+        async move {
+            // Create our buffer with an arbitrarily low maximum data file size. We'll use this to
+            // control how many records make it into a given data file. Just another way to ensure
+            // we're testing the position logic with multiple writes to one data file, one write to
+            // a data file, etc.
+            let (mut writer, mut reader, ledger) =
+                create_buffer_v2_with_max_data_file_size(data_dir, 256).await;
+
+            // We'll write multi-event records with N events based on these sizes, and as we do so,
+            // we'll assert that our writer position moves as expected after each write, and that
+            // after reading and acknowledging, the reader position also moves as expected.
+            let record_sizes = &[176, 52, 91, 137, 54, 87];
+
+            let mut expected_writer_position = ledger.state().get_next_writer_record_id();
+            let mut expected_reader_position = ledger.state().get_last_reader_record_id();
+            let mut trailing_reader_position_delta = 0;
+
+            for record_size in record_sizes {
+                // Initial check before writing/reading the next record.
+                assert_reader_last_writer_next_positions!(
+                    ledger,
+                    expected_reader_position,
+                    expected_writer_position
+                );
+
+                let record = MultiEventRecord::new(*record_size);
+                assert_eq!(record.event_count() as u32, *record_size);
+
+                writer
+                    .write_record(record)
+                    .await
+                    .expect("write should not fail");
+                writer.flush().await.expect("flush should not fail");
+
+                expected_writer_position += u64::from(*record_size);
+
+                // Make sure the writer position advanced after flushing.
+                assert_reader_last_writer_next_positions!(
+                    ledger,
+                    expected_reader_position,
+                    expected_writer_position
+                );
+
+                let record_via_read = read_next_some(&mut reader).await;
+                assert_eq!(record_via_read, MultiEventRecord::new(*record_size));
+                acknowledge(record_via_read).await;
+
+                // Increment the expected reader position by the trailing reader position delta:
+                // now that we've done another read, the reader's position should actually have
+                // moved forward.
+                expected_reader_position += trailing_reader_position_delta;
+                assert_reader_last_writer_next_positions!(
+                    ledger,
+                    expected_reader_position,
+                    expected_writer_position
+                );
+
+                // Set the trailing reader position delta to the record we just read.
+                //
+                // We do it this way because reads themselves have to drive acknowledgement logic,
+                // which in turn drives updates to the ledger, so we will only see the change in
+                // the reader's position the _next_ time we do a read.
+                trailing_reader_position_delta = u64::from(*record_size);
+            }
+
+            // Close the writer and do a final read, driving the acknowledgement and position
+            // update logic before we do our final position check.
+            writer.close();
+            assert_eq!(reader.next().await, Ok(None));
+
+            // Calculate the absolute reader/writer positions we would expect based on all of the
+            // records/events written and read. This double-checks our work and makes sure that
+            // the "expected" positions didn't hide any bugs from us.
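+            // With the sizes used above, that's 176 + 52 + 91 + 137 + 54 + 87 = 597 events in
+            // total, so the final reader position should be 597 and the writer's next record ID
+            // should be 598.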
+            let expected_final_reader_position =
+                record_sizes.iter().copied().map(u64::from).sum::<u64>();
+            let expected_final_writer_position = expected_final_reader_position + 1;
+
+            assert_reader_last_writer_next_positions!(
+                ledger,
+                expected_final_reader_position,
+                expected_final_writer_position
+            );
+        }
+    });
+
+    let parent = trace_span!("reader_writer_positions_aligned_through_multiple_files_and_records");
+    fut.instrument(parent.or_current()).await;
+}
diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs b/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
index e123a39c228d63..8c479827355fd2 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
@@ -134,6 +134,24 @@ macro_rules! assert_reader_writer_v2_file_positions {
     }};
 }
 
+#[macro_export]
+macro_rules! assert_reader_last_writer_next_positions {
+    ($ledger:expr, $reader_expected:expr, $writer_expected:expr) => {{
+        let reader_actual = $ledger.state().get_last_reader_record_id();
+        let writer_actual = $ledger.state().get_next_writer_record_id();
+        assert_eq!(
+            $reader_expected, reader_actual,
+            "expected reader last read record ID of {}, got {} instead",
+            $reader_expected, reader_actual,
+        );
+        assert_eq!(
+            $writer_expected, writer_actual,
+            "expected writer next record ID of {}, got {} instead",
+            $writer_expected, writer_actual,
+        );
+    }};
+}
+
 #[macro_export]
 macro_rules! assert_enough_bytes_written {
     ($written:expr, $record_type:ty, $record_payload_size:expr) => {
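
For reference, a minimal sketch of how the new macro reads at a call site. The `ledger` handle and
the record IDs here are illustrative, borrowed from the "5 records of 10 events" example in the
test comment above; they are not part of the diff itself:

    // After 50 events have been written, read, and acknowledged, the reader's "last read"
    // record ID should be 50 and the writer's "next" record ID should be 51.
    assert_reader_last_writer_next_positions!(ledger, 50, 51);

    // On a mismatch, the macro's assert_eq! calls panic with messages of the form:
    //   expected reader last read record ID of 50, got 48 instead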