Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,42 @@ std::map<size_t, size_t>::const_iterator AlwaysContiguousDataStreamBufferImpl::G
return iter;
}

std::string_view AlwaysContiguousDataStreamBufferImpl::Get(size_t pos) const {
// Ensures timestamps recorded for the contiguous block covering [chunk_start, chunk_end) are
// monotonically non-decreasing. Any timestamp smaller than its predecessor is adjusted in place
// to previous timestamp + 1 (with a warning logged).
// NOTE: parameter renamed from `pos` to `chunk_start` to match the header declaration
// (EnforceTimestampMonotonicity(size_t chunk_start, size_t chunk_end)).
void AlwaysContiguousDataStreamBufferImpl::EnforceTimestampMonotonicity(size_t chunk_start,
                                                                        size_t chunk_end) {
  // Find the timestamp entry for the chunk whose start position is <= chunk_start.
  // upper_bound() returns the first entry strictly greater, so step back one.
  auto it = timestamps_.upper_bound(chunk_start);
  if (it == timestamps_.begin()) {
    // No timestamp recorded at or before chunk_start; nothing to enforce.
    return;
  }
  --it;

  // Walk entries from the containing chunk up to, but not including, chunk_end.
  // Entries at or beyond chunk_end are not part of this contiguous block.
  // prev_timestamp_ is reset each call: monotonicity is enforced per contiguous head only
  // (see the caller, Get()).
  prev_timestamp_ = 0;
  for (; it != timestamps_.end() && it->first < chunk_end; ++it) {
    // The prev_timestamp_ > 0 guard skips the comparison for the first entry of the head
    // (and would also skip entries following a literal 0 timestamp).
    if (prev_timestamp_ > 0 && it->second < prev_timestamp_) {
      LOG(WARNING) << absl::Substitute(
          "For chunk pos $0, detected non-monotonically increasing timestamp $1. Adjusting to "
          "previous timestamp + 1: $2",
          it->first, it->second, prev_timestamp_ + 1);
      it->second = prev_timestamp_ + 1;
    }
    prev_timestamp_ = it->second;
  }
}

std::string_view AlwaysContiguousDataStreamBufferImpl::Get(size_t pos) {
auto iter = GetChunkForPos(pos);
if (iter == chunks_.cend()) {
return {};
}

size_t chunk_pos = iter->first;
size_t chunk_size = iter->second;
size_t chunk_pos = iter->first; // start of contiguous head
size_t chunk_size = iter->second; // size of contiguous (already merged in Add)

// Since Get() is only called from Head(), and the event parser fully processes a contiguous
// head, timestamp monotonicity need only be enforced once per head.
EnforceTimestampMonotonicity(chunk_pos, chunk_pos + chunk_size);

ssize_t bytes_available = chunk_size - (pos - chunk_pos);
DCHECK_GT(bytes_available, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
size_t EndPosition();

// Get a string_view for the chunk at pos.
std::string_view Get(size_t pos) const;
std::string_view Get(size_t pos);

// Ensure that timestamps are monotonically increasing for a given chunk.
void EnforceTimestampMonotonicity(size_t chunk_start, size_t chunk_end);

const size_t capacity_;
const size_t max_gap_size_;
Expand All @@ -101,6 +104,7 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
// Unlike chunks_, which will fuse when adjacent, timestamps never fuse.
// Also, we don't track gaps in the buffer with timestamps; must use chunks_ for that.
std::map<size_t, uint64_t> timestamps_;
size_t prev_timestamp_ = 0;
};

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,7 @@ class DataStreamBuffer {
* @param pos The logical position of the data.
* @return The timestamp or error if the position does not contain valid data.
*/
StatusOr<uint64_t> GetTimestamp(size_t pos) {
StatusOr<uint64_t> timestamp_ns_status = impl_->GetTimestamp(pos);
if (!timestamp_ns_status.ok()) {
return timestamp_ns_status;
}
uint64_t current_timestamp_ns = timestamp_ns_status.ConsumeValueOrDie();
if (current_timestamp_ns < prev_timestamp_ns_) {
LOG(WARNING) << "Detected non-monotonically increasing timestamp " << current_timestamp_ns
<< ". Adjusting to previous timestamp + 1: " << prev_timestamp_ns_ + 1;
current_timestamp_ns = prev_timestamp_ns_ + 1;
}
prev_timestamp_ns_ = current_timestamp_ns;
return current_timestamp_ns;
}
StatusOr<uint64_t> GetTimestamp(size_t pos) { return impl_->GetTimestamp(pos); }

/**
* Remove n bytes from the head of the buffer.
Expand Down Expand Up @@ -162,7 +149,6 @@ class DataStreamBuffer {

private:
std::unique_ptr<DataStreamBufferImpl> impl_;
uint64_t prev_timestamp_ns_ = 0;
};

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,15 @@ TEST_P(DataStreamBufferTest, Timestamp) {
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(7), 4);
EXPECT_NOT_OK(stream_buffer.GetTimestamp(8));

// Test automatic adjustment of non-monotonic timestamp
stream_buffer.Add(8, "89", 3); // timestamp is 3, which is less than previous timestamp 4
// Same timestamp as previous timestamp
stream_buffer.Add(8, "89", 4);
EXPECT_EQ(stream_buffer.Head(), "123456789");
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(8),
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(8), 4);

// Test automatic adjustment of non-monotonic timestamp
stream_buffer.Add(10, "ab", 3); // timestamp is 3, which is less than previous timestamp 4
EXPECT_EQ(stream_buffer.Head(), "123456789ab");
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(10),
5); // timestamp is adjusted to previous timestamp + 1
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,19 @@ void LazyContiguousDataStreamBufferImpl::MergeContiguousEventsIntoHead() {
}

it = events_.begin();
// end_it stopped at the first non-contiguous event (at the end of current head)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we guaranteed to call MergeContiguousEventsIntoHead for the situations we care about? I noticed that Head only calls this function if IsHeadAndEventsMergeable is true (source).

Copy link
Copy Markdown
Member Author

@benkilimnik benkilimnik Nov 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsHeadAndEventsMergeable is true whenever we have events that line up with the end of the previous head_ such that we can merge at least one event from events_.

Since we only check for monotonicity when merging events into head_, any existing head_ will have monotonic timestamps (stored in head_pos_to_ts_).

Does that answer your question?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, just wanted to double check my understanding of that merge logic 👍

while (it != end_it) {
size_t event_size = it->second.data.size();
memcpy(new_buffer->Data() + offset, it->second.data.data(), event_size);
// Ensure that the event timestamps are monotonically increasing for a given contiguous head
if (prev_timestamp_ > 0 && it->second.timestamp < prev_timestamp_) {
LOG(WARNING) << absl::Substitute(
"Detected non-monotonically increasing timestamp $0. Adjusting to previous timestamp + "
"1: $1",
it->second.timestamp, prev_timestamp_ + 1);
it->second.timestamp = prev_timestamp_ + 1;
}
prev_timestamp_ = it->second.timestamp;
head_pos_to_ts_.emplace(it->first, it->second.timestamp);
offset += event_size;
events_size_ -= event_size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class LazyContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
size_t head_position_ = 0;
std::unique_ptr<FixedSizeContiguousBuffer> head_;
std::map<size_t, uint64_t> head_pos_to_ts_;
uint64_t prev_timestamp_ = 0;

std::map<size_t, Event> events_;
size_t events_size_ = 0;
Expand Down