fix(buffers): deadlock when seeking after entire write fails to be flushed
tobz committed Jun 9, 2023
1 parent bf7d796 commit 137de0b
Showing 4 changed files with 221 additions and 7 deletions.
8 changes: 2 additions & 6 deletions lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -901,12 +901,8 @@ where
        while self.last_reader_record_id < ledger_last {
            match self.next().await {
                Ok(maybe_record) => {
-                   if maybe_record.is_none() && self.last_reader_record_id == 0 {
-                       // We've hit a point where there's no more data to read. If our "last reader record
-                       // ID" hasn't moved at all, that means the buffer was already empty and we're caught
-                       // up, so we just pin ourselves to where the ledger says we left off, and we're good
-                       // to go.
-                       self.last_reader_record_id = ledger_last;
+                   if maybe_record.is_none() {
+                       // We've hit the end of the current data file so we've gone as far as we can.
                        break;
                    }
                }
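
To illustrate the deadlock this change addresses, here is a minimal, self-contained model of the catch-up loop above. This is a sketch only: `SeekModel`, `records_on_disk`, and `seek_to` are illustrative names, not part of the real `Reader` API, and the actual reader is async and reads records from data files on disk.

```rust
// Minimal sketch of the seek/catch-up loop fixed above; illustrative only.
struct SeekModel {
    last_reader_record_id: u64,
    records_on_disk: Vec<u64>, // record IDs still present in the current data file
    cursor: usize,
}

impl SeekModel {
    // Stand-in for `Reader::next`: yields the next record ID, or None at end of the data file.
    fn next(&mut self) -> Option<u64> {
        let id = self.records_on_disk.get(self.cursor).copied()?;
        self.cursor += 1;
        self.last_reader_record_id = id;
        Some(id)
    }

    // Mirrors the fixed logic: stop as soon as we hit the end of the data file, even if we
    // haven't caught up to where the ledger says we left off.
    fn seek_to(&mut self, ledger_last: u64) {
        while self.last_reader_record_id < ledger_last {
            match self.next() {
                Some(_) => {}
                // The old condition (`is_none() && last_reader_record_id == 0`) only broke out
                // when nothing had been read at all; if the tail of the data file was lost after
                // some records had been read, `next()` returned None forever and the loop never
                // terminated. Breaking unconditionally avoids the deadlock.
                None => break,
            }
        }
    }
}

fn main() {
    // The ledger says the reader had read up to record ID 2, but the second record was never
    // flushed to disk, so only record 1 survives in the data file.
    let mut model = SeekModel {
        last_reader_record_id: 0,
        records_on_disk: vec![1],
        cursor: 0,
    };
    model.seek_to(2);
    assert_eq!(1, model.last_reader_record_id); // caught up as far as possible, no hang
}
```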
95 changes: 95 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/initialization.rs
@@ -87,3 +87,98 @@ async fn reader_doesnt_block_from_partial_write_on_last_record() {
let parent = trace_span!("reader_doesnt_block_from_partial_write_on_last_record");
fut.instrument(parent.or_current()).await;
}

#[tokio::test]
async fn reader_doesnt_block_when_ahead_of_last_record_in_current_data_file() {
// When initializing, the reader will be catching up to the last record it read, which involves
// reading individual records in the current reader data file until a record is returned whose
// record ID matches the "last record ID read" field from the ledger.
//
// If the current data file contains a valid last record when we initialize, but that last
// record is _behind_ the last record read as tracked by the ledger, then we need to ensure we
// can break out of the catch-up loop when we get to the end of the current data file.
//
// Our existing logic for corrupted event detection, and the writer's own initialization logic,
// will emit an error message when we realize that data is missing based on record ID gaps.
let _a = install_tracing_helpers();

let fut = with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

async move {
// Create a regular buffer, no customizations required.
let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;

// Write two records, and then read and acknowledge both.
//
// This puts the buffer into a state where there's data in the current data file, and
// the ledger has a non-zero record ID for where it thinks the reader needs to be. This
// ensures that the reader actually does at least two calls to `Reader::next` during
// `Reader::seek_to_next_record`, which is necessary to ensure that the reader leaves
// the default state of `self.last_reader_record_id == 0`.
let first_bytes_written = writer
.write_record(SizedRecord::new(64))
.await
.expect("should not fail to write");
writer.flush().await.expect("flush should not fail");

let second_bytes_written = writer
.write_record(SizedRecord::new(68))
.await
.expect("should not fail to write");
writer.flush().await.expect("flush should not fail");

writer.close();

let first_read = reader
.next()
.await
.expect("should not fail to read record")
.expect("should contain first record");
assert_eq!(SizedRecord::new(64), first_read);
acknowledge(first_read).await;

let second_read = reader
.next()
.await
.expect("should not fail to read record")
.expect("should contain second record");
assert_eq!(SizedRecord::new(68), second_read);
acknowledge(second_read).await;

let third_read = reader.next().await.expect("should not fail to read record");
assert!(third_read.is_none());

ledger.flush().expect("should not fail to flush ledger");

// Grab the current writer data file path before dropping the buffer.
let data_file_path = ledger.get_current_writer_data_file_path();
drop(reader);
drop(writer);
drop(ledger);

// Open the data file and truncate the second record. This will ensure that the reader
// hits EOF after the first read, which we need to do in order to exercise the logic
// that breaks out of the loop.
let initial_len = first_bytes_written as u64 + second_bytes_written as u64;
let target_len = first_bytes_written as u64;
set_file_length(&data_file_path, initial_len, target_len)
.await
.expect("should not fail to truncate data file");

// Now reopen the buffer, which should complete in a timely fashion without an immediate error.
let reopen = timeout(
Duration::from_millis(500),
create_default_buffer_v2::<_, SizedRecord>(data_dir),
)
.await;
assert!(
reopen.is_ok(),
"failed to reopen buffer in a timely fashion; likely deadlock"
);
}
});

let parent = trace_span!("reader_doesnt_block_when_ahead_of_last_record_in_current_data_file");
fut.instrument(parent.or_current()).await;
}
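
`set_file_length` above is a helper from the disk_v2 test module whose implementation isn't shown in this diff. A plausible minimal version, assuming it simply verifies the file's current length and then truncates it, might look like the following sketch using `tokio::fs` (illustrative only; the real helper may differ):

```rust
use std::{io, path::Path};

// Illustrative stand-in for the `set_file_length` test helper used above; the real helper
// lives elsewhere in the test module and may differ. Assumed behavior: verify the data file
// is currently `expected_len` bytes long, then truncate it to `target_len`.
async fn set_file_length(path: &Path, expected_len: u64, target_len: u64) -> io::Result<()> {
    let file = tokio::fs::OpenOptions::new().write(true).open(path).await?;
    let actual_len = file.metadata().await?.len();
    assert_eq!(
        expected_len, actual_len,
        "data file length did not match expectation before truncation"
    );
    // `File::set_len` shrinks the file when the target is smaller than the current length,
    // which is what drops the never-flushed second record in the test above.
    file.set_len(target_len).await?;
    file.sync_all().await
}
```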
107 changes: 106 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
@@ -5,7 +5,8 @@ use tracing::Instrument;
use super::{create_buffer_v2_with_max_data_file_size, read_next, read_next_some};
use crate::{
    assert_buffer_is_empty, assert_buffer_records, assert_buffer_size, assert_enough_bytes_written,
-   assert_reader_writer_v2_file_positions, await_timeout, set_data_file_length,
+   assert_reader_last_writer_next_positions, assert_reader_writer_v2_file_positions,
+   await_timeout, set_data_file_length,
    test::{acknowledge, install_tracing_helpers, with_temp_dir, MultiEventRecord, SizedRecord},
    variants::disk_v2::{
        common::{DEFAULT_FLUSH_INTERVAL, MAX_FILE_ID},
@@ -820,3 +821,107 @@ async fn writer_updates_ledger_when_buffered_writer_reports_implicit_flush() {
})
.await;
}

#[tokio::test]
async fn reader_writer_positions_aligned_through_multiple_files_and_records() {
// This test ensures that the reader/writer positions stay aligned through multiple records and
// data files. That is to say, if we write 5 records, each with 10 events, and then read and
// acknowledge all of those events, the writer's next record ID should be 51 (the 50th event
// corresponds to ID 50, so the next ID is 51) and the reader's last read record ID should
// be 50.
//
// Testing this across multiple data files isn't super germane to the position logic, but it
// just ensures we're also testing that aspect.

let _a = install_tracing_helpers();
let fut = with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

async move {
// Create our buffer with an arbitrarily low maximum data file size. We'll use this to
// control how many records make it into a given data file. Just another way to ensure
// we're testing the position logic with multiple writes to one data file, one write to
// a data file, etc.
let (mut writer, mut reader, ledger) =
create_buffer_v2_with_max_data_file_size(data_dir, 256).await;

// We'll write multi-event records with N events based on these sizes, and as we do so,
// we'll assert that our writer position moves as expected after the write, and that
// after reading and acknowledging, the reader position also moves as expected.
let record_sizes = &[176, 52, 91, 137, 54, 87];

let mut expected_writer_position = ledger.state().get_next_writer_record_id();
let mut expected_reader_position = ledger.state().get_last_reader_record_id();
let mut trailing_reader_position_delta = 0;

for record_size in record_sizes {
// Initial check before writing/reading the next record.
assert_reader_last_writer_next_positions!(
ledger,
expected_reader_position,
expected_writer_position
);

let record = MultiEventRecord::new(*record_size);
assert_eq!(record.event_count() as u32, *record_size);

writer
.write_record(record)
.await
.expect("write should not fail");
writer.flush().await.expect("flush should not fail");

expected_writer_position += u64::from(*record_size);

// Make sure the writer position advanced after flushing.
assert_reader_last_writer_next_positions!(
ledger,
expected_reader_position,
expected_writer_position
);

let record_via_read = read_next_some(&mut reader).await;
assert_eq!(record_via_read, MultiEventRecord::new(*record_size));
acknowledge(record_via_read).await;

// Increment the expected reader position by the trailing reader position delta; now that
// we've done another read, the reader's position should actually have moved forward.
expected_reader_position += trailing_reader_position_delta;
assert_reader_last_writer_next_positions!(
ledger,
expected_reader_position,
expected_writer_position
);

// Set the trailing reader position delta to the record we just read.
//
// We do it this way because reads themselves have to drive acknowledgement logic to
// then drive updates to the ledger, so we will only see the change in the reader's
// position the _next_ time we do a read.
trailing_reader_position_delta = u64::from(*record_size);
}

// Close the writer and do a final read, thus driving the acknowledgement logic, and
// position update logic, before we do our final position check.
writer.close();
assert_eq!(reader.next().await, Ok(None));

// Calculate the absolute reader/writer positions we would expect based on all of the
// records/events written and read. This is to double check our work and make sure that
// the "expected" positions didn't hide any bugs from us.
let expected_final_reader_position =
record_sizes.iter().copied().map(u64::from).sum::<u64>();
let expected_final_writer_position = expected_final_reader_position + 1;

assert_reader_last_writer_next_positions!(
ledger,
expected_final_reader_position,
expected_final_writer_position
);
}
});

let parent = trace_span!("reader_writer_positions_aligned_through_multiple_files_and_records");
fut.instrument(parent.or_current()).await;
}
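
As a quick sanity check of the final assertion's arithmetic (a standalone sketch, not part of the test suite): the six record sizes sum to 597 events, so the reader's last read record ID should finish at 597 and the writer's next record ID at 598.

```rust
// Standalone check of the final-position arithmetic used in the test above.
fn main() {
    let record_sizes: [u32; 6] = [176, 52, 91, 137, 54, 87];
    let total_events: u64 = record_sizes.iter().copied().map(u64::from).sum();
    assert_eq!(597, total_events);

    // The reader's last read record ID ends at the total event count; the writer's next
    // record ID is one past it.
    let expected_final_reader_position = total_events;
    let expected_final_writer_position = total_events + 1;
    assert_eq!(597, expected_final_reader_position);
    assert_eq!(598, expected_final_writer_position);
}
```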
18 changes: 18 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/mod.rs
@@ -134,6 +134,24 @@ macro_rules! assert_reader_writer_v2_file_positions {
}};
}

#[macro_export]
macro_rules! assert_reader_last_writer_next_positions {
($ledger:expr, $reader_expected:expr, $writer_expected:expr) => {{
let reader_actual = $ledger.state().get_last_reader_record_id();
let writer_actual = $ledger.state().get_next_writer_record_id();
assert_eq!(
$reader_expected, reader_actual,
"expected reader last read record ID of {}, got {} instead",
$reader_expected, reader_actual,
);
assert_eq!(
$writer_expected, writer_actual,
"expected writer next record ID of {}, got {} instead",
$writer_expected, writer_actual,
);
}};
}

#[macro_export]
macro_rules! assert_enough_bytes_written {
($written:expr, $record_type:ty, $record_payload_size:expr) => {
