archival: Add offset_range_size method to the storage::log #13927
Conversation
Looks neat. I understand avoiding the batch cache, but why not go through the readers cache?
Force-pushed from 5a9682c to f5b9af4.
ducktape was retried in job https://buildkite.com/redpanda/redpanda/builds/38576#018b15bf-a837-4a2d-bb9e-9dcc405bc341
Force-pushed from 4d025a5 to 6fda80f.
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40235#018b8b12-6e0e-4316-b2d3-ad6cd8e762ba: "rptest.tests.cluster_features_test.FeaturesSingleNodeTest.test_get_features"
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40235#018b8b12-6e11-49da-acf9-e7d2d52e0588: "rptest.tests.cluster_features_test.FeaturesSingleNodeUpgradeTest.test_upgrade"
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40235#018b8b12-6e15-461b-98f6-f3f4add83f13: "rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add"
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40235#018b8b12-6e18-4b90-8a98-df658a37a2f7: "rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=off"
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40311#018b905d-db3b-499d-b8c7-f9884842adf2: "rptest.tests.tiered_storage_model_test.TieredStorageTest.test_tiered_storage.cloud_storage_type=CloudStorageType.S3.test_case=.TS_Read==True.AdjacentSegmentMergerReupload==True"
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40311#018b905d-db42-4df8-8ced-824474b78247: "rptest.tests.tiered_storage_model_test.TieredStorageTest.test_tiered_storage.cloud_storage_type=CloudStorageType.ABS.test_case=.TS_Read==True.AdjacentSegmentMergerReupload==True"
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40311#018b905d-db45-4711-929c-5ad180e97c75: "rptest.tests.offset_for_leader_epoch_test.OffsetForLeaderEpochTest.test_offset_for_leader_epoch_transfer"
new failures detected in https://buildkite.com/redpanda/redpanda/builds/40311#018b905d-db3e-4d6b-be5b-61bb7c0b881e: "rptest.tests.rpk_registry_test.RpkRegistryTest.test_produce_consume_proto"
/ci-repeat 1
Force-pushed from 6fda80f to fbeb495.
new failures in https://buildkite.com/redpanda/redpanda/builds/42192#018c3450-60bd-4327-b78f-6c1cdfb0b985:
new failures in https://buildkite.com/redpanda/redpanda/builds/43904#018d1e55-2030-45ce-a2b7-a6437ed73083:
new failures in https://buildkite.com/redpanda/redpanda/builds/43904#018d1e55-202a-4620-9510-6a3dd28b53d3:
new failures in https://buildkite.com/redpanda/redpanda/builds/44004#018d2739-57c8-41c3-89a8-b06d7b32d1ab:
new failures in https://buildkite.com/redpanda/redpanda/builds/44004#018d2739-57d3-4fbe-980c-16d74ee0f278:
new failures in https://buildkite.com/redpanda/redpanda/builds/44004#018d2739-57cc-4694-8929-6e0172af439c:
new failures in https://buildkite.com/redpanda/redpanda/builds/44004#018d2739-57cf-4952-afbb-4ad8cb55b15b:
new failures in https://buildkite.com/redpanda/redpanda/builds/44240#018d3d9e-ffb1-41df-957c-5d67c059f032:
new failures in https://buildkite.com/redpanda/redpanda/builds/44535#018d6097-cb18-4eb6-9d0f-24e28ec59d17:
It seems like we're at most doing a subscan of two segments. Is it possible that the upload range covers multiple segments? If so, why don't we need to accumulate the contents of the intermediate segments?
Maybe I'm missing something, but it seems possible that there are no data records in the left or right subscan, but there are entire segments in the middle that have offsets to upload and data timestamps that get skipped.
Curious whether you considered accumulating over the entire upload range rather than just left and right, and found it to be insufficient. Even if it'd be more IO, it feels like it'd be worth it to avoid the complexity and the new file-based dependencies (figuring out file size, etc.) on the storage layer.
src/v/archival/async_data_uploader.h (outdated)
/// Result of the upload size calculation.
/// Contains size of the region in bytes and locks.
struct cloud_upload_parameters {
nit: I think "metadata" would be a more accurate name. To me "parameter" implies that these will affect how the upload happens
It actually affects the created upload. This is a result of the reconciliation process, which is used to find the upload candidate. The actual upload is then created from this structure. The upload is a bytestream plus metadata for the manifest. I'll try to come up with a better name, but "metadata" is a bit loaded. Maybe upload_reconciliation_result?
private:
    model::record_batch_reader _reader;
    iobuf _buffer;
    ssize_t _max_bytes;
nit: why does this need to be signed?
src/v/archival/async_data_uploader.h (outdated)
/// Scan the beginning of the segment range
ss::future<result<subscan_res>> subscan_left(
  std::vector<ss::lw_shared_ptr<storage::segment>> segments,
  model::offset range_base,
  model::timeout_clock::time_point deadline);

/// Scan the end of the segment range
ss::future<result<subscan_res>> subscan_right(
  std::vector<ss::lw_shared_ptr<storage::segment>> segments,
  model::offset range_last,
  model::timeout_clock::time_point deadline);
nit: I feel like I'm missing a mental image of what "left" and "right" mean in this context. Could you elaborate? Maybe it'd help to be more explicit about what range_base and range_last are and how they are intended to be used.
The subscans here are referring to having to scan partial segments, right? If so, I'm surprised that these don't take individual segments as arguments.
We have to upload an offset range that may span multiple segments, and it may also start and stop in the middle of a segment. We have to find an upload candidate based on the start and end offsets of the upload, or based on the start offset and the desired size of the upload. The algorithm is:
- call subscan_left to find the exact beginning of the upload
- call subscan_right to find the end of the upload
- compute the size and metadata of the upload based on the two subscan results and the segments in the middle
The subscan uses the segment index to find the index entry closest to the target and then scans up to the target to calculate the precise file location. It may also scan past the target to find the data timestamp.
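The index-assisted scan described above can be sketched as follows. This is a hedged simplification with hypothetical flat types (offsets and file positions as plain integers, batches in a vector); the real code works with segment_index and a record_batch_reader under Seastar:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical sparse index entry: sampled (offset, file position) pairs.
struct index_entry {
    int64_t offset;
    size_t filepos;
};

// Hypothetical batch header: each batch covers an offset range and has a size.
struct batch {
    int64_t base_offset;
    int64_t last_offset;
    size_t size_bytes;
};

// Find the precise file position of the batch containing `target`:
// 1) binary-search the sparse index for the closest entry at or below target,
// 2) short linear scan forward from that position to the exact batch.
size_t subscan_filepos(
  const std::vector<index_entry>& index, // sorted by offset
  const std::vector<batch>& batches,     // sorted by base_offset
  int64_t target) {
    auto it = std::upper_bound(
      index.begin(), index.end(), target,
      [](int64_t t, const index_entry& e) { return t < e.offset; });
    size_t pos = 0;
    int64_t start = 0;
    if (it != index.begin()) {
        --it; // closest index entry not above the target
        pos = it->filepos;
        start = it->offset;
    }
    for (const auto& b : batches) {
        if (b.last_offset < start) {
            continue; // already covered by the index entry
        }
        if (b.base_offset <= target && target <= b.last_offset) {
            return pos; // exact file position of the target batch
        }
        pos += b.size_bytes;
    }
    return pos;
}
```

The sparse index only gets the scan close; the linear pass is what makes the resulting file position exact.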
src/v/archival/async_data_uploader.h (outdated)
ss::future<result<subscan_res>> subscan(
  ss::lw_shared_ptr<cluster::partition> part,
  inclusive_offset_range range,
  size_t initial_offset,
nit: should this be some offset type? Or perhaps name it to some filepos or something
fixed
src/v/archival/async_data_uploader.h (outdated)
/// the timestamp and offset of the 'target'.
/// The timestamp is a data timestamp (if the target is a config batch
/// the subscan will scan until the data batch is found and use first data
I think these are referring to the members of subscan_res? If so, could you add them to the struct definition?
updated the comment
src/v/archival/async_data_uploader.h (outdated)
/// Calculate upload size using segment indexes
ss::future<result<cloud_upload_parameters>> compute_upload_parameters(
  inclusive_offset_range range, model::timeout_clock::time_point deadline);
It'd be nice to avoid this dependency on file index... Perhaps we should bake a size estimate interface into the storage layer or log reader
Agree, but at the moment this is not possible.
    co_return std::move(upl);
}

ss::future<result<void>> segment_upload::initialize(
nit: alternatively make this a static constructor, and make segment_upload's constructor take the params? Then we wouldn't need to worry about checking for inited.
The problem is that this initialization is asynchronous so we have to expose some sort of async method anyway.
    // timestamp.
    result->done = true;
    co_return ss::stop_iteration::yes;
}
It's still possible that the returned timestamp isn't initialized if none of the records are data batches. Is that intentional?
If we're unable to find a timestamp in this range, I wonder if it makes sense to just use the preceding remote segment's max timestamp (not at this level of abstraction, somewhere else).
Yes, it is possible to do this and I was actually going to use it in this case (if we have a preceding segment in the manifest). But if we're scanning the end of the segment we don't have such an option. This could be tuned in a follow-up when this code is used to start uploads.
    }
}

ss::future<result<segment_upload::subscan_res>> segment_upload::subscan(
nit: gate and logger aside, feels like this should be a static storage utility method
Eliminate the scheduling point inside the loop. The scheduling point may cause the iterator to become invalidated, which is UB.
Allow the index entry to be an optional to get rid of the code that initializes the entry to default values.
Force-pushed from bd5800f to 06b2c72.
Return std::optional instead of encoding absence of the value using the fields of the struct.
Test failure is #16308
auto holders = co_await ss::when_all_succeed(
  std::begin(f_locks), std::end(f_locks));
I suspect we'll need to check if the segment is closed while the locks are held, and throw if so.
The close method of the segment tries to acquire the write lock, so it should be impossible to get the read lock for a closed segment. But I added the check just in case.
It does take the write lock, but it drops it once the segment is closed, after which, even if we have the read lock, I don't think we'll be able to read the segment.
std::optional<entry> find_above_size_bytes(size_t distance);
/// Find entry by file offset (the value will undershoot or find precise
/// match)
std::optional<entry> find_below_size_bytes(size_t distance);
Bumping this question
Force-pushed from 5b594e0 to 8070e50.
CI failure is unrelated: #16402
Sorry the reviews have dragged on here. Structurally, and ignoring side conversations about the right interface between storage and archival, I think this is looking pretty good.
While I appreciate that this has already split up the storage/archival changes, it'd really help as a reviewer to break things down even further. One approach to consider is to split PRs into independently testable units. For instance, this PR could probably have been split into:
- new method for the segment_index
- batch cache option for the log reader
- get_file_offset() + maybe batch_size_accumulator
- offset_range_size(start, last)
- offset_range_size(start, size)
It's much harder to introduce bugs when the PRs are small. That said, given there's already quite a lot of review on this PR, I can understand if you would prefer to punt on this.
for (; it < _segs.end(); it++) {
    if (it->get()->is_closed()) {
        co_return std::nullopt;
    }
    if (*it == first_segment) {
        vlog(
          stlog.debug,
Can/should we be iterating over segments? It doesn't look like there are scheduling points below, but it's a little surprising not to use segments.
maybe in a followup
if (num_segments == 1) {
    // There are two cases here.
    // 1. The segment has at least target_size bytes after
    //    base_file_pos,
    // 2. The segment is too small. In this case we need to clamp
    //    the result.
    truncate_after = first_segment_file_pos + target.target_size;
    truncate_after = std::clamp(
      truncate_after,
      first_segment_file_pos,
      it->get()->file_size());
} else {
    // In this case we need to find the truncation point
    // always starting from the beginning of the segment.
    //
    // prev is guaranteed to be smaller than target_size
    // because we reached this branch.
    auto prev = current_size - it->get()->file_size();
    auto delta = target.target_size - prev;
    truncate_after = delta;
}
I'm a little confused about why this has to be two distinct cases. In either case, isn't the idea to remove current_size - target.target_size bytes? And aren't we guaranteed that it's this segment that pushed us over the top?
Would something like this work:
auto cur_seg_pos = it->get()->file_size(); // Above calculation `current_size` has assumed that we've accepted to the end of the file, and now we need to truncate.
auto truncate_by = current_size - target.target_size;
auto truncate_after = cur_seg_pos - truncate_by;
the computation is different in the two cases:
- single segment: current_size includes the full segment size minus the prefix, which is not included in the result. We need to use the size of the truncated prefix to locate the end of the offset range.
- more than one segment: current_size includes more than one segment, so we don't need to use the size of the prefix.
we need to use the size of the truncated prefix to locate the end of the offset range.
Right, but doesn't current_size already account for the missing (or not) prefix?
I suspect these are the same, translating my suggestion, ignoring the clamp:
case of 1 segment:
cur_seg_pos = first_seg_file_size
current_size = first_seg_file_size - first_seg_file_pos
truncate_by = first_seg_file_size - first_seg_file_pos - target.target_size
truncate_after = first_seg_file_size - (first_seg_file_size - first_seg_file_pos - target.target_size)
... = first_seg_file_pos + target.target_size (same as L2147, and sure we can clamp after)
case of more than 1 segment:
cur_seg_pos = iter->file_size()
current_size = size of previous segments + iter->file_size()
truncate_by = size of previous segments + iter->file_size() - target.target_size
truncate_after = iter->file_size() - (size of previous segments + iter->file_size() - target.target_size)
... = target.target_size - size of previous segments (same as L2160)
After writing it all out, I'm more convinced these are the same, though it's somewhat a nit. I do think it makes the code more readable though.
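The algebra in the comment above can be checked mechanically. This is a hedged sketch with hypothetical free functions (the names mirror the discussion, not the actual disk_log_impl code), showing that the two per-case formulas and the reviewer's unified form compute the same truncation point:

```cpp
#include <cstddef>

// Single-segment case, original form: prefix position plus target size.
size_t truncate_single(size_t file_pos, size_t target_size) {
    return file_pos + target_size;
}

// Single-segment case, reviewer's form: current_size counts bytes after
// the prefix, and we cut truncate_by bytes off the end of the file.
size_t truncate_single_alt(size_t file_size, size_t file_pos, size_t target_size) {
    size_t current_size = file_size - file_pos;
    size_t truncate_by = current_size - target_size;
    return file_size - truncate_by;
}

// Multi-segment case, original form: target size minus bytes already
// accumulated in the preceding segments.
size_t truncate_multi(size_t prev_segments_size, size_t target_size) {
    return target_size - prev_segments_size;
}

// Multi-segment case, reviewer's form: same cut, expressed via current_size.
size_t truncate_multi_alt(size_t prev_segments_size, size_t file_size, size_t target_size) {
    size_t current_size = prev_segments_size + file_size;
    size_t truncate_by = current_size - target_size;
    return file_size - truncate_by;
}
```

Expanding either pair symbolically gives the same expression, which supports the reviewer's point that a single formulation (plus the clamp) would cover both branches.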
} else if (current_size > target.min_size) {
    vlog(
      stlog.debug,
      "Setting offset range to {} - {}",
      first,
      it->get()->offsets().committed_offset);
    // We can include full segment to the list of segments
    last_included_offset = it->get()->offsets().committed_offset;
    continue;
Why is this gated on current_size > target.min_size? Doesn't the current_size < target.min_size check below kind of encompass that? Or put another way, if the first segments we iterated through didn't meet the min_size requirement, why is it important to avoid setting this?
In the previous version(s) of this code the variable was used to indicate that we found the offset range. After the return type was changed to optional I started using std::nullopt to return an empty result. I'm inclined to keep it the way it is.
Got it, thanks for the explanation. IMO it makes it more confusing leaving it in, especially because there are so many branches in this code already
Lock the whole segment range in advance. Also, fix the error in compacted test case by handling the situation when the whole offset range is fully compacted and has size 0.
Force-pushed from 8070e50 to 0e95ffc.
Remaining comments are pretty much nits. I know it's been a long process, but thanks for continuing to improve this code!
@@ -153,6 +153,12 @@ class segment_index {
      size_t filepos);
    std::optional<entry> find_nearest(model::offset);
    std::optional<entry> find_nearest(model::timestamp);
    /// Find entry by file offset (the value may overshoot or find precise
    /// match)
    std::optional<entry> find_above_size_bytes(size_t distance);
can this be const?
std::optional<entry> find_above_size_bytes(size_t distance);
/// Find entry by file offset (the value will undershoot or find precise
/// match)
std::optional<entry> find_below_size_bytes(size_t distance);
can this be const?
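A minimal sketch of what these lookups might do, and of the reviewer's const question. The entry layout here is hypothetical (the real segment_index stores more fields); the point is that both methods are pure binary searches over a sorted vector and can indeed be const:

```cpp
#include <algorithm>
#include <cstddef>
#include <optional>
#include <vector>

struct entry {
    size_t filepos; // cumulative bytes from the start of the segment
};

class sparse_index {
public:
    explicit sparse_index(std::vector<entry> e)
      : _entries(std::move(e)) {}

    // Smallest entry at or above `distance` (may overshoot or match exactly).
    std::optional<entry> find_above_size_bytes(size_t distance) const {
        auto it = std::lower_bound(
          _entries.begin(), _entries.end(), distance,
          [](const entry& e, size_t d) { return e.filepos < d; });
        if (it == _entries.end()) {
            return std::nullopt;
        }
        return *it;
    }

    // Largest entry at or below `distance` (may undershoot or match exactly).
    std::optional<entry> find_below_size_bytes(size_t distance) const {
        auto it = std::upper_bound(
          _entries.begin(), _entries.end(), distance,
          [](size_t d, const entry& e) { return d < e.filepos; });
        if (it == _entries.begin()) {
            return std::nullopt;
        }
        return *std::prev(it);
    }

private:
    std::vector<entry> _entries; // sorted by filepos
};
```

Neither lookup mutates the index, so marking them const costs nothing.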
struct batch_size_accumulator {
    ss::future<ss::stop_iteration> operator()(model::record_batch b) {
        vassert(
          result_size_bytes != nullptr,
it seems to me like you could add a constructor to this struct and not pay this cost on each operator() invocation?
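The suggestion could look roughly like the sketch below. It is a hypothetical simplification (plain size_t batches instead of model::record_batch, no Seastar futures); the idea is just moving the pointer check from per-invocation to construction:

```cpp
#include <cassert>
#include <cstddef>

struct batch_size_accumulator {
    // Validate the output pointer once, at construction time,
    // instead of asserting on every operator() invocation.
    explicit batch_size_accumulator(size_t* out)
      : result_size_bytes(out) {
        assert(result_size_bytes != nullptr);
    }

    // Hypothetical stand-in for the per-batch consume callback.
    void operator()(size_t batch_size) {
        *result_size_bytes += batch_size;
    }

    size_t* result_size_bytes;
};
```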
auto reader = co_await make_reader(reader_cfg);

try {
    co_await std::move(reader).consume(acc, model::no_timeout);
I thought that consume() allowed the consumer to return a value, which would mean you probably don't need to pass in a pointer to size_bytes in order to get size_bytes back out after the consumer is done processing data from the reader?
src/v/storage/disk_log_impl.cc (outdated)
const auto& offsets = it->get()->offsets();
auto r_lock = co_await it->get()->read_lock();
if (offsets.base_offset <= first && offsets.committed_offset >= first) {
generally speaking, anytime we have a reference to something accessed via an iterator held across co_await calls we should see a comment that explains why it is safe. can you add that here?
src/v/storage/disk_log_impl.cc (outdated)
auto file_pos = co_await get_file_offset(
  *it, index_entry, first, false, io_priority);
auto sz = it->get()->file_size() - file_pos;
how is any of this safe w.r.t. iterator invalidations?
@@ -3667,3 +3668,183 @@ FIXTURE_TEST(test_offset_range_size, storage_test_fixture) {
#endif
};

FIXTURE_TEST(test_offset_range_size2, storage_test_fixture) {
#ifdef NDEBUG
there should definitely be a comment explaining why this doesn't work in debug mode.
@@ -3848,3 +3848,175 @@ FIXTURE_TEST(test_offset_range_size2, storage_test_fixture) {
#endif
};

FIXTURE_TEST(test_offset_range_size_compacted, storage_test_fixture) {
#ifdef NDEBUG
there should definitely be a comment explaining why this doesn't work in debug mode.
@@ -4020,3 +4031,262 @@ FIXTURE_TEST(test_offset_range_size_compacted, storage_test_fixture) {
#endif
};

FIXTURE_TEST(test_offset_range_size2_compacted, storage_test_fixture) {
#ifdef NDEBUG
why doesn't this work in debug mode
This PR adds an offset_range_size method to the storage::log. The implementation in storage::disk_log_impl uses segment indexes to find the approximate location of the start of the range and the approximate location of the last batch. Then it performs a short scan to find the precise location. There are two overloads of the method.
The method is supposed to be used to create S3 uploads. Currently, the code reads from disk directly and is therefore coupled with the storage format and a lot of internal storage details. Our goal is to decouple local and remote storage by providing a clear API boundary.
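To illustrate the two overloads described above, here is a hedged sketch over a hypothetical flat batch model (the real overloads live on storage::log, take model::offset and locking/timeout arguments, and consult segment indexes). One overload sizes an explicit inclusive range; the other grows a range from a start offset until it reaches a target size:

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical flat model: one entry per batch.
struct batch_info {
    int64_t base_offset;
    int64_t last_offset;
    size_t size_bytes;
};

// Overload 1: exact on-disk size of the inclusive offset range [first, last].
std::optional<size_t> offset_range_size(
  const std::vector<batch_info>& log, int64_t first, int64_t last) {
    size_t total = 0;
    bool found = false;
    for (const auto& b : log) {
        if (b.base_offset >= first && b.last_offset <= last) {
            total += b.size_bytes;
            found = true;
        }
    }
    if (!found) {
        return std::nullopt;
    }
    return total;
}

struct upload_candidate {
    int64_t last_offset;
    size_t size_bytes;
};

// Overload 2: starting at `first`, extend the range batch by batch until
// it covers at least target_size bytes.
std::optional<upload_candidate> offset_range_size(
  const std::vector<batch_info>& log, int64_t first, size_t target_size) {
    size_t total = 0;
    for (const auto& b : log) {
        if (b.base_offset < first) {
            continue;
        }
        total += b.size_bytes;
        if (total >= target_size) {
            return upload_candidate{b.last_offset, total};
        }
    }
    return std::nullopt; // not enough data yet
}
```

The first overload answers "how many bytes is this range", the second "where should an upload of roughly this size end", which matches the two ways the PR description says upload candidates are found.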
Backports Required
Release Notes