archival: Use log reader to upload data #16999
Conversation
/// True if at least one segment is compacted
bool is_compacted;
I remember there being a reason for this, but could you explain again why we can't use size_bytes to infer whether segments have been compacted? Presumably if we're reuploading, we specifically care about whether size_bytes is decreasing, no? Or is is_compacted used for something else?
in this place it's just for convenience
we know in advance that we're requesting a compacted range
The reason we're not relying on size is that segment alignment doesn't necessarily match between cloud storage and local storage, so it's not trivial to compare sizes. We have a method in the storage layer that tells us if something is compacted in the offset range. It doesn't compare sizes but checks the segment properties instead.
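A minimal standalone sketch of that kind of property check, with made-up stand-in types (segment_props and its fields are hypothetical, not the actual storage-layer API):

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the storage-layer segment metadata.
struct segment_props {
    int64_t base_offset;
    int64_t max_offset;
    bool is_compacted;
};

// Returns true if any segment overlapping [begin, end] is compacted.
// The idea is a per-segment property check, not a size comparison.
bool range_has_compacted_segments(
  const std::vector<segment_props>& segments, int64_t begin, int64_t end) {
    for (const auto& s : segments) {
        if (s.max_offset < begin || s.base_offset > end) {
            continue; // segment does not overlap the requested range
        }
        if (s.is_compacted) {
            return true;
        }
    }
    return false;
}
```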
/// Result of the upload size calculation.
/// Contains size of the region in bytes and locks.
struct upload_reconciliation_result {
nit: it's not quite clear what is being reconciled? Perhaps we can name this upload_candidate or uploadable_range or somesuch?
it was renamed this way after your comment in the previous PR #13927 (comment)
The idea is that this is the result of the reconciliation process: the archiver has to figure out what can be uploaded by asking the storage layer. Basically, it checks what has changed since last time, whether there is enough data to upload, etc. This is why it's called "reconciliation".
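A rough standalone sketch of that reconciliation idea; the struct, parameters, and thresholds here are illustrative stand-ins, not the real archiver or storage-layer interfaces:

```cpp
#include <cstdint>
#include <optional>

// Illustrative result type; the real struct carries more (e.g. locks).
struct reconciliation_sketch {
    int64_t size_bytes;  // size of the region to upload
    int64_t base_offset; // first offset included in the upload
};

// Only produce an upload candidate if something new exists past the last
// uploaded offset and it clears the minimum upload size.
std::optional<reconciliation_sketch> reconcile(
  int64_t last_uploaded_offset,
  int64_t log_end_offset,
  int64_t bytes_available, // reported by the storage layer for the range
  int64_t min_upload_bytes) {
    if (last_uploaded_offset >= log_end_offset) {
        return std::nullopt; // nothing has changed since last time
    }
    if (bytes_available < min_upload_bytes) {
        return std::nullopt; // not enough data to justify an upload yet
    }
    return reconciliation_sketch{
      .size_bytes = bytes_available,
      .base_offset = last_uploaded_offset + 1,
    };
}
```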
private:
    model::record_batch_reader _reader;
    iobuf _buffer;
    ssize_t _max_bytes;
What's the significance of this being size_t instead of ssize_t?
fixed
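For context, a small self-contained example of the signed/unsigned pitfall that makes the choice of type (and the explicit static_cast used in the consumer later in this PR) matter:

```cpp
#include <cstdio>

int main() {
    long long max_bytes = -1;     // hypothetical "unset" sentinel in a signed type
    unsigned long long buf_size = 10;

    // Mixed signed/unsigned comparison: -1 converts to the largest unsigned
    // value, so a "max bytes reached" check could never fire.
    if (buf_size > static_cast<unsigned long long>(max_bytes)) {
        std::puts("max bytes reached"); // not printed
    } else {
        std::puts("negative sentinel never trips the limit");
    }

    // Comparing in the signed domain behaves as intended.
    if (static_cast<long long>(buf_size) > max_bytes) {
        std::puts("10 > -1: limit logic works"); // printed
    }
}
```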
vlog(
  _ctxlog.trace, "Buffer is empty, pulling data from the reader");
consumer c(this, _range);
auto done = co_await _reader.consume(c, _deadline);
Isn't this always the consumer's end_of_stream() (i.e. false)?
I guess it's returned for some reason. I'm just logging it.
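A simplified, self-contained illustration of the consume() contract under discussion; the types are stand-ins rather than the real reader API, but the shape is the same: the value logged above is whatever the consumer's end_of_stream() returns once the stream is drained or the consumer stops early.

```cpp
#include <cstdio>
#include <vector>

// Simplified consumer: operator() is called per item, end_of_stream() is the
// value handed back by consume() when consumption finishes.
struct consumer {
    bool operator()(int /*batch*/) { return false; } // false: keep consuming
    bool end_of_stream() const { return false; }
};

template<typename Consumer>
auto consume(const std::vector<int>& batches, Consumer c) {
    for (auto b : batches) {
        if (c(b)) {
            break; // consumer requested an early stop
        }
    }
    return c.end_of_stream();
}

int main() {
    auto done = consume({1, 2, 3}, consumer{});
    std::printf("consume() returned %d\n", static_cast<int>(done));
}
```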
auto load_log_segment(
  ss::lw_shared_ptr<storage::segment> s, inclusive_offset_range range) {
nit: same here? Maybe stream_segment_to_buf?
FIXTURE_TEST(
  test_async_segment_upload_random_compacted, async_data_uploader_fixture) {
#ifdef NDEBUG
nit: if you switch over to gtest, you can have these tests call GTEST_SKIP(), so at least when they're run they're clearly labeled "skipped" instead of opaquely being no-ops
switching to gtest won't happen in this PR, but I'm considering doing it in the near future
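For reference, a hypothetical gtest flavour of such a test using GTEST_SKIP() (the suite and test names are made up):

```cpp
#include <gtest/gtest.h>

// Hypothetical gtest version of the release-only test: in debug builds it is
// reported as SKIPPED instead of silently passing as a no-op.
TEST(async_data_uploader, random_compacted_upload) {
#ifndef NDEBUG
    GTEST_SKIP() << "only meaningful in release builds";
#endif
    // ... generate segments and exercise the uploader here ...
}
```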
std::vector<ss::sstring> keys;
};

void dump_to_disk(iobuf buf, ss::sstring fname) {
nit: this looks unused? (or perhaps there for debugging?)
it was used for debug logging, might be useful in the future
bool max_bytes_reached = _parent->_buffer.size_bytes()
                         > static_cast<size_t>(_parent->_max_bytes);
Is the expectation that max_bytes will roughly limit the size of a remote segment? Or is it only exposed as a way to bound the buffer size?
it's an additional check. I guess the reader should stop at the end offset.
/// Load individual log segment as an iobuf
auto load_log_segment(
Not advocating to switch these tests over, but moving forward I think it's worth trying to avoid having tests rely on the on-disk file format.
One thought is that this test could perhaps instantiate a reader with a remote_segment_batch_consumer and validate that the returned batches are as expected. That way we avoid the complicated filepos computations repeated in this test code, and even if the local storage format changes, these tests will uphold the invariant that we can still read via a cloud reader.
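A rough sketch of that shape, assuming a helper along the lines of model::consume_reader_to_memory and Boost assertions like the existing fixture tests use; the function below is an assumption for illustration, not a drop-in test:

```cpp
// Read the offset range back through a reader and compare batches, instead
// of reasoning about on-disk file positions.
ss::future<> check_batches_roundtrip(
  model::record_batch_reader reader,
  const std::vector<model::record_batch>& expected) {
    auto batches = co_await model::consume_reader_to_memory(
      std::move(reader), model::no_timeout);
    BOOST_REQUIRE_EQUAL(batches.size(), expected.size());
    for (size_t i = 0; i < batches.size(); ++i) {
        BOOST_REQUIRE(batches[i] == expected[i]);
    }
}
```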
}

ss::future<result<upload_reconciliation_result>>
segment_upload::compute_upload_parameters(
Nit: it looks like there are a few pairs of methods here for the two range types; is there any way to combine these? They are quite similar with only minor differences. I see:
archival::segment_upload::compute_upload_parameters
archival::segment_upload::make_segment_upload
archival::segment_upload::initialize
on it
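One common way to fold such pairs is to template on the range type and keep a single implementation. A standalone sketch with stand-in range types (not the real ones):

```cpp
#include <cstdint>
#include <cstdio>
#include <type_traits>

// Stand-ins for the two range flavours (hypothetical, not the real types).
struct inclusive_offset_range { int64_t base; int64_t last; };
struct size_limited_offset_range { int64_t base; int64_t target_bytes; };

// A single templated entry point can replace a pair of near-identical
// methods; only the range-specific step is specialised.
template<typename range_t>
void compute_upload_parameters(const range_t& r) {
    // ... shared setup: grab locks, locate segments, etc. ...
    if constexpr (std::is_same_v<range_t, inclusive_offset_range>) {
        std::printf("explicit range [%lld, %lld]\n",
                    static_cast<long long>(r.base),
                    static_cast<long long>(r.last));
    } else {
        std::printf("size-limited range from %lld, ~%lld bytes\n",
                    static_cast<long long>(r.base),
                    static_cast<long long>(r.target_bytes));
    }
    // ... shared tail: assemble the result ...
}

int main() {
    compute_upload_parameters(inclusive_offset_range{0, 100});
    compute_upload_parameters(size_limited_offset_range{101, 1 << 20});
}
```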
}

/// Result of the upload size calculation.
/// Contains size of the region in bytes and locks.
Are the locks yet to be added or stored in some nested structure?
they're supposed to be stored while the upload operation is active
they should probably go away once we have the new storage layer with MVCC, but for now it's better to hold them
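A minimal sketch of what holding the locks in the result could look like, assuming Seastar read-lock holders; the struct and field names are illustrative, not the actual definition:

```cpp
#include <seastar/core/rwlock.hh>
#include <cstddef>
#include <vector>

// The point is that the read-lock holders live inside the result, so the
// covered segments cannot be rewritten or removed while the upload that
// uses this result is still alive.
struct upload_reconciliation_result_sketch {
    size_t size_bytes{0};
    std::vector<seastar::rwlock::holder> segment_read_locks;
};
```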
Force-pushed from 23c1d9e to 68e8d5c
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/46566#018e61b1-7166-4dd8-854c-0953d2c288e6
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/46566#018e61c4-1cd7-4696-a439-09078c62d175
Force-pushed from 68e8d5c to 3f15a37
The reader bypasses both the batch cache and the readers cache to avoid interference with Kafka fetch requests and cache pollution.
cc @nvartolomei
This doesn't seem to be tagged as being part of a larger epic, so can you put the PR in context for us, such as updating the cover letter with the big-picture details? For example, it doesn't look like the code in this PR is hooked up to anything other than tests, but the title makes it sound like this PR is now live: archival: Use log reader to upload data. Is that not the case?
Force-pushed from 3f15a37 to 2e7b873
The async_data_uploader can be used to provide an ss::input_stream<char> that covers a certain offset range and to generate metadata for it. The class makes a storage::reader and uses it to go through the offset range while serializing everything into the on-disk format. The reader bypasses both the batch cache and the readers cache to avoid interference with Kafka fetch requests and cache pollution. The uploader can upload offset ranges when an inclusive range is provided. It can also find the upload that matches a search predicate consisting of a start offset, a desired size, and the smallest acceptable upload size. In this case the upload always ends on an index boundary, so the next search will start on an index boundary as well. This eliminates the need to scan the segment in order to find the boundary. The sizing is not precise and can overshoot by up to 32KiB.
All tests generate segments with data and different randomized test cases. The size-limited mode is only tested with non-compacted segments because compacted segments are only reuploaded. The use of rpfixture in this case is justified because every fixture test generates data once and then uses it to run hundreds of test cases.
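A small arithmetic illustration of the index-boundary sizing described above, assuming the index samples roughly every 32KiB (consistent with the stated overshoot bound); the numbers are made up:

```cpp
#include <cstdint>
#include <cstdio>

// The chosen end falls on an index boundary, so the actual upload size is
// the requested size rounded up to the next index entry and can overshoot
// by at most one index step.
int main() {
    constexpr int64_t index_step = 32 * 1024; // one index entry per ~32KiB
    constexpr int64_t requested = 1'000'000;  // desired upload size in bytes

    int64_t actual = ((requested + index_step - 1) / index_step) * index_step;
    std::printf("requested %lld, actual %lld, overshoot %lld bytes\n",
                static_cast<long long>(requested),
                static_cast<long long>(actual),
                static_cast<long long>(actual - requested));
}
```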
Force-pushed from 2e7b873 to 708aceb
I tagged it in the epic. It's part of the bigger change and will be used by the next PR.
bool end_of_stream() const { return false; }

private:
    reader_ds* _parent;
nit: would prefer to see reader_ds* _parent as reader_ds& _parent, since the lifetimes are tied and _parent is non-nullable in this context.
will fix in a followup
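A tiny standalone illustration of the suggested change from a pointer to a reference member (all names are placeholders):

```cpp
#include <cstdio>

// A reference member encodes "non-null, lifetime tied to the owner"
// directly in the type, which is what the nit suggests.
struct reader_ds;

struct consumer {
    explicit consumer(reader_ds& parent) : _parent(parent) {}
    reader_ds& _parent; // cannot be null, cannot be reseated
};

struct reader_ds {
    consumer make_consumer() { return consumer(*this); }
};

int main() {
    reader_ds ds;
    auto c = ds.make_consumer();
    (void)c;
    std::puts("consumer holds a reference to its parent");
}
```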
CI failure is #17354
lgtm
Decouple the storage layer from the archiver by using the log-reader interface instead of fetching data from segment files directly.
This PR is based on the log::offset_range_size implementation in the disk_log_impl. It uses this method to compute the upload size and creates the upload by converting the log reader to the ss::input_stream interface.
We need to know the size of the upload beforehand to create a correct PutObject request and produce correct metadata. The size is also embedded into the name of the uploaded object.
Force push - rebase with dev
Force push - fix code review issues
Backports Required
Release Notes
Improvements